Permalink
Browse files

nsq: add README and code comments (gopkgdoc)

  • Loading branch information...
1 parent 762dd84 commit 2fb1275ee7a78145d9f1d3aac0aed1d2e29611a3 @mreiferson mreiferson committed Oct 31, 2012
Showing with 253 additions and 85 deletions.
  1. +34 −0 nsq/README.md
  2. +4 −0 nsq/api_request.go
  3. +15 −6 nsq/command.go
  4. +15 −10 nsq/errors.go
  5. +20 −2 nsq/lookup_peer.go
  6. +8 −6 nsq/message.go
  7. +41 −11 nsq/protocol.go
  8. +96 −37 nsq/reader.go
  9. +2 −1 nsq/states.go
  10. +6 −0 nsq/version.go
  11. +3 −3 nsqd/diskqueue.go
  12. +4 −4 nsqd/lookup.go
  13. +4 −4 nsqd/protocol_v2.go
  14. +1 −1 pynsq/README.md
View
@@ -0,0 +1,34 @@
+## nsq
+
+`nsq` is the official Go package for [NSQ][nsq].
+
+It provides the building blocks for developing applications on the [NSQ][nsq] platform in Go.
+
+Low-level functions and types are provided to communicate over the [NSQ protocol][protocol] as well
+as a high-level [Reader][reader] library to implement consumers.
+
+See the [examples][examples] directory for utilities built using this package that provide support
+for common tasks.
+
+### Installing
+
+ $ go get github.com/bitly/nsq/nsq
+
+### Importing
+
+```go
+import "github.com/bitly/nsq/nsq"
+```
+
+### Docs
+
+See [gopkgdoc][nsq_gopkgdoc] for pretty documentation or:
+
+ # in the nsq package directory
+ $ go doc
+
+[nsq]: https://github.com/bitly/nsq
+[nsq_gopkgdoc]: http://go.pkgdoc.org/github.com/bitly/nsq/nsq
+[protocol]: https://github.com/bitly/nsq/blob/master/docs/protocol.md
+[examples]: https://github.com/bitly/nsq/tree/master/examples
+[reader]: http://go.pkgdoc.org/github.com/bitly/nsq/nsq#Reader
View
@@ -24,6 +24,10 @@ func (c *deadlinedConn) Write(b []byte) (n int, err error) {
return c.Conn.Write(b)
}
+// ApiRequest is a helper function to perform an HTTP request
+// and parse our NSQ daemon's expected response format, with deadlines.
+//
+// {"status_code":200, "status_txt":"OK", "data":{...}}
func ApiRequest(endpoint string) (*simplejson.Json, error) {
transport := &http.Transport{
Dial: func(netw, addr string) (net.Conn, error) {
View
@@ -11,19 +11,24 @@ import (
"strings"
)
+// Command represents a command from a client to an NSQ daemon
type Command struct {
Name []byte
Params [][]byte
Body []byte
}
+// String returns the name and parameters of the Command
func (c *Command) String() string {
if len(c.Params) > 0 {
return fmt.Sprintf("%s %s", c.Name, string(bytes.Join(c.Params, []byte(" "))))
}
return string(c.Name)
}
+// Write serializes the Command to the supplied Writer.
+//
+// It is suggested that the target Writer is buffered to avoid performing many system calls.
func (c *Command) Write(w io.Writer) error {
_, err := w.Write(c.Name)
if err != nil {
@@ -69,7 +74,8 @@ func Announce(topic string, channel string, port int, ips []string) *Command {
return &Command{[]byte("ANNOUNCE"), params, []byte(strings.Join(ips, "\n"))}
}
-// Identify is the first message sent to the Lookupd and provides information about the client
+// Identify creates a new Command to provide information about the client to nsqlookupd.
+// After connecting, it is the first message sent to nsqlookupd.
func Identify(version string, tcpPort int, httpPort int, address string) *Command {
body, err := json.Marshal(struct {
Version string `json:"version"`
@@ -88,7 +94,7 @@ func Identify(version string, tcpPort int, httpPort int, address string) *Comman
return &Command{[]byte("IDENTIFY"), [][]byte{}, body}
}
-// REGISTER a topic/channel for this nsqd
+// Register creates a new Command to add a topic/channel for the connected nsqd
func Register(topic string, channel string) *Command {
params := [][]byte{[]byte(topic)}
if len(channel) > 0 {
@@ -97,7 +103,7 @@ func Register(topic string, channel string) *Command {
return &Command{[]byte("REGISTER"), params, nil}
}
-// UNREGISTER removes a topic/channel from this nsqd
+// Unregister creates a new Command to remove a topic/channel for the connected nsqd
func UnRegister(topic string, channel string) *Command {
params := [][]byte{[]byte(topic)}
if len(channel) > 0 {
@@ -118,6 +124,8 @@ func Publish(topic string, body []byte) *Command {
return &Command{[]byte("PUB"), params, body}
}
+// MultiPublish creates a new Command to write more than one message to a given topic.
+// This is useful for high-throughput situations to avoid roundtrips and saturate the pipe.
func MultiPublish(topic string, bodies [][]byte) (*Command, error) {
var params = [][]byte{[]byte(topic)}
@@ -147,8 +155,7 @@ func MultiPublish(topic string, bodies [][]byte) (*Command, error) {
return &Command{[]byte("MPUB"), params, body}, nil
}
-// Subscribe creates a new Command to subscribe
-// to the given topic/channel
+// Subscribe creates a new Command to subscribe to the given topic/channel
func Subscribe(topic string, channel string, shortIdentifier string, longIdentifier string) *Command {
var params = [][]byte{[]byte(topic), []byte(channel), []byte(shortIdentifier), []byte(longIdentifier)}
return &Command{[]byte("SUB"), params, nil}
@@ -179,11 +186,13 @@ func Requeue(id []byte, timeoutMs int) *Command {
// StartClose creates a new Command to indicate that the
// client would like to start a close cycle. nsqd will no longer
// send messages to a client in this state and the client is expected
-// to ACK after which it can finish pending messages and close the connection
+// finish pending messages and close the connection
func StartClose() *Command {
return &Command{[]byte("CLS"), nil, nil}
}
+// Nop creates a new Command that has no effect server side.
+// Commonly used to respond to heartbeats
func Nop() *Command {
return &Command{[]byte("NOP"), nil, nil}
}
View
@@ -1,28 +1,33 @@
package nsq
-// E_INVALID
-// E_BAD_PROTOCOL
-// E_BAD_TOPIC
-// E_BAD_CHANNEL
-// E_BAD_BODY
-// E_REQ_FAILED
-// E_FIN_FAILED
-// E_PUT_FAILED
-// E_MISSING_PARAMS
-
+// ClientErr provides a way for NSQ daemons to log a human reabable
+// error string and return a machine readable string to the client.
+//
+// E_INVALID
+// E_BAD_PROTOCOL
+// E_BAD_TOPIC
+// E_BAD_CHANNEL
+// E_BAD_BODY
+// E_REQ_FAILED
+// E_FIN_FAILED
+// E_PUT_FAILED
+// E_MISSING_PARAMS
type ClientErr struct {
Err string
Desc string
}
+// Error returns the machine readable form
func (e *ClientErr) Error() string {
return e.Err
}
+// Description return the human readable form
func (e *ClientErr) Description() string {
return e.Desc
}
+// NewClientErr creates a ClientErr with the supplied human and machine readable strings
func NewClientErr(err string, description string) *ClientErr {
return &ClientErr{err, description}
}
View
@@ -7,22 +7,29 @@ import (
)
// LookupPeer is a low-level type for connecting/reading/writing to nsqlookupd
+//
+// A LookupPeer instance is designed to connect lazily to nsqlookupd and reconnect
+// gracefully (i.e. it is all handled by the library). Clients can simply use the
+// Command interface to perform a round-trip.
type LookupPeer struct {
addr string
conn net.Conn
state int32
connectCallback func(*LookupPeer)
- PeerInfo PeerInfo
+ Info PeerInfo
}
+// PeerInfo contains metadata for a LookupPeer instance (and is JSON marshalable)
type PeerInfo struct {
TcpPort int `json:"tcp_port"`
HttpPort int `json:"http_port"`
Version string `json:"version"`
Address string `json:"address"`
}
-// NewLookupPeer creates a new LookupPeer instance
+// NewLookupPeer creates a new LookupPeer instance connecting to the supplied address.
+//
+// The supplied connectCallback will be called *every* time the instance connects.
func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer {
return &LookupPeer{
addr: addr,
@@ -31,6 +38,7 @@ func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer {
}
}
+// Connect will Dial the specified address, with timeouts
func (lp *LookupPeer) Connect() error {
log.Printf("LOOKUP connecting to %s", lp.addr)
conn, err := net.DialTimeout("tcp", lp.addr, time.Second)
@@ -41,24 +49,34 @@ func (lp *LookupPeer) Connect() error {
return nil
}
+// String returns the specified address
func (lp *LookupPeer) String() string {
return lp.addr
}
+// Read implements the io.Reader interface, adding deadlines
func (lp *LookupPeer) Read(data []byte) (int, error) {
lp.conn.SetReadDeadline(time.Now().Add(time.Second))
return lp.conn.Read(data)
}
+// Write implements the io.Writer interface, adding deadlines
func (lp *LookupPeer) Write(data []byte) (int, error) {
lp.conn.SetWriteDeadline(time.Now().Add(time.Second))
return lp.conn.Write(data)
}
+// Close implements the io.Closer interface
func (lp *LookupPeer) Close() error {
return lp.conn.Close()
}
+// Command performs a round-trip for the specified Command.
+//
+// It will lazily connect to nsqlookupd and gracefully handle
+// reconnecting in the event of a failure.
+//
+// It returns the response from nsqlookupd as []byte
func (lp *LookupPeer) Command(cmd *Command) ([]byte, error) {
initialState := lp.state
if lp.state != StateConnected {
View
@@ -8,18 +8,19 @@ import (
"time"
)
+// The number of bytes for a Message.Id
const MsgIdLength = 16
// Message is the fundamental data type containing
-// the id, body, and meta-data
+// the id, body, and metadata
type Message struct {
Id []byte
Body []byte
Timestamp int64
Attempts uint16
}
-// NewMessage creates a Message, initializes some meta-data,
+// NewMessage creates a Message, initializes some metadata,
// and returns a pointer
func NewMessage(id []byte, body []byte) *Message {
return &Message{
@@ -29,7 +30,7 @@ func NewMessage(id []byte, body []byte) *Message {
}
}
-// EncodeBytes serializes the message into a new []byte
+// EncodeBytes serializes the message into a new, returned, []byte
func (m *Message) EncodeBytes() ([]byte, error) {
var buf bytes.Buffer
err := m.Write(&buf)
@@ -39,7 +40,9 @@ func (m *Message) EncodeBytes() ([]byte, error) {
return buf.Bytes(), nil
}
-// Write serializes the message into the supplied writer
+// Write serializes the message into the supplied writer.
+//
+// It is suggested that the target Writer is buffered to avoid performing many system calls.
func (m *Message) Write(w io.Writer) error {
err := binary.Write(w, binary.BigEndian, &m.Timestamp)
if err != nil {
@@ -64,8 +67,7 @@ func (m *Message) Write(w io.Writer) error {
return nil
}
-// DecodeMessage deseralizes data (as []byte) and creates/returns
-// a pointer to a new Message
+// DecodeMessage deseralizes data (as []byte) and creates a new Message
func DecodeMessage(byteBuf []byte) (*Message, error) {
var timestamp int64
var attempts uint16
Oops, something went wrong.

0 comments on commit 2fb1275

Please sign in to comment.