Skip to content

Commit

Permalink
handle superseded messages properly for incremental mode in GetThread…
Browse files Browse the repository at this point in the history
…Nonblock CORE-8523 (#13249)

* wip

* debug

* wip

* wip

* make pulllocalonly work bettrer

* works with test

* opts
  • Loading branch information
mmaxim committed Aug 10, 2018
1 parent a332a26 commit f759f48
Show file tree
Hide file tree
Showing 14 changed files with 275 additions and 82 deletions.
67 changes: 29 additions & 38 deletions go/chat/convsource.go
Expand Up @@ -133,8 +133,11 @@ func (s *baseConversationSource) postProcessThread(ctx context.Context, uid greg

// Resolve supersedes
if q == nil || !q.DisableResolveSupersedes {
deletePlaceholders := q != nil && q.EnableDeletePlaceholders
if superXform == nil {
superXform = newBasicSupersedesTransform(s.G())
superXform = newBasicSupersedesTransform(s.G(), basicSupersedesTransformOpts{
UseDeletePlaceholders: deletePlaceholders,
})
}
if thread.Messages, err = superXform.Run(ctx, conv, uid, thread.Messages); err != nil {
return err
Expand Down Expand Up @@ -165,7 +168,7 @@ func (s *baseConversationSource) postProcessThread(ctx context.Context, uid greg

func (s *baseConversationSource) TransformSupersedes(ctx context.Context,
unboxInfo types.UnboxConversationInfo, uid gregor1.UID, msgs []chat1.MessageUnboxed) ([]chat1.MessageUnboxed, error) {
transform := newBasicSupersedesTransform(s.G())
transform := newBasicSupersedesTransform(s.G(), basicSupersedesTransformOpts{})
return transform.Run(ctx, unboxInfo, uid, msgs)
}

Expand Down Expand Up @@ -660,7 +663,7 @@ func (s *HybridConversationSource) identifyTLF(ctx context.Context, conv types.U
}

func (s *HybridConversationSource) resolveHoles(ctx context.Context, uid gregor1.UID,
thread *chat1.ThreadView, conv chat1.Conversation, reason chat1.GetThreadReason) (err error) {
thread *chat1.ThreadView, conv chat1.Conversation, reason chat1.GetThreadReason) (holesFilled int, err error) {
defer s.Trace(ctx, func() error { return err }, "resolveHoles")()
var msgIDs []chat1.MessageID
// Gather all placeholder messages so we can go fetch them
Expand All @@ -673,47 +676,28 @@ func (s *HybridConversationSource) resolveHoles(ctx context.Context, uid gregor1
if index == len(thread.Messages)-1 {
// If the last message is a hole, we might not have fetched everything,
// so fail this case like a normal miss
return storage.MissError{}
return 0, storage.MissError{}
}
msgIDs = append(msgIDs, msg.GetMessageID())
}
}
if len(msgIDs) == 0 {
// Nothing to do
return nil
return 0, nil
}
if s.IsOffline(ctx) {
// Don't attempt if we are offline
return OfflineError{}
return 0, OfflineError{}
}

// Fetch all missing messages from server, and sub in the real ones into the placeholder slots
msgs, err := s.GetMessages(ctx, conv, uid, msgIDs, &reason)
if err != nil {
s.Debug(ctx, "resolveHoles: failed to get missing messages: %s", err.Error())
return err
}
msgLookup := make(map[chat1.MessageID]chat1.MessageUnboxed)
for _, msg := range msgs {
msgLookup[msg.GetMessageID()] = msg
}
for i, threadMsg := range thread.Messages {
state, err := threadMsg.State()
if err != nil {
continue
}
if state == chat1.MessageUnboxedState_PLACEHOLDER {
if msg, ok := msgLookup[threadMsg.GetMessageID()]; ok {
thread.Messages[i] = msg
} else {
s.Debug(ctx, "resolveHoles: did not fetch all placeholder messages, missing msgID: %d",
threadMsg.GetMessageID())
return fmt.Errorf("did not fetch all placeholder messages")
}
}
return 0, err
}
s.Debug(ctx, "resolveHoles: success: filled %d holes", len(msgs))
return nil
return len(msgs), nil
}

// maxHolesForPull is the number of misses in the body storage cache we will tolerate missing. A good
Expand All @@ -738,16 +722,23 @@ func (s *HybridConversationSource) Pull(ctx context.Context, convID chat1.Conver
if err == nil {
unboxConv = conv
// Try locally first
var holesFilled int
rc := storage.NewHoleyResultCollector(maxHolesForPull,
s.storage.ResultCollectorFromQuery(ctx, query, pagination))
thread, err = s.fetchMaybeNotify(ctx, conv.GetConvID(), uid, rc, conv.ReaderInfo.MaxMsgid,
query, pagination)
if err == nil {
// Since we are using the "holey" collector, we need to resolve any placeholder
// messages that may have been fetched.
s.Debug(ctx, "Pull: cache hit: convID: %s uid: %s holes: %d msgs: %d", unboxConv.GetConvID(), uid,
rc.Holes(), len(thread.Messages))
err = s.resolveHoles(ctx, uid, &thread, conv, reason)
s.Debug(ctx, "Pull: (holey) cache hit: convID: %s uid: %s holes: %d msgs: %d",
unboxConv.GetConvID(), uid, rc.Holes(), len(thread.Messages))
holesFilled, err = s.resolveHoles(ctx, uid, &thread, conv, reason)
}
if err == nil && holesFilled > 0 {
s.Debug(ctx, "Pull: %d holes filled, refetching from storage")
rc := s.storage.ResultCollectorFromQuery(ctx, query, pagination)
thread, err = s.fetchMaybeNotify(ctx, conv.GetConvID(), uid, rc, conv.ReaderInfo.MaxMsgid,
query, pagination)
}
if err == nil {
// Do online only things
Expand Down Expand Up @@ -830,16 +821,15 @@ func (s *HybridConversationSource) Pull(ctx context.Context, convID chat1.Conver
}

type pullLocalResultCollector struct {
*storage.SimpleResultCollector
num int
storage.ResultCollector
}

func (p *pullLocalResultCollector) Name() string {
return "pulllocal"
}

func (p *pullLocalResultCollector) String() string {
return fmt.Sprintf("[ %s: t: %d ]", p.Name(), p.num)
return fmt.Sprintf("[ %s: base: %s ]", p.Name(), p.ResultCollector)
}

func (p *pullLocalResultCollector) hasRealResults() bool {
Expand Down Expand Up @@ -867,10 +857,9 @@ func (p *pullLocalResultCollector) Error(err storage.Error) storage.Error {
return err
}

func newPullLocalResultCollector(num int) *pullLocalResultCollector {
func newPullLocalResultCollector(baseRC storage.ResultCollector) *pullLocalResultCollector {
return &pullLocalResultCollector{
num: num,
SimpleResultCollector: storage.NewSimpleResultCollector(num),
ResultCollector: baseRC,
}
}

Expand All @@ -885,7 +874,7 @@ func (s *HybridConversationSource) PullLocalOnly(ctx context.Context, convID cha
// Post process thread before returning
defer func() {
if err == nil {
superXform := newBasicSupersedesTransform(s.G())
superXform := newBasicSupersedesTransform(s.G(), basicSupersedesTransformOpts{})
superXform.SetMessagesFunc(func(ctx context.Context, conv types.UnboxConversationInfo,
uid gregor1.UID, msgIDs []chat1.MessageID,
_ *chat1.GetThreadReason) (res []chat1.MessageUnboxed, err error) {
Expand Down Expand Up @@ -928,7 +917,9 @@ func (s *HybridConversationSource) PullLocalOnly(ctx context.Context, convID cha
if pagination != nil {
num = pagination.Num
}
rc := storage.NewHoleyResultCollector(maxPlaceholders, newPullLocalResultCollector(num))
baseRC := s.storage.ResultCollectorFromQuery(ctx, query, pagination)
baseRC.SetTarget(num)
rc := storage.NewHoleyResultCollector(maxPlaceholders, newPullLocalResultCollector(baseRC))
tv, err = s.fetchMaybeNotify(ctx, convID, uid, rc, iboxMaxMsgID, query, pagination)
if err != nil {
s.Debug(ctx, "PullLocalOnly: failed to fetch local messages: %s", err.Error())
Expand Down
1 change: 0 additions & 1 deletion go/chat/helper.go
Expand Up @@ -1232,7 +1232,6 @@ func (n *newConversationHelper) makeFirstMessage(ctx context.Context, triple cha
}

func CreateNameInfoSource(ctx context.Context, g *globals.Context, membersType chat1.ConversationMembersType) types.NameInfoSource {
g.GetLog().CDebugf(ctx, "createNameInfoSource: hi")
if override := ctx.Value(nameInfoOverrideKey); override != nil {
g.GetLog().CDebugf(ctx, "createNameInfoSource: using override: %T", override)
return override.(types.NameInfoSource)
Expand Down
10 changes: 6 additions & 4 deletions go/chat/inboxsource.go
Expand Up @@ -78,7 +78,8 @@ func NewBlockingLocalizer(g *globals.Context) *BlockingLocalizer {
return &BlockingLocalizer{
Contextified: globals.NewContextified(g),
baseLocalizer: newBaseLocalizer(g),
pipeline: newLocalizerPipeline(g, newBasicSupersedesTransform(g)),
pipeline: newLocalizerPipeline(g,
newBasicSupersedesTransform(g, basicSupersedesTransformOpts{})),
}
}

Expand Down Expand Up @@ -124,9 +125,10 @@ func NewNonblockingLocalizer(g *globals.Context, localizeCb chan NonblockInboxRe
Contextified: globals.NewContextified(g),
DebugLabeler: utils.NewDebugLabeler(g.GetLog(), "NonblockingLocalizer", false),
baseLocalizer: newBaseLocalizer(g),
pipeline: newLocalizerPipeline(g, newBasicSupersedesTransform(g)),
localizeCb: localizeCb,
maxUnbox: maxUnbox,
pipeline: newLocalizerPipeline(g,
newBasicSupersedesTransform(g, basicSupersedesTransformOpts{})),
localizeCb: localizeCb,
maxUnbox: maxUnbox,
}
}

Expand Down
35 changes: 20 additions & 15 deletions go/chat/server.go
Expand Up @@ -482,28 +482,28 @@ func (h *Server) mergeLocalRemoteThread(ctx context.Context, remoteThread, local
if state == chat1.MessageUnboxedState_PLACEHOLDER && !rm[m.GetMessageID()] {
h.Debug(ctx, "mergeLocalRemoteThread: subbing in dead placeholder: msgID: %d",
m.GetMessageID())
res.Messages = append(res.Messages, chat1.NewMessageUnboxedWithPlaceholder(
chat1.MessageUnboxedPlaceholder{
MessageID: m.GetMessageID(),
Hidden: true,
},
))
res.Messages = append(res.Messages, utils.CreateHiddenPlaceholder(m.GetMessageID()))
}
}
sort.Sort(utils.ByMsgUnboxedMsgID(res.Messages))
}()

shouldAppend := func(oldMsg chat1.MessageUnboxed) bool {
state, err := oldMsg.State()
if err != nil {
shouldAppend := func(newMsg chat1.MessageUnboxed, oldMsgs map[chat1.MessageID]chat1.MessageUnboxed) bool {
oldMsg, ok := oldMsgs[newMsg.GetMessageID()]
if !ok {
return true
}
switch state {
case chat1.MessageUnboxedState_VALID:
return false
default:
// If either message is not valid, return the new one, something weird might be going on
if !oldMsg.IsValid() || !newMsg.IsValid() {
return true
}
// If newMsg is now superseded by something different than what we sent, then let's include it
if newMsg.Valid().ServerHeader.SupersededBy != oldMsg.Valid().ServerHeader.SupersededBy {
h.Debug(ctx, "mergeLocalRemoteThread: including supersededBy change: msgID: %d",
newMsg.GetMessageID())
return true
}
return false
}
switch mode {
case chat1.GetThreadNonblockCbMode_FULL:
Expand All @@ -516,8 +516,7 @@ func (h *Server) mergeLocalRemoteThread(ctx context.Context, remoteThread, local
}
res.Pagination = remoteThread.Pagination
for _, m := range remoteThread.Messages {
oldMsg, ok := lm[m.GetMessageID()]
if !ok || shouldAppend(oldMsg) {
if shouldAppend(m, lm) {
res.Messages = append(res.Messages, m)
}
}
Expand Down Expand Up @@ -659,6 +658,12 @@ func (h *Server) GetThreadNonblock(ctx context.Context, arg chat1.GetThreadNonbl
return res, err
}

// Enable delete placeholders for supersede transform
if arg.Query == nil {
arg.Query = new(chat1.GetThreadQuery)
}
arg.Query.EnableDeletePlaceholders = true

// Xlate pager control into pagination if given
if arg.Query != nil && arg.Query.MessageIDControl != nil {
pagination = utils.XlateMessageIDControlToPagination(arg.Query.MessageIDControl)
Expand Down

0 comments on commit f759f48

Please sign in to comment.