Skip to content

Commit

Permalink
Added support for writing messages to files
Browse files Browse the repository at this point in the history
  • Loading branch information
Triston Whetten committed Feb 23, 2021
1 parent 5b9d7b4 commit 4be4273
Showing 1 changed file with 32 additions and 0 deletions.
32 changes: 32 additions & 0 deletions apps/nsq_tail/nsq_tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ var (
maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")
totalMessages = flag.Int("n", 0, "total messages to show (will wait if starved)")
printTopic = flag.Bool("print-topic", false, "print topic name where message was received")
writeToFiles = flag.Bool("write-to-files", false, "prints messages to file - one file per message")
filePrefix = flag.String("file-prefix", "message", "filename prefix when printing to file")

nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
Expand All @@ -40,9 +42,35 @@ type TailHandler struct {
messagesShown int
}

func (th *TailHandler) writeMessageToFile(message []byte) error {
f, err := os.OpenFile(fmt.Sprintf("%s-%d.bin", *filePrefix, th.messagesShown), os.O_RDWR|os.O_CREATE, 0755)
if err != nil {
log.Fatalf("ERROR: failed to open file to write message to - %s", err)
}

defer f.Close()

_, err = f.Write(message)
if err != nil {
log.Fatalf("ERROR: failed to write to file - %s", err)
}
return nil
}

func (th *TailHandler) HandleMessage(m *nsq.Message) error {
th.messagesShown++

if *writeToFiles {
err := th.writeMessageToFile(m.Body)
if err != nil {
log.Fatalf("ERROR: failed to write message to file - %s", err)
}
if th.totalMessages > 0 && th.messagesShown >= th.totalMessages {
os.Exit(0)
}
return nil
}

if *printTopic {
_, err := os.Stdout.WriteString(th.topicName)
if err != nil {
Expand Down Expand Up @@ -74,6 +102,10 @@ func main() {
flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")
flag.Parse()

if *writeToFiles && *totalMessages == 0 {
log.Println("WARNING: writing messages to file but NO limit on message count to process")
}

if *showVersion {
fmt.Printf("nsq_tail v%s\n", version.Binary)
return
Expand Down

0 comments on commit 4be4273

Please sign in to comment.