Skip to content

Commit a78c212

Browse files
committed
Update tests
1 parent b23995e commit a78c212

File tree

9 files changed

+261
-333
lines changed

9 files changed

+261
-333
lines changed

examples/custom-protocol.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use iroh::{
4848
protocol::{AcceptError, ProtocolHandler, Router},
4949
Endpoint, NodeId,
5050
};
51-
use iroh_blobs::{api::Store, store::mem::MemStore, BlobsProtocol, Hash};
51+
use iroh_blobs::{api::Store, provider::EventSender2, store::mem::MemStore, BlobsProtocol, Hash};
5252
mod common;
5353
use common::{get_or_generate_secret_key, setup_logging};
5454

@@ -100,7 +100,7 @@ async fn listen(text: Vec<String>) -> Result<()> {
100100
proto.insert_and_index(text).await?;
101101
}
102102
// Build the iroh-blobs protocol handler, which is used to download blobs.
103-
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
103+
let blobs = BlobsProtocol::new(&store, endpoint.clone(), EventSender2::NONE);
104104

105105
// create a router that handles both our custom protocol and the iroh-blobs protocol.
106106
let node = Router::builder(endpoint)

examples/mdns-discovery.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use clap::{Parser, Subcommand};
1818
use iroh::{
1919
discovery::mdns::MdnsDiscovery, protocol::Router, Endpoint, PublicKey, RelayMode, SecretKey,
2020
};
21-
use iroh_blobs::{store::mem::MemStore, BlobsProtocol, Hash};
21+
use iroh_blobs::{provider::EventSender2, store::mem::MemStore, BlobsProtocol, Hash};
2222

2323
mod common;
2424
use common::{get_or_generate_secret_key, setup_logging};
@@ -68,7 +68,7 @@ async fn accept(path: &Path) -> Result<()> {
6868
.await?;
6969
let builder = Router::builder(endpoint.clone());
7070
let store = MemStore::new();
71-
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
71+
let blobs = BlobsProtocol::new(&store, endpoint.clone(), EventSender2::NONE);
7272
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
7373
let node = builder.spawn();
7474

examples/random_store.rs

Lines changed: 46 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use iroh::{SecretKey, Watcher};
66
use iroh_base::ticket::NodeTicket;
77
use iroh_blobs::{
88
api::downloader::Shuffled,
9-
provider::Event,
9+
provider::{AbortReason, Event, EventMask, EventSender2, ProviderMessage},
1010
store::fs::FsStore,
1111
test::{add_hash_sequences, create_random_blobs},
1212
HashAndFormat,
@@ -104,78 +104,66 @@ pub fn dump_provider_events(
104104
allow_push: bool,
105105
) -> (
106106
tokio::task::JoinHandle<()>,
107-
mpsc::Sender<iroh_blobs::provider::Event>,
107+
EventSender2,
108108
) {
109109
let (tx, mut rx) = mpsc::channel(100);
110110
let dump_task = tokio::spawn(async move {
111111
while let Some(event) = rx.recv().await {
112112
match event {
113-
Event::ClientConnected {
114-
node_id,
115-
connection_id,
116-
permitted,
117-
} => {
118-
permitted.send(true).await.ok();
119-
println!("Client connected: {node_id} {connection_id}");
113+
ProviderMessage::ClientConnected(msg) => {
114+
println!("{:?}", msg.inner);
115+
msg.tx.send(Ok(())).await.ok();
120116
}
121-
Event::GetRequestReceived {
122-
connection_id,
123-
request_id,
124-
hash,
125-
ranges,
126-
} => {
127-
println!(
128-
"Get request received: {connection_id} {request_id} {hash} {ranges:?}"
129-
);
117+
ProviderMessage::ClientConnectedNotify(msg) => {
118+
println!("{:?}", msg.inner);
130119
}
131-
Event::TransferCompleted {
132-
connection_id,
133-
request_id,
134-
stats,
135-
} => {
136-
println!("Transfer completed: {connection_id} {request_id} {stats:?}");
120+
ProviderMessage::ConnectionClosed(msg) => {
121+
println!("{:?}", msg.inner);
137122
}
138-
Event::TransferAborted {
139-
connection_id,
140-
request_id,
141-
stats,
142-
} => {
143-
println!("Transfer aborted: {connection_id} {request_id} {stats:?}");
123+
ProviderMessage::GetRequestReceived(mut msg) => {
124+
println!("{:?}", msg.inner);
125+
msg.tx.send(Ok(())).await.ok();
126+
tokio::spawn(async move {
127+
while let Ok(update) = msg.rx.recv().await {
128+
info!("{update:?}");
129+
}
130+
});
144131
}
145-
Event::TransferProgress {
146-
connection_id,
147-
request_id,
148-
index,
149-
end_offset,
150-
} => {
151-
info!("Transfer progress: {connection_id} {request_id} {index} {end_offset}");
132+
ProviderMessage::GetRequestReceivedNotify(msg) => {
133+
println!("{:?}", msg.inner);
152134
}
153-
Event::PushRequestReceived {
154-
connection_id,
155-
request_id,
156-
hash,
157-
ranges,
158-
permitted,
159-
} => {
160-
if allow_push {
161-
permitted.send(true).await.ok();
162-
println!(
163-
"Push request received: {connection_id} {request_id} {hash} {ranges:?}"
164-
);
135+
ProviderMessage::GetManyRequestReceived(mut msg) => {
136+
println!("{:?}", msg.inner);
137+
msg.tx.send(Ok(())).await.ok();
138+
tokio::spawn(async move {
139+
while let Ok(update) = msg.rx.recv().await {
140+
info!("{update:?}");
141+
}
142+
});
143+
}
144+
ProviderMessage::GetManyRequestReceivedNotify(msg) => {
145+
println!("{:?}", msg.inner);
146+
}
147+
ProviderMessage::PushRequestReceived(msg) => {
148+
println!("{:?}", msg.inner);
149+
let res = if allow_push {
150+
Ok(())
165151
} else {
166-
permitted.send(false).await.ok();
167-
println!(
168-
"Push request denied: {connection_id} {request_id} {hash} {ranges:?}"
169-
);
170-
}
152+
Err(AbortReason::Permission)
153+
};
154+
msg.tx.send(res).await.ok();
155+
}
156+
ProviderMessage::PushRequestReceivedNotify(msg) => {
157+
println!("{:?}", msg.inner);
171158
}
172-
_ => {
173-
info!("Received event: {:?}", event);
159+
ProviderMessage::Throttle(msg) => {
160+
println!("{:?}", msg.inner);
161+
msg.tx.send(Ok(())).await.ok();
174162
}
175163
}
176164
}
177165
});
178-
(dump_task, tx)
166+
(dump_task, EventSender2::new(tx, EventMask::ALL))
179167
}
180168

181169
#[tokio::main]
@@ -237,7 +225,7 @@ async fn provide(args: ProvideArgs) -> anyhow::Result<()> {
237225
.bind()
238226
.await?;
239227
let (dump_task, events_tx) = dump_provider_events(args.allow_push);
240-
let blobs = iroh_blobs::BlobsProtocol::new(&store, endpoint.clone(), Some(events_tx));
228+
let blobs = iroh_blobs::BlobsProtocol::new(&store, endpoint.clone(), events_tx);
241229
let router = iroh::protocol::Router::builder(endpoint.clone())
242230
.accept(iroh_blobs::ALPN, blobs)
243231
.spawn();

examples/transfer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::path::PathBuf;
22

33
use iroh::{protocol::Router, Endpoint};
4-
use iroh_blobs::{store::mem::MemStore, ticket::BlobTicket, BlobsProtocol};
4+
use iroh_blobs::{provider::EventSender2, store::mem::MemStore, ticket::BlobTicket, BlobsProtocol};
55

66
#[tokio::main]
77
async fn main() -> anyhow::Result<()> {
@@ -12,7 +12,7 @@ async fn main() -> anyhow::Result<()> {
1212
// We initialize an in-memory backing store for iroh-blobs
1313
let store = MemStore::new();
1414
// Then we initialize a struct that can accept blobs requests over iroh connections
15-
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
15+
let blobs = BlobsProtocol::new(&store, endpoint.clone(), EventSender2::NONE);
1616

1717
// Grab all passed in arguments, the first one is the binary itself, so we skip it.
1818
let args: Vec<String> = std::env::args().skip(1).collect();

src/api/blobs.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ use super::{
5757
};
5858
use crate::{
5959
api::proto::{BatchRequest, ImportByteStreamUpdate},
60-
provider::StreamContext,
60+
provider::{ReaderContext, WriterContext},
6161
store::IROH_BLOCK_SIZE,
6262
util::temp_tag::TempTag,
6363
BlobFormat, Hash, HashAndFormat,
@@ -1168,16 +1168,21 @@ pub(crate) trait WriteProgress {
11681168
async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64);
11691169
}
11701170

1171-
impl WriteProgress for StreamContext {
1172-
async fn notify_payload_write(&mut self, index: u64, offset: u64, len: usize) {
1173-
StreamContext::notify_payload_write(self, index, offset, len);
1171+
impl WriteProgress for WriterContext {
1172+
async fn notify_payload_write(&mut self, _index: u64, offset: u64, len: usize) {
1173+
let end_offset = offset + len as u64;
1174+
self.payload_bytes_written += len as u64;
1175+
self.tracker.transfer_progress(end_offset).await.ok();
11741176
}
11751177

11761178
fn log_other_write(&mut self, len: usize) {
1177-
StreamContext::log_other_write(self, len);
1179+
self.other_bytes_written += len as u64;
11781180
}
11791181

11801182
async fn send_transfer_started(&mut self, index: u64, hash: &Hash, size: u64) {
1181-
StreamContext::send_transfer_started(self, index, hash, size).await
1183+
self.tracker
1184+
.transfer_started(index, hash, size)
1185+
.await
1186+
.ok();
11821187
}
11831188
}

src/net_protocol.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use tracing::error;
4848

4949
use crate::{
5050
api::Store,
51-
provider::{Event, EventSender},
51+
provider::{Event, EventSender2},
5252
ticket::BlobTicket,
5353
HashAndFormat,
5454
};
@@ -57,7 +57,7 @@ use crate::{
5757
pub(crate) struct BlobsInner {
5858
pub(crate) store: Store,
5959
pub(crate) endpoint: Endpoint,
60-
pub(crate) events: EventSender,
60+
pub(crate) events: EventSender2,
6161
}
6262

6363
/// A protocol handler for the blobs protocol.
@@ -75,12 +75,12 @@ impl Deref for BlobsProtocol {
7575
}
7676

7777
impl BlobsProtocol {
78-
pub fn new(store: &Store, endpoint: Endpoint, events: Option<mpsc::Sender<Event>>) -> Self {
78+
pub fn new(store: &Store, endpoint: Endpoint, events: EventSender2) -> Self {
7979
Self {
8080
inner: Arc::new(BlobsInner {
8181
store: store.clone(),
8282
endpoint,
83-
events: EventSender::new(events),
83+
events,
8484
}),
8585
}
8686
}

0 commit comments

Comments
 (0)