Permalink
Browse files

Use error chain (#65)

* Use errorkind

* Linted

* Iter over chained error in main

* Bump to versio 0.8.0 with error_chain

* Fix failing unit tests
  • Loading branch information...
sevagh committed Jul 28, 2017
1 parent 674cff0 commit a12bdbfd5a106cfe563f58ce83ba082c7bfdb052
Showing with 67 additions and 175 deletions.
  1. +2 −1 Cargo.lock
  2. +2 −1 Cargo.toml
  3. +15 −27 src/decode.rs
  4. +10 −9 src/discovery.rs
  5. +0 −122 src/error.rs
  6. +2 −0 src/errors.rs
  7. +34 −13 src/main.rs
  8. +2 −2 tests/test_pqrs_bin.rs
View

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.
View
@@ -1,6 +1,6 @@
[package]
name = "pq"
version = "0.7.9"
version = "0.8.0"
authors = ["Sevag Hanssian <sevag.hanssian@gmail.com>"]
description = "jq for protobuf"
repository = "https://github.com/sevagh/pq"
@@ -18,6 +18,7 @@ protobuf = "1.2.1"
libc = "0.2.22"
stream_delimit = { path = "stream_delimit", version = "0.3.4" }
clap = "2.24.2"
error-chain = "0.10.0"
[workspace]
members = ["stream_delimit"]
View
@@ -1,27 +1,24 @@
use discovery::get_loaded_descriptors;
use error::*;
use std::io::Write;
use std::result::Result;
use serde::{Deserialize, Serialize};
use serde_json::ser::Serializer;
use serde_protobuf::de::Deserializer;
use serde_protobuf::error::{Error, ErrorKind};
use serde_protobuf::descriptor::Descriptors;
use serde_value::Value;
use protobuf::CodedInputStream;
use newline_pretty_formatter::NewlineFormatter;
use errors::*;
pub struct PqrsDecoder<'a> {
pub descriptors: Descriptors,
pub message_type: &'a str,
}
impl<'a> PqrsDecoder<'a> {
pub fn new(msgtype: &str) -> Result<PqrsDecoder, PqrsError> {
let loaded_descs = match get_loaded_descriptors() {
Err(e) => return Err(PqrsError::FdsetDiscoveryError(e)),
Ok(x) => x,
};
pub fn new(msgtype: &str) -> Result<PqrsDecoder> {
let loaded_descs = get_loaded_descriptors().chain_err(
|| "No loaded descriptors",
)?;
let mut descriptors = Descriptors::new();
for fdset in loaded_descs {
descriptors.add_file_set_proto(&fdset);
@@ -33,34 +30,25 @@ impl<'a> PqrsDecoder<'a> {
})
}
pub fn decode_message(
&self,
data: &[u8],
out: &mut Write,
is_tty: bool,
) -> Result<(), DecodeError> {
pub fn decode_message(&self, data: &[u8], out: &mut Write, is_tty: bool) -> Result<()> {
let stream = CodedInputStream::from_bytes(data);
let mut deserializer = Deserializer::for_named_message(
&self.descriptors,
&(format!(".{}", self.message_type)),
stream,
).expect("Couldn't initialize deserializer");
let value = match Value::deserialize(&mut deserializer) {
Ok(value) => value,
Err(Error(ErrorKind::Protobuf(e), _)) => return Err(DecodeError::ProtobufError(e)),
Err(e) => return Err(DecodeError::SerdeProtobufError(e)),
};
let value = Value::deserialize(&mut deserializer).chain_err(
|| "Deser error",
)?;
if is_tty {
let formatter = NewlineFormatter::default();
match value.serialize(&mut Serializer::with_formatter(out, formatter)) {
Ok(_) => Ok(()),
Err(e) => Err(DecodeError::SerializeError(e)),
}
Ok(value
.serialize(&mut Serializer::with_formatter(out, formatter))
.chain_err(|| "Ser error")?)
} else {
match value.serialize(&mut Serializer::new(out)) {
Ok(_) => Ok(()),
Err(e) => Err(DecodeError::SerializeError(e)),
}
Ok(value.serialize(&mut Serializer::new(out)).chain_err(
|| "Serr error",
)?)
}
}
}
View
@@ -1,43 +1,44 @@
extern crate protobuf;
use error::DiscoveryError;
use std::env;
use std::fs::{File, read_dir};
use std::path::PathBuf;
use std::result::Result;
use protobuf::parse_from_reader;
use protobuf::descriptor::FileDescriptorSet;
use errors::*;
pub fn get_loaded_descriptors() -> Result<Vec<FileDescriptorSet>, DiscoveryError> {
pub fn get_loaded_descriptors() -> Result<Vec<FileDescriptorSet>> {
let (fdsets, fdset_path) = match discover_fdsets() {
Ok((fdsets, fdsets_path)) => (fdsets, fdsets_path),
Err(e) => return Err(e),
};
let mut descriptors: Vec<FileDescriptorSet> = Vec::new();
for fdset_path in fdsets {
let mut fdset_file = File::open(fdset_path.as_path()).expect("Couldn't open fdset file");
let mut fdset_file = File::open(fdset_path.as_path()).chain_err(
|| "Couldn't open fdset file",
)?;
match parse_from_reader(&mut fdset_file) {
Err(_) => continue,
Ok(x) => descriptors.push(x),
}
}
if descriptors.is_empty() {
return Err(DiscoveryError::NoFiles(fdset_path));
return Err(format!("no valid fdset files in {}", fdset_path).into());
}
Ok(descriptors)
}
fn discover_fdsets() -> Result<(Vec<PathBuf>, String), DiscoveryError> {
fn discover_fdsets() -> Result<(Vec<PathBuf>, String)> {
let mut fdset_files = Vec::new();
let path = match env::var("FDSET_PATH") {
Ok(x) => PathBuf::from(x),
Err(_) => {
let mut home = match env::home_dir() {
Some(x) => x,
None => return Err(DiscoveryError::NoHome),
None => return Err("$HOME is not defined".into()),
};
home.push(".pq");
home
@@ -55,10 +56,10 @@ fn discover_fdsets() -> Result<(Vec<PathBuf>, String), DiscoveryError> {
}
}
}
Err(_) => return Err(DiscoveryError::NoFdsetPath(path_str)),
Err(_) => return Err(format!("Path {} not found", path_str).into()),
}
if fdset_files.is_empty() {
return Err(DiscoveryError::NoFiles(path_str));
return Err(format!("No valid fdset files in path {}", path_str).into());
}
Ok((fdset_files, path_str))
}
View

This file was deleted.

Oops, something went wrong.
View
@@ -0,0 +1,2 @@
// Create the Error, ErrorKind, ResultExt, and Result types
error_chain!{}
View
@@ -9,19 +9,19 @@ extern crate serde_protobuf;
extern crate serde_value;
extern crate serde_json;
extern crate stream_delimit;
#[macro_use]
extern crate error_chain;
mod discovery;
mod newline_pretty_formatter;
mod error;
mod decode;
mod errors;
use decode::PqrsDecoder;
use stream_delimit::consumer::*;
use stream_delimit::converter::StreamConverter;
use std::io::{self, Write};
use std::process;
use std::fmt::Display;
use error::PqrsError;
use clap::ArgMatches;
fn main() {
@@ -50,10 +50,19 @@ fn run_kafka(matches: &ArgMatches) {
if let (Some(brokers), Some(topic)) = (matches.value_of("BROKERS"), matches.value_of("TOPIC")) {
match KafkaConsumer::new(brokers, topic, matches.is_present("FROMBEG")) {
Ok(mut x) => decode_or_convert(StreamConsumer::new(&mut x), matches),
Err(e) => errexit(&e, 255),
Err(e) => {
eprintln!("Error: {}", e);
/*
for e in e.iter().skip(1) {
eprintln!("Reason: {}", e);
}
*/
process::exit(255);
}
}
} else {
errexit(&PqrsError::ArgumentError, 255);
eprintln!("Kafka needs a broker and topic");
process::exit(255);
}
}
@@ -68,7 +77,12 @@ fn run_byte(matches: &ArgMatches) {
match matches.value_of("STREAM").unwrap_or("single") {
"single" => Box::new(SingleConsumer::new(&mut stdin)),
"varint" => Box::new(VarintConsumer::new(&mut stdin)),
_ => errexit(&PqrsError::ArgumentError, 255),
_ => {
eprintln!(
"Only supports stream types single and varint",
);
process::exit(255);
}
};
decode_or_convert(StreamConsumer::new(byte_consumer.as_mut()), matches);
}
@@ -93,7 +107,13 @@ fn decode_or_convert(mut consumer: StreamConsumer, matches: &ArgMatches) {
"Must supply --msgtype or --convert",
)) {
Ok(x) => x,
Err(e) => errexit(&e, 255),
Err(e) => {
eprintln!("Error: {}", e);
for e in e.iter().skip(1) {
eprintln!("Reason: {}", e);
}
process::exit(255);
}
};
for (ctr, item) in consumer.enumerate() {
@@ -102,13 +122,14 @@ fn decode_or_convert(mut consumer: StreamConsumer, matches: &ArgMatches) {
}
match decoder.decode_message(&item, &mut stdout.lock(), out_is_tty) {
Ok(_) => (),
Err(e) => errexit(&e, 255),
Err(e) => {
eprintln!("Error: {}", e);
for e in e.iter().skip(1) {
eprintln!("Reason: {}", e);
}
process::exit(255);
}
}
}
}
}
fn errexit<T: Display>(msg: &T, exit_code: i32) -> ! {
eprintln!("{}", msg);
process::exit(exit_code);
}
Oops, something went wrong.

0 comments on commit a12bdbf

Please sign in to comment.