Skip to content

Commit

Permalink
Add separate abstraction for messages stored in mongo
Browse files Browse the repository at this point in the history
  • Loading branch information
lmsilva-wls committed Jul 1, 2021
1 parent ace1be0 commit 3d702bc
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 56 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@

An MQTT-based history handler for messages recorded by [mqttbot](https://github.com/topfreegames/mqttbot) in Cassandra

There's also support for messages stored in MongoDB, assuming the message documents contain these **required** fields:
```
{
"topic": "<mqtt history target topic name>",
"original_payload": "<message payload>",
"timestamp": <int64 seconds from Unix epoch>
}
```

## Features
- Listen to healthcheck requests
- Retrieve message history from Cassandra when requested by users
Expand Down
47 changes: 3 additions & 44 deletions app/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,52 +35,11 @@ func selectFromBuckets(
return messages
}

func SelectFromCollection(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.Message {
searchResults := make([]*models.Message, 0)

date := time.Unix(from, 0)
callback := func(coll *mongo.Collection) error {
query := bson.M{
"topic": topic,
"timestamp": bson.M{
"$lte": date, // less than or equal
},
}

sort := bson.D{
{"topic", 1},
{"timestamp", -1},
}

opts := options.Find()
opts.SetSort(sort)
opts.SetLimit(limit)

cursor, err := coll.Find(ctx, query, opts)
if err != nil {
return err
}

return cursor.All(ctx, &searchResults)
}

err := mongoclient.GetCollection(collection, callback)
if err != nil {
return []*models.Message{}
}
return searchResults
}

// SelectFromCollectionV2 expects the message to be stored in mongo with a specific structure,
// the main difference being that the payload field is now referred to as "original_payload" and
// is a JSON object, not a string, and also the timestamp is int64 seconds since Unix epoch, not an ISODate on Mongo
func SelectFromCollectionV2(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.Message {
type message struct {
Timestamp int64 `bson:"timestamp"`
OriginalPayload bson.M `bson:"original_payload"`
}

searchResults := make([]message, 0)
func SelectFromCollection(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.Message {
searchResults := make([]models.MongoMessage, 0)

callback := func(coll *mongo.Collection) error {
query := bson.M{
Expand Down Expand Up @@ -114,7 +73,7 @@ func SelectFromCollectionV2(ctx context.Context, topic string, from int64, limit

messages := make([]*models.Message, 0)
for _, result := range searchResults {
payload := result.OriginalPayload
payload := result.Payload
bytes, _ := json.Marshal(payload)

finalStr := string(bytes)
Expand Down
25 changes: 17 additions & 8 deletions app/histories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/topfreegames/mqtt-history/models"
"github.com/topfreegames/mqtt-history/mongoclient"
. "github.com/topfreegames/mqtt-history/testing"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

Expand Down Expand Up @@ -117,16 +118,24 @@ func TestHistoriesHandler(t *testing.T) {
err := mongoclient.GetCollection("mqtt_acl", insertAuthCallback)
Expect(err).To(BeNil())

testMessage := models.Message{
Timestamp: time.Now().AddDate(0, 0, -1),
Payload: "{\"test1\":\"test2\"}",
testMessage := models.MongoMessage{
Timestamp: time.Now().AddDate(0, 0, -1).Unix(),
Payload: bson.M{
"original_payload": bson.M{
"test1": "test2",
},
},
Topic: topic,
}

testMessage2 := models.Message{
testMessage2 := models.MongoMessage{
// ensure the message was received 1 second before so that the mongo query can pick up this message
Timestamp: time.Now().Add(-1 * time.Second),
Payload: "{\"test3\":\"test4\"}",
Timestamp: time.Now().Add(-1 * time.Second).Unix(),
Payload: bson.M{
"original_payload": bson.M{
"test3": "test4",
},
},
Topic: topic2,
}

Expand All @@ -153,8 +162,8 @@ func TestHistoriesHandler(t *testing.T) {
err = json.Unmarshal([]byte(body), &messages)
Expect(err).To(BeNil())
g.Assert(len(messages)).Equal(2)
g.Assert(messages[0].Payload).Equal("{\"test1\":\"test2\"}")
g.Assert(messages[1].Payload).Equal("{\"test3\":\"test4\"}")
g.Assert(messages[0].Payload).Equal("{\"original_payload\":{\"test1\":\"test2\"}}")
g.Assert(messages[1].Payload).Equal("{\"original_payload\":{\"test3\":\"test4\"}}")
})

g.It("It should return 200 if the user is authorized into at least one topic", func() {
Expand Down
14 changes: 11 additions & 3 deletions app/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/topfreegames/mqtt-history/models"
"github.com/topfreegames/mqtt-history/mongoclient"
. "github.com/topfreegames/mqtt-history/testing"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

Expand Down Expand Up @@ -113,9 +114,13 @@ func TestHistoryHandler(t *testing.T) {

Expect(err).To(BeNil())

testMessage := models.Message{
Timestamp: time.Now().Add(-1 * time.Second),
Payload: "{\"test1\":\"test2\"}",
testMessage := models.MongoMessage{
Timestamp: time.Now().Add(-1 * time.Second).Unix(),
Payload: bson.M{
"original_payload": bson.M{
"test1": "test2",
},
},
Topic: topic,
}

Expand All @@ -138,6 +143,9 @@ func TestHistoryHandler(t *testing.T) {
var messages []models.Message
err = json.Unmarshal([]byte(body), &messages)
Expect(err).To(BeNil())

g.Assert(len(messages)).Equal(1)
g.Assert(messages[0].Payload).Equal("{\"original_payload\":{\"test1\":\"test2\"}}")
})

g.It("It should return 200 and [] if the user is authorized into the topic and there are no messages", func() {
Expand Down
13 changes: 12 additions & 1 deletion models/message.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
package models

import "time"
import (
"time"

"go.mongodb.org/mongo-driver/bson"
)

// Message represents a chat message
type Message struct {
Timestamp time.Time `json:"timestamp" bson:"timestamp"`
Payload string `json:"payload" bson:"payload"`
Topic string `json:"topic" bson:"topic"`
}

// MongoMessage represents a chat message stored in Mongo.
type MongoMessage struct {
Timestamp int64 `bson:"timestamp"`
Payload bson.M `bson:"original_payload"`
Topic string `bson:"topic"`
}

0 comments on commit 3d702bc

Please sign in to comment.