Skip to content

Commit

Permalink
Use Inbox to back TeamChannelSource (#14849)
Browse files Browse the repository at this point in the history
* Use Inbox to back TeamChannelSource

* convload/index active convs

* Remove all remote calls to GetTLFConversations

* InboxSource.Read

* Revert "InboxSource.Read"

This reverts commit 88f73e0.

* comment

* don't modify query

* v12.1, syncer test

* 2.13.1

* defaultMemberStatusFilter

* fix test
  • Loading branch information
joshblum committed Jan 10, 2019
1 parent 700cd24 commit 5f1a984
Show file tree
Hide file tree
Showing 22 changed files with 178 additions and 302 deletions.
3 changes: 0 additions & 3 deletions go/chat/helper.go
Expand Up @@ -1030,9 +1030,6 @@ func (n *newConversationHelper) create(ctx context.Context) (res chat1.Conversat
return res, err
}

// Clear team channel source
n.G().TeamChannelSource.ChannelsChanged(ctx, updateConv.Conv.Metadata.IdTriple.Tlfid)

if res.Error != nil {
return res, errors.New(res.Error.Message)
}
Expand Down
14 changes: 8 additions & 6 deletions go/chat/inboxsource.go
Expand Up @@ -159,7 +159,7 @@ func (b *baseInboxSource) Localize(ctx context.Context, uid gregor1.UID, convs [
localizerTyp types.ConversationLocalizerTyp) ([]chat1.ConversationLocal, chan types.AsyncInboxResult, error) {
localizeCb := make(chan types.AsyncInboxResult, len(convs))
localizer := b.createConversationLocalizer(ctx, localizerTyp, localizeCb)
b.Debug(ctx, "Localize: using localizer: %s", localizer.Name())
b.Debug(ctx, "Localize: using localizer: %s, convs: %d", localizer.Name(), len(convs))

res, err := localizer.Localize(ctx, uid, types.Inbox{
ConvsUnverified: convs,
Expand Down Expand Up @@ -491,11 +491,13 @@ func (s *HybridInboxSource) fetchRemoteInbox(ctx context.Context, uid gregor1.UI
if query != nil && query.SkipBgLoads {
continue
}
// Queue all these convs up to be loaded by the background loader Only
// load first 100 non KBFS convs so we don't get the conv loader too
// backed up
if bgEnqueued < 100 &&
conv.Metadata.MembersType != chat1.ConversationMembersType_KBFS {
// Queue all these convs up to be loaded by the background loader. Only
// load first 100 non KBFS convs, ACTIVE convs so we don't get the conv
// loader too backed up.
if conv.Metadata.MembersType != chat1.ConversationMembersType_KBFS &&
(conv.HasMemberStatus(chat1.ConversationMemberStatus_ACTIVE) ||
conv.HasMemberStatus(chat1.ConversationMemberStatus_PREVIEW)) &&
bgEnqueued < 100 {
job := types.NewConvLoaderJob(conv.GetConvID(), nil /* query */, &chat1.Pagination{Num: 50},
types.ConvLoaderPriorityMedium, newConvLoaderPagebackHook(s.G(), 0, 5))
if err := s.G().ConvLoader.Queue(ctx, job); err != nil {
Expand Down
23 changes: 0 additions & 23 deletions go/chat/push.go
Expand Up @@ -922,27 +922,6 @@ func (g *PushHandler) MembershipUpdate(ctx context.Context, m gregor.OutOfBandMe
return nil
}

func (g *PushHandler) TeamChannels(ctx context.Context, m gregor.OutOfBandMessage) (err error) {
var identBreaks []keybase1.TLFIdentifyFailure
ctx = Context(ctx, g.G(), keybase1.TLFIdentifyBehavior_CHAT_GUI, &identBreaks,
g.identNotifier)
defer g.Trace(ctx, func() error { return err }, "TeamChannels")()
if m.Body() == nil {
return errors.New("gregor handler for team channels update: nil message body")
}

var update chat1.TeamChannelUpdate
reader := bytes.NewReader(m.Body().Bytes())
dec := codec.NewDecoder(reader, &codec.MsgpackHandle{WriteExt: true})
if err = dec.Decode(&update); err != nil {
return err
}

g.G().TeamChannelSource.ChannelsChanged(ctx, update.TeamID)

return nil
}

func (g *PushHandler) SetConvRetention(ctx context.Context, m gregor.OutOfBandMessage) (err error) {
var identBreaks []keybase1.TLFIdentifyFailure
ctx = Context(ctx, g.G(), keybase1.TLFIdentifyBehavior_CHAT_GUI, &identBreaks,
Expand Down Expand Up @@ -1154,8 +1133,6 @@ func (g *PushHandler) HandleOobm(ctx context.Context, obm gregor.OutOfBandMessag
return true, g.Typing(ctx, obm)
case types.PushMembershipUpdate:
return true, g.MembershipUpdate(ctx, obm)
case types.PushTeamChannels:
return true, g.TeamChannels(ctx, obm)
case types.PushConvRetention:
return true, g.SetConvRetention(ctx, obm)
case types.PushTeamRetention:
Expand Down
4 changes: 4 additions & 0 deletions go/chat/search/indexer.go
Expand Up @@ -454,6 +454,10 @@ func (idx *Indexer) allConvs(ctx context.Context, uid gregor1.UID) (map[string]t
chat1.ConversationStatus_FAVORITE,
chat1.ConversationStatus_MUTED,
},
MemberStatus: []chat1.ConversationMemberStatus{
chat1.ConversationMemberStatus_ACTIVE,
chat1.ConversationMemberStatus_PREVIEW,
},
SkipBgLoads: true,
}
username := idx.G().Env.GetUsername().String()
Expand Down
2 changes: 1 addition & 1 deletion go/chat/sender_test.go
Expand Up @@ -265,7 +265,7 @@ func setupTest(t *testing.T, numUsers int) (context.Context, *kbtest.ChatMockWor
pushHandler.SetClock(world.Fc)
g.PushHandler = pushHandler
g.ChatHelper = NewHelper(g, getRI)
g.TeamChannelSource = NewCachingTeamChannelSource(g, getRI)
g.TeamChannelSource = NewTeamChannelSource(g)
g.ActivityNotifier = NewNotifyRouterActivityRouter(g)

searcher := search.NewRegexpSearcher(g)
Expand Down
26 changes: 4 additions & 22 deletions go/chat/server.go
Expand Up @@ -1819,29 +1819,12 @@ func (h *Server) JoinConversationLocal(ctx context.Context, arg chat1.JoinConver
}

// List all the conversations on the team
teamConvs, err := h.remoteClient().GetTLFConversations(ctx, chat1.GetTLFConversationsArg{
TlfID: nameInfo.ID,
TopicType: arg.TopicType,
SummarizeMaxMsgs: false, // tough call here, depends on if we are in most of convos on the team
})
convs, err := h.G().TeamChannelSource.GetChannelsFull(ctx, uid, nameInfo.ID, arg.TopicType)
if err != nil {
h.Debug(ctx, "JoinConversationLocal: failed to list team conversations: %s", err.Error())
return res, err
}
if teamConvs.RateLimit != nil {
res.RateLimits = append(res.RateLimits, *teamConvs.RateLimit)
}

// Localize the conversations so we can find the conversation ID
teamConvsLocal, _, err := h.G().InboxSource.Localize(ctx, uid,
utils.RemoteConvs(teamConvs.Conversations), types.ConversationLocalizerBlocking)
if err != nil {
h.Debug(ctx, "JoinConversationLocal: failed to localize conversations: %s", err.Error())
return res, err
}

var convID chat1.ConversationID
for _, conv := range teamConvsLocal {
for _, conv := range convs {
topicName := utils.GetTopicName(conv)
if topicName != "" && topicName == arg.TopicName {
convID = conv.GetConvID()
Expand All @@ -1851,8 +1834,7 @@ func (h *Server) JoinConversationLocal(ctx context.Context, arg chat1.JoinConver
return res, fmt.Errorf("no topic name %s exists on specified team", arg.TopicName)
}

err = JoinConversation(ctx, h.G(), h.DebugLabeler, h.remoteClient, uid, convID)
if err != nil {
if err = JoinConversation(ctx, h.G(), h.DebugLabeler, h.remoteClient, uid, convID); err != nil {
return res, err
}
res.Offline = h.G().InboxSource.IsOffline(ctx)
Expand Down Expand Up @@ -1957,7 +1939,7 @@ func (h *Server) GetTLFConversationsLocal(ctx context.Context, arg chat1.GetTLFC
var identBreaks []keybase1.TLFIdentifyFailure
ctx = Context(ctx, h.G(), keybase1.TLFIdentifyBehavior_CHAT_GUI,
&identBreaks, h.identNotifier)
defer h.Trace(ctx, func() error { return err }, fmt.Sprintf("GetTLFConversations(%s)",
defer h.Trace(ctx, func() error { return err }, fmt.Sprintf("GetTLFConversationsLocal(%s)",
arg.TlfName))()
defer func() { err = h.handleOfflineError(ctx, err, &res) }()
defer func() { h.setResultRateLimit(ctx, &res) }()
Expand Down
4 changes: 2 additions & 2 deletions go/chat/server_test.go
Expand Up @@ -363,7 +363,7 @@ func (c *chatTestContext) as(t *testing.T, user *kbtest.FakeUser) *chatTestUserC
pushHandler := NewPushHandler(g)
pushHandler.SetClock(c.world.Fc)
g.PushHandler = pushHandler
g.TeamChannelSource = NewCachingTeamChannelSource(g, func() chat1.RemoteInterface { return ri })
g.TeamChannelSource = NewTeamChannelSource(g)
g.AttachmentURLSrv = types.DummyAttachmentHTTPSrv{}
g.ActivityNotifier = NewNotifyRouterActivityRouter(g)
g.Unfurler = types.DummyUnfurler{}
Expand Down Expand Up @@ -5657,7 +5657,7 @@ func TestChatSrvTeamChannelNameMentions(t *testing.T) {
TlfVisibility: keybase1.TLFVisibility_PRIVATE,
MembersType: chat1.ConversationMembersType_TEAM,
})
t.Logf("conv: %s chan: %s", conv.Id, channel.Conv.GetConvID())
t.Logf("conv: %s chan: %s, err: %v", conv.Id, channel.Conv.GetConvID(), err)
require.NoError(t, err)
consumeNewMsgRemote(t, listener1, chat1.MessageType_JOIN)
if index == 0 {
Expand Down
86 changes: 41 additions & 45 deletions go/chat/storage/inbox.go
Expand Up @@ -26,6 +26,12 @@ import (

const inboxVersion = 22

var defaultMemberStatusFilter = []chat1.ConversationMemberStatus{
chat1.ConversationMemberStatus_ACTIVE,
chat1.ConversationMemberStatus_PREVIEW,
chat1.ConversationMemberStatus_RESET,
}

type InboxFlushMode int

const (
Expand Down Expand Up @@ -474,12 +480,11 @@ func (i *Inbox) supersedersNotEmpty(ctx context.Context, superseders []chat1.Con
return false
}

func (i *Inbox) applyQuery(ctx context.Context, query *chat1.GetInboxQuery, rcs []types.RemoteConversation) []types.RemoteConversation {
func (i *Inbox) applyQuery(ctx context.Context, query *chat1.GetInboxQuery, rcs []types.RemoteConversation) (res []types.RemoteConversation) {
if query == nil {
query = &chat1.GetInboxQuery{}
}
var res []types.RemoteConversation
filtered := 0

var queryConvIDMap map[string]bool
if query.ConvID != nil {
query.ConvIDs = append(query.ConvIDs, *query.ConvID)
Expand All @@ -490,83 +495,74 @@ func (i *Inbox) applyQuery(ctx context.Context, query *chat1.GetInboxQuery, rcs
queryConvIDMap[c.String()] = true
}
}

queryMemberStatusMap := map[chat1.ConversationMemberStatus]bool{}
memberStatus := query.MemberStatus
// Default allowed member statuses
if len(memberStatus) == 0 {
memberStatus = defaultMemberStatusFilter
}
for _, memberStatus := range memberStatus {
queryMemberStatusMap[memberStatus] = true
}

queryStatusMap := map[chat1.ConversationStatus]bool{}
for _, status := range query.Status {
queryStatusMap[status] = true
}

for _, rc := range rcs {
ok := true
conv := rc.Conv

// Existence check
if conv.Metadata.Existence != chat1.ConversationExistence_ACTIVE {
ok = false
continue
}

// Member status check
switch conv.ReaderInfo.Status {
case chat1.ConversationMemberStatus_ACTIVE, chat1.ConversationMemberStatus_PREVIEW,
chat1.ConversationMemberStatus_RESET:
// only let these states through
default:
ok = false
if _, ok := queryMemberStatusMap[conv.ReaderInfo.Status]; !ok && len(memberStatus) > 0 {
continue
}
// Status check
if _, ok := queryStatusMap[conv.Metadata.Status]; !ok && len(query.Status) > 0 {
continue
}

// Basic checks
if queryConvIDMap != nil && !queryConvIDMap[conv.GetConvID().String()] {
filtered++
continue
}
if query.After != nil && !conv.ReaderInfo.Mtime.After(*query.After) {
ok = false
continue
}
if query.Before != nil && !conv.ReaderInfo.Mtime.Before(*query.Before) {
ok = false
continue
}
if query.TopicType != nil && *query.TopicType != conv.Metadata.IdTriple.TopicType {
ok = false
continue
}
if query.TlfVisibility != nil && *query.TlfVisibility != keybase1.TLFVisibility_ANY &&
*query.TlfVisibility != conv.Metadata.Visibility {
ok = false
continue
}
if query.UnreadOnly && !conv.IsUnread() {
ok = false
continue
}
if query.ReadOnly && conv.IsUnread() {
ok = false
continue
}
if query.TlfID != nil && !query.TlfID.Eq(conv.Metadata.IdTriple.Tlfid) {
ok = false
}

// Check to see if the conv status is in the query list
if len(query.Status) > 0 {
found := false
for _, s := range query.Status {
if s == conv.Metadata.Status {
found = true
break
}
}
if !found {
ok = false
}
continue
}

// If we are finalized and are superseded, then don't return this
if query.OneChatTypePerTLF == nil ||
(query.OneChatTypePerTLF != nil && *query.OneChatTypePerTLF) {
if conv.Metadata.FinalizeInfo != nil && len(conv.Metadata.SupersededBy) > 0 && len(query.ConvIDs) == 0 {
if i.supersedersNotEmpty(ctx, conv.Metadata.SupersededBy, rcs) {
ok = false
continue
}
}
}

if ok {
res = append(res, rc)
} else {
filtered++
}
res = append(res, rc)
}

filtered := len(rcs) - len(res)
i.Debug(ctx, "applyQuery: res size: %d filtered: %d", len(res), filtered)
return res
}
Expand Down
4 changes: 3 additions & 1 deletion go/chat/sync.go
Expand Up @@ -228,7 +228,9 @@ func (s *Syncer) shouldDoFullReloadFromIncremental(ctx context.Context, syncRes
return true
}
switch conv.ReaderInfo.Status {
case chat1.ConversationMemberStatus_LEFT, chat1.ConversationMemberStatus_REMOVED:
case chat1.ConversationMemberStatus_LEFT,
chat1.ConversationMemberStatus_REMOVED,
chat1.ConversationMemberStatus_NEVER_JOINED:
s.Debug(ctx, "shouldDoFullReloadFromIncremental: join or leave conv")
return true
}
Expand Down
22 changes: 21 additions & 1 deletion go/chat/sync_test.go
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/keybase/client/go/chat/storage"
"github.com/keybase/client/go/chat/types"
"github.com/keybase/client/go/kbtest"
"github.com/keybase/client/go/libkb"
"github.com/keybase/client/go/protocol/chat1"
"github.com/keybase/client/go/protocol/gregor1"
"github.com/keybase/client/go/protocol/keybase1"
Expand Down Expand Up @@ -385,7 +386,8 @@ func TestSyncerNeverJoined(t *testing.T) {
require.NoError(t, err)
require.NoError(t, syncer.Sync(context.TODO(), ri, uid, &res.Chat))
}
ctx := Context(context.TODO(), g1, keybase1.TLFIdentifyBehavior_CHAT_GUI, nil, nil)

ctx := context.TODO()
doAuthedSync(ctx, g1, syncer1, ctc1.ri, uid1)
select {
case sres := <-listener1.inboxSynced:
Expand All @@ -394,11 +396,16 @@ func TestSyncerNeverJoined(t *testing.T) {
require.Equal(t, chat1.SyncInboxResType_INCREMENTAL, typ)
require.Len(t, sres.Incremental().Items, 2)
require.Equal(t, convID.String(), sres.Incremental().Items[0].Conv.ConvID)
require.Equal(t, chat1.ConversationMemberStatus_ACTIVE, sres.Incremental().Items[0].Conv.MemberStatus)
require.Equal(t, chanID.String(), sres.Incremental().Items[1].Conv.ConvID)
require.Equal(t, chat1.ConversationMemberStatus_ACTIVE, sres.Incremental().Items[1].Conv.MemberStatus)
case <-time.After(20 * time.Second):
require.Fail(t, "no inbox synced received")
}

// simulate an old client that doesn't understand NEVER_JOINED
libkb.UserAgent = "old:ua:2.12.1"
ctx = Context(context.TODO(), g1, keybase1.TLFIdentifyBehavior_CHAT_GUI, nil, nil)
doAuthedSync(ctx, g2, syncer2, ctc2.ri, uid2)
select {
case sres := <-listener2.inboxSynced:
Expand All @@ -407,6 +414,19 @@ func TestSyncerNeverJoined(t *testing.T) {
require.Equal(t, chat1.SyncInboxResType_INCREMENTAL, typ)
require.Len(t, sres.Incremental().Items, 1)
require.Equal(t, convID.String(), sres.Incremental().Items[0].Conv.ConvID)
require.Equal(t, chat1.ConversationMemberStatus_ACTIVE, sres.Incremental().Items[0].Conv.MemberStatus)
case <-time.After(20 * time.Second):
require.Fail(t, "no inbox synced received")
}

// New clients get a CLEAR here.
ctx = context.TODO()
doAuthedSync(ctx, g2, syncer2, ctc2.ri, uid2)
select {
case sres := <-listener2.inboxSynced:
typ, err := sres.SyncType()
require.NoError(t, err)
require.Equal(t, chat1.SyncInboxResType_CLEAR, typ)
case <-time.After(20 * time.Second):
require.Fail(t, "no inbox synced received")
}
Expand Down

0 comments on commit 5f1a984

Please sign in to comment.