From 3cdc78a3590ab5a08613bc80928f23726d6a96bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B8rge=20N=C3=A6ss?= Date: Fri, 3 May 2024 13:45:27 +0200 Subject: [PATCH] fix(core): implement listener connection relay to avoid abrupt reconnects --- .../__tests__/createPairListener.test.ts | 138 ++++++++++++++++++ .../_legacy/document/createPairListener.ts | 132 ++++++++++++++--- .../document/document-pair/checkoutPair.ts | 10 +- 3 files changed, 260 insertions(+), 20 deletions(-) create mode 100644 packages/sanity/src/core/store/_legacy/document/__tests__/createPairListener.test.ts diff --git a/packages/sanity/src/core/store/_legacy/document/__tests__/createPairListener.test.ts b/packages/sanity/src/core/store/_legacy/document/__tests__/createPairListener.test.ts new file mode 100644 index 000000000000..50a1ce5c82a4 --- /dev/null +++ b/packages/sanity/src/core/store/_legacy/document/__tests__/createPairListener.test.ts @@ -0,0 +1,138 @@ +/* eslint-disable max-nested-callbacks */ +import {describe, expect, it, jest} from '@jest/globals' +import {type SanityClient} from '@sanity/client' +import {EMPTY, from, lastValueFrom, NEVER, Observable, of, timer} from 'rxjs' +import {concatMap, delay, map, takeUntil, tap, toArray} from 'rxjs/operators' + +import {createPairListener} from '../createPairListener' + +describe('createPairListener', () => { + it('properly sets up and discards overlapping listeners', async () => { + const unsubscribe = jest.fn() + const listen = jest.fn(() => { + return new Observable((subscriber) => { + subscriber.next({type: 'welcome'}) + return unsubscribe + }).pipe( + // todo: figure out why a delay is needed here + delay(1), + ) + }) + + const getDocuments = jest.fn((ids: string[]) => of(ids.map((id) => ({_id: id})))) + + const mockedClient = { + observable: { + listen, + getDocuments, + }, + } as unknown as SanityClient + + const listener = createPairListener( + mockedClient, + {publishedId: 'foo', draftId: 'drafts.bar'}, + {relay: {exchangeWaitMin: 50, exchangeWaitMax: 50, exchangeOverLapTime: 10}}, + ) + const events = await lastValueFrom( + listener.pipe( + takeUntil( + // We'll subscribe for a little more than 20ms, which means the first leg should be done and the second leg should be started + timer(80).pipe( + tap(() => { + // at this moment unsubscribe should have been called on the first listener leg + expect(unsubscribe).toHaveBeenCalledTimes(1) + }), + ), + ), + toArray(), + ), + ) + + expect(listen).toHaveBeenCalledTimes(2) + expect(unsubscribe).toHaveBeenCalledTimes(2) + expect(getDocuments).toHaveBeenCalledTimes(1) + + expect(events).toEqual([ + { + type: 'snapshot', + document: {_id: 'drafts.bar'}, + documentId: 'drafts.bar', + }, + { + type: 'snapshot', + document: {_id: 'foo'}, + documentId: 'foo', + }, + ]) + }) + + it('dedupes any listener events sent in the overlapping period', async () => { + const unsubscribe = jest.fn() + + let listenerNo = 0 + const listen = jest.fn(() => { + listenerNo++ + return from([ + {type: 'welcome'}, + {type: 'mutation', transactionId: `one-${listenerNo}`}, + {type: 'mutation', transactionId: 'dupe'}, + ]).pipe(concatMap((ev) => timer(1).pipe(map(() => ev)))) + }) + + const mockedClient = { + observable: { + listen, + getDocuments: (ids: string[]) => of(ids.map((id) => ({_id: id}))), + }, + } as unknown as SanityClient + + const listener = createPairListener( + mockedClient, + {publishedId: 'foo', draftId: 'drafts.bar'}, + {relay: {exchangeWaitMin: 30, exchangeWaitMax: 30, exchangeOverLapTime: 2}}, + ) + const events = await lastValueFrom(listener.pipe(takeUntil(timer(55)), toArray())) + + expect(listen).toHaveBeenCalledTimes(2) + expect(unsubscribe).toHaveBeenCalledTimes(0) + + expect(events).toEqual([ + { + document: {_id: 'drafts.bar'}, + documentId: 'drafts.bar', + type: 'snapshot', + }, + { + document: {_id: 'foo'}, + documentId: 'foo', + type: 'snapshot', + }, + {transactionId: 'one-1', type: 'mutation'}, + {transactionId: 'dupe', type: 'mutation'}, + {transactionId: 'one-2', type: 'mutation'}, + ]) + }) + + it('avoids subscribing to the next listener before the first listener has received a welcome event, even if exchange interval has passed', async () => { + const listen = jest.fn(() => NEVER) + const getDocuments = jest.fn(() => EMPTY) + + const mockedClient = { + observable: { + listen, + getDocuments, + }, + } as unknown as SanityClient + + const listener = createPairListener( + mockedClient, + {publishedId: 'foo', draftId: 'drafts.bar'}, + {relay: {exchangeWaitMin: 10, exchangeWaitMax: 10, exchangeOverLapTime: 2}}, + ) + + const events = await lastValueFrom(listener.pipe(takeUntil(timer(30)), toArray())) + expect(listen).toHaveBeenCalledTimes(1) + expect(getDocuments).toHaveBeenCalledTimes(0) + expect(events).toEqual([]) + }) +}) diff --git a/packages/sanity/src/core/store/_legacy/document/createPairListener.ts b/packages/sanity/src/core/store/_legacy/document/createPairListener.ts index aa55c9230178..0e51b60a4294 100644 --- a/packages/sanity/src/core/store/_legacy/document/createPairListener.ts +++ b/packages/sanity/src/core/store/_legacy/document/createPairListener.ts @@ -2,8 +2,20 @@ import {type SanityClient} from '@sanity/client' import {type SanityDocument} from '@sanity/types' import {groupBy} from 'lodash' -import {defer, type Observable, of as observableOf} from 'rxjs' -import {concatMap, map, mergeMap, scan} from 'rxjs/operators' +import LRU from 'quick-lru' +import {defer, merge, type Observable, of as observableOf, timer} from 'rxjs' +import { + concatMap, + delay, + filter, + map, + mergeMap, + scan, + share, + switchMap, + take, + takeUntil, +} from 'rxjs/operators' import { type IdPair, @@ -25,9 +37,16 @@ export interface InitialSnapshotEvent { document: SanityDocument | null } +interface RelayOptions { + exchangeWaitMin: number + exchangeWaitMax: number + exchangeOverLapTime: number +} + /** @internal */ export interface PairListenerOptions { tag?: string + relay: RelayOptions } /** @internal */ @@ -58,29 +77,19 @@ function allPendingTransactionEventsReceived(listenerEvents: ListenerEvent[]) { ) } +/** Add some randomness to the exchange delay to avoid thundering herd */ +function getExchangeWait(min: number, max: number) { + return min + Math.floor(Math.random() * (max - min)) +} + /** @internal */ export function createPairListener( client: SanityClient, idPair: IdPair, - options: PairListenerOptions = {}, + options?: PairListenerOptions, ): Observable { const {publishedId, draftId} = idPair - return defer( - () => - client.observable.listen( - `*[_id == $publishedId || _id == $draftId]`, - { - publishedId, - draftId, - }, - { - includeResult: false, - events: ['welcome', 'mutation', 'reconnect'], - effectFormat: 'mendoza', - tag: options.tag || 'document.pair-listener', - }, - ) as Observable, - ).pipe( + return createRelayPairListener(client, idPair, options).pipe( concatMap((event) => event.type === 'welcome' ? fetchInitialDocumentSnapshots().pipe( @@ -138,6 +147,91 @@ export function createPairListener( } } +type ClientListenerEvent = WelcomeEvent | MutationEvent | ReconnectEvent + +function createRelayPairListener( + client: SanityClient, + idPair: IdPair, + options?: PairListenerOptions, +): Observable { + const {publishedId, draftId} = idPair + const currentLeg = defer( + () => + client.observable.listen( + `*[_id == $publishedId || _id == $draftId]`, + {publishedId, draftId}, + { + includeResult: false, + events: ['welcome', 'mutation', 'reconnect'], + effectFormat: 'mendoza', + tag: options?.tag || 'document.pair-listener', + }, + ) as Observable, + ).pipe(share()) + + if (!options?.relay) { + return currentLeg + } + + const {exchangeWaitMin, exchangeWaitMax, exchangeOverLapTime} = options.relay + + // This represents the next leg, and will be started after a certain delay + const nextLeg = currentLeg.pipe( + // make sure we are connected to the current leg before scheduling the next one. This prevents build-up in case it takes a while to connect + filter((ev) => ev.type === 'welcome'), + // current listener may still get disconnected and reconnect, + // so in case we receive a new welcome event we should cancel the previously scheduled next leg + switchMap(() => timer(getExchangeWait(exchangeWaitMin, exchangeWaitMax))), + take(1), + mergeMap(() => createRelayPairListener(client, idPair, options)), + share(), + ) + + // Merge messages from current leg with next leg into a single stream + return merge( + // take from currentLeg until we get the first 'welcome' from next leg, plus adding little overlap in time + currentLeg.pipe( + takeUntil( + nextLeg.pipe( + filter((e) => e.type === 'welcome'), + take(1), + delay(exchangeOverLapTime), + ), + ), + ), + // ignore the first welcome event from the next leg. + nextLeg.pipe(filter((e, index) => e.type !== 'welcome' || index > 0)), + ).pipe(distinctByTransactionId()) +} + +/** + * Operator function that takes a stream of listener events + * and returns a new stream that filters out events sharing the same transaction ID + */ +function distinctByTransactionId() { + return (input$: Observable) => + input$.pipe( + scan( + ( + [seen]: [LRU, ClientListenerEvent | null], + event, + ): [LRU, ClientListenerEvent | null] => { + if (event.type !== 'mutation') { + return [seen, event] + } + if (seen.has(event.transactionId)) { + return [seen, null] + } + seen.set(event.transactionId, true) + return [seen, event] + }, + [new LRU({maxSize: 1000}), null], + ), + map(([_, event]) => event), + filter((event): event is ClientListenerEvent => event !== null), + ) +} + function createSnapshotEvent( documentId: string, document: null | SanityDocument, diff --git a/packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts b/packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts index d8bccd7d2a92..2a13d2001df6 100644 --- a/packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts +++ b/packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts @@ -192,6 +192,14 @@ function submitCommitRequest( ) } +const RELAY_CONFIG = { + /** How long to wait after the first connection is set up to start the exchange */ + exchangeWaitMin: 1000 * 60 * 12, + exchangeWaitMax: 1000 * 60 * 19, + /** How long should the overlap between the two listeners be */ + exchangeOverLapTime: 1000 * 20, +} + /** @internal */ export function checkoutPair( client: SanityClient, @@ -201,7 +209,7 @@ export function checkoutPair( const {publishedId, draftId} = idPair const listenerEventsConnector = new Subject() - const listenerEvents$ = createPairListener(client, idPair).pipe( + const listenerEvents$ = createPairListener(client, idPair, {relay: RELAY_CONFIG}).pipe( share({connector: () => listenerEventsConnector}), )