Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Updates to HBase code for second edition.

  • Loading branch information...
commit 32dae0187b9c1832f1705af0d586d2bed6041f28 1 parent 2e4c883
@tomwhite authored
View
35 ch13/src/main/java/HBaseStationCli.java
@@ -1,35 +1,40 @@
import java.io.IOException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.*;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
public class HBaseStationCli extends Configured implements Tool {
+ static final byte [] INFO_COLUMNFAMILY = Bytes.toBytes("info");
+ static final byte [] NAME_QUALIFIER = Bytes.toBytes("name");
+ static final byte [] LOCATION_QUALIFIER = Bytes.toBytes("location");
+ static final byte [] DESCRIPTION_QUALIFIER = Bytes.toBytes("description");
public Map<String, String> getStationInfo(HTable table, String stationId)
throws IOException {
- byte[][] columns = { Bytes.toBytes("info:") };
- RowResult res = table.getRow(Bytes.toBytes(stationId), columns);
+ Get get = new Get(Bytes.toBytes(stationId));
+ get.addColumn(INFO_COLUMNFAMILY);
+ Result res = table.get(get);
if (res == null) {
return null;
}
Map<String, String> resultMap = new HashMap<String, String>();
- resultMap.put("name", getValue(res, "info:name"));
- resultMap.put("location", getValue(res, "info:location"));
- resultMap.put("description", getValue(res, "info:description"));
+ resultMap.put("name", getValue(res, INFO_COLUMNFAMILY, NAME_QUALIFIER));
+ resultMap.put("location", getValue(res, INFO_COLUMNFAMILY, LOCATION_QUALIFIER));
+ resultMap.put("description", getValue(res, INFO_COLUMNFAMILY, DESCRIPTION_QUALIFIER));
return resultMap;
}
- private static String getValue(RowResult res, String key) {
- Cell c = res.get(key.getBytes());
- if (c == null) {
- return "";
- }
- return Bytes.toString(c.getValue());
+ private static String getValue(Result res, byte [] cf, byte [] qualifier) {
+ byte [] value = res.getValue(cf, qualifier);
+ return value == null? "": Bytes.toString(value);
}
public int run(String[] args) throws IOException {
@@ -56,4 +61,4 @@ public static void main(String[] args) throws Exception {
new HBaseStationCli(), args);
System.exit(exitCode);
}
-}
+}
View
16 ch13/src/main/java/HBaseStationImporter.java
@@ -4,7 +4,7 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.*;
@@ -23,12 +23,14 @@ public int run(String[] args) throws IOException {
Map<String, String> stationIdToNameMap = metadata.getStationIdToNameMap();
for (Map.Entry<String, String> entry : stationIdToNameMap.entrySet()) {
- byte[] rowKey = Bytes.toBytes(entry.getKey());
- BatchUpdate bu = new BatchUpdate(rowKey);
- bu.put("info:name", Bytes.toBytes(entry.getValue()));
- bu.put("info:description", Bytes.toBytes("Description..."));
- bu.put("info:location", Bytes.toBytes("Location..."));
- table.commit(bu);
+ Put put = new Put(Bytes.toBytes(entry.getKey()));
+ put.add(HBaseStationCli.INFO_COLUMNFAMILY, HBaseStationCli.NAME_QUALIFIER,
+ Bytes.toBytes(entry.getValue()));
+ put.add(HBaseStationCli.INFO_COLUMNFAMILY, HBaseStationCli.DESCRIPTION_QUALIFIER,
+ Bytes.toBytes("Description..."));
+ put.add(HBaseStationCli.INFO_COLUMNFAMILY, HBaseStationCli.LOCATION_QUALIFIER,
+ Bytes.toBytes("Location..."));
+ table.put(put);
}
return 0;
}
View
18 ch13/src/main/java/HBaseTemperatureCli.java
@@ -4,33 +4,33 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.client.Scanner;
-import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.*;
public class HBaseTemperatureCli extends Configured implements Tool {
+ static final byte [] DATA_COLUMNFAMILY = Bytes.toBytes("data");
+ static final byte [] AIRTEMP_QUALIFIER = Bytes.toBytes("airtemp");
public NavigableMap<Long, Integer> getStationObservations(HTable table,
String stationId, long maxStamp, int maxCount) throws IOException {
- byte[][] columns = { Bytes.toBytes("data:airtemp") };
byte[] startRow = RowKeyConverter.makeObservationRowKey(stationId, maxStamp);
- RowResult res = null;
NavigableMap<Long, Integer> resultMap = new TreeMap<Long, Integer>();
- byte[] airtempColumn = Bytes.toBytes("data:airtemp");
- Scanner s = table.getScanner(columns, startRow);
+ Scan scan = new Scan(startRow);
+ scan.addColumn(DATA_COLUMNFAMILY, AIRTEMP_QUALIFIER);
+ ResultScanner scanner = table.getScanner(scan);
+ Result res = null;
int count = 0;
try {
- while ((res = s.next()) != null && count++ < maxCount) {
+ while ((res = scanner.next()) != null && count++ < maxCount) {
byte[] row = res.getRow();
- byte[] value = res.get(airtempColumn).getValue();
+ byte[] value = res.getValue(DATA_COLUMNFAMILY, AIRTEMP_QUALIFIER);
Long stamp = Long.MAX_VALUE -
Bytes.toLong(row, row.length - Bytes.SIZEOF_LONG, Bytes.SIZEOF_LONG);
Integer temp = Bytes.toInt(value);
resultMap.put(stamp, temp);
}
} finally {
- s.close();
+ scanner.close();
}
return resultMap;
}
View
16 ch13/src/main/java/HBaseTemperatureImporter.java
@@ -4,7 +4,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
@@ -26,9 +26,11 @@ public void map(LongWritable key, Text value,
if (parser.isValidTemperature()) {
byte[] rowKey = RowKeyConverter.makeObservationRowKey(parser.getStationId(),
parser.getObservationDate().getTime());
- BatchUpdate bu = new BatchUpdate(rowKey);
- bu.put("data:airtemp", Bytes.toBytes(parser.getAirTemperature()));
- table.commit(bu);
+ Put p = new Put(rowKey);
+ p.add(HBaseTemperatureCli.DATA_COLUMNFAMILY,
+ HBaseTemperatureCli.AIRTEMP_QUALIFIER,
+ Bytes.toBytes(parser.getAirTemperature()));
+ table.put(p);
}
}
@@ -42,6 +44,12 @@ public void configure(JobConf jc) {
throw new RuntimeException("Failed HTable construction", e);
}
}
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ table.close();
+ }
}
public int run(String[] args) throws IOException {
View
1  ch13/src/main/java/RowKeyConverter.java
@@ -15,5 +15,4 @@
Bytes.putLong(row, STATION_ID_LENGTH, reverseOrderEpoch);
return row;
}
-
}

0 comments on commit 32dae01

Please sign in to comment.
Something went wrong with that request. Please try again.