Skip to content

Commit

Permalink
Update to elastic search 5
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed Jun 6, 2017
1 parent 6a337e5 commit 81a7cea
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.8-alpine
FROM golang:1.8.3-alpine

MAINTAINER TFG Co <backend@tfgco.com>

Expand Down
9 changes: 5 additions & 4 deletions app/histories.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app

import (
"context"
"fmt"
"net/http"
"reflect"
Expand All @@ -10,7 +11,7 @@ import (
"github.com/labstack/echo"
"github.com/topfreegames/mqtt-history/es"
"github.com/topfreegames/mqtt-history/logger"
"gopkg.in/olivere/elastic.v3"
"gopkg.in/olivere/elastic.v5"
)

// HistoriesHandler is the handler responsible for sending multiples rooms history to the player
Expand All @@ -19,7 +20,7 @@ func HistoriesHandler(app *App) func(c echo.Context) error {
esclient := es.GetESClient()
c.Set("route", "Histories")
topicPrefix := c.ParamValues()[0]
authorizedTopics := []string{}
authorizedTopics := []interface{}{}
userID := c.QueryParam("userid")
topicsSuffix := strings.Split(c.QueryParam("topics"), ",")
topics := make([]string, len(topicsSuffix))
Expand Down Expand Up @@ -54,13 +55,13 @@ func HistoriesHandler(app *App) func(c echo.Context) error {
if redisResults[0] != nil && len(authorizedTopics) > 0 {
boolQuery := elastic.NewBoolQuery()
topicBoolQuery := elastic.NewBoolQuery()
topicBoolQuery.Should(elastic.NewTermsQuery("topic", authorizedTopics))
topicBoolQuery.Should(elastic.NewTermsQuery("topic", authorizedTopics...))
boolQuery.Must(topicBoolQuery)

var searchResults *elastic.SearchResult
err = WithSegment("elasticsearch", c, func() error {
searchResults, err = esclient.Search().Index("chat").Query(boolQuery).
Sort("timestamp", false).From(from).Size(limit).Do()
Sort("timestamp", false).From(from).Size(limit).Do(context.TODO())
return err
})

Expand Down
13 changes: 7 additions & 6 deletions app/histories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package app_test

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -71,10 +72,10 @@ func TestHistoriesHandler(t *testing.T) {
Topic: topic2,
}

_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())

_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage2).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage2).Do(context.TODO())
Expect(err).To(BeNil())

refreshIndex()
Expand Down Expand Up @@ -113,10 +114,10 @@ func TestHistoriesHandler(t *testing.T) {
Topic: topic2,
}

_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())

_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage2).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage2).Do(context.TODO())
Expect(err).To(BeNil())

refreshIndex()
Expand Down Expand Up @@ -153,10 +154,10 @@ func TestHistoriesHandler(t *testing.T) {
Topic: topic2,
}

_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())

_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage2).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage2).Do(context.TODO())
Expect(err).To(BeNil())

refreshIndex()
Expand Down
7 changes: 4 additions & 3 deletions app/history.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -14,7 +15,7 @@ import (
"github.com/labstack/echo"
"github.com/topfreegames/mqtt-history/es"
"github.com/topfreegames/mqtt-history/logger"
"gopkg.in/olivere/elastic.v3"
"gopkg.in/olivere/elastic.v5"
)

// Message represents a chat message
Expand Down Expand Up @@ -58,7 +59,7 @@ func HistoryHandler(app *App) func(c echo.Context) error {
var searchResults *elastic.SearchResult
err = WithSegment("elasticsearch", c, func() error {
searchResults, err = esclient.Search().Index("chat").Query(boolQuery).
Sort("timestamp", false).From(from).Size(limit).Do()
Sort("timestamp", false).From(from).Size(limit).Do(context.TODO())
return err
})

Expand Down Expand Up @@ -145,7 +146,7 @@ func HistorySinceHandler(app *App) func(c echo.Context) error {
var searchResults *elastic.SearchResult
err = WithSegment("elasticsearch", c, func() error {
searchResults, err = esclient.Search().Index("chat").Query(boolQuery).
Sort("timestamp", false).From(from).Size(limit).Pretty(true).Do()
Sort("timestamp", false).From(from).Size(limit).Pretty(true).Do(context.TODO())
return err
})

Expand Down
23 changes: 12 additions & 11 deletions app/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package app_test

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -17,7 +18,7 @@ import (

. "github.com/franela/goblin"
. "github.com/onsi/gomega"
"github.com/satori/go.uuid"
uuid "github.com/satori/go.uuid"
. "github.com/topfreegames/mqtt-history/app"
"github.com/topfreegames/mqtt-history/es"
"github.com/topfreegames/mqtt-history/redisclient"
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestHistoryHandler(t *testing.T) {
Payload: "{\"test1\":\"test2\"}",
Topic: topic,
}
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())

refreshIndex()
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestHistoryHandler(t *testing.T) {
Topic: topic,
}

_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())

refreshIndex()
Expand Down Expand Up @@ -186,7 +187,7 @@ func TestHistoryHandler(t *testing.T) {
"/historysince/%s?userid=test:test&since=%d",
topic, (time.Now().UnixNano() / 1000000000), // now
)
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())

// Update indexes
Expand Down Expand Up @@ -229,7 +230,7 @@ func TestHistoryHandler(t *testing.T) {
Payload: "{\"test1\":\"test2\"}",
Topic: topic,
}
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())
}

Expand Down Expand Up @@ -278,7 +279,7 @@ func TestHistoryHandler(t *testing.T) {
Payload: "{\"test1\":\"test2\"}",
Topic: topic,
}
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())
}

Expand Down Expand Up @@ -322,7 +323,7 @@ func TestHistoryHandler(t *testing.T) {
Payload: "{\"test1\":\"test2\"}",
Topic: topic,
}
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())
}

Expand Down Expand Up @@ -370,7 +371,7 @@ func TestHistoryHandler(t *testing.T) {
Payload: "{\"test1\":\"test2\"}",
Topic: topic,
}
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())

messageTime = baseTime + 1*second
Expand All @@ -379,7 +380,7 @@ func TestHistoryHandler(t *testing.T) {
Payload: "{\"test1\":\"test2\"}",
Topic: fmt.Sprintf("%s/moremore", topic),
}
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())

// Update indexes
Expand Down Expand Up @@ -423,7 +424,7 @@ func TestHistoryHandler(t *testing.T) {
Payload: "{\"test1\":\"test2\"}",
Topic: topic,
}
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())
}

Expand Down Expand Up @@ -471,7 +472,7 @@ func TestHistoryHandler(t *testing.T) {
Payload: "{\"test1\":\"test2\"}",
Topic: topic,
}
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do()
_, err = esclient.Index().Index("chat").Type("message").BodyJson(testMessage).Do(context.TODO())
Expect(err).To(BeNil())
}

Expand Down
2 changes: 1 addition & 1 deletion config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ elasticsearch:
host: "http://localhost:9123"
sniff: false
indexMappings:
chat: '{ "mappings": { "message":{"_ttl": { "enabled": true, "default": "2d" }}}}'
chat: '{ "mappings": { "message":{}}}'
redis:
host: "localhost"
port: 4444
Expand Down
2 changes: 1 addition & 1 deletion config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ elasticsearch:
host: "http://localhost:9123"
sniff: false
indexMappings:
chat: '{ "mappings": { "message":{"_ttl": { "enabled": true, "default": "2d" }}}}'
chat: '{ "mappings": { "message":{}}}'
redis:
host: "localhost"
port: 4444
Expand Down
8 changes: 7 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ services:
image: redis:3.2
command: redis-server
elasticsearch:
image: elasticsearch:2.3.4
image: docker.elastic.co/elasticsearch/elasticsearch:5.4.1
ports:
- "9200:9200"
environment:
- http.host=0.0.0.0
- transport.host=127.0.0.1
- xpack.security.enabled=false
mosquitto:
image: tfgco/mosquitto_with_auth:0.0.1
environment:
Expand Down
5 changes: 3 additions & 2 deletions es/es_client.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package es

import (
"context"
"fmt"
"os"
"strings"
"sync"

"gopkg.in/olivere/elastic.v3"
"gopkg.in/olivere/elastic.v5"

"github.com/op/go-logging"
"github.com/spf13/viper"
Expand Down Expand Up @@ -62,7 +63,7 @@ func configureESClient() {

indexes := viper.GetStringMapString("elasticsearch.indexMappings")
for index, mappings := range indexes {
_, err = client.CreateIndex(index).Body(mappings).Do()
_, err = client.CreateIndex(index).Body(mappings).Do(context.TODO())
if err != nil {
if strings.Contains(err.Error(), "index_already_exists_exception") || strings.Contains(err.Error(), "IndexAlreadyExistsException") ||
strings.Contains(err.Error(), "already exists as alias") {
Expand Down
10 changes: 4 additions & 6 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import:
- package: github.com/gavv/httpexpect
- package: github.com/ajg/form
- package: github.com/valyala/fasthttp
- package: gopkg.in/olivere/elastic.v3
- package: gopkg.in/olivere/elastic.v5
version: ^5.0.39
- package: github.com/uber-go/zap
- package: github.com/uber-go/atomic
- package: github.com/getsentry/raven-go
Expand Down
7 changes: 5 additions & 2 deletions test_containers/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ services:
labels:
- "local.example.description=Mosquitto v1.4.9 MQTT server"
elasticsearch:
image: elasticsearch:2.4
image: docker.elastic.co/elasticsearch/elasticsearch:5.4.1
ports:
- "9123:9200"
- "9300:9300"
command: "elasticsearch -Des.logger.level=DEBUG"
environment:
- http.host=0.0.0.0
- transport.host=127.0.0.1
- xpack.security.enabled=false
networks:
mosqnet:

0 comments on commit 81a7cea

Please sign in to comment.