Skip to content

Commit

Permalink
sync修正
Browse files Browse the repository at this point in the history
  • Loading branch information
ikura-hamu committed Jul 16, 2023
1 parent bdcad8e commit 2e9b674
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 38 deletions.
41 changes: 6 additions & 35 deletions service/search/es_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"

"github.com/gofrs/uuid"
"github.com/olivere/elastic/v7"
"github.com/samber/lo"

"github.com/traPtitech/traQ/service/message"
Expand All @@ -17,17 +16,17 @@ type esResult struct {
messages []message.Message
}

func (e *esEngine) parseResBody(resBody m) (Result, error) {
totalHits := resBody["hits"].(m)["total"].(m)["value"].(int64)
hits := resBody["hits"].(m)["hits"].([]map[string]any)
func (e *esEngine) parseResultBody(resBody m) (Result, error) {
totalHits := resBody["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)
hits := resBody["hits"].(map[string]interface{})["hits"].([]any)

r := &esResult{
totalHits: totalHits,
totalHits: int64(totalHits),
messages: make([]message.Message, 0, len(hits)),
}

messageIDs := utils.Map(hits, func(hit map[string]any) uuid.UUID {
return uuid.Must(uuid.FromString(hit["_id"].(string)))
messageIDs := utils.Map(hits, func(hit any) uuid.UUID {
return uuid.Must(uuid.FromString(hit.(map[string]any)["_id"].(string)))
})

messages, err := e.mm.GetIn(messageIDs)
Expand All @@ -50,34 +49,6 @@ func (e *esEngine) parseResBody(resBody m) (Result, error) {
return r, nil
}

func (e *esEngine) bindESResult(sr *elastic.SearchResult) (Result, error) {
r := &esResult{
totalHits: sr.TotalHits(),
messages: make([]message.Message, 0, len(sr.Hits.Hits)),
}

messageIDs := utils.Map(sr.Hits.Hits, func(hit *elastic.SearchHit) uuid.UUID {
return uuid.Must(uuid.FromString(hit.Id))
})
messages, err := e.mm.GetIn(messageIDs)
if err != nil {
return nil, err
}
messagesMap := lo.SliceToMap(messages, func(m message.Message) (uuid.UUID, message.Message) {
return m.GetID(), m
})
// sort result
for _, id := range messageIDs {
msg, ok := messagesMap[id]
if !ok {
return nil, fmt.Errorf("message %v not found", id)
}
r.messages = append(r.messages, msg)
}

return r, nil
}

func (e *esResult) TotalHits() int64 {
return e.totalHits
}
Expand Down
20 changes: 17 additions & 3 deletions service/search/es_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"strings"
"time"

Expand Down Expand Up @@ -148,6 +149,7 @@ func (e *esEngine) sync() error {

lastSynced, err := e.lastInsertedUpdated()
if err != nil {
e.l.Debug("last insert updated")
return err
}

Expand All @@ -156,6 +158,8 @@ func (e *esEngine) sync() error {
for {
messages, more, err := e.repo.GetUpdatedMessagesAfter(lastInsert, syncMessageBulk)
if err != nil {
e.l.Debug("get updated messages after")

return err
}
if len(messages) == 0 {
Expand All @@ -169,6 +173,8 @@ func (e *esEngine) sync() error {
// 新規メッセージが2ページ以上の時のみデータが入ったキャッシュを作成
userCache, err = e.newUserCache()
if err != nil {
e.l.Debug("user cache")

return err
}
}
Expand All @@ -181,6 +187,8 @@ func (e *esEngine) sync() error {
if v.CreatedAt.After(lastSynced) {
doc, err := e.convertMessageCreated(v, message.Parse(v.Text), userCache)
if err != nil {
e.l.Debug("convert message created")

return err
}
err = bulkIndexer.Add(context.Background(), esutil.BulkIndexerItem{
Expand All @@ -196,6 +204,9 @@ func (e *esEngine) sync() error {
})
}
if err != nil {

e.l.Debug("add")

return err
}
}
Expand Down Expand Up @@ -270,26 +281,29 @@ func (e *esEngine) sync() error {
// lastInsertedUpdated esに存在している、updatedAtが一番新しいメッセージの値を取得します
func (e *esEngine) lastInsertedUpdated() (time.Time, error) {
sr, err := e.client.Search(
e.client.Search.WithContext(context.Background()),
e.client.Search.WithIndex(getIndexName(esMessageIndex)),
e.client.Search.WithSort("updatedAt:desc"),
e.client.Search.WithSize(1))
if err != nil {
return time.Time{}, err
}

var body []byte
_, err = sr.Body.Read(body)
body, err := io.ReadAll(sr.Body)
defer sr.Body.Close()

if err != nil {
return time.Time{}, err
}

var res m

err = json.Unmarshal(body, &res)
if err != nil {
return time.Time{}, err
}

result, err := e.parseResBody(res)
result, err := e.parseResultBody(res)
if err != nil {
return time.Time{}, err
}
Expand Down

0 comments on commit 2e9b674

Please sign in to comment.