Skip to content

Commit

Permalink
[desk-tool][base] Rework the fetching and aligning of events
Browse files Browse the repository at this point in the history
- Introduce a remoteSnapshots, an observable which returns snapshots
  together with a chain of preceding mutations.
- Introduce a separate Aligner-class which is only responsible for
  aligning up the translog results with the incoming mutations.
- Move logic out of the Timeline-class to make it only care about
  the representation of the Timeline.
  • Loading branch information
judofyr authored and rexxars committed Oct 6, 2020
1 parent 9a67b11 commit 61e936e
Show file tree
Hide file tree
Showing 18 changed files with 459 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import {
CommitFunction,
CommittedEvent,
DocumentMutationEvent,
DocumentRemoteMutationEvent,
DocumentRebaseEvent,
SnapshotEvent,
RemoteSnapshotEvent
} from './types'
import {ListenerEvent} from '../getPairListener'
import {Mutation} from '../types'
Expand All @@ -15,7 +15,6 @@ export type BufferedDocumentEvent =
| SnapshotEvent
| DocumentRebaseEvent
| DocumentMutationEvent
| DocumentRemoteMutationEvent
| CommittedEvent

const prepare = id => document => {
Expand All @@ -25,6 +24,7 @@ const prepare = id => document => {

export interface BufferedDocumentWrapper {
consistency$: Observable<boolean>
remoteSnapshot$: Observable<RemoteSnapshotEvent>
events: Observable<BufferedDocumentEvent>
// helper functions
patch: (patches) => Mutation[]
Expand Down Expand Up @@ -52,6 +52,8 @@ export const createBufferedDocument = (
return {
events: bufferedDocument.updates$,
consistency$: bufferedDocument.consistency$,
remoteSnapshot$: bufferedDocument.remoteSnapshot$,

patch: patches => patches.map(patch => ({patch: {...patch, id: documentId}})),
create: document => ({create: prepareDoc(document)}),
createIfNotExists: document => ({createIfNotExists: prepareDoc(document)}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import {
DocumentRebaseEvent,
MutationPayload,
SnapshotEvent,
DocumentRemoteMutationEvent
DocumentRemoteMutationEvent,
RemoteSnapshotEvent
} from './types'

import {ListenerEvent, MutationEvent} from '../getPairListener'
Expand Down Expand Up @@ -81,7 +82,7 @@ export const createObservableBufferedDocument = (
const rebase$ = new Subject<DocumentRebaseEvent>()

// a stream of remote mutations with effetcs
const effects$ = new Subject<DocumentRemoteMutationEvent>();
const remoteMutations = new Subject<DocumentRemoteMutationEvent>()

const createInitialBufferedDocument = initialSnapshot => {
const bufferedDocument = new BufferedDocument(initialSnapshot)
Expand All @@ -97,9 +98,10 @@ export const createObservableBufferedDocument = (
})
}

bufferedDocument.onRemoteMutation = (mutation) => {
effects$.next({
bufferedDocument.onRemoteMutation = mutation => {
remoteMutations.next({
type: 'remoteMutation',
head: bufferedDocument.document.HEAD,
transactionId: mutation.transactionId,
timestamp: mutation.timestamp,
author: mutation.identity,
Expand Down Expand Up @@ -211,9 +213,18 @@ export const createObservableBufferedDocument = (
snapshotAfterSync$
).pipe(map(toSnapshotEvent), publishReplay(1), refCount())

const remoteSnapshot$: Observable<RemoteSnapshotEvent> = merge(
currentBufferedDocument$.pipe(
map(bufferedDocument => bufferedDocument.document.HEAD),
map(toSnapshotEvent)
),
remoteMutations
).pipe(publishReplay(1), refCount())

return {
updates$: merge(snapshot$, actionHandler$, mutations$, rebase$, effects$),
updates$: merge(snapshot$, actionHandler$, mutations$, rebase$),
consistency$: consistency$.pipe(distinctUntilChanged(), publishReplay(1), refCount()),
remoteSnapshot$,
addMutation,
addMutations,
commit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export interface CommittedEvent {

export interface DocumentRemoteMutationEvent {
type: 'remoteMutation'
head: SanityDocument
transactionId: string
author: string
timestamp: Date
Expand All @@ -37,6 +38,8 @@ export interface DocumentRemoteMutationEvent {
}
}

export type RemoteSnapshotEvent = DocumentRemoteMutationEvent | SnapshotEvent

// HTTP API Mutation payloads
// Note: this is *not* the same as the Mutation helper class exported by @sanity/mutator
export interface MutationPayload {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import {getPairListener, ListenerEvent} from '../getPairListener'
import {BufferedDocumentEvent, createBufferedDocument} from '../buffered-doc/createBufferedDocument'
import {filter, map, share} from 'rxjs/operators'
import {IdPair, Mutation} from '../types'
import {IdPair, Mutation, ReconnectEvent} from '../types'
import {merge, Observable} from 'rxjs'
import client from 'part:@sanity/base/client'
import {RemoteSnapshotEvent} from '../buffered-doc/types'

const isEventForDocId = (id: string) => (event: ListenerEvent): boolean =>
event.type !== 'reconnect' && event.documentId === id
Expand All @@ -15,10 +16,14 @@ function commitMutations(mutations) {
})
}

export type DocumentVersionEvent = BufferedDocumentEvent & {version: 'published' | 'draft'}
type WithVersion<T> = T & {version: 'published' | 'draft'}

export type DocumentVersionEvent = WithVersion<ReconnectEvent | BufferedDocumentEvent>
export type RemoteSnapshotVersionEvent = WithVersion<RemoteSnapshotEvent>

export interface DocumentVersion {
consistency$: Observable<boolean>
remoteSnapshot$: Observable<RemoteSnapshotVersionEvent>
events: Observable<DocumentVersionEvent>

patch: (patches) => Mutation[]
Expand All @@ -36,16 +41,18 @@ export interface Pair {
draft: DocumentVersion
}

function setVersion(version: 'draft' | 'published') {
return (ev: any): DocumentVersionEvent => ({...ev, version})
function setVersion<T>(version: 'draft' | 'published') {
return (ev: T): T & {version: 'draft' | 'published'} => ({...ev, version})
}

export function checkoutPair(idPair: IdPair): Pair {
const {publishedId, draftId} = idPair

const listenerEvents$ = getPairListener(client, idPair).pipe(share())

const reconnect$ = listenerEvents$.pipe(filter(ev => ev.type === 'reconnect'))
const reconnect$ = listenerEvents$.pipe(filter(ev => ev.type === 'reconnect')) as Observable<
ReconnectEvent
>

const draft = createBufferedDocument(
draftId,
Expand All @@ -63,12 +70,14 @@ export function checkoutPair(idPair: IdPair): Pair {
draft: {
...draft,
events: merge(reconnect$, draft.events).pipe(map(setVersion('draft'))),
consistency$: draft.consistency$
consistency$: draft.consistency$,
remoteSnapshot$: draft.remoteSnapshot$.pipe(map(setVersion('draft')))
},
published: {
...published,
events: merge(reconnect$, published.events).pipe(map(setVersion('published'))),
consistency$: published.consistency$
consistency$: published.consistency$,
remoteSnapshot$: published.remoteSnapshot$.pipe(map(setVersion('published')))
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import {merge} from 'rxjs'
import {switchMap} from 'rxjs/operators'
import {IdPair} from '../types'
import {memoize} from '../utils/createMemoizer'
import {memoizedPair} from './memoizedPair'

export const remoteSnapshots = memoize(
(idPair: IdPair) => {
return memoizedPair(idPair).pipe(
switchMap(({published, draft}) => merge(published.remoteSnapshot$, draft.remoteSnapshot$))
)
},
idPair => idPair.publishedId
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {IdPair, SanityDocument, Mutation} from '../types'
import {IdPair, SanityDocument, Mutation, ReconnectEvent} from '../types'
import {filter, map, publishReplay, refCount} from 'rxjs/operators'
import {memoizedPair} from './memoizedPair'
import {BufferedDocumentEvent} from '../buffered-doc/createBufferedDocument'
Expand All @@ -9,7 +9,7 @@ import {DocumentVersion} from './checkoutPair'

// return true if the event comes with a document snapshot
function isSnapshotEvent(
event: BufferedDocumentEvent
event: BufferedDocumentEvent | ReconnectEvent
): event is SnapshotEvent & {
version: 'published' | 'draft'
} {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ export function ChangesPanel({
schemaType,
timelineMode
}: ChangesPanelProps) {
const {close: closeHistory, timeline} = useDocumentHistory()
const diff: ObjectDiff = timeline.currentDiff() as any
const {close: closeHistory, historyController} = useDocumentHistory()
const diff: ObjectDiff = historyController.currentDiff() as any

if (!loading && diff?.type !== 'object') {
return null
Expand Down

0 comments on commit 61e936e

Please sign in to comment.