Skip to content

Commit

Permalink
don't assume it's possible to resume from nowhere since Mongo will th…
Browse files Browse the repository at this point in the history
…row an error
  • Loading branch information
ericvergnaud committed Oct 21, 2021
1 parent b351960 commit 85a7380
Showing 1 changed file with 6 additions and 10 deletions.
16 changes: 6 additions & 10 deletions MongoStore/src/main/java/prompto/store/mongo/MongoAuditor.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,15 @@ else if(thread.getState()!=State.NEW)

private void watchInstancesChanges() {
List<Bson> filters = Collections.singletonList(Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update", "delete"))));
BsonTimestamp resumeTimestamp = fetchLastAuditTimestamp();
if(resumeTimestamp==null)
resumeTimestamp = computeResumeTimestamp();
ChangeStreamIterable<Document> stream = store.getInstancesCollection()
.watch(filters)
.startAtOperationTime(resumeTimestamp)
.fullDocument(FullDocument.UPDATE_LOOKUP);
BsonTimestamp resumeTimestamp = fetchLastAuditTimestamp();
if(resumeTimestamp!=null) {
logger.info(()->"Resuming audit from " + LocalDateTime.ofEpochSecond(resumeTimestamp.getTime(), 0, ZoneOffset.UTC));
stream = stream.startAtOperationTime(resumeTimestamp);
} else
logger.warn(()->"Starting audit without a resume timestamp");
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = stream.cursor()) {
while(!isTerminated.get()) {
consumeChanges(cursor);
Expand All @@ -115,12 +117,6 @@ private void consumeChanges(MongoChangeStreamCursor<ChangeStreamDocument<Documen
}
}

private BsonTimestamp computeResumeTimestamp() {
LocalDateTime dt = LocalDateTime.now().minusYears(1);
long seconds = dt.toEpochSecond(ZoneOffset.UTC);
return new BsonTimestamp((int)seconds, 0);
}

private BsonTimestamp fetchLastAuditTimestamp() {
Bson filter = Filters.eq("name", "LAST_AUDIT_TIMESTAMP");
Document record = store.db.getCollection(AUDIT_CONFIGS_COLLECTION).find(filter).first();
Expand Down

0 comments on commit 85a7380

Please sign in to comment.