Skip to content

Commit

Permalink
Improve DebugPrinter (#817)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 21, 2022
1 parent df03487 commit 9d5b50c
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 14 deletions.
138 changes: 136 additions & 2 deletions shotover-proxy/src/frame/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use cassandra_protocol::frame::message_register::BodyReqRegister;
use cassandra_protocol::frame::message_request::RequestBody;
use cassandra_protocol::frame::message_response::ResponseBody;
use cassandra_protocol::frame::message_result::{
BodyResResultPrepared, BodyResResultSetKeyspace, ResResultBody, ResultKind, RowsMetadata,
RowsMetadataFlags,
BodyResResultPrepared, BodyResResultSetKeyspace, ColSpec, ColTypeOption, ResResultBody,
ResultKind, RowsMetadata, RowsMetadataFlags,
};
use cassandra_protocol::frame::{
Direction, Envelope as RawCassandraFrame, Flags, Opcode, Serialize, StreamId, Version,
Expand All @@ -31,6 +31,7 @@ use cql3_parser::cassandra_ast::CassandraAST;
use cql3_parser::cassandra_statement::CassandraStatement;
use cql3_parser::common::Operand;
use nonzero_ext::nonzero;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::io::Cursor;
use std::net::IpAddr;
use std::num::NonZeroU32;
Expand Down Expand Up @@ -687,6 +688,139 @@ pub struct CassandraBatch {
timestamp: Option<CLong>,
}

impl Display for CassandraFrame {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{} stream:{}", self.version, self.stream_id)?;
if let Some(tracing_id) = self.tracing_id {
write!(f, " tracing_id:{}", tracing_id)?;
}
if !self.warnings.is_empty() {
write!(f, " warnings:{:?}", self.warnings)?;
}
match &self.operation {
CassandraOperation::Query { query, params } => {
let QueryParams {
consistency,
with_names,
values,
page_size,
paging_state,
serial_consistency,
timestamp,
keyspace,
now_in_seconds,
} = params.as_ref();

write!(
f,
" Query consistency:{} with_names:{:?}",
consistency, with_names,
)?;

if let Some(values) = values {
write!(f, " values:{:?}", values)?;
}
if let Some(page_size) = page_size {
write!(f, " page_size:{:?}", page_size)?;
}
if let Some(paging_state) = paging_state {
write!(f, " paging_state:{:?}", paging_state)?;
}
if let Some(serial_consistency) = serial_consistency {
write!(f, " serial_consistency:{:?}", serial_consistency)?;
}
if let Some(timestamp) = timestamp {
write!(f, " timestamp:{:?}", timestamp)?;
}
if let Some(keyspace) = keyspace {
write!(f, " keyspace:{:?}", keyspace)?;
}
if let Some(now_in_seconds) = now_in_seconds {
write!(f, " now_in_seconds:{:?}", now_in_seconds)?;
}
write!(f, " {}", query)
}
CassandraOperation::Register(BodyReqRegister { events }) => {
write!(f, " Register {:?}", events)
}
CassandraOperation::Error(ErrorBody {
error_code,
message,
additional_info,
}) => {
write!(
f,
" Error 0x{:x} {:?} {:?}",
error_code, additional_info, message
)
}
CassandraOperation::Result(result) => match result {
CassandraResult::Rows { rows, metadata } => {
let RowsMetadata {
flags,
columns_count,
paging_state,
new_metadata_id,
global_table_spec,
col_specs,
} = metadata.as_ref();

write!(
f,
" Result Rows {:?} columns_count:{}",
flags, columns_count,
)?;
if let Some(paging_state) = paging_state {
write!(f, " paging_state:{:?}", paging_state)?;
}
if let Some(new_metadata_id) = new_metadata_id {
write!(f, " new_metadata_id:{:?}", new_metadata_id)?;
}
if let Some(global_table_spec) = global_table_spec {
write!(
f,
" global_name:{}.{}",
global_table_spec.ks_name, global_table_spec.table_name
)?;
}
write!(f, " cols:[")?;
let mut need_comma = false;
for col_spec in col_specs {
let ColSpec {
table_spec,
name,
col_type,
} = col_spec;

let ColTypeOption { id, value } = col_type;

if need_comma {
write!(f, ", ")?;
}
need_comma = true;
write!(f, "{}:{:?}", name, id)?;
if let Some(value) = value {
write!(f, " of {:?}", value)?;
}
if let Some(table_spec) = table_spec {
write!(f, " table_spec:{:?}", table_spec)?;
}
}
write!(f, "]")?;
for row in rows {
write!(f, "\n {:?}", row)?;
}
Ok(())
}
CassandraResult::Void => write!(f, "Result Void"),
_ => write!(f, "Result {:?}", result),
},
CassandraOperation::Ready(_) => write!(f, " Ready"),
_ => write!(f, " {:?}", self.operation),
}
}
}

#[cfg(test)]
mod test {
use crate::frame::cassandra::{parse_statement_single, to_cassandra_type};
Expand Down
11 changes: 11 additions & 0 deletions shotover-proxy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub use redis_protocol::resp2::types::Frame as RedisFrame;

use anyhow::{anyhow, Result};
use bytes::Bytes;
use std::fmt::{Display, Formatter, Result as FmtResult};

#[derive(PartialEq, Debug, Clone, Copy)]
pub enum MessageType {
Expand Down Expand Up @@ -78,3 +79,13 @@ impl Frame {
}
}
}

impl Display for Frame {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
match self {
Frame::Cassandra(frame) => write!(f, "Cassandra {}", frame),
Frame::Redis(frame) => write!(f, "Redis {:?})", frame),
Frame::None => write!(f, "None"),
}
}
}
14 changes: 14 additions & 0 deletions shotover-proxy/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,20 @@ impl Message {
None => None,
}
}

pub fn to_high_level_string(&mut self) -> String {
if let Some(response) = self.frame() {
format!("{}", response)
} else if let Some(MessageInner::RawBytes {
bytes,
message_type,
}) = &self.inner
{
format!("Unparseable {:?} message {:?}", message_type, bytes)
} else {
unreachable!("self.frame() failed so MessageInner must still be RawBytes")
}
}
}

/// There are 3 levels of processing the message can be in.
Expand Down
16 changes: 11 additions & 5 deletions shotover-proxy/src/transforms/debug/printer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@ impl DebugPrinter {

#[async_trait]
impl Transform for DebugPrinter {
async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse {
info!("Request content: {:?}", message_wrapper.messages);
async fn transform<'a>(&'a mut self, mut message_wrapper: Wrapper<'a>) -> ChainResponse {
for request in &mut message_wrapper.messages {
info!("Request: {}", request.to_high_level_string());
}

self.counter += 1;
let response = message_wrapper.call_next_transform().await;
info!("Response content: {:?}", response);
response
let mut responses = message_wrapper.call_next_transform().await?;

for response in &mut responses {
info!("Response: {}", response.to_high_level_string());
}
Ok(responses)
}
}
7 changes: 0 additions & 7 deletions shotover-proxy/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use anyhow::Result;
use async_recursion::async_recursion;
use async_trait::async_trait;
use core::fmt;
use core::fmt::Display;
use futures::Future;
use metrics::{counter, histogram};
use serde::Deserialize;
Expand Down Expand Up @@ -401,12 +400,6 @@ impl<'a> Clone for Wrapper<'a> {
}
}

impl<'a> Display for Wrapper<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
f.write_fmt(format_args!("{:#?}", self.messages))
}
}

tokio::task_local! {
#[allow(clippy::declare_interior_mutable_const)]
pub static CONTEXT_CHAIN_NAME: String;
Expand Down

0 comments on commit 9d5b50c

Please sign in to comment.