
Changed scan code to use a List instead of a Map in the MultiScan classes

1 parent b9bac0a · commit ac3704dc4cf57c9673c3f4918ac6da944f23cff2 · @xstevens committed Jan 17, 2012
src/main/java/com/mozilla/hadoop/fs/SequenceFileDirectoryReader.java (3 changed lines)
@@ -90,8 +90,7 @@ public void close() {
public boolean next(Writable k, Writable v) throws IOException {
if (curReader == null) {
- boolean success = nextReader();
- if (!success) {
+ if (!nextReader()) {
return false;
}
}
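
For context, a minimal sketch of how the simplified next() might read in full. The read loop and field types are assumptions (curReader as a SequenceFile.Reader, nextReader() advancing to the next file in the directory); only the null check shown in the hunk above comes from the commit.

// Sketch only: the loop below is assumed, not part of the commit.
public boolean next(Writable k, Writable v) throws IOException {
    if (curReader == null) {
        if (!nextReader()) {
            return false;
        }
    }
    // Keep reading; when the current file is exhausted, move on to the next one.
    while (!curReader.next(k, v)) {
        if (!nextReader()) {
            return false;
        }
    }
    return true;
}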
src/main/java/com/mozilla/hadoop/hbase/mapreduce/MultiScanTableMapReduceUtil.java (21 changed lines)
@@ -28,7 +28,7 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
-import java.util.Map;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,6 +42,7 @@
import org.apache.hadoop.mapreduce.Job;
import com.mozilla.util.DateUtil;
+import com.mozilla.util.Pair;
public class MultiScanTableMapReduceUtil {
@@ -127,7 +128,7 @@ public static String convertScanArrayToString(final Scan[] scans) throws IOExcep
* @param endDate
* @return
*/
- public static Scan[] generateScans(Calendar startCal, Calendar endCal, Map<byte[], byte[]> columns, int caching, boolean cacheBlocks) {
+ public static Scan[] generateScans(Calendar startCal, Calendar endCal, List<Pair<String,String>> columns, int caching, boolean cacheBlocks) {
return generateHexPrefixScans(startCal, endCal, "yyMMdd", columns, caching, cacheBlocks);
}
@@ -141,7 +142,7 @@ public static String convertScanArrayToString(final Scan[] scans) throws IOExcep
* @param cacheBlocks
* @return
*/
- public static Scan[] generateHexPrefixScans(Calendar startCal, Calendar endCal, String dateFormat, Map<byte[], byte[]> columns, int caching, boolean cacheBlocks) {
+ public static Scan[] generateHexPrefixScans(Calendar startCal, Calendar endCal, String dateFormat, List<Pair<String,String>> columns, int caching, boolean cacheBlocks) {
ArrayList<Scan> scans = new ArrayList<Scan>();
String[] salts = new String[16];
for (int i=0; i < 16; i++) {
@@ -160,8 +161,8 @@ public static String convertScanArrayToString(final Scan[] scans) throws IOExcep
s.setCacheBlocks(cacheBlocks);
// add columns
- for (Map.Entry<byte[], byte[]> col : columns.entrySet()) {
- s.addColumn(col.getKey(), col.getValue());
+ for (Pair<String,String> pair : columns) {
+ s.addColumn(pair.getFirst().getBytes(), pair.getSecond().getBytes());
}
s.setStartRow(Bytes.toBytes(salts[i] + String.format("%06d", d)));
@@ -190,7 +191,7 @@ public static String convertScanArrayToString(final Scan[] scans) throws IOExcep
* @param cacheBlocks
* @return
*/
- public static Scan[] generateBytePrefixScans(Calendar startCal, Calendar endCal, String dateFormat, Map<byte[], byte[]> columns, int caching, boolean cacheBlocks) {
+ public static Scan[] generateBytePrefixScans(Calendar startCal, Calendar endCal, String dateFormat, List<Pair<String,String>> columns, int caching, boolean cacheBlocks) {
return generateBytePrefixScans(startCal, endCal, dateFormat, columns, caching, cacheBlocks, 1);
}
@@ -205,7 +206,7 @@ public static String convertScanArrayToString(final Scan[] scans) throws IOExcep
* @param batch
* @return
*/
- public static Scan[] generateBytePrefixScans(Calendar startCal, Calendar endCal, String dateFormat, Map<byte[], byte[]> columns, int caching, boolean cacheBlocks, int batch) {
+ public static Scan[] generateBytePrefixScans(Calendar startCal, Calendar endCal, String dateFormat, List<Pair<String,String>> columns, int caching, boolean cacheBlocks, int batch) {
ArrayList<Scan> scans = new ArrayList<Scan>();
SimpleDateFormat rowsdf = new SimpleDateFormat(dateFormat);
@@ -223,9 +224,9 @@ public static String convertScanArrayToString(final Scan[] scans) throws IOExcep
s.setBatch(batch);
}
// add columns
- for (Map.Entry<byte[], byte[]> col : columns.entrySet()) {
- s.addColumn(col.getKey(), col.getValue());
- }
+ for (Pair<String,String> pair : columns) {
+ s.addColumn(pair.getFirst().getBytes(), pair.getSecond().getBytes());
+ }
temp[0] = b;
s.setStartRow(Bytes.add(temp , Bytes.toBytes(String.format("%06d", d))));
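
A hedged usage sketch of the new signature: columns are supplied as ordered (family, qualifier) string pairs rather than a byte[] map. The column names and date range below are illustrative placeholders, and the snippet assumes the imports shown in the hunks above (java.util.ArrayList, java.util.List, com.mozilla.util.Pair, org.apache.hadoop.hbase.client.Scan).

// Sketch only: column names and the date range are made up for illustration.
Calendar startCal = Calendar.getInstance();
startCal.add(Calendar.DATE, -7);
Calendar endCal = Calendar.getInstance();

List<Pair<String,String>> columns = new ArrayList<Pair<String,String>>();
columns.add(new Pair<String,String>("data", "json"));
columns.add(new Pair<String,String>("meta", ""));   // empty qualifier, mirroring the loader's handling below

Scan[] scans = MultiScanTableMapReduceUtil.generateHexPrefixScans(
        startCal, endCal, "yyMMdd", columns, 100, false);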
src/main/java/com/mozilla/hadoop/riak/RiakExportToHDFS2.java (2 changed lines)
@@ -65,7 +65,7 @@
*
* This should be invoked like so:
*
- * com.mozilla.hadoop.riak.RiakExportToHDFS
+ * com.mozilla.hadoop.riak.RiakExportToHDFS2
* -Dmapred.map.tasks=<number>
* -Dmapred.reduce.tasks=<number>
* -Driak.servers=http://yourserver:8098/riak
src/main/java/com/mozilla/pig/eval/BytesSize.java (20 changed lines)
@@ -23,16 +23,32 @@
import java.io.IOException;
import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
public class BytesSize extends EvalFunc<Long> {
public Long exec(Tuple input) throws IOException {
- if (input == null || input.size() == 0) {
+ if (input == null || input.size() == 0 || input.isNull()) {
return 0L;
}
- return input.getMemorySize();
+ long bytesSize = 0L;
+ switch(input.getType(0)) {
+ case DataType.BYTEARRAY:
+ DataByteArray dba = (DataByteArray)input.get(0);
+ bytesSize = dba.size();
+ break;
+ case DataType.CHARARRAY:
+ String str = (String)input.get(0);
+ bytesSize = str.getBytes().length;
+ break;
+ default:
+ break;
+ }
+
+ return bytesSize;
}
}
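
A hedged sketch of exercising the new type-aware sizing from a driver or test (input values are illustrative; run inside a method that declares throws IOException):

// Sketch only: illustrative inputs, not taken from the commit.
TupleFactory tf = TupleFactory.getInstance();

Tuple t1 = tf.newTuple(1);
t1.set(0, new DataByteArray(new byte[] {1, 2, 3}));
Long n1 = new BytesSize().exec(t1);    // 3, via DataByteArray.size()

Tuple t2 = tf.newTuple(1);
t2.set(0, "hello");
Long n2 = new BytesSize().exec(t2);    // 5, via String.getBytes().length

Tuple t3 = tf.newTuple(1);
t3.set(0, Integer.valueOf(42));        // unsupported type falls through to the default case
Long n3 = new BytesSize().exec(t3);    // 0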
src/main/java/com/mozilla/pig/eval/date/FormatDate.java (2 changed lines)
@@ -37,7 +37,7 @@ public String exec(Tuple input) throws IOException {
SimpleDateFormat outputSdf = new SimpleDateFormat((String)input.get(0));
Calendar cal = Calendar.getInstance();
- cal.setTimeInMillis((Long)input.get(1));
+ cal.setTimeInMillis(((Number)input.get(1)).longValue());
return outputSdf.format(cal.getTime());
}
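
The point of the Number cast: depending on the script's schema, Pig may pass the timestamp field as an Integer rather than a Long, and the old (Long) cast threw a ClassCastException in that case. A hedged sketch (values are illustrative; run inside a method that declares throws IOException):

// Sketch only: illustrative inputs, not taken from the commit.
TupleFactory tf = TupleFactory.getInstance();
Tuple t = tf.newTuple(2);
t.set(0, "yyyy-MM-dd");                  // output date format
t.set(1, Long.valueOf(1326758400000L));  // epoch millis as a Long
String s1 = new FormatDate().exec(t);

t.set(1, Integer.valueOf(86400000));     // an Integer value now works as well
String s2 = new FormatDate().exec(t);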
src/main/java/com/mozilla/pig/eval/json/JsonMap.java (4 changed lines)
@@ -100,9 +100,9 @@ private DataBag convertListToBag(List<Object> l) {
Map<String,Object> values = jsonMapper.readValue((String)input.get(0), new TypeReference<Map<String,Object>>() { });
return makeSafe(values);
} catch(JsonParseException e) {
- pigLogger.warn(this, "JSON Parse Error", ERRORS.JSONParseError);
+ pigLogger.warn(this, "JSON Parse Error: " + e.getMessage(), ERRORS.JSONParseError);
} catch(JsonMappingException e) {
- pigLogger.warn(this, "JSON Mapping Error", ERRORS.JSONMappingError);
+ pigLogger.warn(this, "JSON Mapping Error: " + e.getMessage(), ERRORS.JSONMappingError);
} catch(EOFException e) {
pigLogger.warn(this, "Hit EOF unexpectedly", ERRORS.EOFError);
}
src/main/java/com/mozilla/pig/load/HBaseMultiScanLoader.java (16 changed lines)
@@ -22,9 +22,9 @@
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Calendar;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,7 +34,6 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -46,6 +45,7 @@
import com.mozilla.hadoop.hbase.mapreduce.MultiScanTableInputFormat;
import com.mozilla.hadoop.hbase.mapreduce.MultiScanTableMapReduceUtil;
+import com.mozilla.util.Pair;
/**
* A Pig 0.7+ loader for HBase tables using MultiScanTableInputFormat
@@ -55,7 +55,7 @@
private Configuration conf = new Configuration();
private RecordReader<ImmutableBytesWritable,Result> reader;
private Scan[] scans;
- private Map<byte[], byte[]> columns = new HashMap<byte[], byte[]>(); // family:qualifier
+ private List<Pair<String,String>> columns = new ArrayList<Pair<String,String>>(); // family:qualifier
private static final Log LOG = LogFactory.getLog(HBaseMultiScanLoader.class);
@@ -95,9 +95,9 @@ public HBaseMultiScanLoader(String startDate, String stopDate, String dateFormat
LOG.debug("Adding column to map: " + colPairs[i]);
}
if (familyQualifier.length == 2) {
- columns.put(Bytes.toBytes(familyQualifier[0]), Bytes.toBytes(familyQualifier[1]));
+ columns.add(new Pair<String,String>(familyQualifier[0], familyQualifier[1]));
} else {
- columns.put(Bytes.toBytes(familyQualifier[0]), Bytes.toBytes(""));
+ columns.add(new Pair<String,String>(familyQualifier[0], ""));
}
}
@@ -132,8 +132,8 @@ public Tuple getNext() throws IOException {
Tuple tuple = TupleFactory.getInstance().newTuple(columns.size()+1);
tuple.set(0, new String(rowKey.get()));
int i = 1;
- for (Map.Entry<byte[], byte[]> entry : columns.entrySet()) {
- byte[] v = result.getValue(entry.getKey(), entry.getValue());
+ for (Pair<String,String> pair : columns) {
+ byte[] v = result.getValue(pair.getFirst().getBytes(), pair.getSecond().getBytes());
if (v != null) {
tuple.set(i, new DataByteArray(v));
}
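
A hedged sketch of the column-list bookkeeping this loader now relies on: "family:qualifier" specs are parsed into ordered pairs, and because a List preserves insertion order, the value for columns.get(i) always lands at tuple position i+1 (position 0 holds the row key), which a HashMap did not guarantee. The column spec below is illustrative.

// Sketch only: illustrative column spec, mirroring the parsing shown in the hunk above.
String[] colPairs = "data:json,meta:timestamp,meta".split(",");
List<Pair<String,String>> columns = new ArrayList<Pair<String,String>>();
for (String colPair : colPairs) {
    String[] familyQualifier = colPair.split(":");
    if (familyQualifier.length == 2) {
        columns.add(new Pair<String,String>(familyQualifier[0], familyQualifier[1]));
    } else {
        columns.add(new Pair<String,String>(familyQualifier[0], ""));
    }
}
// Insertion order is preserved, so columns.get(i) maps to tuple position i+1.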
