diff --git a/sync3/extensions/todevice.go b/sync3/extensions/todevice.go index 951177b3..430c3da5 100644 --- a/sync3/extensions/todevice.go +++ b/sync3/extensions/todevice.go @@ -65,6 +65,30 @@ func (r *ToDeviceRequest) ProcessInitial(ctx context.Context, res *Response, ext r.Limit = 100 // default to 100 } l := logger.With().Str("user", extCtx.UserID).Str("device", extCtx.DeviceID).Logger() + + mapMu.Lock() + lastSentPos, exists := deviceIDToSinceDebugOnly[extCtx.DeviceID] + internal.Logf(ctx, "to_device", "since=%v limit=%v last_sent=%v", r.Since, r.Limit, lastSentPos) + isFirstRequest := !exists + mapMu.Unlock() + + // If this is the first time we've seen this device ID since starting up, ignore the client-provided 'since' + // value. This is done to protect against dropped postgres sequences. Consider: + // - 5 to-device messages arrive for Alice + // - Alice requests all messages, gets them and acks them so since=5, and the nextval() sequence is 6. + // - the server admin drops the DB and starts over again. The DB sequence starts back at 1. + // - 2 to-device messages arrive for Alice + // - Alice requests messages from since=5. No messages are returned as the 2 new messages have a lower sequence number. + // - Even worse, those 2 messages are deleted because sending since=5 ACKNOWLEDGES all messages <=5. + // By ignoring the first `since` on startup, we effectively force the client into sending since=0. In this scenario, + // it will then A) not delete anything as since=0 acknowledges nothing, B) return the 2 to-device events. + // + // The cost to this is that it is possible to send duplicate to-device events if the server restarts before the client + // has time to send the ACK to the server. This isn't fatal as clients do suppress duplicate to-device events. + if isFirstRequest { + r.Since = "" + } + var from int64 var err error if r.Since != "" { @@ -82,10 +106,7 @@ func (r *ToDeviceRequest) ProcessInitial(ctx context.Context, res *Response, ext internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } } - mapMu.Lock() - lastSentPos := deviceIDToSinceDebugOnly[extCtx.DeviceID] - internal.Logf(ctx, "to_device", "since=%v limit=%v last_sent=%v", r.Since, r.Limit, lastSentPos) - mapMu.Unlock() + if from < lastSentPos { // we told the client about a newer position, but yet they are using an older position, yell loudly l.Warn().Int64("last_sent", lastSentPos).Int64("recv", from).Bool("initial", extCtx.IsInitial).Msg(