Skip to content

Commit

Permalink
Add ReadTracking interface and JTransaction.weakConsistency().
Browse files Browse the repository at this point in the history
  • Loading branch information
archiecobbs committed Sep 22, 2020
1 parent 24e2152 commit 3b5579f
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 21 deletions.
4 changes: 4 additions & 0 deletions CHANGES.txt
@@ -1,3 +1,7 @@
Version Next

- Added ReadTracking interface and JTransaction.weakConsistency()

Version 4.1.6 Released April 14, 2020

- SnapshotJTransaction and SnapshotTransaction now implement Closeable
Expand Down
Expand Up @@ -17,12 +17,14 @@
import io.permazen.kv.StaleTransactionException;
import io.permazen.kv.mvcc.MutableView;
import io.permazen.kv.mvcc.Mutations;
import io.permazen.kv.mvcc.ReadTracking;
import io.permazen.kv.mvcc.SnapshotRefs;
import io.permazen.kv.mvcc.Writes;
import io.permazen.kv.util.CloseableForwardingKVStore;
import io.permazen.util.CloseableIterator;

import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.concurrent.GuardedBy;
Expand All @@ -35,7 +37,7 @@
* {@link RaftKVDatabase} transaction.
*/
@ThreadSafe
public class RaftKVTransaction implements KVTransaction {
public class RaftKVTransaction implements KVTransaction, ReadTracking {

/*
Expand Down Expand Up @@ -592,6 +594,13 @@ private void fastVerifyExecuting() {
}
}

// ReadTracking

@Override
public AtomicBoolean getReadTrackingControl() {
return this.view.getReadTrackingControl();
}

// KVTransaction

@Override
Expand Down
34 changes: 15 additions & 19 deletions permazen-kv/src/main/java/io/permazen/kv/mvcc/MutableView.java
Expand Up @@ -18,6 +18,7 @@
import java.util.Arrays;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -45,15 +46,15 @@
* <p>
* During construction, instances may be configured to record all keys read into a {@link Reads} object (this is typically
* used for MVCC conflict detection). When reads are being tracked, tracking may temporarily be paused and resumed via
* {@link #setReadTrackingPaused}. Read tracking may be permanently disabled (and any recorded reads discarded) via
* {@link #getReadTrackingControl}. Read tracking may be permanently disabled (and any recorded reads discarded) via
* {@link #disableReadTracking}.
*
* <p>
* Instances are thread safe; however, directly accessing the associated {@link Reads} or {@link Writes} is not thread safe
* without first locking the containing instance.
*/
@ThreadSafe
public class MutableView extends AbstractKVStore implements Cloneable {
public class MutableView extends AbstractKVStore implements ReadTracking, Cloneable {

@GuardedBy("this")
private KVStore kv;
Expand All @@ -63,8 +64,7 @@ public class MutableView extends AbstractKVStore implements Cloneable {
private Reads reads;
@GuardedBy("this")
private boolean readOnly;
@GuardedBy("this")
private boolean readTrackingPaused;
private final AtomicBoolean readTrackingControl;

// Constructors

Expand Down Expand Up @@ -95,6 +95,7 @@ public MutableView(KVStore kv, Reads reads, Writes writes) {
Preconditions.checkArgument(writes != null, "null writes");
this.kv = kv;
this.reads = reads;
this.readTrackingControl = new AtomicBoolean(this.reads != null);
this.writes = writes;
}

Expand Down Expand Up @@ -132,9 +133,10 @@ public synchronized void setKVStore(KVStore kv) {
* {@link #get get()}, {@link #getAtLeast getAtLeast()}, {@link #getAtMost getAtMost()}, and {@link #getRange getRange()}.
*
* <p>
* The returned object should only be accessed while synchronized on this instance.
* The returned object is "live" and should only be accessed while synchronized on this instance.
*
* @return reads recorded, or null if this instance is not configured to record reads
* @return reads recorded, or null if this instance is not configured to record reads or read tracking has
* been permanently disabled via {@link #disableReadTracking}
*/
public synchronized Reads getReads() {
return this.reads;
Expand All @@ -152,7 +154,7 @@ public synchronized Writes getWrites() {
return this.writes;
}

// Read tracking
// ReadTracking

/**
* Permanently disable read tracking and discard the {@link Reads} associated with this instance.
Expand All @@ -162,18 +164,12 @@ public synchronized Writes getWrites() {
*/
public synchronized void disableReadTracking() {
this.reads = null;
this.readTrackingControl.set(false);
}

/**
* Temporarily disable or re-enable read tracking.
*
* <p>
* Ignored if there is no (longer any) associated {@link Reads} instance.
*
* @param readTrackingPaused true to disable read tracking, false to enable
*/
public synchronized void setReadTrackingPaused(final boolean readTrackingPaused) {
this.readTrackingPaused = readTrackingPaused;
@Override
public AtomicBoolean getReadTrackingControl() {
return this.readTrackingControl;
}

/**
Expand Down Expand Up @@ -366,7 +362,7 @@ public synchronized String toString() {
return this.getClass().getSimpleName()
+ "[writes=" + this.writes
+ (this.reads != null ? ",reads=" + this.reads : "")
+ (this.readTrackingPaused ? ",readTrackingPaused" : "")
+ (this.reads != null && !this.readTrackingControl.get() ? ",!readTracking" : "")
+ (this.readOnly ? ",r/o" : "")
+ "]";
}
Expand Down Expand Up @@ -406,7 +402,7 @@ private void recordReads(byte[] minKey, byte[] maxKey) {
assert maxKey == null || ByteUtil.compare(minKey, maxKey) < 0;

// Not tracking reads?
if (this.reads == null || this.readTrackingPaused)
if (this.reads == null || !this.readTrackingControl.get())
return;

// Define the range
Expand Down
46 changes: 46 additions & 0 deletions permazen-kv/src/main/java/io/permazen/kv/mvcc/ReadTracking.java
@@ -0,0 +1,46 @@

/*
* Copyright (C) 2015 Archie L. Cobbs. All rights reserved.
*/

package io.permazen.kv.mvcc;

import io.permazen.kv.KVStore;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* Implemented by {@link KVStore}'s that are capable of tracking which keys have been read.
* Typically this is done to support MVCC conflict detection.
*
* <p>
* Read tracking includes all keys explicitly or implicitly read by calls to
* {@link #get get()}, {@link #getAtLeast getAtLeast()}, {@link #getAtMost getAtMost()}, and {@link #getRange getRange()}.
*
* <p>
* When reads are being tracked, tracking may temporarily be paused and later resumed via {@link #getReadTrackingControl}.
*/
public interface ReadTracking extends KVStore {

/**
* Get an {@link AtomicBoolean} that can be used to temporarily pause/un-pause read tracking.
*
* <p>
* By default the returned control is true. While set to false, read tracking is disabled; setting back
* to true re-enables read tracking.
*
* <p>
* For re-entrance safety, this should be done as follows:
* <blockquote><code>
* final boolean previous = kv.getReadTrackingControl().getAndSet(false);
* try {
* // do something without tracking reads...
* } finally {
* kv.getReadTrackingControl().set(previous);
* }
* </code></blockquote>
*
* @return control that enables/disables read tracking
*/
AtomicBoolean getReadTrackingControl();
}
Expand Up @@ -30,7 +30,7 @@
* {@link SnapshotKVDatabase} transaction.
*/
@ThreadSafe
public class SnapshotKVTransaction extends ForwardingKVStore implements KVTransaction, Closeable {
public class SnapshotKVTransaction extends ForwardingKVStore implements KVTransaction, ReadTracking, Closeable {

// Note: locking order: (1) SnapshotKVTransaction, (2) SnapshotKVDatabase, (3) MutableView

Expand Down Expand Up @@ -108,6 +108,13 @@ public MutableView getMutableView() {
return this.view;
}

// ReadTracking

@Override
public AtomicBoolean getReadTrackingControl() {
return this.view.getReadTrackingControl();
}

// ForwardingKVStore

/**
Expand Down
37 changes: 37 additions & 0 deletions permazen-main/src/main/java/io/permazen/JTransaction.java
Expand Up @@ -45,7 +45,9 @@
import io.permazen.index.Index3;
import io.permazen.index.Index4;
import io.permazen.kv.KVDatabaseException;
import io.permazen.kv.KVTransaction;
import io.permazen.kv.KeyRanges;
import io.permazen.kv.mvcc.ReadTracking;
import io.permazen.kv.util.AbstractKVNavigableSet;
import io.permazen.tuple.Tuple2;
import io.permazen.tuple.Tuple3;
Expand All @@ -67,6 +69,7 @@
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -1781,6 +1784,40 @@ public void performAction(Runnable action) {
}
}

/**
* Apply weaker transaction consistency while invoking the given {@link Runnable}, if applicable.
*
* <p>
* If the the underlying {@link KVTransaction} does not implement {@link ReadTracking}, then this method
* simply invokes {@code action}. Otherwise, it disables tracking of reads while performing {@code action},
* which means some reads could return stale data.
*
* <p>
* This method is for "experts only".
*
* <p>
* There must be a {@linkplain #getCurrent current transaction} associated with the current thread.
*
* @param action action to perform
* @throws IllegalStateException if there is no {@linkplain #getCurrent current transaction}
* @throws IllegalArgumentException if {@code action} is null
* @see ReadTracking
*/
public static void weakConsistency(Runnable action) {
Preconditions.checkArgument(action != null, "null action");
final KVTransaction kvt = JTransaction.getCurrent().getTransaction().getKVTransaction();
if (kvt instanceof ReadTracking) {
final AtomicBoolean readTrackingControl = ((ReadTracking)kvt).getReadTrackingControl();
final boolean previous = readTrackingControl.getAndSet(false);
try {
action.run();
} finally {
readTrackingControl.set(previous);
}
} else
action.run();
}

// Internal methods

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 3b5579f

Please sign in to comment.