Skip to content

Commit

Permalink
Merge pull request #12 from topfreegames/add/v2_history
Browse files Browse the repository at this point in the history
Add/v2 history
  • Loading branch information
flapezoti committed Jul 5, 2021
2 parents b22f68e + 4155674 commit 6d7e327
Show file tree
Hide file tree
Showing 17 changed files with 493 additions and 285 deletions.
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@ There's also support for messages stored in MongoDB, assuming the message docume
}
```

V2 returns the messages from Mongo in the following format:
```
{
"topic": "<mqtt history target topic name>",
"original_payload": "<message payload>",
"timestamp": <int64 seconds from Unix epoch>,
"game_id" : "",
"player_id": "",
"blocked" bool,
"should_moderate": bool,
"metadata" : {},
"id": ""
}
```

## Features
- Listen to healthcheck requests
- Retrieve message history from Cassandra when requested by users
Expand Down Expand Up @@ -41,7 +56,7 @@ by executing `make run-containers`

## Running the tests

The project is integrated with Travis CI and uses docker to run the needed services.
The project is integrated with Github Actions and uses docker to run the needed services.

If you are interested in running the tests yourself you will need docker (version 1.10
and up) and docker-compose.
Expand Down
2 changes: 2 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ func (app *App) configureApplication() {
a.Get("/healthcheck", HealthCheckHandler(app))
a.Get("/history/*", HistoryHandler(app))
a.Get("/histories/*", HistoriesHandler(app))
a.Get("/v2/history/*", HistoryV2Handler(app))
a.Get("/v2/histories/*", HistoriesV2Handler(app))
a.Get("/:other", NotFoundHandler(app))
}

Expand Down
59 changes: 0 additions & 59 deletions app/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,9 @@ package app

import (
"context"
"encoding/json"
"time"

"github.com/topfreegames/mqtt-history/cassandra"
"github.com/topfreegames/mqtt-history/models"
"github.com/topfreegames/mqtt-history/mongoclient"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

func selectFromBuckets(
Expand All @@ -34,56 +28,3 @@ func selectFromBuckets(

return messages
}

// SelectFromCollection expects the message to be stored in mongo with a specific structure,
// the main difference being that the payload field is now referred to as "original_payload" and
// is a JSON object, not a string, and also the timestamp is int64 seconds since Unix epoch, not an ISODate on Mongo
func SelectFromCollection(ctx context.Context, topic string, from int64, limit int64, collection string) []*models.Message {
searchResults := make([]models.MongoMessage, 0)

callback := func(coll *mongo.Collection) error {
query := bson.M{
"topic": topic,
"timestamp": bson.M{
"$lte": from, // less than or equal
},
}

sort := bson.D{
{"topic", 1},
{"timestamp", -1},
}

opts := options.Find()
opts.SetSort(sort)
opts.SetLimit(limit)

cursor, err := coll.Find(ctx, query, opts)
if err != nil {
return err
}

return cursor.All(ctx, &searchResults)
}

err := mongoclient.GetCollection(collection, callback)
if err != nil {
return []*models.Message{}
}

messages := make([]*models.Message, 0)
for _, result := range searchResults {
payload := result.Payload
bytes, _ := json.Marshal(payload)

finalStr := string(bytes)
message := &models.Message{
Timestamp: time.Unix(result.Timestamp, 0),
Payload: finalStr,
Topic: topic,
}
messages = append(messages, message)
}

return messages
}
20 changes: 5 additions & 15 deletions app/histories.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package app

import (
"net/http"
"strconv"
"strings"
"time"

"github.com/topfreegames/mqtt-history/mongoclient"

"github.com/labstack/echo"
"github.com/topfreegames/mqtt-history/logger"
Expand All @@ -16,21 +15,12 @@ func HistoriesHandler(app *App) func(c echo.Context) error {
return func(c echo.Context) error {
c.Set("route", "Histories")
topicPrefix := c.ParamValues()[0]
userID := c.QueryParam("userid")
topicsSuffix := strings.Split(c.QueryParam("topics"), ",")
topicsSuffix, userID, from, limit := ParseHistoriesQueryParams(c, app.Defaults.LimitOfMessages)
topics := make([]string, len(topicsSuffix))
from, err := strconv.ParseInt(c.QueryParam("from"), 10, 64)
limit, err := strconv.ParseInt(c.QueryParam("limit"), 10, 64)

for i, topicSuffix := range topicsSuffix {
topics[i] = topicPrefix + "/" + topicSuffix
}
if limit == 0 {
limit = app.Defaults.LimitOfMessages
}

if from == 0 {
from = time.Now().Unix()
}

logger.Logger.Debugf("user %s is asking for histories for topicPrefix %s with args topics=%s from=%d and limit=%d", userID, topicPrefix, topics, from, limit)
authenticated, authorizedTopics, err := IsAuthorized(c.StdContext(), app, userID, topics...)
Expand All @@ -48,7 +38,7 @@ func HistoriesHandler(app *App) func(c echo.Context) error {
collection := app.Defaults.MongoMessagesCollection

for _, topic := range authorizedTopics {
topicMessages := SelectFromCollection(c, topic, from, limit, collection)
topicMessages := mongoclient.GetMessages(c, topic, from, limit, collection)
messages = append(messages, topicMessages...)
}
return c.JSON(http.StatusOK, messages)
Expand Down
88 changes: 14 additions & 74 deletions app/histories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/topfreegames/mqtt-history/models"
"github.com/topfreegames/mqtt-history/mongoclient"
. "github.com/topfreegames/mqtt-history/testing"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

Expand All @@ -43,8 +42,10 @@ func TestHistoriesHandler(t *testing.T) {
})

g.It("It should return 401 if the user is not authorized into the topics", func() {
userID := fmt.Sprintf("test:%s", uuid.NewV4().String())
testID := strings.Replace(uuid.NewV4().String(), "-", "", -1)
path := fmt.Sprintf("/history/chat/test_?userid=test:test&topics=%s", testID)
testID2 := strings.Replace(uuid.NewV4().String(), "-", "", -1)
path := fmt.Sprintf("/v2/histories/chat/test?userid=%s&topics=%s,%s", userID, testID, testID2)
status, _ := Get(a, path, t)
g.Assert(status).Equal(http.StatusUnauthorized)
})
Expand All @@ -55,16 +56,8 @@ func TestHistoriesHandler(t *testing.T) {
topic := fmt.Sprintf("chat/test/%s", testID)
topic2 := fmt.Sprintf("chat/test/%s", testID2)

var topics, topics2 []string
topics = append(topics, topic)
topics2 = append(topics2, topic2)

query := func(c *mongo.Collection) error {
_, err := c.InsertMany(ctx, []interface{}{ACL{Username: "test:test", Pubsub: topics}, ACL{Username: "test:test", Pubsub: topics2}})
return err
}

err := mongoclient.GetCollection("mqtt_acl", query)
authorizedTopics := []string{topic, topic2}
err := AuthorizeTestUserInTopics(ctx, authorizedTopics)
Expect(err).To(BeNil())

testMessage := models.Message{
Expand Down Expand Up @@ -105,48 +98,11 @@ func TestHistoriesHandler(t *testing.T) {
topic := fmt.Sprintf("chat/test/%s", testID)
topic2 := fmt.Sprintf("chat/test/%s", testID2)

var topics, topics2 []string
topics = append(topics, topic)
topics2 = append(topics2, topic2)

// given that the user is authorized to read from these topics
insertAuthCallback := func(c *mongo.Collection) error {
_, err := c.InsertMany(ctx, []interface{}{ACL{Username: "test:test", Pubsub: topics}, ACL{Username: "test:test", Pubsub: topics2}})
return err
}

err := mongoclient.GetCollection("mqtt_acl", insertAuthCallback)
authorizedTopics := []string{topic, topic2}
err := AuthorizeTestUserInTopics(ctx, authorizedTopics)
Expect(err).To(BeNil())

testMessage := models.MongoMessage{
Timestamp: time.Now().AddDate(0, 0, -1).Unix(),
Payload: bson.M{
"original_payload": bson.M{
"test1": "test2",
},
},
Topic: topic,
}

testMessage2 := models.MongoMessage{
// ensure the message was received 1 second before so that the mongo query can pick up this message
Timestamp: time.Now().Add(-1 * time.Second).Unix(),
Payload: bson.M{
"original_payload": bson.M{
"test3": "test4",
},
},
Topic: topic2,
}

// and given that the user has 2 messages stored in mongo
insertMessagesCallback := func(c *mongo.Collection) error {
_, err := c.InsertMany(ctx, []interface{}{testMessage, testMessage2})
return err
}

messagesCollection := a.Config.GetString("mongo.messages.collection")
err = mongoclient.GetCollection(messagesCollection, insertMessagesCallback)
err = InsertMongoMessages(ctx, authorizedTopics)
Expect(err).To(BeNil())

// when the mongo feature flag is enabled
Expand All @@ -162,8 +118,8 @@ func TestHistoriesHandler(t *testing.T) {
err = json.Unmarshal([]byte(body), &messages)
Expect(err).To(BeNil())
g.Assert(len(messages)).Equal(2)
g.Assert(messages[0].Payload).Equal("{\"original_payload\":{\"test1\":\"test2\"}}")
g.Assert(messages[1].Payload).Equal("{\"original_payload\":{\"test3\":\"test4\"}}")
g.Assert(messages[0].Payload).Equal("{\"test 0\":\"test 1\"}")
g.Assert(messages[1].Payload).Equal("{\"test 1\":\"test 2\"}")
})

g.It("It should return 200 if the user is authorized into at least one topic", func() {
Expand All @@ -172,16 +128,8 @@ func TestHistoriesHandler(t *testing.T) {
topic := fmt.Sprintf("chat/test/%s", testID)
topic2 := fmt.Sprintf("chat/test/%s", testID2)

var topics, topics2 []string
topics = append(topics, topic)
topics2 = append(topics2, topic2)

query := func(c *mongo.Collection) error {
_, err := c.InsertOne(ctx, ACL{Username: "test:test", Pubsub: topics})
return err
}

err := mongoclient.GetCollection("mqtt_acl", query)
authorizedTopics := []string{topic}
err := AuthorizeTestUserInTopics(ctx, authorizedTopics)
Expect(err).To(BeNil())

testMessage := models.Message{
Expand Down Expand Up @@ -262,16 +210,8 @@ func TestHistoriesHandler(t *testing.T) {
topic := fmt.Sprintf("chat/test/%s", testID)
topic2 := fmt.Sprintf("chat/test/%s", testID2)

var topics, topics2 []string
topics = append(topics, topic)
topics2 = append(topics2, topic2)

query := func(c *mongo.Collection) error {
_, err := c.InsertMany(ctx, []interface{}{ACL{Username: "test:test", Pubsub: topics}, ACL{Username: "test:test", Pubsub: topics2}})
return err
}

err := mongoclient.GetCollection("mqtt_acl", query)
authorizedTopics := []string{topic, topic2}
err := AuthorizeTestUserInTopics(ctx, authorizedTopics)
Expect(err).To(BeNil())

testMessage := models.Message{
Expand Down
44 changes: 44 additions & 0 deletions app/histories_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package app

import (
"net/http"

"github.com/topfreegames/mqtt-history/logger"
"github.com/topfreegames/mqtt-history/mongoclient"

"github.com/labstack/echo"
"github.com/topfreegames/mqtt-history/models"
)

func HistoriesV2Handler(app *App) func(c echo.Context) error {
return func(c echo.Context) error {
c.Set("route", "HistoriesV2")
topicPrefix := c.ParamValues()[0]
topicsSuffix, userID, from, limit := ParseHistoriesQueryParams(c, app.Defaults.LimitOfMessages)
topics := make([]string, len(topicsSuffix))

for i, topicSuffix := range topicsSuffix {
topics[i] = topicPrefix + "/" + topicSuffix
}

logger.Logger.Debugf("user %s is asking for histories v2 for topicPrefix %s with args topics=%s from=%d and limit=%d", userID, topicPrefix, topics, from, limit)
authenticated, authorizedTopics, err := IsAuthorized(c.StdContext(), app, userID, topics...)
if err != nil {
return err
}

if !authenticated {
return c.String(echo.ErrUnauthorized.Code, echo.ErrUnauthorized.Message)
}

// retrieve messages
messages := make([]*models.MessageV2, 0)
collection := app.Defaults.MongoMessagesCollection

for _, topic := range authorizedTopics {
topicMessages := mongoclient.GetMessagesV2(c, topic, from, limit, collection)
messages = append(messages, topicMessages...)
}
return c.JSON(http.StatusOK, messages)
}
}
Loading

0 comments on commit 6d7e327

Please sign in to comment.