Skip to content

Commit

Permalink
Merge pull request #11 from topfreegames/supportMongoMessages
Browse files Browse the repository at this point in the history
Support mongo messages
  • Loading branch information
lmsilva-wls committed Jul 1, 2021
2 parents 0a68760 + fa264a6 commit b22f68e
Show file tree
Hide file tree
Showing 770 changed files with 132,733 additions and 43,944 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@

An MQTT-based history handler for messages recorded by [mqttbot](https://github.com/topfreegames/mqttbot) in Cassandra

There's also support for messages stored in MongoDB, assuming the message documents contain these **required** fields:
```
{
"topic": "<mqtt history target topic name>",
"original_payload": "<message payload>",
"timestamp": <int64 seconds from Unix epoch>
}
```

## Features
- Listen to healthcheck requests
- Retrieve message history from Cassandra when requested by users
Expand Down
26 changes: 18 additions & 8 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func GetApp(host string, port int, debug bool, configPath string) *App {
app := &App{
Host: host,
Port: port,
Config: viper.New(),
Config: viper.GetViper(),
ConfigPath: configPath,
Debug: debug,
}
Expand All @@ -65,27 +65,37 @@ func GetApp(host string, port int, debug bool, configPath string) *App {
func (app *App) Configure() {
app.setConfigurationDefaults()
app.loadConfiguration()
app.configureDefaults()

app.configureSentry()

app.configureNewRelic()
app.configureStatsD()
app.configureJaeger()
app.configureCassandra()
app.configureDefaults()
app.configureBucket()

app.configureStorage()
app.configureApplication()
}

func (app *App) configureBucket() {
app.Bucket = models.NewBucket(app.Config)
}

func (app *App) configureStorage() {
if app.Defaults.MongoEnabled {
app.Defaults.LimitOfMessages = app.Config.GetInt64("mongo.messages.limit")
return
}

app.configureBucket()
app.configureCassandra()
}

func (app *App) configureDefaults() {
app.Defaults = &models.Defaults{
BucketQuantityOnSelect: app.Config.GetInt("cassandra.bucket.quantity"),
LimitOfMessages: app.Config.GetInt("cassandra.messages.limit"),
BucketQuantityOnSelect: app.Config.GetInt("cassandra.bucket.quantity"),
LimitOfMessages: app.Config.GetInt64("cassandra.messages.limit"),
MongoEnabled: app.Config.GetBool("mongo.messages.enabled"),
MongoMessagesCollection: app.Config.GetString("mongo.messages.collection"),
}
}

Expand Down Expand Up @@ -200,7 +210,7 @@ func (app *App) configureApplication() {
a.Get("/:other", NotFoundHandler(app))
}

//OnErrorHandler handles application panics
// OnErrorHandler handles application panics
func (app *App) OnErrorHandler(err interface{}, stack []byte) {
logger.Logger.Error(err)

Expand Down
59 changes: 59 additions & 0 deletions app/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ package app

import (
"context"
"encoding/json"
"time"

"github.com/topfreegames/mqtt-history/cassandra"
"github.com/topfreegames/mqtt-history/models"
"github.com/topfreegames/mqtt-history/mongoclient"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

func selectFromBuckets(
Expand All @@ -28,3 +34,56 @@ func selectFromBuckets(

return messages
}

// SelectFromCollection expects the message to be stored in mongo with a specific structure,
// the main difference being that the payload field is now referred to as "original_payload" and
// is a JSON object, not a string, and also the timestamp is int64 seconds since Unix epoch, not an ISODate on Mongo
func SelectFromCollection(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.Message {
searchResults := make([]models.MongoMessage, 0)

callback := func(coll *mongo.Collection) error {
query := bson.M{
"topic": topic,
"timestamp": bson.M{
"$lte": from, // less than or equal
},
}

sort := bson.D{
{"topic", 1},
{"timestamp", -1},
}

opts := options.Find()
opts.SetSort(sort)
opts.SetLimit(limit)

cursor, err := coll.Find(ctx, query, opts)
if err != nil {
return err
}

return cursor.All(ctx, &searchResults)
}

err := mongoclient.GetCollection(collection, callback)
if err != nil {
return []*models.Message{}
}

messages := make([]*models.Message, 0)
for _, result := range searchResults {
payload := result.Payload
bytes, _ := json.Marshal(payload)

finalStr := string(bytes)
message := &models.Message{
Timestamp: time.Unix(result.Timestamp, 0),
Payload: finalStr,
Topic: topic,
}
messages = append(messages, message)
}

return messages
}
42 changes: 29 additions & 13 deletions app/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ import (
"github.com/labstack/echo"
newrelic "github.com/newrelic/go-agent"
"github.com/spf13/viper"
"github.com/topfreegames/extensions/mongo/interfaces"
"github.com/topfreegames/mqtt-history/mongoclient"
"gopkg.in/mgo.v2/bson"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

// ACL is the acl struct
type ACL struct {
ID bson.ObjectId `bson:"_id,omitempty"`
Username string `bson:"username"`
Pubsub []string `bson:"pubsub"`
ID primitive.ObjectID `bson:"_id,omitempty"`
Username string `bson:"username"`
Pubsub []string `bson:"pubsub"`
}

type authRequest struct {
Expand Down Expand Up @@ -58,15 +60,29 @@ func WithSegment(name string, c echo.Context, f func() error) error {
return f()
}

// MongoSearch searches on mongo
func MongoSearch(ctx context.Context, q interface{}) ([]ACL, error) {
func findAuthorizedTopics(ctx context.Context, username string, topics []string) ([]ACL, error) {
searchResults := make([]ACL, 0)
query := func(c interfaces.Collection) error {
fn := c.Find(q).All(&searchResults)
return fn
query := func(c *mongo.Collection) error {
opts := options.Find()

defaultACLSort := bson.D{
{"username", 1},
{"pubsub", 1},
}

// add sort to match index
opts.SetSort(defaultACLSort)

query := bson.M{"username": username, "pubsub": bson.M{"$in": topics}}
cursor, err := c.Find(ctx, query)
if err != nil {
return err
}

return cursor.All(ctx, &searchResults)
}
search := func() error {
return mongoclient.GetCollection(ctx, "mqtt_acl", query)
return mongoclient.GetCollection("mqtt_acl", query)
}
err := search()
return searchResults, err
Expand All @@ -78,11 +94,11 @@ func GetTopics(ctx context.Context, username string, _topics []string) ([]string
return _topics, nil
}
var topics []string
searchResults, err := MongoSearch(ctx, bson.M{"username": username, "pubsub": bson.M{"$in": _topics}})
authorizedTopics, err := findAuthorizedTopics(ctx, username, _topics)
if err != nil {
return nil, err
}
for _, elem := range searchResults {
for _, elem := range authorizedTopics {
topics = append(topics, elem.Pubsub[0])
}
return topics, err
Expand Down
17 changes: 14 additions & 3 deletions app/histories.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func HistoriesHandler(app *App) func(c echo.Context) error {
topicsSuffix := strings.Split(c.QueryParam("topics"), ",")
topics := make([]string, len(topicsSuffix))
from, err := strconv.ParseInt(c.QueryParam("from"), 10, 64)
limit, err := strconv.Atoi(c.QueryParam("limit"))
limit, err := strconv.ParseInt(c.QueryParam("limit"), 10, 64)
for i, topicSuffix := range topicsSuffix {
topics[i] = topicPrefix + "/" + topicSuffix
}
Expand All @@ -42,12 +42,23 @@ func HistoriesHandler(app *App) func(c echo.Context) error {
return c.String(echo.ErrUnauthorized.Code, echo.ErrUnauthorized.Message)
}

// retrieve messages
messages := make([]*models.Message, 0)
if app.Defaults.MongoEnabled {
collection := app.Defaults.MongoMessagesCollection

for _, topic := range authorizedTopics {
topicMessages := SelectFromCollection(c, topic, from, limit, collection)
messages = append(messages, topicMessages...)
}
return c.JSON(http.StatusOK, messages)
}

bucketQnt := app.Defaults.BucketQuantityOnSelect
currentBucket := app.Bucket.Get(from)
messages := []*models.Message{}

for _, topic := range authorizedTopics {
topicMessages := selectFromBuckets(c.StdContext(), bucketQnt, limit, currentBucket, topic, app.Cassandra)
topicMessages := selectFromBuckets(c.StdContext(), bucketQnt, int(limit), currentBucket, topic, app.Cassandra)
messages = append(messages, topicMessages...)
}

Expand Down
Loading

0 comments on commit b22f68e

Please sign in to comment.