Skip to content

Commit

Permalink
subject startmsgid to same constraints as anything else (#16459)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmaxim committed Mar 7, 2019
1 parent d82e743 commit e27d841
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 41 deletions.
21 changes: 17 additions & 4 deletions go/chat/flip/dealer.go
Expand Up @@ -170,9 +170,7 @@ func (v GameMessageV1) Encode() (GameMessageEncoded, error) {
func (d *Dealer) run(ctx context.Context, game *Game) {
doneCh := make(chan error)
key := game.key
go func() {
doneCh <- game.run(ctx)
}()
go game.run(ctx, doneCh)
err := <-doneCh

if err != nil {
Expand Down Expand Up @@ -584,7 +582,7 @@ func (g *Game) handleTimerEvent(ctx context.Context) error {
return TimeoutError{G: g.md, Stage: g.stageForTimeout}
}

func (g *Game) run(ctx context.Context) error {
func (g *Game) runMain(ctx context.Context) error {
for {
timer := g.getNextTimer()
var err error
Expand Down Expand Up @@ -612,6 +610,21 @@ func (g *Game) run(ctx context.Context) error {
}
}

func (g *Game) runDrain(ctx context.Context) {
i := 0
for range g.msgCh {
i++
}
if i > 0 {
g.clogf(ctx, "drained %d messages on shutdown in game %s", i, g.md)
}
}

func (g *Game) run(ctx context.Context, doneCh chan error) {
doneCh <- g.runMain(ctx)
g.runDrain(ctx)
}

func absDuration(d time.Duration) time.Duration {
if d < time.Duration(0) {
return time.Duration(-1) * d
Expand Down
84 changes: 48 additions & 36 deletions go/chat/flipmanager.go
Expand Up @@ -632,26 +632,32 @@ func (m *FlipManager) handleUpdate(ctx context.Context, update flip.GameStateUpd
GameID: gameID.String(),
}
}

switch {
case update.Err != nil:
m.Debug(ctx, "handleUpdate: error received")
status.Phase = chat1.UICoinFlipPhase_ERROR
status.ProgressText = fmt.Sprintf("Something went wrong: %s", update.Err)
formatted := m.formatError(ctx, update.Err)
status.ErrorInfo = &formatted
case update.Commitment != nil:
m.Debug(ctx, "handleUpdate: commit received")
// Only care about these while we are in the commitment phase
if status.Phase == chat1.UICoinFlipPhase_COMMITMENT {
status.ErrorInfo = nil
status.Phase = chat1.UICoinFlipPhase_COMMITMENT
m.addParticipant(ctx, &status, *update.Commitment)
}
case update.CommitmentComplete != nil:
m.Debug(ctx, "handleUpdate: complete received")
status.ErrorInfo = nil
status.Phase = chat1.UICoinFlipPhase_REVEALS
m.finalizeParticipants(ctx, &status, *update.CommitmentComplete)
case update.Reveal != nil:
m.Debug(ctx, "handleUpdate: reveal received")
m.addReveal(ctx, &status, *update.Reveal)
case update.Result != nil:
m.Debug(ctx, "handleUpdate: result received")
status.Phase = chat1.UICoinFlipPhase_COMPLETE
status.ErrorInfo = nil
m.addResult(ctx, &status, *update.Result, update.Metadata.ConversationID)
Expand Down Expand Up @@ -1104,12 +1110,7 @@ func (m *FlipManager) updateActiveGame(ctx context.Context, uid gregor1.UID, con
}
}
}()
if m.isStartMsgID(nextMsg.GetMessageID()) {
// if this is a start msg, then just send it in
m.Debug(ctx, "updateActiveGame: starting new game: convID: %s gameID: %s", convID, gameID)
return nil
}
m.Debug(ctx, "updateActiveGame: convID: %s gameID: %s nextMsgID: %d", convID, gameID,
m.Debug(ctx, "updateActiveGame: uid: %s convID: %s gameID: %s nextMsgID: %d", uid, convID, gameID,
nextMsg.GetMessageID())
// Get current msg ID of the game if we know about it
var msgIDStart chat1.MessageID
Expand All @@ -1119,12 +1120,17 @@ func (m *FlipManager) updateActiveGame(ctx context.Context, uid gregor1.UID, con
m.Debug(ctx, "updateActiveGame: truly incremental update, injecting...")
return nil
} else if nextMsg.GetMessageID() <= storedMsgID {
m.Debug(ctx, "updateActiveGame: update from the past, ignoring")
m.Debug(ctx, "updateActiveGame: update from the past, ignoring: stored: %d", storedMsgID)
return errors.New("update from the past")
}
m.Debug(ctx, "updateActiveGame: gapped update: storedMsgID: %d", storedMsgID)
msgIDStart = storedMsgID
} else {
if m.isStartMsgID(nextMsg.GetMessageID()) {
// if this is a start msg, then just send it in
m.Debug(ctx, "updateActiveGame: starting new game: convID: %s gameID: %s", convID, gameID)
return nil
}
m.Debug(ctx, "updateActiveGame: unknown game, setting start to 0")
}
// Otherwise, grab the thread and inject everything that has happened so far
Expand Down Expand Up @@ -1159,14 +1165,16 @@ func (m *FlipManager) MaybeInjectFlipMessage(ctx context.Context, boxedMsg chat1
return false
}
defer m.Trace(ctx, func() error { return nil }, "MaybeInjectFlipMessage: uid: %s convID: %s", uid, convID)()
lock := m.injectLockTab.AcquireOnName(ctx, m.G(), convID.String())
defer lock.Release(ctx)

// Update inbox for this guy
if err := m.G().InboxSource.UpdateInboxVersion(ctx, uid, inboxVers); err != nil {
m.Debug(ctx, "MaybeInjectFlipMessage: failed to update inbox version: %s", err)
// charge forward here, we will figure it out
}
if err := storage.New(m.G(), nil).SetMaxMsgID(ctx, convID, uid, boxedMsg.GetMessageID()); err != nil {
m.Debug(ctx, "MaybeInjectFlipMessage: failed to write max msgid: %s", err)
// charge forward from this error
}
// Unbox the message
conv, err := utils.GetUnverifiedConv(ctx, m.G(), uid, convID, types.InboxSourceDataSourceAll)
if err != nil {
Expand All @@ -1178,38 +1186,42 @@ func (m *FlipManager) MaybeInjectFlipMessage(ctx context.Context, boxedMsg chat1
m.Debug(ctx, "MaybeInjectFlipMessage: failed to unbox: %s", err)
return true
}
if err := storage.New(m.G(), nil).SetMaxMsgID(ctx, convID, uid, msg.GetMessageID()); err != nil {
m.Debug(ctx, "MaybeInjectFlipMessage: failed to write max msgid: %s", err)
// charge forward from this error
}
body := msg.Valid().MessageBody
if !body.IsType(chat1.MessageType_FLIP) {
m.Debug(ctx, "MaybeInjectFlipMessage: bogus flip message with a non-flip body")
return true
}
// Ignore anything from the current device
if m.Me().Eq(flip.UserDevice{
U: msg.Valid().ClientHeader.Sender,
D: msg.Valid().ClientHeader.SenderDevice,
}) {
// If this is our own message, then we need to make sure to update the msgID of the flip
m.gameMsgIDs.Add(body.Flip().GameID.String(), msg.GetMessageID())
return true
}
// Check to see if we are going to participate from this inject
hmi, err := m.getHostMessageInfo(ctx, convID)
if err != nil {
m.Debug(ctx, "MaybeInjectFlipMessage: failed to get host message info: %s", err)
return true
}
if m.shouldIgnoreInject(ctx, hmi.ConvID, convID, body.Flip().GameID) {
m.Debug(ctx, "MaybeInjectFlipMessage: ignored flip message")
return true
}
// Check to see if the game is unknown, and if so, then rebuild and see what we can do
if err := m.updateActiveGame(ctx, uid, convID, hmi.ConvID, msg, body.Flip().GameID); err != nil {
m.Debug(ctx, "MaybeInjectFlipMessage: failed to rebuild non-active game: %s", err)
}
go func(ctx context.Context) {
defer m.Trace(ctx, func() error { return nil },
"MaybeInjectFlipMessage(goroutine): uid: %s convID: %s", uid, convID)()
lock := m.injectLockTab.AcquireOnName(ctx, m.G(), convID.String())
defer lock.Release(ctx)

// Ignore anything from the current device
if m.Me().Eq(flip.UserDevice{
U: msg.Valid().ClientHeader.Sender,
D: msg.Valid().ClientHeader.SenderDevice,
}) {
// If this is our own message, then we need to make sure to update the msgID of the flip
m.Debug(ctx, "MaybeInjectFlipMessage: own message, updating stored and exiting")
m.gameMsgIDs.Add(body.Flip().GameID.String(), msg.GetMessageID())
return
}
// Check to see if we are going to participate from this inject
hmi, err := m.getHostMessageInfo(ctx, convID)
if err != nil {
m.Debug(ctx, "MaybeInjectFlipMessage: failed to get host message info: %s", err)
return
}
if m.shouldIgnoreInject(ctx, hmi.ConvID, convID, body.Flip().GameID) {
m.Debug(ctx, "MaybeInjectFlipMessage: ignored flip message")
return
}
// Check to see if the game is unknown, and if so, then rebuild and see what we can do
if err := m.updateActiveGame(ctx, uid, convID, hmi.ConvID, msg, body.Flip().GameID); err != nil {
m.Debug(ctx, "MaybeInjectFlipMessage: failed to rebuild non-active game: %s", err)
}
}(BackgroundContext(ctx, m.G()))
return true
}

Expand Down
6 changes: 5 additions & 1 deletion go/chat/flipmanager_test.go
Expand Up @@ -68,6 +68,9 @@ func TestFlipManagerStartFlip(t *testing.T) {
ctc.as(t, users[1]).h.G().NotifyRouter.AddListener(listener1)
ctc.as(t, users[2]).h.G().NotifyRouter.AddListener(listener2)

t.Logf("uid0: %s", users[0].GetUID())
t.Logf("uid1: %s", users[1].GetUID())
t.Logf("uid2: %s", users[2].GetUID())
conv := mustCreateConversationForTest(t, ctc, users[0], chat1.TopicType_CHAT,
mt, ctc.as(t, users[1]).user(), ctc.as(t, users[2]).user())
consumeNewConversation(t, listener0, conv.Id)
Expand Down Expand Up @@ -112,7 +115,7 @@ func TestFlipManagerStartFlip(t *testing.T) {
consumeNewMsgRemote(t, listener2, chat1.MessageType_FLIP)
res0 = consumeFlipToResult(t, ui0, listener0, numUsers)
found := false
t.Logf("res0 (range): %s", res0)
t.Logf("res0 (limit): %s", res0)
for i := 1; i <= 10; i++ {
if res0 == fmt.Sprintf("%d", i) {
found = true
Expand All @@ -135,6 +138,7 @@ func TestFlipManagerStartFlip(t *testing.T) {
consumeNewMsgRemote(t, listener1, chat1.MessageType_FLIP)
consumeNewMsgRemote(t, listener2, chat1.MessageType_FLIP)
res0 = consumeFlipToResult(t, ui0, listener0, numUsers)
t.Logf("res0 (range): %s", res0)
found = false
for i := 10; i <= 15; i++ {
if res0 == fmt.Sprintf("%d", i) {
Expand Down

0 comments on commit e27d841

Please sign in to comment.