Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Consumer option to display json obj as table #1796

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Platform Version 0.9.13 - UNRELEASED
* Fix connector create with `create_topic` option to succeed if topic already exists. ([#1823](https://github.com/infinyon/fluvio/pull/1823))
* Add `#[smartstream(filter_map)]` for filtering and transforming at the same time. ([#1826](https://github.com/infinyon/fluvio/issues/1826))
* Add table display output option to consumer for json objects ([#1642](https://github.com/infinyon/fluvio/issues/1642))

## Platform Version 0.9.12 - 2021-10-27
* Add examples for ArrayMap. ([#1804](https://github.com/infinyon/fluvio/issues/1804))
Expand Down
33 changes: 22 additions & 11 deletions crates/fluvio-cli/src/consume/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use fluvio::consumer::{PartitionSelectionStrategy, Record};
use crate::{CliError, Result};
use crate::common::FluvioExtensionMetadata;
use self::record_format::{
format_text_record, format_binary_record, format_dynamic_record, format_raw_record, format_json,
format_text_record, format_binary_record, format_dynamic_record, format_raw_record,
format_json, print_table_record,
};
use handlebars::Handlebars;

Expand Down Expand Up @@ -255,6 +256,8 @@ impl ConsumeOpt {
}
};

// This is used by table output, to print the table titles only once
let mut record_count = 0;
while let Some(result) = stream.next().await {
let result: std::result::Result<Record, _> = result;
let record = match result {
Expand All @@ -266,15 +269,16 @@ impl ConsumeOpt {
Err(other) => return Err(other.into()),
};

self.print_record(templates.as_ref(), &record);
self.print_record(templates.as_ref(), &record, record_count);
record_count += 1;
}

debug!("fetch loop exited");
Ok(())
}

/// Process fetch topic response based on output type
pub fn print_record(&self, templates: Option<&Handlebars>, record: &Record) {
pub fn print_record(&self, templates: Option<&Handlebars>, record: &Record, count: i32) {
let formatted_key = record
.key()
.map(|key| String::from_utf8_lossy(key).to_string())
Expand All @@ -292,6 +296,9 @@ impl ConsumeOpt {
Some(format_dynamic_record(record.value()))
}
(Some(ConsumeOutputType::raw), None) => Some(format_raw_record(record.value())),
(Some(ConsumeOutputType::table), None) => {
Some(print_table_record(record.value(), count))
}
(_, Some(templates)) => {
let value = String::from_utf8_lossy(record.value()).to_string();
let object = serde_json::json!({
Expand All @@ -304,15 +311,18 @@ impl ConsumeOpt {
}
};

match formatted_value {
Some(value) if self.key_value => {
println!("[{}] {}", formatted_key, value);
}
Some(value) => {
println!("{}", value);
// If the consume type is table, we don't want to accidentally print a newline
if self.output != Some(ConsumeOutputType::table) {
match formatted_value {
Some(value) if self.key_value => {
println!("[{}] {}", formatted_key, value);
}
Some(value) => {
println!("{}", value);
}
// (Some(_), None) only if JSON cannot be printed, so skip.
_ => debug!("Skipping record that cannot be formatted"),
}
// (Some(_), None) only if JSON cannot be printed, so skip.
_ => debug!("Skipping record that cannot be formatted"),
}
}

Expand Down Expand Up @@ -421,6 +431,7 @@ arg_enum! {
binary,
json,
raw,
table,
}
}

Expand Down
58 changes: 58 additions & 0 deletions crates/fluvio-cli/src/consume/record_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,64 @@ pub fn format_raw_record(record: &[u8]) -> String {
String::from_utf8_lossy(record).to_string()
}

// -----------------------------------
// Table
// -----------------------------------

/// Print records in table format
pub fn print_table_record(record: &[u8], count: i32) -> String {
use prettytable::{Row, cell, Cell, Slice};
use prettytable::format::{self, FormatBuilder};

let maybe_json: serde_json::Value = match serde_json::from_slice(record) {
Ok(value) => value,
Err(_e) => panic!("Value not json"),
};

let obj = maybe_json.as_object().unwrap();

// This is the case where we don't provide any table info. We want to print a table w/ all top-level keys as headers
// Think about how we might only select specific keys
let keys_str: Vec<String> = obj.keys().map(|k| k.to_string()).collect();

// serde_json's Value::String() gets wrapped in quotes if we use `to_string()`
let values_str: Vec<String> = obj
.values()
.map(|v| {
if v.is_string() {
v.as_str()
.expect("Value not representable as str")
.to_string()
} else {
v.to_string()
}
})
.collect();

let header: Row = Row::new(keys_str.iter().map(|k| cell!(k.to_owned())).collect());
let entries: Row = Row::new(values_str.iter().map(|v| Cell::new(v)).collect());

// Print the table
let t_print = vec![header, entries];

let mut table = prettytable::Table::init(t_print);

let base_format: FormatBuilder = (*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR).into();
let table_format = base_format;
table.set_format(table_format.build());

// FIXME: Live display of table data easily misaligns column widths
// if there is a length diff between the header and the data
// The rows after the first (count == 0) don't line up with the header
// prettytable might not support the live display use-case we want
if count == 0 {
table.printstd();
} else {
let slice = table.slice(1..);
slice.printstd();
}
format!("")
}
// -----------------------------------
// Utilities
// -----------------------------------
Expand Down