Skip to content
Browse files

HBASEWD-2: Implement "row key hash"-based RowKeyDistributor

  • Loading branch information...
1 parent d56063b commit 337838f8e7777e2842c4003b99c85fda9230cc49 Alex Baranau committed May 19, 2011
View
16 README
@@ -75,6 +75,22 @@ Performing mapreduce job over written data chunk specified by Scan:
keyDistributor.addInfo(job.getConfiguration());
+Another useful RowKeyDistributor is RowKeyDistributorByHashPrefix. Please see
+example below. It will creates "distributed key" based on original key value
+so that later when you have original key and want to update the record you can
+calculate distributed key without roundtrip to HBase.
+
+ AbstractRowKeyDistributor keyDistributor =
+ new RowKeyDistributorByHashPrefix(
+ new RowKeyDistributorByHashPrefix.OneByteSimpleHash(15));
+
+You can use your own hashing logic here by implementing simple interface:
+
+ public static interface Hasher extends Parametrizable {
+ byte[] getHashPrefix(byte[] originalKey);
+ byte[][] getAllPossiblePrefixes();
+ }
+
Extending Row Keys Distributing Patterns:
-----------------------------------------
View
BIN hbasewd-0.1.0-SNAPSHOT-2011.05.19.jar
Binary file not shown.
View
5 src/main/java/com/sematext/hbase/wd/AbstractRowKeyDistributor.java
@@ -31,6 +31,9 @@
public void addInfo(Configuration conf) {
conf.set(WdTableInputFormat.ROW_KEY_DISTRIBUTOR_CLASS, this.getClass().getCanonicalName());
- conf.set(WdTableInputFormat.ROW_KEY_DISTRIBUTOR_PARAMS, getParamsToStore());
+ String paramsToStore = getParamsToStore();
+ if (paramsToStore != null) {
+ conf.set(WdTableInputFormat.ROW_KEY_DISTRIBUTOR_PARAMS, paramsToStore);
+ }
}
}
View
22 src/main/java/com/sematext/hbase/wd/DistributedScanner.java
@@ -33,15 +33,19 @@
* @author Alex Baranau
*/
public class DistributedScanner implements ResultScanner {
+ private final AbstractRowKeyDistributor keyDistributor;
private final ResultScanner[] scanners;
private final List<Result>[] nextOfScanners;
private Result next = null;
@SuppressWarnings("unchecked")
- public DistributedScanner(ResultScanner[] scanners) throws IOException {
+ public DistributedScanner(AbstractRowKeyDistributor keyDistributor, ResultScanner[] scanners) throws IOException {
+ this.keyDistributor = keyDistributor;
this.scanners = scanners;
this.nextOfScanners = new List[scanners.length];
- Arrays.fill(this.nextOfScanners, new ArrayList<Result>());
+ for (int i = 0; i < this.nextOfScanners.length; i++) {
+ this.nextOfScanners[i] = new ArrayList<Result>();
+ }
}
private boolean hasNext(int nbRows) throws IOException {
@@ -92,24 +96,24 @@ public static DistributedScanner create(HTable hTable, Scan original, AbstractRo
byte[][] startKeys = keyDistributor.getAllDistributedKeys(original.getStartRow());
byte[][] stopKeys = keyDistributor.getAllDistributedKeys(original.getStopRow());
Scan[] scans = new Scan[startKeys.length];
- for (byte i = 0; i < startKeys.length; i++) {
+ for (int i = 0; i < startKeys.length; i++) {
scans[i] = new Scan(original);
scans[i].setStartRow(startKeys[i]);
scans[i].setStopRow(stopKeys[i]);
}
ResultScanner[] rss = new ResultScanner[startKeys.length];
- for (byte i = 0; i < scans.length; i++) {
+ for (int i = 0; i < scans.length; i++) {
rss[i] = hTable.getScanner(scans[i]);
}
- return new DistributedScanner(rss);
+ return new DistributedScanner(keyDistributor, rss);
}
private Result nextInternal(int nbRows) throws IOException {
Result result = null;
- byte indexOfScannerToUse = -1;
- for (byte i = 0; i < nextOfScanners.length; i++) {
+ int indexOfScannerToUse = -1;
+ for (int i = 0; i < nextOfScanners.length; i++) {
if (nextOfScanners[i] == null) {
// result scanner is exhausted, don't advance it any more
continue;
@@ -126,7 +130,9 @@ private Result nextInternal(int nbRows) throws IOException {
nextOfScanners[i].addAll(Arrays.asList(results));
}
- if (result == null || Bytes.compareTo(nextOfScanners[i].get(0).getRow(), result.getRow()) < 0) {
+ // if result is null or next record has original key less than the candidate to be returned
+ if (result == null || Bytes.compareTo(keyDistributor.getOriginalKey(nextOfScanners[i].get(0).getRow()),
+ keyDistributor.getOriginalKey(result.getRow())) < 0) {
result = nextOfScanners[i].get(0);
indexOfScannerToUse = i;
}
View
138 src/main/java/com/sematext/hbase/wd/RowKeyDistributorByHashPrefix.java
@@ -0,0 +1,138 @@
+/**
+ * Copyright 2010 Sematext International
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sematext.hbase.wd;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Provides handy methods to distribute
+ *
+ * @author Alex Baranau
+ */
+public class RowKeyDistributorByHashPrefix extends AbstractRowKeyDistributor {
+ private static final String DELIM = "--";
+ private Hasher hasher;
+
+ /** Constructor reflection. DO NOT USE */
+ public RowKeyDistributorByHashPrefix() {
+ }
+
+ public RowKeyDistributorByHashPrefix(Hasher hasher) {
+ this.hasher = hasher;
+ }
+
+ public static interface Hasher extends Parametrizable {
+ byte[] getHashPrefix(byte[] originalKey);
+ byte[][] getAllPossiblePrefixes();
+ }
+
+ public static class OneByteSimpleHash implements Hasher {
+ private int mod;
+
+ /**
+ * For reflection, do NOT use it.
+ */
+ public OneByteSimpleHash() {}
+
+ /**
+ * Ctor
+ * @param maxBuckets max buckets number, should be in 1...255 range
+ */
+ public OneByteSimpleHash(int maxBuckets) {
+ if (maxBuckets < 1 || maxBuckets > 255) {
+ throw new IllegalArgumentException("maxBuckets should be in 1..255 range");
+ }
+ // i.e. "real" maxBuckets value = maxBuckets or maxBuckets-1
+ this.mod = (maxBuckets + 1) / 2;
+ }
+
+ // Used to minimize # of created object instances
+ // Should not be changed. TODO: secure that
+ private static final byte[][] PREFIXES;
+
+ static {
+ PREFIXES = new byte[Byte.MAX_VALUE - Byte.MIN_VALUE + 1][];
+ for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) {
+ PREFIXES[i - Byte.MIN_VALUE] = new byte[] {(byte) i};
+ }
+ }
+
+ @Override
+ public byte[] getHashPrefix(byte[] originalKey) {
+ byte hash = 0;
+ for (byte b : originalKey) {
+ hash += b;
+ }
+
+ return new byte[] {(byte) (hash % mod)};
+ }
+
+ @Override
+ public byte[][] getAllPossiblePrefixes() {
+ return Arrays.copyOfRange(PREFIXES, (0 - mod + 1) - Byte.MIN_VALUE, mod - Byte.MIN_VALUE);
+ }
+
+ @Override
+ public String getParamsToStore() {
+ return String.valueOf(mod);
+ }
+
+ @Override
+ public void init(String storedParams) {
+ this.mod = Integer.valueOf(storedParams);
+ }
+ }
+
+ @Override
+ public byte[] getDistributedKey(byte[] originalKey) {
+ return Bytes.add(hasher.getHashPrefix(originalKey), originalKey);
+ }
+
+ @Override
+ public byte[] getOriginalKey(byte[] adjustedKey) {
+ return Bytes.tail(adjustedKey, adjustedKey.length - 1);
+ }
+
+ @Override
+ public byte[][] getAllDistributedKeys(byte[] originalKey) {
+ byte[][] allPrefixes = hasher.getAllPossiblePrefixes();
+ byte[][] keys = new byte[allPrefixes.length][];
+ for (int i = 0; i < allPrefixes.length; i++) {
+ keys[i] = Bytes.add(allPrefixes[i], originalKey);
+ }
+
+ return keys;
+ }
+
+ @Override
+ public String getParamsToStore() {
+ String hasherParamsToStore = hasher.getParamsToStore();
+ return hasher.getClass().getName() + DELIM + (hasherParamsToStore == null ? "" : hasherParamsToStore);
+ }
+
+ @Override
+ public void init(String params) {
+ String[] parts = params.split(DELIM, 2);
+ try {
+ this.hasher = (Hasher) Class.forName(parts[0]).newInstance();
+ this.hasher.init(parts[1]);
+ } catch (Exception e) {
+ throw new RuntimeException("RoKeyDistributor initialization failed", e);
+ }
+ }
+}
View
4 src/main/java/com/sematext/hbase/wd/WdTableInputFormat.java
@@ -50,7 +50,7 @@ public void setConf(Configuration conf) {
rowKeyDistributor.init(conf.get(ROW_KEY_DISTRIBUTOR_PARAMS));
}
} catch (Exception e) {
- throw new RuntimeException("Cannot create row key distributor, " + ROW_KEY_DISTRIBUTOR_CLASS + ": " + clazz);
+ throw new RuntimeException("Cannot create row key distributor, " + ROW_KEY_DISTRIBUTOR_CLASS + ": " + clazz, e);
}
}
}
@@ -63,7 +63,7 @@ public void setConf(Configuration conf) {
byte[][] startRows = rowKeyDistributor.getAllDistributedKeys(originalScan.getStartRow());
byte[][] stopRows = rowKeyDistributor.getAllDistributedKeys(originalScan.getStopRow());
- for (byte i = 0; i < startRows.length; i++) {
+ for (int i = 0; i < startRows.length; i++) {
// Internally super.getSplits(...) uses scan object stored in private variable,
// to re-use the code of super class we switch scan object with scans we
Scan scan = new Scan(originalScan);
View
30 ...va/com/sematext/hbase/wd/TestHBaseWd.java → ...om/sematext/hbase/wd/HBaseWdTestUtil.java
@@ -35,39 +35,37 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Test;
/**
* General unit-test for the whole concept
*
* @author Alex Baranau
*/
-public class TestHBaseWd {
+public class HBaseWdTestUtil {
private HBaseTestingUtility testingUtility;
- public static final byte[] CF = Bytes.toBytes("colfam");
- public static final byte[] QUAL = Bytes.toBytes("qual");
+ protected HTable hTable;
@Before
public void before() throws Exception {
testingUtility = new HBaseTestingUtility();
testingUtility.startMiniCluster(1);
+ hTable = testingUtility.createTable(Bytes.toBytes("table"), CF);
}
@After
public void after() throws IOException {
+ hTable = null;
testingUtility.shutdownMiniCluster();
testingUtility = null;
}
- @Test
- public void testSimpleScan() throws IOException, InterruptedException {
- byte bucketsCount = (byte) 12;
+ public static final byte[] CF = Bytes.toBytes("colfam");
+ public static final byte[] QUAL = Bytes.toBytes("qual");
- HTable hTable = testingUtility.createTable(Bytes.toBytes("table"), CF);
+ protected void testSimpleScan(AbstractRowKeyDistributor keyDistributor) throws IOException {
long millis = System.currentTimeMillis();
int valuesCountInSeekInterval = 0;
- AbstractRowKeyDistributor keyDistributor = new RowKeyDistributorByOneBytePrefix(bucketsCount);
for (int i = 0; i < 500; i++) {
int val = 500 + i - i * (i % 2) * 2; // i.e. 500, 499, 502, 497, 504, ...
valuesCountInSeekInterval += (val >= 100 && val < 900) ? 1 : 0;
@@ -89,7 +87,8 @@ public void testSimpleScan() throws IOException, InterruptedException {
for (Result current : distributedScanner) {
countMatched++;
if (previous != null) {
- Assert.assertTrue(Bytes.compareTo(current.getRow(), previous.getRow()) >= 0);
+ Assert.assertTrue(Bytes.compareTo(keyDistributor.getOriginalKey(current.getRow()),
+ keyDistributor.getOriginalKey(previous.getRow())) >= 0);
Assert.assertTrue(Bytes.toInt(current.getValue(CF, QUAL)) >= 100);
Assert.assertTrue(Bytes.toInt(current.getValue(CF, QUAL)) < 900);
}
@@ -99,13 +98,8 @@ public void testSimpleScan() throws IOException, InterruptedException {
Assert.assertEquals(valuesCountInSeekInterval, countMatched);
}
- @Test
- public void testMapreduceJob() throws IOException, ClassNotFoundException, InterruptedException {
- byte spread = (byte) 12;
-
+ protected void testMapReduce(AbstractRowKeyDistributor keyDistributor) throws IOException, InterruptedException, ClassNotFoundException {
// Writing data
- AbstractRowKeyDistributor keyDistributor = new RowKeyDistributorByOneBytePrefix(spread);
- HTable hTable = testingUtility.createTable(Bytes.toBytes("table"), CF);
long millis = System.currentTimeMillis();
int valuesCountInSeekInterval = 0;
for (int i = 0; i < 500; i++) {
@@ -126,13 +120,13 @@ public void testMapreduceJob() throws IOException, ClassNotFoundException, Inter
Job job = new Job(conf, "testMapreduceJob");
TableMapReduceUtil.initTableMapperJob("table", scan,
- RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
+ RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
// Substituting standard TableInputFormat which was set in TableMapReduceUtil.initTableMapperJob(...)
job.setInputFormatClass(WdTableInputFormat.class);
keyDistributor.addInfo(job.getConfiguration());
- job.setJarByClass(TestHBaseWd.class);
+ job.setJarByClass(HBaseWdTestUtil.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(NullOutputFormat.class);
View
73 src/test/java/com/sematext/hbase/wd/TestOneByteSimpleHash.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright 2010 Sematext International
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sematext.hbase.wd;
+
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * @author Alex Baranau
+ */
+public class TestOneByteSimpleHash {
+ @Test
+ public void testMaxDistribution() {
+ RowKeyDistributorByHashPrefix.OneByteSimpleHash hasher = new RowKeyDistributorByHashPrefix.OneByteSimpleHash(255);
+ byte[][] allPrefixes = hasher.getAllPossiblePrefixes();
+ for (int i = 0; i < 1000; i++) {
+ byte[] originalKey = new byte[] {(byte) (Math.random() * 255),
+ (byte) (Math.random() * 255),
+ (byte) (Math.random() * 255)};
+ byte[] hash = hasher.getHashPrefix(originalKey);
+ boolean found = false;
+ for (int k = 0; k < allPrefixes.length; k++) {
+ if (Arrays.equals(allPrefixes[k], hash)) {
+ found = true;
+ break;
+ }
+ }
+ Assert.assertTrue("Hashed prefix wasn't found in all possible prefixes, val: " + Arrays.toString(hash), found);
+ }
+
+ Assert.assertArrayEquals(
+ hasher.getHashPrefix(new byte[] {123, 12, 11}), hasher.getHashPrefix(new byte[] {123, 12, 11}));
+ }
+
+ @Test
+ public void testLimitedDistribution() {
+ RowKeyDistributorByHashPrefix.OneByteSimpleHash hasher = new RowKeyDistributorByHashPrefix.OneByteSimpleHash(10);
+ byte[][] allPrefixes = hasher.getAllPossiblePrefixes();
+ Assert.assertTrue(allPrefixes.length >= 9 && allPrefixes.length <= 10);
+ for (int i = 0; i < 1000; i++) {
+ byte[] originalKey = new byte[] {(byte) (Math.random() * 255),
+ (byte) (Math.random() * 255),
+ (byte) (Math.random() * 255)};
+ byte[] hash = hasher.getHashPrefix(originalKey);
+ boolean found = false;
+ for (int k = 0; k < allPrefixes.length; k++) {
+ if (Arrays.equals(allPrefixes[k], hash)) {
+ found = true;
+ break;
+ }
+ }
+ Assert.assertTrue("Hashed prefix wasn't found in all possible prefixes, val: " + Arrays.toString(hash), found);
+ }
+
+ Assert.assertArrayEquals(
+ hasher.getHashPrefix(new byte[] {123, 12, 11}), hasher.getHashPrefix(new byte[] {123, 12, 11}));
+ }
+}
View
55 src/test/java/com/sematext/hbase/wd/TestRowKeyDistributorByHashPrefix.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2010 Sematext International
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sematext.hbase.wd;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * @author Alex Baranau
+ */
+public class TestRowKeyDistributorByHashPrefix extends HBaseWdTestUtil {
+ @Test
+ public void testSimpleScan() throws IOException, InterruptedException {
+ AbstractRowKeyDistributor keyDistributor =
+ new RowKeyDistributorByHashPrefix(new RowKeyDistributorByHashPrefix.OneByteSimpleHash(15));
+ testSimpleScan(keyDistributor);
+
+ // Testing simple get
+ byte[] originalKey = new byte[] {123, 124, 122};
+ Put put = new Put(keyDistributor.getDistributedKey(originalKey));
+ put.add(CF, QUAL, Bytes.toBytes("some"));
+ hTable.put(put);
+
+ byte[] distributedKey = keyDistributor.getDistributedKey(originalKey);
+ Result result = hTable.get(new Get(distributedKey));
+ Assert.assertArrayEquals(originalKey, keyDistributor.getOriginalKey(result.getRow()));
+ Assert.assertArrayEquals(Bytes.toBytes("some"), result.getValue(CF, QUAL));
+ }
+
+ @Test
+ public void testMapreduceJob() throws IOException, ClassNotFoundException, InterruptedException {
+ AbstractRowKeyDistributor keyDistributor =
+ new RowKeyDistributorByHashPrefix(new RowKeyDistributorByHashPrefix.OneByteSimpleHash(17));
+ testMapReduce(keyDistributor);
+ }
+}
View
37 src/test/java/com/sematext/hbase/wd/TestRowKeyDistributorByOneBytePrefix.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright 2010 Sematext International
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sematext.hbase.wd;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+/**
+ * @author Alex Baranau
+ */
+public class TestRowKeyDistributorByOneBytePrefix extends HBaseWdTestUtil {
+ @Test
+ public void testSimpleScan() throws IOException, InterruptedException {
+ AbstractRowKeyDistributor keyDistributor = new RowKeyDistributorByOneBytePrefix((byte) 12);
+ testSimpleScan(keyDistributor);
+ }
+
+ @Test
+ public void testMapreduceJob() throws IOException, ClassNotFoundException, InterruptedException {
+ AbstractRowKeyDistributor keyDistributor = new RowKeyDistributorByOneBytePrefix((byte) 13);
+ testMapReduce(keyDistributor);
+ }
+}

0 comments on commit 337838f

Please sign in to comment.
Something went wrong with that request. Please try again.