Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry option #11

Merged
merged 12 commits into from
Jan 12, 2023
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: Run clippy
run: cargo clippy
- name: Run cargo fmt
run: cargo fmt --check
- name: Run test
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ include = [

[dependencies]
async-trait = "0.1.61"
base64 = "0.21.0"
bytes = { version = "1.3.0", features = ["serde"] }
crossbeam = "0.8.2"
log = "0.4.17"
rmp-serde = "1.1.1"
serde = {version = "1.0.152", features = ["derive"]}
# TODO: Remove unneeded features
tokio = {version = "1.23.0", features = ["full"]}
uuid = { version = "1.2.2", features = ["v4"] }
30 changes: 30 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,33 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

---

Some code of this project is inspired by [tracing-fluentd](https://github.com/DoumanAsh/tracing-fluentd).

Copyright (c) 2021 DoumanAsh

Boost Software License - Version 1.0 - August 17th, 2003

Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:

The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
26 changes: 24 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ tokio-fluent = "0.2.1"
```rust
use std::collections::HashMap;

use tokio_fluent::client::{Client, Config, FluentClient};
use tokio_fluent::{Client, Config, FluentClient};
use tokio_fluent::record::{Map, Value};
use tokio_fluent::record_map;

#[tokio::main]
async fn main() {
let client = Client::new(&Config { addr: "127.0.0.1:24224".parse().unwrap() }).await.unwrap();
let client = Client::new(&Config {
addr: "127.0.0.1:24224".parse().unwrap(),
..Default::default()
})
.await
.unwrap();

// With Map::new()
let mut map = Map::new();
Expand Down Expand Up @@ -56,6 +61,7 @@ async fn main() {
```rust
let client = Client::new(&Config {
addr: "127.0.0.1:24224".parse().unwrap(),
..Default::default()
})
.await
.unwrap();
Expand All @@ -64,3 +70,19 @@ let client = Client::new(&Config {
### Timeout

Set the timeout value of `std::time::Duration` to connect to the destination. The default is 3 seconds.

### retry_wait

Set the duration of the initial wait for the first retry, in milliseconds.
The actual retry wait will be r * 1.5^(N-1) (r: this value, N: the number of retries).
The default is 500.

### max_retry

Sets the maximum number of retries.
If the number of retries become larger than this value, the write/send operation will fail. The default is 10.
johnmanjiro13 marked this conversation as resolved.
Show resolved Hide resolved

### max_retry_wait

The maximum duration of wait between retries, in milliseconds. If calculated retry wait is larger than this value, operation will fail.
The default is 60,000 (60 seconds).
79 changes: 63 additions & 16 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! ## Example
//!
//! ```no_run
//! use tokio_fluent::client::{Client, Config, FluentClient};
//! use tokio_fluent::{Client, Config, FluentClient};
//! use tokio_fluent::record::{Map, Value};
//!
//! #[tokio::main]
Expand All @@ -25,35 +25,64 @@ use std::net::SocketAddr;
use std::time::{Duration, SystemTime};

use async_trait::async_trait;
use base64::{engine::general_purpose, Engine};
use crossbeam::channel::{self, Sender};
use tokio::net::TcpStream;
use tokio::time::timeout;
use tokio::{net::TcpStream, time::timeout};
use uuid::Uuid;

use crate::record::Map;
use crate::worker::{Message, Record, Worker};
use crate::worker::{Message, Options, Record, RetryConfig, Worker};

#[derive(Debug, Clone)]
pub struct SendError {
source: String,
}

impl std::error::Error for SendError {}

impl std::fmt::Display for SendError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.source)
}
}

#[derive(Debug, Clone)]
/// Config for a client.
pub struct Config {
/// The address of the fluentd server.
/// The default is `127.0.0.1:24224`.
pub addr: SocketAddr,
/// The timeout value to connect to the fluentd server.
/// The default is 3 seconds.
pub timeout: Duration,
/// The duration of the initial wait for the first retry, in milliseconds.
/// The default is 500.
pub retry_wait: u64,
/// The maximum number of retries. If the number of retries become larger
/// than this value, the write/send operation will fail. The default is 10.
pub max_retry: u32,
/// The maximum duration of wait between retries, in milliseconds.
/// If calculated retry wait is larger than this value, operation will fail.
/// The default is 60,000 (60 seconds).
pub max_retry_wait: u64,
}

impl Default for Config {
fn default() -> Self {
Self {
addr: "127.0.0.1:24224".parse().unwrap(),
timeout: Duration::new(3, 0),
retry_wait: 500,
max_retry: 10,
max_retry_wait: 60000,
}
}
}

#[async_trait]
pub trait FluentClient: Send + Sync {
fn send(&self, tag: &'static str, record: Map) -> Result<(), Box<dyn std::error::Error>>;
async fn stop(self) -> Result<(), channel::SendError<Message>>;
fn send(&self, tag: &'static str, record: Map) -> Result<(), SendError>;
async fn stop(self) -> Result<(), SendError>;
}

#[derive(Debug, Clone)]
Expand All @@ -65,11 +94,20 @@ pub struct Client {
impl Client {
/// Connect to the fluentd server and create a worker with tokio::spawn.
pub async fn new(config: &Config) -> tokio::io::Result<Client> {
let socket = timeout(config.timeout, TcpStream::connect(config.addr)).await??;
let stream = timeout(config.timeout, TcpStream::connect(config.addr)).await??;
let (sender, receiver) = channel::unbounded();

let config = config.clone();
let _ = tokio::spawn(async move {
let mut worker = Worker::new(socket, receiver);
let mut worker = Worker::new(
stream,
receiver,
RetryConfig {
initial_wait: config.retry_wait,
max: config.max_retry,
max_wait: config.max_retry_wait,
},
);
worker.run().await
});

Expand All @@ -85,21 +123,30 @@ impl FluentClient for Client {
/// `tag` - Event category of a record to send.
///
/// `record` - Map object to send as a fluent record.
fn send(&self, tag: &'static str, record: Map) -> Result<(), Box<dyn std::error::Error>> {
fn send(&self, tag: &'static str, record: Map) -> Result<(), SendError> {
let record = Record {
tag,
record,
timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
options: Options {
chunk: general_purpose::STANDARD.encode(Uuid::new_v4()),
},
};
self.sender.send(Message::Record(record))?;
Ok(())
self.sender
.send(Message::Record(record))
.map_err(|e| SendError {
source: e.to_string(),
})
}

/// Stop the worker.
async fn stop(self) -> Result<(), channel::SendError<Message>> {
self.sender.send(Message::Terminate)
async fn stop(self) -> Result<(), SendError> {
self.sender.send(Message::Terminate).map_err(|e| SendError {
source: e.to_string(),
})
}
}

Expand All @@ -116,11 +163,11 @@ pub struct NopClient;

#[async_trait]
impl FluentClient for NopClient {
fn send(&self, _tag: &'static str, _record: Map) -> Result<(), Box<dyn std::error::Error>> {
fn send(&self, _tag: &'static str, _record: Map) -> Result<(), SendError> {
Ok(())
}

async fn stop(self) -> Result<(), channel::SendError<Message>> {
async fn stop(self) -> Result<(), SendError> {
Ok(())
}
}
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! use std::collections::HashMap;
//!
//! use tokio_fluent::record_map;
//! use tokio_fluent::client::{Client, Config, FluentClient};
//! use tokio_fluent::{Client, Config, FluentClient};
//! use tokio_fluent::record::{Map, Value};
//!
//! #[tokio::main]
Expand Down Expand Up @@ -34,3 +34,5 @@
pub mod client;
pub mod record;
mod worker;

pub use client::{Client, Config, FluentClient};
28 changes: 0 additions & 28 deletions src/record.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,5 @@
//!Fluentd record definitions.

// This code is inspired by [tracing-fluentd](https://github.com/DoumanAsh/tracing-fluentd)
//
// Copyright (c) 2021 DoumanAsh
//
// Boost Software License - Version 1.0 - August 17th, 2003
//
// Permission is hereby granted, free of charge, to any person or organization
// obtaining a copy of the software and accompanying documentation covered by
// this license (the "Software") to use, reproduce, display, distribute,
// execute, and transmit the Software, and to prepare derivative works of the
// Software, and to permit third-parties to whom the Software is furnished to
// do so, all subject to the following:
//
// The copyright notices in the Software and this entire statement, including
// the above license grant, this restriction and the following disclaimer,
// must be included in all copies of the Software, in whole or in part, and
// all derivative works of the Software, unless such copies or derivative
// works are solely in the form of machine-executable object code generated by
// a source language processor.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
// SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
// FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use core::fmt::Debug;
use std::collections::HashMap;

Expand Down
Loading