Skip to content

Commit

Permalink
Initial fix to mongo message unmarshalling
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Dias committed Jan 31, 2022
1 parent 0804ec3 commit 8458700
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 72 deletions.
18 changes: 13 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
build:
PROJECT_NAME := "mqtt-history"

help: Makefile ## show list of commands
@echo "Choose a command run in "$(PROJECT_NAME)":"
@echo ""
@awk 'BEGIN {FS = ":.*?## "} /[a-zA-Z_-]+:.*?## / {sub("\\\\n",sprintf("\n%22c"," "), $$2);printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort


build: ## build project
@go build -mod vendor -a -installsuffix cgo -o . .

vendor:
Expand All @@ -21,20 +29,20 @@ create-cassandra-table:
@echo 'Done'

# make setup/mongo MONGODB_HOST=mongodb://localhost:27017 or make MONGODB_HOST=mongodb://localhost:27017 setup/mongo
setup/mongo:
setup/mongo:
go run scripts/setup_mongo_messages-index.go

run-tests: run-containers
run-tests: run-containers ## run tests using the docker containers
@make CASSANDRA_CONTAINER=mqtthistory_test_cassandra create-cassandra-table
@make coverage
@make kill-containers

test: run-tests
test: run-tests ## run tests using the docker containers (alias to run-tests)

coverage:
@go test -coverprofile=coverage.out -covermode=count ./...

run:
run: ## start the API
@go run main.go start

deps:
Expand Down
4 changes: 2 additions & 2 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ cassandra:
mongo:
host: "mongodb://localhost:27017"
allow_anonymous: true
database: "mqtt"
database: "chat"
messages:
enabled: false
enabled: true
limit: 10
collection: "messages"
logger:
Expand Down
150 changes: 150 additions & 0 deletions mongoclient/get_messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// mqtt-history
// https://github.com/topfreegames/mqtt-history
//
// Licensed under the MIT license:
// http://www.opensource.org/licenses/mit-license
// Copyright © 2017 Top Free Games <backend@tfgco.com>

package mongoclient

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/topfreegames/mqtt-history/logger"
"github.com/topfreegames/mqtt-history/models"
"go.mongodb.org/mongo-driver/bson"

"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

// MongoMessage represents new payload for the chat message
// that is stored in MongoDB
type MongoMessage struct {
Id string `json:"id" bson:"id"`
Timestamp int64 `json:"timestamp" bson:"timestamp"`
Payload bson.M `json:"original_payload" bson:"original_payload"`
Topic string `json:"topic" bson:"topic"`
PlayerId interface{} `json:"player_id" bson:"player_id"`
Message string `json:"message" bson:"message"`
GameId string `json:"game_id" bson:"game_id"`
Blocked bool `json:"blocked" bson:"blocked"`
ShouldModerate bool `json:"should_moderate" bson:"should_moderate"`
Metadata bson.M `json:"metadata" bson:"metadata"`
}

// GetMessagesV2 returns messages stored in MongoDB by topic
// It returns the MessageV2 model that is stored in MongoDB
func GetMessagesV2(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.MessageV2 {
rawResults := make([]MongoMessage, 0)

callback := func(coll *mongo.Collection) error {
query := bson.M{
"topic": topic,
"timestamp": bson.M{
"$lte": from, // less than or equal
},
}

sort := bson.D{
{"topic", 1},
{"timestamp", -1},
}

opts := options.Find()
opts.SetSort(sort)
opts.SetLimit(limit)

cursor, err := coll.Find(ctx, query, opts)
if err != nil {
return err
}

return cursor.All(ctx, &rawResults)
}

// retrieve the collection data
err := GetCollection(collection, callback)
if err != nil {
logger.Logger.Warningf("Error getting messages from MongoDB: %s", err.Error())
return []*models.MessageV2{}
}

// convert the raw results to the MessageV2 model
searchResults := make([]*models.MessageV2, len(rawResults))

for i := 0; i < len(rawResults); i++ {
searchResults[i], err = convertRawMessageToModelMessage(rawResults[i])

if err != nil {
logger.Logger.Warningf("Error getting messages from MongoDB: %s", err.Error())
return []*models.MessageV2{}
}
}

return searchResults
}

// GetMessages returns messages stored in MongoDB by topic
// since MongoDB uses the MessageV2 format, this method converts
// the MessageV2 model into the Message one for retrocompatibility
// Rhe main difference being that the payload field is now referred to as "original_payload" and
// is a JSON object, not a string, and also the timestamp is int64 seconds since Unix epoch, not an ISODate
func GetMessages(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.Message {
searchResults := GetMessagesV2(ctx, topic, from, limit, collection)
messages := make([]*models.Message, 0)
for _, result := range searchResults {
payload := result.Payload
bytes, _ := json.Marshal(payload)

finalStr := string(bytes)
message := &models.Message{
Timestamp: time.Unix(result.Timestamp, 0),
Payload: finalStr,
Topic: topic,
}
messages = append(messages, message)
}

return messages
}

func convertRawMessageToModelMessage(rawMessage MongoMessage) (*models.MessageV2, error) {
playerIdAsString, err := convertPlayerIdToString(rawMessage.PlayerId)
if err != nil {
return nil, err
}

return &models.MessageV2{
Id: rawMessage.Id,
Timestamp: rawMessage.Timestamp,
Payload: rawMessage.Payload,
Topic: rawMessage.Topic,
PlayerId: playerIdAsString,
Message: rawMessage.Message,
GameId: rawMessage.GameId,
Blocked: rawMessage.Blocked,
ShouldModerate: rawMessage.ShouldModerate,
Metadata: rawMessage.Metadata,
}, nil
}

func convertPlayerIdToString(playerID interface{}) (string, error) {
_, ok := playerID.(string)

if ok {
// force sprintf to avoid encoding issues
return fmt.Sprintf("%s", playerID), nil
}

playerIdAsNumber, ok := playerID.(float64)

if !ok {
return "", fmt.Errorf("error converting player id to float64 or string. player id raw value: %s", playerID)
}

return fmt.Sprintf("%f0", playerIdAsNumber), nil
}
65 changes: 0 additions & 65 deletions mongoclient/mongoclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@ package mongoclient

import (
"context"
"encoding/json"
"sync"
"time"

"github.com/topfreegames/mqtt-history/models"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/readpref"

"github.com/spf13/viper"
Expand Down Expand Up @@ -67,65 +64,3 @@ func GetCollection(collection string, s func(collection *mongo.Collection) error
c := mongoDB.Database(database, dbOpts).Collection(collection)
return s(c)
}

// GetMessagesV2 returns messages stored in MongoDB by topic
// It returns the MessageV2 model that is stored in MongoDB
func GetMessagesV2(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.MessageV2 {
searchResults := make([]*models.MessageV2, 0)

callback := func(coll *mongo.Collection) error {
query := bson.M{
"topic": topic,
"timestamp": bson.M{
"$lte": from, // less than or equal
},
}

sort := bson.D{
{"topic", 1},
{"timestamp", -1},
}

opts := options.Find()
opts.SetSort(sort)
opts.SetLimit(limit)

cursor, err := coll.Find(ctx, query, opts)
if err != nil {
return err
}

return cursor.All(ctx, &searchResults)
}

err := GetCollection(collection, callback)
if err != nil {
return []*models.MessageV2{}
}

return searchResults
}

// GetMessages returns messages stored in MongoDB by topic
// since MongoDB uses the MessageV2 format, this method converts
// the MessageV2 model into the Message one for retrocompatibility
// Rhe main difference being that the payload field is now referred to as "original_payload" and
// is a JSON object, not a string, and also the timestamp is int64 seconds since Unix epoch, not an ISODate
func GetMessages(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.Message {
searchResults := GetMessagesV2(ctx, topic, from, limit, collection)
messages := make([]*models.Message, 0)
for _, result := range searchResults {
payload := result.Payload
bytes, _ := json.Marshal(payload)

finalStr := string(bytes)
message := &models.Message{
Timestamp: time.Unix(result.Timestamp, 0),
Payload: finalStr,
Topic: topic,
}
messages = append(messages, message)
}

return messages
}

0 comments on commit 8458700

Please sign in to comment.