Skip to content

Commit

Permalink
Merge pull request #1 from stoplightio/fix/bind-state
Browse files Browse the repository at this point in the history
async bindState small changes
  • Loading branch information
tommoor committed Dec 8, 2020
2 parents 2df1949 + c6c52ff commit 3cd6822
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions bin/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const wsReadyStateClosed = 3 // eslint-disable-line
const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0'
const persistenceDir = process.env.YPERSISTENCE
/**
* @type {{bindState: function(string,WSSharedDoc):void, writeState:function(string,WSSharedDoc):Promise<any>, provider: any}|null}
* @type {{bindState: function(string,WSSharedDoc):void|Promise<void>, writeState:function(string,WSSharedDoc):Promise<any>, provider: any}|null}
*/
let persistence = null
if (typeof persistenceDir === 'string') {
Expand All @@ -43,7 +43,7 @@ if (typeof persistenceDir === 'string') {
ldb.storeUpdate(docName, update)
})
},
writeState: async (docName, ydoc) => { }
writeState: async (docName, ydoc) => {}
}
}

Expand Down Expand Up @@ -103,6 +103,10 @@ class WSSharedDoc extends Y.Doc {
*/
this.awareness = new awarenessProtocol.Awareness(this)
this.awareness.setLocalState(null)
/**
* @type {Promise<void>|void}
*/
this.whenSynced = void 0
/**
* @param {{ added: Array<number>, updated: Array<number>, removed: Array<number> }} changes
* @param {Object | null} conn Origin is the connection that made the change
Expand Down Expand Up @@ -162,12 +166,17 @@ exports.getYDoc = getYDoc
* @param {WSSharedDoc} doc
* @param {Uint8Array} message
*/
const messageListener = (conn, doc, message) => {
const messageListener = async (conn, doc, message) => {
const encoder = encoding.createEncoder()
const decoder = decoding.createDecoder(message)
const messageType = decoding.readVarUint(decoder)
switch (messageType) {
case messageSync:
// await the doc state being updated from persistence, if available, otherwise
// we may send sync step 2 too early
if (doc.whenSynced) {
await doc.whenSynced
}
encoding.writeVarUint(encoder, messageSync)
syncProtocol.readSyncMessage(decoder, encoder, doc, null)
if (encoding.length(encoder) > 1) {
Expand Down Expand Up @@ -234,12 +243,6 @@ exports.setupWSConnection = async (conn, req, { docName = req.url.slice(1).split
const doc = getYDoc(docName, gc)
doc.conns.set(conn, new Set())

// await the doc state being updated from persistence, if available, otherwise
// we may send sync step 2 too early
if (doc.whenSynced) {
await doc.whenSynced
}

// listen and reply to events
conn.on('message', /** @param {ArrayBuffer} message */ message => messageListener(conn, doc, new Uint8Array(message)))

Expand Down Expand Up @@ -272,6 +275,12 @@ exports.setupWSConnection = async (conn, req, { docName = req.url.slice(1).split
// put the following in a variables in a block so the interval handlers don't
// keep in scope
{
// await the doc state being updated from persistence, if available, otherwise
// we may send sync step 1 too early
if (doc.whenSynced) {
await doc.whenSynced
}

// send sync step 1
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
Expand Down

0 comments on commit 3cd6822

Please sign in to comment.