Skip to content

Commit

Permalink
new JSON field XRabtapReceivedTimestamp recording timestamp when a me…
Browse files Browse the repository at this point in the history
…ssage was received added

simplified code
  • Loading branch information
jandelgado committed Jun 13, 2019
1 parent a9ecde7 commit b5a16a2
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 119 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@

# Changelog for rabtap

## v1.17 (2019-06-13)

* Timestamp when message was received by rabtap now stored in JSON format
in `XRabtapReceivedTimestamp` field.
* Simplified code

## v1.16 (2019-04-03)

* new option `--by-connection` for info command added, making `info` show
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ messages in the following format:
"Redelivered": false,
"Exchange": "amq.topic",
"RoutingKey": "test-q-amq.topic-0",
"XRabtapReceivedTimestamp": "2019-06-13T19:33:51.920711583+02:00",
"Body": "dGhpcyB0ZXN0IG1lc3NhZ2U .... IGFuZCBoZWFkZXJzIGFtcXAuVGFibGV7fQ=="
}
...
Expand Down
10 changes: 4 additions & 6 deletions cmd/rabtap/message_printer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2017 Jan Delgado
// Copyright (C) 2017-2019 Jan Delgado

package main

Expand All @@ -10,7 +10,8 @@ import (
)

// messageTemplate is the default template to print a message
const messageTemplate = `------ {{ .Title }} ------
// TODO allow externalization of template
const messageTemplate = `------ message received on {{ .Message.ReceivedTimestamp.Format "2006-01-02T15:04:05Z07:00" }} ------
exchange.......: {{ ExchangeColor .Message.AmqpMessage.Exchange }}
{{with .Message.AmqpMessage.RoutingKey}}routingkey.....: {{ KeyColor .}}
{{end}}{{with .Message.AmqpMessage.Priority}}priority.......: {{.}}
Expand All @@ -29,8 +30,6 @@ exchange.......: {{ ExchangeColor .Message.AmqpMessage.Exchange }}

// PrintMessageInfo holds info for template
type PrintMessageInfo struct {
// Title to print
Title string
// Message receveived
Message rabtap.TapMessage
// formatted body
Expand Down Expand Up @@ -62,14 +61,13 @@ func NewMessageFormatter(contentType string) MessageFormatter {

// PrettyPrintMessage formats and prints a tapped message
func PrettyPrintMessage(out io.Writer, message rabtap.TapMessage,
title string, noColor bool) error {
noColor bool) error {

colorizer := NewColorPrinter(noColor)

formatter := NewMessageFormatter(message.AmqpMessage.ContentType)

printStruct := PrintMessageInfo{
Title: title,
Message: message,
Body: formatter.Format(message),
}
Expand Down
12 changes: 7 additions & 5 deletions cmd/rabtap/message_printer_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2017 Jan Delgado
// Copyright (C) 2017-2019 Jan Delgado

package main

Expand Down Expand Up @@ -38,10 +38,11 @@ func ExamplePrettyPrintMessage() {
}

ts := time.Date(2019, time.June, 6, 23, 0, 0, 0, time.UTC)
_ = PrettyPrintMessage(os.Stdout, rabtap.NewTapMessage(&message, nil, ts), "title", true)
noColor := true
_ = PrettyPrintMessage(os.Stdout, rabtap.NewTapMessage(&message, nil, ts), noColor)

// Output:
// ------ title ------
// ------ message received on 2019-06-06T23:00:00Z ------
// exchange.......: exchange
// routingkey.....: routingkey
// priority.......: 99
Expand All @@ -64,11 +65,12 @@ func ExamplePrettyPrintMessage_withFilteredAtributes() {
Body: []byte("simple test message"),
}

noColor := true
ts := time.Date(2019, time.June, 6, 23, 0, 0, 0, time.UTC)
_ = PrettyPrintMessage(os.Stdout, rabtap.NewTapMessage(&message, nil, ts), "title", true)
_ = PrettyPrintMessage(os.Stdout, rabtap.NewTapMessage(&message, nil, ts), noColor)

// Output:
// ------ title ------
// ------ message received on 2019-06-06T23:00:00Z ------
// exchange.......: exchange
// simple test message
//
Expand Down
77 changes: 37 additions & 40 deletions cmd/rabtap/message_writer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2017 Jan Delgado
// Copyright (C) 2017-2019 Jan Delgado

package main

Expand Down Expand Up @@ -38,6 +38,9 @@ type RabtapPersistentMessage struct {
Exchange string
RoutingKey string

// rabtap specific fields
XRabtapReceivedTimestamp time.Time

Body []byte
}

Expand All @@ -48,34 +51,29 @@ func CreateTimestampFilename(t time.Time) string {
return strings.Replace(basename, ":", "_", -1)
}

// NewRabtapPersistentMessage creates RabtapPersistentMessage a object
// from an amqp.Delivery
func NewRabtapPersistentMessage(message rabtap.TapMessage,
includeBody bool) RabtapPersistentMessage {
// NewRabtapPersistentMessage creates RabtapPersistentMessage object
// from a rabtap.TapMessage
func NewRabtapPersistentMessage(message rabtap.TapMessage) RabtapPersistentMessage {

m := message.AmqpMessage

body := []byte{}
if includeBody {
body = m.Body
}
return RabtapPersistentMessage{
Headers: m.Headers,
ContentType: m.ContentType,
ContentEncoding: m.ContentEncoding,
Priority: m.Priority,
CorrelationID: m.CorrelationId,
ReplyTo: m.ReplyTo,
Expiration: m.Expiration,
MessageID: m.MessageId,
Timestamp: m.Timestamp,
Type: m.Type,
UserID: m.UserId,
AppID: m.AppId,
DeliveryTag: m.DeliveryTag,
Exchange: m.Exchange,
RoutingKey: m.RoutingKey,
Body: body,
Headers: m.Headers,
ContentType: m.ContentType,
ContentEncoding: m.ContentEncoding,
Priority: m.Priority,
CorrelationID: m.CorrelationId,
ReplyTo: m.ReplyTo,
Expiration: m.Expiration,
MessageID: m.MessageId,
Timestamp: m.Timestamp,
Type: m.Type,
UserID: m.UserId,
AppID: m.AppId,
DeliveryTag: m.DeliveryTag,
Exchange: m.Exchange,
RoutingKey: m.RoutingKey,
XRabtapReceivedTimestamp: message.ReceivedTimestamp,
Body: m.Body,
}
}

Expand All @@ -97,17 +95,10 @@ func (s RabtapPersistentMessage) ToAmqpPublishing() amqp.Publishing {
Body: s.Body}
}

// WriteMessageBodyBlob writes the given message the provided stream.
func WriteMessageBodyBlob(out io.Writer, body []byte) error {
_, err := out.Write(body)
return err
}

// WriteMessageJSON writes the given message as JSON, optionally with the
// body included to a stream.
func WriteMessageJSON(out io.Writer, includeBody bool, message rabtap.TapMessage) error {
// serialize message without body
metadata, err := json.MarshalIndent(NewRabtapPersistentMessage(message, includeBody), "", " ")
func WriteMessageJSON(out io.Writer, message rabtap.TapMessage) error {
metadata, err := json.MarshalIndent(NewRabtapPersistentMessage(message), "", " ")
if err != nil {
return err
}
Expand All @@ -122,21 +113,21 @@ func saveMessageBodyAsBlobFile(filename string, body []byte) error {
}
defer file.Close()
writer := bufio.NewWriter(file)
err = WriteMessageBodyBlob(writer, body)
_, err = writer.Write(body)
if err != nil {
return err
}
return writer.Flush()
}

func saveMessageAsJSONFile(filename string, includeBody bool, message rabtap.TapMessage) error {
func saveMessageAsJSONFile(filename string, message rabtap.TapMessage) error {
file, err := os.Create(filename)
if err != nil {
return err
}
defer file.Close()
writer := bufio.NewWriter(file)
err = WriteMessageJSON(writer, includeBody, message)
err = WriteMessageJSON(writer, message)
if err != nil {
return err
}
Expand All @@ -153,12 +144,18 @@ func SaveMessageToRawFile(basename string, message rabtap.TapMessage) error {
if err != nil {
return err
}
return saveMessageAsJSONFile(filenameJSON, false, message)
//return saveMessageAsJSONFile(filenameJSON, false, message)
// save metadata file without the body
oldBody := message.AmqpMessage.Body
message.AmqpMessage.Body = []byte{}
err = saveMessageAsJSONFile(filenameJSON, message)
message.AmqpMessage.Body = oldBody
return err
}

// SaveMessageToJSONFile writes a message to a single JSON file, where
// the body will be BASE64 encoded
func SaveMessageToJSONFile(filename string, message rabtap.TapMessage) error {
log.Debugf("saving message to %s (JSON)", filename)
return saveMessageAsJSONFile(filename, true, message)
return saveMessageAsJSONFile(filename, message)
}
57 changes: 10 additions & 47 deletions cmd/rabtap/message_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func TestSaveMessageToRawFile(t *testing.T) {
// SaveMessagesToFiles() will create files "test.dat" and "test.json" in
// testdir.
basename := filepath.Join(testdir, "test")
err = SaveMessageToRawFile(basename, rabtap.NewTapMessage(testMessage, nil, time.Now()))
createdTs := time.Date(2019, time.June, 13, 17, 45, 1, 0, time.UTC)
err = SaveMessageToRawFile(basename, rabtap.NewTapMessage(testMessage, nil, createdTs))
assert.Nil(t, err)

// check contents of message body .dat file
Expand All @@ -67,6 +68,7 @@ func TestSaveMessageToRawFile(t *testing.T) {
assert.Equal(t, len(testMessage.Headers), len(jsonMetaActual.Headers))
assert.Equal(t, testMessage.Headers["header"], jsonMetaActual.Headers["header"])
assert.Equal(t, testMessage.Timestamp, jsonMetaActual.Timestamp)
assert.Equal(t, createdTs, jsonMetaActual.XRabtapReceivedTimestamp)
}

func TestSaveMessageToFilesToInvalidDir(t *testing.T) {
Expand All @@ -84,7 +86,8 @@ func TestSaveMessageToJSONFile(t *testing.T) {
defer os.RemoveAll(testdir)

filename := filepath.Join(testdir, "test")
err = SaveMessageToJSONFile(filename, rabtap.NewTapMessage(testMessage, nil, time.Now()))
createdTs := time.Date(2019, time.June, 13, 17, 45, 1, 0, time.UTC)
err = SaveMessageToJSONFile(filename, rabtap.NewTapMessage(testMessage, nil, createdTs))
assert.Nil(t, err)

contents, err := ioutil.ReadFile(filename)
Expand All @@ -98,6 +101,7 @@ func TestSaveMessageToJSONFile(t *testing.T) {
assert.Equal(t, len(testMessage.Headers), len(jsonActual.Headers))
assert.Equal(t, testMessage.Headers["header"], jsonActual.Headers["header"])
assert.Equal(t, testMessage.Timestamp, jsonActual.Timestamp)
assert.Equal(t, createdTs, jsonActual.XRabtapReceivedTimestamp)
assert.Equal(t, []byte("simple test message."), jsonActual.Body)
}

Expand All @@ -114,22 +118,11 @@ func TestCreateTimestampFilename(t *testing.T) {
assert.Equal(t, "2009-11-10T23_01_02.000000003Z", filename)
}

func ExampleWriteMessageBodyBlob() {
body := []byte("simple test message.")
err := WriteMessageBodyBlob(os.Stdout, body)
if err != nil {
log.Fatal(err)
}

// Output:
// simple test message.

}

func ExampleWriteMessageJSON_withBody() {
func ExampleWriteMessageJSON() {

// serialize with message body, Body will be base64 encoded.
err := WriteMessageJSON(os.Stdout, true /* w/ body*/, rabtap.NewTapMessage(testMessage, nil, time.Now()))
createdTs := time.Date(2019, time.June, 13, 17, 45, 1, 0, time.UTC)
err := WriteMessageJSON(os.Stdout, rabtap.NewTapMessage(testMessage, nil, createdTs))
if err != nil {
log.Fatal(err)
}
Expand All @@ -155,37 +148,7 @@ func ExampleWriteMessageJSON_withBody() {
// "Redelivered": false,
// "Exchange": "exchange",
// "RoutingKey": "routingkey",
// "XRabtapReceivedTimestamp": "2019-06-13T17:45:01Z",
// "Body": "c2ltcGxlIHRlc3QgbWVzc2FnZS4="
// }
}

func ExampleWriteMessageJSON_withoutBody() {
err := WriteMessageJSON(os.Stdout, false /*w/o body*/, rabtap.NewTapMessage(testMessage, nil, time.Now()))
if err != nil {
log.Fatal(err)
}

// Output:
// {
// "Headers": {
// "header": "value"
// },
// "ContentType": "plain/text",
// "ContentEncoding": "utf-8",
// "DeliveryMode": 0,
// "Priority": 99,
// "CorrelationID": "4712",
// "ReplyTo": "",
// "Expiration": "2017-05-22 17:00:00",
// "MessageID": "4711",
// "Timestamp": "2009-11-10T23:00:00Z",
// "Type": "some type",
// "UserID": "456",
// "AppID": "123",
// "DeliveryTag": 0,
// "Redelivered": false,
// "Exchange": "exchange",
// "RoutingKey": "routingkey",
// "Body": ""
// }
}
12 changes: 4 additions & 8 deletions cmd/rabtap/subscribe.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2017 Jan Delgado
// Copyright (C) 2017-2019 Jan Delgado

package main

Expand Down Expand Up @@ -34,7 +34,7 @@ func messageReceiveLoop(messageChan rabtap.TapChannel,
return message.Error
}
// let the receiveFunc do the actual message processing
if err := messageReceiveFunc(*message); err != nil {
if err := messageReceiveFunc(message); err != nil {
log.Error(err)
}
case <-signalChannel:
Expand All @@ -50,7 +50,7 @@ func messageReceiveLoop(messageChan rabtap.TapChannel,
func createMessageReceiveFuncJSON(out io.Writer, optSaveDir *string,
_ /* noColor */ bool) MessageReceiveFunc {
return func(message rabtap.TapMessage) error {
err := WriteMessageJSON(out, true, message)
err := WriteMessageJSON(out, message)
if err != nil || optSaveDir == nil {
return err
}
Expand All @@ -67,11 +67,7 @@ func createMessageReceiveFuncRaw(out io.Writer, optSaveDir *string,
noColor bool) MessageReceiveFunc {

return func(message rabtap.TapMessage) error {
err := PrettyPrintMessage(out, message,
fmt.Sprintf("message received on %s",
time.Now().Format(time.RFC3339)),
noColor,
)
err := PrettyPrintMessage(out, message, noColor)
if err != nil || optSaveDir == nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/rabtap/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestMessageReceiveLoop(t *testing.T) {
}
go messageReceiveLoop(messageChan, receiveFunc, signalChannel)

messageChan <- &rabtap.TapMessage{}
messageChan <- rabtap.TapMessage{}
<-done // TODO add timeout
signalChannel <- os.Interrupt // terminates go routine
assert.Equal(t, 1, received)
Expand Down
1 change: 1 addition & 0 deletions cmd/testgen/testgen.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// (c) copyright 2017-2019 by Jan Delgado
package main

// testgen is a simple test data generator, which sets up a topology and
Expand Down

0 comments on commit b5a16a2

Please sign in to comment.