Skip to content

Commit

Permalink
Remove the need for explicit row locks when allocating UIDs.
Browse files Browse the repository at this point in the history
The existing code with explicit row locks exhibited very poor
performance, and also prevented multiple TSDs from allocating
UIDs concurrently.

The new approach consists of:
  1. Performing an atomic increment to grab a new UID.
  2. CAS'ing (CompareAndSet) the reverse mapping (uid => name)
  3. CAS'ing the forward mapping (name => uid)

If we die after step 1, we waste a UID.  If we die after step 2,
we just end up with an orphaned reverse mapping (harmless).  When
two TSDs race to assign a UID to the same name, one of them will
fail to CAS the forward mapping at step 3, and will retry to find
the UID assigned by the winning TSD.  When that occurs, the only
net consequence is that a UID will have been wasted by the losing
TSD, whereas the previous implementation wouldn't waste one when
this happened.  The 'uid fsck' command can easily detect orphaned
or wasted UIDs, and we could conceivably put them on some kind of
a free list in the future to re-allocate them.

If two TSDs are running side-by-side, and one uses the old method
while the other uses the new lock-less method, things still work
as expected.  There are two possible scenarios:
  - Old TSD goes first, locks the MAXID row, and does its thing.
    The new TSD will have to wait until the row lock is released
    for its atomic increment to go through.
  - The new TSD goes first, atomically increments the MAXID row,
    and does its thing.  The old TSD then locks the MAXID row and
    proceeds to allocate its own ID concurrently.
  • Loading branch information
tsuna committed May 30, 2013
1 parent af0f093 commit 0118583
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 278 deletions.
6 changes: 6 additions & 0 deletions NEWS
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
OpenTSDB - User visible changes.

* Version 1.1.1 (2013-??-??) [???????]

Noteworthy changes:
- UIDs are now assigned in a lock-less fashion.


* Version 1.1.0 (2013-03-08) [12879d7]

Noteworthy changes:
Expand Down
253 changes: 100 additions & 153 deletions src/uid/UniqueId.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.Bytes;
import org.hbase.async.DeleteRequest;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.HBaseException;
import org.hbase.async.KeyValue;
import org.hbase.async.PutRequest;
import org.hbase.async.RowLock;
import org.hbase.async.RowLockRequest;
import org.hbase.async.Scanner;

/**
Expand Down Expand Up @@ -237,135 +236,116 @@ public byte[] getOrCreateId(String name) throws HBaseException {
+ "' name='" + name + '\'');
}

// The dance to assign an ID.
RowLock lock;
// Assign an ID.
final long id; // The ID.
byte row[]; // The same ID, as a byte array.
try {
lock = getLock();
} catch (HBaseException e) {
try {
Thread.sleep(61000 / MAX_ATTEMPTS_ASSIGN_ID);
} catch (InterruptedException ie) {
break; // We've been asked to stop here, let's bail out.
id = client.atomicIncrement(new AtomicIncrementRequest(table, MAXID_ROW,
ID_FAMILY, kind))
.joinUninterruptibly();
row = Bytes.fromLong(id);
LOG.info("Got ID=" + id
+ " for kind='" + kind() + "' name='" + name + "'");
// row.length should actually be 8.
if (row.length < idWidth) {
throw new IllegalStateException("OMG, row.length = " + row.length
+ " which is less than " + idWidth
+ " for id=" + id
+ " row=" + Arrays.toString(row));
}
// Verify that we're going to drop bytes that are 0.
for (int i = 0; i < row.length - idWidth; i++) {
if (row[i] != 0) {
final String message = "All Unique IDs for " + kind()
+ " on " + idWidth + " bytes are already assigned!";
LOG.error("OMG " + message);
throw new IllegalStateException(message);
}
}
// Shrink the ID on the requested number of bytes.
row = Arrays.copyOfRange(row, row.length - idWidth, row.length);
} catch (HBaseException e) {
LOG.error("Failed to assign an ID, atomic increment on row="
+ Arrays.toString(MAXID_ROW) + " column='" +
fromBytes(ID_FAMILY) + ':' + kind() + '\'', e);
hbe = e;
continue;
}
if (lock == null) { // Should not happen.
LOG.error("WTF, got a null pointer as a RowLock!");
} catch (IllegalStateException e) {
throw e; // To avoid handling this exception in the next `catch'.
} catch (Exception e) {
LOG.error("WTF? Unexpected exception type when assigning an ID,"
+ " ICV on row=" + Arrays.toString(MAXID_ROW) + " column='"
+ fromBytes(ID_FAMILY) + ':' + kind() + '\'', e);
continue;
}
// We now have hbase.regionserver.lease.period ms to complete the loop.
// If we die before the next PutRequest succeeds, we just waste an ID.

// Create the reverse mapping first, so that if we die before creating
// the forward mapping we don't run the risk of "publishing" a
// partially assigned ID. The reverse mapping on its own is harmless
// but the forward mapping without reverse mapping is bad.
try {
// Verify that the row still doesn't exist (to avoid re-creating it if
// it got created before we acquired the lock due to a race condition).
try {
final byte[] id = getId(name);
LOG.info("Race condition, found ID for kind='" + kind()
+ "' name='" + name + '\'');
return id;
} catch (NoSuchUniqueName e) {
// OK, the row still doesn't exist, let's create it now.
}

// Assign an ID.
long id; // The ID.
byte row[]; // The same ID, as a byte array.
try {
// We want to send an ICV with our explicit RowLock, but HBase's RPC
// interface doesn't expose this interface. Since an ICV would
// attempt to lock the row again, and we already locked it, we can't
// use ICV here, we have to do it manually while we hold the RowLock.
// To be fixed by HBASE-2292.
{ // HACK HACK HACK
{
final byte[] current_maxid = hbaseGet(MAXID_ROW, ID_FAMILY, lock);
if (current_maxid != null) {
if (current_maxid.length == 8) {
id = Bytes.getLong(current_maxid) + 1;
} else {
throw new IllegalStateException("invalid current_maxid="
+ Arrays.toString(current_maxid));
}
} else {
id = 1;
}
row = Bytes.fromLong(id);
}
final PutRequest update_maxid = new PutRequest(
table, MAXID_ROW, ID_FAMILY, kind, row, lock);
hbasePutWithRetry(update_maxid, MAX_ATTEMPTS_PUT,
INITIAL_EXP_BACKOFF_DELAY);
} // end HACK HACK HACK.
LOG.info("Got ID=" + id
+ " for kind='" + kind() + "' name='" + name + "'");
// row.length should actually be 8.
if (row.length < idWidth) {
throw new IllegalStateException("OMG, row.length = " + row.length
+ " which is less than " + idWidth
+ " for id=" + id
+ " row=" + Arrays.toString(row));
}
// Verify that we're going to drop bytes that are 0.
for (int i = 0; i < row.length - idWidth; i++) {
if (row[i] != 0) {
final String message = "All Unique IDs for " + kind()
+ " on " + idWidth + " bytes are already assigned!";
LOG.error("OMG " + message);
throw new IllegalStateException(message);
}
}
// Shrink the ID on the requested number of bytes.
row = Arrays.copyOfRange(row, row.length - idWidth, row.length);
} catch (HBaseException e) {
LOG.error("Failed to assign an ID, ICV on row="
+ Arrays.toString(MAXID_ROW) + " column='" +
fromBytes(ID_FAMILY) + ':' + kind() + '\'', e);
hbe = e;
continue;
} catch (IllegalStateException e) {
throw e; // To avoid handling this exception in the next `catch'.
} catch (Exception e) {
LOG.error("WTF? Unexpected exception type when assigning an ID,"
+ " ICV on row=" + Arrays.toString(MAXID_ROW) + " column='"
+ fromBytes(ID_FAMILY) + ':' + kind() + '\'', e);
continue;
}
// If we die before the next PutRequest succeeds, we just waste an ID.

// Create the reverse mapping first, so that if we die before creating
// the forward mapping we don't run the risk of "publishing" a
// partially assigned ID. The reverse mapping on its own is harmless
// but the forward mapping without reverse mapping is bad.
try {
final PutRequest reverse_mapping = new PutRequest(
table, row, NAME_FAMILY, kind, toBytes(name));
hbasePutWithRetry(reverse_mapping, MAX_ATTEMPTS_PUT,
INITIAL_EXP_BACKOFF_DELAY);
} catch (HBaseException e) {
LOG.error("Failed to Put reverse mapping! ID leaked: " + id, e);
hbe = e;
continue;
final PutRequest reverse_mapping = new PutRequest(
table, row, NAME_FAMILY, kind, toBytes(name));
// We are CAS'ing the KV into existence -- the second argument is how
// we tell HBase we want to atomically create the KV, so that if there
// is already a KV in this cell, we'll fail. Technically we could do
// just a `put' here, as we have a freshly allocated UID, so there is
// no reason why a KV should already exist for this UID, but just to
// err on the safe side and catch really weird corruption cases, we do
// a CAS instead to create the KV.
if (!client.compareAndSet(reverse_mapping, HBaseClient.EMPTY_ARRAY)
.joinUninterruptibly()) {
LOG.error("WTF! Failed to CAS reverse mapping: " + reverse_mapping
+ " -- run an fsck against the UID table!");
}
} catch (HBaseException e) {
LOG.error("Failed to CAS reverse mapping! ID leaked: " + id
+ " of kind " + kind(), e);
hbe = e;
continue;
} catch (Exception e) {
LOG.error("WTF, should never be here! ID leaked: " + id
+ " of kind " + kind(), e);
continue;
}
// If we die before the next PutRequest succeeds, we just have an
// "orphaned" reversed mapping, in other words a UID has been allocated
// but never used and is not reachable, so it's just a wasted UID.

// Now create the forward mapping.
try {
final PutRequest forward_mapping = new PutRequest(
table, toBytes(name), ID_FAMILY, kind, row);
hbasePutWithRetry(forward_mapping, MAX_ATTEMPTS_PUT,
INITIAL_EXP_BACKOFF_DELAY);
} catch (HBaseException e) {
LOG.error("Failed to Put forward mapping! ID leaked: " + id, e);
hbe = e;
// Now create the forward mapping.
try {
final PutRequest forward_mapping = new PutRequest(
table, toBytes(name), ID_FAMILY, kind, row);
// If two TSDs attempted to allocate a UID for the same name at the
// same time, they would both have allocated a UID, and created a
// reverse mapping, and upon getting here, only one of them would
// manage to CAS this KV into existence. The one that loses the
// race will retry and discover the UID assigned by the winner TSD,
// and a UID will have been wasted in the process. No big deal.
if (!client.compareAndSet(forward_mapping, HBaseClient.EMPTY_ARRAY)
.joinUninterruptibly()) {
LOG.warn("Race condition: tried to assign ID " + id + " to "
+ kind() + ":" + name + ", but CAS failed on "
+ forward_mapping + ", which indicates this UID must have"
+ " been allocated concurrently by another TSD. So ID "
+ id + " was leaked.");
continue;
}

addIdToCache(name, row);
addNameToCache(row, name);
return row;
} finally {
unlock(lock);
} catch (HBaseException e) {
LOG.error("Failed to Put reverse mapping! ID leaked: " + id
+ " of kind " + kind(), e);
hbe = e;
continue;
} catch (Exception e) {
LOG.error("WTF, should never be here! ID leaked: " + id
+ " of kind " + kind(), e);
continue;
}

addIdToCache(name, row);
addNameToCache(row, name);
return row;
}
if (hbe == null) {
throw new IllegalStateException("Should never happen!");
Expand Down Expand Up @@ -543,43 +523,10 @@ private Scanner getSuggestScanner(final String search) {
return scanner;
}

/**
* Gets an exclusive lock on the table using the MAXID_ROW.
* Sends a RowLockRequest for MAXID_ROW and waits (joinUninterruptibly)
* for the request to complete before returning.
* The lock expires after hbase.regionserver.lease.period ms
* (default = 60000)
* @return The row lock handed back by the HBase client.
* @throws HBaseException if the row lock couldn't be acquired.
* @throws RuntimeException if any other kind of exception is raised while
* waiting for the lock request (not expected to happen).
*/
private RowLock getLock() throws HBaseException {
try {
return client.lockRow(new RowLockRequest(table, MAXID_ROW)).joinUninterruptibly();
} catch (HBaseException e) {
// Log here for context, then rethrow unchanged so the caller can handle it.
LOG.warn("Failed to lock the `MAXID_ROW' row", e);
throw e;
} catch (Exception e) {
// Anything that is not an HBaseException is unexpected: wrap it in an
// unchecked exception, keeping the original as the cause.
throw new RuntimeException("Should never be here", e);
}
}

/**
* Releases the lock passed in argument.
* An HBaseException raised while releasing the lock is logged and
* swallowed; it is not propagated to the caller.
* @param lock The row lock to release.
*/
private void unlock(final RowLock lock) {
try {
// NOTE(review): unlike the lockRow call in getLock(), this call is not
// joined, so presumably failures reported asynchronously are not
// observed by the catch below -- confirm against the client API.
client.unlockRow(lock);
} catch (HBaseException e) {
// Best effort: only log the failure.
LOG.error("Error while releasing the lock on row `MAXID_ROW'", e);
}
}

/**
* Returns the cell of the specified row, using family:kind.
* Convenience overload that delegates to the three-argument variant
* without an explicit row lock.
* @param row The row key to read.
* @param family The column family to read from.
* @return Whatever the three-argument variant returns for this row and family.
* @throws HBaseException if the three-argument variant throws it.
*/
private byte[] hbaseGet(final byte[] row, final byte[] family) throws HBaseException {
// Passing a null lock means the read is done without a row lock.
return hbaseGet(row, family, null);
}

/** Returns the cell of the specified row key, using family:kind. */
private byte[] hbaseGet(final byte[] key, final byte[] family,
final RowLock lock) throws HBaseException {
private byte[] hbaseGet(final byte[] key,
final byte[] family) throws HBaseException {
final GetRequest get = new GetRequest(table, key);
if (lock != null) {
get.withRowLock(lock);
}
get.family(family).qualifier(kind);
try {
final ArrayList<KeyValue> row = client.get(get).joinUninterruptibly();
Expand Down
Loading

0 comments on commit 0118583

Please sign in to comment.