diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a46956..6dea1a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,20 @@ # Changelog +## [0.6.5] - Unreleased -## [0.6.4] - Unreleased +### Added + +- Added `DagExported` and `DagExportFailed` responses to `ExportDag` command. +- Added `DagTransmissionComplete` response when a DAG has been completely transmitted. + +### Changed + +- MTU is now an optional flag on `controller` +- Added `block_size` as a `myceli` config option for controlling IPFS block size in file chunking +- Changed default `block_size` to 3072 bytes +- Fixed cases where responses inside of dag transfer session weren't sent to original target address + +## [0.6.4] - 2023-05-15 ### Added diff --git a/Cargo.lock b/Cargo.lock index 34bb6d0..bb7797b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -429,7 +429,7 @@ checksum = "13418e745008f7349ec7e449155f419a61b92b58a99cc3616942b926825ec76b" [[package]] name = "controller" -version = "0.6.3" +version = "0.6.5" dependencies = [ "anyhow", "clap", @@ -928,7 +928,7 @@ dependencies = [ [[package]] name = "hyphae" -version = "0.6.3" +version = "0.6.5" dependencies = [ "anyhow", "clap", @@ -1012,7 +1012,7 @@ dependencies = [ [[package]] name = "ipfs-unixfs" -version = "0.6.3" +version = "0.6.5" dependencies = [ "anyhow", "async-recursion", @@ -1194,7 +1194,7 @@ checksum = "ece97ea872ece730aed82664c424eb4c8291e1ff2480247ccf7409044bc6479f" [[package]] name = "local-storage" -version = "0.6.3" +version = "0.6.5" dependencies = [ "anyhow", "assert_fs", @@ -1206,6 +1206,7 @@ dependencies = [ "rusqlite", "thiserror", "tokio", + "tokio-util", "tracing", ] @@ -1262,7 +1263,7 @@ dependencies = [ [[package]] name = "messages" -version = "0.6.3" +version = "0.6.5" dependencies = [ "cid", "clap", @@ -1389,7 +1390,7 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" [[package]] name = "myceli" -version = "0.6.3" +version = "0.6.5" dependencies = [ "anyhow", "assert_fs", @@ -2285,7 +2286,7 @@ dependencies = [ [[package]] name = "transports" -version = "0.6.3" +version = "0.6.5" dependencies = [ "anyhow", "cid", diff --git a/Cargo.toml b/Cargo.toml index 22a2745..236836e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ ] [workspace.package] -version = "0.6.3" +version = "0.6.5" edition = "2021" license = "Apache-2.0/MIT" rust-version = "1.68.1" diff --git a/controller/src/main.rs b/controller/src/main.rs index d079627..81447a1 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -8,6 +8,8 @@ use transports::{Transport, UdpTransport}; #[clap(about = "Control a Myceli instance")] pub struct Cli { instance_addr: String, + #[arg(short, long, default_value = "512")] + mtu: u16, #[arg(short, long)] listen_mode: bool, #[arg(short, long, default_value = "0.0.0.0:8090")] @@ -18,7 +20,7 @@ pub struct Cli { impl Cli { pub async fn run(&self) -> Result<()> { - let transport = UdpTransport::new(&self.bind_address, 512)?; + let transport = UdpTransport::new(&self.bind_address, self.mtu)?; let command = Message::ApplicationAPI(self.command.clone()); let cmd_str = serde_json::to_string(&command)?; diff --git a/ipfs-unixfs/Cargo.toml b/ipfs-unixfs/Cargo.toml index 7ce25c8..65be05b 100644 --- a/ipfs-unixfs/Cargo.toml +++ b/ipfs-unixfs/Cargo.toml @@ -20,6 +20,7 @@ multihash.workspace = true num_enum.workspace = true prost.workspace = true tokio = { workspace = true, features = ["fs"] } +tokio-util = { workspace = true, features = ["io-util"] } [dev-dependencies] # criterion = { workspace = true, features = ["async_tokio"] } diff --git a/local-storage/Cargo.toml b/local-storage/Cargo.toml index eea4452..d7d0ef0 100644 --- a/local-storage/Cargo.toml +++ b/local-storage/Cargo.toml @@ -15,6 +15,7 @@ ipfs-unixfs.workspace = true rusqlite.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } +tokio-util = { workspace = true, features = ["io-util"] } tracing.workspace = true [dev-dependencies] diff --git a/local-storage/src/block.rs b/local-storage/src/block.rs index bb49774..548d144 100644 --- a/local-storage/src/block.rs +++ b/local-storage/src/block.rs @@ -3,9 +3,10 @@ use anyhow::{anyhow, bail, Result}; use bytes::Bytes; use cid::Cid; use ipfs_unixfs::Block; +use std::fmt; use std::str::FromStr; -#[derive(Debug, PartialEq)] +#[derive(PartialEq)] pub struct StoredBlock { pub cid: String, pub data: Vec, @@ -21,6 +22,20 @@ impl StoredBlock { } } +impl fmt::Debug for StoredBlock { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let cid_str = Cid::try_from(self.cid.clone()) + .map(|c| c.to_string()) + .unwrap(); + + f.debug_struct("StoredBlock") + .field("cid", &cid_str) + .field("data", &self.data.len()) + .field("links", &self.links.len()) + .finish() + } +} + impl TryInto for &StoredBlock { type Error = anyhow::Error; @@ -40,9 +55,6 @@ pub fn validate_dag(stored_blocks: &[StoredBlock]) -> Result<()> { if stored_blocks.is_empty() { bail!("No blocks found in dag") } - for block in stored_blocks.iter() { - block.validate()?; - } verify_dag(stored_blocks)?; Ok(()) } diff --git a/local-storage/src/storage.rs b/local-storage/src/storage.rs index e0076d0..8e6f19e 100644 --- a/local-storage/src/storage.rs +++ b/local-storage/src/storage.rs @@ -15,15 +15,15 @@ use tracing::{error, info}; pub struct Storage { pub provider: Box, + block_size: u32, } -// TODO: Make this configurable -// Changing to 1MB to optimize for larger files -const BLOCK_SIZE: usize = 1024 * 100; - impl Storage { - pub fn new(provider: Box) -> Self { - Storage { provider } + pub fn new(provider: Box, block_size: u32) -> Self { + Storage { + provider, + block_size, + } } pub fn import_path(&self, path: &Path) -> Result { @@ -31,7 +31,7 @@ impl Storage { let blocks: Result> = rt.block_on(async { let file: File = FileBuilder::new() .path(path) - .fixed_chunker(BLOCK_SIZE) + .fixed_chunker(self.block_size.try_into()?) .build() .await?; let blocks: Vec<_> = file.encode().await?.try_collect().await?; @@ -40,8 +40,6 @@ impl Storage { let blocks = blocks?; let mut root_cid: Option = None; - let mut stored_blocks = vec![]; - blocks.iter().for_each(|b| { let links = b .links() @@ -61,14 +59,9 @@ impl Storage { error!("Failed to import block {e}"); } if !stored.links.is_empty() { - root_cid = Some(stored.cid.clone()); + root_cid = Some(stored.cid); } - stored_blocks.push(stored); }); - info!("Validating imported blocks {}", blocks.len()); - if let Err(e) = crate::block::validate_dag(&stored_blocks) { - error!("Failed to validate dag on import: {e}"); - } if blocks.len() == 1 { if let Some(first) = blocks.first() { root_cid = Some(first.cid().to_string()); @@ -141,7 +134,6 @@ impl Storage { window_size: u32, window_num: u32, ) -> Result> { - println!("offset = {} * {}", window_size, window_num); let offset = window_size * window_num; self.provider @@ -156,6 +148,8 @@ pub mod tests { use assert_fs::{fixture::FileWriteBin, fixture::PathChild, TempDir}; use rand::{thread_rng, RngCore}; + const BLOCK_SIZE: usize = 1024 * 10; + struct TestHarness { storage: Storage, _db_dir: TempDir, @@ -167,7 +161,7 @@ pub mod tests { let db_path = db_dir.child("storage.db"); let provider = SqliteStorageProvider::new(db_path.path().to_str().unwrap()).unwrap(); provider.setup().unwrap(); - let storage = Storage::new(Box::new(provider)); + let storage = Storage::new(Box::new(provider), BLOCK_SIZE.try_into().unwrap()); TestHarness { storage, _db_dir: db_dir, @@ -264,17 +258,14 @@ pub mod tests { let cid = harness.storage.import_path(test_file.path()).unwrap(); let window_size: u32 = 10; - let mut window_num = 0; - let all_dag_blocks = harness.storage.get_all_dag_blocks(&cid).unwrap(); - for chunk in all_dag_blocks.chunks(window_size as usize).into_iter() { + for (window_num, chunk) in all_dag_blocks.chunks(window_size as usize).enumerate() { let window_blocks = harness .storage - .get_dag_blocks_by_window(&cid, window_size, window_num) + .get_dag_blocks_by_window(&cid, window_size, window_num.try_into().unwrap()) .unwrap(); assert_eq!(chunk, &window_blocks); - window_num += 1; } } diff --git a/local-storage/src/util.rs b/local-storage/src/util.rs index f864ace..f47ec9f 100644 --- a/local-storage/src/util.rs +++ b/local-storage/src/util.rs @@ -51,7 +51,7 @@ pub(crate) fn verify_dag(blocks: &[StoredBlock]) -> Result<()> { while !queue.is_empty() { // this is a safe unwrap as the queue is not empty let node: &StoredBlock = queue.pop_front().unwrap(); - // ignore a visite node + // ignore a visited node if visited.contains(&node.cid.as_str()) { continue; } diff --git a/messages/src/api.rs b/messages/src/api.rs index ccea14b..7da4c99 100644 --- a/messages/src/api.rs +++ b/messages/src/api.rs @@ -18,6 +18,17 @@ pub enum ApplicationAPI { cid: String, path: String, }, + /// Used to indicate the failure of a dag export + DagExportFailed { + cid: String, + path: String, + error: String, + }, + /// Used to indicate a successful dag export + DagExported { + cid: String, + path: String, + }, /// Sets current connected state SetConnected { #[arg(action(clap::ArgAction::Set), required(true))] @@ -44,6 +55,10 @@ pub enum ApplicationAPI { target_addr: String, retries: u8, }, + /// Indicates that a Dag has been transmitted completely successfully + DagTransmissionComplete { + cid: String, + }, /// Initiates transmission of block corresponding to the given CID TransmitBlock { cid: String, diff --git a/messages/src/protocol.rs b/messages/src/protocol.rs index a4e89ef..230dd70 100644 --- a/messages/src/protocol.rs +++ b/messages/src/protocol.rs @@ -37,7 +37,6 @@ pub enum DataProtocol { // in order to continue transmitting the dag RetryDagSession { cid: String, - target_addr: String, }, // Requests windowed transmission of a dag RequestTransmitDag { diff --git a/myceli/src/config.rs b/myceli/src/config.rs index 9472704..0007e06 100644 --- a/myceli/src/config.rs +++ b/myceli/src/config.rs @@ -12,6 +12,7 @@ pub struct Config { pub storage_path: String, pub mtu: u16, pub window_size: u32, + pub block_size: u32, } impl Default for Config { @@ -24,8 +25,11 @@ impl Default for Config { // Default storage dir storage_path: "storage".to_string(), // Default MTU appropriate for dev radio - mtu: 60, + mtu: 512, + // Default to sending five blocks at a time window_size: 5, + // Default to 3 kilobyte blocks + block_size: 1024 * 3, } } } diff --git a/myceli/src/handlers.rs b/myceli/src/handlers.rs index f597da4..45f7a94 100644 --- a/myceli/src/handlers.rs +++ b/myceli/src/handlers.rs @@ -74,6 +74,8 @@ pub mod tests { use local_storage::provider::SqliteStorageProvider; use rand::{thread_rng, RngCore}; + const BLOCK_SIZE: u32 = 1024 * 3; + struct TestHarness { storage: Rc, db_dir: TempDir, @@ -85,7 +87,7 @@ pub mod tests { let db_path = db_dir.child("storage.db"); let provider = SqliteStorageProvider::new(db_path.path().to_str().unwrap()).unwrap(); provider.setup().unwrap(); - let storage = Rc::new(Storage::new(Box::new(provider))); + let storage = Rc::new(Storage::new(Box::new(provider), BLOCK_SIZE)); TestHarness { storage, db_dir } } diff --git a/myceli/src/listener.rs b/myceli/src/listener.rs index 8e5fb44..27bbc07 100644 --- a/myceli/src/listener.rs +++ b/myceli/src/listener.rs @@ -25,10 +25,11 @@ impl Listener { listen_address: &SocketAddr, storage_path: &str, transport: Arc, + block_size: u32, ) -> Result> { let provider = SqliteStorageProvider::new(storage_path)?; provider.setup()?; - let storage = Rc::new(Storage::new(Box::new(provider))); + let storage = Rc::new(Storage::new(Box::new(provider), block_size)); info!("Listening on {listen_address}"); Ok(Listener { storage_path: storage_path.to_string(), @@ -38,7 +39,12 @@ impl Listener { }) } - pub fn start(&mut self, shipper_timeout_duration: u64, shipper_window_size: u32) -> Result<()> { + pub fn start( + &mut self, + shipper_timeout_duration: u64, + shipper_window_size: u32, + block_size: u32, + ) -> Result<()> { // First setup the shipper and its pieces let (shipper_sender, shipper_receiver) = mpsc::channel(); let shipper_storage_path = self.storage_path.to_string(); @@ -54,6 +60,7 @@ impl Listener { shipper_window_size, shipper_transport, initial_connected, + block_size, ) .expect("Shipper creation failed"); shipper.receive_msg_loop(); @@ -121,8 +128,17 @@ impl Listener { Some(handlers::import_file(&path, self.storage.clone())?) } Message::ApplicationAPI(ApplicationAPI::ExportDag { cid, path }) => { - self.storage.export_cid(&cid, &PathBuf::from(path))?; - None + match self.storage.export_cid(&cid, &PathBuf::from(path.clone())) { + Ok(()) => Some(Message::ApplicationAPI(ApplicationAPI::DagExported { + cid, + path, + })), + Err(e) => Some(Message::ApplicationAPI(ApplicationAPI::DagExportFailed { + cid, + path, + error: e.to_string(), + })), + } } Message::ApplicationAPI(ApplicationAPI::RequestAvailableBlocks) => { Some(handlers::request_available_blocks(self.storage.clone())?) @@ -176,6 +192,17 @@ impl Listener { info!("Received FileImported from {sender_addr}: {path} -> {cid}"); None } + Message::ApplicationAPI(ApplicationAPI::DagTransmissionComplete { cid }) => { + let dag_blocks = self.storage.get_all_dag_blocks(&cid)?; + match local_storage::block::validate_dag(&dag_blocks) { + Ok(_) => info!("Sucessfully received and validated dag {cid}"), + Err(e) => { + error!("Failure in receiving dag {cid}: {}", e.to_string()); + // TOOD: Delete dag and restart transmission at this point? + } + } + None + } // Default case for valid messages which don't have handling code implemented yet message => { info!("Received unhandled message: {:?}", message); diff --git a/myceli/src/main.rs b/myceli/src/main.rs index 56768ae..b688ed0 100644 --- a/myceli/src/main.rs +++ b/myceli/src/main.rs @@ -36,10 +36,15 @@ fn main() -> Result<()> { let udp_transport = UdpTransport::new(&cfg.listen_address, cfg.mtu).expect("Failed to create udp transport"); - let mut listener = Listener::new(&resolved_listen_addr, &db_path, Arc::new(udp_transport)) - .expect("Listener creation failed"); + let mut listener = Listener::new( + &resolved_listen_addr, + &db_path, + Arc::new(udp_transport), + cfg.block_size, + ) + .expect("Listener creation failed"); listener - .start(cfg.retry_timeout_duration, cfg.window_size) + .start(cfg.retry_timeout_duration, cfg.window_size, cfg.block_size) .expect("Error encountered in listener operation"); Ok(()) } diff --git a/myceli/src/shipper.rs b/myceli/src/shipper.rs index 1b81efe..1ce74e7 100644 --- a/myceli/src/shipper.rs +++ b/myceli/src/shipper.rs @@ -45,6 +45,7 @@ pub struct Shipper { } impl Shipper { + #[allow(clippy::too_many_arguments)] pub fn new( storage_path: &str, receiver: Receiver<(DataProtocol, String)>, @@ -53,11 +54,11 @@ impl Shipper { window_size: u32, transport: Arc, connected: Arc>, + block_size: u32, ) -> Result> { let provider = SqliteStorageProvider::new(storage_path)?; provider.setup()?; - let storage = Rc::new(Storage::new(Box::new(provider))); - + let storage = Rc::new(Storage::new(Box::new(provider), block_size)); Ok(Shipper { storage, window_sessions: BTreeMap::new(), @@ -95,20 +96,16 @@ impl Shipper { } => { self.begin_dag_window_session(&cid, &target_addr, retries)?; } - DataProtocol::RetryDagSession { cid, target_addr } => { - if *self.connected.lock().unwrap() && self.window_sessions.contains_key(&cid) { - info!("Received retry dag session, sending get missing req to {target_addr}"); + DataProtocol::RetryDagSession { cid } => { + if *self.connected.lock().unwrap() { if let Some(session) = self.window_sessions.get(&cid) { - let blocks = self.get_dag_window_blocks(&cid, session.window_num)?; - let blocks = blocks.iter().map(|s| s.cid.to_string()).collect(); - self.transmit_msg( - Message::DataProtocol(DataProtocol::RequestMissingDagWindowBlocks { - cid: cid.to_string(), - blocks, - }), - &target_addr, - )?; - self.retry_dag_window_session(&cid, &target_addr); + info!( + "Received retry dag session for {cid}, sending get missing req to {}", + &session.target_addr + ); + let target_addr = session.target_addr.clone(); + self.dag_window_session_run(&cid, session.window_num, &target_addr)?; + self.retry_dag_window_session(&cid); } } } @@ -131,23 +128,28 @@ impl Shipper { } DataProtocol::MissingDagBlocks { cid, blocks } => { if *self.connected.lock().unwrap() { + let target_addr = if let Some(session) = self.window_sessions.get(&cid) { + session.target_addr.to_string() + } else { + sender_addr.to_string() + }; // If no blocks are missing, then attempt to move to next window if blocks.is_empty() { - self.increment_dag_window_session(&cid, sender_addr)?; + self.increment_dag_window_session(&cid, &target_addr)?; } else { info!( "Dag {cid} is missing {} blocks, sending again", blocks.len() ); for b in blocks.clone() { - self.transmit_block(&b, sender_addr)?; + self.transmit_block(&b, &target_addr)?; } self.transmit_msg( Message::DataProtocol(DataProtocol::RequestMissingDagWindowBlocks { cid, blocks, }), - sender_addr, + &target_addr, )?; } } @@ -194,30 +196,27 @@ impl Shipper { self.window_sessions.remove(cid); } - fn start_dag_window_retry_timeout(&mut self, cid: &str, target_addr: &str) { + fn start_dag_window_retry_timeout(&mut self, cid: &str) { let sender_clone = self.sender.clone(); let cid_str = cid.to_string(); - let target_addr_str = target_addr.to_string(); + info!("Starting retry timer at {}", self.retry_timeout_duration); let timeout_duration = Duration::from_millis(self.retry_timeout_duration); spawn(move || { sleep(timeout_duration); sender_clone .send(( - DataProtocol::RetryDagSession { - cid: cid_str, - target_addr: target_addr_str, - }, + DataProtocol::RetryDagSession { cid: cid_str }, "127.0.0.1:0".to_string(), )) .unwrap(); }); } - fn retry_dag_window_session(&mut self, cid: &str, target_addr: &str) { + fn retry_dag_window_session(&mut self, cid: &str) { if let Some(session) = self.window_sessions.get_mut(cid) { if session.remaining_window_retries > 0 { session.remaining_window_retries -= 1; - self.start_dag_window_retry_timeout(cid, target_addr); + self.start_dag_window_retry_timeout(cid); } } } @@ -247,6 +246,12 @@ impl Shipper { } else { info!("Dag transfer session for {cid} is complete"); self.end_dag_window_session(cid); + self.transmit_msg( + Message::ApplicationAPI(messages::ApplicationAPI::DagTransmissionComplete { + cid: cid.to_string(), + }), + target_addr, + )?; } } Ok(()) @@ -265,7 +270,7 @@ impl Shipper { info!("start dag window session for {cid}"); // Need to reset the window retries here self.dag_window_session_run(cid, session.window_num, &session.target_addr)?; - self.start_dag_window_retry_timeout(cid, &session.target_addr); + self.start_dag_window_retry_timeout(cid); } Ok(()) @@ -290,7 +295,7 @@ impl Shipper { self.dag_window_session_run(cid, 0, target_addr)?; let retries = if retries == 0 { 0 } else { retries - 1 }; self.open_dag_window_session(cid, retries, target_addr); - self.start_dag_window_retry_timeout(cid, target_addr); + self.start_dag_window_retry_timeout(cid); } else { self.open_dag_window_session(cid, retries, target_addr); } @@ -442,6 +447,8 @@ mod tests { test_dir: TempDir, } + const BLOCK_SIZE: u32 = 1024 * 3; + impl TestShipper { pub fn new() -> Self { let mut rng = thread_rng(); @@ -459,7 +466,7 @@ mod tests { let db_path = test_dir.child("storage.db"); let provider = SqliteStorageProvider::new(db_path.path().to_str().unwrap()).unwrap(); provider.setup().unwrap(); - let _storage = Rc::new(Storage::new(Box::new(provider))); + let _storage = Rc::new(Storage::new(Box::new(provider), BLOCK_SIZE)); let (shipper_sender, shipper_receiver) = mpsc::channel(); let shipper = Shipper::new( @@ -470,6 +477,7 @@ mod tests { 5, shipper_transport, Arc::new(Mutex::new(true)), + BLOCK_SIZE, ) .unwrap(); TestShipper { diff --git a/myceli/tests/listener_test.rs b/myceli/tests/listener_test.rs index 66d25aa..90f81ac 100644 --- a/myceli/tests/listener_test.rs +++ b/myceli/tests/listener_test.rs @@ -42,7 +42,7 @@ pub fn test_transmit_receive_block() { &transmitter.listen_addr, ); - sleep(Duration::from_millis(100)); + utils::wait_receiving_done(&receiver, &mut controller); let resp = controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); @@ -73,7 +73,7 @@ pub fn test_transmit_receive_dag() { &transmitter.listen_addr, ); - sleep(Duration::from_millis(2_000)); + utils::wait_receiving_done(&receiver, &mut controller); let receiver_blocks = controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); @@ -110,7 +110,7 @@ pub fn test_transmit_receive_dag_with_retries() { &transmitter.listen_addr, ); - sleep(Duration::from_millis(500)); + utils::wait_receiving_done(&receiver, &mut controller); let receiver_blocks = controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); @@ -147,7 +147,7 @@ pub fn test_import_transmit_export_file() { &transmitter.listen_addr, ); - sleep(Duration::from_millis(600)); + utils::wait_receiving_done(&receiver, &mut controller); let export_path = format!("{}/export", &receiver.test_dir.to_str().unwrap()); controller.send_msg( @@ -246,7 +246,7 @@ pub fn test_resume_dag_after_reconnect() { &transmitter.listen_addr, ); - sleep(Duration::from_millis(2_000)); + utils::wait_receiving_done(&receiver, &mut controller); let receiver_blocks = controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); @@ -261,7 +261,7 @@ pub fn test_resume_dag_after_reconnect() { &transmitter.listen_addr, ); - sleep(Duration::from_millis(2_000)); + utils::wait_receiving_done(&receiver, &mut controller); let receiver_blocks = controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); @@ -303,7 +303,7 @@ pub fn test_no_transmit_after_disconnect() { &transmitter.listen_addr, ); - sleep(Duration::from_millis(2_000)); + utils::wait_receiving_done(&receiver, &mut controller); let receiver_blocks = controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); @@ -337,7 +337,7 @@ pub fn test_transmit_resume_after_timeout() { &transmitter.listen_addr, ); - sleep(Duration::from_millis(2_000)); + sleep(Duration::from_secs(1)); receiver.start().unwrap(); @@ -354,7 +354,7 @@ pub fn test_transmit_resume_after_timeout() { &transmitter.listen_addr, ); - sleep(Duration::from_millis(2_000)); + utils::wait_receiving_done(&receiver, &mut controller); let receiver_blocks = controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()); diff --git a/myceli/tests/utils/mod.rs b/myceli/tests/utils/mod.rs index 1f9e191..5c41740 100644 --- a/myceli/tests/utils/mod.rs +++ b/myceli/tests/utils/mod.rs @@ -13,6 +13,35 @@ use std::sync::Arc; use std::thread::{sleep, spawn}; use transports::{Transport, UdpTransport}; +const BLOCK_SIZE: u32 = 1024 * 3; + +pub fn wait_receiving_done(receiver: &TestListener, controller: &mut TestController) { + let mut prev_num_blocks = 0; + let mut num_retries = 0; + + loop { + let current_blocks = + if let Message::ApplicationAPI(messages::ApplicationAPI::AvailableBlocks { cids }) = + controller.send_and_recv(&receiver.listen_addr, Message::request_available_blocks()) + { + cids + } else { + panic!("Failed to get correct response to blocks request"); + }; + let current_num_blocks = current_blocks.len(); + if current_num_blocks > prev_num_blocks { + prev_num_blocks = current_num_blocks; + num_retries = 0; + } else { + if num_retries > 10 { + break; + } + num_retries += 1; + } + sleep(Duration::from_millis(10)); + } +} + pub struct TestListener { pub listen_addr: String, pub test_dir: TempDir, @@ -67,9 +96,9 @@ fn start_listener_thread(listen_addr: SocketAddr, db_path: ChildPath) { .unwrap(); transport.set_max_read_attempts(Some(1)); let transport = Arc::new(transport); - let mut listener = Listener::new(&listen_addr, db_path, transport).unwrap(); + let mut listener = Listener::new(&listen_addr, db_path, transport, BLOCK_SIZE).unwrap(); listener - .start(10, 2) + .start(10, 2, BLOCK_SIZE) .expect("Error encountered in listener"); }