Permalink
Browse files

implements RESET

  • Loading branch information...
1 parent a8cc0ed commit 829bbc2d61e698cf61e43a569a261fcf07985166 @mhausenblas committed Dec 11, 2016
Showing with 33 additions and 5 deletions.
  1. +33 −5 main.go
View
@@ -5,7 +5,9 @@ import (
"fmt"
"github.com/Shopify/sarama"
log "github.com/Sirupsen/logrus"
+ "github.com/samuel/go-zookeeper/zk"
"os"
+ "time"
)
const (
@@ -44,6 +46,28 @@ func about() {
fmt.Printf("This is the dnpipes reference implementation in version %s\n", VERSION)
}
+func handleReset() {
+ zks := []string{"leader.mesos:2181"}
+ conn, _, _ := zk.Connect(zks, time.Second)
+ partitions, stat, _ := conn.Children("/dcos-service-kafka/brokers/topics/" + topic + "/partitions")
+ fmt.Println(fmt.Sprintf("%+v - %+v", partitions, stat))
+ for _, p := range partitions {
+ if err := conn.Delete("/dcos-service-kafka/brokers/topics/"+topic+"/partitions/"+p+"/state", -1); err != nil {
+ log.WithFields(log.Fields{"func": "handleSubscriber"}).Error("There was a problem resetting the topic:", err)
+ }
+ if err := conn.Delete("/dcos-service-kafka/brokers/topics/"+topic+"/partitions/"+p, -1); err != nil {
+ log.WithFields(log.Fields{"func": "handleSubscriber"}).Error("There was a problem resetting the topic:", err)
+ }
+ }
+ if err := conn.Delete("/dcos-service-kafka/brokers/topics/"+topic+"/partitions", -1); err != nil {
+ log.WithFields(log.Fields{"func": "handleSubscriber"}).Error("There was a problem resetting the topic:", err)
+ }
+ if err := conn.Delete("/dcos-service-kafka/brokers/topics/"+topic, -1); err != nil {
+ log.WithFields(log.Fields{"func": "handleSubscriber"}).Error("There was a problem resetting the topic:", err)
+ }
+ fmt.Println("reset this dnpipes")
+}
+
func handlePublisher() {
if p, err := sarama.NewSyncProducer([]string{broker}, nil); err != nil {
log.Error(err)
@@ -59,13 +83,17 @@ func handlePublisher() {
}()
imsg := ""
for {
- fmt.Print("PUBLISH> ")
+ fmt.Print("> ")
fmt.Scanln(&imsg)
- msg := &sarama.ProducerMessage{Topic: string(topic), Value: sarama.StringEncoder(imsg)}
- if _, _, err := producer.SendMessage(msg); err != nil {
- log.Error("Failed to send message ", err)
+ if imsg == "RESET" {
+ handleReset()
} else {
- log.Debug(fmt.Sprintf("%#v", msg))
+ msg := &sarama.ProducerMessage{Topic: string(topic), Value: sarama.StringEncoder(imsg)}
+ if _, _, err := producer.SendMessage(msg); err != nil {
+ log.WithFields(log.Fields{"func": "handlePublisher"}).Error("Failed to send message ", err)
+ } else {
+ log.Debug(fmt.Sprintf("%#v", msg))
+ }
}
}
}

0 comments on commit 829bbc2

Please sign in to comment.