Skip to content

Conversation

jsflax
Copy link
Contributor

@jsflax jsflax commented Oct 31, 2018

Ticket Description:
This should implement the same CRUD methods that the non sync service has. All of these methods should only operate on the synced, offline documents.

For replacements, they should use the same set and unset event optimization

  1. Add insertMany, updateMany, deleteMany, aggregate, and count to the DataSynchronizer. This is to be in parity with RemoteMongoCollection.
    a. For deleteMany and updateMany, the documents need to be fetched before the delete to
    retrieve the ids, due to the way canonical filters work.
    b. For updateMany, this will match potential unmodified documents. Ignore those.
    c. For updateMany, it may be an upsert. If it is an upsert, filter the result using the upsertedId
    and emit an insert.
  2. Replace findOneById, updateOneById and deleteOneById with find, updateOne and deleteOne, respectively.
    a. For deleteOne, the document needs to be fetched before the delete to retrieve the id, due to
    the way canonical filters work.
    b. For updateOne, it may be an upsert. If it is an upsert, filter the result using the upsertedId
    and emit an insert.
  3. Proxy each method upwards through CoreSync.
  4. Add AggregateOperation, CoreSyncAggregateIterable, CountOperation, InsertManyOperation, and DeleteManyOperation classes to be instantiated in SyncOperations to maintain parity with Operations (remote).
  5. Add SyncCountOptions, SyncDeleteOptions, SyncUpdateOptions, SyncDeleteResult, SyncInsertManyResult, SyncInsertOneResult, and SyncUpdateResult to both maintain parity with the Remote*Options classes, and to create a divergence between our options and the local mongo client library.
  6. Proxy CoreSync operations upwards through Sync (server) and Sync (Android).

Unit testing to test the new logic was critical here, especially for the bulk operations. Integration tests were limited to ensuring that each new operation operates as intended in an integrated environment. Since no new Sync cases were created, adding new conflicted/failing integration tests seemed unnecessarily cumbersome.

@jsflax
Copy link
Contributor Author

jsflax commented Oct 31, 2018

As a note, the diff isn't very clean in certain areas because I opted to make the method order in parity with the remote methods as well.

@jsflax jsflax requested review from edaniels and adamchel October 31, 2018 00:25
@jsflax
Copy link
Contributor Author

jsflax commented Oct 31, 2018

@edaniels @adamchel PTAL

Copy link
Contributor

@edaniels edaniels left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Just questions and a comment or two on style/tests

assertTrue(eventSemaphore?.tryAcquire(10, TimeUnit.SECONDS) ?: true)
override fun waitForEvents(amount: Int) {
waitLock.lock()
changeEventListener.totalEventsToAccumulate = amount
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[q] Why is this here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to give the harness the ability to wait for multiple events. The accumulator actually does nothing– could just be a counter– as the actual events are later accumulated via capturing the values from the spy.

This is being used to test bulk writes, as multiple change events are emitted from a single method call.

private val expectedEvent: ChangeEvent<BsonDocument>?,
var emitEventSemaphore: Semaphore?
) : ChangeEventListener<BsonDocument> {
val eventAccumulator = mutableListOf<ChangeEvent<BsonDocument>>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[q] Why was this added?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See answer above.

* Adds and returns a document with a new version to the given document.
*
* @param document the document to attach a new version to.
* @param document the document to attach a new version to.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is checkstlye not catching any of these issues?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checkstyle did not. The DataSynchronizer doesn't seem to actually be formatted, as far as I can tell, and my autoformatter "went to town" on the class. We should probably file a ticket (or just make a patch) to reformat it.

private static BsonDocument withNewVersion(
final BsonDocument document,
final BsonDocument newVersion
final BsonDocument document,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.

* Any local updates to this document cause it to be resumed. An example of pausing a document
* is when a conflict is being resolved for that document and the handler throws an exception.
*
* <p>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stray?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above. Though technically, this is also fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// else this is an update
if (documentBeforeUpdate == null && updateOptions.isUpsert()) {
config = syncConfig.addSynchronizedDocument(namespace, documentId);
triggerListeningToNamespace(namespace);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, do you have a test that checks that if you were to remove this line, it would fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe so– I can add a line in our new upsert tests that ensures that the stream is opened on the namespace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to testUpsertOne and testUpsertMany in DataSynchronizerUnitTests 👍

final BsonDocument beforeDocument;
final BsonValue documentId = BsonUtils.getDocumentId(afterDocument);

if ((beforeDocument = idToBeforeDocumentMap.get(documentId)) == null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

final BsonDocument beforeDocument;
final BsonValue documentId = BsonUtils.getDocumentId(afterDocument);

if ((beforeDocument = idToBeforeDocumentMap.get(documentId)) == null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] Assigning to beforeDocument like this feels kind of odd as opposed to just doing it right before the if

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

@adamchel adamchel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

excellent work! I mostly just have nits and minors but I think this will be LGTM once you address my comments

boolean resumeSyncForDocument(@NonNull final BsonValue documentId);

/**
* Counts the number of documents in the collection.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] should the comment be "Counts the number of documents in the collection that are synchronized with the remote collection"?

Task<Long> count();

/**
* Counts the number of documents in the collection that have been synchronized from the remote
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] I think synchronized "with" remote would be more accurate than synchronized "from", since this may count local documents that haven't been synchronized to the remote, and therefore have never been synchronized "from" the remote

Task<Long> count(final Bson filter);

/**
* Counts the number of documents in the collection that have been synchronized from the remote
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] ditto

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and pretty much everywhere in this file, "synchronized from" should become "synchronized with". not going to comment on this anymore

final Class<ResultT> resultClass);

/**
* Inserts the provided document. If the document is missing an identifier, the client should
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] it should generate one, but will it? if it is going to generate one it should say "If the document is missing an identifier, one will be generated."


import org.bson.conversions.Bson;

class AggregateOperation<T> implements Operation<Collection<T>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] shouldn't this class be a SyncAggregateOperation? AggregateOperation already exists in another package which might get confusing. this also would make this have parity with the existing SyncFindOperation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. Because these are package private, we are going to keep these as is.

SyncFindOperation will be whittled down to FindOperation as well.

fun insertManyAndSync(documents: List<Document>): SyncInsertManyResult

/**
* Deletes a single document by the given id. It is first searched for in the local synchronized
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] this comment looks out of date. honestly this file probably doesn't need doc comments for the CRUD methods, this is pretty internal

return UpdateResult.acknowledged(0, 0L, null);
void insertManyAndSync(final MongoNamespace namespace,
final List<BsonDocument> documents) {
getLocalCollection(namespace).insertMany(documents);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[alternatively] instead of generating IDs client-side, and depending on BsonUtils.getDocumentId not failing we could look at the result of the local insertMany (and insertOne in insertOneAndSync), and use the inserted ids there to update the doc config and emit the event

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should note that there are no results from local inserts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The local client does not return a result.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my mistake, I assumed it returned a an insertedId like Stitch

filter,
update,
new FindOneAndUpdateOptions()
.collation(updateOptions.getCollation())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] collation, bypassDocumentValidation, and arrayFilters are all unsupported by the remote MongoDB service so this might result in conflicts between local and remote where a document that is able to inserted locally is not able to be inserted remotely. we could either not support those options at all, or just do this with the assumption that we will eventually support the full mongodb surface area

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no actual way for these to get passed downstream, if you look through the proxies– we do not actually support the options. But we seem to operate within the DataSynchronizer on localmdb without considering Remote.


@Test
fun testDeleteOneById() {
fun testUpsertOne() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] one other test that might be worth adding is to do an update where upsert is specified, but there are documents that match, and we want to verify that no insert occurs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that's covered here– but I can add another check to make it clearer.

}

@Test
fun testDeleteManyNoConflicts() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] might be worth throwing in an upsert test here

@jsflax jsflax requested a review from dkaminsky October 31, 2018 20:29

override fun insertOneAndSync(document: Document): RemoteInsertOneResult {
override fun count(filter: Bson): Long {
return Tasks.await(sync.count())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if not used currently, expected behavior would be for this to propagate the filter.

Suggested change
return Tasks.await(sync.count())
return Tasks.await(sync.count(filter))

@jsflax
Copy link
Contributor Author

jsflax commented Nov 1, 2018

@adamchel PTAL

Copy link
Contributor

@adamchel adamchel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jsflax jsflax merged commit ee20789 into mongodb:master Nov 1, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants