Skip to content

Commit

Permalink
remove MessageValue::Rows (#787)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 8, 2022
1 parent 04a744e commit 1c31882
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 63 deletions.
4 changes: 2 additions & 2 deletions shotover-proxy/src/codec/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ mod cassandra_protocol_tests {
parse_statement_single, CassandraFrame, CassandraOperation, CassandraResult,
};
use crate::frame::Frame;
use crate::message::{Message, MessageValue};
use crate::message::Message;
use bytes::BytesMut;
use cassandra_protocol::frame::message_result::{
ColSpec, ColType, ColTypeOption, ColTypeOptionValue, RowsMetadata, RowsMetadataFlags,
Expand Down Expand Up @@ -266,7 +266,7 @@ mod cassandra_protocol_tests {
let messages = vec![Message::from_frame(Frame::Cassandra(CassandraFrame {
version: Version::V4,
operation: CassandraOperation::Result(CassandraResult::Rows {
value: MessageValue::Rows(vec![]),
rows: vec![],
metadata: Box::new(RowsMetadata {
flags: RowsMetadataFlags::GLOBAL_TABLE_SPACE,
columns_count: 9,
Expand Down
53 changes: 25 additions & 28 deletions shotover-proxy/src/frame/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl CassandraFrame {
.collect()
};
CassandraOperation::Result(CassandraResult::Rows {
value: MessageValue::Rows(converted_rows),
rows: converted_rows,
metadata: Box::new(rows.metadata),
})
}
Expand Down Expand Up @@ -422,8 +422,8 @@ impl CassandraOperation {
}
.serialize_to_vec(version),
CassandraOperation::Result(result) => match result {
CassandraResult::Rows { value, metadata } => {
Self::build_cassandra_result_body(version, value, *metadata)
CassandraResult::Rows { rows, metadata } => {
Self::build_cassandra_result_body(version, rows, *metadata)
}
CassandraResult::SetKeyspace(set_keyspace) => {
ResResultBody::SetKeyspace(*set_keyspace)
Expand Down Expand Up @@ -475,32 +475,29 @@ impl CassandraOperation {

fn build_cassandra_result_body(
protocol_version: Version,
result: MessageValue,
rows: Vec<Vec<MessageValue>>,
metadata: RowsMetadata,
) -> ResResultBody {
if let MessageValue::Rows(rows) = result {
let rows_count = rows.len() as CInt;
let rows_content = rows
.into_iter()
.map(|row| {
row.into_iter()
.map(|value| {
CBytes::new(
cassandra_protocol::types::value::Bytes::from(value).into_inner(),
)
})
.collect()
})
.collect();

return ResResultBody::Rows(BodyResResultRows {
protocol_version,
metadata,
rows_count,
rows_content,
});
}
unreachable!()
let rows_count = rows.len() as CInt;
let rows_content = rows
.into_iter()
.map(|row| {
row.into_iter()
.map(|value| {
CBytes::new(
cassandra_protocol::types::value::Bytes::from(value).into_inner(),
)
})
.collect()
})
.collect();

ResResultBody::Rows(BodyResResultRows {
protocol_version,
metadata,
rows_count,
rows_content,
})
}
}

Expand Down Expand Up @@ -658,7 +655,7 @@ pub fn to_cassandra_type(operand: &Operand) -> CassandraType {
pub enum CassandraResult {
// values are boxed so that Void takes minimal stack space
Rows {
value: MessageValue,
rows: Vec<Vec<MessageValue>>,
metadata: Box<RowsMetadata>,
},
SetKeyspace(Box<BodyResResultSetKeyspace>),
Expand Down
7 changes: 0 additions & 7 deletions shotover-proxy/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,6 @@ pub enum MessageValue {
Boolean(bool),
Inet(IpAddr),
List(Vec<MessageValue>),
Rows(Vec<Vec<MessageValue>>),
NamedRows(Vec<BTreeMap<String, MessageValue>>),
Document(BTreeMap<String, MessageValue>),
FragmentedResponse(Vec<MessageValue>),
Expand Down Expand Up @@ -512,11 +511,6 @@ impl From<MessageValue> for RedisFrame {
MessageValue::Boolean(b) => RedisFrame::Integer(i64::from(b)),
MessageValue::Inet(i) => RedisFrame::SimpleString(i.to_string().into()),
MessageValue::List(l) => RedisFrame::Array(l.into_iter().map(|v| v.into()).collect()),
MessageValue::Rows(r) => RedisFrame::Array(
r.into_iter()
.map(|v| MessageValue::List(v).into())
.collect(),
),
MessageValue::NamedRows(_) => todo!(),
MessageValue::Document(_) => todo!(),
MessageValue::FragmentedResponse(l) => {
Expand Down Expand Up @@ -651,7 +645,6 @@ impl From<MessageValue> for cassandra_protocol::types::value::Bytes {
MessageValue::Float(f) => f.into_inner().into(),
MessageValue::Boolean(b) => b.into(),
MessageValue::List(l) => l.into(),
MessageValue::Rows(r) => cassandra_protocol::types::value::Bytes::from(r),
MessageValue::NamedRows(n) => cassandra_protocol::types::value::Bytes::from(n),
MessageValue::Document(d) => cassandra_protocol::types::value::Bytes::from(d),
MessageValue::Inet(i) => i.into(),
Expand Down
8 changes: 3 additions & 5 deletions shotover-proxy/src/transforms/cassandra/peers_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,8 @@ fn extract_native_port_column(peer_table: &FQName, message: &mut Message) -> Vec
fn rewrite_port(message: &mut Message, column_names: &[Identifier], new_port: u16) {
if let Some(Frame::Cassandra(frame)) = message.frame() {
// CassandraOperation::Error(_) is another possible case, we should silently ignore such cases
if let CassandraOperation::Result(CassandraResult::Rows {
value: MessageValue::Rows(rows),
metadata,
}) = &mut frame.operation
if let CassandraOperation::Result(CassandraResult::Rows { rows, metadata }) =
&mut frame.operation
{
for (i, col) in metadata.col_specs.iter().enumerate() {
if column_names.contains(&Identifier::parse(&col.name)) {
Expand Down Expand Up @@ -177,7 +175,7 @@ mod test {
tracing_id: None,
warnings: vec![],
operation: CassandraOperation::Result(Rows {
value: MessageValue::Rows(rows),
rows,
metadata: Box::new(RowsMetadata {
flags: RowsMetadataFlags::GLOBAL_TABLE_SPACE,
columns_count: 1,
Expand Down
17 changes: 5 additions & 12 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,10 +511,8 @@ impl CassandraSinkCluster {
}

if let Some(Frame::Cassandra(frame)) = peers_response.frame() {
if let CassandraOperation::Result(CassandraResult::Rows {
value: MessageValue::Rows(rows),
metadata,
}) = &mut frame.operation
if let CassandraOperation::Result(CassandraResult::Rows { rows, metadata }) =
&mut frame.operation
{
*rows = self
.shotover_peers
Expand Down Expand Up @@ -646,10 +644,8 @@ impl CassandraSinkCluster {
}

if let Some(Frame::Cassandra(frame)) = local_response.frame() {
if let CassandraOperation::Result(CassandraResult::Rows {
value: MessageValue::Rows(rows),
metadata,
}) = &mut frame.operation
if let CassandraOperation::Result(CassandraResult::Rows { rows, metadata }) =
&mut frame.operation
{
// The local_response message is guaranteed to come from a node that is in our configured data_center/rack.
// That means we can leave fields like rack and data_center alone and get exactly what we want.
Expand Down Expand Up @@ -813,10 +809,7 @@ struct NodeInfo {
fn parse_system_nodes(mut response: Message) -> Result<Vec<NodeInfo>> {
if let Some(Frame::Cassandra(frame)) = response.frame() {
match &mut frame.operation {
CassandraOperation::Result(CassandraResult::Rows {
value: MessageValue::Rows(rows),
..
}) => rows
CassandraOperation::Result(CassandraResult::Rows { rows, .. }) => rows
.iter_mut()
.map(|row| {
if row.len() != 5 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ fn system_peers_into_nodes(
) -> Result<Vec<CassandraNode>> {
if let Some(Frame::Cassandra(frame)) = response.frame() {
match &mut frame.operation {
CassandraOperation::Result(CassandraResult::Rows {
value: MessageValue::Rows(rows),
..
}) => rows
CassandraOperation::Result(CassandraResult::Rows { rows, .. }) => rows
.iter_mut()
.filter(|row| {
if let Some(MessageValue::Varchar(data_center)) = row.get(2) {
Expand Down
6 changes: 1 addition & 5 deletions shotover-proxy/src/transforms/protect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,7 @@ impl Transform for Protect {
let mut invalidate_cache = false;
if let Some(Frame::Cassandra(CassandraFrame { operation, .. })) = request.frame() {
if let Some(Frame::Cassandra(CassandraFrame {
operation:
CassandraOperation::Result(CassandraResult::Rows {
value: MessageValue::Rows(rows),
..
}),
operation: CassandraOperation::Result(CassandraResult::Rows { rows, .. }),
..
})) = response.frame()
{
Expand Down

0 comments on commit 1c31882

Please sign in to comment.