Skip to content

Commit

Permalink
Include V2 query for Mongo messages
Browse files Browse the repository at this point in the history
  • Loading branch information
lmsilva-wls committed Jul 1, 2021
1 parent 8dcf5bb commit ace1be0
Showing 1 changed file with 59 additions and 0 deletions.
59 changes: 59 additions & 0 deletions app/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"encoding/json"
"time"

"github.com/topfreegames/mqtt-history/cassandra"
Expand Down Expand Up @@ -69,3 +70,61 @@ func SelectFromCollection(ctx context.Context, topic string, from int64, limit i
}
return searchResults
}

// SelectFromCollectionV2 expects the message to be stored in mongo with a specific structure,
// the main difference being that the payload field is now referred to as "original_payload" and
// is a JSON object, not a string, and also the timestamp is int64 seconds since Unix epoch, not an ISODate on Mongo
func SelectFromCollectionV2(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.Message {
type message struct {
Timestamp int64 `bson:"timestamp"`
OriginalPayload bson.M `bson:"original_payload"`
}

searchResults := make([]message, 0)

callback := func(coll *mongo.Collection) error {
query := bson.M{
"topic": topic,
"timestamp": bson.M{
"$lte": from, // less than or equal
},
}

sort := bson.D{
{"topic", 1},
{"timestamp", -1},
}

opts := options.Find()
opts.SetSort(sort)
opts.SetLimit(limit)

cursor, err := coll.Find(ctx, query, opts)
if err != nil {
return err
}

return cursor.All(ctx, &searchResults)
}

err := mongoclient.GetCollection(collection, callback)
if err != nil {
return []*models.Message{}
}

messages := make([]*models.Message, 0)
for _, result := range searchResults {
payload := result.OriginalPayload
bytes, _ := json.Marshal(payload)

finalStr := string(bytes)
message := &models.Message{
Timestamp: time.Unix(result.Timestamp, 0),
Payload: finalStr,
Topic: topic,
}
messages = append(messages, message)
}

return messages
}

0 comments on commit ace1be0

Please sign in to comment.