Skip to content

Commit

Permalink
Deprec/remove cassandra from mqtt (#33)
Browse files Browse the repository at this point in the history
* removes cassandra from dockercompose file

* removes cassandra from makefile

* removes cassandra from tests

* removes cassandra package

* removes code related to cassandra

* removes mongoenabled flag

* undo changes

* removes unecessary comment
  • Loading branch information
mflilian committed Oct 7, 2022
1 parent fb016af commit 42968f1
Show file tree
Hide file tree
Showing 62 changed files with 40 additions and 16,697 deletions.
10 changes: 1 addition & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,10 @@ run-containers: ## run all test containers
kill-containers: ## kill all test containers
@cd test_containers && docker-compose stop && cd ..

CASSANDRA_CONTAINER := mqtt-history_cassandra_1
create-cassandra-table:
@until docker exec $(CASSANDRA_CONTAINER) cqlsh -e 'describe cluster'; do echo 'Waiting for Cassandra...' && sleep 2; done
@echo 'Creating keyspace and table on Cassandra'
@docker exec $(CASSANDRA_CONTAINER) cqlsh -e "$$(cat scripts/create.cql)";
@echo 'Done'

setup/mongo:
go run scripts/setup_mongo_messages-index.go

run-tests: run-containers ## run tests using the docker containers
@make CASSANDRA_CONTAINER=mqtthistory_test_cassandra create-cassandra-table
@make coverage
@make kill-containers

Expand All @@ -45,7 +37,7 @@ run: ## start the API
@go run main.go start

deps: ## start the API dependencies as docker containers
@docker-compose up -d mongo cassandra
@docker-compose up -d mongo

cross: cross-linux cross-darwin

Expand Down
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[![Build Status](https://travis-ci.org/topfreegames/mqtt-history.svg?branch=master)](https://travis-ci.org/topfreegames/mqtt-history)
[![Coverage Status](https://coveralls.io/repos/github/topfreegames/mqtt-history/badge.svg?branch=master)](https://coveralls.io/github/topfreegames/mqtt-history?branch=master)

An MQTT-based history handler for messages recorded by [mqttbot](https://github.com/topfreegames/mqttbot) in Cassandra
An MQTT-based history handler for messages recorded by [mqttbot](https://github.com/topfreegames/mqttbot) in MongoDB

There's also support for messages stored in MongoDB, assuming the message documents contain these **required** fields:
```
Expand Down Expand Up @@ -33,27 +33,26 @@ Use `make setup/mongo` to create indexes on MongoDB for querying messages over

## Features
- Listen to healthcheck requests
- Retrieve message history from Cassandra when requested by users
- Retrieve message history from MongoDB when requested by users
- Authorization handling with support for MongoDB or an HTTP Authorization API

## Setup

Make sure you have Go installed on your machine.

You also need to have access to running instances of Cassandra and Mongo.
You also need to have access to running instances of Mongo.

### Running the application

If you want to run the application locally you can do so by running

```
make deps
make create-cassandra-table
make setup/mongo
make run
```

You may need to change the configurations to point to your MQTT, Cassandra
You may need to change the configurations to point to your MQTT
and Mongo servers, or you can use the provided containers, they can be run
by executing `make run-containers`

Expand Down
34 changes: 5 additions & 29 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/labstack/echo/engine/standard"
"github.com/spf13/viper"
"github.com/topfreegames/extensions/echo"
"github.com/topfreegames/mqtt-history/cassandra"

"github.com/topfreegames/mqtt-history/logger"
"github.com/topfreegames/mqtt-history/models"
"github.com/uber-go/zap"
Expand All @@ -41,7 +41,6 @@ type App struct {
NewRelic newrelic.Application
NumberOfDaysToSearch int
DDStatsD *extnethttpmiddleware.DogStatsD
Cassandra cassandra.DataStore
Defaults *models.Defaults
Bucket *models.Bucket
}
Expand Down Expand Up @@ -82,43 +81,20 @@ func (app *App) configureBucket() {
}

func (app *App) configureStorage() {
if app.Defaults.MongoEnabled {
app.Defaults.LimitOfMessages = app.Config.GetInt64("mongo.messages.limit")
return
}

app.Defaults.LimitOfMessages = app.Config.GetInt64("mongo.messages.limit")
return

app.configureBucket()
if app.Defaults.CassandraEnabled {
app.configureCassandra()
}
}

func (app *App) configureDefaults() {
app.Defaults = &models.Defaults{
BucketQuantityOnSelect: app.Config.GetInt("cassandra.bucket.quantity"),
LimitOfMessages: app.Config.GetInt64("cassandra.messages.limit"),
MongoEnabled: app.Config.GetBool("mongo.messages.enabled"),
LimitOfMessages: app.Config.GetInt64("mongo.messages.limit"),
MongoMessagesCollection: app.Config.GetString("mongo.messages.collection"),
CassandraEnabled: app.Config.GetBool("cassandra.enabled"),
}
}

func (app *App) configureCassandra() {
logger.Logger.Infof("Connecting to Cassandra")
cassandra, err := cassandra.GetCassandra(
logger.Logger,
app.Config,
app.DDStatsD,
)
if err != nil {
logger.Logger.Error("Failed to initialize Cassandra.", zap.Error(err))
panic(fmt.Sprintf("Could not initialize Cassandra, err: %s", err))
}

logger.Logger.Info("Initialized Cassandra successfully.")
app.Cassandra = cassandra
}

func (app *App) configureNewRelic() {
newRelicKey := app.Config.GetString("newrelic.key")
config := newrelic.NewConfig("mqtt-history", newRelicKey)
Expand Down
30 changes: 0 additions & 30 deletions app/common.go

This file was deleted.

62 changes: 11 additions & 51 deletions app/histories.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package app

import (
"net/http"
"sync"

"github.com/topfreegames/mqtt-history/mongoclient"

Expand Down Expand Up @@ -32,61 +31,22 @@ func HistoriesHandler(app *App) func(c echo.Context) error {
}

messages := make([]*models.Message, 0)
if app.Defaults.MongoEnabled {
collection := app.Defaults.MongoMessagesCollection
var wg sync.WaitGroup
var mu sync.Mutex
// guarantees ordering in responses payload
topicsMessagesMap := make(map[string][]*models.MessageV2, len(authorizedTopics))
for _, topic := range authorizedTopics {
wg.Add(1)
go func(topicsMessagesMap map[string][]*models.MessageV2, topic string) {
topicMessages := mongoclient.GetMessagesV2(
c,
mongoclient.QueryParameters{
Topic: topic,
From: from,
Limit: limit,
Collection: collection,
},
)
mu.Lock()
topicsMessagesMap[topic] = topicMessages
mu.Unlock()
wg.Done()
}(topicsMessagesMap, topic)
}
wg.Wait()
var gameID string
// guarantees ordering in responses payload
for _, topic := range authorizedTopics {
topicMessages := make([]*models.Message, len(topicsMessagesMap[topic]))
for idx, topicMessageV2 := range topicsMessagesMap[topic] {
topicMessages[idx] = mongoclient.ConvertMessageV2ToMessage(topicMessageV2)
}
messages = append(messages, topicMessages...)
if gameID != "" && len(topicsMessagesMap[topic]) > 0 {
gameID = topicsMessagesMap[topic][0].GameId
}
}

if gameID != "" {
if metricTagsMap, ok := c.Get("metricTagsMap").(map[string]interface{}); ok {
metricTagsMap["gameID"] = gameID
}
}

return c.JSON(http.StatusOK, messages)
}

bucketQnt := app.Defaults.BucketQuantityOnSelect
currentBucket := app.Bucket.Get(from)
collection := app.Defaults.MongoMessagesCollection

for _, topic := range authorizedTopics {
topicMessages := selectFromBuckets(c.StdContext(), bucketQnt, int(limit), currentBucket, topic, app.Cassandra)
topicMessages := mongoclient.GetMessages(
c,
mongoclient.QueryParameters{
Topic: topic,
From: from,
Limit: limit,
Collection: collection,
},
)
messages = append(messages, topicMessages...)
}

return c.JSON(http.StatusOK, messages)

}
}
Loading

0 comments on commit 42968f1

Please sign in to comment.