Skip to content

Commit

Permalink
Implemented additional provisions in the id allocation logic to allow…
Browse files Browse the repository at this point in the history
… the user to configure Titan in such a way that id allocation conflicts are much less likely when ingesting through lots of machines. Fixes #382
  • Loading branch information
mbroecheler committed Nov 6, 2013
1 parent c136530 commit 0c4459a
Show file tree
Hide file tree
Showing 14 changed files with 520 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public CassandraTransaction(StoreTxConfig config, Consistency readConsistency, C
} else if (config.getConsistency() == ConsistencyLevel.KEY_CONSISTENT) {
this.readConsistency = Consistency.QUORUM;
this.writeConsistency = Consistency.QUORUM;
} else if (config.getConsistency() == ConsistencyLevel.LOCAL_KEY_CONSISTENT) {
this.readConsistency = Consistency.LOCAL_QUORUM;
this.writeConsistency = Consistency.LOCAL_QUORUM;
} else {
throw new IllegalArgumentException("Unsupported consistency level: " + config.getConsistency());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.thinkaurelius.titan.diskstorage.cassandra.embedded;

import com.thinkaurelius.titan.CassandraStorageSetup;
import com.thinkaurelius.titan.diskstorage.IDAllocationTest;
import com.thinkaurelius.titan.diskstorage.LockKeyColumnValueStoreTest;
import com.thinkaurelius.titan.diskstorage.StorageException;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
import org.apache.commons.configuration.Configuration;

public class InternalCassandraEmbeddedIDAllocationTest extends IDAllocationTest {

public InternalCassandraEmbeddedIDAllocationTest(Configuration baseConfig) {
super(baseConfig);
}

@Override
public KeyColumnValueStoreManager openStorageManager(int idx) throws StorageException {
Configuration sc = CassandraStorageSetup.getEmbeddedCassandraStorageConfiguration(getClass().getSimpleName());
return new CassandraEmbeddedStoreManager(sc);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@
import com.thinkaurelius.titan.diskstorage.util.TimeUtility;
import com.thinkaurelius.titan.diskstorage.util.WriteBufferUtil;
import com.thinkaurelius.titan.diskstorage.util.WriteByteBuffer;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.*;

import com.thinkaurelius.titan.graphdb.database.idassigner.IDPoolExhaustedException;
import com.thinkaurelius.titan.graphdb.database.idhandling.VariableLong;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
* {@link com.thinkaurelius.titan.diskstorage.IDAuthority} implementation assuming that the backing store
Expand Down Expand Up @@ -43,11 +49,44 @@ public class ConsistentKeyIDManager extends AbstractIDManager {
private final int rollbackAttempts = 5;
private final int rollbackWaitTime = 200;

private final int uniqueIdBitWidth;
private final int uniqueIDUpperBound;
private final long idBlockUpperBound;
private final int uniqueId;
private final boolean randomizeUniqueId;
private final ConsistencyLevel consistencLevel;

private final Random random = new Random();

public ConsistentKeyIDManager(KeyColumnValueStore idStore, StoreManager manager, Configuration config) throws StorageException {
super(config);
Preconditions.checkArgument(manager.getFeatures().supportsConsistentKeyOperations());
this.manager = manager;
this.idStore = idStore;

uniqueIdBitWidth = config.getInt(IDAUTHORITY_UNIQUE_ID_BITS_KEY,IDAUTHORITY_UNIQUE_ID_BITS_DEFAULT);
Preconditions.checkArgument(uniqueIdBitWidth>=0 && uniqueIdBitWidth<=16,"Invalid unique id bit width defined [%s]. Must be in [0,16]",uniqueIdBitWidth);
uniqueIDUpperBound = 1<<uniqueIdBitWidth;
idBlockUpperBound = Long.MAX_VALUE>>uniqueIdBitWidth;
if (config.getBoolean(IDAUTHORITY_RANDOMIZE_UNIQUE_ID_KEY,IDAUTHORITY_RANDOMIZE_UNIQUE_ID_DEFAULT)) {
Preconditions.checkArgument(!config.containsKey(IDAUTHORITY_UNIQUE_ID_KEY),"Conflicting configuration: a unique id and randomization have been set");
Preconditions.checkArgument(!config.getBoolean(IDAUTHORITY_USE_LOCAL_CONSISTENCY_KEY, IDAUTHORITY_USE_LOCAL_CONSISTENCY_DEFAULT),
"Cannot use local consistency with randomization - this leads to data corruption");
randomizeUniqueId = true;
uniqueId = -1;
consistencLevel = ConsistencyLevel.KEY_CONSISTENT;
} else {
randomizeUniqueId = false;
if (config.getBoolean(IDAUTHORITY_USE_LOCAL_CONSISTENCY_KEY,IDAUTHORITY_USE_LOCAL_CONSISTENCY_DEFAULT)) {
Preconditions.checkArgument(config.containsKey(IDAUTHORITY_UNIQUE_ID_KEY),"Need to configure a unique id in order to use local consistency");
consistencLevel = ConsistencyLevel.LOCAL_KEY_CONSISTENT;
} else {
consistencLevel = ConsistencyLevel.KEY_CONSISTENT;
}
uniqueId = config.getInt(IDAUTHORITY_UNIQUE_ID_KEY,IDAUTHORITY_UNIQUE_ID_DEFAULT);
Preconditions.checkArgument(uniqueId>=0,"Invalid unique id: %s",uniqueId);
Preconditions.checkArgument(uniqueId<uniqueIDUpperBound,"Unique id is too large for bit width [%s]: %s",uniqueIdBitWidth,uniqueId);
}
}

@Override
Expand Down Expand Up @@ -75,22 +114,49 @@ private long getCurrentID(StaticBuffer partitionKey, StoreTransaction txh) throw
return latest;
}

private int getUniqueID() {
int id;
if (randomizeUniqueId) {
id = random.nextInt(uniqueIDUpperBound);
} else id = uniqueId;
assert id>=0 && id<uniqueIDUpperBound;
return id;
}

protected StaticBuffer getPartitionKey(int partition, int uniqueId) {
if (uniqueIdBitWidth==0)
return ByteBufferUtil.getIntBuffer(partition);
return ByteBufferUtil.getIntBuffer(new int[]{partition,uniqueId});
}

@Override
public long[] getIDBlock(int partition) throws StorageException {
//partition id can be any integer, even negative, its only a partition identifier

long blockSize = getBlockSize(partition);
Preconditions.checkArgument(idBlockUpperBound>blockSize,
"Block size [%s] is larger than upper bound [%s] for bit width [%s]",blockSize,idBlockUpperBound,uniqueIdBitWidth);

for (int retry = 0; retry < idApplicationRetryCount; retry++) {
StoreTransaction txh = null;
final int uniqueID = getUniqueID();
try {
txh = manager.beginTransaction(new StoreTxConfig(ConsistencyLevel.KEY_CONSISTENT, metricsPrefix));
txh = manager.beginTransaction(new StoreTxConfig(consistencLevel, metricsPrefix));
// Read the latest counter values from the idStore
StaticBuffer partitionKey = getPartitionKey(partition);
StaticBuffer partitionKey = getPartitionKey(partition,uniqueID);
// calculate the start (inclusive) and end (exclusive) of the allocation we're about to attempt
long nextStart = getCurrentID(partitionKey, txh);
Preconditions.checkArgument(Long.MAX_VALUE - blockSize > nextStart, "ID overflow detected");
long nextEnd = nextStart + blockSize;
if (idBlockUpperBound - blockSize <= nextStart) {
log.info("ID overflow detected. Current id {}, block size {} and upper bound {} for bit width {}",
nextStart,blockSize,idBlockUpperBound,uniqueIdBitWidth);
if (randomizeUniqueId && (retry+1)<idApplicationRetryCount) {
continue;
}
throw new IDPoolExhaustedException("Exhausted id block for partition ["+partition+"] with upper bound: " + idBlockUpperBound);
}

assert idBlockUpperBound - blockSize > nextStart;
long nextEnd = nextStart + blockSize;
StaticBuffer target = getBlockApplication(nextEnd);


Expand Down Expand Up @@ -137,6 +203,10 @@ public long[] getIDBlock(int partition) throws StorageException {
}

success = true;
//Pad ids
for (int i=0;i<result.length;i++) {
result[i] = (result[i]<<uniqueIdBitWidth)+uniqueID;
}
return result;
} else {
// Another claimant beat us to this id block -- try again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.thinkaurelius.titan.diskstorage.locking.TemporaryLockingException;
import com.thinkaurelius.titan.diskstorage.util.ByteBufferUtil;
import com.thinkaurelius.titan.diskstorage.util.TimeUtility;
import com.thinkaurelius.titan.graphdb.database.idassigner.IDPoolExhaustedException;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -48,7 +49,11 @@ public long[] getIDBlock(int partition) throws StorageException {
try {
txh = manager.beginTransaction(new StoreTxConfig(metricsPrefix));
long current = getCurrentID(partitionKey, txh);
Preconditions.checkArgument(Long.MAX_VALUE - blockSize > current, "ID overflow detected");
if (Long.MAX_VALUE - blockSize <= current) {
throw new IDPoolExhaustedException("Exhausted id block for partition ["+partition+"]");
}

assert Long.MAX_VALUE - blockSize > current;
long next = current + blockSize;
idStore.mutate(partitionKey, ImmutableList.of(StaticBufferEntry.of(DEFAULT_COLUMN, ByteBufferUtil.getLongBuffer(next))), KeyColumnValueStore.NO_DELETIONS, txh);
txh.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,26 @@ public enum ConsistencyLevel {
/**
* Consistency level which ensures that operations on a {@link KeyColumnValueStore} are key-consistent.
*/
KEY_CONSISTENT;
KEY_CONSISTENT,

/**
* Consistency level which ensures that operations on a {@link KeyColumnValueStore} are key-consistent with
* respect to a local cluster where multiple local clusters form the entire (global) cluster.
* In other words, {@link #KEY_CONSISTENT} ensures key consistency across the entire global cluster whereas this
* is restricted to the local cluster.
*/
LOCAL_KEY_CONSISTENT;


public boolean isKeyConsistent() {
switch (this) {
case KEY_CONSISTENT:
case LOCAL_KEY_CONSISTENT:
return true;
case DEFAULT:
return false;
default: throw new AssertionError(this.toString());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ synchronized void mutate(List<Entry> additions, List<StaticBuffer> deletions, St
private ReentrantLock lock = null;

private Lock getLock(StoreTransaction txh) {
if (txh.getConfiguration().getConsistency() == ConsistencyLevel.KEY_CONSISTENT) {
if (txh.getConfiguration().getConsistency().isKeyConsistent()) {
if (lock == null) {
synchronized (this) {
if (lock == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ public static final StaticBuffer getIntBuffer(int id) {
return new StaticArrayBuffer(arr);
}

public static final StaticBuffer getIntBuffer(int[] ids) {
ByteBuffer buffer = ByteBuffer.allocate(intSize * ids.length);
for (int i = 0; i < ids.length; i++)
buffer.putInt(ids[i]);
byte[] arr = buffer.array();
Preconditions.checkArgument(arr.length == intSize * ids.length);
return new StaticArrayBuffer(arr);
}

public static final StaticBuffer getLongBuffer(long id) {
ByteBuffer buffer = ByteBuffer.allocate(longSize);
buffer.putLong(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,57 @@ public class GraphDatabaseConfiguration {
public static final String IDAUTHORITY_RETRY_COUNT_KEY = "idauthority-retries";
public static final int IDAUTHORITY_RETRY_COUNT_DEFAULT = 20;

/**
* Configures the number of bits of Titan assigned ids that are reserved for a unique id marker that
* allows the id allocation to be scaled over multiple sub-clusters and to reduce race-conditions
* when a lot of Titan instances attempt to allocate ids at the same time (e.g. during parallel bulk loading)
*
* IMPORTANT: This should never ever, ever be modified from its initial value and ALL Titan instances must use the
* same value. Otherwise, data corruption will occur.
*/
public static final String IDAUTHORITY_UNIQUE_ID_BITS_KEY = "idauthority-uniqueid-bits";
public static final int IDAUTHORITY_UNIQUE_ID_BITS_DEFAULT = 0;

/**
* Unique id marker to be used by this Titan instance when allocating ids. The unique id marker
* must be non-negative and fit within the number of unique id bits configured.
* By assigning different unique id markers to individual Titan instances it can be assured
* that those instances don't conflict with one another when attempting to allocate new id blocks.
*
* IMPORTANT: The configured unique id marker must fit within the configured unique id bit width.
*/
public static final String IDAUTHORITY_UNIQUE_ID_KEY = "idauthority-uniqueid";
public static final int IDAUTHORITY_UNIQUE_ID_DEFAULT = 0;

/**
* Configures this Titan instance to use a random unique id marker each time it attempts to allocate
* a new id block. This is an alternative to configuring {@link #IDAUTHORITY_UNIQUE_ID_KEY} where the
* actual value does not matter since one just wants to avoid id allocation conflicts among many Titan
* instances.
*
* IMPORTANT: The random unique id will be randomly generated to fit within the unique id bit width. Hence
* this option must be configured accordingly.
*/
public static final String IDAUTHORITY_RANDOMIZE_UNIQUE_ID_KEY = "idauthority-uniqueid-random";
public static final boolean IDAUTHORITY_RANDOMIZE_UNIQUE_ID_DEFAULT = false;

/**
* Configures this Titan instance to use local consistency guarantees when allocating ids. This is useful
* when Titan runs on a very large cluster of machines that is broken up into multiple local sub-clusters.
* In this case, the consistency is only ensured within the local sub-clusters which does not require
* acquiring global locks that can be too expensive to acquire.
* Using local consistency requires that a unique id marker {@link #IDAUTHORITY_UNIQUE_ID_KEY} is configured
* that fits within the bit width {@link #IDAUTHORITY_UNIQUE_ID_BITS_KEY} and that each local cluster of Titan
* instances have a unique id. In other words, no two Titan sub-cluster should have the same unique id marker.
*
* THIS IS VERY IMPORTANT. Since only local consistency is used, identical unique id marker would result in
* data corruption.
*
*/
public static final String IDAUTHORITY_USE_LOCAL_CONSISTENCY_KEY = "idauthority-local-consistency";
public static final boolean IDAUTHORITY_USE_LOCAL_CONSISTENCY_DEFAULT = false;


/**
* Configuration key for the hostname or list of hostname of remote storage backend servers to connect to.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class StandardIDPool implements IDPool {
public StandardIDPool(IDAuthority idAuthority, long partitionID, long maximumID, long renewTimeoutMS, double renewBufferPercentage) {
Preconditions.checkArgument(maximumID > 0);
this.idAuthority = idAuthority;
Preconditions.checkArgument(partitionID<(1l<<32));
this.partitionID = (int) partitionID;
this.maxID = maximumID;
Preconditions.checkArgument(renewTimeoutMS>0,"Renew-timeout must be positive");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private static int numVariableBlocks(final int numBits) {
return (numBits - 1) / 7 + 1;
}

private static int unsignedBitLength(final long value) {
public static int unsignedBitLength(final long value) {
return (value == 0) ? 1 : Long.SIZE - Long.numberOfLeadingZeros(value);
}

Expand Down

0 comments on commit 0c4459a

Please sign in to comment.