From 52bcaece8cc9ebc20296d705fae67b594b9caad7 Mon Sep 17 00:00:00 2001
From: dignifiedquire
Date: Fri, 19 Jan 2024 13:35:57 +0100
Subject: [PATCH 1/5] feat(iroh): add blob.read_at method

Allows efficiently reading parts of a blob via RPC.
---
 iroh/src/client.rs       | 154 +++++++++++++++++++++++++++++++++++----
 iroh/src/node.rs         |  92 +++++++++++++++++++++--
 iroh/src/rpc_protocol.rs |  38 ++++++++++
 3 files changed, 261 insertions(+), 23 deletions(-)

diff --git a/iroh/src/client.rs b/iroh/src/client.rs
index de22cdd516..fb558ece9d 100644
--- a/iroh/src/client.rs
+++ b/iroh/src/client.rs
@@ -37,16 +37,17 @@ use crate::rpc_protocol::{
     AuthorCreateRequest, AuthorListRequest, BlobAddPathRequest, BlobAddStreamRequest,
     BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobListCollectionsRequest,
     BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse,
-    BlobListRequest, BlobListResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
-    CounterStats, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest,
-    DocCloseRequest, DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest,
-    DocExportFileRequest, DocExportProgress, DocGetDownloadPolicyRequest, DocGetExactRequest,
-    DocGetManyRequest, DocImportFileRequest, DocImportProgress, DocImportRequest, DocLeaveRequest,
-    DocListRequest, DocOpenRequest, DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest,
-    DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket,
-    DownloadProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
-    NodeConnectionInfoResponse, NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest,
-    NodeStatusRequest, NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
+    BlobListRequest, BlobListResponse, BlobReadAtRequest, BlobReadAtResponse, BlobReadRequest,
+    BlobReadResponse, BlobValidateRequest, CounterStats, CreateCollectionRequest,
+    CreateCollectionResponse, DeleteTagRequest, DocCloseRequest, DocCreateRequest, DocDelRequest,
+    DocDelResponse, DocDropRequest, DocExportFileRequest, DocExportProgress,
+    DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest, DocImportFileRequest,
+    DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest,
+    DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, DocShareRequest,
+    DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress,
+    ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse,
+    NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest,
+    NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
 };
 use crate::sync_engine::SyncEvent;
 
@@ -240,7 +241,12 @@ where
     ///
     /// Returns a [`BlobReader`], which can report the size of the blob before reading it.
     pub async fn read(&self, hash: Hash) -> Result<BlobReader> {
-        BlobReader::from_rpc(&self.rpc, hash).await
+        BlobReader::from_rpc_read(&self.rpc, hash).await
+    }
+
+    /// Read `len` bytes from a single blob, starting at `offset`.
+    pub async fn read_at(&self, hash: Hash, offset: u64, len: usize) -> Result<BlobReader> {
+        BlobReader::from_rpc_read_at(&self.rpc, hash, offset, len).await
     }
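
A minimal sketch of the intended call pattern at this stage of the series, assuming a `client` handle for a running node and the `hash` of a stored blob (both names illustrative, matching the tests further down); `len` is still a plain `usize` here:

    // Stream a 256-byte slice starting at offset 1024.
    let reader = client.blobs.read_at(hash, 1024, 256).await?;
    println!("total blob size: {}", reader.size());

    // Or buffer the slice directly.
    let slice = client.blobs.read_at_to_bytes(hash, 1024, 256).await?;
    assert_eq!(slice.len(), 256);
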
 
     /// Read all bytes of a single blob.
@@ -249,7 +255,17 @@ where
     /// reading is small. If not sure, use [`Self::read`] and check the size with
     /// [`BlobReader::size`] before calling [`BlobReader::read_to_bytes`].
     pub async fn read_to_bytes(&self, hash: Hash) -> Result<Bytes> {
-        BlobReader::from_rpc(&self.rpc, hash)
+        BlobReader::from_rpc_read(&self.rpc, hash)
+            .await?
+            .read_to_bytes()
+            .await
+    }
+
+    /// Read all bytes of a single blob at `offset` for length `len`.
+    ///
+    /// This allocates a buffer for the full length.
+    pub async fn read_at_to_bytes(&self, hash: Hash, offset: u64, len: usize) -> Result<Bytes> {
+        BlobReader::from_rpc_read_at(&self.rpc, hash, offset, len)
             .await?
             .read_to_bytes()
             .await
     }
@@ -498,7 +514,7 @@ impl BlobReader {
         }
     }
 
-    async fn from_rpc<C: ServiceConnection<ProviderService>>(
+    async fn from_rpc_read<C: ServiceConnection<ProviderService>>(
         rpc: &RpcClient<ProviderService, C>,
         hash: Hash,
     ) -> anyhow::Result<Self> {
@@ -519,6 +535,31 @@ impl BlobReader {
         Ok(Self::new(size, is_complete, stream.boxed()))
     }
 
+    async fn from_rpc_read_at<C: ServiceConnection<ProviderService>>(
+        rpc: &RpcClient<ProviderService, C>,
+        hash: Hash,
+        offset: u64,
+        len: usize,
+    ) -> anyhow::Result<Self> {
+        let stream = rpc
+            .server_streaming(BlobReadAtRequest { hash, offset, len })
+            .await?;
+        let mut stream = flatten(stream);
+
+        let (size, is_complete) = match stream.next().await {
+            Some(Ok(BlobReadAtResponse::Entry { size, is_complete })) => (size, is_complete),
+            Some(Err(err)) => return Err(err),
+            None | Some(Ok(_)) => return Err(anyhow!("Expected header frame")),
+        };
+
+        let stream = stream.map(|item| match item {
+            Ok(BlobReadAtResponse::Data { chunk }) => Ok(chunk),
+            Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")),
+            Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))),
+        });
+        Ok(Self::new(size, is_complete, stream.boxed()))
+    }
+
     /// Total size of this blob.
     pub fn size(&self) -> u64 {
         self.size
@@ -908,7 +949,7 @@ impl Entry {
     where
         C: ServiceConnection<ProviderService>,
     {
-        BlobReader::from_rpc(client.into(), self.content_hash()).await
+        BlobReader::from_rpc_read(client.into(), self.content_hash()).await
     }
 
     /// Read all content of an [`Entry`] into a buffer.
@@ -921,7 +962,7 @@ impl Entry {
     where
         C: ServiceConnection<ProviderService>,
     {
-        BlobReader::from_rpc(client.into(), self.content_hash())
+        BlobReader::from_rpc_read(client.into(), self.content_hash())
             .await?
             .read_to_bytes()
             .await
@@ -1321,4 +1362,87 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_blob_read_at() -> Result<()> {
+        // let _guard = iroh_test::logging::setup();
+
+        let doc_store = iroh_sync::store::memory::Store::default();
+        let db = iroh_bytes::store::mem::Store::new();
+        let node = crate::node::Node::builder(db, doc_store).spawn().await?;
+
+        // create temp file
+        let temp_dir = tempfile::tempdir().context("tempdir")?;
+
+        let in_root = temp_dir.path().join("in");
+        tokio::fs::create_dir_all(in_root.clone())
+            .await
+            .context("create dir all")?;
+
+        let path = in_root.join("test-blob");
+        let size = 1024 * 128;
+        let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
+        let mut file = tokio::fs::File::create(path.clone())
+            .await
+            .context("create file")?;
+        file.write_all(&buf.clone()).await.context("write_all")?;
+        file.flush().await.context("flush")?;
+
+        let client = node.client();
+
+        let import_outcome = client
+            .blobs
+            .add_from_path(
+                path.to_path_buf(),
+                false,
+                SetTagOption::Auto,
+                WrapOption::NoWrap,
+            )
+            .await
+            .context("import file")?
+            .finish()
+            .await
+            .context("import finish")?;
+
+        let hash = import_outcome.hash;
+
+        // Read everything
+        let res = client.blobs.read_to_bytes(hash).await?;
+        assert_eq!(&res, &buf[..]);
+
+        // Read at smaller than blob_get_chunk_size
+        let res = client.blobs.read_at_to_bytes(hash, 0, 100).await?;
+        assert_eq!(res.len(), 100);
+        assert_eq!(&res[..], &buf[0..100]);
+
+        let res = client.blobs.read_at_to_bytes(hash, 20, 120).await?;
+        assert_eq!(res.len(), 120);
+        assert_eq!(&res[..], &buf[20..140]);
+
+        // Read at equal to blob_get_chunk_size
+        let res = client.blobs.read_at_to_bytes(hash, 0, 1024 * 64).await?;
+        assert_eq!(res.len(), 1024 * 64);
+        assert_eq!(&res[..], &buf[0..1024 * 64]);
+
+        let res = client.blobs.read_at_to_bytes(hash, 20, 1024 * 64).await?;
+        assert_eq!(res.len(), 1024 * 64);
+        assert_eq!(&res[..], &buf[20..(20 + 1024 * 64)]);
+
+        // Read at larger than blob_get_chunk_size
+        let res = client
+            .blobs
+            .read_at_to_bytes(hash, 0, 10 + 1024 * 64)
+            .await?;
+        assert_eq!(res.len(), 10 + 1024 * 64);
+        assert_eq!(&res[..], &buf[0..(10 + 1024 * 64)]);
+
+        let res = client
+            .blobs
+            .read_at_to_bytes(hash, 20, 10 + 1024 * 64)
+            .await?;
+        assert_eq!(res.len(), 10 + 1024 * 64);
+        assert_eq!(&res[..], &buf[20..(20 + 10 + 1024 * 64)]);
+
+        Ok(())
+    }
 }
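
Since `BlobReader` wraps the response stream in a `tokio_util::io::StreamReader`, a range can also be consumed incrementally rather than buffered whole. A sketch, assuming the same `client` and `hash` as in the test and relying on `BlobReader`'s `AsyncRead` implementation (implied by the `read_to_end` call inside `read_to_bytes`):

    // Stream a 1 MiB slice straight into a file without holding it in memory.
    let mut reader = client.blobs.read_at(hash, 0, 1024 * 1024).await?;
    let mut file = tokio::fs::File::create("slice.bin").await?;
    tokio::io::copy(&mut reader, &mut file).await?;
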
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index b6ffe671ff..e5ada73249 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -56,14 +56,15 @@ use crate::rpc_protocol::{
     BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse,
     BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobListCollectionsRequest,
     BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse,
-    BlobListRequest, BlobListResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
-    CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest,
-    DocExportFileResponse, DocExportProgress, DocImportFileRequest, DocImportFileResponse,
-    DocImportProgress, DocSetHashRequest, DownloadLocation, ListTagsRequest, ListTagsResponse,
-    NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
-    NodeConnectionsResponse, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse,
-    NodeStatusRequest, NodeStatusResponse, NodeWatchRequest, NodeWatchResponse, ProviderRequest,
-    ProviderResponse, ProviderService, SetTagOption,
+    BlobListRequest, BlobListResponse, BlobReadAtRequest, BlobReadAtResponse, BlobReadRequest,
+    BlobReadResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse,
+    DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocExportProgress,
+    DocImportFileRequest, DocImportFileResponse, DocImportProgress, DocSetHashRequest,
+    DownloadLocation, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
+    NodeConnectionInfoResponse, NodeConnectionsRequest, NodeConnectionsResponse,
+    NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest,
+    NodeStatusResponse, NodeWatchRequest, NodeWatchResponse, ProviderRequest, ProviderResponse,
+    ProviderService, SetTagOption,
 };
 use crate::sync_engine::{SyncEngine, SYNC_ALPN};
 use crate::ticket::BlobTicket;
@@ -1480,6 +1481,77 @@ impl<D: BaoStore> RpcHandler<D> {
         rx.into_stream()
     }
 
+    fn blob_read_at(
+        self,
+        req: BlobReadAtRequest,
+    ) -> impl Stream<Item = RpcResult<BlobReadAtResponse>> + Send + 'static {
+        let (tx, rx) = flume::bounded(RPC_BLOB_GET_CHANNEL_CAP);
+        let entry = self.inner.db.get(&req.hash);
+        self.inner.rt.spawn_pinned(move || async move {
+            if let Err(err) = read_loop(
+                req.offset,
+                req.len,
+                entry,
+                tx.clone(),
+                RPC_BLOB_GET_CHUNK_SIZE,
+            )
+            .await
+            {
+                tx.send_async(RpcResult::Err(err.into())).await.ok();
+            }
+        });
+
+        async fn read_loop<M: Map>(
+            offset: u64,
+            len: usize,
+            entry: Option<impl MapEntry<M>>,
+            tx: flume::Sender<RpcResult<BlobReadAtResponse>>,
+            max_chunk_size: usize,
+        ) -> anyhow::Result<()> {
+            let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?;
+            let size = entry.size();
+            tx.send_async(Ok(BlobReadAtResponse::Entry {
+                size,
+                is_complete: entry.is_complete(),
+            }))
+            .await?;
+            let mut reader = entry.data_reader().await?;
+
+            let (num_chunks, chunk_size) = if len <= max_chunk_size {
+                (1, len)
+            } else {
+                (
+                    (len as f64 / max_chunk_size as f64).ceil() as usize,
+                    max_chunk_size,
+                )
+            };
+
+            let mut read = 0u64;
+            for i in 0..num_chunks {
+                let chunk_size = if i == num_chunks - 1 {
+                    // the last chunk might be smaller
+                    len - read as usize
+                } else {
+                    chunk_size
+                };
+                let chunk = reader.read_at(offset + read, chunk_size).await?;
+                let chunk_len = chunk.len();
+                if !chunk.is_empty() {
+                    tx.send_async(Ok(BlobReadAtResponse::Data { chunk }))
+                        .await?;
+                }
+                if chunk_len < chunk_size {
+                    break;
+                } else {
+                    read += chunk_len as u64;
+                }
+            }
+            Ok(())
+        }
+
+        rx.into_stream()
+    }
+
     fn node_connections(
         self,
         _: NodeConnectionsRequest,
@@ -1605,6 +1677,10 @@ fn handle_rpc_request<D: BaoStore, E: ServiceEndpoint<ProviderService>>(
             chan.server_streaming(msg, handler, RpcHandler::blob_read)
                 .await
         }
+        BlobReadAt(msg) => {
+            chan.server_streaming(msg, handler, RpcHandler::blob_read_at)
+                .await
+        }
         BlobAddStream(msg) => {
             chan.bidi_streaming(msg, handler, RpcHandler::blob_add_stream)
                 .await
diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs
index 8464b94dc5..ce767da3dd 100644
--- a/iroh/src/rpc_protocol.rs
+++ b/iroh/src/rpc_protocol.rs
@@ -967,6 +967,42 @@ pub enum BlobReadResponse {
     },
 }
 
+/// Get the bytes for a hash
+#[derive(Serialize, Deserialize, Debug)]
+pub struct BlobReadAtRequest {
+    /// Hash to get bytes for
+    pub hash: Hash,
+    /// Offset to start reading at
+    pub offset: u64,
+    /// Length of the data to get
+    pub len: usize,
+}
+
+impl Msg<ProviderService> for BlobReadAtRequest {
+    type Pattern = ServerStreaming;
+}
+
+impl ServerStreamingMsg<ProviderService> for BlobReadAtRequest {
+    type Response = RpcResult<BlobReadAtResponse>;
+}
+
+/// Response to [`BlobReadAtRequest`]
+#[derive(Serialize, Deserialize, Debug)]
+pub enum BlobReadAtResponse {
+    /// The entry header.
+    Entry {
+        /// The size of the blob
+        size: u64,
+        /// Whether the blob is complete
+        is_complete: bool,
+    },
+    /// Chunks of entry data.
+    Data {
+        /// The data chunk
+        chunk: Bytes,
+    },
+}
+
 /// Write a blob from a byte stream
 #[derive(Serialize, Deserialize, Debug)]
 pub struct BlobAddStreamRequest {
@@ -1036,6 +1072,7 @@ pub enum ProviderRequest {
     NodeWatch(NodeWatchRequest),
 
     BlobRead(BlobReadRequest),
+    BlobReadAt(BlobReadAtRequest),
     BlobAddStream(BlobAddStreamRequest),
     BlobAddStreamUpdate(BlobAddStreamUpdate),
     BlobAddPath(BlobAddPathRequest),
@@ -1088,6 +1125,7 @@ pub enum ProviderResponse {
     NodeWatch(NodeWatchResponse),
 
     BlobRead(RpcResult<BlobReadResponse>),
+    BlobReadAt(RpcResult<BlobReadAtResponse>),
     BlobAddStream(BlobAddStreamResponse),
     BlobAddPath(BlobAddPathResponse),
     BlobDownload(DownloadProgress),

From 73592a3732a28285c575dc7275d7bcbf02a6563b Mon Sep 17 00:00:00 2001
From: dignifiedquire
Date: Fri, 19 Jan 2024 14:05:27 +0100
Subject: [PATCH 2/5] make len optional

---
 iroh/src/client.rs       | 57 +++++++++++++++++++++++++++++++---------
 iroh/src/node.rs         |  4 ++-
 iroh/src/rpc_protocol.rs |  2 +-
 3 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/iroh/src/client.rs b/iroh/src/client.rs
index fb558ece9d..5c87be1c05 100644
--- a/iroh/src/client.rs
+++ b/iroh/src/client.rs
@@ -245,7 +245,9 @@ where
     }
 
     /// Read `len` bytes from a single blob, starting at `offset`.
-    pub async fn read_at(&self, hash: Hash, offset: u64, len: usize) -> Result<BlobReader> {
+    ///
+    /// If `len` is `None` it will read the full blob.
+    pub async fn read_at(&self, hash: Hash, offset: u64, len: Option<usize>) -> Result<BlobReader> {
         BlobReader::from_rpc_read_at(&self.rpc, hash, offset, len).await
     }
 
@@ -264,7 +266,12 @@ where
     /// Read all bytes of a single blob at `offset` for length `len`.
     ///
     /// This allocates a buffer for the full length.
-    pub async fn read_at_to_bytes(&self, hash: Hash, offset: u64, len: usize) -> Result<Bytes> {
+    pub async fn read_at_to_bytes(
+        &self,
+        hash: Hash,
+        offset: u64,
+        len: Option<usize>,
+    ) -> Result<Bytes> {
         BlobReader::from_rpc_read_at(&self.rpc, hash, offset, len)
             .await?
             .read_to_bytes()
             .await
@@ -501,14 +508,21 @@ impl Stream for BlobAddProgress {
 #[derive(derive_more::Debug)]
 pub struct BlobReader {
     size: u64,
+    response_size: u64,
     is_complete: bool,
     #[debug("StreamReader")]
     stream: tokio_util::io::StreamReader<BoxStream<'static, io::Result<Bytes>>, Bytes>,
 }
 impl BlobReader {
-    fn new(size: u64, is_complete: bool, stream: BoxStream<'static, io::Result<Bytes>>) -> Self {
+    fn new(
+        size: u64,
+        response_size: u64,
+        is_complete: bool,
+        stream: BoxStream<'static, io::Result<Bytes>>,
+    ) -> Self {
         Self {
             size,
+            response_size,
             is_complete,
             stream: StreamReader::new(stream),
         }
     }
@@ -532,14 +546,14 @@ impl BlobReader {
             Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")),
             Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))),
         });
-        Ok(Self::new(size, is_complete, stream.boxed()))
+        Ok(Self::new(size, size, is_complete, stream.boxed()))
     }
 
     async fn from_rpc_read_at<C: ServiceConnection<ProviderService>>(
         rpc: &RpcClient<ProviderService, C>,
         hash: Hash,
         offset: u64,
-        len: usize,
+        len: Option<usize>,
     ) -> anyhow::Result<Self> {
         let stream = rpc
             .server_streaming(BlobReadAtRequest { hash, offset, len })
@@ -557,7 +571,8 @@ impl BlobReader {
             Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")),
             Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))),
         });
-        Ok(Self::new(size, is_complete, stream.boxed()))
+        let len = len.map(|l| l as u64).unwrap_or_else(|| size - offset);
+        Ok(Self::new(size, len, is_complete, stream.boxed()))
     }
 
     /// Total size of this blob.
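
The new `response_size` field records the length of the requested range, while `size` keeps reporting the whole blob; `read_to_bytes` below switches its allocation to the former, so a small range read no longer reserves a blob-sized buffer. A sketch of the distinction, using the 128 KiB test blob and the same assumed `client`:

    // `size()` is the whole blob; the buffer is sized by the 20-byte range.
    let mut reader = client.blobs.read_at(hash, 0, Some(20)).await?;
    assert_eq!(reader.size(), 1024 * 128);
    let bytes = reader.read_to_bytes().await?;
    assert_eq!(bytes.len(), 20);
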
@@ -574,7 +589,7 @@ impl BlobReader {
 
     /// Read all bytes of the blob.
     pub async fn read_to_bytes(&mut self) -> anyhow::Result<Bytes> {
-        let mut buf = Vec::with_capacity(self.size() as usize);
+        let mut buf = Vec::with_capacity(self.response_size as usize);
         self.read_to_end(&mut buf).await?;
         Ok(buf.into())
     }
@@ -1411,38 +1426,54 @@ mod tests {
         assert_eq!(&res, &buf[..]);
 
         // Read at smaller than blob_get_chunk_size
-        let res = client.blobs.read_at_to_bytes(hash, 0, 100).await?;
+        let res = client.blobs.read_at_to_bytes(hash, 0, Some(100)).await?;
         assert_eq!(res.len(), 100);
         assert_eq!(&res[..], &buf[0..100]);
 
-        let res = client.blobs.read_at_to_bytes(hash, 20, 120).await?;
+        let res = client.blobs.read_at_to_bytes(hash, 20, Some(120)).await?;
         assert_eq!(res.len(), 120);
         assert_eq!(&res[..], &buf[20..140]);
 
         // Read at equal to blob_get_chunk_size
-        let res = client.blobs.read_at_to_bytes(hash, 0, 1024 * 64).await?;
+        let res = client
+            .blobs
+            .read_at_to_bytes(hash, 0, Some(1024 * 64))
+            .await?;
         assert_eq!(res.len(), 1024 * 64);
         assert_eq!(&res[..], &buf[0..1024 * 64]);
 
-        let res = client.blobs.read_at_to_bytes(hash, 20, 1024 * 64).await?;
+        let res = client
+            .blobs
+            .read_at_to_bytes(hash, 20, Some(1024 * 64))
+            .await?;
         assert_eq!(res.len(), 1024 * 64);
         assert_eq!(&res[..], &buf[20..(20 + 1024 * 64)]);
 
         // Read at larger than blob_get_chunk_size
         let res = client
             .blobs
-            .read_at_to_bytes(hash, 0, 10 + 1024 * 64)
+            .read_at_to_bytes(hash, 0, Some(10 + 1024 * 64))
             .await?;
         assert_eq!(res.len(), 10 + 1024 * 64);
         assert_eq!(&res[..], &buf[0..(10 + 1024 * 64)]);
 
         let res = client
             .blobs
-            .read_at_to_bytes(hash, 20, 10 + 1024 * 64)
+            .read_at_to_bytes(hash, 20, Some(10 + 1024 * 64))
             .await?;
         assert_eq!(res.len(), 10 + 1024 * 64);
         assert_eq!(&res[..], &buf[20..(20 + 10 + 1024 * 64)]);
 
+        // full length
+        let res = client.blobs.read_at_to_bytes(hash, 20, None).await?;
+        assert_eq!(res.len(), 1024 * 128 - 20);
+        assert_eq!(&res[..], &buf[20..]);
+
+        // size should be total
+        let reader = client.blobs.read_at(hash, 0, Some(20)).await?;
+        assert_eq!(reader.size(), 1024 * 128);
+        assert_eq!(reader.response_size, 20);
+
         Ok(())
     }
 }
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index e5ada73249..7bf9bb5351 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -1503,7 +1503,7 @@ impl<D: BaoStore> RpcHandler<D> {
 
         async fn read_loop<M: Map>(
             offset: u64,
-            len: usize,
+            len: Option<usize>,
             entry: Option<impl MapEntry<M>>,
             tx: flume::Sender<RpcResult<BlobReadAtResponse>>,
             max_chunk_size: usize,
@@ -1517,6 +1517,8 @@ impl<D: BaoStore> RpcHandler<D> {
             .await?;
             let mut reader = entry.data_reader().await?;
 
+            let len = len.unwrap_or_else(|| (size - offset) as usize);
+
             let (num_chunks, chunk_size) = if len <= max_chunk_size {
                 (1, len)
             } else {
diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs
index ce767da3dd..bbccd68141 100644
--- a/iroh/src/rpc_protocol.rs
+++ b/iroh/src/rpc_protocol.rs
@@ -975,7 +975,7 @@ pub struct BlobReadAtRequest {
     /// Offset to start reading at
     pub offset: u64,
     /// Length of the data to get
-    pub len: usize,
+    pub len: Option<usize>,
 }
 
 impl Msg<ProviderService> for BlobReadAtRequest {
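
With `len` optional, a suffix of a blob can now be read without knowing its size up front. A sketch, with the same assumed `client` and `hash` as above:

    // Everything from byte 4096 to the end, whatever the blob size.
    let tail = client.blobs.read_at_to_bytes(hash, 4096, None).await?;

    // An explicit 512-byte window.
    let window = client.blobs.read_at_to_bytes(hash, 4096, Some(512)).await?;
    assert_eq!(window.len(), 512);
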
From c3abd8fd86fd31e18d183c1ba36bab3002e97553 Mon Sep 17 00:00:00 2001
From: dignifiedquire
Date: Fri, 19 Jan 2024 14:56:51 +0100
Subject: [PATCH 3/5] feat(iroh): add blobs.get_collection

Allows fetching a collection via RPC.
---
 iroh/src/client.rs       | 103 ++++++++++++++++++++++++++++++++-----
 iroh/src/node.rs         |  41 +++++++++++-----
 iroh/src/rpc_protocol.rs |  20 ++++++++
 3 files changed, 140 insertions(+), 24 deletions(-)

diff --git a/iroh/src/client.rs b/iroh/src/client.rs
index 5c87be1c05..6141c79f21 100644
--- a/iroh/src/client.rs
+++ b/iroh/src/client.rs
@@ -35,19 +35,19 @@ use tracing::warn;
 
 use crate::rpc_protocol::{
     AuthorCreateRequest, AuthorListRequest, BlobAddPathRequest, BlobAddStreamRequest,
-    BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobListCollectionsRequest,
-    BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse,
-    BlobListRequest, BlobListResponse, BlobReadAtRequest, BlobReadAtResponse, BlobReadRequest,
-    BlobReadResponse, BlobValidateRequest, CounterStats, CreateCollectionRequest,
-    CreateCollectionResponse, DeleteTagRequest, DocCloseRequest, DocCreateRequest, DocDelRequest,
-    DocDelResponse, DocDropRequest, DocExportFileRequest, DocExportProgress,
-    DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest, DocImportFileRequest,
-    DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest,
-    DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, DocShareRequest,
-    DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress,
-    ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse,
-    NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest,
-    NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
+    BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobGetCollectionRequest,
+    BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListCollectionsResponse,
+    BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse,
+    BlobReadAtRequest, BlobReadAtResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
+    CounterStats, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest,
+    DocCloseRequest, DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest,
+    DocExportFileRequest, DocExportProgress, DocGetDownloadPolicyRequest, DocGetExactRequest,
+    DocGetManyRequest, DocImportFileRequest, DocImportProgress, DocImportRequest, DocLeaveRequest,
+    DocListRequest, DocOpenRequest, DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest,
+    DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket,
+    DownloadProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
+    NodeConnectionInfoResponse, NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest,
+    NodeStatusRequest, NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
 };
 use crate::sync_engine::SyncEvent;
 
@@ -410,6 +410,13 @@ where
         Ok(stream.map_err(anyhow::Error::from))
     }
 
+    /// Read the content of a collection.
+    pub async fn get_collection(&self, hash: Hash) -> Result<Collection> {
+        let BlobGetCollectionResponse { collection } =
+            self.rpc.rpc(BlobGetCollectionRequest { hash }).await??;
+        Ok(collection)
+    }
+
     /// List all collections.
     pub async fn list_collections(
         &self,
@@ -1476,4 +1483,74 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_blob_get_collection() -> Result<()> {
+        let _guard = iroh_test::logging::setup();
+
+        let doc_store = iroh_sync::store::memory::Store::default();
+        let db = iroh_bytes::store::mem::Store::new();
+        let node = crate::node::Node::builder(db, doc_store).spawn().await?;
+
+        // create temp file
+        let temp_dir = tempfile::tempdir().context("tempdir")?;
+
+        let in_root = temp_dir.path().join("in");
+        tokio::fs::create_dir_all(in_root.clone())
+            .await
+            .context("create dir all")?;
+
+        let mut paths = Vec::new();
+        for i in 0..5 {
+            let path = in_root.join(format!("test-{i}"));
+            let size = 100;
+            let mut buf = vec![0u8; size];
+            rand::thread_rng().fill_bytes(&mut buf);
+            let mut file = tokio::fs::File::create(path.clone())
+                .await
+                .context("create file")?;
+            file.write_all(&buf.clone()).await.context("write_all")?;
+            file.flush().await.context("flush")?;
+            paths.push(path);
+        }
+
+        let client = node.client();
+
+        let mut collection = Collection::default();
+        let mut tags = Vec::new();
+        // import files
+        for path in &paths {
+            let import_outcome = client
+                .blobs
+                .add_from_path(
+                    path.to_path_buf(),
+                    false,
+                    SetTagOption::Auto,
+                    WrapOption::NoWrap,
+                )
+                .await
+                .context("import file")?
+                .finish()
+                .await
+                .context("import finish")?;
+
+            collection.push(
+                path.file_name().unwrap().to_str().unwrap().to_string(),
+                import_outcome.hash,
+            );
+            tags.push(import_outcome.tag);
+        }
+
+        let (hash, _tag) = client
+            .blobs
+            .create_collection(collection, SetTagOption::Auto, tags)
+            .await?;
+
+        let collection = client.blobs.get_collection(hash).await?;
+
+        // 5 blobs
+        assert_eq!(collection.len(), 5);
+
+        Ok(())
+    }
 }
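
A sketch of the new call from the client side, mirroring the test above; the iteration over (name, hash) pairs is an assumption based on the `Collection::push`/`len` usage shown here, not an API introduced by this patch:

    // Fetch a collection by its hash and list its entries.
    let collection = client.blobs.get_collection(hash).await?;
    println!("collection has {} blobs", collection.len());
    for (name, blob_hash) in collection.iter() {
        println!("{name}: {blob_hash}");
    }
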
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index 7bf9bb5351..7afb8bfd9a 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -54,17 +54,17 @@ use url::Url;
 use crate::downloader::Downloader;
 use crate::rpc_protocol::{
     BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse,
-    BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobListCollectionsRequest,
-    BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse,
-    BlobListRequest, BlobListResponse, BlobReadAtRequest, BlobReadAtResponse, BlobReadRequest,
-    BlobReadResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse,
-    DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocExportProgress,
-    DocImportFileRequest, DocImportFileResponse, DocImportProgress, DocSetHashRequest,
-    DownloadLocation, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
-    NodeConnectionInfoResponse, NodeConnectionsRequest, NodeConnectionsResponse,
-    NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest,
-    NodeStatusResponse, NodeWatchRequest, NodeWatchResponse, ProviderRequest, ProviderResponse,
-    ProviderService, SetTagOption,
+    BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobGetCollectionRequest,
+    BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListCollectionsResponse,
+    BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse,
+    BlobReadAtRequest, BlobReadAtResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
+    CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest,
+    DocExportFileResponse, DocExportProgress, DocImportFileRequest, DocImportFileResponse,
+    DocImportProgress, DocSetHashRequest, DownloadLocation, ListTagsRequest, ListTagsResponse,
+    NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
+    NodeConnectionsResponse, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse,
+    NodeStatusRequest, NodeStatusResponse, NodeWatchRequest, NodeWatchResponse, ProviderRequest,
+    ProviderResponse, ProviderService, SetTagOption,
 };
 use crate::sync_engine::{SyncEngine, SYNC_ALPN};
 use crate::ticket::BlobTicket;
@@ -1617,6 +1617,21 @@ impl<D: BaoStore> RpcHandler<D> {
 
         Ok(CreateCollectionResponse { hash, tag })
     }
+
+    async fn blob_get_collection(
+        self,
+        req: BlobGetCollectionRequest,
+    ) -> RpcResult<BlobGetCollectionResponse> {
+        let hash = req.hash;
+        let db = self.inner.db.clone();
+        let collection = self
+            .rt()
+            .spawn_pinned(move || async move { Collection::load(&db, &hash).await })
+            .await
+            .map_err(|_| anyhow!("join failed"))??;
+
+        Ok(BlobGetCollectionResponse { collection })
+    }
 }
 
 fn handle_rpc_request<D: BaoStore, E: ServiceEndpoint<ProviderService>>(
@@ -1657,6 +1672,10 @@ fn handle_rpc_request<D: BaoStore, E: ServiceEndpoint<ProviderService>>(
                 .await
         }
         CreateCollection(msg) => chan.rpc(msg, handler, RpcHandler::create_collection).await,
+        BlobGetCollection(msg) => {
+            chan.rpc(msg, handler, RpcHandler::blob_get_collection)
+                .await
+        }
         ListTags(msg) => {
             chan.server_streaming(msg, handler, RpcHandler::blob_list_tags)
                 .await
diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs
index bbccd68141..fbee102adc 100644
--- a/iroh/src/rpc_protocol.rs
+++ b/iroh/src/rpc_protocol.rs
@@ -274,6 +274,24 @@ impl RpcMsg for DeleteTagRequest {
     type Response = RpcResult<()>;
 }
 
+/// Get a collection
+#[derive(Debug, Serialize, Deserialize)]
+pub struct BlobGetCollectionRequest {
+    /// Hash of the collection
+    pub hash: Hash,
+}
+
+impl RpcMsg<ProviderService> for BlobGetCollectionRequest {
+    type Response = RpcResult<BlobGetCollectionResponse>;
+}
+
+/// The response for a [`BlobGetCollectionRequest`].
+#[derive(Debug, Serialize, Deserialize)]
+pub struct BlobGetCollectionResponse {
+    /// The collection.
+    pub collection: Collection,
+}
+
 /// Create a collection.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct CreateCollectionRequest {
@@ -1083,6 +1101,7 @@ pub enum ProviderRequest {
     BlobDeleteBlob(BlobDeleteBlobRequest),
     BlobValidate(BlobValidateRequest),
     CreateCollection(CreateCollectionRequest),
+    BlobGetCollection(BlobGetCollectionRequest),
 
     DeleteTag(DeleteTagRequest),
     ListTags(ListTagsRequest),
@@ -1134,6 +1153,7 @@ pub enum ProviderResponse {
     BlobListCollections(BlobListCollectionsResponse),
     BlobValidate(ValidateProgress),
     CreateCollection(RpcResult<CreateCollectionResponse>),
+    BlobGetCollection(RpcResult<BlobGetCollectionResponse>),
 
     ListTags(ListTagsResponse),
     DeleteTag(RpcResult<()>),

From 8799d7a194841d0355cb10253d529fe44dcc4de8 Mon Sep 17 00:00:00 2001
From: dignifiedquire
Date: Tue, 23 Jan 2024 09:36:20 +0100
Subject: [PATCH 4/5] refactor: reduce code duplication

---
 iroh/src/client.rs       | 37 +++++++++-----------------
 iroh/src/node.rs         | 56 +++------------------------------------
 iroh/src/rpc_protocol.rs | 34 ------------------------
 3 files changed, 16 insertions(+), 111 deletions(-)

diff --git a/iroh/src/client.rs b/iroh/src/client.rs
index 6141c79f21..c08351fcc3 100644
--- a/iroh/src/client.rs
+++ b/iroh/src/client.rs
@@ -38,16 +38,16 @@ use crate::rpc_protocol::{
     BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobGetCollectionRequest,
     BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListCollectionsResponse,
     BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse,
-    BlobReadAtRequest, BlobReadAtResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
-    CounterStats, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest,
-    DocCloseRequest, DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest,
-    DocExportFileRequest, DocExportProgress, DocGetDownloadPolicyRequest, DocGetExactRequest,
-    DocGetManyRequest, DocImportFileRequest, DocImportProgress, DocImportRequest, DocLeaveRequest,
-    DocListRequest, DocOpenRequest, DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest,
-    DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket,
-    DownloadProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
-    NodeConnectionInfoResponse, NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest,
-    NodeStatusRequest, NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
+    BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CounterStats,
+    CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocCloseRequest,
+    DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest, DocExportFileRequest,
+    DocExportProgress, DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest,
+    DocImportFileRequest, DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest,
+    DocOpenRequest, DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, DocShareRequest,
+    DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress,
+    ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse,
+    NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest,
+    NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
 };
 use crate::sync_engine::SyncEvent;
 
@@ -520,6 +520,7 @@ pub struct BlobReader {
     #[debug("StreamReader")]
     stream: tokio_util::io::StreamReader<BoxStream<'static, io::Result<Bytes>>, Bytes>,
 }
+
 impl BlobReader {
     fn new(
         size: u64,
@@ -539,21 +540,7 @@ impl BlobReader {
         rpc: &RpcClient<ProviderService, C>,
         hash: Hash,
     ) -> anyhow::Result<Self> {
-        let stream = rpc.server_streaming(BlobReadRequest { hash }).await?;
-        let mut stream = flatten(stream);
-
-        let (size, is_complete) = match stream.next().await {
-            Some(Ok(BlobReadResponse::Entry { size, is_complete })) => (size, is_complete),
-            Some(Err(err)) => return Err(err),
-            None | Some(Ok(_)) => return Err(anyhow!("Expected header frame")),
-        };
-
-        let stream = stream.map(|item| match item {
-            Ok(BlobReadResponse::Data { chunk }) => Ok(chunk),
-            Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")),
-            Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))),
-        });
-        Ok(Self::new(size, size, is_complete, stream.boxed()))
+        Self::from_rpc_read_at(rpc, hash, 0, None).await
     }
 
     async fn from_rpc_read_at<C: ServiceConnection<ProviderService>>(
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index 7afb8bfd9a..5b2c2b22d2 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -57,10 +57,10 @@ use crate::rpc_protocol::{
     BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobGetCollectionRequest,
     BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListCollectionsResponse,
     BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse,
-    BlobReadAtRequest, BlobReadAtResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
-    CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest,
-    DocExportFileResponse, DocExportProgress, DocImportFileRequest, DocImportFileResponse,
-    DocImportProgress, DocSetHashRequest, DownloadLocation, ListTagsRequest, ListTagsResponse,
+    BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest,
+    CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest, DocExportFileResponse,
+    DocExportProgress, DocImportFileRequest, DocImportFileResponse, DocImportProgress,
+    DocSetHashRequest, DownloadLocation, ListTagsRequest, ListTagsResponse,
     NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
     NodeConnectionsResponse, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse,
     NodeStatusRequest, NodeStatusResponse, NodeWatchRequest, NodeWatchResponse, ProviderRequest,
@@ -1437,50 +1437,6 @@ impl<D: BaoStore> RpcHandler<D> {
         Ok(())
     }
 
-    fn blob_read(
-        self,
-        req: BlobReadRequest,
-    ) -> impl Stream<Item = RpcResult<BlobReadResponse>> + Send + 'static {
-        let (tx, rx) = flume::bounded(RPC_BLOB_GET_CHANNEL_CAP);
-        let entry = self.inner.db.get(&req.hash);
-        self.inner.rt.spawn_pinned(move || async move {
-            if let Err(err) = read_loop(entry, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await {
-                tx.send_async(RpcResult::Err(err.into())).await.ok();
-            }
-        });
-
-        async fn read_loop<M: Map>(
-            entry: Option<impl MapEntry<M>>,
-            tx: flume::Sender<RpcResult<BlobReadResponse>>,
-            chunk_size: usize,
-        ) -> anyhow::Result<()> {
-            let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?;
-            let size = entry.size();
-            tx.send_async(Ok(BlobReadResponse::Entry {
-                size,
-                is_complete: entry.is_complete(),
-            }))
-            .await?;
-            let mut reader = entry.data_reader().await?;
-            let mut offset = 0u64;
-            loop {
-                let chunk = reader.read_at(offset, chunk_size).await?;
-                let len = chunk.len();
-                if !chunk.is_empty() {
-                    tx.send_async(Ok(BlobReadResponse::Data { chunk })).await?;
-                }
-                if len < chunk_size {
-                    break;
-                } else {
-                    offset += len as u64;
-                }
-            }
-            Ok(())
-        }
-
-        rx.into_stream()
-    }
-
     fn blob_read_at(
         self,
         req: BlobReadAtRequest,
@@ -1694,10 +1650,6 @@ fn handle_rpc_request<D: BaoStore, E: ServiceEndpoint<ProviderService>>(
             chan.server_streaming(msg, handler, RpcHandler::blob_validate)
                 .await
         }
-        BlobRead(msg) => {
-            chan.server_streaming(msg, handler, RpcHandler::blob_read)
-                .await
-        }
         BlobReadAt(msg) => {
             chan.server_streaming(msg, handler, RpcHandler::blob_read_at)
                 .await
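
After this refactor a plain `read` is simply a full-range `read_at`, so the two entry points are expected to return identical bytes; conceptually (a sketch, not code from the patch):

    // Both calls now go through from_rpc_read_at.
    let a = client.blobs.read_to_bytes(hash).await?;
    let b = client.blobs.read_at_to_bytes(hash, 0, None).await?;
    assert_eq!(a, b);
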
diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs
index fbee102adc..1e6ab8810d 100644
--- a/iroh/src/rpc_protocol.rs
+++ b/iroh/src/rpc_protocol.rs
@@ -953,38 +953,6 @@ pub struct DocGetDownloadPolicyResponse {
     pub policy: DownloadPolicy,
 }
 
-/// Get the bytes for a hash
-#[derive(Serialize, Deserialize, Debug)]
-pub struct BlobReadRequest {
-    /// Hash to get bytes for
-    pub hash: Hash,
-}
-
-impl Msg<ProviderService> for BlobReadRequest {
-    type Pattern = ServerStreaming;
-}
-
-impl ServerStreamingMsg<ProviderService> for BlobReadRequest {
-    type Response = RpcResult<BlobReadResponse>;
-}
-
-/// Response to [`BlobReadRequest`]
-#[derive(Serialize, Deserialize, Debug)]
-pub enum BlobReadResponse {
-    /// The entry header.
-    Entry {
-        /// The size of the blob
-        size: u64,
-        /// Whether the blob is complete
-        is_complete: bool,
-    },
-    /// Chunks of entry data.
-    Data {
-        /// The data chunk
-        chunk: Bytes,
-    },
-}
-
 /// Get the bytes for a hash
 #[derive(Serialize, Deserialize, Debug)]
 pub struct BlobReadAtRequest {
@@ -1089,7 +1057,6 @@ pub enum ProviderRequest {
     NodeConnectionInfo(NodeConnectionInfoRequest),
     NodeWatch(NodeWatchRequest),
 
-    BlobRead(BlobReadRequest),
     BlobReadAt(BlobReadAtRequest),
     BlobAddStream(BlobAddStreamRequest),
     BlobAddStreamUpdate(BlobAddStreamUpdate),
@@ -1143,7 +1110,6 @@ pub enum ProviderResponse {
     NodeShutdown(()),
     NodeWatch(NodeWatchResponse),
 
-    BlobRead(RpcResult<BlobReadResponse>),
     BlobReadAt(RpcResult<BlobReadAtResponse>),
     BlobAddStream(BlobAddStreamResponse),
     BlobAddPath(BlobAddPathResponse),

From 7a32e5f0fb18409cce3ccb03549f992f30fa8760 Mon Sep 17 00:00:00 2001
From: dignifiedquire
Date: Tue, 23 Jan 2024 09:40:29 +0100
Subject: [PATCH 5/5] avoid floats

---
 iroh/src/node.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index 5b2c2b22d2..3926a49d80 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -1478,10 +1478,8 @@ impl<D: BaoStore> RpcHandler<D> {
             let (num_chunks, chunk_size) = if len <= max_chunk_size {
                 (1, len)
             } else {
-                (
-                    (len as f64 / max_chunk_size as f64).ceil() as usize,
-                    max_chunk_size,
-                )
+                let num_chunks = len / max_chunk_size + (len % max_chunk_size != 0) as usize;
+                (num_chunks, max_chunk_size)
             };
 
             let mut read = 0u64;
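
The replacement computes the chunk count with an integer ceiling division: `len / max + (len % max != 0)`. A self-contained check of the idiom, using the 64 KiB chunk size from the tests (the helper is hypothetical, not part of the patch):

    // Integer ceiling division as used above: one extra chunk whenever the
    // length is not an exact multiple of the chunk size.
    fn num_chunks(len: usize, max_chunk_size: usize) -> usize {
        len / max_chunk_size + (len % max_chunk_size != 0) as usize
    }

    #[test]
    fn chunk_count() {
        assert_eq!(num_chunks(64 * 1024, 64 * 1024), 1); // exact multiple
        assert_eq!(num_chunks(10 + 64 * 1024, 64 * 1024), 2); // short tail chunk
        assert_eq!(num_chunks(150 * 1024, 64 * 1024), 3); // 64 + 64 + 22 KiB
    }
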