Permalink
Browse files

Adding some new UDFs and revamping old ones

  • Loading branch information...
xstevens committed Mar 13, 2012
1 parent ff91a53 commit d23603dc39332238025aa557453f9319fd72d4b9
View
@@ -89,7 +89,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>r06</version>
+ <version>11.0.2</version>
</dependency>
<!-- Jackson JSON Processor -->
@@ -371,7 +371,7 @@
</exclusion>
</exclusions>
</dependency>
-
+
</dependencies>
<build>
@@ -0,0 +1,91 @@
+package com.mozilla.pig.eval;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+
+public class BloomFilterDistinctCount extends EvalFunc<Integer> {
+
+ private int n;
+// private int k;
+// private int m;
+ private double p;
+
+ public BloomFilterDistinctCount(String n, String p) {
+ this.n = Integer.parseInt(n);
+ this.p = Double.parseDouble(p);
+// int m = (int)Math.ceil((n * Math.log(p)) / Math.log(1.0 / (Math.pow(2.0, Math.log(2.0)))));
+// k = (int)Math.round(Math.log(2.0) * m / n);
+
+ }
+
+ @Override
+ public Integer exec(Tuple input) throws IOException {
+ if (input.size() != 1) {
+ throw new RuntimeException("Expected input to have only a single field");
+ }
+ if (input.getType(0) != DataType.BAG) {
+ throw new RuntimeException("Expected a BAG as input");
+ }
+
+ // guava bloom
+ BloomFilter<CharSequence> filter = BloomFilter.create(Funnels.stringFunnel(), n, p);
+ // hadoop bloom
+ //BloomFilter filter = new BloomFilter(m, k, Hash.MURMUR_HASH);
+ int uniq = 0;
+
+ DataBag db = (DataBag) input.get(0);
+ for (Iterator<Tuple> iter = db.iterator(); iter.hasNext();) {
+ Tuple t = iter.next();
+ if (!filter.mightContain((String)t.get(0))) {
+ filter.put((String)t.get(0));
+ //filter.add(t);
+ uniq++;
+ }
+ }
+
+ return uniq;
+ }
+
+// public static void main(String[] args) {
+// BloomFilter<CharSequence> filter = BloomFilter.create(Funnels.stringFunnel(), 10000, 0.000001d);
+// Set<String> added = new HashSet<String>();
+// Set<String> notAdded = new HashSet<String>();
+// int uniq = 0;
+// int n = 20000;
+// for (int i=0; i < n; i++) {
+// String id = UUID.randomUUID().toString();
+// if (!filter.mightContain(id)) {
+// filter.put(id.toString());
+// uniq++;
+// added.add(id);
+// } else {
+// notAdded.add(id);
+// }
+// }
+//
+// for (int i=0; i < n; i++) {
+// notAdded.add(UUID.randomUUID().toString());
+// }
+//
+// System.out.println(String.format("uniq[%d] added.size[%d] notAdded.size[%d]", uniq, added.size(), notAdded.size()));
+//
+// for (String a : added) {
+// if (!filter.mightContain(a)) {
+// System.out.println("filter thinks it does not contain: " + a);
+// }
+// }
+// for (String na : notAdded) {
+// if (filter.mightContain(na)) {
+// System.out.println("filter thinks it contains: " + na);
+// }
+// }
+// }
+}
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2011 Mozilla Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mozilla.pig.eval;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Converts any encountered null value to the specified fixed string value.
+ */
+public class ConvertNull extends EvalFunc<String> {
+
+ private String value;
+
+ public ConvertNull(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String exec(Tuple input) throws IOException {
+ if (input == null || input.size() == 0) {
+ return value;
+ }
+
+ return (input.get(0) == null ? value : (String)input.get(0));
+ }
+
+}
@@ -32,7 +32,7 @@
@SuppressWarnings("rawtypes")
public Long exec(Tuple input) throws IOException {
if (input == null) {
- return null;
+ return 0L;
}
long n = 0;
@@ -0,0 +1,62 @@
+/**
+ * Copyright 2012 Mozilla Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mozilla.pig.eval;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigWarning;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Basically the same as the builtin SUBSTRING except you can optinally leave off the third
+ * argument and it will return beginIndex to the end of the string.
+ */
+public class Substring extends EvalFunc<String> {
+
+ public static enum ERRORS { NULL_SOURCE, OUT_OF_BOUNDS };
+
+ @Override
+ public String exec(Tuple input) throws IOException {
+ if (input == null || input.size() < 2) {
+ warn("invalid number of arguments to Substring", PigWarning.UDF_WARNING_1);
+ return null;
+ }
+ try {
+ String source = (String)input.get(0);
+ if (source == null) {
+ warn("Source was null", ERRORS.NULL_SOURCE);
+ return null;
+ }
+ Integer beginindex = (Integer)input.get(1);
+ // third arg is optional
+ if (input.size() == 3) {
+ Integer endindex = (Integer)input.get(2);
+ return source.substring(beginindex, Math.min(source.length(), endindex));
+ } else {
+ return source.substring(beginindex);
+ }
+ } catch (StringIndexOutOfBoundsException e) {
+ warn(e.toString(), ERRORS.OUT_OF_BOUNDS);
+ return null;
+ }
+ }
+
+}
@@ -32,17 +32,23 @@
public static enum ERRORS { DateParseError };
+ private SimpleDateFormat inputSdf;
+ private SimpleDateFormat outputSdf;
+
+ public ConvertDateFormat(String inputDateFormat, String outputDateFormat) {
+ this.inputSdf = new SimpleDateFormat(inputDateFormat);
+ this.outputSdf = new SimpleDateFormat(outputDateFormat);
+ }
+
@Override
public String exec(Tuple input) throws IOException {
- if (input == null || input.size() < 3) {
+ if (input == null || input.size() == 0) {
return null;
}
-
- SimpleDateFormat inputSdf = new SimpleDateFormat((String)input.get(0));
- SimpleDateFormat outputSdf = new SimpleDateFormat((String)input.get(1));
+
String s = null;
try {
- Date d = inputSdf.parse((String)input.get(2));
+ Date d = inputSdf.parse((String)input.get(0));
s = outputSdf.format(d);
} catch (ParseException e) {
pigLogger.warn(this, "Date parsing error", ERRORS.DateParseError);
@@ -14,7 +14,7 @@
public class ConvertDateFormatTest {
- private ConvertDateFormat convDateFormat = new ConvertDateFormat();
+ private ConvertDateFormat convDateFormat = new ConvertDateFormat("yyyyMMdd", "yyyy-MM-dd");
private TupleFactory tupleFactory = TupleFactory.getInstance();
@Test
@@ -39,13 +39,11 @@ public void testExec3() throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
String inputDateStr = sdf.format(cal.getTime());
- input.append("yyyyMMdd");
- input.append("yyMMdd");
input.append(inputDateStr);
String outputDateStr = convDateFormat.exec(input);
assertNotNull(outputDateStr);
- assertEquals(outputDateStr, "110123");
+ assertEquals(outputDateStr, "2011-01-23");
}
}

0 comments on commit d23603d

Please sign in to comment.