Skip to content

Commit

Permalink
Added optional throttling of chunk transmission
Browse files Browse the repository at this point in the history
  • Loading branch information
plauche committed May 26, 2023
1 parent 39156fd commit 80bcb0b
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 5 deletions.
5 changes: 4 additions & 1 deletion controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub struct Cli {
#[arg(short, long, default_value = "512")]
mtu: u16,
#[arg(short, long)]
chunk_transmit_throttle: Option<u32>,
#[arg(short, long)]
listen_mode: bool,
#[arg(short, long, default_value = "0.0.0.0:8090")]
bind_address: String,
Expand All @@ -20,7 +22,8 @@ pub struct Cli {

impl Cli {
pub async fn run(&self) -> Result<()> {
let transport = UdpTransport::new(&self.bind_address, self.mtu)?;
let transport =
UdpTransport::new(&self.bind_address, self.mtu, self.chunk_transmit_throttle)?;

let command = Message::ApplicationAPI(self.command.clone());
let cmd_str = serde_json::to_string(&command)?;
Expand Down
2 changes: 2 additions & 0 deletions hyphae/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct Config {
pub kubo_address: String,
pub sync_interval: u64,
pub myceli_mtu: u16,
pub chunk_transmit_throttle: Option<u32>,
}

impl Default for Config {
Expand All @@ -22,6 +23,7 @@ impl Default for Config {
kubo_address: "0.0.0.0:5001".to_string(),
sync_interval: 10_000,
myceli_mtu: 60,
chunk_transmit_throttle: None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions hyphae/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ fn main() -> Result<()> {
&cfg.myceli_address,
&cfg.listen_to_myceli_address,
cfg.myceli_mtu,
cfg.chunk_transmit_throttle,
)
.expect("Failed to create MyceliAPi");

Expand Down
13 changes: 11 additions & 2 deletions hyphae/src/myceli_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,17 @@ pub struct MyceliApi {
}

impl MyceliApi {
pub fn new(myceli_address: &str, listen_address: &str, mtu: u16) -> Result<Self> {
let transport = Rc::new(UdpTransport::new(listen_address, mtu)?);
pub fn new(
myceli_address: &str,
listen_address: &str,
mtu: u16,
chunk_transmit_throttle: Option<u32>,
) -> Result<Self> {
let transport = Rc::new(UdpTransport::new(
listen_address,
mtu,
chunk_transmit_throttle,
)?);
Ok(MyceliApi {
address: myceli_address.to_string(),
listen_address: listen_address.to_string(),
Expand Down
3 changes: 3 additions & 0 deletions myceli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct Config {
pub mtu: u16,
pub window_size: u32,
pub block_size: u32,
pub chunk_transmit_throttle: Option<u32>,
}

impl Default for Config {
Expand All @@ -30,6 +31,8 @@ impl Default for Config {
window_size: 5,
// Default to 3 kilobyte blocks
block_size: 1024 * 3,
// Default to no throttling of chunks
chunk_transmit_throttle: None,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion myceli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ fn main() -> Result<()> {
let db_path = format!("{}/storage.db", cfg.storage_path);

let udp_transport =
UdpTransport::new(&cfg.listen_address, cfg.mtu).expect("Failed to create udp transport");
UdpTransport::new(&cfg.listen_address, cfg.mtu, cfg.chunk_transmit_throttle)
.expect("Failed to create udp transport");

let mut listener = Listener::new(
&resolved_listen_addr,
Expand Down
7 changes: 6 additions & 1 deletion transports/src/udp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ pub struct UdpTransport {
mtu: u16,
chunker: Arc<Mutex<SimpleChunker>>,
max_read_attempts: Option<u16>,
chunk_transmit_throttle: Option<u32>,
}

impl UdpTransport {
pub fn new(listen_addr: &str, mtu: u16) -> Result<Self> {
pub fn new(listen_addr: &str, mtu: u16, chunk_transmit_throttle: Option<u32>) -> Result<Self> {
let socket = UdpSocket::bind(listen_addr)?;
Ok(UdpTransport {
mtu,
socket,
chunker: Arc::new(Mutex::new(SimpleChunker::new(mtu))),
max_read_attempts: None,
chunk_transmit_throttle,
})
}

Expand Down Expand Up @@ -107,6 +109,9 @@ impl Transport for UdpTransport {
let hex_str = chunk.iter().map(|b| format!("{b:02X}")).collect::<String>();
info!("Transmitting chunk of hex {hex_str}");
self.socket.send_to(&chunk, addr)?;
if let Some(throttle) = self.chunk_transmit_throttle {
sleep(Duration::from_millis(throttle.into()));
}
}
Ok(())
}
Expand Down

0 comments on commit 80bcb0b

Please sign in to comment.