Scala NSQ client, based on netty, typesafe config, slf4j and json4s.
Client requires scala 2.11.
For SBT users:
libraryDependencies += "com.github.mitallast" %% "scala-nsq" % "1.10"
For Maven users:
<dependency>
<groupId>com.github.mitallast</groupId>
<artifactId>scala-nsq_2.11</artifactId>
<version>1.10</version>
</dependency>
Configuration based on typesafe config library. See reference for available configuration options
By default, client connect to nsqlookupd
node at http://127.0.0.1:4161
, you can override
default settings using standard typesafehub/config
ways.
For consumer instance, client send GET /lookup?topic=...
for concrete topic, and
send GET /nodes
for provider instance to retrieve list of addresses nsqd
nodes and connect to them.
and request GET /lookup?topic=...
for concrete topic, o
import org.mitallast.nsq._
val client = NSQClient()
Also, you can provide it programmatically:
import org.mitallast.nsq._
import com.typesafe.config.Config
val config: Config = ...
val client = NSQClient(config)
By default, client use NSQLookupDefault
with configurable addresses
to nsqlookupd
instances. By default, http://127.0.0.1:4161
address using.
Set config property nsq.lookup-address
and lookup-period
to override.
Also, you can implement trait NSQLookup
and provide it programmatically:
import org.mitallast.nsq._
import java.net.InetSocketAddress
val lookup = new NSQLookup {
def nodes(): List[SocketAddress] = ...
def lookup(topic: String): List[SocketAddress] = ...
}
val client = NSQClient(lookup)
// or with config:
val config: Config = ...
val client = NSQClient(lookup, config)
val producer = client.producer()
val listener = match {
case Success(_:OK) => log.info("message pub successfully")
case Failure(error) => log.info("message pub failed", error)
}
// publish one message
producer.pub(topic="test", data=Array[Byte](1,0,1,1)).onComplete(listener)
producer.pubStr(topic="test", data="hello").onComplete(listener)
// publish multiple messages
producer.mpub(topic="test", data=Seq(Array[Byte](1,0,1,1), Array[Byte](1,0,1,1))).onComplete(listener)
producer.mpubStr(topic="test", data=Seq("hello", "world")).onComplete(listener)
Consumer automatically send RDY <number>\n
command. By default, 1 message.
Set config property nsq.max-ready-count
to override.
val consumer = client.consumer(topic="test", channel="default") { message =>
log.info("received: {}", msg)
// send `TOUCH msgid` message request
msg.touch()
// send `REQ msdid 100` message request
msg.req(100)
// send `FIN msgid` message request
msg.fin()
}