Skip to content

Commit

Permalink
Remove guess, force msgtype (#61)
Browse files Browse the repository at this point in the history
* Remove guess, force msgtype

* Reflect removed guessing in README

* Remove file argument

* Fix failing unit test

* Cargo fmt
  • Loading branch information
sevagh committed Jun 3, 2017
1 parent e09fbf5 commit 98a4e47
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 182 deletions.
29 changes: 12 additions & 17 deletions README.md
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -6,35 +6,30 @@


`pq` will pretty-print when outputting to a tty, but you should pipe it to `jq` for more fully-featured json handling. `pq` will pretty-print when outputting to a tty, but you should pipe it to `jq` for more fully-featured json handling.


### :warning: Guess feature removed

Guessing has been removed. I suspect it was never correct, and furthermore counting "NULL" fields is not robust - a user can legitimately leave fields unset, in which case I would end up discarding valid messages. Ultimately, decoding protobuf without knowing the message type is pointless, and `pq` will no longer attempt it.

Going forward, the advantage of `pq` is that instead of compiling schema-specific code, you can use a single binary (distributed everywhere) and just drop `*.fdset` files into `~/.pq` to support new message types.

### Download ### Download


pq is on [crates.io](https://crates.io/crates/pq): `cargo install pq`. You can also download a static binary from the [releases page](https://github.com/sevagh/pq/releases). pq is on [crates.io](https://crates.io/crates/pq): `cargo install pq`. You can also download a static binary from the [releases page](https://github.com/sevagh/pq/releases).


### Usage ### Usage


To set up, put your `*.fdset` files in `~/.pq`: To set up, put your `*.fdset` files in `~/.pq` (specify an alternate directory with the `FDSET_PATH=` env var):


``` ```
$ protoc -o dog.fdset dog.proto $ protoc -o dog.fdset dog.proto
$ protoc -o person.fdset person.proto $ protoc -o person.fdset person.proto
$ cp *.fdset ~/.pq/ $ cp *.fdset ~/.pq/
``` ```


Pipe a single compiled protobuf message - pq will guess the type: Pipe a single compiled protobuf message:

```
$ pq <./tests/samples/dog
{
"age": 4,
"breed": "poodle",
"temperament": "excited"
}
```

Provide a more explicit message type with `--msgtype`:


``` ```
$ pq <./tests/samples/dog --msgtype=com.example.dog.Dog $ pq com.example.dog.Dog <./tests/samples/dog
{ {
"age": 4, "age": 4,
"breed": "poodle", "breed": "poodle",
Expand All @@ -45,7 +40,7 @@ $ pq <./tests/samples/dog --msgtype=com.example.dog.Dog
Pipe a `varint`-delimited stream: Pipe a `varint`-delimited stream:


``` ```
$ pq --stream="varint" <./tests/samples/dog_stream $ pq com.example.dog.Dog --stream varint <./tests/samples/dog_stream
{ {
"age": 10, "age": 10,
"breed": "gsd", "breed": "gsd",
Expand All @@ -56,7 +51,7 @@ $ pq --stream="varint" <./tests/samples/dog_stream
Consume from a Kafka stream: Consume from a Kafka stream:


``` ```
$ pq kafka my_topic --brokers=192.168.0.1:9092 --from-beginning --count=1 $ pq kafka my_topic --brokers 192.168.0.1:9092 --beginning --count 1 com.example.dog.Dog
{ {
"age": 10, "age": 10,
"breed": "gsd", "breed": "gsd",
Expand All @@ -67,7 +62,7 @@ $ pq kafka my_topic --brokers=192.168.0.1:9092 --from-beginning --count=1
Convert a Kafka stream to varint-delimited: Convert a Kafka stream to varint-delimited:


``` ```
$ pq kafka my_topic --brokers=192.168.0.1:9092 --beginning --count=1 --convert=varint | pq --stream=varint $ pq kafka my_topic --brokers=192.168.0.1:9092 --beginning --count 1 --convert varint | pq com.example.dog.Dog --stream varint
{ {
"age": 10, "age": 10,
"breed": "gsd", "breed": "gsd",
Expand Down
91 changes: 14 additions & 77 deletions src/decode.rs
Original file line number Original file line Diff line number Diff line change
@@ -1,58 +1,34 @@
use fdset_discovery::get_loaded_descriptors; use discovery::get_loaded_descriptors;
use error::*; use error::*;
use std::io::Write; use std::io::Write;
use std::result::Result; use std::result::Result;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::ser::Serializer; use serde_json::ser::Serializer;
use serde_protobuf::de::Deserializer; use serde_protobuf::de::Deserializer;
use serde_protobuf::error::{Error, ErrorKind}; use serde_protobuf::error::{Error, ErrorKind};
use serde_protobuf::descriptor::{Descriptors, MessageDescriptor}; use serde_protobuf::descriptor::Descriptors;
use serde_value::Value; use serde_value::Value;
use protobuf::CodedInputStream; use protobuf::CodedInputStream;
use newline_pretty_formatter::NewlineFormatter; use newline_pretty_formatter::NewlineFormatter;


pub struct PqrsDecoder<'a> { pub struct PqrsDecoder<'a> {
pub descriptors: Descriptors, pub descriptors: Descriptors,
pub message_descriptors: Vec<MessageDescriptor>, pub message_type: &'a str,
pub message_type: Option<&'a str>,
} }


impl<'a> PqrsDecoder<'a> { impl<'a> PqrsDecoder<'a> {
pub fn new(msgtype: Option<&str>) -> Result<PqrsDecoder, PqrsError> { pub fn new(msgtype: &str) -> Result<PqrsDecoder, PqrsError> {
let loaded_descs = match get_loaded_descriptors() { let loaded_descs = match get_loaded_descriptors() {
Err(e) => return Err(PqrsError::FdsetDiscoveryError(e)), Err(e) => return Err(PqrsError::FdsetDiscoveryError(e)),
Ok(x) => x, Ok(x) => x,
}; };
let mut descriptors = Descriptors::new(); let mut descriptors = Descriptors::new();
let mut message_descriptors = Vec::new(); for fdset in loaded_descs {
for (fdset_path, fdset) in loaded_descs {
descriptors.add_file_set_proto(&fdset); descriptors.add_file_set_proto(&fdset);
match msgtype {
None => {
message_descriptors.append(&mut fdset
.get_file()
.iter()
.flat_map(|x| {
x.get_message_type()
.iter()
.map(|y| {
MessageDescriptor::from_proto(fdset_path
.to_string_lossy()
.into_owned()
.as_str(),
y)
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>());
}
Some(_) => (),
}
} }
descriptors.resolve_refs(); descriptors.resolve_refs();
Ok(PqrsDecoder { Ok(PqrsDecoder {
descriptors: descriptors, descriptors: descriptors,
message_descriptors: message_descriptors,
message_type: msgtype, message_type: msgtype,
}) })
} }
Expand All @@ -62,24 +38,15 @@ impl<'a> PqrsDecoder<'a> {
out: &mut Write, out: &mut Write,
is_tty: bool) is_tty: bool)
-> Result<(), DecodeError> { -> Result<(), DecodeError> {
let value = match self.message_type { let stream = CodedInputStream::from_bytes(data);
None => { let mut deserializer = Deserializer::for_named_message(&self.descriptors,
match discover_contenders(data, &self.descriptors, &self.message_descriptors) { &(format!(".{}",
Ok(value) => value, self.message_type)),
Err(e) => return Err(e), stream)
} .unwrap();
} let value = match deser(&mut deserializer) {
Some(ref x) => { Ok(value) => value,
let stream = CodedInputStream::from_bytes(data); Err(e) => return Err(e),
let mut deserializer = Deserializer::for_named_message(&self.descriptors,
&(format!(".{}", x)),
stream)
.unwrap();
match deser(&mut deserializer) {
Ok(value) => value,
Err(e) => return Err(e),
}
}
}; };
if is_tty { if is_tty {
let formatter = NewlineFormatter::default(); let formatter = NewlineFormatter::default();
Expand All @@ -96,36 +63,6 @@ impl<'a> PqrsDecoder<'a> {
} }
} }


/// Decodes `data` against every descriptor in `mds` and returns the best match.
///
/// A decode attempt only counts as a candidate when it yields a map in which
/// no value is `Value::Unit`. Among the candidates the map with the most
/// entries is returned (on a tie, the one produced by the later descriptor).
///
/// # Errors
///
/// Returns `DecodeError::NoSuccessfulAttempts` when no descriptor produces a
/// candidate.
fn discover_contenders(data: &[u8],
                       d: &Descriptors,
                       mds: &[MessageDescriptor])
                       -> Result<Value, DecodeError> {
    mds.iter()
        .filter_map(|md| {
            // Every attempt gets its own input stream over the same bytes.
            let input = CodedInputStream::from_bytes(data);
            let mut deserializer = Deserializer::new(d, md, input);
            let decoded = deser(&mut deserializer);
            match decoded {
                Ok(Value::Map(fields)) => Some(fields),
                // Non-map results and decode failures are simply not candidates.
                Ok(_) | Err(_) => None,
            }
        })
        .filter(|fields| {
            // Reject any attempt that left at least one field as `Unit`.
            !fields.values().any(|field| match *field {
                Value::Unit => true,
                _ => false,
            })
        })
        .max_by_key(|fields| fields.len())
        .map(Value::Map)
        .ok_or(DecodeError::NoSuccessfulAttempts)
}

fn deser(deserializer: &mut Deserializer) -> Result<Value, DecodeError> { fn deser(deserializer: &mut Deserializer) -> Result<Value, DecodeError> {
match Value::deserialize(deserializer) { match Value::deserialize(deserializer) {
Ok(x) => Ok(x), Ok(x) => Ok(x),
Expand Down
45 changes: 4 additions & 41 deletions src/fdset_discovery/mod.rs → src/discovery.rs
Original file line number Original file line Diff line number Diff line change
@@ -1,62 +1,25 @@
extern crate protobuf; extern crate protobuf;


use error::DiscoveryError;
use std::env; use std::env;
use std::fs::{File, read_dir}; use std::fs::{File, read_dir};
use std::path::PathBuf; use std::path::PathBuf;
use std::result::Result; use std::result::Result;
use protobuf::parse_from_reader; use protobuf::parse_from_reader;
use protobuf::descriptor::FileDescriptorSet; use protobuf::descriptor::FileDescriptorSet;
use std::fmt;
use std::error::Error;


#[derive(Debug)] pub fn get_loaded_descriptors() -> Result<Vec<FileDescriptorSet>, DiscoveryError> {
/// Errors returned by fdset discovery (see `get_loaded_descriptors`).
pub enum DiscoveryError {
/// Displayed as "$HOME not defined".
NoHome,
/// Carries the path that is reported as not existing.
NoFdsetPath(String),
/// Carries the path in which no valid fdset files were found.
NoFiles(String),
}

/// User-facing messages for each discovery failure.
impl fmt::Display for DiscoveryError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            // Fixed text, so no formatting machinery is needed.
            &DiscoveryError::NoHome => f.write_str("$HOME not defined"),
            &DiscoveryError::NoFdsetPath(ref dir) => write!(f, "Path {} doesn't exist", dir),
            &DiscoveryError::NoFiles(ref dir) => {
                write!(f, "No valid fdset files in path {}", dir)
            }
        }
    }
}

/// `std::error::Error` integration; discovery errors never wrap another error.
impl Error for DiscoveryError {
    /// Short static summary of the failure (without the offending path).
    fn description(&self) -> &str {
        match self {
            &DiscoveryError::NoHome => "$HOME not defined",
            &DiscoveryError::NoFdsetPath(_) => "fdset_path doesn't exist",
            &DiscoveryError::NoFiles(_) => "no files in fdset_path",
        }
    }

    /// Always `None`; the match stays exhaustive so that a new variant forces
    /// a decision here.
    fn cause(&self) -> Option<&Error> {
        match self {
            &DiscoveryError::NoHome => None,
            &DiscoveryError::NoFdsetPath(_) => None,
            &DiscoveryError::NoFiles(_) => None,
        }
    }
}


pub fn get_loaded_descriptors() -> Result<Vec<(PathBuf, FileDescriptorSet)>, DiscoveryError> {
let (fdsets, fdset_path) = match discover_fdsets() { let (fdsets, fdset_path) = match discover_fdsets() {
Ok((fdsets, fdsets_path)) => (fdsets, fdsets_path), Ok((fdsets, fdsets_path)) => (fdsets, fdsets_path),
Err(e) => return Err(e), Err(e) => return Err(e),
}; };
let mut descriptors: Vec<(PathBuf, FileDescriptorSet)> = Vec::new(); let mut descriptors: Vec<FileDescriptorSet> = Vec::new();


for fdset_path in fdsets { for fdset_path in fdsets {
let mut fdset_file = File::open(fdset_path.as_path()).unwrap(); let mut fdset_file = File::open(fdset_path.as_path()).unwrap();
match parse_from_reader(&mut fdset_file) { match parse_from_reader(&mut fdset_file) {
Err(_) => continue, Err(_) => continue,
Ok(x) => descriptors.push((fdset_path, x)), Ok(x) => descriptors.push(x),
} }
} }


Expand Down
42 changes: 35 additions & 7 deletions src/error.rs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -3,7 +3,41 @@ use protobuf::ProtobufError;
use std::error::Error; use std::error::Error;
use serde_protobuf; use serde_protobuf;
use serde_json; use serde_json;
use fdset_discovery::DiscoveryError;
/// Errors returned by fdset discovery.
#[derive(Debug)]
pub enum DiscoveryError {
    /// Displayed as "$HOME not defined".
    NoHome,
    /// Carries the path that is reported as not existing.
    NoFdsetPath(String),
    /// Carries the path in which no valid fdset files were found.
    NoFiles(String),
}

/// User-facing messages for each discovery failure.
impl fmt::Display for DiscoveryError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            // Fixed text, so no formatting machinery is needed.
            &DiscoveryError::NoHome => f.write_str("$HOME not defined"),
            &DiscoveryError::NoFdsetPath(ref dir) => write!(f, "Path {} doesn't exist", dir),
            &DiscoveryError::NoFiles(ref dir) => {
                write!(f, "No valid fdset files in path {}", dir)
            }
        }
    }
}

/// `std::error::Error` integration; discovery errors never wrap another error.
impl Error for DiscoveryError {
    /// Short static summary of the failure (without the offending path).
    fn description(&self) -> &str {
        match self {
            &DiscoveryError::NoHome => "$HOME not defined",
            &DiscoveryError::NoFdsetPath(_) => "fdset_path doesn't exist",
            &DiscoveryError::NoFiles(_) => "no files in fdset_path",
        }
    }

    /// Always `None`; the match stays exhaustive so that a new variant forces
    /// a decision here.
    fn cause(&self) -> Option<&Error> {
        match self {
            &DiscoveryError::NoHome => None,
            &DiscoveryError::NoFdsetPath(_) => None,
            &DiscoveryError::NoFiles(_) => None,
        }
    }
}


#[derive(Debug)] #[derive(Debug)]
pub enum PqrsError { pub enum PqrsError {
Expand All @@ -14,7 +48,6 @@ pub enum PqrsError {


#[derive(Debug)] #[derive(Debug)]
pub enum DecodeError { pub enum DecodeError {
NoSuccessfulAttempts,
ProtobufError(ProtobufError), ProtobufError(ProtobufError),
SerdeProtobufError(serde_protobuf::error::Error), SerdeProtobufError(serde_protobuf::error::Error),
SerializeError(serde_json::Error), SerializeError(serde_json::Error),
Expand Down Expand Up @@ -63,9 +96,6 @@ impl From<DecodeError> for PqrsError {
impl fmt::Display for DecodeError { impl fmt::Display for DecodeError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self { match *self {
DecodeError::NoSuccessfulAttempts => {
write!(f, "Couldn't decode with any message descriptor")
}
DecodeError::ProtobufError(ref err) => err.fmt(f), DecodeError::ProtobufError(ref err) => err.fmt(f),
DecodeError::SerdeProtobufError(ref err) => err.fmt(f), DecodeError::SerdeProtobufError(ref err) => err.fmt(f),
DecodeError::SerializeError(ref err) => err.fmt(f), DecodeError::SerializeError(ref err) => err.fmt(f),
Expand All @@ -76,7 +106,6 @@ impl fmt::Display for DecodeError {
impl Error for DecodeError { impl Error for DecodeError {
fn description(&self) -> &str { fn description(&self) -> &str {
match *self { match *self {
DecodeError::NoSuccessfulAttempts => "no suitable message descriptor",
DecodeError::ProtobufError(ref err) => err.description(), DecodeError::ProtobufError(ref err) => err.description(),
DecodeError::SerdeProtobufError(ref err) => err.description(), DecodeError::SerdeProtobufError(ref err) => err.description(),
DecodeError::SerializeError(ref err) => err.description(), DecodeError::SerializeError(ref err) => err.description(),
Expand All @@ -85,7 +114,6 @@ impl Error for DecodeError {


fn cause(&self) -> Option<&Error> { fn cause(&self) -> Option<&Error> {
match *self { match *self {
DecodeError::NoSuccessfulAttempts => None,
DecodeError::ProtobufError(ref err) => Some(err), DecodeError::ProtobufError(ref err) => Some(err),
DecodeError::SerdeProtobufError(ref err) => Some(err), DecodeError::SerdeProtobufError(ref err) => Some(err),
DecodeError::SerializeError(ref err) => Some(err), DecodeError::SerializeError(ref err) => Some(err),
Expand Down
Loading

0 comments on commit 98a4e47

Please sign in to comment.