From 5172c71e68312d4e7329b3fbd52280a7056e42c2 Mon Sep 17 00:00:00 2001 From: Rafael Acevedo Date: Mon, 22 Apr 2019 08:20:12 -0300 Subject: [PATCH 1/5] feat(elasticsearch): remove types to support elasticsearch 7.0 --- Gopkg.lock | 1 + Makefile | 2 +- docker-compose.yml | 23 +++++++++++++++++++---- src/elasticsearch/codec.go | 2 +- src/elasticsearch/elasticsearch.go | 10 +++++++--- src/injector/store/store.go | 4 ++-- 6 files changed, 31 insertions(+), 11 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 15d6ef3..fb84d83 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -266,6 +266,7 @@ "github.com/go-kit/kit/metrics/prometheus", "github.com/inloco/goavro", "github.com/olivere/elastic", + "github.com/pkg/errors", "github.com/prometheus/client_golang/prometheus", "github.com/prometheus/client_golang/prometheus/promhttp", "github.com/stretchr/testify/assert", diff --git a/Makefile b/Makefile index 6ac0012..e08aef5 100644 --- a/Makefile +++ b/Makefile @@ -15,5 +15,5 @@ docker/run: count=0; \ until curl localhost:9200 || ((count ++ >= 10)); \ do echo "Retrying: Verify if Elasticsearch is ready"; sleep 5; done - curl -XPOST "localhost:9200/_template/my-topic" --data '{"template":"my-topic-*","settings":{"refresh_interval":"30s","number_of_replicas":0},"mappings":{"_default_":{"_all":{"enabled":"false"},"_source":{"enabled":"true"},"properties":{"@timestamp":{"format":"epoch_millis","ignore_malformed":true,"type":"date"}},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"text","index":false}}}]}}}' + curl -XPOST -H "Content-Type: application/json" "localhost:9200/_template/my-topic" --data '{"template":"my-topic-*","settings":{"refresh_interval":"30s","number_of_replicas":0},"mappings":{"_source":{"enabled":"true"},"properties":{"@timestamp":{"format":"epoch_millis","ignore_malformed":true,"type":"date"}},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword","index":true}}}]}}' docker-compose up -d producer app \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 67d8cfd..4e36ecc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,12 +25,23 @@ services: ports: - "8081:8081" elasticsearch: - image: 'elasticsearch:5-alpine' + image: 'docker.elastic.co/elasticsearch/elasticsearch-oss:7.0.0' container_name: elasticsearch + environment: + - cluster.name=docker-cluster + - bootstrap.memory_lock=true + - discovery.type=single-node + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" ports: - - '9200:9200' + - "9200:9200" + ulimits: + memlock: + soft: -1 + hard: -1 + volumes: + - esdata1:/usr/share/elasticsearch/data kibana: - image: 'kibana:5' + image: 'docker.elastic.co/kibana/kibana-oss:7.0.0' container_name: kibana depends_on: - elasticsearch @@ -70,4 +81,8 @@ services: environment: - KAFKA_ADDRESS=kafka:9092 - SCHEMA_REGISTRY_URL=http://schema-registry:8081 - - LOG_LEVEL=DEBUG \ No newline at end of file + - LOG_LEVEL=DEBUG + +volumes: + esdata1: + driver: local \ No newline at end of file diff --git a/src/elasticsearch/codec.go b/src/elasticsearch/codec.go index 35898e6..f2b0f8a 100644 --- a/src/elasticsearch/codec.go +++ b/src/elasticsearch/codec.go @@ -36,7 +36,7 @@ func (c basicCodec) EncodeElasticRecords(records []*models.Record) ([]*models.El elasticRecords[idx] = &models.ElasticRecord{ Index: index, - Type: record.Topic, + Type: "_doc", ID: docID, Json: record.FilteredFieldsJSON(c.config.BlacklistedColumns), } diff --git a/src/elasticsearch/elasticsearch.go b/src/elasticsearch/elasticsearch.go index e45a5b9..8a35ef6 100644 --- a/src/elasticsearch/elasticsearch.go +++ b/src/elasticsearch/elasticsearch.go @@ -2,9 +2,10 @@ package elasticsearch import ( "context" - "fmt" + "github.com/pkg/errors" + "net/http" "github.com/go-kit/kit/log" @@ -53,7 +54,7 @@ func (d recordDatabase) CloseClient() { type InsertResponse struct { AlreadyExists []string Retry []*models.ElasticRecord - Overloaded bool + Backoff bool } func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse, error) { @@ -65,7 +66,10 @@ func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() res, err := bulkRequest.Do(ctx) - + if err == elastic.ErrNoClient || errors.Cause(err) == elastic.ErrNoClient { + level.Warn(d.logger).Log("message", "no elasticsearch node available", "err", err) + return &InsertResponse{AlreadyExists: nil, Retry: records, Backoff: true}, nil + } if err != nil { return nil, err } diff --git a/src/injector/store/store.go b/src/injector/store/store.go index b9f4316..b257d48 100644 --- a/src/injector/store/store.go +++ b/src/injector/store/store.go @@ -33,10 +33,10 @@ func (s basicStore) Insert(records []*models.Record) error { break } //some records failed to index, backoff(if overloaded) then retry - if res.Overloaded { + if res.Backoff { time.Sleep(s.backoff) } - s.db.Insert(res.Retry) + elasticRecords = res.Retry } return nil } From 502cf283a03ebc837a4e93bf18a7d55fe341e37b Mon Sep 17 00:00:00 2001 From: Rafael Acevedo Date: Mon, 22 Apr 2019 09:12:14 -0300 Subject: [PATCH 2/5] feat(go): migrate to go modules --- .circleci/config.yml | 50 +---- Gopkg.lock | 275 ------------------------ Gopkg.toml | 37 ---- go.mod | 21 ++ go.sum | 216 +++++++++++++++++++ src/elasticsearch/codec.go | 4 +- src/elasticsearch/codec_test.go | 4 +- src/elasticsearch/elasticsearch.go | 2 +- src/elasticsearch/elasticsearch_test.go | 12 +- src/kafka/consumer_test.go | 7 +- 10 files changed, 260 insertions(+), 368 deletions(-) delete mode 100644 Gopkg.lock delete mode 100644 Gopkg.toml create mode 100644 go.mod create mode 100644 go.sum diff --git a/.circleci/config.yml b/.circleci/config.yml index 5696b61..0d0b3dc 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,12 +1,10 @@ version: 2 jobs: build: - working_directory: /go/src environment: - PACKAGE_PATH: "github.com/inloco/kafka-elasticsearch-injector" - - LOCAL_BIN_PATH: "~/bin:$PATH" docker: - - image: circleci/golang:1.10 + - image: circleci/golang:1.12 environment: DOCKER_REPO: inlocomedia/kafka-elasticsearch-injector-go - image: confluentinc/cp-zookeeper:3.1.1 @@ -21,21 +19,17 @@ jobs: - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=localhost:2181 - SCHEMA_REGISTRY_HOST_NAME=localhost - SCHEMA_REGISTRY_LISTENERS=http://localhost:8081 - - image: elasticsearch:5-alpine + - image: docker.elastic.co/elasticsearch/elasticsearch-oss:7.0.0 environment: ES_JAVA_OPTS: -Xms750m -Xmx750m discovery.type: single-node http.host: 0.0.0.0 transport.host: 127.0.0.1 steps: - - checkout: - path: $PACKAGE_PATH - pwd: /go/src/github.com/inloco/kafka-elasticsearch-injector + - checkout - run: name: Generate helper files - command: | - echo $(git rev-parse --short HEAD) > HEAD - pwd: /go/src/github.com/inloco/kafka-elasticsearch-injector + command: echo $(git rev-parse --short HEAD) > HEAD - setup_remote_docker: reusable: true - run: @@ -43,35 +37,9 @@ jobs: command: | docker info docker login --username $DOCKER_USER --password $DOCKER_PASSWORD - - run: - name: "Make golang get play nice with repositories" - command: | - git config --global http.https://gopkg.in.followRedirects true - git config --global url."git@bitbucket.org:".insteadOf "https://bitbucket.org/" - git config --global url."git@github.com:".insteadOf "https://github.com/" - cat ~/.gitconfig - ssh-keyscan bitbucket.org >> ~/.ssh/known_hosts - ssh-keyscan github.com >> ~/.ssh/known_hosts - pwd: /go/src/github.com/inloco/kafka-elasticsearch-injector - - run: - name: "Install dep" - command: curl https://raw.githubusercontent.com/golang/dep/v0.5.0/install.sh | sh - pwd: /go/src/github.com/inloco/kafka-elasticsearch-injector - restore_cache: keys: - - kafka-elasticsearch-injector-go-{{ .Branch }}-{{ checksum "/go/src/github.com/inloco/kafka-elasticsearch-injector/Gopkg.lock" }} - - kafka-elasticsearch-injector-go-develop-{{ checksum "/go/src/github.com/inloco/kafka-elasticsearch-injector/Gopkg.lock" }} - - run: - name: "Install package deps" - command: | - if [ ! -d "vendor/" ]; then - dep ensure -v - fi - pwd: /go/src/github.com/inloco/kafka-elasticsearch-injector - - save_cache: - key: kafka-elasticsearch-injector-go-{{ .Branch }}-{{ checksum "/go/src/github.com/inloco/kafka-elasticsearch-injector/Gopkg.lock" }} - paths: - - /go/src/github.com/inloco/kafka-elasticsearch-injector/vendor + - go-mod-v4-{{ checksum "go.sum" }} - run: name: "Make sure elasticsearch is up and running" command: | @@ -81,16 +49,17 @@ jobs: - run: name: "Run project tests (excluding vendor)" command: go test $(go list ./... | grep -v /vendor/) - pwd: /go/src/github.com/inloco/kafka-elasticsearch-injector - run: name: Build injector command: GOOS=linux GOARCH=386 go build -a --ldflags="-s" -o bin/injector cmd/injector.go - pwd: /go/src/github.com/inloco/kafka-elasticsearch-injector - run: name: Build injector docker image command: | docker build --rm=false -t "${DOCKER_REPO}:ci" -f cmd/Dockerfile . - pwd: /go/src/github.com/inloco/kafka-elasticsearch-injector + - save_cache: + key: go-mod-v4-{{ checksum "go.sum" }} + paths: + - "/go/pkg/mod" - deploy: name: Push to Docker Hub command: | @@ -109,4 +78,3 @@ jobs: docker tag "${DOCKER_REPO}:ci" "${DOCKER_REPO}:$(cat VERSION)-$(cat HEAD)" docker push "${DOCKER_REPO}:$(cat VERSION)-$(cat HEAD)" fi - pwd: /go/src/github.com/inloco/kafka-elasticsearch-injector diff --git a/Gopkg.lock b/Gopkg.lock deleted file mode 100644 index fb84d83..0000000 --- a/Gopkg.lock +++ /dev/null @@ -1,275 +0,0 @@ -# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. - - -[[projects]] - digest = "1:355a000a06dc127c555e408c3778f9968fe4c1680ea6a1c13ba5612274b78551" - name = "github.com/Shopify/sarama" - packages = ["."] - pruneopts = "" - revision = "3b1b38866a79f06deddf0487d5c27ba0697ccd65" - version = "v1.15.0" - -[[projects]] - branch = "master" - digest = "1:0c5485088ce274fac2e931c1b979f2619345097b39d91af3239977114adf0320" - name = "github.com/beorn7/perks" - packages = ["quantile"] - pruneopts = "" - revision = "4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9" - -[[projects]] - digest = "1:60a269b5c75d69a6aa69c6439f8bf98c471c192427b94c1bc2314a5874ba3fac" - name = "github.com/bsm/sarama-cluster" - packages = ["."] - pruneopts = "" - revision = "3001c2453136632aa3219a58ea3795bb584b83b5" - version = "v2.1.12" - -[[projects]] - digest = "1:cf8ceeac78a0f94bd7f86f46569c918d08ddfb49ec844e3beadfd24808cb18e4" - name = "github.com/datamountaineer/schema-registry" - packages = ["."] - pruneopts = "" - revision = "6240b64c5baac4be7f827b042afaa90e8fcfa439" - version = "0.1.0" - -[[projects]] - digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b" - name = "github.com/davecgh/go-spew" - packages = ["spew"] - pruneopts = "" - revision = "346938d642f2ec3594ed81d874461961cd0faa76" - version = "v1.1.0" - -[[projects]] - digest = "1:f714fa0ab4449a2fe13d156446ac1c1e16bc85334e9be320d42bf8bee362ba45" - name = "github.com/eapache/go-resiliency" - packages = ["breaker"] - pruneopts = "" - revision = "6800482f2c813e689c88b7ed3282262385011890" - version = "v1.0.0" - -[[projects]] - branch = "master" - digest = "1:1f7503fa58a852a1416556ae2ddb219b49a1304fd408391948e2e3676514c48d" - name = "github.com/eapache/go-xerial-snappy" - packages = ["."] - pruneopts = "" - revision = "bb955e01b9346ac19dc29eb16586c90ded99a98c" - -[[projects]] - digest = "1:d8d46d21073d0f65daf1740ebf4629c65e04bf92e14ce93c2201e8624843c3d3" - name = "github.com/eapache/queue" - packages = ["."] - pruneopts = "" - revision = "44cc805cf13205b55f69e14bcb69867d1ae92f98" - version = "v1.1.0" - -[[projects]] - digest = "1:44ec1082ba97d89ce860abcc6ee3f0cf24e658d3efb8531b0f0a52f0781e4243" - name = "github.com/go-kit/kit" - packages = [ - "endpoint", - "log", - "log/level", - "metrics", - "metrics/internal/lv", - "metrics/prometheus", - ] - pruneopts = "" - revision = "4dc7be5d2d12881735283bcab7352178e190fc71" - version = "v0.6.0" - -[[projects]] - digest = "1:6a4a01d58b227c4b6b11111b9f172ec5c17682b82724e58e6daf3f19f4faccd8" - name = "github.com/go-logfmt/logfmt" - packages = ["."] - pruneopts = "" - revision = "390ab7935ee28ec6b286364bba9b4dd6410cb3d5" - version = "v0.3.0" - -[[projects]] - digest = "1:9ca737b471693542351e112c9e86be9bf7385e42256893a09ecb2a98e2036f74" - name = "github.com/go-stack/stack" - packages = ["."] - pruneopts = "" - revision = "259ab82a6cad3992b4e21ff5cac294ccb06474bc" - version = "v1.7.0" - -[[projects]] - digest = "1:bcb38c8fc9b21bb8682ce2d605a7d4aeb618abc7f827e3ac0b27c0371fdb23fb" - name = "github.com/golang/protobuf" - packages = ["proto"] - pruneopts = "" - revision = "925541529c1fa6821df4e44ce2723319eb2be768" - version = "v1.0.0" - -[[projects]] - branch = "master" - digest = "1:09307dfb1aa3f49a2bf869dcfa4c6c06ecd3c207221bd1c1a1141f0e51f209eb" - name = "github.com/golang/snappy" - packages = ["."] - pruneopts = "" - revision = "553a641470496b2327abcac10b36396bd98e45c9" - -[[projects]] - branch = "feature/union" - digest = "1:1c04a0b26ac7490c122331940c28a1be0d004b91cf417f3ccfb59950d5199825" - name = "github.com/inloco/goavro" - packages = ["."] - pruneopts = "" - revision = "eba86de05ad396273148c04785ebad17df07ee97" - -[[projects]] - branch = "master" - digest = "1:1ed9eeebdf24aadfbca57eb50e6455bd1d2474525e0f0d4454de8c8e9bc7ee9a" - name = "github.com/kr/logfmt" - packages = ["."] - pruneopts = "" - revision = "b84e30acd515aadc4b783ad4ff83aff3299bdfe0" - -[[projects]] - branch = "master" - digest = "1:ccc20cacf54eb16464dad02efa1c14fa7c0b9e124639b0d2a51dcc87b0154e4c" - name = "github.com/mailru/easyjson" - packages = [ - ".", - "buffer", - "jlexer", - "jwriter", - ] - pruneopts = "" - revision = "32fa128f234d041f196a9f3e0fea5ac9772c08e1" - -[[projects]] - digest = "1:4c23ced97a470b17d9ffd788310502a077b9c1f60221a85563e49696276b4147" - name = "github.com/matttproud/golang_protobuf_extensions" - packages = ["pbutil"] - pruneopts = "" - revision = "3247c84500bff8d9fb6d579d800f20b3e091582c" - version = "v1.0.0" - -[[projects]] - digest = "1:f5276738ac91cc71983a22662ebc48d92d147c861140b5eda06baf78991bc89a" - name = "github.com/olivere/elastic" - packages = [ - ".", - "config", - "uritemplates", - ] - pruneopts = "" - revision = "2963eb09b89294356e1a826068f33e5a44326ac0" - version = "v6.1.5" - -[[projects]] - digest = "1:52b8f1dc01c8a930900aa94e227b0c4c36e7102a5b124687ff1f88a221590234" - name = "github.com/pierrec/lz4" - packages = ["."] - pruneopts = "" - revision = "2fcda4cb7018ce05a25959d2fe08c83e3329f169" - version = "v1.1" - -[[projects]] - digest = "1:ff95a6c61f34f32e57833783059c80274d84e9c74e6e315c3dc2e93e9bf3dab9" - name = "github.com/pierrec/xxHash" - packages = ["xxHash32"] - pruneopts = "" - revision = "f051bb7f1d1aaf1b5a665d74fb6b0217712c69f7" - version = "v0.1.1" - -[[projects]] - digest = "1:7365acd48986e205ccb8652cc746f09c8b7876030d53710ea6ef7d0bd0dcd7ca" - name = "github.com/pkg/errors" - packages = ["."] - pruneopts = "" - revision = "645ef00459ed84a119197bfb8d8205042c6df63d" - version = "v0.8.0" - -[[projects]] - digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411" - name = "github.com/pmezard/go-difflib" - packages = ["difflib"] - pruneopts = "" - revision = "792786c7400a136282c1664665ae0a8db921c6c2" - version = "v1.0.0" - -[[projects]] - digest = "1:4142d94383572e74b42352273652c62afec5b23f325222ed09198f46009022d1" - name = "github.com/prometheus/client_golang" - packages = [ - "prometheus", - "prometheus/promhttp", - ] - pruneopts = "" - revision = "c5b7fccd204277076155f10851dad72b76a49317" - version = "v0.8.0" - -[[projects]] - branch = "master" - digest = "1:60aca47f4eeeb972f1b9da7e7db51dee15ff6c59f7b401c1588b8e6771ba15ef" - name = "github.com/prometheus/client_model" - packages = ["go"] - pruneopts = "" - revision = "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c" - -[[projects]] - branch = "master" - digest = "1:acf9415ef3a5f298495b0e1aa4d0e18f571a3c845872944e6c52777496819b21" - name = "github.com/prometheus/common" - packages = [ - "expfmt", - "internal/bitbucket.org/ww/goautoneg", - "model", - ] - pruneopts = "" - revision = "89604d197083d4781071d3c65855d24ecfb0a563" - -[[projects]] - branch = "master" - digest = "1:cc6c60d87dac2a3e75068ffcc2020144c3fdf39fb1d49f08b83ca6e7abe560db" - name = "github.com/prometheus/procfs" - packages = [ - ".", - "internal/util", - "nfs", - "xfs", - ] - pruneopts = "" - revision = "cb4147076ac75738c9a7d279075a253c0cc5acbd" - -[[projects]] - branch = "master" - digest = "1:b735d1c7f79d73bbbec30d4874ef521e0f02b73f7394d5daa527747ac367df5c" - name = "github.com/rcrowley/go-metrics" - packages = ["."] - pruneopts = "" - revision = "8732c616f52954686704c8645fe1a9d59e9df7c1" - -[[projects]] - digest = "1:a30066593578732a356dc7e5d7f78d69184ca65aeeff5939241a3ab10559bb06" - name = "github.com/stretchr/testify" - packages = ["assert"] - pruneopts = "" - revision = "12b6f73e6084dad08a7c6e575284b177ecafbc71" - version = "v1.2.1" - -[solve-meta] - analyzer-name = "dep" - analyzer-version = 1 - input-imports = [ - "github.com/Shopify/sarama", - "github.com/bsm/sarama-cluster", - "github.com/datamountaineer/schema-registry", - "github.com/go-kit/kit/endpoint", - "github.com/go-kit/kit/log", - "github.com/go-kit/kit/log/level", - "github.com/go-kit/kit/metrics/prometheus", - "github.com/inloco/goavro", - "github.com/olivere/elastic", - "github.com/pkg/errors", - "github.com/prometheus/client_golang/prometheus", - "github.com/prometheus/client_golang/prometheus/promhttp", - "github.com/stretchr/testify/assert", - ] - solver-name = "gps-cdcl" - solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml deleted file mode 100644 index 59617e2..0000000 --- a/Gopkg.toml +++ /dev/null @@ -1,37 +0,0 @@ -# Gopkg.toml example -# -# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md -# for detailed Gopkg.toml documentation. -# -# required = ["github.com/user/thing/cmd/thing"] -# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] -# -# [[constraint]] -# name = "github.com/user/project" -# version = "1.0.0" -# -# [[constraint]] -# name = "github.com/user/project2" -# branch = "dev" -# source = "github.com/myfork/project2" -# -# [[override]] -# name = "github.com/x/y" -# version = "2.4.0" - - -[[constraint]] - name = "github.com/bsm/sarama-cluster" - version = "2.1.12" - -[[constraint]] - name = "github.com/go-kit/kit" - version = "v0.6.0" - -[[constraint]] - name = "github.com/datamountaineer/schema-registry" - version = "0.1.0" - -[[constraint]] - name = "github.com/inloco/goavro" - branch = "feature/union" diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..bc67917 --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module github.com/inloco/kafka-elasticsearch-injector + +go 1.12 + +require ( + github.com/Shopify/sarama v1.19.0 + github.com/VividCortex/gohistogram v1.0.0 // indirect + github.com/bsm/sarama-cluster v2.1.12+incompatible + github.com/datamountaineer/schema-registry v0.0.0-20170721142813-6240b64c5baa + github.com/go-kit/kit v0.8.0 + github.com/inloco/goavro v0.0.0-20180215164239-eba86de05ad3 + github.com/olivere/elastic/v7 v7.0.0 + github.com/onsi/ginkgo v1.8.0 // indirect + github.com/onsi/gomega v1.5.0 // indirect + github.com/pierrec/xxHash v0.1.1 // indirect + github.com/pkg/errors v0.8.1 + github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 + github.com/stretchr/testify v1.2.2 + golang.org/x/sync v0.0.0-20190412183630-56d357773e84 // indirect + gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..445392b --- /dev/null +++ b/go.sum @@ -0,0 +1,216 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +git.apache.org/thrift.git v0.12.0/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Shopify/sarama v1.15.0 h1:v/Q3THMtunYfvKhbFfhegInfoW70HoNgsOdmuvFN5Qg= +github.com/Shopify/sarama v1.15.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s= +github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= +github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/aws/aws-sdk-go v1.19.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a h1:BtpsbiV638WQZwhA98cEZw2BsbnQJrbd0BI7tsy0W1c= +github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/bsm/sarama-cluster v2.1.12+incompatible h1:7NakmL2HnNvCCy685HlJbhYDU390nanFpwoonmhDJlw= +github.com/bsm/sarama-cluster v2.1.12+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/datamountaineer/schema-registry v0.0.0-20170721142813-6240b64c5baa h1:RdJBGYg+y10xkrTYMAMKCm4/GaTIp0DHqauvyY/8b8Y= +github.com/datamountaineer/schema-registry v0.0.0-20170721142813-6240b64c5baa/go.mod h1:S2tyO7aOaun9e+PdeX1Rp938aIJjV2FTn/P7mK3Ud3o= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.0.0 h1:XPZo5qMI0LGzIqT9wRq6dPv2vEuo9MWCar1wHY8Kuf4= +github.com/eapache/go-resiliency v1.0.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= +github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20160609142408-bb955e01b934 h1:oGLoaVIefp3tiOgi7+KInR/nNPvEpPM6GFo+El7fd14= +github.com/eapache/go-xerial-snappy v0.0.0-20160609142408-bb955e01b934/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-kit/kit v0.6.0 h1:wTifptAGIyIuir4bRyN4h7+kAa2a4eepLYVmRe5qqQ8= +github.com/go-kit/kit v0.6.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0 h1:8HUsc87TaSWLKwrnumgC8/YconD2fJQsRJAsWaPg2ic= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-stack/stack v1.7.0 h1:S04+lLfST9FvL8dl4R31wVUC/paZp/WQZbLmUgWboGw= +github.com/go-stack/stack v1.7.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk= +github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= +github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/grpc-ecosystem/grpc-gateway v1.6.2/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/inloco/goavro v0.0.0-20180215164239-eba86de05ad3 h1:PZ1jhtQe/zAt9KdSvp7uKs+ApOz6e1iL6Uz47+9ieSw= +github.com/inloco/goavro v0.0.0-20180215164239-eba86de05ad3/go.mod h1:OFM8BkEMoEGY9tlXBPMZ8xDPVwrWk+dEVCPsD+CbgsU= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/mailru/easyjson v0.0.0-20171120080333-32fa128f234d h1:bM4HYnlVXPgUKmzl7o3drEaVfOk+sTBiADAQOWjU+8I= +github.com/mailru/easyjson v0.0.0-20171120080333-32fa128f234d/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe h1:W/GaMY0y69G4cFlmsC6B9sbuo2fP8OFP1ABjt4kPz+w= +github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/matttproud/golang_protobuf_extensions v1.0.0 h1:YNOwxxSJzSUARoD9KRZLzM9Y858MNGCOACTvCW9TSAc= +github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/olivere/elastic v6.1.5+incompatible h1:T570UQADp2WZI2sUEOaIlk1WyRZMypCodvCsCR5/hTI= +github.com/olivere/elastic v6.1.5+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8= +github.com/olivere/elastic/v7 v7.0.0 h1:iw29D/OSXdR2loC4qPNddvWjuQqN7Co/uALVD4Si+D4= +github.com/olivere/elastic/v7 v7.0.0/go.mod h1:h2vSaBKzz7eL+VsYPtIOXOURZlXmp+yY5MgyIW3Y/M0= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= +github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= +github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/openzipkin/zipkin-go v0.1.3/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= +github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= +github.com/pierrec/lz4 v0.0.0-20171218195038-2fcda4cb7018 h1:evK2lBbc5w4EFMMs3onLGK1IqzGD3d0RsSGtLJGj+qg= +github.com/pierrec/lz4 v0.0.0-20171218195038-2fcda4cb7018/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/xxHash v0.1.1 h1:KP4NrV9023xp3M4FkTYfcXqWigsOCImL1ANJ7sh5vg4= +github.com/pierrec/xxHash v0.1.1/go.mod h1:w2waW5Zoa/Wc4Yqe0wgrIYAGKqRMf7czn2HNKXmuL+I= +github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.8.0 h1:1921Yw9Gc3iSc4VQh3PIoOqgPCZS7G/4xQNVUp8Mda8= +github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 h1:D+CiwcpGTW6pL6bv6KI3KbyEyCKyS+1JWS2h8PNDnGA= +github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= +github.com/prometheus/client_model v0.0.0-20171117100541-99fa1f4be8e5 h1:cLL6NowurKLMfCeQy4tIeph12XNQWgANCNvdyrOYKV4= +github.com/prometheus/client_model v0.0.0-20171117100541-99fa1f4be8e5/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f h1:BVwpUVJDADN2ufcGik7W992pyps0wZ888b/y9GXcLTU= +github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20180110214958-89604d197083 h1:BVsJT8+ZbyuL3hypz/HmEiM8h2P6hBQGig4el9/MdjA= +github.com/prometheus/common v0.0.0-20180110214958-89604d197083/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/common v0.2.0 h1:kUZDBDTdBVBYBj5Tmh2NZLlF60mfjA27rM34b+cVwNU= +github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/procfs v0.0.0-20180125133057-cb4147076ac7 h1:hhvfGDVThBnd4kYisSFmYuHYeUhglxcwag7FhVPH9zM= +github.com/prometheus/procfs v0.0.0-20180125133057-cb4147076ac7/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1 h1:/K3IL0Z1quvmJ7X0A1AwNEK7CRkVK3YwfOU/QAL4WGg= +github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/rcrowley/go-metrics v0.0.0-20180125231941-8732c616f529 h1:QdrarV+Ze3cQpiZZ410O4mpB0WUdOgMc3Rwu8zOmLVg= +github.com/rcrowley/go-metrics v0.0.0-20180125231941-8732c616f529/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.1 h1:52QO5WkIUcHGIR7EnGagH88x1bUzqGXTC5/1bDTUQ7U= +github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +go.opencensus.io v0.19.1/go.mod h1:gug0GbSHa8Pafr0d2urOSgoXHZ6x/RUlaiT0d9pqb4A= +go.opencensus.io v0.19.2/go.mod h1:NO/8qkisMZLZ1FCsKNqtJPwc8/TaclWyY0B6wcYNg9M= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20181217174547-8f45f776aaf1/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190412183630-56d357773e84 h1:IqXQ59gzdXv58Jmm2xn0tSOR9i6HqroaOFRQ3wR/dJQ= +golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181218192612-074acd46bca6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181219222714-6e267b5cc78e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +google.golang.org/api v0.0.0-20181220000619-583d854617af/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= +google.golang.org/api v0.2.0/go.mod h1:IfRCZScioGtypHNTlz3gFk67J8uePVW7uDTBzXuIkhU= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.3.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20181219182458-5a97ab628bfb/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg= +google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= +google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/linkedin/goavro.v1 v1.0.5 h1:BJa69CDh0awSsLUmZ9+BowBdokpduDZSM9Zk8oKHfN4= +gopkg.in/linkedin/goavro.v1 v1.0.5/go.mod h1:Aw5GdAbizjOEl0kAMHV9iHmA8reZzW/OKuJAl4Hb9F0= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/src/elasticsearch/codec.go b/src/elasticsearch/codec.go index f2b0f8a..700c65c 100644 --- a/src/elasticsearch/codec.go +++ b/src/elasticsearch/codec.go @@ -8,6 +8,8 @@ import ( "github.com/inloco/kafka-elasticsearch-injector/src/models" ) +const typeDoc = "_doc" + type Codec interface { EncodeElasticRecords(records []*models.Record) ([]*models.ElasticRecord, error) } @@ -36,7 +38,7 @@ func (c basicCodec) EncodeElasticRecords(records []*models.Record) ([]*models.El elasticRecords[idx] = &models.ElasticRecord{ Index: index, - Type: "_doc", + Type: typeDoc, ID: docID, Json: record.FilteredFieldsJSON(c.config.BlacklistedColumns), } diff --git a/src/elasticsearch/codec_test.go b/src/elasticsearch/codec_test.go index 1b1e11c..36747e9 100644 --- a/src/elasticsearch/codec_test.go +++ b/src/elasticsearch/codec_test.go @@ -26,7 +26,7 @@ func TestCodec_EncodeElasticRecords(t *testing.T) { if assert.NoError(t, err) && assert.Len(t, elasticRecords, 1) { elasticRecord := elasticRecords[0] assert.Equal(t, fmt.Sprintf("%s-%s", record.Topic, record.FormatTimestampDay()), elasticRecord.Index) - assert.Equal(t, record.Topic, elasticRecord.Type) + assert.Equal(t, "_doc", elasticRecord.Type) assert.Equal(t, fmt.Sprintf("%d:%d", record.Partition, record.Offset), elasticRecord.ID) assert.Equal(t, id, elasticRecord.Json["id"]) assert.Equal(t, value, elasticRecord.Json["value"]) @@ -46,7 +46,7 @@ func TestCodec_EncodeElasticRecordsHourSuffix(t *testing.T) { if assert.NoError(t, err) && assert.Len(t, elasticRecords, 1) { elasticRecord := elasticRecords[0] assert.Equal(t, fmt.Sprintf("%s-%s", record.Topic, record.FormatTimestampHour()), elasticRecord.Index) - assert.Equal(t, record.Topic, elasticRecord.Type) + assert.Equal(t, "_doc", elasticRecord.Type) assert.Equal(t, fmt.Sprintf("%d:%d", record.Partition, record.Offset), elasticRecord.ID) assert.Equal(t, id, elasticRecord.Json["id"]) assert.Equal(t, value, elasticRecord.Json["value"]) diff --git a/src/elasticsearch/elasticsearch.go b/src/elasticsearch/elasticsearch.go index 8a35ef6..26f81e6 100644 --- a/src/elasticsearch/elasticsearch.go +++ b/src/elasticsearch/elasticsearch.go @@ -11,7 +11,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/inloco/kafka-elasticsearch-injector/src/models" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" ) var esClient *elastic.Client diff --git a/src/elasticsearch/elasticsearch_test.go b/src/elasticsearch/elasticsearch_test.go index 5083fdb..70cf01a 100644 --- a/src/elasticsearch/elasticsearch_test.go +++ b/src/elasticsearch/elasticsearch_test.go @@ -16,7 +16,7 @@ import ( "github.com/inloco/kafka-elasticsearch-injector/src/kafka/fixtures" "github.com/inloco/kafka-elasticsearch-injector/src/logger_builder" "github.com/inloco/kafka-elasticsearch-injector/src/models" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" "github.com/stretchr/testify/assert" ) @@ -34,7 +34,6 @@ var template = ` "template": "my-topic-*", "settings": {}, "mappings": { - "my-topic": { "_source": { "enabled": "true" }, @@ -51,10 +50,9 @@ var template = ` ], "properties": { "id": { - "type": "keyword" + "type": "keyword" } } - } }, "aliases": {} } @@ -84,7 +82,7 @@ func TestRecordDatabase_Insert(t *testing.T) { } res, err := db.GetClient().Get().Index(record.Index).Type(record.Type).Id(record.ID).Do(context.Background()) if assert.NoError(t, err) { - json.Unmarshal(*res.Source, &recordFromES) + json.Unmarshal(res.Source, &recordFromES) } assert.Equal(t, recordFromES.Id, id) } @@ -106,7 +104,7 @@ func TestRecordDatabase_Insert_RepeatedId(t *testing.T) { } res, err := db.GetClient().Get().Index(record.Index).Type(record.Type).Id(record.ID).Do(context.Background()) if assert.NoError(t, err) { - json.Unmarshal(*res.Source, &recordFromES) + json.Unmarshal(res.Source, &recordFromES) } assert.Equal(t, recordFromES.Id, id) } @@ -125,7 +123,7 @@ func TestRecordDatabase_Insert_Multiple(t *testing.T) { } res, err := db.GetClient().Get().Index(record.Index).Type(record.Type).Id(record.ID).Do(context.Background()) if assert.NoError(t, err) { - json.Unmarshal(*res.Source, &recordFromES) + json.Unmarshal(res.Source, &recordFromES) } assert.Equal(t, recordFromES.Id, id) } diff --git a/src/kafka/consumer_test.go b/src/kafka/consumer_test.go index 4eda398..26923f1 100644 --- a/src/kafka/consumer_test.go +++ b/src/kafka/consumer_test.go @@ -20,7 +20,7 @@ import ( "github.com/inloco/kafka-elasticsearch-injector/src/metrics" "github.com/inloco/kafka-elasticsearch-injector/src/models" "github.com/inloco/kafka-elasticsearch-injector/src/schema_registry" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" "github.com/stretchr/testify/assert" ) @@ -123,12 +123,11 @@ func TestKafka_Start(t *testing.T) { esId := fmt.Sprintf("%d:%d", msg.Partition, msg.Offset) _, err = db.GetClient().Refresh(esIndex).Do(context.Background()) if assert.NoError(t, err) { - res, err := db.GetClient().Get().Index(esIndex). - Type(msg.Topic).Id(esId).Do(context.Background()) + res, err := db.GetClient().Get().Index(esIndex).Id(esId).Do(context.Background()) var r fixtures.FixtureRecord if assert.NoError(t, err) { assert.True(t, res.Found) - err = json.Unmarshal(*res.Source, &r) + err = json.Unmarshal(res.Source, &r) if assert.NoError(t, err) { assert.Equal(t, rec.Id, r.Id) assert.InDelta(t, expectedTimestamp, r.Timestamp, 5000.0) From 9c22b9b16ba9f055722296ea4541abf256076ffe Mon Sep 17 00:00:00 2001 From: Rafael Acevedo Date: Mon, 22 Apr 2019 09:44:48 -0300 Subject: [PATCH 3/5] chore(README): add go modules instructions --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c4bfae8..d542e2f 100644 --- a/README.md +++ b/README.md @@ -112,10 +112,10 @@ The exported metrics are: ## Development -Clone the repo, install dep and retrieve dependencies: +This repository uses Go modules for dependency management, make sure it is enabled. +To build the project from source, run from project root: ```bash -go get -u github.com/golang/dep/... -dep ensure -v +go build cmd/injector.go ``` To run tests, run `docker-compose up -d zookeeper kafka schema-registry elasticsearch` and run `make test`. From 83b78637683c144f848dc145bdbdf4b99f8b5a63 Mon Sep 17 00:00:00 2001 From: Rafael Acevedo Date: Mon, 22 Apr 2019 09:44:59 -0300 Subject: [PATCH 4/5] chore(VERSION): update to 1.0.0 --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index ee6cdce..afaf360 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.6.1 +1.0.0 \ No newline at end of file From 7c4d752876b5cbbd1e39ecc00b597a3da0deade2 Mon Sep 17 00:00:00 2001 From: Rafael Acevedo Date: Mon, 22 Apr 2019 09:48:35 -0300 Subject: [PATCH 5/5] refactor(elasticsearch): ignore errors on log --- src/elasticsearch/elasticsearch.go | 2 +- src/elasticsearch/elasticsearch_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/elasticsearch/elasticsearch.go b/src/elasticsearch/elasticsearch.go index 26f81e6..7b4ddc3 100644 --- a/src/elasticsearch/elasticsearch.go +++ b/src/elasticsearch/elasticsearch.go @@ -67,7 +67,7 @@ func (d recordDatabase) Insert(records []*models.ElasticRecord) (*InsertResponse defer cancel() res, err := bulkRequest.Do(ctx) if err == elastic.ErrNoClient || errors.Cause(err) == elastic.ErrNoClient { - level.Warn(d.logger).Log("message", "no elasticsearch node available", "err", err) + _ = level.Warn(d.logger).Log("message", "no elasticsearch node available", "err", err) return &InsertResponse{AlreadyExists: nil, Retry: records, Backoff: true}, nil } if err != nil { diff --git a/src/elasticsearch/elasticsearch_test.go b/src/elasticsearch/elasticsearch_test.go index 70cf01a..4a3d18f 100644 --- a/src/elasticsearch/elasticsearch_test.go +++ b/src/elasticsearch/elasticsearch_test.go @@ -82,7 +82,7 @@ func TestRecordDatabase_Insert(t *testing.T) { } res, err := db.GetClient().Get().Index(record.Index).Type(record.Type).Id(record.ID).Do(context.Background()) if assert.NoError(t, err) { - json.Unmarshal(res.Source, &recordFromES) + _ = json.Unmarshal(res.Source, &recordFromES) } assert.Equal(t, recordFromES.Id, id) } @@ -104,7 +104,7 @@ func TestRecordDatabase_Insert_RepeatedId(t *testing.T) { } res, err := db.GetClient().Get().Index(record.Index).Type(record.Type).Id(record.ID).Do(context.Background()) if assert.NoError(t, err) { - json.Unmarshal(res.Source, &recordFromES) + _ = json.Unmarshal(res.Source, &recordFromES) } assert.Equal(t, recordFromES.Id, id) } @@ -123,7 +123,7 @@ func TestRecordDatabase_Insert_Multiple(t *testing.T) { } res, err := db.GetClient().Get().Index(record.Index).Type(record.Type).Id(record.ID).Do(context.Background()) if assert.NoError(t, err) { - json.Unmarshal(res.Source, &recordFromES) + _ = json.Unmarshal(res.Source, &recordFromES) } assert.Equal(t, recordFromES.Id, id) }