diff --git a/packages/sanity/src/core/store/_legacy/document/createPairListener.ts b/packages/sanity/src/core/store/_legacy/document/createPairListener.ts index aa55c923017..722616e040e 100644 --- a/packages/sanity/src/core/store/_legacy/document/createPairListener.ts +++ b/packages/sanity/src/core/store/_legacy/document/createPairListener.ts @@ -2,8 +2,9 @@ import {type SanityClient} from '@sanity/client' import {type SanityDocument} from '@sanity/types' import {groupBy} from 'lodash' -import {defer, type Observable, of as observableOf} from 'rxjs' -import {concatMap, map, mergeMap, scan} from 'rxjs/operators' +import LRU from 'quick-lru' +import {defer, merge, type Observable, of as observableOf, skip, timer} from 'rxjs' +import {concatMap, delay, filter, map, mergeMap, scan, share, take, takeUntil} from 'rxjs/operators' import { type IdPair, @@ -58,6 +59,18 @@ function allPendingTransactionEventsReceived(listenerEvents: ListenerEvent[]) { ) } +/** How long to wait after the first connection is set up to start the exhange */ +const EXCHANGE_WAIT_MIN = 1000 * 60 * 12 +const EXCHANGE_WAIT_MAX = 1000 * 20 * 19 + +/** How long should the overlap between the two listeners be */ +const EXCHANGE_OVERLAP_TIME = 1000 * 20 + +/** Add some randomness to the exchange delay to avoid thundering herd */ +function getExhangeWait() { + return Math.floor(Math.random() * (EXCHANGE_WAIT_MAX - EXCHANGE_WAIT_MIN) + EXCHANGE_WAIT_MIN) +} + /** @internal */ export function createPairListener( client: SanityClient, @@ -65,22 +78,7 @@ export function createPairListener( options: PairListenerOptions = {}, ): Observable { const {publishedId, draftId} = idPair - return defer( - () => - client.observable.listen( - `*[_id == $publishedId || _id == $draftId]`, - { - publishedId, - draftId, - }, - { - includeResult: false, - events: ['welcome', 'mutation', 'reconnect'], - effectFormat: 'mendoza', - tag: options.tag || 'document.pair-listener', - }, - ) as Observable, - ).pipe( + return _createRelayPairListener(client, idPair, options).pipe( concatMap((event) => event.type === 'welcome' ? fetchInitialDocumentSnapshots().pipe( @@ -138,6 +136,82 @@ export function createPairListener( } } +type ClientListenerEvent = WelcomeEvent | MutationEvent | ReconnectEvent + +function _createRelayPairListener( + client: SanityClient, + idPair: IdPair, + options: PairListenerOptions = {}, +): Observable { + const {publishedId, draftId} = idPair + + const currentLeg = defer( + () => + client.observable.listen( + `*[_id == $publishedId || _id == $draftId]`, + { + publishedId, + draftId, + }, + { + includeResult: false, + events: ['welcome', 'mutation', 'reconnect'], + effectFormat: 'mendoza', + tag: options.tag || 'document.pair-listener', + }, + ) as Observable, + ) + + // This represents the next leg, and will be started after a certain delay + const nextLeg = timer(getExhangeWait()).pipe( + mergeMap(() => _createRelayPairListener(client, idPair, options)), + share(), + ) + + // Merge current leg with next leg + return merge( + // take from currentLeg until we get 'welcome' from next leg, plus a little overlap in time + currentLeg.pipe( + takeUntil( + nextLeg.pipe( + filter((e) => e.type === 'welcome'), + take(1), + delay(EXCHANGE_OVERLAP_TIME), + ), + ), + ), + nextLeg.pipe(skip(1)), + ).pipe(distinctByTransactionId()) +} + +/** + * Operator function that operates on an observable of listener events that may include events with the duplicate transaction IDs + * and returns a new stream filtering out duplicates + */ +function distinctByTransactionId() { + return (input$: Observable) => + input$.pipe( + scan( + ( + [seen]: [LRU, ClientListenerEvent | null], + event, + ): [LRU, ClientListenerEvent | null] => { + if (event.type !== 'mutation') { + return [seen, event] + } + if (seen.has(event.transactionId)) { + return [seen, null] + } + seen.set(event.transactionId, true) + return [seen, event] + }, + [new LRU({maxSize: 1000}), null], + ), + map(([_, event]) => event), + filter((event): event is ClientListenerEvent => event !== null), + ) +} + function createSnapshotEvent( documentId: string, document: null | SanityDocument,