codec direction logging (#1066)
conorbros committed Mar 7, 2023
1 parent c83fe0d commit ef64015
Showing 15 changed files with 146 additions and 95 deletions.
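In short, this commit moves the `Direction` enum out of the Kafka codec into `codec/mod.rs`, gives it a `Display` impl, and adds `new(direction)` to the `CodecBuilder` trait so each decoder and encoder can prefix its hex-dump debug logs with `Source` or `Sink`. A minimal usage sketch based on the calls visible in the diffs below (the `main` wrapper is illustrative, not part of the commit):

```rust
use shotover_proxy::codec::cassandra::CassandraCodecBuilder;
use shotover_proxy::codec::{CodecBuilder, Direction};

fn main() {
    // Build the sink-side halves of the Cassandra codec, as the benches and
    // tests in this commit now do; a source would pass Direction::Source.
    let (_decoder, _encoder) = CassandraCodecBuilder::new(Direction::Sink).build();

    // Every incoming/outgoing debug log from these halves is now prefixed
    // with the direction, e.g. "Sink: outgoing cassandra message: ..."
}
```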
6 changes: 3 additions & 3 deletions shotover-proxy/benches/benches/codec.rs
@@ -5,7 +5,7 @@ use cassandra_protocol::frame::message_result::{
use cassandra_protocol::frame::Version;
use criterion::{black_box, criterion_group, BatchSize, Criterion};
use shotover_proxy::codec::cassandra::CassandraCodecBuilder;
use shotover_proxy::codec::CodecBuilder;
use shotover_proxy::codec::{CodecBuilder, Direction};
use shotover_proxy::frame::cassandra::{parse_statement_single, Tracing};
use shotover_proxy::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame};
use shotover_proxy::message::Message;
@@ -28,7 +28,7 @@ fn criterion_benchmark(c: &mut Criterion) {
},
}))];

let (_, mut encoder) = CassandraCodecBuilder::new().build();
let (_, mut encoder) = CassandraCodecBuilder::new(Direction::Sink).build();

group.bench_function("encode_cassandra_system.local_query", |b| {
b.iter_batched(
@@ -52,7 +52,7 @@ fn criterion_benchmark(c: &mut Criterion) {
operation: CassandraOperation::Result(peers_v2_result()),
}))];

let (_, mut encoder) = CassandraCodecBuilder::new().build();
let (_, mut encoder) = CassandraCodecBuilder::new(Direction::Sink).build();

group.bench_function("encode_cassandra_system.local_result", |b| {
b.iter_batched(
54 changes: 32 additions & 22 deletions shotover-proxy/src/codec/cassandra.rs
@@ -1,3 +1,4 @@
use super::Direction;
use crate::codec::{CodecBuilder, CodecReadError};
use crate::frame::cassandra::{CassandraMetadata, CassandraOperation, Tracing};
use crate::frame::{CassandraFrame, Frame, MessageType};
@@ -17,23 +18,24 @@ use std::sync::RwLock;
use tokio_util::codec::{Decoder, Encoder};
use tracing::info;

#[derive(Clone, Default)]
pub struct CassandraCodecBuilder {}

impl CassandraCodecBuilder {
pub fn new() -> Self {
Self::default()
}
#[derive(Clone)]
pub struct CassandraCodecBuilder {
direction: Direction,
}

impl CodecBuilder for CassandraCodecBuilder {
type Decoder = CassandraDecoder;
type Encoder = CassandraEncoder;

fn new(direction: Direction) -> Self {
Self { direction }
}

fn build(&self) -> (CassandraDecoder, CassandraEncoder) {
let compression = Arc::new(RwLock::new(Compression::None));
(
CassandraDecoder::new(compression.clone()),
CassandraEncoder::new(compression),
CassandraDecoder::new(compression.clone(), self.direction),
CassandraEncoder::new(compression, self.direction),
)
}
}
@@ -42,14 +44,16 @@ pub struct CassandraDecoder {
compression: Arc<RwLock<Compression>>,
messages: Vec<Message>,
current_use_keyspace: Option<Identifier>,
direction: Direction,
}

impl CassandraDecoder {
pub fn new(compression: Arc<RwLock<Compression>>) -> CassandraDecoder {
pub fn new(compression: Arc<RwLock<Compression>>, direction: Direction) -> CassandraDecoder {
CassandraDecoder {
compression,
messages: vec![],
current_use_keyspace: None,
direction,
}
}
}
@@ -102,7 +106,8 @@ impl Decoder for CassandraDecoder {
// Clear the read bytes from the FramedReader
let bytes = src.split_to(frame_len);
tracing::debug!(
"incoming cassandra message:\n{}",
"{}: incoming cassandra message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);

@@ -253,11 +258,15 @@ fn reject_protocol_version(version: u8) -> CodecReadError {

pub struct CassandraEncoder {
compression: Arc<RwLock<Compression>>,
direction: Direction,
}

impl CassandraEncoder {
pub fn new(compression: Arc<RwLock<Compression>>) -> CassandraEncoder {
CassandraEncoder { compression }
pub fn new(compression: Arc<RwLock<Compression>>, direction: Direction) -> CassandraEncoder {
CassandraEncoder {
compression,
direction,
}
}
}

@@ -308,7 +317,8 @@ impl Encoder<Messages> for CassandraEncoder {
}
}
tracing::debug!(
"outgoing cassandra message:\n{}",
"{}: outgoing cassandra message:\n{}",
self.direction,
pretty_hex::pretty_hex(&&dst[start..])
);
}
@@ -319,7 +329,7 @@
#[cfg(test)]
mod cassandra_protocol_tests {
use crate::codec::cassandra::CassandraCodecBuilder;
use crate::codec::CodecBuilder;
use crate::codec::{CodecBuilder, Direction};
use crate::frame::cassandra::{
parse_statement_single, CassandraFrame, CassandraOperation, CassandraResult, Tracing,
};
@@ -376,7 +386,7 @@ mod cassandra_protocol_tests {

#[test]
fn test_codec_startup() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let mut startup_body: HashMap<String, String> = HashMap::new();
startup_body.insert("CQL_VERSION".into(), "3.0.0".into());
let bytes = hex!("0400000001000000160001000b43514c5f56455253494f4e0005332e302e30");
@@ -392,7 +402,7 @@

#[test]
fn test_codec_options() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let bytes = hex!("040000000500000000");
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
@@ -406,7 +416,7 @@

#[test]
fn test_codec_ready() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let bytes = hex!("840000000200000000");
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
@@ -420,7 +430,7 @@

#[test]
fn test_codec_register() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let bytes = hex!(
"040000010b000000310003000f544f504f4c4f47595f4348414e4745
000d5354415455535f4348414e4745000d534348454d415f4348414e4745"
@@ -443,7 +453,7 @@

#[test]
fn test_codec_result() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let bytes = hex!(
"840000020800000099000000020000000100000009000673797374656
d000570656572730004706565720010000b646174615f63656e746572000d0007686f73745f6964000c000c70726566
@@ -551,7 +561,7 @@ mod cassandra_protocol_tests {

#[test]
fn test_codec_query_select() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let bytes = hex!(
"0400000307000000350000002e53454c454354202a2046524f4d20737973
74656d2e6c6f63616c205748455245206b6579203d20276c6f63616c27000100"
@@ -574,7 +584,7 @@

#[test]
fn test_codec_query_insert() {
let mut codec = CassandraCodecBuilder::new();
let mut codec = CassandraCodecBuilder::new(Direction::Sink);
let bytes = hex!(
"0400000307000000330000002c494e5345525420494e544f207379737465
6d2e666f6f2028626172292056414c554553202827626172322729000100"
55 changes: 33 additions & 22 deletions shotover-proxy/src/codec/kafka.rs
@@ -1,3 +1,4 @@
use super::Direction;
use crate::codec::{CodecBuilder, CodecReadError};
use crate::frame::MessageType;
use crate::message::{Encodable, Message, Messages, ProtocolType};
@@ -7,16 +8,6 @@ use kafka_protocol::messages::ApiKey;
use std::sync::mpsc;
use tokio_util::codec::{Decoder, Encoder};

/// Depending on if the codec is used in a sink or a source requires different processing logic:
/// * Sources parse requests which do not require any special handling
/// * Sinks parse responses which requires first matching up the version and api_key with its corresponding request
/// + To achieve this Sinks use an mpsc channel to send header data from the encoder to the decoder
#[derive(Copy, Clone)]
pub enum Direction {
Source,
Sink,
}

#[derive(Copy, Clone, Debug, PartialEq)]
pub struct RequestHeader {
pub api_key: ApiKey,
@@ -28,15 +19,18 @@ pub struct KafkaCodecBuilder {
direction: Direction,
}

impl KafkaCodecBuilder {
pub fn new(direction: Direction) -> Self {
KafkaCodecBuilder { direction }
}
}

// Depending on if the codec is used in a sink or a source requires different processing logic:
// * Sources parse requests which do not require any special handling
// * Sinks parse responses which requires first matching up the version and api_key with its corresponding request
// + To achieve this Sinks use an mpsc channel to send header data from the encoder to the decoder
impl CodecBuilder for KafkaCodecBuilder {
type Decoder = KafkaDecoder;
type Encoder = KafkaEncoder;

fn new(direction: Direction) -> Self {
Self { direction }
}

fn build(&self) -> (KafkaDecoder, KafkaEncoder) {
let (tx, rx) = match self.direction {
Direction::Source => (None, None),
@@ -45,20 +39,28 @@ impl CodecBuilder for KafkaCodecBuilder {
(Some(tx), Some(rx))
}
};
(KafkaDecoder::new(rx), KafkaEncoder::new(tx))
(
KafkaDecoder::new(rx, self.direction),
KafkaEncoder::new(tx, self.direction),
)
}
}

pub struct KafkaDecoder {
request_header_rx: Option<mpsc::Receiver<RequestHeader>>,
messages: Messages,
direction: Direction,
}

impl KafkaDecoder {
pub fn new(request_header_rx: Option<mpsc::Receiver<RequestHeader>>) -> Self {
pub fn new(
request_header_rx: Option<mpsc::Receiver<RequestHeader>>,
direction: Direction,
) -> Self {
KafkaDecoder {
request_header_rx,
messages: vec![],
direction,
}
}
}
@@ -85,7 +87,8 @@ impl Decoder for KafkaDecoder {
if let Some(size) = get_length_of_full_message(src) {
let bytes = src.split_to(size);
tracing::debug!(
"incoming kafka message:\n{}",
"{}: incoming kafka message:\n{}",
self.direction,
pretty_hex::pretty_hex(&bytes)
);
let request_header = if let Some(rx) = self.request_header_rx.as_ref() {
@@ -110,11 +113,18 @@ impl Decoder for KafkaDecoder {

pub struct KafkaEncoder {
request_header_tx: Option<mpsc::Sender<RequestHeader>>,
direction: Direction,
}

impl KafkaEncoder {
pub fn new(request_header_tx: Option<mpsc::Sender<RequestHeader>>) -> Self {
KafkaEncoder { request_header_tx }
pub fn new(
request_header_tx: Option<mpsc::Sender<RequestHeader>>,
direction: Direction,
) -> Self {
KafkaEncoder {
request_header_tx,
direction,
}
}
}

@@ -140,7 +150,8 @@ impl Encoder<Messages> for KafkaEncoder {
tx.send(RequestHeader { api_key, version })?;
}
tracing::debug!(
"outgoing kafka message:\n{}",
"{}: outgoing kafka message:\n{}",
self.direction,
pretty_hex::pretty_hex(&&dst[start..])
);
result
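To make the source/sink split concrete: as the comment on the `CodecBuilder` impl above notes, only the sink direction wires the request-header channel between encoder and decoder. A hedged sketch of constructing both variants, assuming `KafkaCodecBuilder` is exposed at `shotover_proxy::codec::kafka` just as the Cassandra builder is at `shotover_proxy::codec::cassandra`:

```rust
use shotover_proxy::codec::kafka::KafkaCodecBuilder;
use shotover_proxy::codec::{CodecBuilder, Direction};

fn main() {
    // Source side: decodes requests, so no request/response matching is needed
    // and build() passes None for both ends of the mpsc channel.
    let (_src_decoder, _src_encoder) = KafkaCodecBuilder::new(Direction::Source).build();

    // Sink side: the encoder sends each request's api_key/version over the mpsc
    // channel so the decoder can parse the matching response.
    let (_sink_decoder, _sink_encoder) = KafkaCodecBuilder::new(Direction::Sink).build();
}
```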
18 changes: 18 additions & 0 deletions shotover-proxy/src/codec/mod.rs
@@ -1,12 +1,28 @@
use crate::message::Messages;
use cassandra_protocol::compression::Compression;
use core::fmt;
use kafka::RequestHeader;
use tokio_util::codec::{Decoder, Encoder};

pub mod cassandra;
pub mod kafka;
pub mod redis;

#[derive(Copy, Clone)]
pub enum Direction {
Source,
Sink,
}

impl fmt::Display for Direction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Sink => write!(f, "Sink"),
Self::Source => write!(f, "Source"),
}
}
}

#[derive(Debug, Clone, PartialEq, Copy)]
pub enum CodecState {
Cassandra {
@@ -66,4 +82,6 @@ pub trait CodecBuilder: Clone + Send {
type Decoder: DecoderHalf;
type Encoder: EncoderHalf;
fn build(&self) -> (Self::Decoder, Self::Encoder);

fn new(direction: Direction) -> Self;
}
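The `Display` impl added above is what produces those log prefixes; a small illustrative check, not part of the commit:

```rust
use shotover_proxy::codec::Direction;

fn main() {
    // Display renders the variants as the plain words used in the log prefixes.
    assert_eq!(Direction::Sink.to_string(), "Sink");
    assert_eq!(Direction::Source.to_string(), "Source");

    // So a decoder built with Direction::Source logs lines such as
    // "Source: incoming kafka message:\n<hex dump>".
}
```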
