Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ public static BsonValue getDocumentId(final BsonDocument document) {
return document.get("_id");
}

public static BsonValue getDocumentId(
final BsonDocument document,
final BsonValue defaultValue
) {
return document.get("_id", null);
}

/**
* Returns a copy of the given document.
* @param document the document to copy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,10 @@
import com.mongodb.client.MongoCollection;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nonnull;

import org.bson.BsonArray;
import org.bson.BsonBinary;
import org.bson.BsonBinaryReader;
import org.bson.BsonBinaryWriter;
Expand All @@ -56,16 +52,10 @@ class CoreDocumentSynchronizationConfig {
private final ReadWriteLock docLock;
private ChangeEvent<BsonDocument> lastUncommittedChangeEvent;
private long lastResolution;
private BsonValue lastKnownRemoteVersion;
private BsonDocument lastKnownRemoteVersion;
private boolean isStale;
private boolean isFrozen;

// TODO: How can this be trimmed? The same version could appear after we see it once. That
// may be a non-issue.
// TODO: To get rid of this, an ordering on versions would be needed and would have to be
// abided by other clients sync and non-sync alike.
private Set<BsonValue> committedVersions;

CoreDocumentSynchronizationConfig(
final MongoCollection<CoreDocumentSynchronizationConfig> docsColl,
final MongoNamespace namespace,
Expand All @@ -77,7 +67,6 @@ class CoreDocumentSynchronizationConfig {
this.docLock = new ReentrantReadWriteLock();
this.lastResolution = -1;
this.lastKnownRemoteVersion = null;
this.committedVersions = new HashSet<>();
this.lastUncommittedChangeEvent = null;
this.isStale = false;
}
Expand All @@ -92,7 +81,6 @@ class CoreDocumentSynchronizationConfig {
this.docLock = config.docLock;
this.lastResolution = config.lastResolution;
this.lastKnownRemoteVersion = config.lastKnownRemoteVersion;
this.committedVersions = config.committedVersions;
this.lastUncommittedChangeEvent = config.lastUncommittedChangeEvent;
this.isStale = config.isStale;
this.isFrozen = config.isFrozen;
Expand All @@ -103,16 +91,14 @@ private CoreDocumentSynchronizationConfig(
final BsonValue documentId,
final ChangeEvent<BsonDocument> lastUncommittedChangeEvent,
final long lastResolution,
final BsonValue lastVersion,
final Set<BsonValue> committedVersions,
final BsonDocument lastVersion,
final boolean isStale,
final boolean isFrozen
) {
this.namespace = namespace;
this.documentId = documentId;
this.lastResolution = lastResolution;
this.lastKnownRemoteVersion = lastVersion;
this.committedVersions = committedVersions;
this.lastUncommittedChangeEvent = lastUncommittedChangeEvent;
this.docLock = new ReentrantReadWriteLock();
this.docsColl = null;
Expand Down Expand Up @@ -230,7 +216,7 @@ void setSomePendingWrites(
*/
void setSomePendingWrites(
final long atTime,
final BsonValue atVersion,
final BsonDocument atVersion,
final ChangeEvent<BsonDocument> changeEvent
) {
docLock.writeLock().lock();
Expand All @@ -239,9 +225,6 @@ void setSomePendingWrites(
this.lastResolution = atTime;
this.lastKnownRemoteVersion = atVersion;

if (atVersion != null) {
this.committedVersions.add(atVersion);
}
docsColl.replaceOne(
getDocFilter(namespace, documentId),
this);
Expand All @@ -250,14 +233,12 @@ void setSomePendingWrites(
}
}

void setPendingWritesComplete(final BsonValue atVersion) {
void setPendingWritesComplete(final BsonDocument atVersion) {
docLock.writeLock().lock();
try {
this.lastUncommittedChangeEvent = null;
this.lastKnownRemoteVersion = atVersion;
if (atVersion != null) {
this.committedVersions.add(atVersion);
}

docsColl.replaceOne(
getDocFilter(namespace, documentId),
this);
Expand Down Expand Up @@ -341,7 +322,7 @@ public long getLastResolution() {
}
}

public BsonValue getLastKnownRemoteVersion() {
public BsonDocument getLastKnownRemoteVersion() {
docLock.readLock().lock();
try {
return lastKnownRemoteVersion;
Expand All @@ -350,10 +331,19 @@ public BsonValue getLastKnownRemoteVersion() {
}
}

public boolean hasCommittedVersion(final BsonValue version) {
public boolean hasCommittedVersion(final DocumentVersionInfo versionInfo) {
docLock.readLock().lock();
try {
return committedVersions.contains(version);
final DocumentVersionInfo localVersionInfo =
DocumentVersionInfo.fromVersionDoc(lastKnownRemoteVersion);

return ((versionInfo.hasVersion() && localVersionInfo.hasVersion()
&& (versionInfo.getVersion().getSyncProtocolVersion()
== localVersionInfo.getVersion().getSyncProtocolVersion())
&& (versionInfo.getVersion().getInstanceId()
.equals(localVersionInfo.getVersion().getInstanceId()))
&& (versionInfo.getVersion().getVersionCounter()
<= localVersionInfo.getVersion().getVersionCounter())));
} finally {
docLock.readLock().unlock();
}
Expand Down Expand Up @@ -440,8 +430,6 @@ BsonDocument toBsonDocument() {
// TODO: This may put the doc above the 16MiB but ignore for now.
asDoc.put(ConfigCodec.Fields.LAST_UNCOMMITTED_CHANGE_EVENT, encoded);
}
final BsonArray committedVersions = new BsonArray(new ArrayList<>(this.committedVersions));
asDoc.put(ConfigCodec.Fields.COMMITTED_VERSIONS, committedVersions);
asDoc.put(ConfigCodec.Fields.IS_STALE, new BsonBoolean(isStale));
asDoc.put(ConfigCodec.Fields.IS_FROZEN, new BsonBoolean(isFrozen));
return asDoc;
Expand All @@ -455,7 +443,6 @@ static CoreDocumentSynchronizationConfig fromBsonDocument(final BsonDocument doc
keyPresent(ConfigCodec.Fields.NAMESPACE_FIELD, document);
keyPresent(ConfigCodec.Fields.SCHEMA_VERSION_FIELD, document);
keyPresent(ConfigCodec.Fields.LAST_RESOLUTION_FIELD, document);
keyPresent(ConfigCodec.Fields.COMMITTED_VERSIONS, document);
keyPresent(ConfigCodec.Fields.IS_STALE, document);
keyPresent(ConfigCodec.Fields.IS_FROZEN, document);

Expand All @@ -471,12 +458,10 @@ static CoreDocumentSynchronizationConfig fromBsonDocument(final BsonDocument doc

final MongoNamespace namespace =
new MongoNamespace(document.getString(ConfigCodec.Fields.NAMESPACE_FIELD).getValue());
final BsonArray committedVersionsArr = document.getArray(ConfigCodec.Fields.COMMITTED_VERSIONS);
final Set<BsonValue> committedVersions = new HashSet<>(committedVersionsArr);

final BsonValue lastVersion;
final BsonDocument lastVersion;
if (document.containsKey(ConfigCodec.Fields.LAST_KNOWN_REMOTE_VERSION_FIELD)) {
lastVersion = document.get(ConfigCodec.Fields.LAST_KNOWN_REMOTE_VERSION_FIELD);
lastVersion = document.getDocument(ConfigCodec.Fields.LAST_KNOWN_REMOTE_VERSION_FIELD);
} else {
lastVersion = null;
}
Expand All @@ -498,7 +483,6 @@ static CoreDocumentSynchronizationConfig fromBsonDocument(final BsonDocument doc
lastUncommittedChangeEvent,
document.getNumber(ConfigCodec.Fields.LAST_RESOLUTION_FIELD).longValue(),
lastVersion,
committedVersions,
document.getBoolean(ConfigCodec.Fields.IS_STALE).getValue(),
document.getBoolean(ConfigCodec.Fields.IS_FROZEN, new BsonBoolean(false)).getValue());
}
Expand Down Expand Up @@ -537,7 +521,6 @@ static class Fields {
static final String LAST_RESOLUTION_FIELD = "last_resolution";
static final String LAST_KNOWN_REMOTE_VERSION_FIELD = "last_known_remote_version";
static final String LAST_UNCOMMITTED_CHANGE_EVENT = "last_uncommitted_change_event";
static final String COMMITTED_VERSIONS = "committed_versions";
static final String IS_STALE = "is_stale";
static final String IS_FROZEN = "is_frozen";
}
Expand Down
Loading