Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stable26] Fix sync errors after network issues #4487

Merged
merged 12 commits into from Jul 11, 2023
Merged
165 changes: 165 additions & 0 deletions cypress/e2e/api/SyncServiceProvider.spec.js
@@ -0,0 +1,165 @@
/*
* @copyright Copyright (c) 2023 Max <max@nextcloud.com>
*
* @author Max <max@nextcloud.com>
*
* @license AGPL-3.0-or-later
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

import { randUser } from '../../utils/index.js'
import { SyncService } from '../../../src/services/SyncService.js'
import createSyncServiceProvider from '../../../src/services/SyncServiceProvider.js'
import { Doc } from 'yjs'

const user = randUser()

describe('Sync service provider', function() {
let fileId

before(function() {
cy.createUser(user)
window.OC = {
config: { modRewriteWorking: false },
webroot: '',
}
})

beforeEach(function() {
cy.login(user)
cy.prepareSessionApi()
cy.uploadTestFile('test.md')
.then(id => {
fileId = id
})
cy.wrap(new Doc()).as('source')
cy.wrap(new Doc()).as('target')
cy.get('@source').then(source => createProvider(source)).as('sourceProvider')
cy.get('@target').then(target => createProvider(target)).as('targetProvider')
})

afterEach(function() {
this.sourceProvider?.destroy()
this.targetProvider?.destroy()
})

/**
* @param {object} ydoc Yjs document
*/
function createProvider(ydoc) {
const syncService = new SyncService({
serialize: () => 'Serialized',
getDocumentState: () => null,
})
syncService.on('opened', () => syncService.startSync())
return createSyncServiceProvider({
ydoc,
syncService,
fileId,
initialSession: null,
disableBc: true,
})
}

it('recovers from a dropped message', function() {
const sourceMap = this.source.getMap()
const targetMap = this.target.getMap()
cy.intercept({ method: 'POST', url: '**/apps/text/session/push' })
.as('push')
cy.intercept({ method: 'POST', url: '**/apps/text/session/sync' })
.as('sync')
cy.wait('@push')
cy.then(() => {
sourceMap.set('keyA', 'valueA')
expect(targetMap.get('keyB')).to.be.eq(undefined)
})
cy.wait('@sync')
cy.wait('@sync')
// eslint-disable-next-line cypress/no-unnecessary-waiting
cy.wait(1000)
cy.then(() => {
expect(targetMap.get('keyA')).to.be.eq('valueA')
})
cy.intercept({
method: 'POST',
url: '**/apps/text/session/push',
}, req => {
if (req.body.steps) {
req.reply({ forceNetworkError: true })
req.alias = 'dead'
} else {
req.continue()
}
})
cy.then(() => {
sourceMap.set('keyB', 'valueB')
expect(targetMap.get('keyB')).to.be.eq(undefined)
})
cy.wait('@dead')
cy.then(() => {
expect(targetMap.get('keyB')).to.be.eq(undefined)
})
cy.intercept({
method: 'POST',
url: '**/apps/text/session/push',
}, req => {
if (req.body.steps) {
req.alias = 'alive'
req.continue()
} else {
req.continue()
}
})
cy.then(() => {
sourceMap.set('keyC', 'valueC')
expect(targetMap.get('keyB')).to.be.eq(undefined)
})
cy.wait('@alive')
// eslint-disable-next-line cypress/no-unnecessary-waiting
cy.wait(1000)
cy.then(() => {
expect(targetMap.get('keyC')).to.be.eq('valueC')
expect(targetMap.get('keyB')).to.be.eq('valueB')
})
})

/*
* Counts the amount of push and sync requests in one minute.
* Skipped per default, useful for comparison before/after changes to SyncProvider or PollingBackend.
*/
it.skip('is not too chatty', function() {
const sourceMap = this.source.getMap()
const targetMap = this.target.getMap()
cy.intercept({ method: 'POST', url: '**/apps/text/session/push' })
.as('push')
cy.intercept({ method: 'POST', url: '**/apps/text/session/sync' })
.as('sync')
cy.wait('@push')
cy.then(() => {
sourceMap.set('keyA', 'valueA')
expect(targetMap.get('keyB')).to.be.eq(undefined)
})
// eslint-disable-next-line cypress/no-unnecessary-waiting
cy.wait(60000)
cy.then(() => {
expect(targetMap.get('keyA')).to.be.eq('valueA')
})
// 2 clients push awareness updates every 15 seconds -> 2*5 = 10. Actual 15.
cy.get('@push.all').its('length').should('be.lessThan', 30)
// 2 clients sync fast first and then every 5 seconds -> 2*12 = 24. Actual 32.
cy.get('@sync.all').its('length').should('be.lessThan', 60)
})
})
61 changes: 61 additions & 0 deletions cypress/e2e/api/Yjs.spec.js
@@ -0,0 +1,61 @@
/*
* @copyright Copyright (c) 2023 Jonas <jonas@nextcloud.com>
*
* @author Jonas <jonas@nextcloud.com>
*
* @license AGPL-3.0-or-later
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector } from 'yjs'

describe('Yjs', function() {
// Only tests that Yjs allows to apply steps in wrong order
it('applies step in wrong order', function() {
const source = new Doc()
const target = new Doc()
const sourceMap = source.getMap()
const targetMap = target.getMap()

target.on('afterTransaction', (tr, doc) => {
// console.log('afterTransaction', tr)
})

// Add keyA to source and apply to target
sourceMap.set('keyA', 'valueA')
const update0A = encodeStateAsUpdate(source)
const sourceVectorA = encodeStateVector(source)
applyUpdate(target, update0A)
expect(targetMap.get('keyA')).to.be.eq('valueA')

// Add keyB to source, don't apply to target yet
sourceMap.set('keyB', 'valueB')
const updateAB = encodeStateAsUpdate(source, sourceVectorA)
const sourceVectorB = encodeStateVector(source)

// Add keyC to source, apply to target
sourceMap.set('keyC', 'valueC')
const updateBC = encodeStateAsUpdate(source, sourceVectorB)
applyUpdate(target, updateBC)
expect(targetMap.get('keyB')).to.be.eq(undefined)
expect(targetMap.get('keyC')).to.be.eq(undefined)

// Apply keyB to target
applyUpdate(target, updateAB)
expect(targetMap.get('keyB')).to.be.eq('valueB')
expect(targetMap.get('keyC')).to.be.eq('valueC')
})
})
2 changes: 1 addition & 1 deletion cypress/e2e/sync.spec.js
Expand Up @@ -77,7 +77,7 @@ describe('Sync', () => {
}
}).as('sessionRequests')
cy.wait('@dead', { timeout: 30000 })
cy.get('#editor-container .document-status', { timeout: 10000 })
cy.get('#editor-container .document-status', { timeout: 30000 })
.should('contain', 'File could not be loaded')
.then(() => {
count = 4
Expand Down
6 changes: 1 addition & 5 deletions src/components/Editor.vue
Expand Up @@ -573,11 +573,7 @@ export default {
}
if (type === ERROR_TYPE.CONNECTION_FAILED && !this.hasConnectionIssue) {
this.hasConnectionIssue = true
// FIXME: ideally we just try to reconnect in the service, so we don't loose steps
OC.Notification.showTemporary('Connection failed, reconnecting')
if (data.retry !== false) {
setTimeout(this.reconnect.bind(this), 5000)
}
OC.Notification.showTemporary('Connection failed.')
}
if (type === ERROR_TYPE.SOURCE_NOT_FOUND) {
this.hasConnectionIssue = true
Expand Down
6 changes: 3 additions & 3 deletions src/services/PollingBackend.js
Expand Up @@ -163,7 +163,7 @@ class PollingBackend {
if (!e.response || e.code === 'ECONNABORTED') {
if (this.#fetchRetryCounter++ >= MAX_RETRY_FETCH_COUNT) {
logger.error('[PollingBackend:fetchSteps] Network error when fetching steps, emitting CONNECTION_FAILED')
this.#syncService.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: { retry: false } })
this.#syncService.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {} })

} else {
logger.error(`[PollingBackend:fetchSteps] Network error when fetching steps, retry ${this.#fetchRetryCounter}`)
Expand All @@ -186,11 +186,11 @@ class PollingBackend {
this.disconnect()
} else if (e.response.status === 503) {
this.increaseRefetchTimer()
this.#syncService.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: { retry: false } })
this.#syncService.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {} })
logger.error('Failed to fetch steps due to unavailable service', { error: e })
} else {
this.disconnect()
this.#syncService.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: { retry: false } })
this.#syncService.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {} })
logger.error('Failed to fetch steps due to other reason', { error: e })
}

Expand Down
39 changes: 27 additions & 12 deletions src/services/SyncService.js
Expand Up @@ -66,6 +66,8 @@ const ERROR_TYPE = {

class SyncService {

#sendIntervalId

constructor({ serialize, getDocumentState, ...options }) {
/** @type {import('mitt').Emitter<import('./SyncService').EventTypes>} _bus */
this._bus = mitt()
Expand All @@ -84,6 +86,7 @@ class SyncService {

this.version = null
this.sending = false
this.#sendIntervalId = null

this.autosave = debounce(this._autosave.bind(this), AUTOSAVE_INTERVAL)

Expand Down Expand Up @@ -142,12 +145,26 @@ class SyncService {
}

sendSteps(getSendable) {
if (!this.connection || this.sending) {
setTimeout(() => {
this.sendSteps(getSendable)
}, 200)
// If already retrying, do nothing.
if (this.#sendIntervalId) {
return
}
if (this.connection && !this.sending) {
return this._sendSteps(getSendable)
}
// If already sending, retry every 200ms.
return new Promise((resolve, reject) => {
this.#sendIntervalId = setInterval(() => {
if (this.connection && !this.sending) {
clearInterval(this.#sendIntervalId)
this.#sendIntervalId = null
this._sendSteps(getSendable).then(resolve).catch(reject)
}
}, 200)
})
}

_sendSteps(getSendable) {
this.sending = true
const data = getSendable()
if (data.steps.length > 0) {
Expand All @@ -156,26 +173,24 @@ class SyncService {
return this.connection.push(data)
.then((response) => {
this.sending = false
}).catch(({ response, code }) => {
logger.error('failed to apply steps due to collission, retrying')
}).catch(err => {
const { response, code } = err
this.sending = false
if (!response || code === 'ECONNABORTED') {
this.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {} })
return
}
const { status, data } = response
if (status === 403) {
if (response?.status === 403) {
if (!data.document) {
// either the session is invalid or the document is read only.
logger.error('failed to write to document - not allowed')
}
// Only emit conflict event if we have synced until the latest version
if (data.document?.currentVersion === this.version) {
if (response.data.document?.currentVersion === this.version) {
this.emit('error', { type: ERROR_TYPE.PUSH_FAILURE, data: {} })
OC.Notification.showTemporary('Changes could not be sent yet')
}
}
// TODO: Retry and warn
throw new Error('Failed to apply steps. Retry!', { cause: err })
})
}

Expand All @@ -186,7 +201,7 @@ class SyncService {
.map(s => {
return { step: s.lastAwarenessMessage, clientId: s.clientId }
})
const newSteps = awareness
const newSteps = [...awareness]
this.steps = [...this.steps, ...awareness.map(s => s.step)]
for (let i = 0; i < steps.length; i++) {
const singleSteps = steps[i].data
Expand Down
6 changes: 4 additions & 2 deletions src/services/SyncServiceProvider.js
Expand Up @@ -31,18 +31,20 @@ import { logger } from '../helpers/logger.js'
* @param {object} options.syncService - sync service to build upon
* @param {number} options.fileId - file id of the file to open
* @param {object} options.initialSession - initialSession to start from
* @param {boolean} options.disableBc - disable broadcast channel synchronization (default: disabled in debug mode, enabled otherwise)
*/
export default function createSyncServiceProvider({ ydoc, syncService, fileId, initialSession }) {
export default function createSyncServiceProvider({ ydoc, syncService, fileId, initialSession, disableBc }) {
if (!fileId) {
// We need a file id as a unique identifier for y.js as otherwise state might leak between different files
throw new Error('fileId is required')
}
const WebSocketPolyfill = initWebSocketPolyfill(syncService, fileId, initialSession)
disableBc = disableBc ?? !!window?._oc_debug
const websocketProvider = new WebsocketProvider(
'ws://localhost:1234',
'file:' + fileId,
ydoc,
{ WebSocketPolyfill },
{ WebSocketPolyfill, disableBc },
)
websocketProvider.on('status', event => logger.debug('status', event))
return websocketProvider
Expand Down