Skip to content

Commit

Permalink
Optimize cassandra metadata (#981)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jan 9, 2023
1 parent 201047b commit 21d85bb
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 16 deletions.
22 changes: 10 additions & 12 deletions shotover-proxy/src/frame/cassandra.rs
Expand Up @@ -40,8 +40,9 @@ use uuid::Uuid;

/// Functions for operations on an unparsed Cassandra frame
pub mod raw_frame {
use super::{CassandraMetadata, RawCassandraFrame, Tracing};
use super::{CassandraMetadata, RawCassandraFrame};
use anyhow::{anyhow, bail, Result};
use cassandra_protocol::frame::Version;
use cassandra_protocol::{compression::Compression, frame::Opcode};
use nonzero_ext::nonzero;
use std::convert::TryInto;
Expand All @@ -63,15 +64,13 @@ pub mod raw_frame {

/// Parse metadata only from an unparsed Cassandra frame
pub(crate) fn metadata(bytes: &[u8]) -> Result<CassandraMetadata> {
let frame = RawCassandraFrame::from_buffer(bytes, Compression::None)
.map_err(|e| anyhow!("{e:?}"))?
.envelope;
let tracing = Tracing::from_frame(&frame);
if bytes.len() < 9 {
return Err(anyhow!("Not enough bytes for cassandra frame"));
}
Ok(CassandraMetadata {
version: frame.version,
stream_id: frame.stream_id,
tracing,
opcode: frame.opcode,
version: Version::try_from(bytes[0])?,
stream_id: i16::from_be_bytes(bytes[2..4].try_into()?),
opcode: Opcode::try_from(bytes[4])?,
})
}

Expand All @@ -88,12 +87,12 @@ pub mod raw_frame {
}
}

/// Only includes data within the header
/// Data within the body may require decompression which is too expensive
pub struct CassandraMetadata {
pub version: Version,
pub stream_id: StreamId,
pub tracing: Tracing,
pub opcode: Opcode,
// missing `warnings` field because we are not using it currently
}

#[derive(PartialEq, Debug, Clone, Copy)]
Expand Down Expand Up @@ -145,7 +144,6 @@ impl CassandraFrame {
CassandraMetadata {
version: self.version,
stream_id: self.stream_id,
tracing: self.tracing,
opcode: self.operation.to_opcode(),
}
}
Expand Down
5 changes: 3 additions & 2 deletions shotover-proxy/src/message/mod.rs
@@ -1,4 +1,5 @@
use crate::codec::redis::redis_query_type;
use crate::frame::cassandra::Tracing;
use crate::frame::{
cassandra,
cassandra::{to_cassandra_type, CassandraMetadata, CassandraOperation},
Expand Down Expand Up @@ -249,7 +250,7 @@ impl Message {
message: error,
ty: ErrorType::Server,
}),
tracing: frame.tracing,
tracing: Tracing::Response(None),
warnings: vec![],
}),
Metadata::None => Frame::None,
Expand Down Expand Up @@ -292,7 +293,7 @@ impl Message {
Frame::Cassandra(CassandraFrame {
version: metadata.version,
stream_id: metadata.stream_id,
tracing: metadata.tracing,
tracing: Tracing::Response(None),
warnings: vec![],
operation: body,
})
Expand Down
2 changes: 0 additions & 2 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Expand Up @@ -1194,7 +1194,6 @@ fn get_execute_message(message: &mut Message) -> Option<(&BodyReqExecuteOwned, C
operation: CassandraOperation::Execute(execute_body),
version,
stream_id,
tracing,
..
})) = message.frame()
{
Expand All @@ -1203,7 +1202,6 @@ fn get_execute_message(message: &mut Message) -> Option<(&BodyReqExecuteOwned, C
CassandraMetadata {
version: *version,
stream_id: *stream_id,
tracing: *tracing,
opcode: Opcode::Execute,
},
));
Expand Down

0 comments on commit 21d85bb

Please sign in to comment.