Skip to content

Commit

Permalink
sync実装
Browse files Browse the repository at this point in the history
  • Loading branch information
ikura-hamu committed Jul 15, 2023
1 parent c0dec44 commit e2d195f
Showing 1 changed file with 38 additions and 19 deletions.
57 changes: 38 additions & 19 deletions service/search/es_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"strings"
"time"

"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/gofrs/uuid"
json "github.com/json-iterator/go"
"github.com/olivere/elastic/v7"

// "github.com/olivere/elastic/v7"
"go.uber.org/zap"

"github.com/traPtitech/traQ/model"
Expand Down Expand Up @@ -171,31 +173,41 @@ func (e *esEngine) sync() error {
return err
}
}
bulkIndexer, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: e.client,
Index: getIndexName(esMessageIndex),
})

bulk := e.client.Bulk().Index(getIndexName(esMessageIndex))
for _, v := range messages {
var bulkReq elastic.BulkableRequest
if v.CreatedAt.After(lastSynced) {
doc, err := e.convertMessageCreated(v, message.Parse(v.Text), userCache)
if err != nil {
return err
}
bulkReq = elastic.NewBulkIndexRequest().
Id(v.ID.String()).
Doc(doc)
err = bulkIndexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
DocumentID: v.ID.String(),
Body: esutil.NewJSONReader(doc),
})
} else {
bulkReq = elastic.NewBulkUpdateRequest().
Id(v.ID.String()).
Doc(e.convertMessageUpdated(v, message.Parse(v.Text)))
err = bulkIndexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "update",
DocumentID: v.ID.String(),
Body: esutil.NewJSONReader(e.convertMessageUpdated(v, message.Parse(v.Text))),
})
}
if err != nil {
return err
}
bulk.Add(bulkReq)
}
res, err := bulk.Do(context.Background())

err = bulkIndexer.Close(context.Background())
if err != nil {
return err
}

e.l.Info(fmt.Sprintf("indexed %v message(s) to index, updated %v message(s) on index, last insert %v", len(res.Indexed()), len(res.Updated()), lastInsert))
e.l.Info(fmt.Sprintf("indexed %v message(s) to index, updated %v message(s) on index, last insert %v",
bulkIndexer.Stats().NumIndexed, bulkIndexer.Stats().NumUpdated, lastInsert))

if !more {
break
Expand All @@ -216,30 +228,37 @@ func (e *esEngine) sync() error {
}
lastDelete = messages[len(messages)-1].DeletedAt.Time

bulk := e.client.Bulk().Index(getIndexName(esMessageIndex))
bulkIndexer, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Client: e.client,
Index: getIndexName(esMessageIndex),
})

count := 0
for _, v := range messages {
if v.CreatedAt.After(lastSynced) {
continue
}
count++
bulk.Add(
elastic.NewBulkDeleteRequest().
Id(v.ID.String()),
)
err = bulkIndexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "delete",
DocumentID: v.ID.String(),
})
if err != nil {
return err
}
}
if count == 0 {
if more {
continue
}
break
}
res, err := bulk.Do(context.Background())
err = bulkIndexer.Close(context.Background())
if err != nil {
return err
}

e.l.Info(fmt.Sprintf("deleted %v message(s) from index, last delete %v", len(res.Deleted()), lastDelete))
e.l.Info(fmt.Sprintf("deleted %v message(s) from index, last delete %v", bulkIndexer.Stats().NumDeleted, lastDelete))

if !more {
break
Expand Down

0 comments on commit e2d195f

Please sign in to comment.