From 37ddbb4a7578c3eb5bdc208aab959f71ec4259ff Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Mon, 23 Nov 2020 13:41:48 -0800 Subject: [PATCH 01/10] fix: getYDoc should await bindState, if provided --- bin/utils.js | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/bin/utils.js b/bin/utils.js index c49c763..357795a 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -43,7 +43,7 @@ if (typeof persistenceDir === 'string') { ldb.storeUpdate(docName, update) }) }, - writeState: async (docName, ydoc) => {} + writeState: async (docName, ydoc) => { } } } @@ -142,17 +142,22 @@ class WSSharedDoc extends Y.Doc { * * @param {string} docname - the name of the Y.Doc to find or create * @param {boolean} gc - whether to allow gc on the doc (applies only when created) - * @return {WSSharedDoc} + * @return {Promise} */ -const getYDoc = (docname, gc = true) => map.setIfUndefined(docs, docname, () => { +const getYDoc = async (docname, gc = true) => { + let set = map.get(docname) + if (set !== undefined) { + return set + } + const doc = new WSSharedDoc(docname) doc.gc = gc if (persistence !== null) { - persistence.bindState(docname, doc) + await persistence.bindState(docname, doc) } docs.set(docname, doc) return doc -}) +} exports.getYDoc = getYDoc @@ -227,10 +232,10 @@ const pingTimeout = 30000 * @param {any} req * @param {any} opts */ -exports.setupWSConnection = (conn, req, { docName = req.url.slice(1).split('?')[0], gc = true } = {}) => { +exports.setupWSConnection = async (conn, req, { docName = req.url.slice(1).split('?')[0], gc = true } = {}) => { conn.binaryType = 'arraybuffer' // get doc, initialize if it does not exist yet - const doc = getYDoc(docName, gc) + const doc = await getYDoc(docName, gc) doc.conns.set(conn, new Set()) // listen and reply to events conn.on('message', /** @param {ArrayBuffer} message */ message => messageListener(conn, doc, new Uint8Array(message))) From 306039f00bfaedb600254a505deab79ab50aa4db Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Mon, 23 Nov 2020 14:19:04 -0800 Subject: [PATCH 02/10] Update utils.js --- bin/utils.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/utils.js b/bin/utils.js index 357795a..dc882c9 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -145,7 +145,7 @@ class WSSharedDoc extends Y.Doc { * @return {Promise} */ const getYDoc = async (docname, gc = true) => { - let set = map.get(docname) + let set = docs.get(docname) if (set !== undefined) { return set } From 0949ab2dc14f053ca290c26316349262668d70e6 Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Mon, 23 Nov 2020 15:00:07 -0800 Subject: [PATCH 03/10] refactor to remove race condition when multiple clients connect in quick succession --- bin/utils.js | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/bin/utils.js b/bin/utils.js index dc882c9..372c5fd 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -89,7 +89,7 @@ class WSSharedDoc extends Y.Doc { /** * @param {string} name */ - constructor (name) { + constructor(name) { super({ gc: gcEnabled }) this.name = name this.mux = mutex.createMutex() @@ -134,6 +134,10 @@ class WSSharedDoc extends Y.Doc { { maxWait: CALLBACK_DEBOUNCE_MAXWAIT } )) } + + if (persistence !== null) { + this.whenSynced = persistence.bindState(name, this) + } } } @@ -142,22 +146,14 @@ class WSSharedDoc extends Y.Doc { * * @param {string} docname - the name of the Y.Doc to find or create * @param {boolean} gc - whether to allow gc on the doc (applies only when created) - * @return {Promise} + * @return {WSSharedDoc} */ -const getYDoc = async (docname, gc = true) => { - let set = docs.get(docname) - if (set !== undefined) { - return set - } - +const getYDoc = (docname, gc = true) => map.setIfUndefined(docs, docname, () => { const doc = new WSSharedDoc(docname) doc.gc = gc - if (persistence !== null) { - await persistence.bindState(docname, doc) - } docs.set(docname, doc) return doc -} +}) exports.getYDoc = getYDoc @@ -235,7 +231,7 @@ const pingTimeout = 30000 exports.setupWSConnection = async (conn, req, { docName = req.url.slice(1).split('?')[0], gc = true } = {}) => { conn.binaryType = 'arraybuffer' // get doc, initialize if it does not exist yet - const doc = await getYDoc(docName, gc) + const doc = getYDoc(docName, gc) doc.conns.set(conn, new Set()) // listen and reply to events conn.on('message', /** @param {ArrayBuffer} message */ message => messageListener(conn, doc, new Uint8Array(message))) @@ -265,8 +261,15 @@ exports.setupWSConnection = async (conn, req, { docName = req.url.slice(1).split conn.on('pong', () => { pongReceived = true }) - // put the following in a variables in a block so the interval handlers don't keep in in - // scope + + // await the doc state being updated from persistence, if available, otherwise + // we may send sync step 2 too early + if (doc.whenSynced) { + await doc.whenSynced + } + + // put the following in a variables in a block so the interval handlers don't + // keep in scope { // send sync step 1 const encoder = encoding.createEncoder() From 5dd8754144d9c45c1ece343064e706ca2b035cff Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Mon, 23 Nov 2020 15:41:58 -0800 Subject: [PATCH 04/10] fix: wait doc load before binding connection listeners --- bin/utils.js | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/bin/utils.js b/bin/utils.js index 372c5fd..aa7e756 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -233,6 +233,13 @@ exports.setupWSConnection = async (conn, req, { docName = req.url.slice(1).split // get doc, initialize if it does not exist yet const doc = getYDoc(docName, gc) doc.conns.set(conn, new Set()) + + // await the doc state being updated from persistence, if available, otherwise + // we may send sync step 2 too early + if (doc.whenSynced) { + await doc.whenSynced + } + // listen and reply to events conn.on('message', /** @param {ArrayBuffer} message */ message => messageListener(conn, doc, new Uint8Array(message))) @@ -262,12 +269,6 @@ exports.setupWSConnection = async (conn, req, { docName = req.url.slice(1).split pongReceived = true }) - // await the doc state being updated from persistence, if available, otherwise - // we may send sync step 2 too early - if (doc.whenSynced) { - await doc.whenSynced - } - // put the following in a variables in a block so the interval handlers don't // keep in scope { From 2df19492f26ae2e62b3e5caf7791a3eb62a1ff39 Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Mon, 23 Nov 2020 15:42:54 -0800 Subject: [PATCH 05/10] lint --- bin/utils.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/utils.js b/bin/utils.js index aa7e756..4291264 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -89,7 +89,7 @@ class WSSharedDoc extends Y.Doc { /** * @param {string} name */ - constructor(name) { + constructor (name) { super({ gc: gcEnabled }) this.name = name this.mux = mutex.createMutex() From 08076bd118d6a093e68d8433c6e3cdaed7f690d3 Mon Sep 17 00:00:00 2001 From: William Hilton Date: Tue, 8 Dec 2020 14:57:44 -0500 Subject: [PATCH 06/10] fix issue where incoming client messages get dropped during bindState --- bin/utils.js | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/bin/utils.js b/bin/utils.js index 4291264..f17ca69 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -162,12 +162,17 @@ exports.getYDoc = getYDoc * @param {WSSharedDoc} doc * @param {Uint8Array} message */ -const messageListener = (conn, doc, message) => { +const messageListener = async (conn, doc, message) => { const encoder = encoding.createEncoder() const decoder = decoding.createDecoder(message) const messageType = decoding.readVarUint(decoder) switch (messageType) { case messageSync: + // await the doc state being updated from persistence, if available, otherwise + // we may send sync step 2 too early + if (doc.whenSynced) { + await doc.whenSynced + } encoding.writeVarUint(encoder, messageSync) syncProtocol.readSyncMessage(decoder, encoder, doc, null) if (encoding.length(encoder) > 1) { @@ -234,12 +239,6 @@ exports.setupWSConnection = async (conn, req, { docName = req.url.slice(1).split const doc = getYDoc(docName, gc) doc.conns.set(conn, new Set()) - // await the doc state being updated from persistence, if available, otherwise - // we may send sync step 2 too early - if (doc.whenSynced) { - await doc.whenSynced - } - // listen and reply to events conn.on('message', /** @param {ArrayBuffer} message */ message => messageListener(conn, doc, new Uint8Array(message))) @@ -272,6 +271,12 @@ exports.setupWSConnection = async (conn, req, { docName = req.url.slice(1).split // put the following in a variables in a block so the interval handlers don't // keep in scope { + // await the doc state being updated from persistence, if available, otherwise + // we may send sync step 1 too early + if (doc.whenSynced) { + await doc.whenSynced + } + // send sync step 1 const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) From 6dfb96bd1d9e14c50f4dad109360108cb2d2f2d2 Mon Sep 17 00:00:00 2001 From: William Hilton Date: Tue, 8 Dec 2020 14:59:31 -0500 Subject: [PATCH 07/10] reduce diff noise --- bin/utils.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/utils.js b/bin/utils.js index f17ca69..371408c 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -43,7 +43,7 @@ if (typeof persistenceDir === 'string') { ldb.storeUpdate(docName, update) }) }, - writeState: async (docName, ydoc) => { } + writeState: async (docName, ydoc) => {} } } From af3786347d8c65cecf81a660112e371c0dcbcf71 Mon Sep 17 00:00:00 2001 From: William Hilton Date: Tue, 8 Dec 2020 15:09:42 -0500 Subject: [PATCH 08/10] fix typing of bindState --- bin/utils.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/utils.js b/bin/utils.js index 371408c..9b27343 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -24,7 +24,7 @@ const wsReadyStateClosed = 3 // eslint-disable-line const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0' const persistenceDir = process.env.YPERSISTENCE /** - * @type {{bindState: function(string,WSSharedDoc):void, writeState:function(string,WSSharedDoc):Promise, provider: any}|null} + * @type {{bindState: function(string,WSSharedDoc):void|Promise, writeState:function(string,WSSharedDoc):Promise, provider: any}|null} */ let persistence = null if (typeof persistenceDir === 'string') { From c6c52ff119b4f7c7dc85eebda075d554de519c72 Mon Sep 17 00:00:00 2001 From: William Hilton Date: Tue, 8 Dec 2020 15:15:39 -0500 Subject: [PATCH 09/10] fix typing of whenSynced --- bin/utils.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bin/utils.js b/bin/utils.js index 9b27343..0bc7802 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -103,6 +103,10 @@ class WSSharedDoc extends Y.Doc { */ this.awareness = new awarenessProtocol.Awareness(this) this.awareness.setLocalState(null) + /** + * @type {Promise|void} + */ + this.whenSynced = void 0 /** * @param {{ added: Array, updated: Array, removed: Array }} changes * @param {Object | null} conn Origin is the connection that made the change From eb2c6f7c1d9f906b8bab7a359c32f907a0b92256 Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Mon, 25 Jan 2021 12:41:45 -0800 Subject: [PATCH 10/10] fix: Race condition between bindState and writeState. If the websocket disconnects before bindState has applied the persisted value from the database then writeState will end up writing an empty ydoc --- bin/utils.js | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/bin/utils.js b/bin/utils.js index 0bc7802..87f3d6f 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -43,7 +43,7 @@ if (typeof persistenceDir === 'string') { ldb.storeUpdate(docName, update) }) }, - writeState: async (docName, ydoc) => {} + writeState: async (docName, ydoc) => { } } } @@ -107,6 +107,10 @@ class WSSharedDoc extends Y.Doc { * @type {Promise|void} */ this.whenSynced = void 0 + /** + * @type {boolean} + */ + this.isSynced = false /** * @param {{ added: Array, updated: Array, removed: Array }} changes * @param {Object | null} conn Origin is the connection that made the change @@ -141,6 +145,7 @@ class WSSharedDoc extends Y.Doc { if (persistence !== null) { this.whenSynced = persistence.bindState(name, this) + .then(() => { this.isSynced = true }) } } } @@ -203,11 +208,16 @@ const closeConn = (doc, conn) => { const controlledIds = doc.conns.get(conn) doc.conns.delete(conn) awarenessProtocol.removeAwarenessStates(doc.awareness, Array.from(controlledIds), null) + if (doc.conns.size === 0 && persistence !== null) { - // if persisted, we store state and destroy ydocument - persistence.writeState(doc.name, doc).then(() => { - doc.destroy() - }) + if (doc.isSynced) { + // if persisted and the state has finished loading from the database, + // we write the state back to persisted storage + persistence.writeState(doc.name, doc).then(() => { + doc.destroy() + }) + } + docs.delete(doc.name) } }