
fix NPE and wordcount results

1 parent 1aefc32 commit 89a1c4e715b21baf2131544c032950b02d7ae93f @tjake committed Jan 25, 2012
examples/hadoop_word_count/bin/word_count_counters (0 line changes)
File mode changed: 100644 → 100755.
examples/hadoop_word_count/src/WordCount.java (16 line changes)
@@ -20,7 +20,6 @@
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.cql3.CFDefinition;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.slf4j.Logger;
@@ -80,15 +79,21 @@ public static void main(String[] args) throws Exception
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException
{
- sourceColumn = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
}
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
{
for (IColumn column : columns.values())
{
- String value = ByteBufferUtil.string(column.value());
- logger.debug("read " + key + ":" + value + " from " + context.getInputSplit());
+ String name = ByteBufferUtil.string(column.name());
+ String value = null;
+
+ if (name.contains("int"))
+ value = String.valueOf(ByteBufferUtil.toInt(column.value()));
+ else
+ value = ByteBufferUtil.string(column.value());
+
+ System.err.println("read " + ByteBufferUtil.string(key) + ":" + name + ":" + value + " from " + context.getInputSplit());
StringTokenizer itr = new StringTokenizer(value);
while (itr.hasMoreTokens())
@@ -185,6 +190,7 @@ public int run(String[] args) throws Exception
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
+ job.getConfiguration().set(CONF_COLUMN_NAME, "sum");
}
job.setInputFormatClass(ColumnFamilyInputFormat.class);
@@ -205,7 +211,7 @@ public int run(String[] args) throws Exception
if (i == 5)
{
// this will cause the predicate to be ignored in favor of scanning everything as a wide row
- ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY, CFDefinition.Kind.DYNAMIC);
+ ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY, true);
}
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
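
The mapper change above stops decoding every column as UTF-8 text: it now reads the column name first and treats columns whose names contain "int" as 4-byte integers, decoding everything else as a string. A minimal, dependency-free sketch of that branch, using plain java.nio in place of Cassandra's ByteBufferUtil (the class name and sample values below are illustrative, not part of the commit):

import java.nio.ByteBuffer;
import java.nio.charset.Charset;

public class ColumnValueDecode
{
    private static final Charset UTF8 = Charset.forName("UTF-8");

    // Decode a column value as a 4-byte big-endian int when the column name
    // suggests an integer, otherwise as UTF-8 text -- the same branch the
    // patched mapper takes with ByteBufferUtil.toInt / ByteBufferUtil.string.
    static String decode(String columnName, ByteBuffer value)
    {
        // duplicate() so the caller's buffer position is left untouched
        if (columnName.contains("int"))
            return String.valueOf(value.duplicate().getInt());
        return UTF8.decode(value.duplicate()).toString();
    }

    public static void main(String[] args)
    {
        ByteBuffer intValue = (ByteBuffer) ByteBuffer.allocate(4).putInt(1234).flip();
        System.out.println(decode("sum_int", intValue)); // prints 1234
        System.out.println(decode("text", ByteBuffer.wrap("word count".getBytes(UTF8)))); // prints "word count"
    }
}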
src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (10 line changes)
@@ -453,9 +453,15 @@ private void maybeInit()
{
rows = client.get_paged_slice(cfName, keyRange, startColumn, consistencyLevel);
+ // nothing found?
+ if (rows == null || rows.isEmpty() || rows.get(0).columns.isEmpty())
+ {
+ rows = null;
+ return;
+ }
+
// nothing new? reached the end
- if (rows.get(0).columns.isEmpty()
- || (rows.get(0).key.equals(lastRow.key) && rows.get(0).columns.get(0).column.equals(startColumn)))
+ if (lastRow != null && (rows.get(0).key.equals(lastRow.key) || rows.get(0).columns.get(0).column.equals(startColumn)))
{
rows = null;
return;
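
The record-reader change fixes the NPE named in the commit message by reordering the guards around the get_paged_slice result: a null or empty page now ends the scan before rows.get(0) is dereferenced, and lastRow is null-checked before its key is compared. A self-contained sketch of the same guard ordering, where fetchPage and the String rows are hypothetical stand-ins for the Thrift call and its KeySlice results:

import java.util.Collections;
import java.util.List;

public class PagedScan
{
    // Hypothetical page fetcher standing in for client.get_paged_slice(...);
    // a real server may return null or an empty page for an exhausted range.
    static List<String> fetchPage(String startKey)
    {
        return Collections.emptyList();
    }

    // Returns the next page, or null when the scan is finished.
    static List<String> nextPage(String startKey, String lastRowSeen)
    {
        List<String> page = fetchPage(startKey);

        // Nothing found? Guard before dereferencing: calling get(0) on a
        // null page is exactly the NullPointerException the commit fixes.
        if (page == null || page.isEmpty())
            return null;

        // Nothing new? The server handed back only the row we already
        // consumed, so the scan is done. lastRowSeen is still null on the
        // very first call, hence the extra check -- the other half of the fix.
        if (lastRowSeen != null && page.get(0).equals(lastRowSeen))
            return null;

        return page;
    }
}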
