Working now with bytes and not strings

UTF-8 encoding breaks/modifies byte data stored in HBase, so the
application code (i.e. the dumbo mapper/reducer) needs to know by
itself how to interpret the data coming from HBase (as in Java).
This is IMHO more natural, and Python handles byte strings very well.
commit 9c287d7363175c25727c6d3b9deda29ea4a557b7 (1 parent: f5ca9b5)
@luk authored
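As a rough illustration of what "the application code needs to know how to
interpret the data" means on the Python side, here is a minimal sketch of a
dumbo mapper. It assumes dumbo's usual mapper(key, value) signature and
dumbo.run() entry point, hypothetical table/column names, and that raw cell
values surface as plain byte strings (depending on the typedbytes bindings
they may instead appear as bytearrays or a similar wrapper):

import struct

def mapper(row, columns):
    # Nothing is decoded for us any more: the row key and every
    # family/qualifier/value arrive as raw bytes straight out of HBase,
    # so the mapper decides per cell how to interpret them.
    meta = columns.get('meta', {})                    # hypothetical family
    title = meta.get('title', '').decode('utf-8')     # cell holding UTF-8 text
    raw = columns.get('stats', {}).get('plays', '\x00' * 8)
    plays = struct.unpack('>q', raw)[0]               # cell holding a packed 64-bit int
    yield row, (title, plays)

if __name__ == '__main__':
    import dumbo
    dumbo.run(mapper)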
README.txt (7 changed lines)
@@ -7,10 +7,9 @@ formats for using dumbo with hbase.
=== Using dumbo over HBase ===
-These assume you are storing everything in hbase as if they are strings. A nice
-addition would be to actually store the typedbytes in hbase directly. You should
-also note, the tables and families you are writing and reading to must
-already exist in hbase.
+These assume you are storing everything in hbase as byte strings.
+Note also that the tables and families you are writing to and reading from
+must already exist in hbase.
The input format will give key/value pairs to your mapper as:
(row, {family: {qualifier1: value, qualifier2 : value2}, family2: {qualifier3: value3} })
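To make that concrete, a mapper that simply walks this structure could look
like the sketch below (it assumes every family, qualifier and value comes
through as a plain byte string, which the application decodes as needed):

def mapper(row, columns):
    # columns is {family: {qualifier: value, ...}, ...}; keys and values
    # are byte strings exactly as stored in HBase.
    for family, cells in columns.items():
        for qualifier, value in cells.items():
            yield '%s:%s' % (family, qualifier), len(value)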
changelog.txt (5 changed lines)
@@ -1,6 +1,11 @@
#Add changelog entries here as required
~~~~~~~~~~~~~~~~
+Unreleased
+~~~~~~~~~~~~~~~~
+ - Changed from strings to byte strings
+
+~~~~~~~~~~~~~~~~
Release 0.3 (2009-07-09 16:20:00)
~~~~~~~~~~~~~~~~
- Public release
src/java/fm/last/hbase/mapred/TypedBytesTableInputFormat.java (15 changed lines)
@@ -46,6 +46,7 @@
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.record.Buffer;
import org.apache.hadoop.typedbytes.TypedBytesWritable;
import org.apache.hadoop.util.StringUtils;
@@ -199,7 +200,7 @@ public boolean next(TypedBytesWritable key, TypedBytesWritable value) throws IOE
if (result != null && result.size() > 0) {
// family->qualifier->value
- Map<String, Map<String, String>> columns = new HashMap<String, Map<String, String>>();
+ Map<Buffer, Map<Buffer, Buffer>> columns = new HashMap<Buffer, Map<Buffer, Buffer>>();
for (KeyValue kv : result.list()) {
byte[] family = kv.getFamily();
@@ -209,16 +210,16 @@ public boolean next(TypedBytesWritable key, TypedBytesWritable value) throws IOE
continue;
}
- String familyStr = Bytes.toString(family);
- Map<String, String> column = columns.get(familyStr);
+ Buffer family_bfr = new Buffer(family);
+ Map<Buffer, Buffer> column = columns.get(family_bfr);
if (column == null) {
- column = new HashMap<String, String>();
+ column = new HashMap<Buffer, Buffer>();
}
- column.put(Bytes.toString(qualifier), Bytes.toString(columnValue));
- columns.put(familyStr, column);
+ column.put(new Buffer(qualifier), new Buffer(columnValue));
+ columns.put(family_bfr, column);
}
- key.setValue(Bytes.toString(result.getRow()));
+ key.setValue(new Buffer(result.getRow()));
lastRow = result.getRow();
value.setValue(columns);
return true;
src/java/fm/last/hbase/mapred/TypedBytesTableOutputFormat.java (26 changed lines)
@@ -24,13 +24,13 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.record.Buffer;
import org.apache.hadoop.typedbytes.TypedBytesWritable;
import org.apache.hadoop.util.Progressable;
@@ -72,35 +72,43 @@ public void write(TypedBytesWritable key, TypedBytesWritable value) throws IOExc
return;
}
try {
- put = new Put(Bytes.toBytes((String) key.getValue()));
+ put = new Put(key.getBytes());
} catch (Exception e) {
- throw new IOException("expecting key of type String", e);
+ throw new IOException("expecting key of type byte[]", e);
}
try {
Map columns = (Map) value.getValue();
for (Object famObj : columns.keySet()) {
- String family = (String) famObj;
+ byte[] family = getBytesFromBuffer((Buffer) famObj);
Object qualifierCellValueObj = columns.get(family);
if (qualifierCellValueObj == null) {
continue;
}
Map qualifierCellValue = (Map) qualifierCellValueObj;
for (Object qualifierObj : qualifierCellValue.keySet()) {
- String qualifier = (String) qualifierObj;
+ byte[] qualifier = getBytesFromBuffer((Buffer) qualifierObj);
Object cellValueObj = qualifierCellValue.get(qualifier);
if (cellValueObj == null) {
continue;
}
- String cellValue = (String) cellValueObj;
- put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(cellValue));
+ byte[] cellValue = getBytesFromBuffer((Buffer) cellValueObj);
+ put.add(family, qualifier, cellValue);
}
}
} catch (Exception e) {
- throw new IOException("couldn't get column values, expecting Map<String, Map<String, String>>", e);
+ throw new IOException("couldn't get column values, expecting Map<Buffer, Map<Buffer, Buffer>>", e);
}
m_table.put(put);
}
+
+
+ private byte[] getBytesFromBuffer(final Buffer buffer){
+ final int count = buffer.getCount();
+ byte[] bytes = new byte[count];
+ System.arraycopy(buffer.get(), 0, bytes, 0, count);
+ return bytes;
+ }
}
@Override
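Going the other way, the output format above expects the key to be the raw
row bytes and the value to be the same family -> qualifier -> value mapping,
with every entry as raw bytes so the Java side can unpack them into Buffer
objects. A rough sketch of a dumbo job producing that shape (family and
qualifier names are hypothetical, and how raw bytes need to be represented on
the Python side again depends on the typedbytes bindings in use):

import struct

def mapper(row, columns):
    # hypothetical: count one occurrence per row seen on the input side
    yield row, 1

def reducer(key, values):
    # key is the HBase row key; the emitted value mirrors the input
    # structure, {family: {qualifier: cell_value}}, all raw bytes, which
    # TypedBytesTableOutputFormat turns into a Put on the Java side.
    yield key, {'stats': {'plays': struct.pack('>q', sum(values))}}

if __name__ == '__main__':
    import dumbo
    dumbo.run(mapper, reducer)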