Rust client for the NSQ realtime message processing system
Rust
Switch branches/tags
Nothing to show
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
examples
src
.gitignore
.travis.yml
Cargo.toml
LICENSE
README.md

README.md

Build Status Crates.io

nsqueue

A Tokio based client implementation for the NSQ realtime message processing system

WORK IN PROGRESS

Current features

  • PUB
  • SUB
  • Discovery
  • Backoff
  • TLS
  • Snappy
  • Auth

Launch NSQ

$ ./nsqlookupd & 
$ ./nsqd --lookupd-tcp-address=127.0.0.1:4160 &
$ ./nsqadmin --lookupd-http-address=127.0.0.1:4161 &

MPUB

extern crate futures;
extern crate tokio_core;
extern crate nsqueue;

use futures::Future;
use tokio_core::reactor::Core;

use nsqueue::config::*;
use nsqueue::producer::*;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let addr = "127.0.0.1:4150".parse().unwrap();

    let mut messages: Vec<String> = Vec::new();
    messages.push("First message".into());
    messages.push("Second message".into());

    let res = Producer::connect(&addr, &handle, Config::default())
       .and_then(|conn| {
           conn.mpublish("some_topic".into(), messages)
           .and_then(move |response| {
              println!("Response: {:?}", response);
              Ok(())
           })
       });
    core.run(res).unwrap();
}

SUB

extern crate futures;
extern crate tokio_core;
extern crate nsqueue;

use futures::{Stream, Future};
use tokio_core::reactor::Core;
use nsqueue::config::*;
use nsqueue::consumer::*;

fn main() {
     let mut core = Core::new().unwrap();
     let handle = core.handle();
     let addr = "127.0.0.1:4150".parse().unwrap();

     core.run(
         Consumer::connect(&addr, &handle, Config::default())
         .and_then(|conn| {
            conn.subscribe("some_topic".into(), "some_channel".into())
            .and_then(move |response| {
                let ret = response.for_each(move |message| {
                    if message.message_id == "_heartbeat_" {
                        conn.nop();
                    } else {
                        println!("Response {:?} {:?}", message.message_id, message.message_body);
                        conn.fin(message.message_id); // Inform NSQ (Message consumed)
                    }
                    Ok(())
                });
                ret
            })
         })
     ).unwrap();
}

License

Licensed under either of