Skip to content

Commit

Permalink
Protect against dropped databases
Browse files Browse the repository at this point in the history
Helps fix the worst of #448
  • Loading branch information
kegsay committed Jun 4, 2024
1 parent 5da6865 commit 3c2275a
Showing 1 changed file with 25 additions and 4 deletions.
29 changes: 25 additions & 4 deletions sync3/extensions/todevice.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,30 @@ func (r *ToDeviceRequest) ProcessInitial(ctx context.Context, res *Response, ext
r.Limit = 100 // default to 100
}
l := logger.With().Str("user", extCtx.UserID).Str("device", extCtx.DeviceID).Logger()

mapMu.Lock()
lastSentPos, exists := deviceIDToSinceDebugOnly[extCtx.DeviceID]
internal.Logf(ctx, "to_device", "since=%v limit=%v last_sent=%v", r.Since, r.Limit, lastSentPos)
isFirstRequest := !exists
mapMu.Unlock()

// If this is the first time we've seen this device ID since starting up, ignore the client-provided 'since'
// value. This is done to protect against dropped postgres sequences. Consider:
// - 5 to-device messages arrive for Alice
// - Alice requests all messages, gets them and acks them so since=5, and the nextval() sequence is 6.
// - the server admin drops the DB and starts over again. The DB sequence starts back at 1.
// - 2 to-device messages arrive for Alice
// - Alice requests messages from since=5. No messages are returned as the 2 new messages have a lower sequence number.
// - Even worse, those 2 messages are deleted because sending since=5 ACKNOWLEDGES all messages <=5.
// By ignoring the first `since` on startup, we effectively force the client into sending since=0. In this scenario,
// it will then A) not delete anything as since=0 acknowledges nothing, B) return the 2 to-device events.
//
// The cost to this is that it is possible to send duplicate to-device events if the server restarts before the client
// has time to send the ACK to the server. This isn't fatal as clients do suppress duplicate to-device events.
if isFirstRequest {
r.Since = ""
}

var from int64
var err error
if r.Since != "" {
Expand All @@ -82,10 +106,7 @@ func (r *ToDeviceRequest) ProcessInitial(ctx context.Context, res *Response, ext
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
}
mapMu.Lock()
lastSentPos := deviceIDToSinceDebugOnly[extCtx.DeviceID]
internal.Logf(ctx, "to_device", "since=%v limit=%v last_sent=%v", r.Since, r.Limit, lastSentPos)
mapMu.Unlock()

if from < lastSentPos {
// we told the client about a newer position, but yet they are using an older position, yell loudly
l.Warn().Int64("last_sent", lastSentPos).Int64("recv", from).Bool("initial", extCtx.IsInitial).Msg(
Expand Down

0 comments on commit 3c2275a

Please sign in to comment.