Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup from ground testing #67

Merged
merged 10 commits into from
Jun 6, 2023
15 changes: 14 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
# Changelog

## [0.6.5] - Unreleased
## [0.6.6] - Unreleased

### Added

- Added `chunk_transmit_throttle` config option, which adds a throttle delay between transmission of chunks.
- Added `radio_address` config option, which hardcodes an address for `myceli` to send responses to.

### Changed

- MTU now maxes out at 3072. This max value is now used as the size of the receive buffer, fixing any MTU mismatch errors.
- Logging level for `myceli`, `hyphae`, and `controller` can now be controlled using the `RUST_LOG` environment variable.
- Added additional debug logging around chunk sending and receiving.

## [0.6.5] - 05-22-23

### Added

Expand Down
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ tokio-serial = "5.4"
tokio-util = "0.7.8"
toml = { version = "0.7.3", default-features = false }
tracing = { version = "0.1.34", default-features = false }
tracing-subscriber = { version = "0.3.14", default-features = false, features = ["fmt"] }
tracing-subscriber = { version = "0.3.14", default-features = false, features = ["fmt", "env-filter"] }

# Internal deps
ipfs-unixfs = { path = "ipfs-unixfs" }
Expand Down
2 changes: 2 additions & 0 deletions controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ clap.workspace = true
messages.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
transports.workspace = true
26 changes: 22 additions & 4 deletions controller/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use anyhow::{bail, Result};
use clap::{arg, Parser};
use messages::{ApplicationAPI, Message};
use transports::{Transport, UdpTransport};
use tracing::{info, metadata::LevelFilter};
use tracing_subscriber::{fmt, EnvFilter};
use transports::{Transport, UdpTransport, MAX_MTU};

#[derive(Parser, Debug, Clone)]
#[clap(version, long_about = None, propagate_version = true)]
Expand All @@ -11,6 +13,8 @@ pub struct Cli {
#[arg(short, long, default_value = "512")]
mtu: u16,
#[arg(short, long)]
chunk_transmit_throttle: Option<u32>,
#[arg(short, long)]
listen_mode: bool,
#[arg(short, long, default_value = "0.0.0.0:8090")]
bind_address: String,
Expand All @@ -20,17 +24,18 @@ pub struct Cli {

impl Cli {
pub async fn run(&self) -> Result<()> {
let transport = UdpTransport::new(&self.bind_address, self.mtu)?;
let transport =
UdpTransport::new(&self.bind_address, self.mtu, self.chunk_transmit_throttle)?;

let command = Message::ApplicationAPI(self.command.clone());
let cmd_str = serde_json::to_string(&command)?;
println!("Transmitting: {}", &cmd_str);
info!("Transmitting: {}", &cmd_str);

transport.send(command, &self.instance_addr)?;
if self.listen_mode {
match transport.receive() {
Ok((msg, _)) => {
println!("{msg:?}");
info!("Received: {msg:?}");
return Ok(());
}
Err(e) => bail!("{e:?}"),
Expand All @@ -43,6 +48,19 @@ impl Cli {

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
fmt::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
.init();

let cli = Cli::parse();

if cli.mtu > MAX_MTU {
bail!("Configured MTU is too large, cannot exceed {MAX_MTU}",);
}

cli.run().await
}
15 changes: 12 additions & 3 deletions hyphae/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use anyhow::Result;
use anyhow::{bail, Result};
use figment::{
providers::{Format, Serialized, Toml},
Figment,
};
use serde::{Deserialize, Serialize};
use transports::MAX_MTU;

#[derive(Debug, Deserialize, Serialize)]
pub struct Config {
Expand All @@ -12,6 +13,7 @@ pub struct Config {
pub kubo_address: String,
pub sync_interval: u64,
pub myceli_mtu: u16,
pub chunk_transmit_throttle: Option<u32>,
}

impl Default for Config {
Expand All @@ -21,7 +23,8 @@ impl Default for Config {
listen_to_myceli_address: "0.0.0.0:8090".to_string(),
kubo_address: "0.0.0.0:5001".to_string(),
sync_interval: 10_000,
myceli_mtu: 60,
myceli_mtu: 512,
chunk_transmit_throttle: None,
}
}
}
Expand All @@ -32,6 +35,12 @@ impl Config {
if let Some(path) = path {
config = config.merge(Toml::file(path));
}
Ok(config.extract()?)
let config: Self = config.extract()?;

if config.myceli_mtu > MAX_MTU {
bail!("Configured MTU is too large, cannot exceed {MAX_MTU}",);
}

Ok(config)
}
}
12 changes: 10 additions & 2 deletions hyphae/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use myceli_api::MyceliApi;
use std::collections::HashSet;
use std::thread::sleep;
use std::time::Duration;
use tracing::{error, info, warn, Level};
use tracing::{error, info, metadata::LevelFilter, warn};
use tracing_subscriber::{fmt, EnvFilter};

pub const RAW_CODEC_PREFIX: &str = "bafkrei";
pub const DAG_PB_CODEC_PREFIX: &str = "bafybei";
Expand Down Expand Up @@ -85,7 +86,13 @@ fn sync_blocks(kubo: &KuboApi, myceli: &MyceliApi) -> Result<()> {
}

fn main() -> Result<()> {
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
fmt::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
.init();

let args = Args::parse();
let cfg: Config = Config::parse(args.config_path).expect("Configuration parsing failed");
Expand All @@ -99,6 +106,7 @@ fn main() -> Result<()> {
&cfg.myceli_address,
&cfg.listen_to_myceli_address,
cfg.myceli_mtu,
cfg.chunk_transmit_throttle,
)
.expect("Failed to create MyceliAPi");

Expand Down
13 changes: 11 additions & 2 deletions hyphae/src/myceli_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,17 @@ pub struct MyceliApi {
}

impl MyceliApi {
pub fn new(myceli_address: &str, listen_address: &str, mtu: u16) -> Result<Self> {
let transport = Rc::new(UdpTransport::new(listen_address, mtu)?);
pub fn new(
myceli_address: &str,
listen_address: &str,
mtu: u16,
chunk_transmit_throttle: Option<u32>,
) -> Result<Self> {
let transport = Rc::new(UdpTransport::new(
listen_address,
mtu,
chunk_transmit_throttle,
)?);
Ok(MyceliApi {
address: myceli_address.to_string(),
listen_address: listen_address.to_string(),
Expand Down
10 changes: 10 additions & 0 deletions messages/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ impl Message {
self.encode()
}

pub fn to_hex(&self) -> String {
let mut hex_str = String::new();

for b in self.to_bytes() {
hex_str = format!("{}{:02X}", hex_str, b);
}

hex_str
}

pub fn available_blocks(cids: Vec<String>) -> Self {
Message::ApplicationAPI(ApplicationAPI::AvailableBlocks { cids })
}
Expand Down
18 changes: 16 additions & 2 deletions myceli/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use anyhow::Result;
use anyhow::{bail, Result};
use figment::{
providers::{Format, Serialized, Toml},
Figment,
};
use serde::{Deserialize, Serialize};
use transports::MAX_MTU;

#[derive(Debug, Deserialize, Serialize)]
pub struct Config {
Expand All @@ -13,6 +14,8 @@ pub struct Config {
pub mtu: u16,
pub window_size: u32,
pub block_size: u32,
pub chunk_transmit_throttle: Option<u32>,
pub radio_address: Option<String>,
}

impl Default for Config {
Expand All @@ -25,11 +28,16 @@ impl Default for Config {
// Default storage dir
storage_path: "storage".to_string(),
// Default MTU appropriate for dev radio
// Maxes out at 1024 * 3 bytes
mtu: 512,
// Default to sending five blocks at a time
window_size: 5,
// Default to 3 kilobyte blocks
block_size: 1024 * 3,
// Default to no throttling of chunks
chunk_transmit_throttle: None,
// Default to no set radio address
radio_address: None,
}
}
}
Expand All @@ -40,6 +48,12 @@ impl Config {
if let Some(path) = path {
config = config.merge(Toml::file(path));
}
Ok(config.extract()?)
let config: Self = config.extract()?;

if config.mtu > MAX_MTU {
bail!("Configured MTU is too large, cannot exceed {MAX_MTU}",);
}

Ok(config)
}
}
Loading