Skip to content

Commit

Permalink
fix(core): implement listener connection relay to avoid abrupt reconn…
Browse files Browse the repository at this point in the history
…ects
  • Loading branch information
bjoerge committed May 8, 2024
1 parent 7e4940f commit 3cdc78a
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/* eslint-disable max-nested-callbacks */
import {describe, expect, it, jest} from '@jest/globals'
import {type SanityClient} from '@sanity/client'
import {EMPTY, from, lastValueFrom, NEVER, Observable, of, timer} from 'rxjs'
import {concatMap, delay, map, takeUntil, tap, toArray} from 'rxjs/operators'

import {createPairListener} from '../createPairListener'

describe('createPairListener', () => {
it('properly sets up and discards overlapping listeners', async () => {
const unsubscribe = jest.fn()
const listen = jest.fn(() => {
return new Observable((subscriber) => {
subscriber.next({type: 'welcome'})
return unsubscribe
}).pipe(
// todo: figure out why a delay is needed here
delay(1),
)
})

const getDocuments = jest.fn((ids: string[]) => of(ids.map((id) => ({_id: id}))))

const mockedClient = {
observable: {
listen,
getDocuments,
},
} as unknown as SanityClient

const listener = createPairListener(
mockedClient,
{publishedId: 'foo', draftId: 'drafts.bar'},
{relay: {exchangeWaitMin: 50, exchangeWaitMax: 50, exchangeOverLapTime: 10}},
)
const events = await lastValueFrom(
listener.pipe(
takeUntil(
// We'll subscribe for a little more than 20ms, which means the first leg should be done and the second leg should be started
timer(80).pipe(
tap(() => {
// at this moment unsubscribe should have been called on the first listener leg
expect(unsubscribe).toHaveBeenCalledTimes(1)
}),
),
),
toArray(),
),
)

expect(listen).toHaveBeenCalledTimes(2)
expect(unsubscribe).toHaveBeenCalledTimes(2)
expect(getDocuments).toHaveBeenCalledTimes(1)

expect(events).toEqual([
{
type: 'snapshot',
document: {_id: 'drafts.bar'},
documentId: 'drafts.bar',
},
{
type: 'snapshot',
document: {_id: 'foo'},
documentId: 'foo',
},
])
})

it('dedupes any listener events sent in the overlapping period', async () => {
const unsubscribe = jest.fn()

let listenerNo = 0
const listen = jest.fn(() => {
listenerNo++
return from([
{type: 'welcome'},
{type: 'mutation', transactionId: `one-${listenerNo}`},
{type: 'mutation', transactionId: 'dupe'},
]).pipe(concatMap((ev) => timer(1).pipe(map(() => ev))))
})

const mockedClient = {
observable: {
listen,
getDocuments: (ids: string[]) => of(ids.map((id) => ({_id: id}))),
},
} as unknown as SanityClient

const listener = createPairListener(
mockedClient,
{publishedId: 'foo', draftId: 'drafts.bar'},
{relay: {exchangeWaitMin: 30, exchangeWaitMax: 30, exchangeOverLapTime: 2}},
)
const events = await lastValueFrom(listener.pipe(takeUntil(timer(55)), toArray()))

expect(listen).toHaveBeenCalledTimes(2)
expect(unsubscribe).toHaveBeenCalledTimes(0)

expect(events).toEqual([
{
document: {_id: 'drafts.bar'},
documentId: 'drafts.bar',
type: 'snapshot',
},
{
document: {_id: 'foo'},
documentId: 'foo',
type: 'snapshot',
},
{transactionId: 'one-1', type: 'mutation'},
{transactionId: 'dupe', type: 'mutation'},
{transactionId: 'one-2', type: 'mutation'},
])
})

it('avoids subscribing to the next listener before the first listener has received a welcome event, even if exchange interval has passed', async () => {
const listen = jest.fn(() => NEVER)
const getDocuments = jest.fn(() => EMPTY)

const mockedClient = {
observable: {
listen,
getDocuments,
},
} as unknown as SanityClient

const listener = createPairListener(
mockedClient,
{publishedId: 'foo', draftId: 'drafts.bar'},
{relay: {exchangeWaitMin: 10, exchangeWaitMax: 10, exchangeOverLapTime: 2}},
)

const events = await lastValueFrom(listener.pipe(takeUntil(timer(30)), toArray()))
expect(listen).toHaveBeenCalledTimes(1)
expect(getDocuments).toHaveBeenCalledTimes(0)
expect(events).toEqual([])
})
})
132 changes: 113 additions & 19 deletions packages/sanity/src/core/store/_legacy/document/createPairListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,20 @@
import {type SanityClient} from '@sanity/client'
import {type SanityDocument} from '@sanity/types'
import {groupBy} from 'lodash'
import {defer, type Observable, of as observableOf} from 'rxjs'
import {concatMap, map, mergeMap, scan} from 'rxjs/operators'
import LRU from 'quick-lru'
import {defer, merge, type Observable, of as observableOf, timer} from 'rxjs'
import {
concatMap,
delay,
filter,
map,
mergeMap,
scan,
share,
switchMap,
take,
takeUntil,
} from 'rxjs/operators'

import {
type IdPair,
Expand All @@ -25,9 +37,16 @@ export interface InitialSnapshotEvent {
document: SanityDocument | null
}

interface RelayOptions {
exchangeWaitMin: number
exchangeWaitMax: number
exchangeOverLapTime: number
}

/** @internal */
export interface PairListenerOptions {
tag?: string
relay: RelayOptions
}

/** @internal */
Expand Down Expand Up @@ -58,29 +77,19 @@ function allPendingTransactionEventsReceived(listenerEvents: ListenerEvent[]) {
)
}

/** Add some randomness to the exchange delay to avoid thundering herd */
function getExchangeWait(min: number, max: number) {
return min + Math.floor(Math.random() * (max - min))
}

/** @internal */
export function createPairListener(
client: SanityClient,
idPair: IdPair,
options: PairListenerOptions = {},
options?: PairListenerOptions,
): Observable<ListenerEvent> {
const {publishedId, draftId} = idPair
return defer(
() =>
client.observable.listen(
`*[_id == $publishedId || _id == $draftId]`,
{
publishedId,
draftId,
},
{
includeResult: false,
events: ['welcome', 'mutation', 'reconnect'],
effectFormat: 'mendoza',
tag: options.tag || 'document.pair-listener',
},
) as Observable<WelcomeEvent | MutationEvent | ReconnectEvent>,
).pipe(
return createRelayPairListener(client, idPair, options).pipe(
concatMap((event) =>
event.type === 'welcome'
? fetchInitialDocumentSnapshots().pipe(
Expand Down Expand Up @@ -138,6 +147,91 @@ export function createPairListener(
}
}

type ClientListenerEvent = WelcomeEvent | MutationEvent | ReconnectEvent

function createRelayPairListener(
client: SanityClient,
idPair: IdPair,
options?: PairListenerOptions,
): Observable<ClientListenerEvent> {
const {publishedId, draftId} = idPair
const currentLeg = defer(
() =>
client.observable.listen(
`*[_id == $publishedId || _id == $draftId]`,
{publishedId, draftId},
{
includeResult: false,
events: ['welcome', 'mutation', 'reconnect'],
effectFormat: 'mendoza',
tag: options?.tag || 'document.pair-listener',
},
) as Observable<ClientListenerEvent>,
).pipe(share())

if (!options?.relay) {
return currentLeg
}

const {exchangeWaitMin, exchangeWaitMax, exchangeOverLapTime} = options.relay

// This represents the next leg, and will be started after a certain delay
const nextLeg = currentLeg.pipe(
// make sure we are connected to the current leg before scheduling the next one. This prevents build-up in case it takes a while to connect
filter((ev) => ev.type === 'welcome'),
// current listener may still get disconnected and reconnect,
// so in case we receive a new welcome event we should cancel the previously scheduled next leg
switchMap(() => timer(getExchangeWait(exchangeWaitMin, exchangeWaitMax))),
take(1),
mergeMap(() => createRelayPairListener(client, idPair, options)),
share(),
)

// Merge messages from current leg with next leg into a single stream
return merge(
// take from currentLeg until we get the first 'welcome' from next leg, plus adding little overlap in time
currentLeg.pipe(
takeUntil(
nextLeg.pipe(
filter((e) => e.type === 'welcome'),
take(1),
delay(exchangeOverLapTime),
),
),
),
// ignore the first welcome event from the next leg.
nextLeg.pipe(filter((e, index) => e.type !== 'welcome' || index > 0)),
).pipe(distinctByTransactionId())
}

/**
* Operator function that takes a stream of listener events
* and returns a new stream that filters out events sharing the same transaction ID
*/
function distinctByTransactionId() {
return (input$: Observable<ClientListenerEvent>) =>
input$.pipe(
scan(
(
[seen]: [LRU<string, boolean>, ClientListenerEvent | null],
event,
): [LRU<string, boolean>, ClientListenerEvent | null] => {
if (event.type !== 'mutation') {
return [seen, event]
}
if (seen.has(event.transactionId)) {
return [seen, null]
}
seen.set(event.transactionId, true)
return [seen, event]
},
[new LRU<string, boolean>({maxSize: 1000}), null],
),
map(([_, event]) => event),
filter((event): event is ClientListenerEvent => event !== null),
)
}

function createSnapshotEvent(
documentId: string,
document: null | SanityDocument,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ function submitCommitRequest(
)
}

const RELAY_CONFIG = {
/** How long to wait after the first connection is set up to start the exchange */
exchangeWaitMin: 1000 * 60 * 12,
exchangeWaitMax: 1000 * 60 * 19,
/** How long should the overlap between the two listeners be */
exchangeOverLapTime: 1000 * 20,
}

/** @internal */
export function checkoutPair(
client: SanityClient,
Expand All @@ -201,7 +209,7 @@ export function checkoutPair(
const {publishedId, draftId} = idPair

const listenerEventsConnector = new Subject<ListenerEvent>()
const listenerEvents$ = createPairListener(client, idPair).pipe(
const listenerEvents$ = createPairListener(client, idPair, {relay: RELAY_CONFIG}).pipe(
share({connector: () => listenerEventsConnector}),
)

Expand Down

0 comments on commit 3cdc78a

Please sign in to comment.