Skip to content

Commit

Permalink
Merge pull request #13 from johnmanjiro13/tokio-feature
Browse files Browse the repository at this point in the history
Use only necessary feature and add tests
  • Loading branch information
johnmanjiro13 committed Jan 19, 2023
2 parents c37247b + 567b7c2 commit 6f46b89
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 31 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ on:
jobs:
test:
runs-on: ubuntu-latest
services:
fluentd:
image: fluent/fluentd
ports:
- 24224:24224
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
Expand Down
2 changes: 1 addition & 1 deletion .tagpr
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@
[tagpr]
vPrefix = true
releaseBranch = main
versionFile = Cargo.toml
versionFile = Cargo.toml,README.md
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ include = [
async-trait = "0.1.61"
base64 = "0.21.0"
bytes = { version = "1.3.0", features = ["serde"] }
chrono = "0.4.23"
crossbeam = "0.8.2"
log = "0.4.17"
rmp-serde = "1.1.1"
serde = {version = "1.0.152", features = ["derive"]}
# TODO: Remove unneeded features
tokio = {version = "1.23.0", features = ["full"]}
serde = { version = "1.0.152", features = ["derive"] }
tokio = { version = "1.23.0", features = ["net", "time", "io-util", "rt"] }
uuid = { version = "1.2.2", features = ["v4"] }

[dev-dependencies]
tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread"] }
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ let client = Client::new(&Config {
.unwrap();
```

### Timeout
### timeout

Set the timeout value of `std::time::Duration` to connect to the destination. The default is 3 seconds.

Expand Down
109 changes: 93 additions & 16 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! ## Example
//!
//! ```no_run
//! ```
//! use tokio_fluent::{Client, Config, FluentClient};
//! use tokio_fluent::record::{Map, Value};
//!
Expand All @@ -22,7 +22,7 @@
//! ```

use std::net::SocketAddr;
use std::time::{Duration, SystemTime};
use std::time::Duration;

use async_trait::async_trait;
use base64::{engine::general_purpose, Engine};
Expand Down Expand Up @@ -113,24 +113,17 @@ impl Client {

Ok(Self { sender })
}
}

#[async_trait]
impl FluentClient for Client {
/// Send a fluent record to the fluentd server.
///
/// ## Params:
/// `tag` - Event category of a record to send.
///
/// `record` - Map object to send as a fluent record.
fn send(&self, tag: &'static str, record: Map) -> Result<(), SendError> {
fn send_with_time(
&self,
tag: &'static str,
record: Map,
timestamp: i64,
) -> Result<(), SendError> {
let record = Record {
tag,
record,
timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
timestamp,
options: Options {
chunk: general_purpose::STANDARD.encode(Uuid::new_v4()),
},
Expand All @@ -141,6 +134,19 @@ impl FluentClient for Client {
source: e.to_string(),
})
}
}

#[async_trait]
impl FluentClient for Client {
/// Send a fluent record to the fluentd server.
///
/// ## Params:
/// `tag` - Event category of a record to send.
///
/// `record` - Map object to send as a fluent record.
fn send(&self, tag: &'static str, record: Map) -> Result<(), SendError> {
self.send_with_time(tag, record, chrono::Local::now().timestamp())
}

/// Stop the worker.
async fn stop(self) -> Result<(), SendError> {
Expand Down Expand Up @@ -171,3 +177,74 @@ impl FluentClient for NopClient {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_send_with_time() {
use std::collections::HashMap;

use chrono::TimeZone;

use crate::record::Value;
use crate::record_map;

let (sender, receiver) = channel::unbounded();
let client = Client { sender };

let timestamp = chrono::Utc.timestamp_opt(1234567, 0).unwrap().timestamp();
let record = record_map!("age".to_string() => 20.into());
assert!(
client.send_with_time("test", record, timestamp).is_ok(),
"failed to send with time"
);

let got = receiver.recv().expect("failed to receive");
match got {
Message::Record(r) => {
assert_eq!(r.tag, "test");
assert_eq!(r.record, record_map!("age".to_string() => 20.into()));
assert_eq!(r.timestamp, 1234567);
}
Message::Terminate => unreachable!("got terminate message"),
}
}

#[tokio::test]
async fn test_stop() {
let (sender, receiver) = channel::unbounded();
let client = Client { sender };
assert!(client.stop().await.is_ok(), "faled to stop");

let got = receiver.recv().expect("failed to receive");
match got {
Message::Record(_) => unreachable!("got record message"),
Message::Terminate => {}
};
}

#[test]
fn test_client_drop_sends_terminate() {
let (sender, receiver) = channel::unbounded();
{
Client { sender };
}
let got = receiver.recv().expect("failed to receive");
match got {
Message::Record(_) => unreachable!("got record message"),
Message::Terminate => {}
};
}

#[test]
fn test_default_config() {
let config: Config = Default::default();
assert_eq!(config.addr, "127.0.0.1:24224".parse().unwrap());
assert_eq!(config.timeout, Duration::new(3, 0));
assert_eq!(config.retry_wait, 500);
assert_eq!(config.max_retry, 10);
assert_eq!(config.max_retry_wait, 60000);
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! ## Example
//!
//! ```no_run
//! ```
//! use std::collections::HashMap;
//!
//! use tokio_fluent::record_map;
Expand Down
2 changes: 1 addition & 1 deletion src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Map {
///
/// ## Example
///
/// ```rust
/// ```
/// use std::collections::HashMap;
///
/// use tokio_fluent::record_map;
Expand Down
48 changes: 40 additions & 8 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use log::warn;
use rmp_serde::Serializer;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufWriter},
net::TcpStream,
time::Duration,
};
Expand Down Expand Up @@ -40,7 +40,7 @@ impl std::fmt::Display for Error {
#[derive(Debug, Serialize)]
pub struct Record {
pub tag: &'static str,
pub timestamp: u64,
pub timestamp: i64,
pub record: Map,
pub options: Options,
}
Expand Down Expand Up @@ -72,7 +72,7 @@ struct SerializedRecord {
chunk: String,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
struct AckResponse {
ack: String,
}
Expand All @@ -83,16 +83,19 @@ pub struct RetryConfig {
pub max_wait: u64,
}

pub struct Worker {
stream: TcpStream,
pub struct Worker<T = TcpStream> {
stream: BufWriter<T>,
receiver: Receiver<Message>,
retry_config: RetryConfig,
}

impl Worker {
pub fn new(stream: TcpStream, receiver: Receiver<Message>, retry_config: RetryConfig) -> Self {
impl<T> Worker<T>
where
T: AsyncWrite + AsyncRead + Unpin,
{
pub fn new(stream: T, receiver: Receiver<Message>, retry_config: RetryConfig) -> Self {
Self {
stream,
stream: BufWriter::new(stream),
receiver,
retry_config,
}
Expand Down Expand Up @@ -189,3 +192,32 @@ impl Worker {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_read_ack() {
let (client, mut server) = tokio::io::duplex(64);
let (_, receiver) = channel::unbounded();
let retry_config = RetryConfig {
initial_wait: 50,
max: 13,
max_wait: 60000,
};
let mut worker = Worker::new(client, receiver, retry_config);

let ack_response = AckResponse {
ack: "Mzc4NDQwMzctNGY4Ni00MmI2LWFiYjMtMjk3MGZkNDUzY2Y2".to_string(),
};
let ack_response = rmp_serde::to_vec(&ack_response).unwrap();
server.write_all(&ack_response).await.unwrap();

let ack = worker.read_ack().await.expect("failed to read ack");
assert_eq!(
ack.ack,
"Mzc4NDQwMzctNGY4Ni00MmI2LWFiYjMtMjk3MGZkNDUzY2Y2".to_string()
);
}
}

0 comments on commit 6f46b89

Please sign in to comment.