Permalink
Browse files

completes MVP

  • Loading branch information...
1 parent f013853 commit c66b79271ec509fa533cb2c0fb1ca35e6a4a7d92 @mhausenblas committed Dec 11, 2016
Showing with 156 additions and 2 deletions.
  1. +3 −0 .gitignore
  2. +29 −2 README.md
  3. BIN img/example-session.png
  4. +124 −0 main.go
View
@@ -1,3 +1,6 @@
+.DS_Store
+dnpipes
+
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
View
@@ -17,8 +17,35 @@ The reference implementation of dnpipes is based on [DC/OS](https://dcos.io) usi
### Install
-TBD.
+From source:
+
+```bash
+$ go get github.com/mhausenblas/dnpipes
+$ go build
+```
+
+From binaries:
+
+TBD
### Use
-TBD.
+An example session looks as follows. I've set up two terminals, in one I'm starting the publisher:
+
+```bash
+$ ./dnpipes --mode=publisher --broker=broker-0.kafka.mesos:9951 --topic=test
+PUBLISH> hello!
+PUBLISH> bye
+```
+
+The second terminal has a subscriber running:
+
+```bash
+$ ./dnpipes --mode=subscriber --broker=broker-0.kafka.mesos:9951 --topic=test 2>/dev/null
+hello!
+bye
+```
+
+And here's a screen shot of the whole thing:
+
+![screen shot of example dnpipes session](img/example-session.png)
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
View
@@ -0,0 +1,124 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "github.com/Shopify/sarama"
+ log "github.com/Sirupsen/logrus"
+ "os"
+)
+
+const (
+ VERSION string = "0.1.0"
+ MODE_PUBLISHER = "publisher"
+ MODE_SUBSCRIBER = "subscriber"
+)
+
+var (
+ version bool
+ // operations mode of this agent:
+ omode string
+ // FQDN/IP + port of a Kafka broker:
+ broker string
+ // the active Kafka topic:
+ topic string
+ // the Kafka producer:
+ producer sarama.SyncProducer
+)
+
+func init() {
+ flag.BoolVar(&version, "version", false, "Display version information")
+ flag.StringVar(&omode, "mode", MODE_SUBSCRIBER, fmt.Sprintf("The operations mode of this agent, can be either \"%s\" or \"%s\".", MODE_PUBLISHER, MODE_SUBSCRIBER))
+ flag.StringVar(&broker, "broker", "", "The FQDN or IP address and port of a Kafka broker. Example: broker-1.kafka.mesos:9382 or localhost:9092")
+ flag.StringVar(&topic, "topic", "", "The topic to publish to or pull from. Example: test")
+
+ flag.Usage = func() {
+ fmt.Printf("Usage: %s [args]\n\n", os.Args[0])
+ fmt.Println("Arguments:")
+ flag.PrintDefaults()
+ }
+ flag.Parse()
+}
+
+func about() {
+ fmt.Printf("This is the dnpipes reference implementation in version %s\n", VERSION)
+}
+
+func handlePublisher() {
+ if p, err := sarama.NewSyncProducer([]string{broker}, nil); err != nil {
+ log.Error(err)
+ os.Exit(1)
+ } else {
+ producer = p
+ }
+ defer func() {
+ if err := producer.Close(); err != nil {
+ log.Error(err)
+ os.Exit(1)
+ }
+ }()
+ imsg := ""
+ for {
+ fmt.Print("PUBLISH> ")
+ fmt.Scanln(&imsg)
+ msg := &sarama.ProducerMessage{Topic: string(topic), Value: sarama.StringEncoder(imsg)}
+ if _, _, err := producer.SendMessage(msg); err != nil {
+ log.Error("Failed to send message ", err)
+ } else {
+ log.Debug(fmt.Sprintf("%#v", msg))
+ }
+ }
+}
+
+func handleSubscriber() {
+ var consumer sarama.Consumer
+ if c, err := sarama.NewConsumer([]string{broker}, nil); err != nil {
+ log.WithFields(log.Fields{"func": "handleSubscriber"}).Error(err)
+ return
+ } else {
+ consumer = c
+ }
+ defer func() {
+ if err := consumer.Close(); err != nil {
+ log.WithFields(log.Fields{"func": "handleSubscriber"}).Error(err)
+ }
+ }()
+
+ if partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest); err != nil {
+ log.WithFields(log.Fields{"func": "handleSubscriber"}).Error(err)
+ return
+ } else {
+ defer func() {
+ if err := partitionConsumer.Close(); err != nil {
+ log.WithFields(log.Fields{"func": "handleSubscriber"}).Error(err)
+ }
+ }()
+ for {
+ msg := <-partitionConsumer.Messages()
+ log.Debug(fmt.Sprintf("%#v", msg))
+ fmt.Println(string(msg.Value))
+ }
+ }
+}
+
+func main() {
+ if version {
+ about()
+ os.Exit(0)
+ }
+ if broker == "" {
+ flag.Usage()
+ os.Exit(1)
+ }
+
+ switch omode {
+ case MODE_PUBLISHER:
+ handlePublisher()
+ case MODE_SUBSCRIBER:
+ handleSubscriber()
+ default:
+ fmt.Println("Usage error, you provided an unknown mode")
+ flag.Usage()
+ os.Exit(1)
+ }
+}

0 comments on commit c66b792

Please sign in to comment.