Skip to content

Commit

Permalink
Merge pull request #190 from rocboss/jc/alimy
Browse files Browse the repository at this point in the history
optimize search logic for bridgeTweetSearchServant
  • Loading branch information
alimy committed Nov 3, 2022
2 parents 26f8cea + 67c669f commit f2d674c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 23 deletions.
60 changes: 39 additions & 21 deletions internal/dao/search/bridge.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package search

import (
"time"

"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/sirupsen/logrus"
Expand All @@ -17,8 +19,9 @@ type documents struct {
}

type bridgeTweetSearchServant struct {
ts core.TweetSearchService
updateDocsCh chan *documents
ts core.TweetSearchService
updateDocsCh chan *documents
updateDocsTempCh chan *documents
}

func (s *bridgeTweetSearchServant) IndexName() string {
Expand Down Expand Up @@ -47,32 +50,47 @@ func (s *bridgeTweetSearchServant) Search(user *model.User, q *core.QueryReq, of
func (s *bridgeTweetSearchServant) updateDocs(doc *documents) {
select {
case s.updateDocsCh <- doc:
logrus.Debugln("addDocuments send documents by chan")
logrus.Debugln("addDocuments send documents by updateDocsCh chan")
default:
go func(item *documents) {
if len(item.docItems) > 0 {
if _, err := s.ts.AddDocuments(item.docItems, item.primaryKey...); err != nil {
logrus.Errorf("addDocuments in gorotine occurs error: %v", err)
}
} else if len(item.identifiers) > 0 {
if err := s.ts.DeleteDocuments(item.identifiers); err != nil {
logrus.Errorf("deleteDocuments in gorotine occurs error: %s", err)
select {
case s.updateDocsTempCh <- doc:
logrus.Debugln("addDocuments send documents by updateDocsTempCh chan")
default:
go func() {
s.handleUpdate(doc)

// watch updateDocsTempch to continue handle update if needed.
// cancel loop if no item had watched in 1 minute.
for count := 0; count > 60; count++ {
select {
case item := <-s.updateDocsTempCh:
// reset count to continue handle docs update
count = 0
s.handleUpdate(item)
default:
// sleeping to wait docs item pass over to handle
time.Sleep(1 * time.Second)
}
}
}
}(doc)
}()
}
}
}

func (s *bridgeTweetSearchServant) startUpdateDocs() {
for doc := range s.updateDocsCh {
if len(doc.docItems) > 0 {
if _, err := s.ts.AddDocuments(doc.docItems, doc.primaryKey...); err != nil {
logrus.Errorf("addDocuments occurs error: %v", err)
}
} else if len(doc.identifiers) > 0 {
if err := s.ts.DeleteDocuments(doc.identifiers); err != nil {
logrus.Errorf("deleteDocuments occurs error: %s", err)
}
s.handleUpdate(doc)
}
}

func (s *bridgeTweetSearchServant) handleUpdate(item *documents) {
if len(item.docItems) > 0 {
if _, err := s.ts.AddDocuments(item.docItems, item.primaryKey...); err != nil {
logrus.Errorf("addDocuments occurs error: %v", err)
}
} else if len(item.identifiers) > 0 {
if err := s.ts.DeleteDocuments(item.identifiers); err != nil {
logrus.Errorf("deleteDocuments occurs error: %s", err)
}
}
}
5 changes: 3 additions & 2 deletions internal/dao/search/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ func NewBridgeTweetSearchService(ts core.TweetSearchService) core.TweetSearchSer
capacity = 10000
}
bts := &bridgeTweetSearchServant{
ts: ts,
updateDocsCh: make(chan *documents, capacity),
ts: ts,
updateDocsCh: make(chan *documents, capacity),
updateDocsTempCh: make(chan *documents, 100),
}

numWorker := conf.TweetSearchSetting.MinWorker
Expand Down

0 comments on commit f2d674c

Please sign in to comment.