diff --git a/README.md b/README.md index 80ac2c7..3f78e49 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,15 @@ An MQTT-based history handler for messages recorded by [mqttbot](https://github.com/topfreegames/mqttbot) in Cassandra +There's also support for messages stored in MongoDB, assuming the message documents contain these **required** fields: +``` +{ + "topic": "", + "original_payload": "", + "timestamp": +} +``` + ## Features - Listen to healthcheck requests - Retrieve message history from Cassandra when requested by users diff --git a/app/common.go b/app/common.go index ddf9549..6e9c3f6 100644 --- a/app/common.go +++ b/app/common.go @@ -35,46 +35,10 @@ func selectFromBuckets( return messages } -func SelectFromCollection(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.Message { - searchResults := make([]*models.Message, 0) - - date := time.Unix(from, 0) - callback := func(coll *mongo.Collection) error { - query := bson.M{ - "topic": topic, - "timestamp": bson.M{ - "$lte": date, // less than or equal - }, - } - - sort := bson.D{ - {"topic", 1}, - {"timestamp", -1}, - } - - opts := options.Find() - opts.SetSort(sort) - opts.SetLimit(limit) - - cursor, err := coll.Find(ctx, query, opts) - if err != nil { - return err - } - - return cursor.All(ctx, &searchResults) - } - - err := mongoclient.GetCollection(collection, callback) - if err != nil { - return []*models.Message{} - } - return searchResults -} - // SelectFromCollectionV2 expects the message to be stored in mongo with a specific structure, // the main difference being that the payload field is now referred to as "original_payload" and // is a JSON object, not a string, and also the timestamp is int64 seconds since Unix epoch, not an ISODate on Mongo -func SelectFromCollectionV2(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.Message { +func SelectFromCollection(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.Message { type message struct { Timestamp int64 `bson:"timestamp"` OriginalPayload bson.M `bson:"original_payload"` diff --git a/app/histories_test.go b/app/histories_test.go index ab527c4..00d3d79 100644 --- a/app/histories_test.go +++ b/app/histories_test.go @@ -22,6 +22,7 @@ import ( "github.com/topfreegames/mqtt-history/models" "github.com/topfreegames/mqtt-history/mongoclient" . "github.com/topfreegames/mqtt-history/testing" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" ) @@ -117,16 +118,24 @@ func TestHistoriesHandler(t *testing.T) { err := mongoclient.GetCollection("mqtt_acl", insertAuthCallback) Expect(err).To(BeNil()) - testMessage := models.Message{ - Timestamp: time.Now().AddDate(0, 0, -1), - Payload: "{\"test1\":\"test2\"}", + testMessage := models.MongoMessage{ + Timestamp: time.Now().AddDate(0, 0, -1).Unix(), + Payload: bson.M{ + "original_payload": bson.M{ + "test1": "test2", + }, + }, Topic: topic, } - testMessage2 := models.Message{ + testMessage2 := models.MongoMessage{ // ensure the message was received 1 second before so that the mongo query can pick up this message - Timestamp: time.Now().Add(-1 * time.Second), - Payload: "{\"test3\":\"test4\"}", + Timestamp: time.Now().Add(-1 * time.Second).Unix(), + Payload: bson.M{ + "original_payload": bson.M{ + "test3": "test4", + }, + }, Topic: topic2, } @@ -153,8 +162,8 @@ func TestHistoriesHandler(t *testing.T) { err = json.Unmarshal([]byte(body), &messages) Expect(err).To(BeNil()) g.Assert(len(messages)).Equal(2) - g.Assert(messages[0].Payload).Equal("{\"test1\":\"test2\"}") - g.Assert(messages[1].Payload).Equal("{\"test3\":\"test4\"}") + g.Assert(messages[0].Payload).Equal("{\"original_payload\":{\"test1\":\"test2\"}}") + g.Assert(messages[1].Payload).Equal("{\"original_payload\":{\"test3\":\"test4\"}}") }) g.It("It should return 200 if the user is authorized into at least one topic", func() { diff --git a/app/history_test.go b/app/history_test.go index 48bc994..3be2034 100644 --- a/app/history_test.go +++ b/app/history_test.go @@ -23,6 +23,7 @@ import ( "github.com/topfreegames/mqtt-history/models" "github.com/topfreegames/mqtt-history/mongoclient" . "github.com/topfreegames/mqtt-history/testing" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" ) @@ -113,9 +114,13 @@ func TestHistoryHandler(t *testing.T) { Expect(err).To(BeNil()) - testMessage := models.Message{ - Timestamp: time.Now().Add(-1 * time.Second), - Payload: "{\"test1\":\"test2\"}", + testMessage := models.MongoMessage{ + Timestamp: time.Now().Add(-1 * time.Second).Unix(), + Payload: bson.M{ + "original_payload": bson.M{ + "test1": "test2", + }, + }, Topic: topic, } @@ -138,6 +143,9 @@ func TestHistoryHandler(t *testing.T) { var messages []models.Message err = json.Unmarshal([]byte(body), &messages) Expect(err).To(BeNil()) + + g.Assert(len(messages)).Equal(1) + g.Assert(messages[0].Payload).Equal("{\"original_payload\":{\"test1\":\"test2\"}}") }) g.It("It should return 200 and [] if the user is authorized into the topic and there are no messages", func() { diff --git a/models/message.go b/models/message.go index f8a3adb..981e1a0 100644 --- a/models/message.go +++ b/models/message.go @@ -1,6 +1,10 @@ package models -import "time" +import ( + "time" + + "go.mongodb.org/mongo-driver/bson" +) // Message represents a chat message type Message struct { @@ -8,3 +12,10 @@ type Message struct { Payload string `json:"payload" bson:"payload"` Topic string `json:"topic" bson:"topic"` } + +// MongoMessage represents a chat message stored in Mongo. +type MongoMessage struct { + Timestamp int64 `bson:"timestamp"` + Payload bson.M `bson:"original_payload"` + Topic string `bson:"topic"` +}