Skip to content

Commit

Permalink
🐛 Elasticsearchのbulk indexerのメモリリーク修正
Browse files Browse the repository at this point in the history
  • Loading branch information
ikura-hamu committed Jan 12, 2024
1 parent ee248c0 commit 973ad95
Showing 1 changed file with 97 additions and 67 deletions.
164 changes: 97 additions & 67 deletions service/search/es_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,58 +172,12 @@ func (e *esEngine) sync() error {
return err
}
}
bulkIndexer, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: e.client,
Index: getIndexName(esMessageIndex),
})

for _, v := range messages {
if v.CreatedAt.After(lastSynced) {
doc, err := e.convertMessageCreated(v, message.Parse(v.Text), userCache)
if err != nil {
return err
}

data, err := json.Marshal(*doc)
if err != nil {
return err
}

err = bulkIndexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
DocumentID: v.ID.String(),
Body: bytes.NewReader(data),
})
if err != nil {
return err
}
} else {
doc := e.convertMessageUpdated(v, message.Parse(v.Text))

data, err := json.Marshal(map[string]any{"doc": *doc})
if err != nil {
return err
}

err = bulkIndexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "update",
DocumentID: v.ID.String(),
Body: bytes.NewReader(data),
})
if err != nil {
return err
}
}
}

err = bulkIndexer.Close(context.Background())
err = syncNewMessages(e, messages, lastInsert, lastSynced, userCache)
if err != nil {
return err
}

e.l.Info(fmt.Sprintf("indexed %v message(s) to index, updated %v message(s) on index, failed %v message(s), last insert %v",
bulkIndexer.Stats().NumIndexed, bulkIndexer.Stats().NumUpdated, bulkIndexer.Stats().NumFailed, lastInsert))

if !more {
break
}
Expand All @@ -243,42 +197,118 @@ func (e *esEngine) sync() error {
}
lastDelete = messages[len(messages)-1].DeletedAt.Time

bulkIndexer, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: e.client,
Index: getIndexName(esMessageIndex),
})
err = syncDeletedMessages(e, messages, lastDelete, lastSynced)
if err != nil {
return err
}

if !more {
break
}
}

return nil
}

// syncNewMessages pushes one batch of messages through an esutil bulk indexer
// that targets getIndexName(esMessageIndex).
//
// Messages whose CreatedAt is after lastSynced are converted with
// convertMessageCreated and added with action "index"; all other messages are
// converted with convertMessageUpdated, wrapped as {"doc": ...}, and added with
// action "update". lastInsert is only used in the summary log line.
//
// The indexer is closed in a deferred function, so Close runs on every return
// path. The result err is named so that the deferred function can replace or
// extend it with the error returned by Close.
//
// NOTE(review): this block comes from a diff rendering; some lines below look
// like removed-side lines interleaved with the new code — confirm against the
// real file before relying on it.
func syncNewMessages(e *esEngine, messages []*model.Message, lastInsert time.Time, lastSynced time.Time, userCache userCache) (err error) {
// NOTE(review): the error returned by NewBulkIndexer is discarded here; the
// deferred Close below uses bulkIndexer unconditionally — TODO confirm that
// construction cannot fail in practice, or handle the error.
bulkIndexer, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: e.client,
Index: getIndexName(esMessageIndex),
})
defer func() {
// er is the Close error; err is the named result set by the return statement.
er := bulkIndexer.Close(context.Background())
if err != nil && er != nil { // An error already occurred before the defer ran: do not overwrite it, combine both.
err = fmt.Errorf("error in bulk index: %w.\nerror in closing bulk indexer: %w", err, er) // err is the named result of syncNewMessages, so this combined error is what the caller receives.
return
}
if er != nil {
err = er
return
}

// Reached whenever Close succeeded (including when err is already non-nil):
// log the indexer statistics together with lastInsert.
e.l.Info(fmt.Sprintf("indexed %v message(s) to index, updated %v message(s) on index, failed %v message(s), last insert %v",
bulkIndexer.Stats().NumIndexed, bulkIndexer.Stats().NumUpdated, bulkIndexer.Stats().NumFailed, lastInsert))
}()

for _, v := range messages {
// Created after the last sync: send the full document with action "index".
if v.CreatedAt.After(lastSynced) {
// This := declares a loop-local err that shadows the named result; the
// explicit "return err" statements still assign it to the named result.
doc, err := e.convertMessageCreated(v, message.Parse(v.Text), userCache)
if err != nil {
return err
}

data, err := json.Marshal(*doc)
if err != nil {
return err
}

// NOTE(review): the next four lines look like removed-side diff lines (they
// mirror a separate counting/skip loop) — TODO confirm they are not live code.
count := 0
for _, v := range messages {
if v.CreatedAt.After(lastSynced) {
continue
err = bulkIndexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
DocumentID: v.ID.String(),
Body: bytes.NewReader(data),
})
if err != nil {
return err
}
// NOTE(review): count++ presumably belongs to the removed-side lines above — confirm.
count++
} else {
// Not created after lastSynced: send a partial document with action "update".
doc := e.convertMessageUpdated(v, message.Parse(v.Text))

data, err := json.Marshal(map[string]any{"doc": *doc})
if err != nil {
return err
}

err = bulkIndexer.Add(context.Background(), esutil.BulkIndexerItem{
// NOTE(review): two Action lines appear back to back; "delete" looks like
// the removed-side line and "update" the added one — confirm.
Action: "delete",
Action: "update",
DocumentID: v.ID.String(),
Body: bytes.NewReader(data),
})
if err != nil {
return err
}
}
// NOTE(review): this count/more check references "more", which is not
// declared in this function; presumably a removed-side diff fragment — confirm.
if count == 0 {
if more {
continue
}
break
}

return nil
}

func syncDeletedMessages(e *esEngine, messages []*model.Message, lastDelete time.Time, lastSynced time.Time) (err error) {
bulkIndexer, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: e.client,
Index: getIndexName(esMessageIndex),
})
defer func() {
er := bulkIndexer.Close(context.Background())
if err != nil && er != nil { // エラーが発生してからdeferに来た時、エラーの上書きを防ぐ
err = fmt.Errorf("error in bulk index: %w.\nerror in closing bulk indexer: %w", err, er)
return
}
err = bulkIndexer.Close(context.Background())
if err != nil {
return err
if er != nil {
err = er
return
}

e.l.Info(fmt.Sprintf("deleted %v message(s) from index, failed %v message(s), last delete %v",
bulkIndexer.Stats().NumDeleted, bulkIndexer.Stats().NumFailed, lastDelete))
return

Check failure on line 293 in service/search/es_sync.go

View workflow job for this annotation

GitHub Actions / Lint

S1023: redundant `return` statement (gosimple)
}()

if !more {
break
count := 0
for _, v := range messages {
if v.CreatedAt.After(lastSynced) {
continue
}
count++
err = bulkIndexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "delete",
DocumentID: v.ID.String(),
})
if err != nil {
return err
}
}
if count == 0 {
return nil
}

return nil
Expand Down

0 comments on commit 973ad95

Please sign in to comment.