Skip to content

Commit a9ac8e5

Browse files
committed
Everything works
1 parent df1e1ef commit a9ac8e5

File tree

10 files changed

+295
-488
lines changed

10 files changed

+295
-488
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ Here is a basic example of how to set up `iroh-blobs` with `iroh`:
3434

3535
```rust,no_run
3636
use iroh::{protocol::Router, Endpoint};
37-
use iroh_blobs::{store::mem::MemStore, BlobsProtocol};
37+
use iroh_blobs::{store::mem::MemStore, BlobsProtocol, provider::events::EventSender};
3838
3939
#[tokio::main]
4040
async fn main() -> anyhow::Result<()> {
@@ -44,7 +44,7 @@ async fn main() -> anyhow::Result<()> {
4444
4545
// create a protocol handler using an in-memory blob store.
4646
let store = MemStore::new();
47-
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
47+
let blobs = BlobsProtocol::new(&store, endpoint.clone(), EventSender::NONE);
4848
4949
// build the router
5050
let router = Router::builder(endpoint)

examples/custom-protocol.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ use iroh::{
4848
protocol::{AcceptError, ProtocolHandler, Router},
4949
Endpoint, NodeId,
5050
};
51-
use iroh_blobs::{api::Store, provider::EventSender2, store::mem::MemStore, BlobsProtocol, Hash};
51+
use iroh_blobs::{
52+
api::Store, provider::events::EventSender, store::mem::MemStore, BlobsProtocol, Hash,
53+
};
5254
mod common;
5355
use common::{get_or_generate_secret_key, setup_logging};
5456

@@ -100,7 +102,7 @@ async fn listen(text: Vec<String>) -> Result<()> {
100102
proto.insert_and_index(text).await?;
101103
}
102104
// Build the iroh-blobs protocol handler, which is used to download blobs.
103-
let blobs = BlobsProtocol::new(&store, endpoint.clone(), EventSender2::NONE);
105+
let blobs = BlobsProtocol::new(&store, endpoint.clone(), EventSender::NONE);
104106

105107
// create a router that handles both our custom protocol and the iroh-blobs protocol.
106108
let node = Router::builder(endpoint)

examples/mdns-discovery.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use clap::{Parser, Subcommand};
1818
use iroh::{
1919
discovery::mdns::MdnsDiscovery, protocol::Router, Endpoint, PublicKey, RelayMode, SecretKey,
2020
};
21-
use iroh_blobs::{provider::EventSender2, store::mem::MemStore, BlobsProtocol, Hash};
21+
use iroh_blobs::{provider::events::EventSender, store::mem::MemStore, BlobsProtocol, Hash};
2222

2323
mod common;
2424
use common::{get_or_generate_secret_key, setup_logging};
@@ -68,7 +68,7 @@ async fn accept(path: &Path) -> Result<()> {
6868
.await?;
6969
let builder = Router::builder(endpoint.clone());
7070
let store = MemStore::new();
71-
let blobs = BlobsProtocol::new(&store, endpoint.clone(), EventSender2::NONE);
71+
let blobs = BlobsProtocol::new(&store, endpoint.clone(), EventSender::NONE);
7272
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
7373
let node = builder.spawn();
7474

examples/random_store.rs

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ use iroh::{SecretKey, Watcher};
66
use iroh_base::ticket::NodeTicket;
77
use iroh_blobs::{
88
api::downloader::Shuffled,
9-
provider::{AbortReason, EventMask, EventSender2, ProviderMessage},
9+
provider::events::{AbortReason, EventMask, EventSender, ProviderMessage},
1010
store::fs::FsStore,
1111
test::{add_hash_sequences, create_random_blobs},
1212
HashAndFormat,
1313
};
14+
use irpc::RpcMessage;
1415
use n0_future::StreamExt;
1516
use rand::{rngs::StdRng, Rng, SeedableRng};
1617
use tokio::{signal::ctrl_c, sync::mpsc};
@@ -100,8 +101,15 @@ pub fn get_or_generate_secret_key() -> Result<SecretKey> {
100101
}
101102
}
102103

103-
pub fn dump_provider_events(allow_push: bool) -> (tokio::task::JoinHandle<()>, EventSender2) {
104+
pub fn dump_provider_events(allow_push: bool) -> (tokio::task::JoinHandle<()>, EventSender) {
104105
let (tx, mut rx) = mpsc::channel(100);
106+
fn dump_updates<T: RpcMessage>(mut rx: irpc::channel::mpsc::Receiver<T>) {
107+
tokio::spawn(async move {
108+
while let Ok(Some(update)) = rx.recv().await {
109+
println!("{update:?}");
110+
}
111+
});
112+
}
105113
let dump_task = tokio::spawn(async move {
106114
while let Some(event) = rx.recv().await {
107115
match event {
@@ -115,29 +123,23 @@ pub fn dump_provider_events(allow_push: bool) -> (tokio::task::JoinHandle<()>, E
115123
ProviderMessage::ConnectionClosed(msg) => {
116124
println!("{:?}", msg.inner);
117125
}
118-
ProviderMessage::GetRequestReceived(mut msg) => {
126+
ProviderMessage::GetRequestReceived(msg) => {
119127
println!("{:?}", msg.inner);
120128
msg.tx.send(Ok(())).await.ok();
121-
tokio::spawn(async move {
122-
while let Ok(update) = msg.rx.recv().await {
123-
info!("{update:?}");
124-
}
125-
});
129+
dump_updates(msg.rx);
126130
}
127131
ProviderMessage::GetRequestReceivedNotify(msg) => {
128132
println!("{:?}", msg.inner);
133+
dump_updates(msg.rx);
129134
}
130-
ProviderMessage::GetManyRequestReceived(mut msg) => {
135+
ProviderMessage::GetManyRequestReceived(msg) => {
131136
println!("{:?}", msg.inner);
132137
msg.tx.send(Ok(())).await.ok();
133-
tokio::spawn(async move {
134-
while let Ok(update) = msg.rx.recv().await {
135-
info!("{update:?}");
136-
}
137-
});
138+
dump_updates(msg.rx);
138139
}
139140
ProviderMessage::GetManyRequestReceivedNotify(msg) => {
140141
println!("{:?}", msg.inner);
142+
dump_updates(msg.rx);
141143
}
142144
ProviderMessage::PushRequestReceived(msg) => {
143145
println!("{:?}", msg.inner);
@@ -147,9 +149,25 @@ pub fn dump_provider_events(allow_push: bool) -> (tokio::task::JoinHandle<()>, E
147149
Err(AbortReason::Permission)
148150
};
149151
msg.tx.send(res).await.ok();
152+
dump_updates(msg.rx);
150153
}
151154
ProviderMessage::PushRequestReceivedNotify(msg) => {
152155
println!("{:?}", msg.inner);
156+
dump_updates(msg.rx);
157+
}
158+
ProviderMessage::ObserveRequestReceived(msg) => {
159+
println!("{:?}", msg.inner);
160+
let res = if allow_push {
161+
Ok(())
162+
} else {
163+
Err(AbortReason::Permission)
164+
};
165+
msg.tx.send(res).await.ok();
166+
dump_updates(msg.rx);
167+
}
168+
ProviderMessage::ObserveRequestReceivedNotify(msg) => {
169+
println!("{:?}", msg.inner);
170+
dump_updates(msg.rx);
153171
}
154172
ProviderMessage::Throttle(msg) => {
155173
println!("{:?}", msg.inner);
@@ -158,7 +176,7 @@ pub fn dump_provider_events(allow_push: bool) -> (tokio::task::JoinHandle<()>, E
158176
}
159177
}
160178
});
161-
(dump_task, EventSender2::new(tx, EventMask::ALL))
179+
(dump_task, EventSender::new(tx, EventMask::ALL))
162180
}
163181

164182
#[tokio::main]

examples/transfer.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use std::path::PathBuf;
22

33
use iroh::{protocol::Router, Endpoint};
4-
use iroh_blobs::{provider::EventSender2, store::mem::MemStore, ticket::BlobTicket, BlobsProtocol};
4+
use iroh_blobs::{
5+
provider::events::EventSender, store::mem::MemStore, ticket::BlobTicket, BlobsProtocol,
6+
};
57

68
#[tokio::main]
79
async fn main() -> anyhow::Result<()> {
@@ -12,7 +14,7 @@ async fn main() -> anyhow::Result<()> {
1214
// We initialize an in-memory backing store for iroh-blobs
1315
let store = MemStore::new();
1416
// Then we initialize a struct that can accept blobs requests over iroh connections
15-
let blobs = BlobsProtocol::new(&store, endpoint.clone(), EventSender2::NONE);
17+
let blobs = BlobsProtocol::new(&store, endpoint.clone(), EventSender::NONE);
1618

1719
// Grab all passed in arguments, the first one is the binary itself, so we skip it.
1820
let args: Vec<String> = std::env::args().skip(1).collect();
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Seeds for failure cases proptest has generated in the past. It is
2+
# automatically read and these particular cases re-run before any
3+
# novel cases are generated.
4+
#
5+
# It is recommended to check this file in to source control so that
6+
# everyone who runs the test benefits from these saved cases.
7+
cc 0f2ebc49ab2f84e112f08407bb94654fbcb1f19050a4a8a6196383557696438a # shrinks to input = _TestCountersManagerProptestFsArgs { entries: [(15313427648878534792, 264348813928009031854006459208395772047), (1642534478798447378, 15989109311941500072752977306696275871), (8755041673862065815, 172763711808688570294350362332402629716), (4993597758667891804, 114145440157220458287429360639759690928), (15031383154962489250, 63217081714858286463391060323168548783), (17668469631267503333, 11878544422669770587175118199598836678), (10507570291819955314, 126584081645379643144412921692654648228), (3979008599365278329, 283717221942996985486273080647433218905), (8316838360288996639, 334043288511621783152802090833905919408), (15673798930962474157, 77551315511802713260542200115027244708), (12058791254144360414, 56638044274259821850511200885092637649), (8191628769638031337, 314181956273420400069887649110740549194), (6290369460137232066, 255779791286732775990301011955519176773), (11919824746661852269, 319400891587146831511371932480749645441), (12491631698789073154, 271279849791970841069522263758329847554), (53891048909263304, 12061234604041487609497959407391945555), (9486366498650667097, 311383186592430597410801882015456718030), (15696332331789302593, 306911490707714340526403119780178604150), (8699088947997536151, 312272624973367009520183311568498652066), (1144772544750976199, 200591877747619565555594857038887015), (5907208586200645081, 299942008952473970881666769409865744975), (3384528743842518913, 26230956866762934113564101494944411446), (13877357832690956494, 229457597607752760006918374695475345151), (2965687966026226090, 306489188264741716662410004273408761623), (13624286905717143613, 232801392956394366686194314010536008033), (3622356130274722018, 162030840677521022192355139208505458492), (17807768575470996347, 264107246314713159406963697924105744409), (5103434150074147746, 331686166459964582006209321975587627262), (5962771466034321974, 300961804728115777587520888809168362574), (2930645694242691907, 127752709774252686733969795258447263979), (16197574560597474644, 245410120683069493317132088266217906749), (12478835478062365617, 103838791113879912161511798836229961653), (5503595333662805357, 92368472243854403026472376408708548349), (18122734335129614364, 288955542597300001147753560885976966029), (12688080215989274550, 85237436689682348751672119832134138932), (4148468277722853958, 297778117327421209654837771300216669574), (8749445804640085302, 79595866493078234154562014325793780126), (12442730869682574563, 196176786402808588883611974143577417817), (6110644747049355904, 26592587989877021920275416199052685135), (5851164380497779369, 158876888501825038083692899057819261957), (9497384378514985275, 15279835675313542048650599472403150097), (10661092311826161857, 250089949043892591422587928179995867509), (10046856000675345423, 231369150063141386398059701278066296663)] }

src/net_protocol.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
//! ```rust
88
//! # async fn example() -> anyhow::Result<()> {
99
//! use iroh::{protocol::Router, Endpoint};
10-
//! use iroh_blobs::{store, BlobsProtocol};
10+
//! use iroh_blobs::{provider::events::EventSender, store, BlobsProtocol};
1111
//!
1212
//! // create a store
1313
//! let store = store::fs::FsStore::load("blobs").await?;
@@ -19,7 +19,7 @@
1919
//! let endpoint = Endpoint::builder().discovery_n0().bind().await?;
2020
//!
2121
//! // create a blobs protocol handler
22-
//! let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
22+
//! let blobs = BlobsProtocol::new(&store, endpoint.clone(), EventSender::NONE);
2323
//!
2424
//! // create a router and add the blobs protocol handler
2525
//! let router = Router::builder(endpoint)
@@ -45,13 +45,13 @@ use iroh::{
4545
};
4646
use tracing::error;
4747

48-
use crate::{api::Store, provider::EventSender2, ticket::BlobTicket, HashAndFormat};
48+
use crate::{api::Store, provider::events::EventSender, ticket::BlobTicket, HashAndFormat};
4949

5050
#[derive(Debug)]
5151
pub(crate) struct BlobsInner {
5252
pub(crate) store: Store,
5353
pub(crate) endpoint: Endpoint,
54-
pub(crate) events: EventSender2,
54+
pub(crate) events: EventSender,
5555
}
5656

5757
/// A protocol handler for the blobs protocol.
@@ -69,7 +69,7 @@ impl Deref for BlobsProtocol {
6969
}
7070

7171
impl BlobsProtocol {
72-
pub fn new(store: &Store, endpoint: Endpoint, events: EventSender2) -> Self {
72+
pub fn new(store: &Store, endpoint: Endpoint, events: EventSender) -> Self {
7373
Self {
7474
inner: Arc::new(BlobsInner {
7575
store: store.clone(),

0 commit comments

Comments
 (0)