Skip to content

Commit

Permalink
feat: Implement self retry
Browse files Browse the repository at this point in the history
  • Loading branch information
johnmanjiro13 committed Jan 10, 2023
1 parent c90a4ef commit 12e2172
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ include = [
[dependencies]
async-trait = "0.1.61"
base64 = "0.21.0"
bytes = { version = "1.3.0", features = ["serde"] }
crossbeam = "0.8.2"
log = "0.4.17"
rmp-serde = "1.1.1"
serde = {version = "1.0.152", features = ["derive"]}
# TODO: Remove unneeded features
tokio = {version = "1.23.0", features = ["full"]}
tokio-retry = "0.3.0"
uuid = { version = "1.2.2", features = ["v4"] }
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,4 @@ The default is 500.
### MaxRetry

Sets the maximum number of retries.
If the number of retries become larger than this value, the write/send operation will fail. The default is 5.
If the number of retries becomes larger than this value, the write/send operation will fail. The default is 10.
11 changes: 5 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
//! ```

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use async_trait::async_trait;
use base64::{engine::general_purpose, Engine};
use crossbeam::channel::{self, Sender};
use tokio::{net::TcpStream, sync::Mutex, time::timeout};
use tokio::{net::TcpStream, time::timeout};
use uuid::Uuid;

use crate::record::Map;
Expand Down Expand Up @@ -67,7 +66,7 @@ pub struct Config {
pub retry_wait: u64,
/// The maximum number of retries. If the number of retries becomes larger
/// than this value, the write/send operation will fail. The default is 10.
pub max_retry: usize,
pub max_retry: u32,
}

impl Default for Config {
Expand All @@ -76,7 +75,7 @@ impl Default for Config {
addr: "127.0.0.1:24224".parse().unwrap(),
timeout: Duration::new(3, 0),
retry_wait: 500,
max_retry: 5,
max_retry: 10,
}
}
}
Expand All @@ -101,8 +100,8 @@ impl Client {

let config = config.clone();
let _ = tokio::spawn(async move {
let worker = Worker::new(
Arc::new(Mutex::new(stream)),
let mut worker = Worker::new(
stream,
receiver,
RetryConfig {
initial_wait: config.retry_wait,
Expand Down
151 changes: 95 additions & 56 deletions src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,37 @@
use std::sync::Arc;

use bytes::{Buf, BufMut};
use crossbeam::channel::{self, Receiver};
use log::warn;
use rmp_serde::{Deserializer, Serializer};
use rmp_serde::Serializer;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
sync::Mutex,
time::Duration,
};
use tokio_retry::{strategy::ExponentialBackoff, Retry};

use crate::record::Map;

/// Growth factor of the retry back-off: `write_with_retry` raises it to a power that
/// grows with the attempt index and multiplies the configured initial wait by the result.
const RETRY_INCREMENT_RATE: f64 = 1.5;

#[derive(Debug, Clone)]
pub enum WorkerError {
DeriveError(String),
AckUnmatchedError(String, String),
pub enum Error {
WriteFailed(String),
ReadFailed(String),
AckUnmatched(String, String),
MaxRetriesExceeded,
ConnectionClosed,
}

impl std::error::Error for WorkerError {}
impl std::error::Error for Error {}

impl std::fmt::Display for WorkerError {
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match *self {
WorkerError::DeriveError(ref e) => e,
WorkerError::AckUnmatchedError(_, _) => "request chunk and response ack did not match",
Error::WriteFailed(ref e) => e,
Error::ReadFailed(ref e) => e,
Error::AckUnmatched(_, _) => "request chunk and response ack did not match",
Error::MaxRetriesExceeded => "max retries exceeded",
Error::ConnectionClosed => "connection closed",
};
write!(f, "{}", s)
}
Expand Down Expand Up @@ -60,92 +66,125 @@ pub enum Message {
Terminate,
}

/// A record that has already been serialized and is ready to be written to the stream.
#[derive(Debug)]
struct SerializedRecord {
/// Serialized bytes of the record, as produced by `Worker::encode`.
record: bytes::Bytes,
/// Chunk value taken from the record's options; `Worker::write` compares it with the
/// `ack` field of the response.
chunk: String,
}

#[derive(Debug, Deserialize)]
struct Response {
struct AckResponse {
ack: String,
}

pub struct RetryConfig {
pub initial_wait: u64,
pub max: usize,
pub max: u32,
}

pub struct Worker {
stream: Arc<Mutex<TcpStream>>,
stream: TcpStream,
receiver: Receiver<Message>,
retry_config: RetryConfig,
}

impl Worker {
pub fn new(
stream: Arc<Mutex<TcpStream>>,
receiver: Receiver<Message>,
retry_config: RetryConfig,
) -> Self {
pub fn new(stream: TcpStream, receiver: Receiver<Message>, retry_config: RetryConfig) -> Self {
Self {
stream,
receiver,
retry_config,
}
}

pub async fn run(&self) {
pub async fn run(&mut self) {
loop {
match self.receiver.try_recv() {
Ok(Message::Record(record)) => {
let mut buf = Vec::new();
match record.serialize(&mut Serializer::new(&mut buf)) {
Ok(_) => (),
let record = match self.encode(record) {
Ok(record) => record,
Err(e) => {
warn!("failed to serialize a message: {}", e);
continue;
}
}

let retry_strategy =
ExponentialBackoff::from_millis(self.retry_config.initial_wait)
.take(self.retry_config.max);
let retry_task = Retry::spawn(retry_strategy, || async {
self.send(&buf, record.options.chunk.clone()).await
});
};

match retry_task.await {
Ok(_) => (),
Err(e) => {
warn!("failed to send a message to the fluent server: {}", e);
continue;
}
}
match self.write_with_retry(&record).await {
Ok(_) => {}
Err(_) => continue,
};
}
Err(channel::TryRecvError::Empty) => continue,
Ok(Message::Terminate) | Err(channel::TryRecvError::Disconnected) => break,
}
}
}

async fn send(&self, src: &[u8], chunk: String) -> Result<(), WorkerError> {
let stream = self.stream.clone();
let mut stream = stream.lock().await;
stream
.write_all(src)
/// Serializes `record` with `rmp_serde` into an immutable byte buffer and pairs it with
/// the chunk value from the record's options.
///
/// # Errors
///
/// Returns the `rmp_serde` encode error when serialization fails.
fn encode(&self, record: Record) -> Result<SerializedRecord, rmp_serde::encode::Error> {
    let mut sink = bytes::BytesMut::new().writer();
    {
        // Keep the serializer's borrow of `sink` confined to this scope.
        let mut serializer = Serializer::new(&mut sink);
        record.serialize(&mut serializer)?;
    }

    let payload = sink.into_inner().freeze();
    let chunk = record.options.chunk;
    Ok(SerializedRecord {
        record: payload,
        chunk,
    })
}

/// Sends `record`, retrying failed attempts with an exponentially growing pause.
///
/// At most `retry_config.max` attempts are made. The first attempt runs immediately;
/// after the failed attempt with zero-based index `i` the next pause is
/// `initial_wait * RETRY_INCREMENT_RATE^(i - 1)` milliseconds.
///
/// # Errors
///
/// * `Error::ConnectionClosed` as soon as `write` reports it; no further attempt is made.
/// * `Error::MaxRetriesExceeded` when every attempt failed. This includes `max == 0`,
///   in which case nothing is sent at all.
async fn write_with_retry(&mut self, record: &SerializedRecord) -> Result<(), Error> {
    let mut wait_time = Duration::from_millis(0);
    // Iterate over the `u32` budget directly: casting `max` to `i32` would wrap to a
    // negative bound for values above `i32::MAX` and silently skip every attempt.
    for attempt in 0..self.retry_config.max {
        tokio::time::sleep(wait_time).await;

        match self.write(record).await {
            Ok(()) => return Ok(()),
            Err(Error::ConnectionClosed) => return Err(Error::ConnectionClosed),
            // Every other failure is retried after the back-off computed below.
            Err(_) => {}
        }

        // Clamp first so the conversion to the `i32` exponent is always lossless.
        // NOTE(review): the exponent is `attempt - 1`, so the pause after the very first
        // failure is `initial_wait / RETRY_INCREMENT_RATE` rather than `initial_wait`;
        // the schedule is kept as-is here — confirm that it is intended.
        let exponent = attempt.min(i32::MAX as u32) as i32 - 1;
        // A float-to-integer `as` cast saturates, so an overflowing product becomes
        // `u64::MAX` instead of wrapping.
        wait_time = Duration::from_millis(
            (self.retry_config.initial_wait as f64 * RETRY_INCREMENT_RATE.powi(exponent)) as u64,
        );
    }
    warn!("write's max retries exceeded.");
    Err(Error::MaxRetriesExceeded)
}

async fn write(&mut self, record: &SerializedRecord) -> Result<(), Error> {
self.stream
.write_all(record.record.chunk())
.await
.map_err(|e| WorkerError::DeriveError(e.to_string()))?;
.map_err(|e| Error::WriteFailed(e.to_string()))?;

let mut buf = vec![0; 128];
let n = stream
.read(&mut buf)
let received_ack = self
.read_ack()
.await
.map_err(|e| WorkerError::DeriveError(e.to_string()))?;
let response: Response = Deserialize::deserialize(&mut Deserializer::new(&buf[0..n]))
.map_err(|e| WorkerError::DeriveError(e.to_string()))?;
if response.ack == chunk {
Ok(())
} else {
.map_err(|e| Error::ReadFailed(e.to_string()))?;

if received_ack.ack != record.chunk {
warn!(
"ack and chunk did not match. ack: {}, chunk: {}",
response.ack, chunk
received_ack.ack, record.chunk
);
Err(WorkerError::AckUnmatchedError(response.ack, chunk))
return Err(Error::AckUnmatched(received_ack.ack, record.chunk.clone()));
}
Ok(())
}

async fn read_ack(&mut self) -> Result<AckResponse, Error> {
let mut buf = bytes::BytesMut::new();
loop {
if let Ok(ack) = rmp_serde::from_slice::<AckResponse>(&buf) {
return Ok(ack);
}

if self
.stream
.read_buf(&mut buf)
.await
.map_err(|e| Error::ReadFailed(e.to_string()))?
== 0
{
return Err(Error::ConnectionClosed);
}
}
}
}

0 comments on commit 12e2172

Please sign in to comment.