Fix sync
ikura-hamu committed Jul 20, 2023
1 parent 2e9b674 commit 62438b5
Showing 3 changed files with 61 additions and 35 deletions.
28 changes: 17 additions & 11 deletions service/search/es.go
@@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
@@ -256,14 +257,21 @@ type searchQuery m
type searchBody struct {
Query *struct {
Bool *struct {
Musts []searchQuery `json:"musts,omitempty"`
Musts []searchQuery `json:"must,omitempty"`
} `json:"bool,omitempty"`
} `json:"query,omitempty"`
}

func NewSearchBody(sq []searchQuery) searchBody {
- sb := searchBody{}
- sb.Query.Bool.Musts = sq
+ sb := searchBody{
+ Query: &struct {
+ Bool *struct {
+ Musts []searchQuery `json:"must,omitempty"`
+ } `json:"bool,omitempty"`
+ }{Bool: &struct {
+ Musts []searchQuery `json:"must,omitempty"`
+ }{Musts: sq}},
+ }
return sb
}

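Note on the NewSearchBody change above: Elasticsearch's bool query expects its clauses under the key "must", so the struct tag had to change from "musts", and because Query and Bool are pointers to anonymous structs they must be allocated before Musts can be assigned (the old sb.Query.Bool.Musts = sq dereferences a nil pointer). A standalone sketch, not part of the commit, assuming the package's m type is map[string]interface{} and using a made-up match clause:

package main

import (
    "encoding/json"
    "fmt"
)

type m map[string]interface{}

type searchQuery m

type searchBody struct {
    Query *struct {
        Bool *struct {
            Musts []searchQuery `json:"must,omitempty"`
        } `json:"bool,omitempty"`
    } `json:"query,omitempty"`
}

// NewSearchBody mirrors the version added by this commit: it allocates the
// nested anonymous structs before filling Musts.
func NewSearchBody(sq []searchQuery) searchBody {
    sb := searchBody{
        Query: &struct {
            Bool *struct {
                Musts []searchQuery `json:"must,omitempty"`
            } `json:"bool,omitempty"`
        }{Bool: &struct {
            Musts []searchQuery `json:"must,omitempty"`
        }{Musts: sq}},
    }
    return sb
}

func main() {
    sq := []searchQuery{{"match": m{"text": "hello"}}}
    b, _ := json.Marshal(NewSearchBody(sq))
    fmt.Println(string(b))
    // {"query":{"bool":{"must":[{"match":{"text":"hello"}}]}}}
}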
@@ -312,7 +320,7 @@ func (e *esEngine) Do(q *Query) (Result, error) {
Gt: q.After.ValueOrZero().Format(esDateFormat),
}}})
case !q.After.Valid && q.Before.Valid:
musts = append(musts, searchQuery{"rage": rangeQuery{"createdAt": rangeParameters{
musts = append(musts, searchQuery{"range": rangeQuery{"createdAt": rangeParameters{
Lt: q.Before.ValueOrZero().Format(esDateFormat),
}}})
}
@@ -383,28 +391,26 @@ func (e *esEngine) Do(q *Query) (Result, error) {
e.client.Search.WithFrom(offset),
e.client.Search.WithContext(context.Background()),
)

if err != nil {
return nil, err
}
if sr.IsError() {
return nil, fmt.Errorf("failed to get search result")
}
- var searchResultBody []byte
- _, err = sr.Body.Read(searchResultBody)
+ searchResultBody, err := io.ReadAll(sr.Body)
+ defer sr.Body.Close()
if err != nil {
return nil, fmt.Errorf("failed to get search result body")
return nil, err
}
- defer sr.Body.Close()

var res m
err = json.Unmarshal(searchResultBody, &res)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response body")
return nil, err
}

e.l.Debug("search result", zap.Reflect("hits", res["hits"]))
- return e.parseResBody(res)
+ return e.parseResultBody(res)
}

func (e *esEngine) Available() bool {
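Note on the response-body change above: Read only fills the buffer it is given, so reading into a nil []byte always yields zero bytes and the subsequent json.Unmarshal saw an empty document; io.ReadAll drains the whole stream instead. A minimal sketch, using a strings.Reader as a stand-in for the Elasticsearch response body:

package main

import (
    "fmt"
    "io"
    "strings"
)

func main() {
    body := strings.NewReader(`{"hits":{"total":{"value":1}}}`)

    var buf []byte
    n, _ := body.Read(buf) // len(buf) == 0, so nothing is read
    fmt.Println(n)         // 0

    body.Seek(0, io.SeekStart)
    all, err := io.ReadAll(body) // reads until EOF
    if err != nil {
        panic(err)
    }
    fmt.Println(string(all)) // {"hits":{"total":{"value":1}}}
}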
28 changes: 28 additions & 0 deletions service/search/es_result.go
@@ -16,6 +16,34 @@ type esResult struct {
messages []message.Message
}

+ type esSearchResponse struct {
+ Shards struct {
+ Failed int64 `json:"failed"`
+ Skipped int64 `json:"skipped"`
+ Successful int64 `json:"successful"`
+ Total int64 `json:"total"`
+ } `json:"_shards"`
+ Hits struct {
+ Hits []struct {
+ ID string `json:"_id"`
+ Index string `json:"_index"`
+ Score interface{} `json:"_score"`
+ Source struct {
+ doc esMessageDoc
+ } `json:"_source"`
+ Type string `json:"_type"`
+ Sort []int64 `json:"sort"`
+ } `json:"hits"`
+ MaxScore interface{} `json:"max_score"`
+ Total struct {
+ Relation string `json:"relation"`
+ Value int64 `json:"value"`
+ } `json:"total"`
+ } `json:"hits"`
+ TimedOut bool `json:"timed_out"`
+ Took int64 `json:"took"`
+ }

func (e *esEngine) parseResultBody(resBody m) (Result, error) {
totalHits := resBody["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)
hits := resBody["hits"].(map[string]interface{})["hits"].([]any)
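Note on the new esSearchResponse type: it mirrors the envelope of an Elasticsearch search response so that lastInsertedUpdated (in the next file) can decode the body directly instead of walking map[string]interface{} with type assertions. One general caveat: encoding/json only populates exported fields, so any nested field that should be decoded needs an exported name and a json tag. A trimmed-down sketch with a made-up response:

package main

import (
    "encoding/json"
    "fmt"
)

// Reduced version of esSearchResponse, kept to the fields used here.
type esSearchResponse struct {
    Hits struct {
        Hits []struct {
            ID   string  `json:"_id"`
            Sort []int64 `json:"sort"`
        } `json:"hits"`
        Total struct {
            Relation string `json:"relation"`
            Value    int64  `json:"value"`
        } `json:"total"`
    } `json:"hits"`
    TimedOut bool  `json:"timed_out"`
    Took     int64 `json:"took"`
}

func main() {
    sample := []byte(`{
        "took": 3,
        "timed_out": false,
        "hits": {
            "total": {"value": 1, "relation": "eq"},
            "hits": [{"_id": "example-id", "sort": [1689800000000]}]
        }
    }`)

    var res esSearchResponse
    if err := json.Unmarshal(sample, &res); err != nil {
        panic(err)
    }
    fmt.Println(res.Hits.Total.Value, res.Hits.Hits[0].ID) // 1 example-id
}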
40 changes: 16 additions & 24 deletions service/search/es_sync.go
@@ -149,7 +149,6 @@ func (e *esEngine) sync() error {

lastSynced, err := e.lastInsertedUpdated()
if err != nil {
e.l.Debug("last insert updated")
return err
}

@@ -158,8 +157,6 @@ func (e *esEngine) sync() error {
for {
messages, more, err := e.repo.GetUpdatedMessagesAfter(lastInsert, syncMessageBulk)
if err != nil {
e.l.Debug("get updated messages after")

return err
}
if len(messages) == 0 {
@@ -173,8 +170,6 @@ func (e *esEngine) sync() error {
// Build a populated user cache only when the new messages span two or more pages
userCache, err = e.newUserCache()
if err != nil {
e.l.Debug("user cache")

return err
}
}
@@ -187,27 +182,26 @@ func (e *esEngine) sync() error {
if v.CreatedAt.After(lastSynced) {
doc, err := e.convertMessageCreated(v, message.Parse(v.Text), userCache)
if err != nil {
e.l.Debug("convert message created")

return err
}
err = bulkIndexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
DocumentID: v.ID.String(),
Body: esutil.NewJSONReader(doc),
})
+ if err != nil {
+ return err
+ }
} else {
+ doc := e.convertMessageUpdated(v, message.Parse(v.Text))
err = bulkIndexer.Add(context.Background(), esutil.BulkIndexerItem{
Action: "update",
DocumentID: v.ID.String(),
- Body: esutil.NewJSONReader(e.convertMessageUpdated(v, message.Parse(v.Text))),
+ Body: esutil.NewJSONReader(map[string]any{"doc": doc}),
})
- }
- if err != nil {
-
- e.l.Debug("add")
-
- return err
+ if err != nil {
+ return err
+ }
}
}

@@ -216,8 +210,8 @@ func (e *esEngine) sync() error {
return err
}

e.l.Info(fmt.Sprintf("indexed %v message(s) to index, updated %v message(s) on index, last insert %v",
bulkIndexer.Stats().NumIndexed, bulkIndexer.Stats().NumUpdated, lastInsert))
e.l.Info(fmt.Sprintf("indexed %v message(s) to index, updated %v message(s) on index, failed %v message(s), last insert %v",
bulkIndexer.Stats().NumIndexed, bulkIndexer.Stats().NumUpdated, bulkIndexer.Stats().NumFailed, lastInsert))

if !more {
break
@@ -268,7 +262,8 @@ func (e *esEngine) sync() error {
return err
}

e.l.Info(fmt.Sprintf("deleted %v message(s) from index, last delete %v", bulkIndexer.Stats().NumDeleted, lastDelete))
e.l.Info(fmt.Sprintf("deleted %v message(s) from index, failed %v message(s), last delete %v",
bulkIndexer.Stats().NumDeleted, bulkIndexer.Stats().NumFailed, lastDelete))

if !more {
break
@@ -296,21 +291,18 @@ func (e *esEngine) lastInsertedUpdated() (time.Time, error) {
return time.Time{}, err
}

- var res m
+ var res esSearchResponse

err = json.Unmarshal(body, &res)
if err != nil {
return time.Time{}, err
}

- result, err := e.parseResultBody(res)
- if err != nil {
- return time.Time{}, err
- }
+ lastUpdatedDoc := res.Hits.Hits

- if len(result.Hits()) == 0 {
+ if len(lastUpdatedDoc) == 0 {
return time.Time{}, nil
}

- return result.Hits()[0].GetUpdatedAt(), nil
+ return lastUpdatedDoc[0].Source.doc.UpdatedAt, nil
}
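Note on the bulk indexer changes above: Elasticsearch's bulk "update" action expects the changed fields wrapped in a "doc" object (or a script), which is why the partial document from convertMessageUpdated is now sent as map[string]any{"doc": doc} instead of the bare document. A minimal sketch of the resulting request body, with invented field names:

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    // Stand-in for the value returned by convertMessageUpdated.
    doc := map[string]any{
        "text":      "edited message",
        "updatedAt": "2023-07-20T12:00:00Z",
    }

    // Sent without the "doc" wrapper, the bulk update action is rejected;
    // wrapped, it is a valid partial update of the indexed message.
    wrapped, _ := json.Marshal(map[string]any{"doc": doc})
    fmt.Println(string(wrapped))
    // {"doc":{"text":"edited message","updatedAt":"2023-07-20T12:00:00Z"}}
}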
