Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,39 @@ class SyncMongoClientIntTests : BaseStitchAndroidIntTest() {
}
}

@Test
fun testConfigure() {
val testSync = getTestSync()
val remoteColl = getTestCollRemote()

val docToInsert = Document("hello", "world")
val insertedId = Tasks.await(testSync.insertOneAndSync(docToInsert)).insertedId

var hasConflictHandlerBeenInvoked = false
var hasChangeEventListenerBeenInvoked = false

testSync.configure(
{ _: BsonValue, _: ChangeEvent<Document>, remoteEvent: ChangeEvent<Document> ->
hasConflictHandlerBeenInvoked = true
assertEquals(remoteEvent.fullDocument["fly"], "away")
remoteEvent.fullDocument
},
{ _: BsonValue, _: ChangeEvent<Document> ->
hasChangeEventListenerBeenInvoked = true
},
{ _, _ -> }
)

val sem = watchForEvents(namespace)
Tasks.await(remoteColl.insertOne(Document("_id", insertedId).append("fly", "away")))
sem.acquire()

streamAndSync()

Assert.assertTrue(hasConflictHandlerBeenInvoked)
Assert.assertTrue(hasChangeEventListenerBeenInvoked)
}

private fun streamAndSync() {
val dataSync = (mongoClient as RemoteMongoClientImpl).dataSynchronizer
if (testNetworkMonitor.connectedState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.mongodb.stitch.android.services.mongodb.remote;

import android.support.annotation.NonNull;
import android.support.annotation.Nullable;

import com.google.android.gms.tasks.Task;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteDeleteResult;
import com.mongodb.stitch.core.services.mongodb.remote.RemoteInsertOneResult;
Expand Down Expand Up @@ -43,9 +46,9 @@ public interface Sync<DocumentT> {
* document.
* @param errorListener the error listener to invoke when an irrecoverable error occurs
*/
void configure(final ConflictHandler<DocumentT> conflictHandler,
final ChangeEventListener<DocumentT> changeEventListener,
final ErrorListener errorListener);
void configure(@NonNull final ConflictHandler<DocumentT> conflictHandler,
@Nullable final ChangeEventListener<DocumentT> changeEventListener,
@Nullable final ErrorListener errorListener);

/**
* Requests that the given document _id be synchronized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.mongodb.stitch.android.services.mongodb.remote.internal;

import android.support.annotation.NonNull;
import android.support.annotation.Nullable;

import com.google.android.gms.tasks.Task;
import com.mongodb.stitch.android.core.internal.common.TaskDispatcher;
import com.mongodb.stitch.android.services.mongodb.remote.Sync;
Expand Down Expand Up @@ -45,9 +48,9 @@ public class SyncImpl<DocumentT> implements Sync<DocumentT> {
}

@Override
public void configure(final ConflictHandler<DocumentT> conflictHandler,
final ChangeEventListener<DocumentT> changeEventListener,
final ErrorListener errorListener) {
public void configure(@NonNull final ConflictHandler<DocumentT> conflictHandler,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] ditto

@Nullable final ChangeEventListener<DocumentT> changeEventListener,
@Nullable final ErrorListener errorListener) {
this.proxy.configure(conflictHandler, changeEventListener, errorListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public CoreRemoteMongoClientImpl(final CoreStitchServiceClient service,
networkMonitor,
authMonitor
);
this.dataSynchronizer.start();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

import java.util.Set;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.bson.BsonValue;
import org.bson.conversions.Bson;

Expand All @@ -33,15 +36,15 @@
public interface CoreSync<DocumentT> {
/**
* Set the conflict resolver and and change event listener on this collection.
* @param conflictResolver the conflict resolver to invoke when a conflict happens between local
* and remote events.
* @param conflictHandler the conflict resolver to invoke when a conflict happens between local
* and remote events.
* @param changeEventListener the event listener to invoke when a change event happens for the
* document.
* @param errorListener the error listener to invoke when an irrecoverable error occurs
*/
void configure(final ConflictHandler<DocumentT> conflictResolver,
final ChangeEventListener<DocumentT> changeEventListener,
final ErrorListener errorListener);
void configure(@Nonnull final ConflictHandler<DocumentT> conflictHandler,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] ditto

@Nullable final ChangeEventListener<DocumentT> changeEventListener,
@Nullable final ErrorListener errorListener);

/**
* Requests that the given document _id be synchronized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.mongodb.stitch.core.services.mongodb.remote.sync.ErrorListener;

import java.util.Set;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.bson.BsonDocument;
Expand All @@ -54,12 +56,12 @@ public CoreSyncImpl(final MongoNamespace namespace,
}

@Override
public void configure(final ConflictHandler<DocumentT> conflictResolver,
final ChangeEventListener<DocumentT> changeEventListener,
final ErrorListener errorListener) {
public void configure(@Nonnull final ConflictHandler<DocumentT> conflictHandler,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] ditto

@Nullable final ChangeEventListener<DocumentT> changeEventListener,
@Nullable final ErrorListener errorListener) {
this.dataSynchronizer.configure(
namespace,
conflictResolver,
conflictHandler,
changeEventListener,
errorListener,
this.service.getCodecRegistry().get(documentClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.bson.BsonArray;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
Expand Down Expand Up @@ -103,6 +106,8 @@ public class DataSynchronizer implements NetworkMonitor.StateListener {

private InstanceSynchronizationConfig syncConfig;
private boolean syncThreadEnabled = true;
private boolean isConfigured = false;
private boolean isRunning = false;
private Thread syncThread;
private long logicalT = 0; // The current logical time or sync iteration.

Expand Down Expand Up @@ -195,24 +200,46 @@ public void reloadConfig() {
networkMonitor,
authMonitor
);
this.isConfigured = false;
this.stop();
} finally {
syncLock.unlock();
}
}

public <T> void configure(@Nonnull final MongoNamespace namespace,
@Nonnull final ConflictHandler<T> conflictHandler,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] ditto

@Nullable final ChangeEventListener<T> changeEventListener,
@Nullable final ErrorListener errorListener,
@Nonnull final Codec<T> codec) {
if (conflictHandler == null) {
logger.warn(
"Invalid configuration: conflictHandler should not be null. "
+ "The DataSynchronizer will not begin syncing until a ConflictHandler has been "
+ "provided.");
return;
}

public <T> void configure(final MongoNamespace namespace,
final ConflictHandler<T> conflictHandler,
final ChangeEventListener<T> changeEventListener,
final ErrorListener errorListener,
final Codec<T> codec) {
this.errorListener = errorListener;

this.syncConfig.getNamespaceConfig(namespace).configure(
conflictHandler,
changeEventListener,
codec
);
this.triggerListeningToNamespace(namespace);

syncLock.lock();
if (!this.isConfigured) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] We should grab the lock first before checking configured otherwise this can be a torn read or write.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

this.isConfigured = true;
syncLock.unlock();
this.triggerListeningToNamespace(namespace);
} else {
syncLock.unlock();
}

if (!isRunning) {
this.start();
}
}

/**
Expand All @@ -221,7 +248,7 @@ public <T> void configure(final MongoNamespace namespace,
public void start() {
syncLock.lock();
try {
if (syncThread != null) {
if (syncThread != null || !this.isConfigured) {
return;
}
instanceChangeStreamListener.start();
Expand All @@ -231,6 +258,7 @@ public void start() {
logger));
if (syncThreadEnabled) {
syncThread.start();
isRunning = true;
}
} finally {
syncLock.unlock();
Expand Down Expand Up @@ -263,6 +291,7 @@ public void stop() {
return;
}
syncThread = null;
isRunning = false;
} finally {
syncLock.unlock();
}
Expand Down Expand Up @@ -1025,6 +1054,7 @@ private void resolveConflict(
} else {
// Update the document locally which will keep the pending writes but with
// a new version next time around.
@SuppressWarnings("unchecked")
final BsonDocument docForStorage =
BsonUtils.documentToBsonDocument(
resolvedDocument,
Expand Down Expand Up @@ -1467,7 +1497,7 @@ private void deleteOneFromRemote(
emitEvent(documentId, changeEventForLocalDelete(namespace, documentId, false));
}

private void triggerListeningToNamespace(final MongoNamespace namespace) {
void triggerListeningToNamespace(final MongoNamespace namespace) {
syncLock.lock();
try {
final NamespaceSynchronizationConfig nsConfig = this.syncConfig.getNamespaceConfig(namespace);
Expand All @@ -1493,6 +1523,15 @@ private void triggerListeningToNamespace(final MongoNamespace namespace) {
}
}

/**
* Whether or not the DataSynchronizer is running in the background.
*
* @return true if running, false if not
*/
public boolean isRunning() {
return isRunning;
}

public boolean areAllStreamsOpen() {
syncLock.lock();
try {
Expand Down
Loading