Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use only necessary feature and add tests #13

Merged
merged 6 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ on:
jobs:
test:
runs-on: ubuntu-latest
services:
fluentd:
image: fluent/fluentd
ports:
- 24224:24224
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
Expand Down
2 changes: 1 addition & 1 deletion .tagpr
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@
[tagpr]
vPrefix = true
releaseBranch = main
versionFile = Cargo.toml
versionFile = Cargo.toml,README.md
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ include = [
async-trait = "0.1.61"
base64 = "0.21.0"
bytes = { version = "1.3.0", features = ["serde"] }
chrono = "0.4.23"
crossbeam = "0.8.2"
log = "0.4.17"
rmp-serde = "1.1.1"
serde = {version = "1.0.152", features = ["derive"]}
# TODO: Remove unneeded features
tokio = {version = "1.23.0", features = ["full"]}
serde = { version = "1.0.152", features = ["derive"] }
tokio = { version = "1.23.0", features = ["net", "time", "io-util", "rt"] }
uuid = { version = "1.2.2", features = ["v4"] }

[dev-dependencies]
tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread"] }
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ let client = Client::new(&Config {
.unwrap();
```

### Timeout
### timeout

Set the timeout value of `std::time::Duration` to connect to the destination. The default is 3 seconds.

Expand Down
109 changes: 93 additions & 16 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! ## Example
//!
//! ```no_run
//! ```
//! use tokio_fluent::{Client, Config, FluentClient};
//! use tokio_fluent::record::{Map, Value};
//!
Expand All @@ -22,7 +22,7 @@
//! ```

use std::net::SocketAddr;
use std::time::{Duration, SystemTime};
use std::time::Duration;

use async_trait::async_trait;
use base64::{engine::general_purpose, Engine};
Expand Down Expand Up @@ -113,24 +113,17 @@ impl Client {

Ok(Self { sender })
}
}

#[async_trait]
impl FluentClient for Client {
/// Send a fluent record to the fluentd server.
///
/// ## Params:
/// `tag` - Event category of a record to send.
///
/// `record` - Map object to send as a fluent record.
fn send(&self, tag: &'static str, record: Map) -> Result<(), SendError> {
fn send_with_time(
&self,
tag: &'static str,
record: Map,
timestamp: i64,
) -> Result<(), SendError> {
let record = Record {
tag,
record,
timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
timestamp,
options: Options {
chunk: general_purpose::STANDARD.encode(Uuid::new_v4()),
},
Expand All @@ -141,6 +134,19 @@ impl FluentClient for Client {
source: e.to_string(),
})
}
}

#[async_trait]
impl FluentClient for Client {
/// Send a fluent record to the fluentd server.
///
/// ## Params:
/// `tag` - Event category of a record to send.
///
/// `record` - Map object to send as a fluent record.
fn send(&self, tag: &'static str, record: Map) -> Result<(), SendError> {
self.send_with_time(tag, record, chrono::Local::now().timestamp())
}

/// Stop the worker.
async fn stop(self) -> Result<(), SendError> {
Expand Down Expand Up @@ -171,3 +177,74 @@ impl FluentClient for NopClient {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_send_with_time() {
use std::collections::HashMap;

use chrono::TimeZone;

use crate::record::Value;
use crate::record_map;

let (sender, receiver) = channel::unbounded();
let client = Client { sender };

let timestamp = chrono::Utc.timestamp_opt(1234567, 0).unwrap().timestamp();
let record = record_map!("age".to_string() => 20.into());
assert!(
client.send_with_time("test", record, timestamp).is_ok(),
"failed to send with time"
);

let got = receiver.recv().expect("failed to receive");
match got {
Message::Record(r) => {
assert_eq!(r.tag, "test");
assert_eq!(r.record, record_map!("age".to_string() => 20.into()));
assert_eq!(r.timestamp, 1234567);
}
Message::Terminate => unreachable!("got terminate message"),
}
}

#[tokio::test]
async fn test_stop() {
let (sender, receiver) = channel::unbounded();
let client = Client { sender };
assert!(client.stop().await.is_ok(), "faled to stop");

let got = receiver.recv().expect("failed to receive");
match got {
Message::Record(_) => unreachable!("got record message"),
Message::Terminate => {}
};
}

#[test]
fn test_client_drop_sends_terminate() {
let (sender, receiver) = channel::unbounded();
{
Client { sender };
}
let got = receiver.recv().expect("failed to receive");
match got {
Message::Record(_) => unreachable!("got record message"),
Message::Terminate => {}
};
}

#[test]
fn test_default_config() {
let config: Config = Default::default();
assert_eq!(config.addr, "127.0.0.1:24224".parse().unwrap());
assert_eq!(config.timeout, Duration::new(3, 0));
assert_eq!(config.retry_wait, 500);
assert_eq!(config.max_retry, 10);
assert_eq!(config.max_retry_wait, 60000);
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! ## Example
//!
//! ```no_run
//! ```
//! use std::collections::HashMap;
//!
//! use tokio_fluent::record_map;
Expand Down
2 changes: 1 addition & 1 deletion src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Map {
///
/// ## Example
///
/// ```rust
/// ```
/// use std::collections::HashMap;
///
/// use tokio_fluent::record_map;
Expand Down
48 changes: 40 additions & 8 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use log::warn;
use rmp_serde::Serializer;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufWriter},
net::TcpStream,
time::Duration,
};
Expand Down Expand Up @@ -40,7 +40,7 @@ impl std::fmt::Display for Error {
#[derive(Debug, Serialize)]
pub struct Record {
pub tag: &'static str,
pub timestamp: u64,
pub timestamp: i64,
pub record: Map,
pub options: Options,
}
Expand Down Expand Up @@ -72,7 +72,7 @@ struct SerializedRecord {
chunk: String,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
struct AckResponse {
ack: String,
}
Expand All @@ -83,16 +83,19 @@ pub struct RetryConfig {
pub max_wait: u64,
}

pub struct Worker {
stream: TcpStream,
pub struct Worker<T = TcpStream> {
stream: BufWriter<T>,
receiver: Receiver<Message>,
retry_config: RetryConfig,
}

impl Worker {
pub fn new(stream: TcpStream, receiver: Receiver<Message>, retry_config: RetryConfig) -> Self {
impl<T> Worker<T>
where
T: AsyncWrite + AsyncRead + Unpin,
{
pub fn new(stream: T, receiver: Receiver<Message>, retry_config: RetryConfig) -> Self {
Self {
stream,
stream: BufWriter::new(stream),
receiver,
retry_config,
}
Expand Down Expand Up @@ -189,3 +192,32 @@ impl Worker {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_read_ack() {
let (client, mut server) = tokio::io::duplex(64);
let (_, receiver) = channel::unbounded();
let retry_config = RetryConfig {
initial_wait: 50,
max: 13,
max_wait: 60000,
};
let mut worker = Worker::new(client, receiver, retry_config);

let ack_response = AckResponse {
ack: "Mzc4NDQwMzctNGY4Ni00MmI2LWFiYjMtMjk3MGZkNDUzY2Y2".to_string(),
};
let ack_response = rmp_serde::to_vec(&ack_response).unwrap();
server.write_all(&ack_response).await.unwrap();

let ack = worker.read_ack().await.expect("failed to read ack");
assert_eq!(
ack.ack,
"Mzc4NDQwMzctNGY4Ni00MmI2LWFiYjMtMjk3MGZkNDUzY2Y2".to_string()
);
}
}