From 85a26288be70611093c81389755f138e51372d72 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 30 May 2025 09:53:24 +0300 Subject: [PATCH] Add utils for writing after all... --- src/util.rs | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 3 deletions(-) diff --git a/src/util.rs b/src/util.rs index 189a4e7..5d04877 100644 --- a/src/util.rs +++ b/src/util.rs @@ -174,8 +174,9 @@ mod varint_util { io::{self, Error}, }; - use serde::Serialize; - use tokio::io::{AsyncRead, AsyncReadExt}; + use serde::{de::DeserializeOwned, Serialize}; + use smallvec::SmallVec; + use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; /// Reads a u64 varint from an AsyncRead source, using the Postcard/LEB128 format. /// @@ -291,12 +292,38 @@ mod varint_util { /// /// If the stream is at the end, this returns `Ok(None)`. fn read_varint_u64(&mut self) -> impl Future>>; + + fn read_length_prefixed( + &mut self, + max_size: usize, + ) -> impl Future>; } impl AsyncReadVarintExt for T { fn read_varint_u64(&mut self) -> impl Future>> { read_varint_u64(self) } + + async fn read_length_prefixed( + &mut self, + max_size: usize, + ) -> io::Result { + let size = match self.read_varint_u64().await? { + Some(size) => size, + None => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "EOF reached")), + }; + + if size > max_size as u64 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Length-prefixed value too large", + )); + } + + let mut buf = vec![0; size as usize]; + self.read_exact(&mut buf).await?; + postcard::from_bytes(&buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + } } /// Provides a fn to write a varint to an [`io::Write`] target, as well as a @@ -318,9 +345,38 @@ mod varint_util { write_length_prefixed(self, value) } } + + /// Provides a fn to write a varint to an [`io::Write`] target, as well as a + /// helper to write a length-prefixed value. + pub trait AsyncWriteVarintExt: AsyncWrite + Unpin { + /// Write a varint + fn write_varint_u64(&mut self, value: u64) -> impl Future>; + /// Write a value with a varint enoded length prefix. + fn write_length_prefixed( + &mut self, + value: T, + ) -> impl Future>; + } + + impl AsyncWriteVarintExt for T { + async fn write_varint_u64(&mut self, value: u64) -> io::Result { + let mut buf: SmallVec<[u8; 10]> = Default::default(); + write_varint_u64_sync(&mut buf, value).unwrap(); + self.write_all(&buf[..]).await?; + Ok(buf.len()) + } + + async fn write_length_prefixed(&mut self, value: V) -> io::Result { + let mut buf = Vec::new(); + write_length_prefixed(&mut buf, value)?; + let size = buf.len(); + self.write_all(&buf).await?; + Ok(size) + } + } } #[cfg(feature = "rpc")] -pub use varint_util::{AsyncReadVarintExt, WriteVarintExt}; +pub use varint_util::{AsyncReadVarintExt, AsyncWriteVarintExt, WriteVarintExt}; mod fuse_wrapper { use std::{