Permalink
Browse files

ApplyQuantile should take a tuple as first param, where the first element is the value to convert. Otherwise there is no identifying info that can be attached to it.
  • Loading branch information...
matthayes committed Apr 20, 2012
1 parent e7dd686 commit 8906856b163115c650fd993ed13d2f0f3e29dbe1
@@ -7,13 +7,18 @@
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import datafu.pig.util.SimpleEvalFunc;
/**
* Given a set of pre-computed quantiles, converts a value to the quantile it belongs to.
+ * It accepts two parameters. The first is a tuple, where the first element is the value to
+ * convert. The second is a tuple of computed quantiles. It returns the first tuple, with
+ * the value to convert being replaced by the quantile it belongs to.
* <p>
* Quantiles can be computed with either Quantile or StreamingQuantile.
* </p>
@@ -23,7 +28,7 @@
* @see Quantile
* @see StreamingQuantile
*/
-public class ApplyQuantile extends SimpleEvalFunc<Double>
+public class ApplyQuantile extends SimpleEvalFunc<Tuple>
{
private List<Double> quantiles;
@@ -32,14 +37,18 @@ public ApplyQuantile(String... k)
this.quantiles = QuantileUtil.getQuantilesFromParams(k);
}
- public Double call(Double value, Tuple quantilesComputed) throws IOException
+ /**
+ * Replaces the first element of a tuple with the quantile it belongs to.
+ *
+ * @param value tuple whose first element is the double value to convert; any
+ * remaining elements are carried through unchanged
+ * @param quantilesComputed tuple of pre-computed quantile values; must have the
+ * same size as the quantiles configured in the constructor
+ * @return a new tuple equal to {@code value} except that element 0 holds the
+ * quantile found for the original element 0
+ * @throws IOException if the number of computed quantiles does not match the
+ * number of configured quantiles
+ */
+ public Tuple call(Tuple value, Tuple quantilesComputed) throws IOException
{
if (quantilesComputed.size() != quantiles.size())
{
throw new IOException("Expected computed quantiles to have size " + quantiles.size()
+ " but found quantiles with size " + quantilesComputed.size());
}
- return findQuantile(value,quantilesComputed);
+
+ // Build the result in a shallow copy rather than overwriting the input tuple:
+ // the caller's tuple object may be referenced by other expressions, and an
+ // in-place set(0, ...) would silently change the value they observe.
+ Tuple result = TupleFactory.getInstance().newTuple(value.getAll());
+ Double quantileValue = findQuantile((Double)value.get(0),quantilesComputed);
+ result.set(0, quantileValue);
+
+ return result;
}
private Double findQuantile(Double value, Tuple quantilesComputed) throws ExecException
@@ -76,4 +85,28 @@ private Double findQuantile(Double value, Tuple quantilesComputed) throws ExecEx
}
}
}
+
+ /**
+ * Validates the input schema and derives the output schema from it.
+ * The first input field must be a tuple whose first element is a double;
+ * the schema of that tuple is returned as the output schema.
+ *
+ * @param input schema of the arguments passed to this UDF
+ * @return the inner schema of the first input field
+ * @throws RuntimeException if the first field is not a tuple, the tuple has no
+ * declared schema, its first element is not a double, or the schema
+ * lookup fails with a FrontendException
+ */
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ try
+ {
+ FieldSchema fieldSchema = input.getField(0);
+ if (fieldSchema.type != DataType.TUPLE)
+ {
+ throw new RuntimeException("Expected a tuple");
+ }
+
+ // A tuple field may carry no inner schema; fail with a clear message
+ // instead of a NullPointerException on the dereference below.
+ if (fieldSchema.schema == null)
+ {
+ throw new RuntimeException("Expected a tuple with a declared schema");
+ }
+
+ if (fieldSchema.schema.getField(0).type != DataType.DOUBLE)
+ {
+ throw new RuntimeException("Expected a double");
+ }
+
+ // NOTE(review): this returns the tuple's inner schema directly rather than
+ // a schema wrapping a single TUPLE field; confirm this is interpreted as a
+ // tuple when the input tuple has only one element.
+ return fieldSchema.schema;
+ }
+ catch (FrontendException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
@@ -40,14 +40,14 @@ public void applyQuantilesTest() throws Exception {
String[] input1 = {"1","2","3","4","10","5","6","7","8","9"};
writeLinesToFile("input1", input1);
- String[] input2 = {"0.9", "1.0", "1.1","2.0","3.0","4.0","5.0","5.49", "5.5", "5.51", "6.0","7.0","8.0","9.0","9.99","10.0","10.1"};
+ String[] input2 = {"0.9\t1", "1.0\t2", "1.1\t3","2.0\t4","3.0\t5","4.0\t6","5.0\t7","5.49\t8", "5.5\t9", "5.51\t10", "6.0\t11","7.0\t12","8.0\t13","9.0\t14","9.99\t15","10.0\t16","10.1\t17"};
writeLinesToFile("input2", input2);
test.runScript();
List<Tuple> output = getLinesForAlias(test, "test_data", true);
- String[] expected = {"(0.0)", "(0.0)", "(0.0)", "(0.0)", "(0.25)", "(0.25)", "(0.25)", "(0.25)", "(0.5)", "(0.5)", "(0.5)", "(0.5)", "(0.75)", "(0.75)", "(0.75)", "(1.0)", "(1.0)"};
+ String[] expected = {"(0.0,1)", "(0.0,2)", "(0.0,3)", "(0.0,4)", "(0.25,5)", "(0.25,6)", "(0.25,7)", "(0.25,8)", "(0.5,9)", "(0.5,10)", "(0.5,11)", "(0.5,12)", "(0.75,13)", "(0.75,14)", "(0.75,15)", "(1.0,16)", "(1.0,17)"};
assertEquals(output.size(),expected.length);
for (int i=0; i<expected.length; i++)
@@ -14,10 +14,12 @@ quantiles = FOREACH data_grouped {
/*describe data_out;*/
-test_data = LOAD 'input2' as (val:double);
+test_data = LOAD 'input2' as (val:double,id:int);
+
+test_data = FOREACH test_data GENERATE TOTUPLE(val,id) as vals;
test_data = CROSS test_data, quantiles;
-test_data = FOREACH test_data GENERATE ApplyQuantile((double)$0,$1);
+test_data = FOREACH test_data GENERATE FLATTEN(ApplyQuantile($0,$1)) as (qval,id);
STORE test_data INTO 'output';

0 comments on commit 8906856

Please sign in to comment.