diff --git a/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/DataSynchronizer.java b/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/DataSynchronizer.java index dacfdf9f5..2c8638d37 100644 --- a/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/DataSynchronizer.java +++ b/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/DataSynchronizer.java @@ -399,6 +399,7 @@ private void syncRemoteToLocal() { "t='%d': syncRemoteToLocal START", logicalT)); + // 2. Run remote to local (R2L) sync routine for (final NamespaceSynchronizationConfig nsConfig : syncConfig) { final Map> remoteChangeEvents = instanceChangeStreamListener.getEventsForNamespace(nsConfig.getNamespace()); @@ -412,6 +413,7 @@ private void syncRemoteToLocal() { latestDocumentMap.put(latestDocument.get("_id"), latestDocument); } + // a. For each unprocessed change event for (final Map.Entry> eventEntry : remoteChangeEvents.entrySet()) { logger.info(String.format( @@ -420,6 +422,7 @@ private void syncRemoteToLocal() { logicalT, eventEntry.getValue().getOperationType())); + // i. Find the corresponding local document config. final CoreDocumentSynchronizationConfig docConfig = nsConfig.getSynchronizedDocument(eventEntry.getKey().asDocument().get("_id")); @@ -433,6 +436,9 @@ private void syncRemoteToLocal() { syncRemoteChangeEventToLocal(nsConfig, docConfig, eventEntry.getValue()); } + // For synchronized documents that had no unprocessed change event, but were marked as stale, + // synthesize a remote replace event to replace the local stale document with the latest + // remote copy. for (final BsonValue docId : unseenIds) { final CoreDocumentSynchronizationConfig docConfig = nsConfig.getSynchronizedDocument(docId); @@ -456,6 +462,9 @@ private void syncRemoteToLocal() { } } + // For synchronized documents that had no unprocessed change event, and did not have a latest + // version when stale documents were queried, synthesize a remote delete event to delete + // the local document. unseenIds.removeAll(latestDocumentMap.keySet()); for (final BsonValue unseenId : unseenIds) { final CoreDocumentSynchronizationConfig docConfig = @@ -517,9 +526,8 @@ private void syncRemoteChangeEventToLocal( docConfig.getDocumentId(), remoteChangeEvent.getOperationType().toString())); - final BsonDocument remoteDoc = remoteChangeEvent.getFullDocument(); final DocumentVersionInfo currentRemoteVersionInfo = DocumentVersionInfo - .getRemoteVersionInfo(remoteDoc); + .getRemoteVersionInfo(remoteChangeEvent.getFullDocument()); if (currentRemoteVersionInfo.hasVersion() && currentRemoteVersionInfo.getVersion().getSyncProtocolVersion() != 1) { @@ -539,6 +547,10 @@ private void syncRemoteChangeEventToLocal( return; } + // ii. If the version info for the unprocessed change event has the same GUID as the local + // document version GUID, and has a version counter less than or equal to the local + // document version version counter, drop the event, as it implies the event has already + // been applied to the local collection. if (docConfig.hasCommittedVersion(currentRemoteVersionInfo)) { // Skip this event since we generated it. logger.info(String.format( @@ -552,6 +564,8 @@ private void syncRemoteChangeEventToLocal( } + // iii. If the document does not have local writes pending, apply the change event to the local + // document and emit a change event for it. if (docConfig.getLastUncommittedChangeEvent() == null) { switch (remoteChangeEvent.getOperationType()) { case REPLACE: @@ -593,34 +607,155 @@ private void syncRemoteChangeEventToLocal( nsConfig.getNamespace(), docConfig.getDocumentId(), remoteChangeEvent.getOperationType().toString())); - break; + return; } } - // Check for pending writes on this version - { - final DocumentVersionInfo lastKnownLocalVersionInfo = DocumentVersionInfo - .getLocalVersionInfo(docConfig); + // At this point, we know there is a pending write for this document, so we will either drop + // the event if we know it is already applied or we know the event is stale, or we will raise a + // conflict. + + // iv. Otherwise, check if the version info of the incoming remote change event is different + // from the version of the local document. + final DocumentVersionInfo lastKnownLocalVersionInfo = DocumentVersionInfo + .getLocalVersionInfo(docConfig); + + // 1. If both the local document version and the remote change event version are empty, drop + // the event. The absence of a version is effectively a version, and the pending write will + // set a version on the next L2R pass if it’s not a delete. + if (!lastKnownLocalVersionInfo.hasVersion() && !currentRemoteVersionInfo.hasVersion()) { + logger.info(String.format( + Locale.US, + "t='%d': syncRemoteChangeEventToLocal ns=%s documentId=%s remote and local have same " + + "empty version but a write is pending; waiting for next L2R pass", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId())); + return; + } + + // 2. If either the local document version or the remote change event version are empty, raise + // a conflict. The absence of a version is effectively a version, and a remote change event + // with no version indicates a document that may have been committed by another client not + // adhering to the mobile sync protocol. + if (!lastKnownLocalVersionInfo.hasVersion() || !currentRemoteVersionInfo.hasVersion()) { + logger.info(String.format( + Locale.US, + "t='%d': syncRemoteChangeEventToLocal ns=%s documentId=%s remote and local have same " + + "empty version but a write is pending; waiting for next L2R pass", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId())); + resolveConflict(nsConfig.getNamespace(), docConfig, remoteChangeEvent); + return; + } - // There is a pending write that must skip R2L if both versions are empty. The absence of a - // version is effectively a version. The pending write, if it's not a delete, should be - // setting a new version anyway. - if (!lastKnownLocalVersionInfo.hasVersion() && !currentRemoteVersionInfo.hasVersion()) { + // 3. Check if the GUID of the two versions are the same. + final DocumentVersionInfo.Version localVersion = lastKnownLocalVersionInfo.getVersion(); + final DocumentVersionInfo.Version remoteVersion = currentRemoteVersionInfo.getVersion(); + if (localVersion.instanceId.equals(remoteVersion.instanceId)) { + // a. If the GUIDs are the same, compare the version counter of the remote change event with + // the version counter of the local document + if (localVersion.versionCounter <= remoteVersion.versionCounter) { + // i. drop the event if the version counter of the remote event less than or equal to the + // version counter of the local document logger.info(String.format( - Locale.US, - "t='%d': syncRemoteChangeEventToLocal ns=%s documentId=%s remote and local have same " - + "empty version but a write is pending; waiting for next L2R pass", - logicalT, - nsConfig.getNamespace(), - docConfig.getDocumentId())); + Locale.US, + "t='%d': syncRemoteChangeEventToLocal ns=%s documentId=%s remote change event " + + "is stale; dropping the event", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId())); + return; + } else { + // ii. raise a conflict if the version counter of the remote event is greater than the + // version counter of the local document + logger.info(String.format( + Locale.US, + "t='%d': syncRemoteChangeEventToLocal ns=%s documentId=%s remote event version " + + "has higher counter than local version but a write is pending; " + + "raising conflict", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId())); + resolveConflict( + nsConfig.getNamespace(), + docConfig, + remoteChangeEvent); return; } } + // b. If the GUIDs are different, do a full document lookup against the remote server to + // fetch the latest version (this is to guard against the case where the unprocessed + // change event is stale). + final BsonDocument newestRemoteDocument = this.getRemoteCollection(nsConfig.getNamespace()) + .find(new Document("_id", docConfig.getDocumentId())).first(); + + if (newestRemoteDocument == null) { + // i. If the document is not found with a remote lookup, this means the document was + // deleted remotely, so raise a conflict using a synthesized delete event as the remote + // change event. + logger.info(String.format( + Locale.US, + "t='%d': syncRemoteChangeEventToLocal ns=%s documentId=%s remote event version " + + "stale and latest document lookup indicates a remote delete occurred, but " + + "a write is pending; raising conflict", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId())); + resolveConflict( + nsConfig.getNamespace(), + docConfig, + changeEventForLocalDelete( + nsConfig.getNamespace(), + docConfig.getDocumentId(), + docConfig.hasUncommittedWrites())); + return; + } + + + final DocumentVersionInfo newestRemoteVersionInfo = + DocumentVersionInfo.getRemoteVersionInfo(newestRemoteDocument); + + // ii. If the current GUID of the remote document (as determined by this lookup) is equal + // to the GUID of the local document, drop the event. We’re believed to be behind in + // the change stream at this point. + if (newestRemoteVersionInfo.hasVersion() + && newestRemoteVersionInfo.getVersion().getInstanceId() + .equals(localVersion.instanceId)) { + + logger.info(String.format( + Locale.US, + "t='%d': syncRemoteChangeEventToLocal ns=%s documentId=%s latest document lookup " + + "indicates that this is a stale event; dropping the event", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId())); + return; + + } + + // iii. If the current GUID of the remote document is not equal to the GUID of the local + // document, raise a conflict using a synthesized replace event as the remote change + // event. This means the remote document is a legitimately new document and we should + // handle the conflict. + logger.info(String.format( + Locale.US, + "t='%d': syncRemoteChangeEventToLocal ns=%s documentId=%s latest document lookup " + + "indicates a remote replace occurred, but a local write is pending; raising " + + "conflict with synthesized replace event", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId())); resolveConflict( - nsConfig.getNamespace(), - docConfig, - remoteChangeEvent); + nsConfig.getNamespace(), + docConfig, + changeEventForLocalReplace( + nsConfig.getNamespace(), + docConfig.getDocumentId(), + newestRemoteDocument, + docConfig.hasUncommittedWrites())); } /** @@ -634,11 +769,13 @@ private void syncLocalToRemote() { "t='%d': syncLocalToRemote START", logicalT)); + // 1. Run local to remote (L2R) sync routine // Search for modifications in each namespace. for (final NamespaceSynchronizationConfig nsConfig : syncConfig) { final CoreRemoteMongoCollection remoteColl = getRemoteCollection(nsConfig.getNamespace()); + // a. For each document that has local writes pending for (final CoreDocumentSynchronizationConfig docConfig : nsConfig) { if (!docConfig.hasUncommittedWrites() || docConfig.isFrozen()) { continue; @@ -654,6 +791,7 @@ private void syncLocalToRemote() { continue; } + // i. Retrieve the change event for this local document in the local config metadata final ChangeEvent localChangeEvent = docConfig.getLastUncommittedChangeEvent(); logger.info(String.format( @@ -678,208 +816,275 @@ private void syncLocalToRemote() { DocumentVersionInfo.getLocalVersionInfo(docConfig); final BsonDocument nextVersion; - switch (localChangeEvent.getOperationType()) { - case INSERT: { - nextVersion = DocumentVersionInfo.getFreshVersionDocument(); - // It's possible that we may insert after a delete happened and we didn't get a - // notification for it. There's nothing we can do about this. - try { - remoteColl.insertOne(withNewVersion(localChangeEvent.getFullDocument(), nextVersion)); - } catch (final StitchServiceException ex) { - if (ex.getErrorCode() != StitchServiceErrorCode.MONGODB_ERROR - || !ex.getMessage().contains("E11000")) { - this.emitError(docConfig, String.format( + // ii. Check if the internal remote change stream listener has an unprocessed event for + // this document. + final ChangeEvent unprocessedRemoteEvent = + instanceChangeStreamListener.getUnprocessedEventForDocumentId( + nsConfig.getNamespace(), + docConfig.getDocumentId()); + + if (unprocessedRemoteEvent != null) { + final DocumentVersionInfo unprocessedEventVersion = + DocumentVersionInfo.getRemoteVersionInfo( + unprocessedRemoteEvent.getFullDocument() + ); + + // 1. If it does and the version info is different, record that a conflict has occurred. + // Difference is determined if either the GUID is different or the version counter is + // greater than the local version counter. + if (!docConfig.hasCommittedVersion(unprocessedEventVersion)) { + isConflicted = true; + logger.info(String.format( Locale.US, - "t='%d': syncLocalToRemote ns=%s documentId=%s exception inserting: %s", + "t='%d': syncLocalToRemote ns=%s documentId=%s version different on " + + "unprocessed change event for document; raising conflict", logicalT, nsConfig.getNamespace(), - docConfig.getDocumentId(), - ex), ex); - continue; - } - logger.info(String.format( - Locale.US, - "t='%d': syncLocalToRemote ns=%s documentId=%s duplicate key exception on " - + "insert; raising conflict", - logicalT, - nsConfig.getNamespace(), - docConfig.getDocumentId())); - isConflicted = true; - } - break; + docConfig.getDocumentId())); } - case REPLACE: { - if (localDoc == null) { - final IllegalStateException illegalStateException = new IllegalStateException( - "expected document to exist for local replace change event: %s"); + // 2. Otherwise, the unprocessed event can be safely dropped and ignored in future R2L + // passes. Continue on to checking the operation type. + } - emitError( - docConfig, - illegalStateException.getMessage(), - illegalStateException - ); - continue; - } - nextVersion = localVersionInfo.getNextVersion(); - final BsonDocument nextDoc = withNewVersion(localDoc, nextVersion); - - final RemoteUpdateResult result; - try { - result = remoteColl.updateOne( - localVersionInfo.getFilter(), - nextDoc); - } catch (final StitchServiceException ex) { - this.emitError( - docConfig, - String.format( - Locale.US, - "t='%d': syncLocalToRemote ns=%s documentId=%s exception replacing: %s", - logicalT, - nsConfig.getNamespace(), - docConfig.getDocumentId(), - ex), - ex - ); - continue; - } - if (result.getMatchedCount() == 0) { - isConflicted = true; - logger.info(String.format( - Locale.US, - "t='%d': syncLocalToRemote ns=%s documentId=%s version different on replaced " - + "document or document deleted; raising conflict", - logicalT, - nsConfig.getNamespace(), - docConfig.getDocumentId())); + if (!isConflicted) { + // iii. Check the operation type + switch (localChangeEvent.getOperationType()) { + // 1. INSERT + case INSERT: { + nextVersion = DocumentVersionInfo.getFreshVersionDocument(); + + // It's possible that we may insert after a delete happened and we didn't get a + // notification for it. There's nothing we can do about this. + + // a. Insert document into remote database + try { + remoteColl.insertOne( + withNewVersion(localChangeEvent.getFullDocument(), nextVersion)); + } catch (final StitchServiceException ex) { + // b. If an error happens: + + // i. That is not a duplicate key exception, report an error to the error listener. + if (ex.getErrorCode() != StitchServiceErrorCode.MONGODB_ERROR + || !ex.getMessage().contains("E11000")) { + this.emitError(docConfig, String.format( + Locale.US, + "t='%d': syncLocalToRemote ns=%s documentId=%s exception inserting: %s", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId(), + ex), ex); + continue; + } + + // ii. Otherwise record that a conflict has occurred. + logger.info(String.format( + Locale.US, + "t='%d': syncLocalToRemote ns=%s documentId=%s duplicate key exception on " + + "insert; raising conflict", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId())); + isConflicted = true; + } + break; } - break; - } - case UPDATE: { - if (localDoc == null) { - final IllegalStateException illegalStateException = new IllegalStateException( - "expected document to exist for local update change event"); - emitError( - docConfig, - illegalStateException.getMessage(), - illegalStateException - ); - continue; - } - nextVersion = localVersionInfo.getNextVersion(); - final BsonDocument nextDoc = withNewVersion(localDoc, nextVersion); - - final BsonDocument translatedUpdate = new BsonDocument(); - if (!localChangeEvent.getUpdateDescription().getUpdatedFields().isEmpty()) { - final BsonDocument sets = new BsonDocument(); - for (final Map.Entry fieldValue : - localChangeEvent.getUpdateDescription().getUpdatedFields().entrySet()) { - sets.put(fieldValue.getKey(), fieldValue.getValue()); + + // 2. REPLACE + case REPLACE: { + if (localDoc == null) { + final IllegalStateException illegalStateException = new IllegalStateException( + "expected document to exist for local replace change event: %s"); + + emitError( + docConfig, + illegalStateException.getMessage(), + illegalStateException + ); + continue; } - sets.put(DOCUMENT_VERSION_FIELD, nextVersion); - translatedUpdate.put("$set", sets); - } - if (!localChangeEvent.getUpdateDescription().getRemovedFields().isEmpty()) { - final BsonDocument unsets = new BsonDocument(); - for (final String field : - localChangeEvent.getUpdateDescription().getRemovedFields()) { - unsets.put(field, BsonBoolean.TRUE); + nextVersion = localVersionInfo.getNextVersion(); + final BsonDocument nextDoc = withNewVersion(localDoc, nextVersion); + + // a. Update the document in the remote database using a query for the _id and the + // version with an update containing the replacement document with the version + // counter incremented by 1. + final RemoteUpdateResult result; + try { + result = remoteColl.updateOne( + localVersionInfo.getFilter(), + nextDoc); + } catch (final StitchServiceException ex) { + // b. If an error happens, report an error to the error listener. + this.emitError( + docConfig, + String.format( + Locale.US, + "t='%d': syncLocalToRemote ns=%s documentId=%s exception " + + "replacing: %s", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId(), + ex), + ex + ); + continue; } - translatedUpdate.put("$unset", unsets); + // c. If no documents are matched, record that a conflict has occurred. + if (result.getMatchedCount() == 0) { + isConflicted = true; + logger.info(String.format( + Locale.US, + "t='%d': syncLocalToRemote ns=%s documentId=%s version different on " + + "replaced document or document deleted; raising conflict", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId())); + } + break; } - final RemoteUpdateResult result; - try { - result = remoteColl.updateOne( - localVersionInfo.getFilter(), - translatedUpdate.isEmpty() - // See: changeEventForLocalUpdate for why we do this - ? nextDoc : translatedUpdate); - } catch (final StitchServiceException ex) { - emitError( - docConfig, - String.format( - Locale.US, - "t='%d': syncLocalToRemote ns=%s documentId=%s exception updating: %s", - logicalT, - nsConfig.getNamespace(), - docConfig.getDocumentId(), - ex), - ex - ); - continue; - } - if (result.getMatchedCount() == 0) { - isConflicted = true; - logger.info(String.format( - Locale.US, - "t='%d': syncLocalToRemote ns=%s documentId=%s version different on updated " - + "document or document deleted; raising conflict", - logicalT, - nsConfig.getNamespace(), - docConfig.getDocumentId())); - } - break; - } + // 3. UPDATE + case UPDATE: { + if (localDoc == null) { + final IllegalStateException illegalStateException = new IllegalStateException( + "expected document to exist for local update change event"); + emitError( + docConfig, + illegalStateException.getMessage(), + illegalStateException + ); + continue; + } + nextVersion = localVersionInfo.getNextVersion(); + final BsonDocument nextDoc = withNewVersion(localDoc, nextVersion); + + // a. Update the document in the remote database using a query for the _id and the + // version with an update containing the replacement document with the version + // counter incremented by 1. + final BsonDocument translatedUpdate = new BsonDocument(); + if (!localChangeEvent.getUpdateDescription().getUpdatedFields().isEmpty()) { + final BsonDocument sets = new BsonDocument(); + for (final Map.Entry fieldValue : + localChangeEvent.getUpdateDescription().getUpdatedFields().entrySet()) { + sets.put(fieldValue.getKey(), fieldValue.getValue()); + } + sets.put(DOCUMENT_VERSION_FIELD, nextVersion); + translatedUpdate.put("$set", sets); + } + if (!localChangeEvent.getUpdateDescription().getRemovedFields().isEmpty()) { + final BsonDocument unsets = new BsonDocument(); + for (final String field : + localChangeEvent.getUpdateDescription().getRemovedFields()) { + unsets.put(field, BsonBoolean.TRUE); + } + translatedUpdate.put("$unset", unsets); + } - case DELETE: { - nextVersion = null; - final RemoteDeleteResult result; - try { - result = remoteColl.deleteOne(DocumentVersionInfo.getVersionedFilter( - docConfig.getDocumentId(), - docConfig.getLastKnownRemoteVersion())); - } catch (final StitchServiceException ex) { - emitError( - docConfig, - String.format( - Locale.US, - "t='%d': syncLocalToRemote ns=%s documentId=%s exception deleting: %s", - logicalT, - nsConfig.getNamespace(), - docConfig.getDocumentId(), - ex), - ex - ); - continue; - } - if (result.getDeletedCount() == 0) { - remoteDocument = remoteColl.find(docFilter).first(); - remoteDocumentFetched = true; - if (remoteDocument != null) { + final RemoteUpdateResult result; + try { + result = remoteColl.updateOne( + localVersionInfo.getFilter(), + translatedUpdate.isEmpty() + // See: changeEventForLocalUpdate for why we do this + ? nextDoc : translatedUpdate); + } catch (final StitchServiceException ex) { + // b. If an error happens, report an error to the error listener. + emitError( + docConfig, + String.format( + Locale.US, + "t='%d': syncLocalToRemote ns=%s documentId=%s exception " + + "updating: %s", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId(), + ex), + ex + ); + continue; + } + if (result.getMatchedCount() == 0) { + // c. If no documents are matched, record that a conflict has occurred. isConflicted = true; logger.info(String.format( - Locale.US, - "t='%d': syncLocalToRemote ns=%s documentId=%s version different on removed " - + "document; raising conflict", - logicalT, - nsConfig.getNamespace(), - docConfig.getDocumentId())); + Locale.US, + "t='%d': syncLocalToRemote ns=%s documentId=%s version different on " + + "updated document or document deleted; raising conflict", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId())); + } + break; + } + + case DELETE: { + nextVersion = null; + final RemoteDeleteResult result; + // a. Delete the document in the remote database using a query for the _id and the + // version. + try { + result = remoteColl.deleteOne(localVersionInfo.getFilter()); + } catch (final StitchServiceException ex) { + // b. If an error happens, report an error to the error listener. + emitError( + docConfig, + String.format( + Locale.US, + "t='%d': syncLocalToRemote ns=%s documentId=%s exception " + + " deleting: %s", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId(), + ex), + ex + ); + continue; + } + // c. If no documents are matched, record that a conflict has occurred. + if (result.getDeletedCount() == 0) { + remoteDocument = remoteColl.find(docFilter).first(); + remoteDocumentFetched = true; + if (remoteDocument != null) { + isConflicted = true; + logger.info(String.format( + Locale.US, + "t='%d': syncLocalToRemote ns=%s documentId=%s version different on " + + "removed document; raising conflict", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId())); + } else { + // d. Desynchronize the document if there is no conflict, or if fetching a remote + // document after the conflict is raised returns no remote document. + desyncDocumentFromRemote(nsConfig.getNamespace(), docConfig.getDocumentId()); + } } else { desyncDocumentFromRemote(nsConfig.getNamespace(), docConfig.getDocumentId()); } - } else { - desyncDocumentFromRemote(nsConfig.getNamespace(), docConfig.getDocumentId()); + break; } - break; - } - default: - emitError( - docConfig, - String.format( - Locale.US, - "t='%d': syncLocalToRemote ns=%s documentId=%s unknown operation type occurred " - + "on the document: %s; dropping the event", - logicalT, - nsConfig.getNamespace(), - docConfig.getDocumentId(), - localChangeEvent.getOperationType().toString()) - ); - continue; + default: + emitError( + docConfig, + String.format( + Locale.US, + "t='%d': syncLocalToRemote ns=%s documentId=%s unknown operation " + + "type occurred on the document: %s; dropping the event", + logicalT, + nsConfig.getNamespace(), + docConfig.getDocumentId(), + localChangeEvent.getOperationType().toString()) + ); + continue; + } + } else { + nextVersion = null; } + logger.info(String.format( Locale.US, "t='%d': syncLocalToRemote ns=%s documentId=%s conflict=%s", @@ -888,7 +1093,37 @@ private void syncLocalToRemote() { docConfig.getDocumentId(), isConflicted)); - if (isConflicted) { + if (!isConflicted) { + // iv. If no conflict has occurred, move on to the remote to local sync routine. + + // TODO(STITCH-1972): This event may contain old version info. We should be filtering out + // the version anyway from local and remote events. + final ChangeEvent committedEvent = + docConfig.getLastUncommittedChangeEvent(); + emitEvent(docConfig.getDocumentId(), new ChangeEvent<>( + committedEvent.getId(), + committedEvent.getOperationType(), + committedEvent.getFullDocument(), + committedEvent.getNamespace(), + committedEvent.getDocumentKey(), + committedEvent.getUpdateDescription(), + false)); + + if (nextVersion == null) { + final IllegalStateException illegalStateException = new IllegalStateException( + "expected completed write to have version"); + + emitError( + docConfig, + illegalStateException.getMessage(), + illegalStateException + ); + } + docConfig.setPendingWritesComplete(nextVersion); + } else { + // v. Otherwise, invoke the collection-level conflict handler with the local change event + // and the remote change event (synthesized by doing a lookup of the document or + // sourced from the listener) final ChangeEvent remoteChangeEvent; if (!remoteDocumentFetched) { remoteChangeEvent = @@ -902,23 +1137,9 @@ private void syncLocalToRemote() { } resolveConflict( - nsConfig.getNamespace(), - docConfig, - remoteChangeEvent); - } else { - // TODO(STITCH-1972): This event may contain old version info. We should be filtering out - // the version anyway from local and remote events. - final ChangeEvent committedEvent = - docConfig.getLastUncommittedChangeEvent(); - emitEvent(docConfig.getDocumentId(), new ChangeEvent<>( - committedEvent.getId(), - committedEvent.getOperationType(), - committedEvent.getFullDocument(), - committedEvent.getNamespace(), - committedEvent.getDocumentKey(), - committedEvent.getUpdateDescription(), - false)); - docConfig.setPendingWritesComplete(nextVersion); + nsConfig.getNamespace(), + docConfig, + remoteChangeEvent); } } } @@ -927,6 +1148,9 @@ private void syncLocalToRemote() { Locale.US, "t='%d': syncLocalToRemote END", logicalT)); + + // 3. If there are still local writes pending for the document, it will go through the L2R + // phase on a subsequent pass and try to commit changes again. } private void emitError(final CoreDocumentSynchronizationConfig docConfig, @@ -956,7 +1180,8 @@ public Object call() { docConfig.setFrozen(true); this.logger.error(msg); - this.logger.error(String.format("Setting document %s to frozen", docConfig.getDocumentId())); + this.logger.error( + String.format("Setting document %s to frozen", docConfig.getDocumentId())); } @@ -978,8 +1203,8 @@ private void resolveConflict( if (this.syncConfig.getNamespaceConfig(namespace).getConflictHandler() == null) { logger.warn(String.format( Locale.US, - "t='%d': resolveConflict ns=%s documentId=%s no conflict resolver set; cannot resolve " - + "yet", + "t='%d': resolveConflict ns=%s documentId=%s no conflict resolver set; cannot " + + "resolve yet", logicalT, namespace, docConfig.getDocumentId())); @@ -996,6 +1221,7 @@ private void resolveConflict( docConfig.getLastUncommittedChangeEvent().getOperationType(), remoteEvent.getOperationType())); + // 2. Based on the result of the handler determine the next state of the document. final Object resolvedDocument; final ChangeEvent transformedRemoteEvent; try { @@ -1048,6 +1274,7 @@ private void resolveConflict( || (remoteEvent.getFullDocument() != null && transformedRemoteEvent.getFullDocument().equals(resolvedDocument)); + // a. If the resolved document is null: if (resolvedDocument == null) { logger.info(String.format( Locale.US, @@ -1058,11 +1285,17 @@ private void resolveConflict( docConfig.getDocumentId())); if (acceptRemote) { + // i. If the remote event was a DELETE, delete the document locally, desynchronize the + // document, and emit a change event for the deletion. deleteOneFromRemote(namespace, docConfig.getDocumentId()); } else { + // ii. Otherwise, delete the document locally, mark that there are pending writes for this + // document, and emit a change event for the deletion. deleteOneFromResolution(namespace, docConfig.getDocumentId(), remoteVersion); } } else { + // b. If the resolved document is not null: + // Update the document locally which will keep the pending writes but with // a new version next time around. @SuppressWarnings("unchecked") @@ -1073,20 +1306,24 @@ private void resolveConflict( logger.info(String.format( Locale.US, - "t='%d': resolveConflict ns=%s documentId=%s replacing local with resolved document with " - + "remote version acknowledged: %s", + "t='%d': resolveConflict ns=%s documentId=%s replacing local with resolved document " + + "with remote version acknowledged: %s", logicalT, namespace, docConfig.getDocumentId(), docForStorage.toJson())); - if (acceptRemote) { + // i. If the remote document is equal to the resolved document, replace the document + // locally, mark the document as having no pending writes, and emit a REPLACE change + // event. replaceOrUpsertOneFromRemote( namespace, docConfig.getDocumentId(), docForStorage, remoteVersion); } else { + // ii. Otherwise, replace the local document with the resolved document locally, mark that + // there are pending writes for this document, and emit a REPLACE change event. replaceOrUpsertOneFromResolution( namespace, docConfig.getDocumentId(), @@ -1150,10 +1387,12 @@ private ChangeEvent getSynthesizedRemoteChangeEventForDocument( final BsonValue documentId, final BsonDocument document ) { + // a. When the document is looked up, if it cannot be found the synthesized change event is a + // DELETE, otherwise it's a REPLACE. if (document == null) { return changeEventForLocalDelete(ns, documentId, false); } - return changeEventForLocalInsert(ns, document, false); + return changeEventForLocalReplace(ns, documentId, document, false); } /** diff --git a/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/DocumentVersionInfo.java b/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/DocumentVersionInfo.java index 9b097533c..9950e8195 100644 --- a/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/DocumentVersionInfo.java +++ b/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/DocumentVersionInfo.java @@ -109,7 +109,9 @@ private DocumentVersionInfo( } /** - * Returns whether this version is non-empty (i.e. a version from a document with no version) + * Returns whether this version is non-empty (i.e. a version from a document with no version). + * The absence of a version is effectively a version, and should be treated as such by consumers + * of this method. * @return true if the version is non-empty, false if the version is empty. */ boolean hasVersion() { diff --git a/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/InstanceChangeStreamListener.java b/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/InstanceChangeStreamListener.java index 0fc23e799..0217b4ab3 100644 --- a/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/InstanceChangeStreamListener.java +++ b/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/InstanceChangeStreamListener.java @@ -21,6 +21,8 @@ import java.util.Map; +import javax.annotation.Nullable; + import org.bson.BsonDocument; import org.bson.BsonValue; @@ -86,4 +88,16 @@ void removeWatcher(final MongoNamespace namespace, */ Map> getEventsForNamespace( final MongoNamespace namespace); + + /** + * If there is an unprocessed change event for a particular document ID, fetch it from the + * appropriate namespace change stream listener, and remove it. By reading the event here, we are + * assuming it will be processed by the consumer. + * + * @return the latest unprocessed change event for the given document ID and namespace, or null + * if none exists. + */ + @Nullable ChangeEvent getUnprocessedEventForDocumentId( + final MongoNamespace namespace, + final BsonValue documentId); } diff --git a/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/InstanceChangeStreamListenerImpl.java b/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/InstanceChangeStreamListenerImpl.java index fa973d35b..69cb2923a 100644 --- a/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/InstanceChangeStreamListenerImpl.java +++ b/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/InstanceChangeStreamListenerImpl.java @@ -26,9 +26,13 @@ import java.util.Map; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.annotation.Nullable; + import org.bson.BsonDocument; import org.bson.BsonValue; + final class InstanceChangeStreamListenerImpl implements InstanceChangeStreamListener { private final Map nsStreamers; @@ -210,4 +214,31 @@ public Map> getEventsForNamespace( } return streamer.getEvents(); } + + /** + * If there is an unprocessed change event for a particular document ID, fetch it from the + * appropriate namespace change stream listener, and remove it. By reading the event here, we are + * assuming it will be processed by the consumer. + * + * @return the latest unprocessed change event for the given document ID and namespace, or null + * if none exists. + */ + public @Nullable ChangeEvent getUnprocessedEventForDocumentId( + final MongoNamespace namespace, + final BsonValue documentId + ) { + this.instanceLock.readLock().lock(); + final NamespaceChangeStreamListener streamer; + try { + streamer = nsStreamers.get(namespace); + } finally { + this.instanceLock.readLock().unlock(); + } + + if (streamer == null) { + return null; + } + + return streamer.getUnprocessedEventForDocumentId(documentId); + } } diff --git a/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/NamespaceChangeStreamListener.java b/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/NamespaceChangeStreamListener.java index 893b0851f..c73d06ed4 100644 --- a/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/NamespaceChangeStreamListener.java +++ b/core/services/mongodb-remote/src/main/java/com/mongodb/stitch/core/services/mongodb/remote/sync/internal/NamespaceChangeStreamListener.java @@ -37,6 +37,8 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.annotation.Nullable; + import org.bson.BsonDocument; import org.bson.BsonValue; import org.bson.Document; @@ -258,7 +260,7 @@ void storeNextEvent() { } /** - * Returns the latest change events. + * Returns the latest change events, and clears them from the change stream listener. * * @return the latest change events. */ @@ -280,4 +282,31 @@ public Map> getEvents() { nsLock.writeLock().unlock(); } } + + /** + * If there is an unprocessed change event for a particular document ID, fetch it from the + * change stream listener, and remove it. By reading the event here, we are assuming it will be + * processed by the consumer. + * + * @return the latest unprocessed change event for the given document ID, or null if none exists. + */ + public @Nullable ChangeEvent getUnprocessedEventForDocumentId( + final BsonValue documentId + ) { + final ChangeEvent event; + nsLock.readLock().lock(); + try { + event = this.events.get(documentId); + } finally { + nsLock.readLock().unlock(); + } + + nsLock.writeLock().lock(); + try { + this.events.remove(documentId); + return event; + } finally { + nsLock.writeLock().unlock(); + } + } } diff --git a/server/services/mongodb-remote/src/test/java/com/mongodb/stitch/server/services/mongodb/remote/internal/SyncMongoClientIntTests.kt b/server/services/mongodb-remote/src/test/java/com/mongodb/stitch/server/services/mongodb/remote/internal/SyncMongoClientIntTests.kt index 1713a3d17..bb7a0561c 100644 --- a/server/services/mongodb-remote/src/test/java/com/mongodb/stitch/server/services/mongodb/remote/internal/SyncMongoClientIntTests.kt +++ b/server/services/mongodb-remote/src/test/java/com/mongodb/stitch/server/services/mongodb/remote/internal/SyncMongoClientIntTests.kt @@ -1051,7 +1051,7 @@ class SyncMongoClientIntTests : BaseStitchServerIntTest() { coll.configure( failingConflictHandler, null, - ErrorListener { documentId, error -> errorEmittedSem.release() }) + ErrorListener { _, _ -> errorEmittedSem.release() }) remoteColl.insertOne(docToInsert)