
SHDP-109

remove use of HTable in favor of HTableInterface
Costin Leau committed Nov 12, 2012
1 parent 1ef5ec2 commit ee0aad4af7c4e3cd4b7abba82521ef27270603de
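
The thrust of the change: client-facing code is typed against the HTableInterface abstraction instead of the concrete HTable class, so HbaseTemplate can hand callbacks any table implementation an HTableInterfaceFactory produces (plain, pooled, or proxied). A minimal sketch of the resulting usage, assuming an already-configured HbaseTemplate; the RowCounter helper and "users" table name are hypothetical:

```java
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.TableCallback;

public class RowCounter {

	// The callback now receives an HTableInterface, so the same code works
	// against plain, pooled, or proxied table implementations.
	public static long count(HbaseTemplate template) {
		return template.execute("users", new TableCallback<Long>() {
			@Override
			public Long doInTable(HTableInterface table) throws Throwable {
				long rows = 0;
				ResultScanner scanner = table.getScanner(new Scan());
				try {
					for (Result result : scanner) {
						rows++;
					}
					return rows;
				}
				finally {
					scanner.close();
				}
			}
		});
	}
}
```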
@@ -8,10 +8,10 @@ slf4jVersion = 1.6.6
springVersion = 3.0.7.RELEASE
springBatchVersion = 2.1.9.RELEASE
springIntVersion = 2.1.2.RELEASE
-hadoopVersion = 1.1.0
+hadoopVersion = 1.0.4
hiveVersion = 0.9.0
pigVersion = 0.10.0
-hbaseVersion = 0.92.1
+hbaseVersion = 0.94.1
jacksonVersion = 1.8.8
commonsioVersion = 2.1
cglibVersion = 2.2.2
@@ -53,7 +53,7 @@ public Object invoke(MethodInvocation methodInvocation) throws Throwable {
for (String tableName : tableNames) {
if (!HbaseSynchronizationManager.hasResource(tableName)) {
boundTables.add(tableName);
- HTable table = HbaseUtils.getHTable(getTableFactory(), getCharset(), getConfiguration(), tableName);
+ HTableInterface table = HbaseUtils.getHTable(getTableFactory(), getCharset(), getConfiguration(), tableName);
HbaseSynchronizationManager.bindResource(tableName, table);
}
}
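
For context, the interceptor binds one table per name for the duration of the method invocation, and since the bound resource is now an HTableInterface, factory-produced tables can participate as well. A sketch of the bind-if-absent pattern in isolation, assuming both helpers live in org.springframework.data.hadoop.hbase (per the package declaration further down); unbinding after completion is implied but not shown in this hunk:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.springframework.data.hadoop.hbase.HbaseSynchronizationManager;
import org.springframework.data.hadoop.hbase.HbaseUtils;

public class BindIfAbsentSketch {

	// Mirrors the interceptor above: bind a table once per name so that
	// later look-ups on the same thread reuse the bound instance.
	public static HTableInterface bindOnce(Configuration configuration, String tableName) {
		if (!HbaseSynchronizationManager.hasResource(tableName)) {
			HTableInterface table = HbaseUtils.getHTable(configuration, tableName);
			HbaseSynchronizationManager.bindResource(tableName, table);
		}
		return (HTableInterface) HbaseSynchronizationManager.getResource(tableName);
	}
}
```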
@@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -50,7 +51,7 @@ public HbaseTemplate(Configuration configuration) {
Assert.notNull(action, "Callback object must not be null");
Assert.notNull(tableName, "No table specified");
- HTable table = getTable(tableName);
+ HTableInterface table = getTable(tableName);
try {
boolean previousFlushSetting = applyFlushSetting(table);
@@ -70,26 +71,34 @@ public HbaseTemplate(Configuration configuration) {
}
}
- private HTable getTable(String tableName) {
+ private HTableInterface getTable(String tableName) {
return HbaseUtils.getHTable(getTableFactory(), getCharset(), getConfiguration(), tableName);
}
- private void releaseTable(String tableName, HTable table) {
+ private void releaseTable(String tableName, HTableInterface table) {
HbaseUtils.releaseTable(tableName, table);
}
- private boolean applyFlushSetting(HTable table) {
+ private boolean applyFlushSetting(HTableInterface table) {
boolean autoFlush = table.isAutoFlush();
- table.setAutoFlush(this.autoFlush);
+ if (table instanceof HTable) {
+ ((HTable) table).setAutoFlush(this.autoFlush);
+ }
return autoFlush;
}
- private void flushIfNecessary(HTable table, boolean oldFlush) throws IOException {
+ private void restoreFlushSettings(HTableInterface table, boolean oldFlush) {
+ if (table instanceof HTable) {
+ if (table.isAutoFlush() != oldFlush) {
+ ((HTable) table).setAutoFlush(oldFlush);
+ }
+ }
+ }
+
+ private void flushIfNecessary(HTableInterface table, boolean oldFlush) throws IOException {
// TODO: check whether we can consider or not a table scope
table.flushCommits();
- if (table.isAutoFlush() != oldFlush) {
- table.setAutoFlush(oldFlush);
- }
+ restoreFlushSettings(table, oldFlush);
}
public DataAccessException convertHbaseAccessException(Exception ex) {
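
The instanceof guards above exist because, in the HBase 0.94 client API this commit targets, HTableInterface exposes isAutoFlush() but no matching setter, so auto-flush can only be toggled on a concrete HTable. A sketch of the resulting save/apply/flush/restore sequence around a buffered write; the table and column names are hypothetical:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.data.hadoop.hbase.HbaseUtils;

public class BufferedWriteSketch {

	public static void write(Configuration configuration) throws IOException {
		HTableInterface table = HbaseUtils.getHTable(configuration, "users");
		boolean oldFlush = table.isAutoFlush();
		if (table instanceof HTable) {
			((HTable) table).setAutoFlush(false); // buffer mutations client-side
		}
		try {
			Put p = new Put(Bytes.toBytes("row-1"));
			p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
			table.put(p);
		}
		finally {
			table.flushCommits(); // push anything still buffered
			if (table instanceof HTable && table.isAutoFlush() != oldFlush) {
				((HTable) table).setAutoFlush(oldFlush); // restore caller's setting
			}
		}
	}
}
```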
@@ -114,7 +123,7 @@ public DataAccessException convertHbaseAccessException(Exception ex) {
public <T> T find(String tableName, final Scan scan, final ResultsExtractor<T> action) {
return execute(tableName, new TableCallback<T>() {
@Override
- public T doInTable(HTable htable) throws Throwable {
+ public T doInTable(HTableInterface htable) throws Throwable {
ResultScanner scanner = htable.getScanner(scan);
try {
return action.extractData(scanner);
@@ -158,7 +167,7 @@ public T doInTable(HTable htable) throws Throwable {
public <T> T get(String tableName, final String rowName, final String familyName, final String qualifier, final RowMapper<T> mapper) {
return execute(tableName, new TableCallback<T>() {
@Override
- public T doInTable(HTable htable) throws Throwable {
+ public T doInTable(HTableInterface htable) throws Throwable {
Get get = new Get(rowName.getBytes(getCharset()));
if (familyName != null) {
byte[] family = familyName.getBytes(getCharset());
@@ -184,5 +193,4 @@ public T doInTable(HTable htable) throws Throwable {
public void setAutoFlush(boolean autoFlush) {
this.autoFlush = autoFlush;
}
-
}
@@ -23,7 +23,6 @@
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTableInterfaceFactory;
import org.springframework.dao.DataAccessException;
-import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
@@ -50,7 +49,7 @@ public static DataAccessException convertHbaseException(Exception ex) {
* @param tableName table name
* @return table instance
*/
- public static HTable getHTable(Configuration configuration, String tableName) {
+ public static HTableInterface getHTable(Configuration configuration, String tableName) {
return getHTable(null, getCharset(null), configuration, tableName);
}
@@ -63,17 +62,15 @@ public static HTable getHTable(Configuration configuration, String tableName) {
* @param tableName table name
* @return table instance
*/
- public static HTable getHTable(HTableInterfaceFactory tableFactory, Charset charset, Configuration configuration, String tableName) {
+ public static HTableInterface getHTable(HTableInterfaceFactory tableFactory, Charset charset, Configuration configuration, String tableName) {
if (HbaseSynchronizationManager.hasResource(tableName)) {
return (HTableInterface) HbaseSynchronizationManager.getResource(tableName);
}
- HTable t = null;
+ HTableInterface t = null;
try {
if (tableFactory != null) {
- HTableInterface table = tableFactory.createHTableInterface(configuration, tableName.getBytes(charset));
- Assert.isInstanceOf(HTable.class, table, "The table factory needs to create HTable instances");
- t = (HTable) table;
+ t = tableFactory.createHTableInterface(configuration, tableName.getBytes(charset));
}
else {
t = new HTable(configuration, tableName.getBytes(charset));
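
With the Assert.isInstanceOf check removed, a factory is free to hand back any HTableInterface implementation. A hypothetical factory backed by HBase 0.94's HTablePool, to illustrate what the relaxed contract permits (the class name and pool size are illustrative, not part of this commit):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTableInterfaceFactory;
import org.apache.hadoop.hbase.client.HTablePool;

public class PooledTableFactory implements HTableInterfaceFactory {

	private final HTablePool pool;

	public PooledTableFactory(Configuration configuration, int maxSize) {
		this.pool = new HTablePool(configuration, maxSize);
	}

	@Override
	public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
		return pool.getTable(tableName);
	}

	@Override
	public void releaseHTableInterface(HTableInterface table) throws IOException {
		// In 0.94, closing a pooled table returns it to its pool.
		table.close();
	}
}
```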
@@ -15,7 +15,7 @@
*/
package org.springframework.data.hadoop.hbase;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
/**
* Callback interface for Hbase code. To be used with {@link HbaseTemplate}'s execution methods, often as anonymous classes within a method implementation without
@@ -32,5 +32,5 @@
* @return a result object, or null if none
* @throws Throwable thrown by the Hbase API
*/
- T doInTable(HTable table) throws Throwable;
+ T doInTable(HTableInterface table) throws Throwable;
}
@@ -23,6 +23,7 @@
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -111,11 +112,11 @@ public void testHBaseConnection() throws Exception {
public void testTemplate() throws Exception {
assertTrue(HbaseSynchronizationManager.getTableNames().isEmpty());
- final HTable t = HbaseUtils.getHTable(config, tableName);
+ final HTableInterface t = HbaseUtils.getHTable(config, tableName);
template.execute(tableName, new TableCallback<Object>() {
@Override
- public Object doInTable(HTable table) throws Throwable {
+ public Object doInTable(HTableInterface table) throws Throwable {
assertSame(t, table);
Put p = new Put(Bytes.toBytes(rowName));
p.add(Bytes.toBytes(columnName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
@@ -33,13 +33,13 @@
<!-- default id is 'hadoopConfiguration' / -->
<!-- properties-location="s3.properties" -->
<!-- -->
- <hdp:configuration register-url-handler="false" >
+ <hdp:configuration register-url-handler="false">
fs.default.name=${hd.fs}
- mapred.job.tracker=local
+ mapred.job.tracker=${jt}
cfg=main
</hdp:configuration>
- <hdp:file-system user="hadoop" />
+ <hdp:file-system />
<!--
<bean id="fs-config" class="org.springframework.data.hadoop.configuration.ConfigurationFactoryBean">
@@ -4,27 +4,22 @@
# Amazon EMR
#hive.port=10003
-#hd.fs=s3n://work-emr/tmp/
+hd.fs=s3n://work-emr/tmp
#jt=localhost:20001
-#hd.fs=hdfs://ec2-54-242-45-228.compute-1.amazonaws.com:9000/
-#jt=ec2-54-242-45-228.compute-1.amazonaws.com:9001
-#jt=54.242.45.228:9001
-#jt=localhost:20001
-
+# jt=10.80.205.79:9001
# Apache Whirr - EC2
#hd.fs=hdfs://xxx.amazonaws.com:8020
#jt=xxx.amazonaws.com:8021
# Default - Vanilla Installs
hd.fs=hdfs://localhost:9000
-jt=localhost:9001
-#jt=
+jt=local
hive.host=localhost
hive.port=12345
hive.url=jdbc:hive://${hive.host}:${hive.port}
-hd.host=localhost
+#hd.host=localhost
#path.cat=bin${file.separator}stream-bin${file.separator}cat
#path.wc=bin${file.separator}stream-bin${file.separator}wc
