Skip to content

Commit

Permalink
add rustfmt config
Browse files Browse the repository at this point in the history
  • Loading branch information
thanhminhmr committed May 21, 2023
1 parent b3ccc9a commit 53e0b1e
Show file tree
Hide file tree
Showing 15 changed files with 810 additions and 797 deletions.
1 change: 1 addition & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
hard_tabs = true
38 changes: 19 additions & 19 deletions src/basic/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,33 @@ pub type AnyResult<T> = Result<T, AnyError>;

#[derive(Debug)]
pub enum AnyError {
String(String),
Error(Box<dyn Error + Send>),
Box(Box<dyn Any + Send>),
String(String),
Error(Box<dyn Error + Send>),
Box(Box<dyn Any + Send>),
}

impl AnyError {
pub fn from_string<S: Into<String>>(into_string: S) -> Self {
Self::String(into_string.into())
}
pub fn from_string<S: Into<String>>(into_string: S) -> Self {
Self::String(into_string.into())
}

pub fn from_box(any: Box<dyn Any + Send>) -> Self {
Self::Box(any)
}
pub fn from_box(any: Box<dyn Any + Send>) -> Self {
Self::Box(any)
}
}

impl Display for AnyError {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
AnyError::String(value) => Display::fmt(value, f),
AnyError::Error(value) => Display::fmt(value, f),
AnyError::Box(value) => Debug::fmt(value, f),
}
}
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
AnyError::String(value) => Display::fmt(value, f),
AnyError::Error(value) => Display::fmt(value, f),
AnyError::Box(value) => Debug::fmt(value, f),
}
}
}

impl<E: Error + Send + 'static> From<E> for AnyError {
fn from(e: E) -> Self {
Self::Error(Box::new(e))
}
fn from(e: E) -> Self {
Self::Error(Box::new(e))
}
}
278 changes: 139 additions & 139 deletions src/basic/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,178 +29,178 @@ type ReaderToWriter<T, const SIZE: usize> = Buffer<T, SIZE>;
// -----------------------------------------------

pub fn pipe<T: Copy + Send + 'static, const SIZE: usize>(
value: T,
value: T,
) -> (PipedWriter<T, SIZE>, PipedReader<T, SIZE>) {
let (writer_sender, reader_receiver): (
SyncSender<WriterToReader<T, SIZE>>,
Receiver<WriterToReader<T, SIZE>>,
) = sync_channel(1);
let (reader_sender, writer_receiver): (
SyncSender<ReaderToWriter<T, SIZE>>,
Receiver<ReaderToWriter<T, SIZE>>,
) = sync_channel(1);
(
PipedWriter {
sender: writer_sender,
receiver: writer_receiver,
buffer: Some(Buffer::new(value)),
index: 0,
},
PipedReader {
sender: reader_sender,
receiver: reader_receiver,
buffer: Some(Buffer::new(value)),
index: 0,
length: 0,
},
)
let (writer_sender, reader_receiver): (
SyncSender<WriterToReader<T, SIZE>>,
Receiver<WriterToReader<T, SIZE>>,
) = sync_channel(1);
let (reader_sender, writer_receiver): (
SyncSender<ReaderToWriter<T, SIZE>>,
Receiver<ReaderToWriter<T, SIZE>>,
) = sync_channel(1);
(
PipedWriter {
sender: writer_sender,
receiver: writer_receiver,
buffer: Some(Buffer::new(value)),
index: 0,
},
PipedReader {
sender: reader_sender,
receiver: reader_receiver,
buffer: Some(Buffer::new(value)),
index: 0,
length: 0,
},
)
}

// -----------------------------------------------

pub struct PipedWriter<T: Copy + Send + 'static, const SIZE: usize> {
sender: SyncSender<WriterToReader<T, SIZE>>,
receiver: Receiver<ReaderToWriter<T, SIZE>>,
buffer: Option<Buffer<T, SIZE>>,
index: usize,
sender: SyncSender<WriterToReader<T, SIZE>>,
receiver: Receiver<ReaderToWriter<T, SIZE>>,
buffer: Option<Buffer<T, SIZE>>,
index: usize,
}

impl<T: Copy + Send + 'static, const SIZE: usize> PipedWriter<T, SIZE> {
// private sync
fn sync(&mut self) -> AnyResult<()> {
debug_assert!(self.buffer.is_some());
debug_assert!(self.index > 0 && self.index <= SIZE);
let buffer: Buffer<T, SIZE> = self.buffer.take().unwrap();
self.sender.send((buffer, self.index))?;
self.buffer = Some(self.receiver.recv()?);
self.index = 0;
Ok(())
}
// private sync
fn sync(&mut self) -> AnyResult<()> {
debug_assert!(self.buffer.is_some());
debug_assert!(self.index > 0 && self.index <= SIZE);
let buffer: Buffer<T, SIZE> = self.buffer.take().unwrap();
self.sender.send((buffer, self.index))?;
self.buffer = Some(self.receiver.recv()?);
self.index = 0;
Ok(())
}
}

impl<T: Copy + Send + 'static, const SIZE: usize> Writer<T> for PipedWriter<T, SIZE> {
fn write(&mut self, value: T) -> AnyResult<()> {
match &mut self.buffer {
None => Err(AnyError::from_string("Broken pipe!")),
Some(buffer) => {
debug_assert!(self.index < SIZE);
buffer[self.index] = value;
self.index += 1;
debug_assert!(self.index <= SIZE);
if self.index == SIZE {
self.sync()?;
}
debug_assert!(self.index < SIZE);
Ok(())
}
}
}
fn write(&mut self, value: T) -> AnyResult<()> {
match &mut self.buffer {
None => Err(AnyError::from_string("Broken pipe!")),
Some(buffer) => {
debug_assert!(self.index < SIZE);
buffer[self.index] = value;
self.index += 1;
debug_assert!(self.index <= SIZE);
if self.index == SIZE {
self.sync()?;
}
debug_assert!(self.index < SIZE);
Ok(())
}
}
}
}

impl<T: Copy + Send + 'static, const SIZE: usize> FromProducer<T> for PipedWriter<T, SIZE> {
fn produce<P: Producer<T>>(&mut self, producer: &mut P) -> AnyResult<usize> {
match &mut self.buffer {
None => Err(AnyError::from_string("Broken pipe!")),
Some(buffer) => {
debug_assert!(self.index < SIZE);
let sliced_buffer: &mut [T] = &mut buffer[self.index..SIZE];
let produced_length: usize = producer.produce(sliced_buffer)?;
debug_assert!(produced_length <= sliced_buffer.len());
self.index += produced_length;
debug_assert!(self.index <= SIZE);
if self.index == SIZE {
self.sync()?;
}
Ok(produced_length)
}
}
}
fn produce<P: Producer<T>>(&mut self, producer: &mut P) -> AnyResult<usize> {
match &mut self.buffer {
None => Err(AnyError::from_string("Broken pipe!")),
Some(buffer) => {
debug_assert!(self.index < SIZE);
let sliced_buffer: &mut [T] = &mut buffer[self.index..SIZE];
let produced_length: usize = producer.produce(sliced_buffer)?;
debug_assert!(produced_length <= sliced_buffer.len());
self.index += produced_length;
debug_assert!(self.index <= SIZE);
if self.index == SIZE {
self.sync()?;
}
Ok(produced_length)
}
}
}
}

impl<T: Copy + Send + 'static, const SIZE: usize> Closable<()> for PipedWriter<T, SIZE> {
fn close(mut self) -> AnyResult<()> {
if self.buffer.is_some() && self.index > 0 {
debug_assert!(self.index <= SIZE);
self.sync()
} else {
Ok(())
}
}
fn close(mut self) -> AnyResult<()> {
if self.buffer.is_some() && self.index > 0 {
debug_assert!(self.index <= SIZE);
self.sync()
} else {
Ok(())
}
}
}

// -----------------------------------------------

pub struct PipedReader<T: Copy + Send + 'static, const SIZE: usize> {
sender: SyncSender<ReaderToWriter<T, SIZE>>,
receiver: Receiver<WriterToReader<T, SIZE>>,
buffer: Option<Buffer<T, SIZE>>,
length: usize,
index: usize,
sender: SyncSender<ReaderToWriter<T, SIZE>>,
receiver: Receiver<WriterToReader<T, SIZE>>,
buffer: Option<Buffer<T, SIZE>>,
length: usize,
index: usize,
}

impl<T: Copy + Send + 'static, const SIZE: usize> PipedReader<T, SIZE> {
fn sync(&mut self) {
debug_assert!(self.index <= self.length && self.length <= SIZE);
if !self.buffer.is_none() && self.index >= self.length {
// take the old buffer and set it to None
let old_buffer: Buffer<T, SIZE> = self.buffer.take().unwrap();
// receive the new buffer
if let Ok((new_buffer, length)) = self.receiver.recv() {
debug_assert!(length > 0 && length <= SIZE);
// set the new buffer and its length
self.buffer = Some(new_buffer);
self.length = length;
self.index = 0;
// send the old buffer away, maybe print something to log if error?
let _error_ignored_ = self.sender.send(old_buffer);
}
}
}
fn sync(&mut self) {
debug_assert!(self.index <= self.length && self.length <= SIZE);
if !self.buffer.is_none() && self.index >= self.length {
// take the old buffer and set it to None
let old_buffer: Buffer<T, SIZE> = self.buffer.take().unwrap();
// receive the new buffer
if let Ok((new_buffer, length)) = self.receiver.recv() {
debug_assert!(length > 0 && length <= SIZE);
// set the new buffer and its length
self.buffer = Some(new_buffer);
self.length = length;
self.index = 0;
// send the old buffer away, maybe print something to log if error?
let _error_ignored_ = self.sender.send(old_buffer);
}
}
}
}

impl<T: Copy + Send + 'static, const SIZE: usize> Reader<T> for PipedReader<T, SIZE> {
fn read(&mut self) -> AnyResult<Option<T>> {
debug_assert!(self.index <= self.length && self.length <= SIZE);
self.sync();
match &mut self.buffer {
None => Ok(None),
Some(buffer) => {
debug_assert!(self.index < self.length && self.length <= SIZE);
let value: T = buffer[self.index];
self.index += 1;
debug_assert!(self.index <= self.length);
Ok(Some(value))
}
}
}
fn read(&mut self) -> AnyResult<Option<T>> {
debug_assert!(self.index <= self.length && self.length <= SIZE);
self.sync();
match &mut self.buffer {
None => Ok(None),
Some(buffer) => {
debug_assert!(self.index < self.length && self.length <= SIZE);
let value: T = buffer[self.index];
self.index += 1;
debug_assert!(self.index <= self.length);
Ok(Some(value))
}
}
}
}

impl<T: Copy + Send + 'static, const SIZE: usize> ToConsumer<T> for PipedReader<T, SIZE> {
fn consume<C: Consumer<T>>(&mut self, consumer: &mut C) -> AnyResult<usize> {
debug_assert!(self.index <= self.length && self.length <= SIZE);
self.sync();
match &mut self.buffer {
None => Ok(0),
Some(buffer) => {
debug_assert!(self.index < self.length && self.length <= SIZE);
let sliced_buffer: &[T] = &buffer[self.index..self.length];
let consumed_length: usize = consumer.consume(sliced_buffer)?;
if consumed_length <= sliced_buffer.len() {
self.index += consumed_length;
debug_assert!(self.index <= SIZE);
Ok(consumed_length)
} else {
Err(AnyError::from_string(
"Consumed length is greater than available lenght!",
))
}
}
}
}
fn consume<C: Consumer<T>>(&mut self, consumer: &mut C) -> AnyResult<usize> {
debug_assert!(self.index <= self.length && self.length <= SIZE);
self.sync();
match &mut self.buffer {
None => Ok(0),
Some(buffer) => {
debug_assert!(self.index < self.length && self.length <= SIZE);
let sliced_buffer: &[T] = &buffer[self.index..self.length];
let consumed_length: usize = consumer.consume(sliced_buffer)?;
if consumed_length <= sliced_buffer.len() {
self.index += consumed_length;
debug_assert!(self.index <= SIZE);
Ok(consumed_length)
} else {
Err(AnyError::from_string(
"Consumed length is greater than available lenght!",
))
}
}
}
}
}

impl<T: Copy + Send + 'static, const SIZE: usize> Closable<()> for PipedReader<T, SIZE> {
fn close(self) -> AnyResult<()> {
Ok(())
}
fn close(self) -> AnyResult<()> {
Ok(())
}
}

0 comments on commit 53e0b1e

Please sign in to comment.