Skip to content

Commit

Permalink
[document-store] Refactor document store (#1003)
Browse files Browse the repository at this point in the history
  • Loading branch information
bjoerge committed Oct 16, 2018
1 parent 159de73 commit 839c6e1
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 143 deletions.
30 changes: 15 additions & 15 deletions packages/@sanity/base/src/datastores/document/getPairListener.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import client from 'part:@sanity/base/client'
import {of as observableOf} from 'rxjs'
import {defer, of as observableOf} from 'rxjs'
import {concatMap} from 'rxjs/operators'

const DOCS_QUERY = `{
Expand All @@ -24,26 +24,26 @@ function createSnapshotEvent(documentId, document) {

export function getPairListener(idPair) {
const {publishedId, draftId} = idPair
return client.observable
.listen(
return defer(() =>
client.observable.listen(
`*[_id == $publishedId || _id == $draftId]`,
{
publishedId,
draftId
},
{includeResult: false, events: ['welcome', 'mutation', 'reconnect']}
)
.pipe(
concatMap(
event =>
event.type === 'welcome'
? fetchDocumentSnapshots({publishedId, draftId}).pipe(
concatMap(snapshots => [
createSnapshotEvent(draftId, snapshots.draft),
createSnapshotEvent(publishedId, snapshots.published)
])
)
: observableOf(event)
)
).pipe(
concatMap(
event =>
event.type === 'welcome'
? fetchDocumentSnapshots({publishedId, draftId}).pipe(
concatMap(snapshots => [
createSnapshotEvent(draftId, snapshots.draft),
createSnapshotEvent(publishedId, snapshots.published)
])
)
: observableOf(event)
)
)
}
10 changes: 10 additions & 0 deletions packages/@sanity/desk-tool/src/pane/EditorPane.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ function documentEventToState(event) {
case 'reconnect': {
return {}
}
case 'committed': {
// note: we *could* use this in conjunction with <document>.commit()
// by setting this.state.isSaving=true before calling <document>.commit and setting to false
// again when we get the 'committed' event back.
// However, calling <document>.commit() doesn't necessarily result in a commit actually being done,
// and thus we are not guaranteed to get a 'committed' event back after a call to
// <document>.commit(), which means we could easily get into a situation where the
// `isSaving` state stays around forever.
return {}
}
default: {
// eslint-disable-next-line no-console
console.log('Unhandled document event type "%s"', event.type, event)
Expand Down
147 changes: 19 additions & 128 deletions packages/@sanity/document-store/src/createDocumentStore.js
Original file line number Diff line number Diff line change
@@ -1,150 +1,40 @@
import {Observable, merge} from 'rxjs'
import {share, filter, map, concat, switchMap, tap} from 'rxjs/operators'

import {omit} from 'lodash'
import pubsub from 'nano-pubsub'
import {BufferedDocument, Mutation} from '@sanity/mutator'

const NOOP = () => {}
import {merge, Observable} from 'rxjs'
import {filter, share} from 'rxjs/operators'
import {createObservableBufferedDocument} from './createObservableBufferedDocument'

function createBufferedDocument(documentId, serverEvents$, doCommit) {
const reconnects$ = serverEvents$.pipe(filter(event => event.type === 'reconnect'))

const saves = pubsub()

const bufferedDocs$ = serverEvents$.pipe(
filter(event => event.type === 'snapshot'),
map(event => event.document),
map(snapshot => {
const bufferedDocument = new BufferedDocument(snapshot || null)

bufferedDocument.commitHandler = function commitHandler(opts) {
const payload = opts.mutation.params

// TODO:
// right now the BufferedDocument just commits fire-and-forget-ish
// We should be able to handle failures and retry here
const bufferedDocument = createObservableBufferedDocument(serverEvents$, doCommit)

doCommit(omit(payload, 'resultRev')).subscribe({
next: res => {
opts.success(res)
saves.publish()
},
error: opts.failure
})
}

const rebase$ = new Observable(rebaseObserver => {
bufferedDocument.onRebase = edge => {
rebaseObserver.next({type: 'rebase', document: edge})
}
return () => {
bufferedDocument.onRebase = NOOP
}
}).pipe(share())

const mutation$ = new Observable(mutationObserver => {
bufferedDocument.onMutation = ({mutation, remote}) => {
mutationObserver.next({
type: 'mutation',
document: bufferedDocument.LOCAL,
mutations: mutation.mutations,
origin: remote ? 'remote' : 'local'
})
}

const serverMutations = serverEvents$
.pipe(filter(event => event.type === 'mutation'))
.subscribe(event => bufferedDocument.arrive(new Mutation(event)))

return () => {
serverMutations.unsubscribe()
bufferedDocument.onMutation = NOOP
}
}).pipe(share())

return {
events: new Observable(observer => {
observer.next({type: 'snapshot', document: bufferedDocument.LOCAL})
observer.complete()
}).pipe(concat(merge(mutation$, rebase$, reconnects$))),

patch(patches) {
const mutations = patches.map(patch => ({patch: {...patch, id: documentId}}))

bufferedDocument.add(new Mutation({mutations: mutations}))
},
create(document) {
const mutation = {
create: Object.assign({id: documentId}, document)
}
bufferedDocument.add(new Mutation({mutations: [mutation]}))
},
createIfNotExists(document) {
bufferedDocument.add(new Mutation({mutations: [{createIfNotExists: document}]}))
},
createOrReplace(document) {
bufferedDocument.add(new Mutation({mutations: [{createOrReplace: document}]}))
},
delete() {
bufferedDocument.add(new Mutation({mutations: [{delete: {id: documentId}}]}))
},
commit() {
return new Observable(observer => {
// todo: connect observable with request from bufferedDocument.commit somehow
bufferedDocument.commit()
return saves.subscribe(() => {
observer.next()
observer.complete()
})
})
}
}
}),
share()
)

let currentBuffered
const cachedBuffered = new Observable(observer => {
if (currentBuffered) {
observer.next(currentBuffered)
observer.complete()
}
return bufferedDocs$
.pipe(
tap(doc => {
currentBuffered = doc
})
)
.subscribe(observer)
})
const reconnects$ = serverEvents$.pipe(filter(event => event.type === 'reconnect'))

return {
events: cachedBuffered.pipe(switchMap(bufferedDoc => bufferedDoc.events)),
events: merge(reconnects$, bufferedDocument.updates$),
patch(patches) {
cachedBuffered.subscribe(bufferedDoc => bufferedDoc.patch(patches))
bufferedDocument.addMutations(patches.map(patch => ({patch: {...patch, id: documentId}})))
},
create(document) {
cachedBuffered.subscribe(bufferedDoc => bufferedDoc.create(document))
bufferedDocument.addMutation({
create: Object.assign({id: documentId}, document)
})
},
createIfNotExists(document) {
cachedBuffered.subscribe(bufferedDoc => bufferedDoc.createIfNotExists(document))
bufferedDocument.addMutation({createIfNotExists: document})
},
createOrReplace(document) {
cachedBuffered.subscribe(bufferedDoc => bufferedDoc.createOrReplace(document))
bufferedDocument.addMutation({createOrReplace: document})
},
delete() {
cachedBuffered.subscribe(bufferedDoc => bufferedDoc.delete())
bufferedDocument.addMutation({delete: {id: documentId}})
},
commit() {
return cachedBuffered.pipe(switchMap(bufferedDoc => bufferedDoc.commit()))
return bufferedDocument.commit()
}
}
}

const isDocId = id => event => event.documentId === id
const isEventForDocId = id => event => event.type === 'reconnect' || event.documentId === id

module.exports = function createDocumentStore({serverConnection}) {
export default function createDocumentStore({serverConnection}) {
return {
byId,
byIds,
Expand Down Expand Up @@ -181,12 +71,13 @@ module.exports = function createDocumentStore({serverConnection}) {

const draft = createBufferedDocument(
draftId,
serverEvents$.pipe(filter(isDocId(draftId))),
serverEvents$.pipe(filter(isEventForDocId(draftId))),
doCommit
)

const published = createBufferedDocument(
publishedId,
serverEvents$.pipe(filter(isDocId(publishedId))),
serverEvents$.pipe(filter(isEventForDocId(publishedId))),
doCommit
)
return {draft, published}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import {BufferedDocument, Mutation} from '@sanity/mutator'
import {defer, merge, of, Subject} from 'rxjs'
import {
filter,
map,
distinctUntilChanged,
publishReplay,
refCount,
scan,
share,
tap,
withLatestFrom
} from 'rxjs/operators'

export const snapshotEventFrom = snapshot => ({
type: 'snapshot',
document: snapshot
})

// This is an observable interface for BufferedDocument in an attempt
// to make it easier to work with the api provided by it
export const createObservableBufferedDocument = (serverEvents$, doCommit) => {
// Incoming local actions (e.g. a request to mutate, a request to commit pending changes, etc.)
const actions$ = new Subject()

// Stream of events that has happened with documents (e.g. a mutation that has been applied, a rebase).
// These are "after the fact" events and also includes the next document state.
const updates$ = new Subject()

const createInitialBufferedDocument = snapshot => {
const bufferedDocument = new BufferedDocument(snapshot)
bufferedDocument.onMutation = ({mutation, remote}) => {
updates$.next({
type: 'mutation',
document: bufferedDocument.LOCAL,
mutations: mutation.mutations,
origin: remote ? 'remote' : 'local'
})
}

bufferedDocument.onRebase = edge => {
updates$.next({type: 'rebase', document: edge})
}

bufferedDocument.commitHandler = opts => {
const {resultRev, ...mutation} = opts.mutation.params
doCommit(mutation)
.pipe(tap(() => updates$.next({type: 'committed'})))
.subscribe({next: opts.success, error: opts.failure})
}
return bufferedDocument
}

const bufferedDocument$ = serverEvents$.pipe(
scan((bufferedDocument, serverEvent) => {
if (serverEvent.type === 'snapshot') {
if (bufferedDocument) {
// we received a new snapshot and already got an old one. When we receive a snapshot again
// it is usually because the connection has been down. Attempt to save pending changes (if any)
bufferedDocument.commit()
}
return createInitialBufferedDocument(serverEvent.document || null)
}
if (!bufferedDocument) {
// eslint-disable-next-line no-console
console.warn(
'Ignoring event of type "%s" since buffered document has not yet been set up with snapshot',
serverEvent.type
)
return bufferedDocument
}
if (serverEvent.type === 'mutation') {
bufferedDocument.arrive(new Mutation(serverEvent))
} else if (serverEvent.type !== 'reconnect') {
// eslint-disable-next-line no-console
console.warn('Received unexpected server event of type "%s"', serverEvent.type)
}
return bufferedDocument
}, null),
publishReplay(1),
refCount()
)

// this is where the side effects mandated by local actions actually happens
const actionHandler$ = actions$.pipe(
withLatestFrom(bufferedDocument$),
map(([action, bufferedDocument]) => {
if (action.type === 'mutation') {
bufferedDocument.add(new Mutation({mutations: action.mutations}))
}
if (action.type === 'commit') {
bufferedDocument.commit()
}
return null
}),
filter(Boolean),
share()
)

const emitAction = action => actions$.next(action)

const addMutations = mutations => emitAction({type: 'mutation', mutations})
const addMutation = mutation => addMutations([mutation])

const commit = () =>
defer(() => {
emitAction({type: 'commit'})
return of(null)
})

const snapshot$ = bufferedDocument$.pipe(
distinctUntilChanged((bufDoc, prevBufDoc) => bufDoc.LOCAL === prevBufDoc.LOCAL),
map(buf => snapshotEventFrom(buf.LOCAL))
)

return {
updates$: merge(snapshot$, actionHandler$, updates$),
addMutation,
addMutations,
commit
}
}

0 comments on commit 839c6e1

Please sign in to comment.