Skip to content

Commit

Permalink
fix(core): implement listener connection relay to avoid abrupt reconn…
Browse files Browse the repository at this point in the history
…ects
  • Loading branch information
bjoerge committed May 3, 2024
1 parent 424a928 commit 8098363
Showing 1 changed file with 92 additions and 18 deletions.
110 changes: 92 additions & 18 deletions packages/sanity/src/core/store/_legacy/document/createPairListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import {type SanityClient} from '@sanity/client'
import {type SanityDocument} from '@sanity/types'
import {groupBy} from 'lodash'
import {defer, type Observable, of as observableOf} from 'rxjs'
import {concatMap, map, mergeMap, scan} from 'rxjs/operators'
import LRU from 'quick-lru'
import {defer, merge, type Observable, of as observableOf, skip, timer} from 'rxjs'
import {concatMap, delay, filter, map, mergeMap, scan, share, take, takeUntil} from 'rxjs/operators'

import {
type IdPair,
Expand Down Expand Up @@ -58,29 +59,26 @@ function allPendingTransactionEventsReceived(listenerEvents: ListenerEvent[]) {
)
}

/** How long to wait after the first connection is set up to start the exhange */
const EXCHANGE_WAIT_MIN = 1000 * 60 * 12
const EXCHANGE_WAIT_MAX = 1000 * 20 * 19

/** How long should the overlap between the two listeners be */
const EXCHANGE_OVERLAP_TIME = 1000 * 20

/** Add some randomness to the exchange delay to avoid thundering herd */
function getExhangeWait() {
return Math.floor(Math.random() * (EXCHANGE_WAIT_MAX - EXCHANGE_WAIT_MIN) + EXCHANGE_WAIT_MIN)
}

/** @internal */
export function createPairListener(
client: SanityClient,
idPair: IdPair,
options: PairListenerOptions = {},
): Observable<ListenerEvent> {
const {publishedId, draftId} = idPair
return defer(
() =>
client.observable.listen(
`*[_id == $publishedId || _id == $draftId]`,
{
publishedId,
draftId,
},
{
includeResult: false,
events: ['welcome', 'mutation', 'reconnect'],
effectFormat: 'mendoza',
tag: options.tag || 'document.pair-listener',
},
) as Observable<WelcomeEvent | MutationEvent | ReconnectEvent>,
).pipe(
return _createRelayPairListener(client, idPair, options).pipe(
concatMap((event) =>
event.type === 'welcome'
? fetchInitialDocumentSnapshots().pipe(
Expand Down Expand Up @@ -138,6 +136,82 @@ export function createPairListener(
}
}

type ClientListenerEvent = WelcomeEvent | MutationEvent | ReconnectEvent

function _createRelayPairListener(
client: SanityClient,
idPair: IdPair,
options: PairListenerOptions = {},
): Observable<ClientListenerEvent> {
const {publishedId, draftId} = idPair

const currentLeg = defer(
() =>
client.observable.listen(
`*[_id == $publishedId || _id == $draftId]`,
{
publishedId,
draftId,
},
{
includeResult: false,
events: ['welcome', 'mutation', 'reconnect'],
effectFormat: 'mendoza',
tag: options.tag || 'document.pair-listener',
},
) as Observable<ClientListenerEvent>,
)

// This represents the next leg, and will be started after a certain delay
const nextLeg = timer(getExhangeWait()).pipe(
mergeMap(() => _createRelayPairListener(client, idPair, options)),
share(),
)

// Merge current leg with next leg
return merge(
// take from currentLeg until we get 'welcome' from next leg, plus a little overlap in time
currentLeg.pipe(
takeUntil(
nextLeg.pipe(
filter((e) => e.type === 'welcome'),
take(1),
delay(EXCHANGE_OVERLAP_TIME),
),
),
),
nextLeg.pipe(skip(1)),
).pipe(distinctByTransactionId())
}

/**
* Operator function that operates on an observable of listener events that may include events with the duplicate transaction IDs
* and returns a new stream filtering out duplicates
*/
function distinctByTransactionId() {
return (input$: Observable<ClientListenerEvent>) =>
input$.pipe(
scan(
(
[seen]: [LRU<string, boolean>, ClientListenerEvent | null],
event,
): [LRU<string, boolean>, ClientListenerEvent | null] => {
if (event.type !== 'mutation') {
return [seen, event]
}
if (seen.has(event.transactionId)) {
return [seen, null]
}
seen.set(event.transactionId, true)
return [seen, event]
},
[new LRU<string, boolean>({maxSize: 1000}), null],
),
map(([_, event]) => event),
filter((event): event is ClientListenerEvent => event !== null),
)
}

function createSnapshotEvent(
documentId: string,
document: null | SanityDocument,
Expand Down

0 comments on commit 8098363

Please sign in to comment.