Skip to content

Commit

Permalink
refactor: rename server & client
Browse files Browse the repository at this point in the history
Closes #40
  • Loading branch information
dignifiedquire committed Jan 23, 2023
1 parent 8798f60 commit de34409
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 56 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@

Sending data
```sh
$ ./senmde server <file>
$ ./senmde provide <file>
```

Receiving data
```sh
$ ./sendme client <hash>
$ ./sendme get <hash>
```

# License
Expand Down
10 changes: 5 additions & 5 deletions src/client.rs → src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl Default for Options {
}
}

/// Setup a QUIC connection to the provided server address
/// Setup a QUIC connection to the provided address.
async fn setup(opts: Options) -> Result<(Client, Connection)> {
let keypair = Keypair::generate();

Expand All @@ -43,7 +43,7 @@ async fn setup(opts: Options) -> Result<(Client, Connection)> {
.start()
.map_err(|e| anyhow!("{:?}", e))?;

debug!("client: connecting to {}", opts.addr);
debug!("connecting to {}", opts.addr);
let connect = Connect::new(opts.addr).with_server_name("localhost");
let mut connection = client.connect(connect).await?;

Expand All @@ -60,9 +60,9 @@ pub struct Stats {

/// The events that are emitted while running a transfer.
pub enum Event {
/// The connection to the server was established.
/// The connection to the provider was established.
Connected,
/// The server has the content.
/// The provider has the content.
Requested {
/// The size of the requested content.
size: usize,
Expand Down Expand Up @@ -197,7 +197,7 @@ pub fn run(hash: bao::Hash, opts: Options) -> impl Stream<Item = Result<Event>>
}
}
None => {
Err(anyhow!("server disconnected"))?;
Err(anyhow!("provider disconnected"))?;
}
}
}
Expand Down
48 changes: 24 additions & 24 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod client;
pub mod get;
pub mod protocol;
pub mod server;
pub mod provider;

mod tls;

Expand All @@ -10,7 +10,7 @@ pub use tls::{PeerId, PeerIdError};
mod tests {
use std::{net::SocketAddr, path::PathBuf};

use crate::client::Event;
use crate::get::Event;
use crate::tls::PeerId;

use super::*;
Expand All @@ -25,21 +25,21 @@ mod tests {
let dir: PathBuf = testdir!();
let path = dir.join("hello_world");
tokio::fs::write(&path, "hello world!").await?;
let db = server::create_db(vec![server::DataSource::File(path.clone())]).await?;
let db = provider::create_db(vec![provider::DataSource::File(path.clone())]).await?;
let hash = *db.iter().next().unwrap().0;
let addr = "127.0.0.1:4443".parse().unwrap();
let mut server = server::Server::new(db);
let peer_id = server.peer_id();
let mut provider = provider::Provider::new(db);
let peer_id = provider.peer_id();

tokio::task::spawn(async move {
server.run(server::Options { addr }).await.unwrap();
provider.run(provider::Options { addr }).await.unwrap();
});

let opts = client::Options {
let opts = get::Options {
addr,
peer_id: Some(peer_id),
};
let stream = client::run(hash, opts);
let stream = get::run(hash, opts);
tokio::pin!(stream);
while let Some(event) = stream.next().await {
let event = event?;
Expand Down Expand Up @@ -84,20 +84,20 @@ mod tests {

tokio::fs::write(&path, &content).await?;

let db = server::create_db(vec![server::DataSource::File(path)]).await?;
let db = provider::create_db(vec![provider::DataSource::File(path)]).await?;
let hash = *db.iter().next().unwrap().0;
let mut server = server::Server::new(db);
let peer_id = server.peer_id();
let mut provider = provider::Provider::new(db);
let peer_id = provider.peer_id();

let server_task = tokio::task::spawn(async move {
server.run(server::Options { addr }).await.unwrap();
let provider_task = tokio::task::spawn(async move {
provider.run(provider::Options { addr }).await.unwrap();
});

let opts = client::Options {
let opts = get::Options {
addr,
peer_id: Some(peer_id),
};
let stream = client::run(hash, opts);
let stream = get::run(hash, opts);
tokio::pin!(stream);
while let Some(event) = stream.next().await {
let event = event?;
Expand All @@ -113,8 +113,8 @@ mod tests {
}
}

server_task.abort();
let _ = server_task.await;
provider_task.abort();
let _ = provider_task.await;
}

Ok(())
Expand All @@ -128,13 +128,13 @@ mod tests {
let addr = "127.0.0.1:4444".parse().unwrap();

tokio::fs::write(&path, content).await?;
let db = server::create_db(vec![server::DataSource::File(path)]).await?;
let db = provider::create_db(vec![provider::DataSource::File(path)]).await?;
let hash = *db.iter().next().unwrap().0;
let mut server = server::Server::new(db);
let peer_id = server.peer_id();
let mut provider = provider::Provider::new(db);
let peer_id = provider.peer_id();

tokio::task::spawn(async move {
server.run(server::Options { addr }).await.unwrap();
provider.run(provider::Options { addr }).await.unwrap();
});

async fn run_client(
Expand All @@ -143,11 +143,11 @@ mod tests {
peer_id: PeerId,
content: Vec<u8>,
) -> Result<()> {
let opts = client::Options {
let opts = get::Options {
addr,
peer_id: Some(peer_id),
};
let stream = client::run(hash, opts);
let stream = get::run(hash, opts);
tokio::pin!(stream);
while let Some(event) = stream.next().await {
let event = event?;
Expand Down
40 changes: 20 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::StreamExt;
use indicatif::{HumanDuration, ProgressBar, ProgressDrawTarget, ProgressState, ProgressStyle};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

use sendme::{client, server, PeerId};
use sendme::{get, provider, PeerId};

#[derive(Parser, Debug, Clone)]
#[clap(version, about, long_about = None)]
Expand All @@ -22,21 +22,21 @@ struct Cli {
enum Commands {
/// Serve the data from the given path. If none is specified reads from STDIN.
#[clap(about = "Serve the data from the given path")]
Server {
Provide {
path: Option<PathBuf>,
#[clap(long, short)]
/// Optional port, defaults to 127.0.01:4433.
addr: Option<SocketAddr>,
},
/// Fetch some data
#[clap(about = "Fetch the data from the hash")]
Client {
Get {
hash: bao::Hash,
#[clap(long)]
/// PeerId of the server.
/// PeerId of the provider.
peer_id: PeerId,
#[clap(long, short)]
/// Optional address of the server, defaults to 127.0.0.1:4433.
/// Optional address of the provider, defaults to 127.0.0.1:4433.
addr: Option<SocketAddr>,
/// Optional path to save the file. If none is specified writes the data to STDOUT.
out: Option<PathBuf>,
Expand All @@ -53,14 +53,14 @@ async fn main() -> Result<()> {
let cli = Cli::parse();

match cli.command {
Commands::Client {
Commands::Get {
hash,
peer_id,
addr,
out,
} => {
println!("Fetching: {}", hash.to_hex());
let mut opts = client::Options {
let mut opts = get::Options {
peer_id: Some(peer_id),
..Default::default()
};
Expand All @@ -70,14 +70,14 @@ async fn main() -> Result<()> {

println!("{} Connecting ...", style("[1/3]").bold().dim());
let pb = ProgressBar::hidden();
let stream = client::run(hash, opts);
let stream = get::run(hash, opts);
tokio::pin!(stream);
while let Some(event) = stream.next().await {
match event? {
client::Event::Connected => {
get::Event::Connected => {
println!("{} Requesting ...", style("[2/3]").bold().dim());
}
client::Event::Requested { size } => {
get::Event::Requested { size } => {
println!("{} Downloading ...", style("[3/3]").bold().dim());
pb.set_style(
ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
Expand All @@ -88,7 +88,7 @@ async fn main() -> Result<()> {
pb.set_length(size as u64);
pb.set_draw_target(ProgressDrawTarget::stderr());
}
client::Event::Receiving {
get::Event::Receiving {
hash: new_hash,
mut reader,
} => {
Expand Down Expand Up @@ -122,38 +122,38 @@ async fn main() -> Result<()> {
tokio::io::copy(&mut reader, &mut stdout).await?;
}
}
client::Event::Done(stats) => {
get::Event::Done(stats) => {
pb.finish_and_clear();

println!("Done in {}", HumanDuration(stats.elapsed));
}
}
}
}
Commands::Server { path, addr } => {
Commands::Provide { path, addr } => {
let mut tmp_path = None;

let sources = if let Some(path) = path {
vec![server::DataSource::File(path)]
vec![provider::DataSource::File(path)]
} else {
// Store STDIN content into a temporary file
let (file, path) = tempfile::NamedTempFile::new()?.into_parts();
let mut file = tokio::fs::File::from_std(file);
let path_buf = path.to_path_buf();
tmp_path = Some(path);
tokio::io::copy(&mut tokio::io::stdin(), &mut file).await?;
vec![server::DataSource::File(path_buf)]
vec![provider::DataSource::File(path_buf)]
};

let db = server::create_db(sources).await?;
let mut opts = server::Options::default();
let db = provider::create_db(sources).await?;
let mut opts = provider::Options::default();
if let Some(addr) = addr {
opts.addr = addr;
}
let mut server = server::Server::new(db);
let mut provider = provider::Provider::new(db);

println!("Serving from {}", server.peer_id());
server.run(opts).await?;
println!("PeerID: {}", provider.peer_id());
provider.run(opts).await?;

// Drop tempath to signal it can be destroyed
drop(tmp_path);
Expand Down
10 changes: 5 additions & 5 deletions src/server.rs → src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ impl Default for Options {
}
}

const MAX_CLIENTS: u64 = 1024;
const MAX_CONNECTIONS: u64 = 1024;
const MAX_STREAMS: u64 = 10;

pub type Database = Arc<HashMap<bao::Hash, Data>>;

pub struct Server {
pub struct Provider {
keypair: Keypair,
db: Database,
}

impl Server {
impl Provider {
pub fn new(db: Database) -> Self {
let keypair = Keypair::generate();
Server { keypair, db }
Provider { keypair, db }
}

pub fn peer_id(&self) -> PeerId {
Expand All @@ -50,7 +50,7 @@ impl Server {
let server_config = tls::make_server_config(&self.keypair)?;
let tls = s2n_quic::provider::tls::rustls::Server::from(server_config);
let limits = s2n_quic::provider::limits::Limits::default()
.with_max_active_connection_ids(MAX_CLIENTS)?
.with_max_active_connection_ids(MAX_CONNECTIONS)?
.with_max_open_local_bidirectional_streams(MAX_STREAMS)?
.with_max_open_remote_bidirectional_streams(MAX_STREAMS)?;

Expand Down

0 comments on commit de34409

Please sign in to comment.