
Merge branch 'CPs' into master

Commit edf18fb4f022e99eec26f5e62f3798c3fa70a03c (2 parents: 03d42aa + f1504ff), committed by Alex Baranau on Nov 20, 2012
README (8 changed lines)
@@ -14,7 +14,7 @@ discussions on HBase mailing lists around the problem with having Get&Put
for each update operation, which affects write throughput dramatically.
Another force behind the approach used in HBaseHUT was the recent
activity around Coprocessors (CPs) development. Although usage of CPs is very limited
-in the current implementation (see CPs branch) HBaseHUT is designed with
+in the current implementation (see cp package), HBaseHUT is designed with
broader use of CPs in mind, because they add more
flexibility when it comes to alternative MapReduce data processing
approaches, in addition to allowing seamless integration of the logic in places
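To make the Get+Put point concrete, here is a minimal write-path sketch (illustrative, not from the diff: the table and column names are made up, and it assumes HutPut takes the original row key and can be handed to HTable.put like a regular Put; imports would be org.apache.hadoop.hbase.HBaseConfiguration, org.apache.hadoop.hbase.client.*, org.apache.hadoop.hbase.util.Bytes, and com.sematext.hbase.hut.HutPut):

    // Each update is written as a single HutPut, with no prior Get of the
    // current value; merging of all updates for the same original key is
    // deferred to read time (see the scanners further down).
    Configuration conf = HBaseConfiguration.create();
    HTable hTable = new HTable(conf, "metrics");           // illustrative table
    Put update = new HutPut(Bytes.toBytes("sensor42"));    // original row key
    update.add(Bytes.toBytes("data"), Bytes.toBytes("value"), Bytes.toBytes(15L));
    hTable.put(update);
    hTable.close();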
@@ -47,18 +47,16 @@ http://www.slideshare.net/alexbaranau/realtime-analytics-with-hbase-long-version
Build Notes:
------------
-Current pom configured to build against HBase 0.90.4 (cdh3u3). The project
+Current pom is configured to build against HBase 0.92.1. The project
start date goes back to the time when 0.89.20100924 was the latest version
available. The code should be compatible with it too (and any version after it).
-CPs branch is configured to be run on HBase 0.92.xx (first CPs-capable release).
Note: unit tests take some time to execute (up to several minutes); to skip
their execution, use -Dmaven.test.skip=true.
HBase Version Compatibility:
----------------------------
-Compatible with HBase 0.20.5 - 0.92.xxx
-May be compatible with older: tests needed (TODO)
+Compatible with HBase 0.92.xxx+
Released under Apache License 2.0.
pom.xml (55 changed lines)
@@ -14,9 +14,8 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compile.source>1.6</maven.compile.source>
<maven.compile.target>1.6</maven.compile.target>
- <hadoop.version>2.0.0-mr1-cdh4.0.1</hadoop.version>
- <hadoop.common.version>2.0.0-cdh4.0.1</hadoop.common.version>
- <hbase.version>0.92.1-cdh4.0.1</hbase.version>
+ <hadoop.version>1.0.0</hadoop.version>
+ <hbase.version>0.92.1</hbase.version>
<log4j.version>1.2.16</log4j.version>
<junit.version>[4.8.2,5.0.0)</junit.version>
<slf4j.version>1.6.1</slf4j.version>
@@ -68,9 +67,8 @@
<repositories>
<repository>
- <id>cloudera</id>
- <name>Cloudera Repository</name>
- <url>https://repository.cloudera.com/content/groups/public/</url>
+ <id>apache-release</id>
+ <url>https://repository.apache.org/content/repositories/releases/</url>
</repository>
</repositories>
@@ -80,20 +78,17 @@
<artifactId>hadoop-core</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.common.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.common.version}</version>
- <scope>provided</scope>
+ <!-- Needed to eliminate weird classpath issues: exclude the Jackson versions pulled in transitively by hadoop-core -->
+ <exclusions>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -112,24 +107,16 @@
<!-- Tests dependencies -->
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <version>${hbase.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.common.version}</version>
- <classifier>tests</classifier>
+ <artifactId>hadoop-test</artifactId>
+ <version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.common.version}</version>
- <classifier>tests</classifier>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>${hbase.version}</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
@@ -35,6 +35,8 @@
* Use it when scanning records written as {@link com.sematext.hbase.hut.HutPut}s
* TODO: needs refactoring: extract a base ResultScanner "wrapper" class independent of the actual data source
*/
+// TODO: refactor to accept arbitrary {@link Result}s provider (currently
+// inheritance used to override default source)
public class HutResultScanner implements ResultScanner {
private final ResultScanner resultScanner;
private final ResultsAccessor resultsAccessor;
@@ -67,7 +69,7 @@ public void setMinRecordsToProcess(int minRecordsToProcess) {
this.minRecordsToProcess = minRecordsToProcess;
}
- void verifyInitParams(ResultScanner resultScanner, UpdateProcessor updateProcessor, HTable hTable, boolean storeProcessedUpdates) {
+ protected void verifyInitParams(ResultScanner resultScanner, UpdateProcessor updateProcessor, HTable hTable, boolean storeProcessedUpdates) {
if (resultScanner == null) {
throw new IllegalArgumentException("ResultScanner should NOT be null.");
}
@@ -147,7 +149,7 @@ protected boolean isMergeNeeded(byte[] firstKey, byte[] secondKey) {
return HutRowKeyUtil.sameOriginalKeys(firstKey, secondKey);
}
- Result fetchNext() throws IOException {
+ protected Result fetchNext() throws IOException {
return resultScanner.next(); // TODO: make sure scanner caching is used to reduce the number of RPC requests
}
@@ -216,7 +218,9 @@ public void remove() {
@Override
public void close() {
- resultScanner.close();
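+ // resultScanner may be null when a subclass overrides fetchNext() to take results from another source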
+ if (resultScanner != null) {
+ resultScanner.close();
+ }
}
static class UpdateProcessingResultImpl implements UpdateProcessingResult {
@@ -126,7 +126,8 @@ void deleteProcessedRecords(List<byte[]> rows) throws IOException {
}
@Override
- void verifyInitParams(ResultScanner resultScanner, UpdateProcessor updateProcessor, HTable hTable, boolean storeProcessedUpdates) {
+ protected void verifyInitParams(ResultScanner resultScanner, UpdateProcessor updateProcessor,
+ HTable hTable, boolean storeProcessedUpdates) {
if (updateProcessor == null) {
throw new IllegalArgumentException("UpdateProcessor should NOT be null.");
}
@@ -141,7 +142,7 @@ public Result next() throws IOException {
return super.next();
}
- Result fetchNext() throws IOException {
+ protected Result fetchNext() throws IOException {
return fetchNextFromBuffer();
}
@@ -0,0 +1,166 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sematext.hbase.hut.cp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+
+import com.sematext.hbase.hut.UpdateProcessor;
+
+/**
+ * HBaseHut {@link org.apache.hadoop.hbase.client.ResultScanner} implementation
+ * that makes use of CPs to fetch the data.
+ * TODO: this is just a proof-of-concept implementation; it should be revised to improve performance.
+ * This is not a thread-safe object (TODO: explain).
+ */
+// TODO: ensure unit-tests cover multi-region case
+public class HutCpsResultScanner implements ResultScanner {
+ // Underlying map is a TreeMap, with entries sorted by region key (refer to the
+ // implementation of HTable.coprocessorExec). TODO: is it safe to rely on that?
+ private final Map<byte[], List<Result>> recordsFromRegions;
+
+ private boolean exhausted;
+
+ public HutCpsResultScanner(HTable hTable, Scan scan, final UpdateProcessor processor) throws Throwable {
+ recordsFromRegions = get(hTable, scan, processor);
+ exhausted = recordsFromRegions.isEmpty();
+ }
+
+ @Override
+ public Result next() throws IOException {
+ if (exhausted) {
+ return null;
+ }
+
+ // TODO: is this check really needed?
+ if (recordsFromRegions.size() == 0) {
+ exhausted = true;
+ return null;
+ }
+
+ Collection<List<Result>> values = recordsFromRegions.values();
+ List<List<Result>> toDelete = null;
+ Result next = null;
+ for (List<Result> list : values) {
+ if (list.isEmpty()) {
+ if (toDelete == null) {
+ toDelete = new ArrayList<List<Result>>();
+ }
+
+ toDelete.add(list);
+ continue;
+ }
+
+ // take the head of the first non-empty region list and stop; consuming one
+ // record from every region's list per call would silently drop results
+ next = list.remove(0);
+ break;
+ }
+
+ // drop exhausted per-region lists so they are not revisited on later calls
+ if (toDelete != null) {
+ values.removeAll(toDelete);
+ }
+
+ if (next == null) {
+ exhausted = true;
+ return null;
+ }
+
+ return next;
+ }
+
+ @Override
+ public Result[] next(int nbRows) throws IOException {
+ if (exhausted) {
+ return new Result[0];
+ }
+ List<Result> list = new ArrayList<Result>();
+ for (int i = 0; i < nbRows; i++) {
+ Result res = next();
+
+ if (res == null) {
+ break;
+ }
+
+ list.add(res);
+ }
+
+ return list.toArray(new Result[list.size()]);
+ }
+
+ @Override
+ public void close() {
+ // DO NOTHING
+ }
+
+ // Implementation is identical to HTable.ClientScanner.iterator()
+ @Override
+ public Iterator<Result> iterator() {
+ return new Iterator<Result>() {
+ // The next RowResult, possibly pre-read
+ Result next = null;
+
+ // return true if there is another item pending, false if there isn't.
+ // this method is where the actual advancing takes place, but you need
+ // to call next() to consume it. hasNext() will only advance if there
+ // isn't a pending next().
+ public boolean hasNext() {
+ if (next == null) {
+ try {
+ next = HutCpsResultScanner.this.next();
+ return next != null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return true;
+ }
+
+ // get the pending next item and advance the iterator. returns null if
+ // there is no next item.
+ public Result next() {
+ // since hasNext() does the real advancing, we call this to determine
+ // if there is a next before proceeding.
+ if (!hasNext()) {
+ return null;
+ }
+
+ // if we get to here, then hasNext() has given us an item to return.
+ // we want to return the item and then null out the next pointer, so
+ // we use a temporary variable.
+ Result temp = next;
+ next = null;
+ return temp;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ private Map<byte[], List<Result>> get(HTable hTable, final Scan scan, final UpdateProcessor processor) throws Throwable {
+ return hTable.coprocessorExec(HutReadProtocol.class, scan.getStartRow(), scan.getStopRow(),
+ new Batch.Call<HutReadProtocol, List<Result>>() {
+ public List<Result> call(HutReadProtocol instance) throws IOException {
+ return instance.get(scan, processor);
+ }
+ });
+ }
+}
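As an aside, a minimal usage sketch of the new scanner (illustrative, not from the diff: the wrapper method and printing are made up, and a concrete UpdateProcessor implementation must be supplied by the caller; imports mirror those at the top of the file, plus org.apache.hadoop.hbase.util.Bytes):

    // Client-side usage: merging happens region-side via the HutReadProtocol
    // endpoint, so the client simply iterates over already-processed results.
    void scanProcessed(HTable hTable, UpdateProcessor processor) throws Throwable {
      ResultScanner scanner = new HutCpsResultScanner(hTable, new Scan(), processor);
      try {
        for (Result result : scanner) {
          // each Result is the merged view of all updates for one original key
          System.out.println(Bytes.toString(result.getRow()));
        }
      } finally {
        scanner.close();
      }
    }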