Skip to content

Commit 31d57eb

Browse files
authored
Merge 18c972b into f890c79
2 parents f890c79 + 18c972b commit 31d57eb

File tree

2 files changed

+146
-1
lines changed

2 files changed

+146
-1
lines changed

examples/transfer-collection.rs

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
//! Example that shows how to create a collection, and transfer it to another
2+
//! node. It also shows patterns for defining a "Node" struct in higher-level
3+
//! code that abstracts over these operations with an API that feels closer to
4+
//! what an application would use.
5+
//!
6+
//! Run the entire example in one command:
7+
//! $ cargo run --example transfer-collection
8+
use std::collections::HashMap;
9+
10+
use anyhow::{Context, Result};
11+
use iroh::{
12+
discovery::static_provider::StaticProvider, protocol::Router, Endpoint, NodeAddr, Watcher,
13+
};
14+
use iroh_blobs::{
15+
api::{downloader::Shuffled, Store, TempTag},
16+
format::collection::Collection,
17+
store::mem::MemStore,
18+
BlobsProtocol, Hash, HashAndFormat,
19+
};
20+
21+
/// Node is something you'd define in your application. It can contain whatever
22+
/// shared state you'd want to couple with network operations.
23+
struct Node {
24+
store: Store,
25+
/// Router with the blobs protocol registered, to accept blobs requests.
26+
/// We can always get the endpoint with router.endpoint()
27+
router: Router,
28+
}
29+
30+
impl Node {
31+
async fn new(disc: &StaticProvider) -> Result<Self> {
32+
let endpoint = Endpoint::builder()
33+
.add_discovery(disc.clone())
34+
.bind()
35+
.await?;
36+
37+
let store = MemStore::new();
38+
39+
// this BlobsProtocol accepts connections from other nodes and serves blobs from the store
40+
// we pass None to skip subscribing to request events
41+
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
42+
// Routers group one or more protocols together to accept connections from other nodes,
43+
// here we're only using one, but could add more in a real world use case as needed
44+
let router = Router::builder(endpoint)
45+
.accept(iroh_blobs::ALPN, blobs)
46+
.spawn();
47+
48+
Ok(Self {
49+
store: store.into(),
50+
router,
51+
})
52+
}
53+
54+
// get address of this node. Has the side effect of waiting for the node
55+
// to be online & ready to accept connections
56+
async fn node_addr(&self) -> Result<NodeAddr> {
57+
let addr = self.router.endpoint().node_addr().initialized().await;
58+
Ok(addr)
59+
}
60+
61+
async fn list_hashes(&self) -> Result<Vec<Hash>> {
62+
self.store
63+
.blobs()
64+
.list()
65+
.hashes()
66+
.await
67+
.context("Failed to list hashes")
68+
}
69+
70+
/// creates a collection from a given set of named blobs, adds it to the local store
71+
/// and returns the hash of the collection.
72+
async fn create_collection(&self, named_blobs: Vec<(&str, Vec<u8>)>) -> Result<Hash> {
73+
let mut collection_items: HashMap<&str, TempTag> = HashMap::new();
74+
75+
let tx = self.store.batch().await?;
76+
for (name, data) in named_blobs {
77+
let tmp_tag = tx.add_bytes(data).await?;
78+
collection_items.insert(name, tmp_tag);
79+
}
80+
81+
let collection_items = collection_items
82+
.iter()
83+
.map(|(name, tag)| {
84+
let hash = tag.hash().clone();
85+
(name.to_string(), hash)
86+
})
87+
.collect::<Vec<_>>();
88+
89+
let collection = Collection::from_iter(collection_items);
90+
91+
let tt = collection.store(&self.store).await?;
92+
self.store.tags().create(*tt.hash_and_format()).await?;
93+
Ok(*tt.hash())
94+
}
95+
96+
/// retrive an entire collection from a given hash and provider
97+
async fn get_collection(&self, hash: Hash, provider: NodeAddr) -> Result<()> {
98+
let req = HashAndFormat::hash_seq(hash);
99+
let addrs = Shuffled::new(vec![provider.node_id]);
100+
self.store
101+
.downloader(self.router.endpoint())
102+
.download(req, addrs)
103+
.await?;
104+
Ok(())
105+
}
106+
}
107+
108+
#[tokio::main]
109+
async fn main() -> anyhow::Result<()> {
110+
// create a local provider for nodes to discover each other.
111+
// outside of a development environment, production apps would
112+
// use `Endpoint::builder().discovery_n0()` or a similar method
113+
let disc = StaticProvider::new();
114+
115+
// create a sending node
116+
let send_node = Node::new(&disc).await?;
117+
let send_node_addr = send_node.node_addr().await?;
118+
// add a collection with three files
119+
let hash = send_node
120+
.create_collection(vec![
121+
("a.txt", b"this is file a".into()),
122+
("b.txt", b"this is file b".into()),
123+
("c.txt", b"this is file c".into()),
124+
])
125+
.await?;
126+
127+
// create the receiving node
128+
let recv_node = Node::new(&disc).await?;
129+
130+
// add the send node to the discovery provider so the recv node can find it
131+
disc.add_node_info(send_node_addr.clone());
132+
// fetch the collection and all contents
133+
recv_node.get_collection(hash, send_node_addr).await?;
134+
135+
// when listing hashes, you'll see 5 hashes in total:
136+
// - one hash for each of the three files
137+
// - hash of the collection's metadata (this is where the "a.txt" filenames live)
138+
// - the hash of the entire collection which is just the above 4 hashes concatenated, then hashed
139+
let send_hashes = send_node.list_hashes().await?;
140+
let recv_hashes = recv_node.list_hashes().await?;
141+
assert_eq!(send_hashes.len(), recv_hashes.len());
142+
143+
println!("Transfer complete!");
144+
Ok(())
145+
}

src/api/blobs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,7 @@ pub struct AddPathOptions {
616616
/// stream directly can be inconvenient, so this struct provides some convenience
617617
/// methods to work with the result.
618618
///
619-
/// It also implements [`IntoFuture`], so you can await it to get the [`TempTag`] that
619+
/// It also implements [`IntoFuture`], so you can await it to get the [`TagInfo`] that
620620
/// contains the hash of the added content and also protects the content.
621621
///
622622
/// If you want access to the stream, you can use the [`AddProgress::stream`] method.

0 commit comments

Comments
 (0)