Skip to content

Commit

Permalink
Merge 5f0e3f6 into 9f7228f
Browse files Browse the repository at this point in the history
  • Loading branch information
pavolloffay committed Jan 12, 2018
2 parents 9f7228f + 5f0e3f6 commit 7c79538
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
11 changes: 7 additions & 4 deletions plugin/storage/es/spanstore/writer.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/converter/json"
Expand Down Expand Up @@ -139,13 +140,15 @@ func (s *SpanWriter) createIndex(indexName string, mapping string, jsonSpan *jMo
start := time.Now()
exists, _ := s.client.IndexExists(indexName).Do(s.ctx) // don't need to check the error because the exists variable will be false anyway if there is an error
if !exists {
// if there are multiple collectors writing to the same elasticsearch host, if the collectors pass
// the exists check above and try to create the same index all at once, this might fail and
// drop a couple spans (~1 per collector). Creating indices ahead of time alleviates this issue.
// if there are multiple collectors writing to the same elasticsearch host a race condition can occur - create the index multiple times
// we check for the error type to minimize errors
_, err := s.client.CreateIndex(indexName).Body(s.fixMapping(mapping)).Do(s.ctx)
s.writerMetrics.indexCreate.Emit(err, time.Since(start))
if err != nil {
return s.logError(jsonSpan, err, "Failed to create index", s.logger)
eErr, ok := err.(*elastic.Error)
if !ok || eErr.Details != nil && eErr.Details.Type != "index_already_exists_exception" {
return s.logError(jsonSpan, err, "Failed to create index", s.logger)
}
}
}
writeCache(indexName, s.indexCache)
Expand Down
8 changes: 6 additions & 2 deletions plugin/storage/es/spanstore/writer_test.go
Expand Up @@ -258,9 +258,13 @@ func TestCheckAndCreateIndex(t *testing.T) {
`"error":"index creation error"`,
},
},
{
indexExists: false,
createError: &elastic.Error{Details: &elastic.ErrorDetails{Type: "index_already_exists_exception"}},
indexExistsError: &elastic.Error{Details: &elastic.ErrorDetails{Type: "index_already_exists_exception"}},
},
}
for _, tc := range testCases {
testCase := tc
for _, testCase := range testCases {
withSpanWriter(func(w *spanWriterTest) {
existsService := &mocks.IndicesExistsService{}
existsService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(testCase.indexExists, testCase.indexExistsError)
Expand Down

0 comments on commit 7c79538

Please sign in to comment.