Permalink
Browse files

nsqadmin - UI to view stats, and perform administrative actions

* nsqlookupd times out client information
* nsqlookupd updates client info on pings
* nsqlookupd manages separate tcp/http port info
* new nsqd delete_channel endpoint
* nsqadmin UI for browsing stats, calling delete_channel and empty_channel api's
* updated README docs with usage info for each binary, and API docs
  • Loading branch information...
1 parent a464980 commit 93271f41e989ccddc3443fccb9736ddf90940a57 @jehiah jehiah committed Sep 4, 2012
View
@@ -2,6 +2,7 @@ nsqd/nsqd
nsqlookupd/nsqlookupd
nsqreader/nsqreader
nsqstatsd/nsqstatsd
+nsqadmin/nsqadmin
examples/nsq_to_file/nsq_to_file
examples/nsq_pubsub/nsq_pubsub
examples/nsq_to_http/nsq_to_http
View
@@ -3,6 +3,16 @@
An infrastructure component designed to support highly available, distributed, fault tolerant,
loosely guaranteed message processing.
+## Components
+
+* `nsqd` is the daemon that receives, buffers, and delivers messages to clients.
+* `nsqlookupd` is the daemon that manages topology information
+* `nsqadmin` is the Web UI to view message statistics and to perform administrative tasks
+* `nsq` is a go library for writing nsq clients
+* `pynsq` is a python library for writing nsq clients
+
+Examples: several example nsq clients are included: `nsq_pubsub`, `nsq_to_file`, `nsq_to_http`
+
## Background
[simplequeue][1] was developed, you guessed it, as a *simple* in-memory message queue with an HTTP
@@ -1,36 +0,0 @@
-package main
-
-import (
- "flag"
- "io"
- "log"
- "net/http"
- "strconv"
- "time"
-)
-
-var bindAddress = flag.String("address", "", "address to bind to")
-var httpPort = flag.Int("http-port", 5152, "port to listen on for HTTP connections")
-var debugMode = flag.Bool("debug", false, "enable debug mode")
-
-func pingHandler(w http.ResponseWriter, req *http.Request) {
- w.Header().Set("Content-Length", "2")
- io.WriteString(w, "OK")
-}
-
-func main() {
- flag.Parse()
- fqAddress := *bindAddress + ":" + strconv.Itoa(*httpPort)
- log.Printf("listening for http requests on %s", fqAddress)
- s := &http.Server{
- Addr: fqAddress,
- Handler: http.DefaultServeMux,
- ReadTimeout: 10 * time.Second,
- WriteTimeout: 10 * time.Second,
- MaxHeaderBytes: 1 << 20,
- }
- http.HandleFunc("/ping", pingHandler)
- http.Handle("/static", http.FileServer(http.Dir("static")))
- log.Fatal(s.ListenAndServe())
-
-}
View
@@ -1,5 +1,5 @@
#!/bin/bash
-for d in nsq nsqd nsqlookupd util util/pqueue examples/nsq_to_file examples/nsqstatsd examples/nsq_pubsub examples/nsq_to_http; do
+for d in nsq nsqd nsqlookupd nsqadmin util util/pqueue examples/nsq_to_file examples/nsq_pubsub examples/nsq_to_http; do
pushd $d
go fmt
popd
View
@@ -287,12 +287,18 @@ func (q *Reader) queryLookupd() {
}
// do something with the data
- // {"data":{"channels":[],"producers":[{"address":"jehiah-air.local","port":"4150"}],"timestamp":1340152173},"status_code":200,"status_txt":"OK"}
+ // {"data":{"channels":[],"producers":[{"address":"jehiah-air.local","port":4150, "tpc_port":4150, "http_port":4151}],"timestamp":1340152173},"status_code":200,"status_txt":"OK"}
producers, _ := data.Get("data").Get("producers").Array()
for _, producer := range producers {
producerData, _ := producer.(map[string]interface{})
address := producerData["address"].(string)
- port := int(producerData["port"].(float64))
+ var port int
+ if _, ok := producerData["tcp_port"]; ok {
+ port = int(producerData["tcp_port"].(float64))
+ } else {
+ // backwards compatible
+ port = int(producerData["port"].(float64))
+ }
// make an address, start a connection
joined := net.JoinHostPort(address, strconv.Itoa(port))
View
@@ -0,0 +1,13 @@
+nsqadmin
+========
+
+`nsqadmin` is the Web UI to view message statistics and to perform administrative tasks like removing a channel.
+
+
+Command Line Options
+--------------------
+
+ Usage of ./nsqadmin:
+ -http-address="0.0.0.0:4171": <addr>:<port> to listen on for HTTP clients
+ -lookupd-http-address=[]: lookupd HTTP address (may be given multiple times)
+ -version=false: print version string
View
@@ -0,0 +1,191 @@
+package main
+
+import (
+ "../util"
+ "fmt"
+ "html/template"
+ "io"
+ "log"
+ "net"
+ "net/http"
+ "net/url"
+ "regexp"
+ "strings"
+ "time"
+)
+
+var templates *template.Template
+
+func httpServer(listener net.Listener) {
+ var err error
+ templates, err = template.ParseGlob(fmt.Sprintf("%s/*.html", *templateDir))
+ if err != nil {
+ log.Printf("ERROR: %s", err.Error())
+ }
+
+ log.Printf("HTTP: listening on %s", listener.Addr().String())
+
+ handler := http.NewServeMux()
+ handler.HandleFunc("/ping", pingHandler)
+ handler.HandleFunc("/", indexHandler)
+ handler.HandleFunc("/topic/", topicHandler)
+ handler.HandleFunc("/delete_channel", removeChannelHandler)
+ handler.HandleFunc("/empty_channel", emptyChannelHandler)
+
+ server := &http.Server{
+ Handler: handler,
+ ReadTimeout: 5 * time.Second,
+ WriteTimeout: 5 * time.Second,
+ }
+ err = server.Serve(listener)
+ // theres no direct way to detect this error because it is not exposed
+ if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
+ log.Printf("ERROR: http.Serve() - %s", err.Error())
+ }
+
+ log.Printf("HTTP: closing %s", listener.Addr().String())
+}
+
+func pingHandler(w http.ResponseWriter, req *http.Request) {
+ w.Header().Set("Content-Length", "2")
+ io.WriteString(w, "OK")
+}
+
+func indexHandler(w http.ResponseWriter, req *http.Request) {
+ topics, _ := getLookupdTopics(lookupdAddresses)
+ p := struct {
+ Topics []string
+ Version string
+ }{
+ Topics: topics,
+ Version: VERSION,
+ }
+ err := templates.ExecuteTemplate(w, "index.html", p)
+ if err != nil {
+ log.Printf("Template Error %s", err.Error())
+ http.Error(w, "Template Error", 500)
+ }
+}
+
+func topicHandler(w http.ResponseWriter, req *http.Request) {
+ var urlRegex = regexp.MustCompile(`^/topic/([a-zA-Z0-9_-]+)(/([-_a-zA-Z0-9]+(#ephemeral)?))?$`)
+ matches := urlRegex.FindStringSubmatch(req.URL.Path)
+ if len(matches) == 0 {
+ http.NotFound(w, req)
+ return
+ }
+ topic := matches[1]
+ var selectedChannel string
+ if len(matches) >= 4 {
+ selectedChannel = matches[3]
+ }
+
+ log.Printf("Topic:%s Channel:%s", topic, selectedChannel)
+
+ producers, _ := getLookupdTopicProducers(topic, lookupdAddresses)
+ hostStats, channelStats, _ := getNSQDStats(producers, topic)
+ var selectedChannelInfo ChannelStats
+ var ok bool
+ if len(selectedChannel) != 0 {
+ selectedChannelInfo, ok = channelStats[selectedChannel]
+ if !ok {
+ http.NotFound(w, req)
+ return
+ }
+ selectedChannelInfo.Selected = true
+ channelStats[selectedChannel] = selectedChannelInfo
+ }
+
+ p := struct {
+ Version string
+ Topic string
+ SelectedChannel string
+ TopicProducers []string
+ HostStats []HostStats
+ ChannelStats map[string]ChannelStats
+ SelectedChannelInfo ChannelStats
+ }{
+ Version: VERSION,
+ Topic: topic,
+ SelectedChannel: selectedChannel,
+ TopicProducers: producers,
+ HostStats: hostStats,
+ ChannelStats: channelStats,
+ SelectedChannelInfo: selectedChannelInfo,
+ }
+ err := templates.ExecuteTemplate(w, "topic.html", p)
+ if err != nil {
+ log.Printf("Template Error %s", err.Error())
+ http.Error(w, "Template Error", 500)
+ }
+}
+
+func removeChannelHandler(w http.ResponseWriter, req *http.Request) {
+ reqParams, err := util.NewReqParams(req)
+ if err != nil {
+ log.Printf("ERROR: failed to parse request params - %s", err.Error())
+ http.Error(w, "INVALID_REQUEST", 500)
+ return
+ }
+
+ topicName, channelName, err := util.GetTopicChannelArgs(reqParams)
+ if err != nil {
+ http.Error(w, err.Error(), 500)
+ return
+ }
+
+ for _, addr := range lookupdAddresses {
+ endpoint := fmt.Sprintf("http://%s/delete_channel?topic=%s&channel=%s", addr, url.QueryEscape(topicName), url.QueryEscape(channelName))
+ log.Printf("LOOKUPD: querying %s", endpoint)
+
+ _, err := makeReq(endpoint)
+ if err != nil {
+ log.Printf("ERROR: lookupd %s - %s", endpoint, err.Error())
+ continue
+ }
+ }
+
+ producers, _ := getLookupdTopicProducers(topicName, lookupdAddresses)
+ for _, addr := range producers {
+ endpoint := fmt.Sprintf("http://%s/delete_channel?topic=%s&channel=%s", addr, url.QueryEscape(topicName), url.QueryEscape(channelName))
+ log.Printf("NSQD: calling %s", endpoint)
+ _, err := makeReq(endpoint)
+ if err != nil {
+ log.Printf("ERROR: nsqd %s - %s", endpoint, err.Error())
+ continue
+ }
+ }
+
+ http.Redirect(w, req, fmt.Sprintf("/topic/%s", url.QueryEscape(topicName)), 302)
+
+}
+
+func emptyChannelHandler(w http.ResponseWriter, req *http.Request) {
+ reqParams, err := util.NewReqParams(req)
+ if err != nil {
+ log.Printf("ERROR: failed to parse request params - %s", err.Error())
+ http.Error(w, "INVALID_REQUEST", 500)
+ return
+ }
+
+ topicName, channelName, err := util.GetTopicChannelArgs(reqParams)
+ if err != nil {
+ http.Error(w, err.Error(), 500)
+ return
+ }
+
+ producers, _ := getLookupdTopicProducers(topicName, lookupdAddresses)
+ for _, addr := range producers {
+ endpoint := fmt.Sprintf("http://%s/empty_channel?topic=%s&channel=%s", addr, url.QueryEscape(topicName), url.QueryEscape(channelName))
+ log.Printf("NSQD: calling %s", endpoint)
+
+ _, err := makeReq(endpoint)
+ if err != nil {
+ log.Printf("ERROR: nsqd %s - %s", endpoint, err.Error())
+ continue
+ }
+ }
+
+ http.Redirect(w, req, fmt.Sprintf("/topic/%s", url.QueryEscape(topicName)), 302)
+
+}
Oops, something went wrong.

0 comments on commit 93271f4

Please sign in to comment.