Permalink
Browse files

Add kafka (#49)

Adds Kafka as a stream source to stream_delimit
  • Loading branch information...
sevagh committed May 12, 2017
1 parent f78ba48 commit 1f55ea9d00131a60e4c337ff7e8171e6bfbb4fdf
Showing with 529 additions and 246 deletions.
  1. +265 −2 Cargo.lock
  2. +5 −1 Cargo.toml
  3. +5 −37 Makefile
  4. +22 −10 README.md
  5. +0 −42 docs/_pqrs.1
  6. +0 −85 docs/index.html
  7. +6 −0 src/error.rs
  8. +73 −33 src/main.rs
  9. +7 −1 stream_delimit/Cargo.toml
  10. +31 −0 stream_delimit/src/error.rs
  11. +115 −29 stream_delimit/src/lib.rs
  12. +0 −6 tests/python/testbench.py
View

Large diffs are not rendered by default.

Oops, something went wrong.
View
@@ -22,7 +22,11 @@ serde_json = "1.0.1"
serde-protobuf = "0.5.0"
protobuf = "1.2.1"
atty = "0.2.2"
stream_delimit = { path = "stream_delimit", version = "0.1.2" }
stream_delimit = { path = "stream_delimit", version = "0.2.0" }
[dependencies.rdkafka]
version = "^0.9.1"
features = ["ssl", "sasl"]
[replace]
"serde-protobuf:0.5.0" = { git = 'https://github.com/sevagh/rq' }
View
@@ -1,40 +1,8 @@
CARGO = $(PWD)/rust/bin/cargo
export RUSTC = $(PWD)/rust/bin/rustc
TARGET = x86_64-unknown-linux-musl
CARGO_FLAGS += --target=$(TARGET)
WORKSPACES = "./" "./stream_delimit/"
RUSTUP_URL = https://static.rust-lang.org/rustup.sh
include $(CONFIG)
all: build
cargo: rust/bin/cargo
build: cargo
$(CARGO) build $(CARGO_FLAGS)
build-release: cargo
$(CARGO) build $(CARGO_FLAGS) --release
clean:
$(CARGO) clean $(CARGO_FLAGS)
distclean: clean
-rm -rf rust
rust/rustup.sh:
mkdir -p rust
curl -sSf -o $@ $(RUSTUP_URL)
chmod +x $@
rust/bin/cargo: rust/rustup.sh
$< --disable-sudo --disable-ldconfig --yes --prefix=rust \
--with-target=$(TARGET)
docs:
mandoc -Thtml docs/_pqrs.1 >docs/index.html
build:
docker pull clux/muslrust
docker run -v $(PWD):/volume:Z -e USERID=1000 -w /volume -t clux/muslrust cargo build $(CARGOFLAGS)
lint:
@- $(foreach WORKSPACE,$(WORKSPACES), \
@@ -44,10 +12,10 @@ lint:
rustup default stable ;\
)
package: build-release
package: build
cd target/$(TARGET)/release;\
tar -czvf pq-bin.tar.gz pq;\
cd -;\
mv target/$(TARGET)/release/pq-bin.tar.gz ./pq-bin.tar.gz
.PHONY: all message build test clean distclean docs
.PHONY: all build docs lint package
View
@@ -10,7 +10,23 @@ pq is on [crates.io](https://crates.io/crates/pq): `cargo install pq`. You can a
### Usage
**Read the [manpage!](https://sevagh.github.io/pq/)**
```
pq - protobuf to json
Usage:
pq <infile> [--msgtype=<msgtype>] [--stream=<delim>] [--count=<count>]
pq kafka <topic> --brokers=<brokers> [--from-beginning] [--count=<count>]
pq (--help | --version)
Options:
--stream=<delim> Stream delimiter e.g. "varint", "leb128"
--msgtype=<msgtype> Message type e.g. com.example.Type
--brokers=<brokers> 1.2.3.4:9092,5.6.7.8:9092
--from-beginning Consume kafka from beginning
--count=<count> Stop after count messages
--help Show this screen.
--version Show version.
```
To set up, put your `*.fdset` files in `~/.pq`:
@@ -46,17 +62,13 @@ $ testbench.py "stream(limit=2)" | pq --stream="varint" | jq
}
```
Pipe a `varint`-delimited stream with trailing newlines:
Consume from a Kafka stream:
```
$ testbench.py "trail(trail=b'\n',limit=2)" | pq --stream=varint --trail=1 | jq
$ pq kafka my_topic --brokers=192.168.0.1:9092 --from-beginning --count=1 | jq
{
"age": 16,
"age": 10,
"breed": "gsd",
"temperament": "chill"
}
{
"id": 3,
"name": "raffi"
"temperament": "aggressive"
}
```
```
View

This file was deleted.

Oops, something went wrong.
View

This file was deleted.

Oops, something went wrong.
View
@@ -9,6 +9,7 @@ use fdset_discovery::DiscoveryError;
pub enum PqrsError {
FdsetDiscoveryError(DiscoveryError),
DecodeError(DecodeError),
ArgumentError,
}
#[derive(Debug)]
@@ -22,6 +23,9 @@ pub enum DecodeError {
impl fmt::Display for PqrsError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
PqrsError::ArgumentError => {
writeln!(f, "Invalid arguments")
}
PqrsError::FdsetDiscoveryError(ref err) => err.fmt(f),
PqrsError::DecodeError(ref err) => err.fmt(f),
}
@@ -31,13 +35,15 @@ impl fmt::Display for PqrsError {
impl Error for PqrsError {
fn description(&self) -> &str {
match *self {
PqrsError::ArgumentError => "Invalid arguments",
PqrsError::FdsetDiscoveryError(ref err) => err.description(),
PqrsError::DecodeError(ref err) => err.description(),
}
}
fn cause(&self) -> Option<&Error> {
match *self {
PqrsError::ArgumentError => None,
PqrsError::FdsetDiscoveryError(ref err) => Some(err),
PqrsError::DecodeError(ref err) => Some(err),
}
Oops, something went wrong.

0 comments on commit 1f55ea9

Please sign in to comment.