Skip to content

Commit

Permalink
Remove update_rate, add box-counter
Browse files Browse the repository at this point in the history
  • Loading branch information
try-box committed Oct 14, 2023
1 parent f373a1a commit d28b4dd
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 20 deletions.
5 changes: 2 additions & 3 deletions handy-grpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "handy-grpc"
version = "0.1.2"
version = "0.1.3"
edition = "2021"
authors = ["try <trywen@qq.com>"]
rust-version = "1.56"
Expand All @@ -15,7 +15,6 @@ categories = []

[features]
default = []
rate = ["update_rate"]
rate_print = ["rate"]
reuse = ["reuseport", "reuseaddr"]
reuseport = ["socket2", "tokio-stream"]
Expand All @@ -32,10 +31,10 @@ serde = { version = "1", features = ["derive"] }
futures = "0.3"
log = "0.4"
anyhow = "1"
update_rate = { version = "2", optional = true }
once_cell = "1"
parking_lot = "0.12"

rate = { package = "box-counter", version = "0.1", path = "../counter", default-features = false, features = ["rate"], optional = true }
socket2 = { version = "0.5", features = ["all"], optional = true }
tokio-stream = { version = "0.1", features = ["net"], optional = true }

Expand Down
26 changes: 9 additions & 17 deletions handy-grpc/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use std::net::SocketAddr;

#[cfg(feature = "rate")]
use std::sync::atomic::{AtomicUsize, Ordering};

use futures::channel::oneshot;
use futures::StreamExt;
use tonic::metadata::{Ascii, MetadataValue};
Expand All @@ -12,6 +9,9 @@ use tonic::{Request, Response, Status};

use anyhow::{Error, Result};

#[cfg(feature = "rate")]
use rate::Counter;

use super::transferpb::data_transfer_server::{DataTransfer, DataTransferServer};
use super::transferpb::{self, Empty};
use super::Priority;
Expand All @@ -25,30 +25,21 @@ pub type Message = (

pub struct DataTransferService {
#[cfg(feature = "rate")]
counter: std::sync::Arc<AtomicUsize>,
counter: Counter,
tx: TX,
}

impl DataTransferService {
pub fn new(tx: TX) -> Self {
#[cfg(feature = "rate")]
let counter = std::sync::Arc::new(AtomicUsize::new(0));
let counter = Counter::new(std::time::Duration::from_secs(5));
#[cfg(feature = "rate_print")]
{
let c = counter.clone();
tokio::spawn(async move {
let mut last = 0;
loop {
let now = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let curr = c.load(Ordering::SeqCst);
log::info!(
"total: {}, diff: {} rate: {:?}",
curr,
(curr - last),
(curr - last) as f64 / (now.elapsed().as_millis() as f64 / 1000.0)
);
last = curr;
log::info!("total: {}, rate: {:?}", c.total().await, c.rate().await);
}
});
}
Expand All @@ -73,7 +64,8 @@ impl DataTransfer for DataTransferService {
log::trace!("Request: {:?}", req);
let req = req?;
#[cfg(feature = "rate")]
self.counter.fetch_add(1, Ordering::SeqCst);
self.counter.inc().await;

tx.send((req.priority as Priority, (req, None)))
.await
.map_err(|e| Status::cancelled(e.to_string()))?;
Expand All @@ -91,7 +83,7 @@ impl DataTransfer for DataTransferService {
log::trace!("Request: {:?}", req);

#[cfg(feature = "rate")]
self.counter.fetch_add(1, Ordering::SeqCst);
self.counter.inc().await;

let mut tx = self.tx.clone();
let (res_tx, res_rx) = oneshot::channel();
Expand Down

0 comments on commit d28b4dd

Please sign in to comment.