Skip to content

Commit 65e5e40

Browse files
committed
fixup
1 parent b26b408 commit 65e5e40

File tree

3 files changed

+16
-21
lines changed

3 files changed

+16
-21
lines changed

examples/limit.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ async fn main() -> Result<()> {
271271
let store = MemStore::new();
272272
let hashes = add_paths(&store, paths).await?;
273273
let events = limit_by_node_id(allowed_nodes.clone());
274-
let (router, addr) = setup(MemStore::new(), events).await?;
274+
let (router, addr) = setup(store, events).await?;
275275

276276
for (path, hash) in hashes {
277277
let ticket = BlobTicket::new(addr.clone(), hash, BlobFormat::Raw);
@@ -299,12 +299,16 @@ async fn main() -> Result<()> {
299299
}
300300
}
301301

302-
let events = limit_by_hash(allowed_hashes);
303-
let (router, addr) = setup(MemStore::new(), events).await?;
302+
let events = limit_by_hash(allowed_hashes.clone());
303+
let (router, addr) = setup(store, events).await?;
304304

305-
for (i, (path, hash)) in hashes.iter().enumerate() {
305+
for (path, hash) in hashes.iter() {
306306
let ticket = BlobTicket::new(addr.clone(), *hash, BlobFormat::Raw);
307-
let permitted = if i == 0 { "" } else { "limited" };
307+
let permitted = if allowed_hashes.contains(hash) {
308+
"allowed"
309+
} else {
310+
"forbidden"
311+
};
308312
println!("{}: {ticket} ({permitted})", path.display());
309313
}
310314
tokio::signal::ctrl_c().await?;
@@ -314,7 +318,7 @@ async fn main() -> Result<()> {
314318
let store = MemStore::new();
315319
let hashes = add_paths(&store, paths).await?;
316320
let events = throttle(delay_ms);
317-
let (router, addr) = setup(MemStore::new(), events).await?;
321+
let (router, addr) = setup(store, events).await?;
318322
for (path, hash) in hashes {
319323
let ticket = BlobTicket::new(addr.clone(), hash, BlobFormat::Raw);
320324
println!("{}: {ticket}", path.display());
@@ -329,7 +333,7 @@ async fn main() -> Result<()> {
329333
let store = MemStore::new();
330334
let hashes = add_paths(&store, paths).await?;
331335
let events = limit_max_connections(max_connections);
332-
let (router, addr) = setup(MemStore::new(), events).await?;
336+
let (router, addr) = setup(store, events).await?;
333337
for (path, hash) in hashes {
334338
let ticket = BlobTicket::new(addr.clone(), hash, BlobFormat::Raw);
335339
println!("{}: {ticket}", path.display());

examples/random_store.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use iroh_blobs::{
1414
use irpc::RpcMessage;
1515
use n0_future::StreamExt;
1616
use rand::{rngs::StdRng, Rng, SeedableRng};
17-
use tokio::{signal::ctrl_c, sync::mpsc};
17+
use tokio::signal::ctrl_c;
1818
use tracing::info;
1919

2020
#[derive(Parser, Debug)]
@@ -102,7 +102,7 @@ pub fn get_or_generate_secret_key() -> Result<SecretKey> {
102102
}
103103

104104
pub fn dump_provider_events(allow_push: bool) -> (tokio::task::JoinHandle<()>, EventSender) {
105-
let (tx, mut rx) = mpsc::channel(100);
105+
let (tx, mut rx) = EventSender::channel(100, EventMask::ALL_READONLY);
106106
fn dump_updates<T: RpcMessage>(mut rx: irpc::channel::mpsc::Receiver<T>) {
107107
tokio::spawn(async move {
108108
while let Ok(Some(update)) = rx.recv().await {
@@ -176,7 +176,7 @@ pub fn dump_provider_events(allow_push: bool) -> (tokio::task::JoinHandle<()>, E
176176
}
177177
}
178178
});
179-
(dump_task, EventSender::new(tx, EventMask::ALL_READONLY))
179+
(dump_task, tx)
180180
}
181181

182182
#[tokio::main]

src/tests.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ fn event_handler(
342342
allowed_nodes: impl IntoIterator<Item = NodeId>,
343343
) -> (EventSender, watch::Receiver<usize>, AbortOnDropHandle<()>) {
344344
let (count_tx, count_rx) = tokio::sync::watch::channel(0usize);
345-
let (events_tx, mut events_rx) = mpsc::channel::<ProviderMessage>(16);
345+
let (events_tx, mut events_rx) = EventSender::channel(16, EventMask::ALL_READONLY);
346346
let allowed_nodes = allowed_nodes.into_iter().collect::<HashSet<_>>();
347347
let task = AbortOnDropHandle::new(tokio::task::spawn(async move {
348348
while let Some(event) = events_rx.recv().await {
@@ -370,16 +370,7 @@ fn event_handler(
370370
}
371371
}
372372
}));
373-
(
374-
EventSender::new(
375-
events_tx,
376-
EventMask {
377-
..EventMask::ALL_READONLY
378-
},
379-
),
380-
count_rx,
381-
task,
382-
)
373+
(events_tx, count_rx, task)
383374
}
384375

385376
async fn two_nodes_push_blobs(

0 commit comments

Comments
 (0)