Skip to content

Commit

Permalink
Merge pull request #50 from shotover/buffer
Browse files Browse the repository at this point in the history
Buffer
  • Loading branch information
benbromhead committed Sep 16, 2020
2 parents 43859e8 + b12aac0 commit 06d7765
Show file tree
Hide file tree
Showing 11 changed files with 403 additions and 289 deletions.
42 changes: 33 additions & 9 deletions src/config/topology.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use crate::error::ChainResponse;
use crate::message::Messages;
use crate::sources::cassandra_source::CassandraConfig;
use crate::sources::mpsc_source::AsyncMpscConfig;
use crate::sources::{Sources, SourcesConfig};
use crate::transforms::cassandra_codec_destination::CodecConfiguration;
use crate::transforms::chain::TransformChain;
use crate::transforms::kafka_destination::KafkaConfig;
use crate::transforms::mpsc::AsyncMpscTeeConfig;
use crate::transforms::mpsc::TeeConfig;
use crate::transforms::{build_chain_from_config, TransformsConfig};
use anyhow::{anyhow, Result};
use bytes::Bytes;
use evmap::ReadHandleFactory;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot::Sender as OneSender;
use tokio::sync::{broadcast, mpsc};
use tracing::info;

Expand All @@ -24,20 +26,42 @@ pub struct Topology {
pub source_to_chain_mapping: HashMap<String, String>,
}

#[derive(Debug)]
pub struct ChannelMessage {
pub messages: Messages,
pub return_chan: Option<OneSender<ChainResponse>>,
}

impl ChannelMessage {
pub fn new_with_no_return(m: Messages) -> Self {
return ChannelMessage {
messages: m,
return_chan: None,
};
}

pub fn new(m: Messages, return_chan: OneSender<ChainResponse>) -> Self {
return ChannelMessage {
messages: m,
return_chan: Some(return_chan),
};
}
}

pub struct TopicHolder {
pub topics_rx: HashMap<String, Receiver<Messages>>,
pub topics_tx: HashMap<String, Sender<Messages>>,
pub topics_rx: HashMap<String, Receiver<ChannelMessage>>,
pub topics_tx: HashMap<String, Sender<ChannelMessage>>,
pub global_tx: Sender<(String, Bytes)>,
pub global_map_handle: ReadHandleFactory<String, Bytes>,
}

impl TopicHolder {
pub fn get_rx(&mut self, name: &str) -> Option<Receiver<Messages>> {
pub fn get_rx(&mut self, name: &str) -> Option<Receiver<ChannelMessage>> {
let rx = self.topics_rx.remove(name)?;
Some(rx)
}

pub fn get_tx(&self, name: &str) -> Option<Sender<Messages>> {
pub fn get_tx(&self, name: &str) -> Option<Sender<ChannelMessage>> {
let tx = self.topics_tx.get(name)?;
Some(tx.clone())
}
Expand Down Expand Up @@ -74,10 +98,10 @@ impl Topology {
global_tx: Sender<(String, Bytes)>,
global_map_handle: ReadHandleFactory<String, Bytes>,
) -> TopicHolder {
let mut topics_rx: HashMap<String, Receiver<Messages>> = HashMap::new();
let mut topics_tx: HashMap<String, Sender<Messages>> = HashMap::new();
let mut topics_rx: HashMap<String, Receiver<ChannelMessage>> = HashMap::new();
let mut topics_tx: HashMap<String, Sender<ChannelMessage>> = HashMap::new();
for name in &self.named_topics {
let (tx, rx) = channel::<Messages>(5);
let (tx, rx) = channel::<ChannelMessage>(5);
topics_rx.insert(name.clone(), rx);
topics_tx.insert(name.clone(), tx);
}
Expand Down Expand Up @@ -205,7 +229,7 @@ impl Topology {
bypass_query_processing: false,
});

let tee_conf = TransformsConfig::MPSCTee(AsyncMpscTeeConfig {
let tee_conf = TransformsConfig::MPSCTee(TeeConfig {
topic_name: String::from("test_topic"),
});

Expand Down
10 changes: 1 addition & 9 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ where
// Notifies the receiver half once all clones are
// dropped.
_shutdown_complete: self.shutdown_complete_tx.clone(),

connection_clock: Wrapping(0),
};

// Spawn a new task to process the connections. Tokio tasks are like
Expand Down Expand Up @@ -243,8 +241,6 @@ where

/// Not used directly. Instead, when `Handler` is dropped...?
_shutdown_complete: mpsc::Sender<()>,

connection_clock: Wrapping<u32>,
}

impl<S, C> Handler<S, C>
Expand Down Expand Up @@ -300,13 +296,9 @@ where
match frame {
Ok(message) => {
trace!("Received raw message {:?}", message);
self.connection_clock += Wrapping(1);
match self
.chain
.process_request(
Wrapper::new_with_rnd(message, self.connection_clock),
self.client_details.clone(),
)
.process_request(Wrapper::new(message), self.client_details.clone())
.await
{
Ok(modified_message) => {
Expand Down
7 changes: 3 additions & 4 deletions src/sources/mpsc_source.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::transforms::chain::TransformChain;
use tokio::sync::mpsc::Receiver;

use crate::config::topology::TopicHolder;
use crate::message::Messages;
use crate::config::topology::{ChannelMessage, TopicHolder};
use crate::server::Shutdown;
use crate::sources::{Sources, SourcesFromConfig};
use crate::transforms::Wrapper;
Expand Down Expand Up @@ -55,7 +54,7 @@ pub struct AsyncMpsc {
impl AsyncMpsc {
pub fn new(
mut chain: TransformChain,
mut rx: Receiver<Messages>,
mut rx: Receiver<ChannelMessage>,
name: &str,
shutdown: Shutdown,
shutdown_complete: mpsc::Sender<()>,
Expand All @@ -67,7 +66,7 @@ impl AsyncMpsc {
let _notifier = shutdown_complete.clone();
while !shutdown.is_shutdown() {
if let Some(m) = rx.recv().await {
let w: Wrapper = Wrapper::new(m.clone());
let w: Wrapper = Wrapper::new(m.messages.clone());
if let Err(e) = chain.process_request(w, "AsyncMpsc".to_string()).await {
warn!("Something went wrong {}", e)
}
Expand Down
2 changes: 1 addition & 1 deletion src/transforms/distributed/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod route;
pub mod scatter;
pub mod tuneable_consistency_scatter;
pub mod tunable_consistency_scatter;

0 comments on commit 06d7765

Please sign in to comment.