Skip to content

Commit

Permalink
feat(iroh): add blobs.get_collection
Browse files Browse the repository at this point in the history
allows fetching a collection via rpc
  • Loading branch information
dignifiedquire committed Jan 19, 2024
1 parent 73592a3 commit c3abd8f
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 24 deletions.
103 changes: 90 additions & 13 deletions iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ use tracing::warn;

use crate::rpc_protocol::{
AuthorCreateRequest, AuthorListRequest, BlobAddPathRequest, BlobAddStreamRequest,
BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobListCollectionsRequest,
BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse,
BlobListRequest, BlobListResponse, BlobReadAtRequest, BlobReadAtResponse, BlobReadRequest,
BlobReadResponse, BlobValidateRequest, CounterStats, CreateCollectionRequest,
CreateCollectionResponse, DeleteTagRequest, DocCloseRequest, DocCreateRequest, DocDelRequest,
DocDelResponse, DocDropRequest, DocExportFileRequest, DocExportProgress,
DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest, DocImportFileRequest,
DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest,
DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, DocShareRequest,
DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress,
ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse,
NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest,
NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobGetCollectionRequest,
BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListCollectionsResponse,
BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse,
BlobReadAtRequest, BlobReadAtResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
CounterStats, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest,
DocCloseRequest, DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest,
DocExportFileRequest, DocExportProgress, DocGetDownloadPolicyRequest, DocGetExactRequest,
DocGetManyRequest, DocImportFileRequest, DocImportProgress, DocImportRequest, DocLeaveRequest,
DocListRequest, DocOpenRequest, DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest,
DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket,
DownloadProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
NodeConnectionInfoResponse, NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest,
NodeStatusRequest, NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
};
use crate::sync_engine::SyncEvent;

Expand Down Expand Up @@ -410,6 +410,13 @@ where
Ok(stream.map_err(anyhow::Error::from))
}

/// Read the content of a collection.
pub async fn get_collection(&self, hash: Hash) -> Result<Collection> {
let BlobGetCollectionResponse { collection } =
self.rpc.rpc(BlobGetCollectionRequest { hash }).await??;
Ok(collection)
}

/// List all collections.
pub async fn list_collections(
&self,
Expand Down Expand Up @@ -1476,4 +1483,74 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_blob_get_collection() -> Result<()> {
let _guard = iroh_test::logging::setup();

let doc_store = iroh_sync::store::memory::Store::default();
let db = iroh_bytes::store::mem::Store::new();
let node = crate::node::Node::builder(db, doc_store).spawn().await?;

// create temp file
let temp_dir = tempfile::tempdir().context("tempdir")?;

let in_root = temp_dir.path().join("in");
tokio::fs::create_dir_all(in_root.clone())
.await
.context("create dir all")?;

let mut paths = Vec::new();
for i in 0..5 {
let path = in_root.join(format!("test-{i}"));
let size = 100;
let mut buf = vec![0u8; size];
rand::thread_rng().fill_bytes(&mut buf);
let mut file = tokio::fs::File::create(path.clone())
.await
.context("create file")?;
file.write_all(&buf.clone()).await.context("write_all")?;
file.flush().await.context("flush")?;
paths.push(path);
}

let client = node.client();

let mut collection = Collection::default();
let mut tags = Vec::new();
// import files
for path in &paths {
let import_outcome = client
.blobs
.add_from_path(
path.to_path_buf(),
false,
SetTagOption::Auto,
WrapOption::NoWrap,
)
.await
.context("import file")?
.finish()
.await
.context("import finish")?;

collection.push(
path.file_name().unwrap().to_str().unwrap().to_string(),
import_outcome.hash,
);
tags.push(import_outcome.tag);
}

let (hash, _tag) = client
.blobs
.create_collection(collection, SetTagOption::Auto, tags)
.await?;

let collection = client.blobs.get_collection(hash).await?;

// 5 blobs
assert_eq!(collection.len(), 5);

Ok(())
}
}
41 changes: 30 additions & 11 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ use url::Url;
use crate::downloader::Downloader;
use crate::rpc_protocol::{
BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse,
BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobListCollectionsRequest,
BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse,
BlobListRequest, BlobListResponse, BlobReadAtRequest, BlobReadAtResponse, BlobReadRequest,
BlobReadResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse,
DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocExportProgress,
DocImportFileRequest, DocImportFileResponse, DocImportProgress, DocSetHashRequest,
DownloadLocation, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
NodeConnectionInfoResponse, NodeConnectionsRequest, NodeConnectionsResponse,
NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest,
NodeStatusResponse, NodeWatchRequest, NodeWatchResponse, ProviderRequest, ProviderResponse,
ProviderService, SetTagOption,
BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobGetCollectionRequest,
BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListCollectionsResponse,
BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse,
BlobReadAtRequest, BlobReadAtResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest,
DocExportFileResponse, DocExportProgress, DocImportFileRequest, DocImportFileResponse,
DocImportProgress, DocSetHashRequest, DownloadLocation, ListTagsRequest, ListTagsResponse,
NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
NodeConnectionsResponse, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse,
NodeStatusRequest, NodeStatusResponse, NodeWatchRequest, NodeWatchResponse, ProviderRequest,
ProviderResponse, ProviderService, SetTagOption,
};
use crate::sync_engine::{SyncEngine, SYNC_ALPN};
use crate::ticket::BlobTicket;
Expand Down Expand Up @@ -1617,6 +1617,21 @@ impl<D: BaoStore> RpcHandler<D> {

Ok(CreateCollectionResponse { hash, tag })
}

async fn blob_get_collection(
self,
req: BlobGetCollectionRequest,
) -> RpcResult<BlobGetCollectionResponse> {
let hash = req.hash;
let db = self.inner.db.clone();
let collection = self
.rt()
.spawn_pinned(move || async move { Collection::load(&db, &hash).await })
.await
.map_err(|_| anyhow!("join failed"))??;

Ok(BlobGetCollectionResponse { collection })
}
}

fn handle_rpc_request<D: BaoStore, E: ServiceEndpoint<ProviderService>>(
Expand Down Expand Up @@ -1657,6 +1672,10 @@ fn handle_rpc_request<D: BaoStore, E: ServiceEndpoint<ProviderService>>(
.await
}
CreateCollection(msg) => chan.rpc(msg, handler, RpcHandler::create_collection).await,
BlobGetCollection(msg) => {
chan.rpc(msg, handler, RpcHandler::blob_get_collection)
.await
}
ListTags(msg) => {
chan.server_streaming(msg, handler, RpcHandler::blob_list_tags)
.await
Expand Down
20 changes: 20 additions & 0 deletions iroh/src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,24 @@ impl RpcMsg<ProviderService> for DeleteTagRequest {
type Response = RpcResult<()>;
}

/// Get a collection
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobGetCollectionRequest {
/// Hash of the collection
pub hash: Hash,
}

impl RpcMsg<ProviderService> for BlobGetCollectionRequest {
type Response = RpcResult<BlobGetCollectionResponse>;
}

/// The response for a `BlobGetCollectionRequest`.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobGetCollectionResponse {
/// The collection.
pub collection: Collection,
}

/// Create a collection.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateCollectionRequest {
Expand Down Expand Up @@ -1083,6 +1101,7 @@ pub enum ProviderRequest {
BlobDeleteBlob(BlobDeleteBlobRequest),
BlobValidate(BlobValidateRequest),
CreateCollection(CreateCollectionRequest),
BlobGetCollection(BlobGetCollectionRequest),

DeleteTag(DeleteTagRequest),
ListTags(ListTagsRequest),
Expand Down Expand Up @@ -1134,6 +1153,7 @@ pub enum ProviderResponse {
BlobListCollections(BlobListCollectionsResponse),
BlobValidate(ValidateProgress),
CreateCollection(RpcResult<CreateCollectionResponse>),
BlobGetCollection(RpcResult<BlobGetCollectionResponse>),

ListTags(ListTagsResponse),
DeleteTag(RpcResult<()>),
Expand Down

0 comments on commit c3abd8f

Please sign in to comment.