Skip to content

Commit

Permalink
Add handy-grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
try-box committed Oct 5, 2023
1 parent ad11175 commit 6bbfca7
Show file tree
Hide file tree
Showing 9 changed files with 810 additions and 0 deletions.
33 changes: 33 additions & 0 deletions handy-grpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "handy-grpc"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = []
rate = ["update_rate"]
rate_print = ["rate"]

[dependencies]
mpsc = { version = "0.2", path = "../mpsc", default-features = false, features = ["priority"] }
collections = { version = "0.1", path = "../collections" , default-features = false, features = ["priority-queue"] }
tokio = { version = "1", default-features = false, features = ["time"] }
tonic = { version = "0.9", features = ["tls"] }
prost = "0.11"
serde = { version = "1", features = ["derive"] }
futures = "0.3"
log = "0.4"
anyhow = "1"
update_rate = { version = "2", optional = true }
once_cell = "1"
parking_lot = "0.12"

[dev-dependencies]
tokio = { version = "1", default-features = false, features = ["net", "rt-multi-thread", "sync"] }
env_logger = "0.8"

[build-dependencies]
tonic-build = { version = "0.9", features = ["prost"] }

5 changes: 5 additions & 0 deletions handy-grpc/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
fn main() {
tonic_build::configure()
.compile(&["proto/transferpb.proto"], &["proto"])
.unwrap();
}
38 changes: 38 additions & 0 deletions handy-grpc/examples/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use handy_grpc::client::{Client, Message};

// cargo run -r --example client

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
std::env::set_var("RUST_LOG", "client=debug,handy_grpc=debug");
env_logger::init();

let addr = "[::1]:10000";

let runner = async move {
let client = Client::new(addr.into()).connect().await.unwrap();
let send_data_futs = async move {
let send = |mut c: Client| async move {
loop {
let send_result = c
.send(Message {
ver: 1,
data: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
})
.await;
log::trace!("send_result, {:?}", send_result);
}
};

let mut sends = Vec::new();
for _ in 0..32 {
sends.push(send(client.clone()));
}
futures::future::join_all(sends).await;
};
send_data_futs.await;
};

runner.await;
Ok(())
}
50 changes: 50 additions & 0 deletions handy-grpc/examples/receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::time::Duration;

use futures::channel::mpsc::channel;
use futures::StreamExt;

use handy_grpc::server::{run, Message};

// cargo run -r --example receiver --features rate_print

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
std::env::set_var("RUST_LOG", "receiver=debug,handy_grpc=debug");
env_logger::init();

let addr = "[::1]:10000".parse().unwrap();

let runner = async move {
let (tx, mut rx) = channel::<Message>(100_000);

let recv_data_fut = async move {
while let Some((msg, reply_tx)) = rx.next().await {
if msg.data.len() == 4 {
log::info!(" ==> High priority message, data len 4",);
} else {
log::trace!(" ==> req = ver: {}, data len {}", msg.ver, msg.data.len());
}
if let Some(reply_tx) = reply_tx {
if let Err(e) = reply_tx.send(Ok(msg)) {
log::error!("gRPC send result failure, {:?}", e);
}
}
//tokio::time::sleep(Duration::from_nanos(1)).await;
}
log::error!("Recv None");
};

let run_receiver_fut = async move {
loop {
if let Err(e) = run(addr, tx.clone(), None, None).await {
log::error!("Run gRPC receiver error, {:?}", e);
}
tokio::time::sleep(Duration::from_secs(3)).await;
}
};
futures::future::join(recv_data_fut, run_receiver_fut).await;
};

runner.await;
Ok(())
}
126 changes: 126 additions & 0 deletions handy-grpc/examples/sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use handy_grpc::client::{Client, Mailbox, Message, Priority};

// cargo run -r --example sender

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
std::env::set_var("RUST_LOG", "sender=debug,handy_grpc=debug");
env_logger::init();

let addr = "[::1]:10000";

let runner = async move {
let mut c = Client::new(addr.into()).concurrency_limit(32).build().await;
let send_count = Arc::new(AtomicUsize::new(0));
let complete_count = Arc::new(AtomicUsize::new(0));
let fail_count = Arc::new(AtomicUsize::new(0));
let mailbox = c.transfer_start(10_000_000).await;
let msg = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let send_data_futs = async move {
let send = |mailbox: Mailbox,
data: Vec<u8>,
p: Priority,
sleep_dur: Duration,
complete_count: Arc<AtomicUsize>,
send_count: Arc<AtomicUsize>,
fail_count: Arc<AtomicUsize>| async move {
let send_fut = |mut mailbox: Mailbox,
msg: Message,
p: Priority,
complete_count: Arc<AtomicUsize>,
fail_count: Arc<AtomicUsize>| async move {
let msg_bak = msg.clone();
let p_bak = p.clone();
let mut msg1 = Some(msg);
let mut p1 = Some(p);
loop {
let send_result = mailbox
.send_priority(msg1.take().unwrap(), p1.take().unwrap())
.await;
if let Err(e) = send_result {
fail_count.fetch_add(1, Ordering::SeqCst);
log::trace!("send error, {:?}", e);
if let Some((pp, mm)) = e.into_inner() {
msg1 = Some(mm);
p1 = Some(pp);
} else {
log::warn!("send error, into_inner is None");
msg1 = Some(msg_bak.clone());
p1 = Some(p_bak.clone());
//break
}
tokio::time::sleep(Duration::from_millis(1)).await;
} else {
complete_count.fetch_add(1, Ordering::SeqCst);
break;
}
}
};

for _ in 0..200_000 {
//loop {
send_count.fetch_add(1, Ordering::SeqCst);
send_fut(
mailbox.clone(),
Message {
ver: 1,
data: data.clone(),
},
p,
complete_count.clone(),
fail_count.clone(),
)
.await;
if sleep_dur > Duration::ZERO {
tokio::time::sleep(sleep_dur).await;
}
}
};

let mut sends = Vec::new();
for i in 0..100 {
sends.push(send(
mailbox.clone(),
msg.clone(),
i,
Duration::from_millis(0),
complete_count.clone(),
send_count.clone(),
fail_count.clone(),
));
}
sends.push(send(
mailbox.clone(),
vec![0, 1, 2, 3],
Priority::MAX,
Duration::from_millis(5000),
complete_count.clone(),
send_count.clone(),
fail_count.clone(),
));

let stats_fut = async move {
loop {
log::info!(
"queue_len: {:?}, completes: {:?}, sends: {:?}, fails: {:?}",
mailbox.queue_len(),
complete_count.load(Ordering::SeqCst),
send_count.load(Ordering::SeqCst),
fail_count.load(Ordering::SeqCst),
);
tokio::time::sleep(Duration::from_secs(5)).await;
}
};

futures::future::join(futures::future::join_all(sends), stats_fut).await;
};
send_data_futs.await;
};

runner.await;
Ok(())
}
16 changes: 16 additions & 0 deletions handy-grpc/proto/transferpb.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

package transferpb;

service DataTransfer {
rpc Transfer(stream Message) returns (Empty) {}
rpc Send(Message) returns (Message) {}
}

message Message {
int32 ver = 1;
bytes data = 2;
}

message Empty {
}

0 comments on commit 6bbfca7

Please sign in to comment.