HBASEHUT-13: Implement buffered updates client-side writer

1 parent ad49060 commit 7d9361f2d64a8503e359aad1de6b66ab97b5192c Alex Baranau committed Mar 25, 2012
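
In short, the new BufferedHutPutWriter buffers HutPuts client-side, grouped by original row key, and compacts the eldest group through the UpdateProcessor once the buffer exceeds its max size. A minimal usage sketch (table and column family names are illustrative; StockSaleUpdateProcessor is the processor implementation from the tests below):

package com.sematext.hbase.hut;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedWriterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable hTable = new HTable(conf, "stock-sales");  // illustrative table name
    BufferedHutPutWriter writer =
        new BufferedHutPutWriter(hTable, new StockSaleUpdateProcessor(), 6);  // buffer up to 6 puts

    HutPut put = new HutPut(Bytes.toBytes("CHRYSLER"));
    put.add(Bytes.toBytes("sale"), Bytes.toBytes("lastPrice0"), Bytes.toBytes(90));
    writer.write(put);  // buffered; the eldest group is compacted & written as the buffer fills up
    writer.flush();     // process and write out all remaining buffered groups
    hTable.close();
  }
}
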
123 src/main/java/com/sematext/hbase/hut/BufferedHutPutWriter.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright 2010 Sematext International
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sematext.hbase.hut;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * Buffers puts during writing (client-side) and performs update processing during flushes
+ */
+public class BufferedHutPutWriter {
+ private final HTable hTable;
+ private final UpdateProcessor updateProcessor;
+ // buffer is similar to an LRU cache
+ // TODO: consider setting a max timeout for a record group staying in the buffer, to prevent
+ // records from being stuck for too long
+ // TODO: consider evicting not just the LRU group but one that is old *and* can be compacted more
+ private final LinkedHashMap<ByteArrayWrapper, List<HutPut>> buffer;
+ // Could be a local variable, but we want to reuse the processingResult instance
+ private HutResultScanner.UpdateProcessingResultImpl processingResult =
+ new HutResultScanner.UpdateProcessingResultImpl();
+
+ // TODO: do these fields really belong to this class and not to the buffer class?
+ private final int maxBufferSize;
+ private int bufferedCount;
+
+ public BufferedHutPutWriter(HTable hTable, UpdateProcessor updateProcessor, int bufferSize) {
+ this.hTable = hTable;
+ this.updateProcessor = updateProcessor;
+ // Notes:
+ // * initial capacity is just an estimate; TODO: allow client code (which uses the writer) to control it
+ // * using insertion order so that records don't get stuck in the buffer for a long time
+ this.buffer = new LinkedHashMap<ByteArrayWrapper, List<HutPut>>(bufferSize / 3, 1.0f, false);
+ this.bufferedCount = 0;
+ this.maxBufferSize = bufferSize;
+ }
+
+ public void write(HutPut put) {
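+ // puts are grouped by their original row key (the key before HutRowKey encoding),
+ // so all buffered updates of the same record land in one group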
+ // TODO: think over reusing the key object instance
+ ByteArrayWrapper key = new ByteArrayWrapper(HutRowKeyUtil.getOriginalKey(put.getRow()));
+ List<HutPut> puts = buffer.get(key);
+ if (puts == null) {
+ puts = new ArrayList<HutPut>();
+ buffer.put(key, puts);
+ }
+
+ puts.add(put);
+ bufferedCount++;
+
+ flushBufferPartIfNeeded();
+ }
+
+ public void flushBufferPartIfNeeded() {
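+ // when over capacity, process & write out the eldest group (first in the
+ // LinkedHashMap's insertion order) and drop it from the buffer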
+ boolean removeFromBuffer = bufferedCount > maxBufferSize;
+ // TODO: is it safe to flush here?
+ if (removeFromBuffer) {
+ // TODO: is there a way to get & delete oldest record "in-place"?
+ List<HutPut> eldest = buffer.values().iterator().next();
+ // eldest and eldestKey are in sync since we use a LinkedHashMap
+ ByteArrayWrapper eldestKey = buffer.keySet().iterator().next();
+ processGroupAndWrite(eldest);
+ buffer.remove(eldestKey);
+ bufferedCount -= eldest.size();
+ }
+ }
+
+ public void flush() {
+ for (List<HutPut> group : buffer.values()) {
+ processGroupAndWrite(group);
+ }
+
+ buffer.clear();
+
+ bufferedCount = 0;
+ }
+
+ private void processGroupAndWrite(List<HutPut> list) {
+ HutPut first = list.get(0);
+ if (list.size() > 1) {
+ // TODO: do we need to place this into the result by default *always*? Maybe only
+ // when user code didn't place anything? Also see other places
+ processingResult.init(first.getRow());
+ List<Result> records = new ArrayList<Result>();
+ for (HutPut put : list) {
+ records.add(HTableUtil.convert(put));
+ }
+ updateProcessor.process(records, processingResult);
+
+ try {
+ Put put = HutResultScanner.createPutWithProcessedResult(processingResult.getResult(),
+ first.getRow(), list.get(list.size() - 1).getRow());
+ hTable.put(put);
+ } catch (IOException e) {
+ throw new RuntimeException("Error during writing processed Puts into HBase table", e);
+ }
+ } else {
+ try {
+ hTable.put(first);
+ } catch (IOException e) {
+ throw new RuntimeException("Error during writing Puts into HBase table", e);
+ }
+ }
+ }
+}
57 src/main/java/com/sematext/hbase/hut/ByteArrayWrapper.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2010 Sematext International
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sematext.hbase.hut;
+
+import java.util.Arrays;
+
+/**
+ * Byte array wrapper. It can be used as a HashMap key.
+ * It is assumed that the array never changes. (Yep, we could copy the array in the constructor
+ * to enforce that, we just don't want to use redundant space.)
+ * TODO: isn't there some existing class for that?
+ */
+public class ByteArrayWrapper {
+ private final byte[] array;
+ private final int hash;
+
+ public ByteArrayWrapper(byte[] array) {
+ this.array = array;
+ this.hash = array != null ? Arrays.hashCode(array) : 0;
+ }
+
+ public byte[] getBytes() {
+ return array;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ByteArrayWrapper that = (ByteArrayWrapper) o;
+
+ return Arrays.equals(array, that.array);
+ }
+
+ @Override
+ public int hashCode() {
+ return hash;
+ }
+}
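
Re the TODO in the javadoc above: HBase already ships org.apache.hadoop.hbase.io.ImmutableBytesWritable, which wraps a byte[] with content-based equals()/hashCode() and could likely replace this class as the buffer's map key. A small sketch of it used as a map key (a suggestion, not part of this commit):

package com.sematext.hbase.hut;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

public class ImmutableBytesWritableKeyDemo {
  public static void main(String[] args) {
    Map<ImmutableBytesWritable, String> groups = new HashMap<ImmutableBytesWritable, String>();
    groups.put(new ImmutableBytesWritable(Bytes.toBytes("row-1")), "group-1");
    // equals()/hashCode() are computed from the wrapped bytes, so an equal
    // (but different) array instance still finds the entry
    System.out.println(groups.get(new ImmutableBytesWritable(Bytes.toBytes("row-1")))); // group-1
  }
}
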
13 src/main/java/com/sematext/hbase/hut/HTableUtil.java
@@ -15,15 +15,18 @@
*/
package com.sematext.hbase.hut;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
/**
* Provides utility methods on top of {@link org.apache.hadoop.hbase.client.HTable}
@@ -96,6 +99,16 @@ public static void deleteRange(HTable hTable, Scan deleteScan, ResultFilter resu
}
}
+ public static Result convert(Put put) {
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ for (List<KeyValue> l : put.getFamilyMap().values()) {
+ kvs.addAll(l);
+ }
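+ // note: Result does sorted lookups over its KeyValues (e.g. in getValue()), so
+ // sorting kvs with KeyValue.COMPARATOR here may be safer if callers rely on that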
+ Result result = new Result(kvs);
+
+ return result;
+ }
+
private static Scan getDeleteScan(byte[] firstInclusive, byte[] lastNonInclusive) {
// TODO: think over limiting of fetched data: set single columnFam:qual with small value to fetch
return new Scan(firstInclusive, lastNonInclusive);
25 src/main/java/com/sematext/hbase/hut/HutResultScanner.java
@@ -93,7 +93,7 @@ public Result next() throws IOException {
// TODO: modify API of processor to return true/false instead of extra method?
// TODO: adjust API of updateProcessor.isMergeNeeded method (add offset/length params) to avoid creating extra objects
if (updateProcessor.isMergeNeeded(HutRowKeyUtil.getOriginalKey(firstResult.getRow()))) {
- processingResult.init(firstResult.getRow(), firstResult.raw());
+ processingResult.init(firstResult.getRow());
updateProcessor.process(iterableRecords, processingResult);
result = processingResult.getResult();
@@ -190,13 +190,13 @@ public void close() {
resultScanner.close();
}
- private static class UpdateProcessingResultImpl implements UpdateProcessingResult {
- private KeyValue[] kvs;
+ static class UpdateProcessingResultImpl implements UpdateProcessingResult {
+ private KeyValue[] kvs; // TODO: consider using a List
private byte[] row;
- public void init(byte[] row, KeyValue[] kvs) {
- this.kvs = kvs;
+ public void init(byte[] row) {
this.row = row;
+ this.kvs = new KeyValue[0];
}
@Override
@@ -423,18 +423,23 @@ private void storeProcessedUpdates(Result processingResult, Result last) throws
// hence we can utilize its write time as start time for the compressed interval
byte[] firstRow = processingResult.getRow();
byte[] lastRow = last.getRow();
+ Put put = createPutWithProcessedResult(processingResult, firstRow, lastRow);
+
+ store(put);
+ deleteProcessedRecords(processingResult.getRow(), lastRow, put.getRow());
+ }
+
+ // TODO: move this method out of this class? (looks like utility method)
+ static Put createPutWithProcessedResult(Result processingResult, byte[] firstRow, byte[] lastRow) throws IOException {
// adjusting row, so that it "covers" interval from first record to last record
byte[] row = Arrays.copyOf(firstRow, firstRow.length);
HutRowKeyUtil.setIntervalEnd(row, lastRow); // can row here remain the same?
- Put put = createPutWithProcessedResult(processingResult, row);
-
- store(put);
- deleteProcessedRecords(processingResult.getRow(), lastRow, row);
+ return createPutWithProcessedResult(processingResult, row);
}
- private Put createPutWithProcessedResult(Result processingResult, byte[] row) throws IOException {
+ private static Put createPutWithProcessedResult(Result processingResult, byte[] row) throws IOException {
Put put = new Put(row);
for (KeyValue kv : processingResult.raw()) {
// using copying here, otherwise processingResult is affected when its
53 src/test/java/com/sematext/hbase/hut/TestHBaseHut.java
@@ -558,6 +558,51 @@ public void testRollbackWithMr() throws IOException, InterruptedException, Class
hTable.close();
}
+ // TODO: this is the simplest test for BufferedHutPutWriter, add more
+ @Test
+ public void testBufferedHutPutWriter() throws IOException, InterruptedException {
+ StockSaleUpdateProcessor processor = new StockSaleUpdateProcessor();
+ BufferedHutPutWriter writer = new BufferedHutPutWriter(hTable, processor, 6);
+
+ // Writing data
+ writer.write(createPut(CHRYSLER, 90));
+ writer.write(createPut(CHRYSLER, 100));
+ writer.write(createPut(FORD, 18));
+ writer.write(createPut(CHRYSLER, 120));
+ writer.write(createPut(CHRYSLER, 115));
+ writer.write(createPut(FORD, 22));
+ writer.write(createPut(CHRYSLER, 110));
+
+ // writer should process updates for the first group of records
+ // after processed updates have been stored, "native" HBase scanner should return processed results too
+ verifyLastSalesWithNativeScanner(hTable, CHRYSLER, new int[] {110, 115, 120, 100, 90});
+
+ writer.write(createPut(CHRYSLER, 105));
+ writer.write(createPut(FORD, 24));
+ writer.write(createPut(FORD, 28));
+ writer.write(createPut(CHRYSLER, 107));
+ writer.write(createPut(FORD, 32));
+
+ verifyLastSalesWithNativeScanner(hTable, FORD, new int[] {32, 28, 24, 22, 18});
+
+ writer.write(createPut(FORD, 40));
+ writer.write(createPut(CHRYSLER, 113));
+
+ writer.flush();
+
+ // we use compaction since updates were only partially compacted by the writer
+ // (since its max buffer size is less than the total number of puts)
+ verifyLastSalesWithCompation(hTable, processor, CHRYSLER, new int[] {113, 107, 105, 110, 115});
+ verifyLastSalesWithCompation(hTable, processor, FORD, new int[] {40, 32, 28, 24, 22});
+
+ UpdatesProcessingUtil.processUpdates(hTable, processor);
+ // after processed updates have been stored, "native" HBase scanner should return processed results too
+ verifyLastSalesWithCompation(hTable, processor, CHRYSLER, new int[] {113, 107, 105, 110, 115});
+ verifyLastSalesWithCompation(hTable, processor, FORD, new int[] {40, 32, 28, 24, 22});
+
+ hTable.close();
+ }
+
private static void performUpdatesProcessingButWithoutDeletionOfProcessedRecords(final HTable hTable, UpdateProcessor updateProcessor) throws IOException {
ResultScanner resultScanner =
new HutResultScanner(hTable.getScanner(new Scan()), updateProcessor, hTable, true) {
@@ -571,11 +616,15 @@ void deleteProcessedRecords(byte[] firstInclusive, byte[] lastInclusive, byte[]
}
private static void recordSale(HTable hTable, byte[] company, int price) throws InterruptedException, IOException {
- Put put = new HutPut(company);
- put.add(SALE_CF, Bytes.toBytes("lastPrice0"), Bytes.toBytes(price));
+ Put put = createPut(company, price);
Thread.sleep(1); // sanity interval
hTable.put(put);
+ }
+ private static HutPut createPut(byte[] company, int price) {
+ HutPut put = new HutPut(company);
+ put.add(SALE_CF, Bytes.toBytes("lastPrice0"), Bytes.toBytes(price));
+ return put;
}
private static void verifyLastSalesWithNativeScanner(HTable hTable, byte[] company, int[] prices) throws IOException {
55 src/test/java/com/sematext/hbase/hut/TestHTableUtil.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2010 Sematext International
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sematext.hbase.hut;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link HTableUtil}.
+ */
+public class TestHTableUtil {
+ @Test
+ public void testConvert() throws Exception {
+ HBaseTestingUtility testingUtility = new HBaseTestingUtility();
+ testingUtility.startMiniCluster();
+ HTable hTable = testingUtility.createTable(Bytes.toBytes("mytable"),
+ new byte[][] {Bytes.toBytes("colfam1"), Bytes.toBytes("colfam2")});
+
+ byte[] key = Bytes.toBytes("some-key");
+ Put put = new Put(key);
+ put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1.1"), System.currentTimeMillis(), Bytes.toBytes("val1.1"));
+ put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1.2"), System.currentTimeMillis(), Bytes.toBytes("val1.2"));
+ put.add(Bytes.toBytes("colfam2"), Bytes.toBytes("qual2.1"), System.currentTimeMillis(), Bytes.toBytes("val2.1"));
+
+ Result converted = HTableUtil.convert(put);
+
+ hTable.put(put);
+
+ Result readFromTable = hTable.get(new Get(key));
+
+ Assert.assertArrayEquals(readFromTable.raw(), converted.raw());
+
+ testingUtility.shutdownMiniCluster();
+ }
+}
