Skip to content

Commit

Permalink
not use IN query and ignore wildcard topic
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed May 23, 2018
1 parent 0505ad7 commit d93e36a
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 600 deletions.
7 changes: 6 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type App struct {
DDStatsD *extnethttpmiddleware.DogStatsD
Cassandra cassandra.DataStore
Defaults *models.Defaults
Bucket *models.Bucket
}

// GetApp creates an app given the parameters
Expand Down Expand Up @@ -72,10 +73,15 @@ func (app *App) Configure() {
app.configureJaeger()
app.configureCassandra()
app.configureDefaults()
app.configureBucket()

app.configureApplication()
}

func (app *App) configureBucket() {
app.Bucket = models.NewBucket(app.Config)
}

func (app *App) configureDefaults() {
app.Defaults = &models.Defaults{
BucketQuantityOnSelect: app.Config.GetInt("cassandra.bucket.quantity"),
Expand Down Expand Up @@ -187,7 +193,6 @@ func (app *App) configureApplication() {

// Routes
a.Get("/healthcheck", HealthCheckHandler(app))
a.Get("/historysince/*", HistorySinceHandler(app))
a.Get("/history/*", HistoryHandler(app))
a.Get("/histories/*", HistoriesHandler(app))
a.Get("/:other", NotFoundHandler(app))
Expand Down
30 changes: 30 additions & 0 deletions app/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package app

import (
"context"

"github.com/topfreegames/mqtt-history/cassandra"
"github.com/topfreegames/mqtt-history/models"
)

func selectFromBuckets(
ctx context.Context,
bucketQuantity, limit, currentBucket int,
topic string,
cassandra cassandra.DataStore,
) []*models.Message {
messages := []*models.Message{}

for i := 0; i < bucketQuantity && len(messages) < limit; i++ {
bucket := currentBucket - i
if bucket < 0 {
break
}

queryLimit := limit - len(messages)
bucketMessages := cassandra.SelectMessagesInBucket(ctx, topic, bucket, queryLimit)
messages = append(messages, bucketMessages...)
}

return messages
}
2 changes: 1 addition & 1 deletion app/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func authenticate(ctx context.Context, app *App, userID string, topics ...string
}
authorizedTopics := []string{}
for _, topic := range topics {
if allowed[topic] {
if allowed[topic] && !strings.Contains(topic, "+") {
authorizedTopics = append(authorizedTopics, topic)
}
}
Expand Down
8 changes: 5 additions & 3 deletions app/histories.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ func HistoriesHandler(app *App) func(c echo.Context) error {
return c.String(echo.ErrUnauthorized.Code, echo.ErrUnauthorized.Message)
}

qnt := app.Defaults.BucketQuantityOnSelect
bucketQnt := app.Defaults.BucketQuantityOnSelect
currentBucket := app.Bucket.Get(from)
messages := []*models.Message{}

for _, topic := range authorizedTopics {
newMessages := app.Cassandra.SelectMessagesInBucket(c.StdContext(), topic, from, qnt, limit)
messages = append(messages, newMessages...)
topicMessages := selectFromBuckets(c.StdContext(), bucketQnt, limit, currentBucket, topic, app.Cassandra)
messages = append(messages, topicMessages...)
}

return c.JSON(http.StatusOK, messages)
Expand Down
33 changes: 17 additions & 16 deletions app/histories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ func TestHistoriesHandler(t *testing.T) {
Topic: topic2,
}

err = a.Cassandra.InsertWithTTL(ctx, testMessage.Topic, testMessage.Payload,
testMessage.Timestamp)
bucket := a.Bucket.Get(testMessage.Timestamp.Unix())
err = a.Cassandra.InsertWithTTL(ctx, testMessage.Topic, testMessage.Payload, bucket)
Expect(err).To(BeNil())
err = a.Cassandra.InsertWithTTL(ctx, testMessage2.Topic, testMessage2.Payload,
testMessage2.Timestamp)

bucket = a.Bucket.Get(testMessage2.Timestamp.Unix())
err = a.Cassandra.InsertWithTTL(ctx, testMessage2.Topic, testMessage2.Payload, bucket)
Expect(err).To(BeNil())

path := fmt.Sprintf("/histories/chat/test?userid=test:test&topics=%s,%s", testID, testID2)
Expand Down Expand Up @@ -122,12 +123,12 @@ func TestHistoriesHandler(t *testing.T) {
Topic: topic2,
}

err = a.Cassandra.InsertWithTTL(ctx, testMessage.Topic, testMessage.Payload,
testMessage.Timestamp)
bucket := a.Bucket.Get(testMessage.Timestamp.Unix())
err = a.Cassandra.InsertWithTTL(ctx, testMessage.Topic, testMessage.Payload, bucket)
Expect(err).To(BeNil())

err = a.Cassandra.InsertWithTTL(ctx, testMessage2.Topic, testMessage2.Payload,
testMessage2.Timestamp)
bucket = a.Bucket.Get(testMessage2.Timestamp.Unix())
err = a.Cassandra.InsertWithTTL(ctx, testMessage2.Topic, testMessage2.Payload, bucket)
Expect(err).To(BeNil())

path := fmt.Sprintf("/histories/chat/test?userid=test:test&topics=%s,%s", testID, testID2)
Expand Down Expand Up @@ -171,12 +172,12 @@ func TestHistoriesHandler(t *testing.T) {
Topic: topic2,
}

err = a.Cassandra.InsertWithTTL(ctx, testMessage.Topic, testMessage.Payload,
testMessage.Timestamp)
bucket := a.Bucket.Get(testMessage.Timestamp.Unix())
err = a.Cassandra.InsertWithTTL(ctx, testMessage.Topic, testMessage.Payload, bucket)
Expect(err).To(BeNil())

err = a.Cassandra.InsertWithTTL(ctx, testMessage2.Topic, testMessage2.Payload,
testMessage2.Timestamp)
bucket = a.Bucket.Get(testMessage2.Timestamp.Unix())
err = a.Cassandra.InsertWithTTL(ctx, testMessage2.Topic, testMessage2.Payload, bucket)
Expect(err).To(BeNil())

path := fmt.Sprintf("/histories/chat/test?userid=test:test&topics=%s,%s", testID, testID2)
Expand Down Expand Up @@ -214,12 +215,12 @@ func TestHistoriesHandler(t *testing.T) {
Topic: topic2,
}

err = a.Cassandra.InsertWithTTL(ctx, testMessage.Topic, testMessage.Payload,
testMessage.Timestamp)
bucket := a.Bucket.Get(testMessage.Timestamp.Unix())
err = a.Cassandra.InsertWithTTL(ctx, testMessage.Topic, testMessage.Payload, bucket)
Expect(err).To(BeNil())

err = a.Cassandra.InsertWithTTL(ctx, testMessage2.Topic, testMessage2.Payload,
testMessage2.Timestamp)
bucket = a.Bucket.Get(testMessage2.Timestamp.Unix())
err = a.Cassandra.InsertWithTTL(ctx, testMessage2.Topic, testMessage2.Payload, bucket)
Expect(err).To(BeNil())

path := fmt.Sprintf("/histories/chat/test?userid=test:test&topics=%s,%s", testID, testID2)
Expand Down
83 changes: 6 additions & 77 deletions app/history.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package app

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"strconv"
"time"

"github.com/getsentry/raven-go"
"github.com/labstack/echo"
"github.com/topfreegames/mqtt-history/logger"
)
Expand Down Expand Up @@ -39,84 +34,18 @@ func HistoryHandler(app *App) func(c echo.Context) error {
logger.Logger.Debugf(
"user %s is asking for history for topic %s with args from=%d and limit=%d",
userID, topic, from, limit)
if !authenticated {
return c.String(echo.ErrUnauthorized.Code, echo.ErrUnauthorized.Message)
}

qnt := app.Defaults.BucketQuantityOnSelect
messages := app.Cassandra.SelectMessagesInBucket(c.StdContext(),
topic,
from, qnt, limit)

return c.JSON(http.StatusOK, messages)
}
}

// HistorySinceHandler is the handler responsible for sending the rooms history to the player based in a initial date
func HistorySinceHandler(app *App) func(c echo.Context) error {
return func(c echo.Context) error {
c.Set("route", "HistorySince")
topic := c.ParamValues()[0]
userID := c.QueryParam("userid")
from, err := strconv.ParseInt(c.QueryParam("from"), 10, 64)
limit, err := strconv.Atoi(c.QueryParam("limit"))
since, err := strconv.ParseInt(c.QueryParam("since"), 10, 64)

now := int64(time.Now().Unix())
if since > now {
errorString := fmt.Sprintf("user %s is asking for history for topic %s with args from=%d, limit=%d and since=%d. Since is in the future, setting to 0!", userID, topic, from, limit, since)

logger.Logger.Errorf(errorString)

tags := map[string]string{
"source": "app",
"type": "Since is furure",
"url": c.Request().URI(),
"user-agent": c.Request().Header().Get("User-Agent"),
}

raven.CaptureError(errors.New(errorString), tags)
since = 0
limit = 100
}

defaultLimit := 10
if limitFromEnv := os.Getenv("HISTORYSINCE_LIMIT"); limitFromEnv != "" {
defaultLimit, err = strconv.Atoi(limitFromEnv)
}
if limit == 0 {
limit = defaultLimit
}

if from == 0 {
from = time.Now().Unix()
}

logger.Logger.Debugf("user %s is asking for history for topic %s with args from=%d, limit=%d and since=%d", userID, topic, from, limit, since)
authenticated, _, err := authenticate(c.StdContext(), app, userID, topic)
if err != nil {
return err
}

if !authenticated {
logger.Logger.Debugf(
"responded to user %s history for topic %s with args from=%d limit=%d and since=%d with code=%d and message=%s",
userID, topic, from, limit, since, echo.ErrUnauthorized.Code, echo.ErrUnauthorized.Message,
)
return c.String(echo.ErrUnauthorized.Code, echo.ErrUnauthorized.Message)
}

messages := app.Cassandra.SelectMessagesBeforeTime(c.StdContext(), topic, from, since, limit)
bucketQnt := app.Defaults.BucketQuantityOnSelect
currentBucket := app.Bucket.Get(from)

var resStr []byte
resStr, err = json.Marshal(messages)
if err != nil {
return err
}
logger.Logger.Debugf(
"responded to user %s history for topic %s with args from=%d limit=%d and since=%d with code=%d and message=%s",
userID, topic, from, limit, since, http.StatusOK, string(resStr),
)
messages := selectFromBuckets(c.StdContext(),
bucketQnt, limit, currentBucket,
topic,
app.Cassandra)

return c.JSON(http.StatusOK, messages)
}
Expand Down
Loading

0 comments on commit d93e36a

Please sign in to comment.