From 80bcb0bdd70094220625be88ba6edc986060244c Mon Sep 17 00:00:00 2001 From: Ryan Plauche Date: Fri, 26 May 2023 14:21:22 -0500 Subject: [PATCH] Added optional throttling of chunk transmission --- controller/src/main.rs | 5 ++++- hyphae/src/config.rs | 2 ++ hyphae/src/main.rs | 1 + hyphae/src/myceli_api.rs | 13 +++++++++++-- myceli/src/config.rs | 3 +++ myceli/src/main.rs | 3 ++- transports/src/udp_transport.rs | 7 ++++++- 7 files changed, 29 insertions(+), 5 deletions(-) diff --git a/controller/src/main.rs b/controller/src/main.rs index 4f5940c..ca2a13d 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -11,6 +11,8 @@ pub struct Cli { #[arg(short, long, default_value = "512")] mtu: u16, #[arg(short, long)] + chunk_transmit_throttle: Option, + #[arg(short, long)] listen_mode: bool, #[arg(short, long, default_value = "0.0.0.0:8090")] bind_address: String, @@ -20,7 +22,8 @@ pub struct Cli { impl Cli { pub async fn run(&self) -> Result<()> { - let transport = UdpTransport::new(&self.bind_address, self.mtu)?; + let transport = + UdpTransport::new(&self.bind_address, self.mtu, self.chunk_transmit_throttle)?; let command = Message::ApplicationAPI(self.command.clone()); let cmd_str = serde_json::to_string(&command)?; diff --git a/hyphae/src/config.rs b/hyphae/src/config.rs index 46c25d2..ba18d04 100644 --- a/hyphae/src/config.rs +++ b/hyphae/src/config.rs @@ -12,6 +12,7 @@ pub struct Config { pub kubo_address: String, pub sync_interval: u64, pub myceli_mtu: u16, + pub chunk_transmit_throttle: Option, } impl Default for Config { @@ -22,6 +23,7 @@ impl Default for Config { kubo_address: "0.0.0.0:5001".to_string(), sync_interval: 10_000, myceli_mtu: 60, + chunk_transmit_throttle: None, } } } diff --git a/hyphae/src/main.rs b/hyphae/src/main.rs index a67b2dd..ee11f49 100644 --- a/hyphae/src/main.rs +++ b/hyphae/src/main.rs @@ -99,6 +99,7 @@ fn main() -> Result<()> { &cfg.myceli_address, &cfg.listen_to_myceli_address, cfg.myceli_mtu, + cfg.chunk_transmit_throttle, ) .expect("Failed to create MyceliAPi"); diff --git a/hyphae/src/myceli_api.rs b/hyphae/src/myceli_api.rs index a1cafee..19ae064 100644 --- a/hyphae/src/myceli_api.rs +++ b/hyphae/src/myceli_api.rs @@ -11,8 +11,17 @@ pub struct MyceliApi { } impl MyceliApi { - pub fn new(myceli_address: &str, listen_address: &str, mtu: u16) -> Result { - let transport = Rc::new(UdpTransport::new(listen_address, mtu)?); + pub fn new( + myceli_address: &str, + listen_address: &str, + mtu: u16, + chunk_transmit_throttle: Option, + ) -> Result { + let transport = Rc::new(UdpTransport::new( + listen_address, + mtu, + chunk_transmit_throttle, + )?); Ok(MyceliApi { address: myceli_address.to_string(), listen_address: listen_address.to_string(), diff --git a/myceli/src/config.rs b/myceli/src/config.rs index 0007e06..90a191c 100644 --- a/myceli/src/config.rs +++ b/myceli/src/config.rs @@ -13,6 +13,7 @@ pub struct Config { pub mtu: u16, pub window_size: u32, pub block_size: u32, + pub chunk_transmit_throttle: Option, } impl Default for Config { @@ -30,6 +31,8 @@ impl Default for Config { window_size: 5, // Default to 3 kilobyte blocks block_size: 1024 * 3, + // Default to no throttling of chunks + chunk_transmit_throttle: None, } } } diff --git a/myceli/src/main.rs b/myceli/src/main.rs index b688ed0..4116369 100644 --- a/myceli/src/main.rs +++ b/myceli/src/main.rs @@ -34,7 +34,8 @@ fn main() -> Result<()> { let db_path = format!("{}/storage.db", cfg.storage_path); let udp_transport = - UdpTransport::new(&cfg.listen_address, cfg.mtu).expect("Failed to create udp transport"); + UdpTransport::new(&cfg.listen_address, cfg.mtu, cfg.chunk_transmit_throttle) + .expect("Failed to create udp transport"); let mut listener = Listener::new( &resolved_listen_addr, diff --git a/transports/src/udp_transport.rs b/transports/src/udp_transport.rs index 62c5512..1af5eac 100644 --- a/transports/src/udp_transport.rs +++ b/transports/src/udp_transport.rs @@ -13,16 +13,18 @@ pub struct UdpTransport { mtu: u16, chunker: Arc>, max_read_attempts: Option, + chunk_transmit_throttle: Option, } impl UdpTransport { - pub fn new(listen_addr: &str, mtu: u16) -> Result { + pub fn new(listen_addr: &str, mtu: u16, chunk_transmit_throttle: Option) -> Result { let socket = UdpSocket::bind(listen_addr)?; Ok(UdpTransport { mtu, socket, chunker: Arc::new(Mutex::new(SimpleChunker::new(mtu))), max_read_attempts: None, + chunk_transmit_throttle, }) } @@ -107,6 +109,9 @@ impl Transport for UdpTransport { let hex_str = chunk.iter().map(|b| format!("{b:02X}")).collect::(); info!("Transmitting chunk of hex {hex_str}"); self.socket.send_to(&chunk, addr)?; + if let Some(throttle) = self.chunk_transmit_throttle { + sleep(Duration::from_millis(throttle.into())); + } } Ok(()) }