Adding a generic regex Find UDF

1 parent d23603d · commit b2c5ff4798cbdd7cb7cd27cebeda00a378489867 · @xstevens committed Mar 28, 2012
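
com/mozilla/hadoop/hbase/CSVImport.java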
@@ -1,159 +0,0 @@
-/*
- * Copyright 2011 Mozilla Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.mozilla.hadoop.hbase;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.log4j.Logger;
-
-public class CSVImport implements Tool {
-
- private static final Logger LOG = Logger.getLogger(CSVImport.class);
-
- private static final String NAME = "CSVImport";
-
- private Configuration conf;
-
- public static class CSVImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {
-
- public enum ReportStats { UNEXPECTED_LENGTH };
-
- private Pattern commaPattern;
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
- */
- @Override
- public void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- commaPattern = Pattern.compile(",");
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, org.apache.hadoop.mapreduce.Mapper.Context)
- */
- @Override
- public void map(LongWritable key, Text value, Context context) throws InterruptedException, IOException {
- String[] splits = commaPattern.split(value.toString());
- if (splits.length != 4) {
- context.getCounter(ReportStats.UNEXPECTED_LENGTH);
- return;
- }
-
- byte[] rowkey = Bytes.toBytes(splits[0]);
- Put put = new Put(rowkey);
- put.add(Bytes.toBytes(splits[1]), Bytes.toBytes(splits[2]), Bytes.toBytes(splits[3]));
- context.write(new ImmutableBytesWritable(rowkey), put);
- }
-
- }
- /**
- * @param args
- * @return
- * @throws IOException
- * @throws ParseException
- */
- public Job initJob(String[] args) throws IOException, ParseException {
-
- Path inputPath = new Path(args[0]);
- String table = args[1];
-
- Job job = new Job(getConf());
- job.setJobName(NAME);
- job.setJarByClass(CSVImport.class);
- job.setMapperClass(CSVImportMapper.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TableOutputFormat.class);
- job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Writable.class);
- job.setNumReduceTasks(0);
-
- FileInputFormat.addInputPath(job, inputPath);
-
- return job;
- }
-
- /**
- * @return
- */
- private static int printUsage() {
- System.out.println("Usage: " + NAME + " [generic-options] <input-path> <output-table>");
- System.out.println();
- GenericOptionsParser.printGenericCommandUsage(System.out);
-
- return -1;
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
- */
- public int run(String[] args) throws Exception {
- if (args.length < 2) {
- return printUsage();
- }
-
- Job job = initJob(args);
-
- return (job.waitForCompletion(true) ? 0 : 1);
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.conf.Configurable#getConf()
- */
- public Configuration getConf() {
- return this.conf;
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)
- */
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new CSVImport(), args);
- System.exit(res);
- }
-
-}
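
com/mozilla/pig/eval/BloomFilterDistinctCount.java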
@@ -11,19 +11,33 @@
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
+/**
+ * BloomFilterDistinctCount is designed to count distinct items as they pass
+ * through while using a minimal amount of memory. It should only be used if you
+ * have no other choice and you are having out-of-memory issues during bag spills.
+ *
+ * You can compensate somewhat for the probabilistic nature of bloom filters by
+ * adjusting your counts after the fact based on your n and p values. Keep in mind
+ * that the lower the p value, the more accuracy you gain, but at a considerable
+ * CPU cost if you have more than a few hashes to perform.
+ *
+ * Refer to the calculator at http://hur.st/bloomfilter to see the effects of
+ * different n and p values. Take particular note of the resulting k value: if it
+ * is greater than 10, you are likely entering a world of pain.
+ *
+ * Example for expecting 1 million items at a 0.001 probability of a false positive:
+ *
+ * define DistinctCount com.mozilla.pig.eval.BloomFilterDistinctCount(1000000, 0.001);
+ *
+ */
public class BloomFilterDistinctCount extends EvalFunc<Integer> {
private int n;
-// private int k;
-// private int m;
private double p;
public BloomFilterDistinctCount(String n, String p) {
this.n = Integer.parseInt(n);
this.p = Double.parseDouble(p);
-// int m = (int)Math.ceil((n * Math.log(p)) / Math.log(1.0 / (Math.pow(2.0, Math.log(2.0)))));
-// k = (int)Math.round(Math.log(2.0) * m / n);
-
}
@Override
@@ -53,39 +67,5 @@ public Integer exec(Tuple input) throws IOException {
return uniq;
}
-
-// public static void main(String[] args) {
-// BloomFilter<CharSequence> filter = BloomFilter.create(Funnels.stringFunnel(), 10000, 0.000001d);
-// Set<String> added = new HashSet<String>();
-// Set<String> notAdded = new HashSet<String>();
-// int uniq = 0;
-// int n = 20000;
-// for (int i=0; i < n; i++) {
-// String id = UUID.randomUUID().toString();
-// if (!filter.mightContain(id)) {
-// filter.put(id.toString());
-// uniq++;
-// added.add(id);
-// } else {
-// notAdded.add(id);
-// }
-// }
-//
-// for (int i=0; i < n; i++) {
-// notAdded.add(UUID.randomUUID().toString());
-// }
-//
-// System.out.println(String.format("uniq[%d] added.size[%d] notAdded.size[%d]", uniq, added.size(), notAdded.size()));
-//
-// for (String a : added) {
-// if (!filter.mightContain(a)) {
-// System.out.println("filter thinks it does not contain: " + a);
-// }
-// }
-// for (String na : notAdded) {
-// if (filter.mightContain(na)) {
-// System.out.println("filter thinks it contains: " + na);
-// }
-// }
-// }
+
}
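
The sizing formulas deleted as comments above are the standard bloom filter equations for the optimal bit count m and hash count k. As a sanity check on the javadoc's advice about k, here is a minimal standalone Java sketch of that math (the class name is invented for illustration; it is not part of this commit):

public class BloomSizing {
    public static void main(String[] args) {
        int n = 1000000;   // expected number of distinct items
        double p = 0.001;  // acceptable false-positive probability
        // m = optimal number of bits, k = optimal number of hash functions;
        // these mirror the commented-out formulas removed above.
        int m = (int) Math.ceil((n * Math.log(p)) / Math.log(1.0 / Math.pow(2.0, Math.log(2.0))));
        int k = (int) Math.round(Math.log(2.0) * m / n);
        // For these inputs this prints roughly m=14377588 bits and k=10 hashes --
        // right at the k > 10 threshold the javadoc warns about.
        System.out.println("m=" + m + " bits, k=" + k + " hashes");
    }
}

com/mozilla/pig/eval/regex/Find.java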
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2012 Mozilla Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mozilla.pig.eval.regex;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+
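+/**
+ * Find returns the first substring of the input that matches the regex
+ * supplied to the constructor, or null when there is no match.
+ *
+ * Example: define FindNumber com.mozilla.pig.eval.regex.Find('[0-9]+');
+ */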
+public class Find extends EvalFunc<String> {
+
+ private Pattern p;
+
+ public Find(String pattern) {
+ p = Pattern.compile(pattern);
+ }
+
+ public String exec(Tuple input) throws IOException {
+ if (input == null || input.size() == 0 || input.get(0) == null) {
+ return null;
+ }
+
+ Matcher m = p.matcher((String)input.get(0));
+ return m.find() ? m.group() : null;
+ }
+}
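
A quick way to exercise the new UDF outside of Pig is to build a tuple by hand. A hypothetical smoke test (the class name and pattern are illustrative, not part of the commit):

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import com.mozilla.pig.eval.regex.Find;

public class FindSmokeTest {
    public static void main(String[] args) throws Exception {
        // The pattern is compiled once in the constructor, as it would be
        // when the UDF is bound with a Pig DEFINE statement.
        Find find = new Find("[0-9]+");
        Tuple t = TupleFactory.getInstance().newTuple(1);
        t.set(0, "build-12345-release");
        System.out.println(find.exec(t)); // prints 12345
    }
}

com/mozilla/pig/eval/regex/FindAll.java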
@@ -17,7 +17,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.mozilla.pig.eval.regex;
import java.io.IOException;
@@ -32,16 +31,17 @@
private static TupleFactory tupleFactory = TupleFactory.getInstance();
+ private Pattern p;
+
+ public FindAll(String pattern) {
+ p = Pattern.compile(pattern);
+ }
+
public Tuple exec(Tuple input) throws IOException {
- if (input == null) {
+ if (input == null || input.size() == 0 || input.get(0) == null) {
return null;
}
- if (input.size() != 2) {
- throw new IOException("FindAll requires exactly 2 parameters");
- }
-
- Pattern p = Pattern.compile((String)input.get(1));
Matcher m = p.matcher((String)input.get(0));
if (!m.find()) {
return null;

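The FindAll hunk is truncated here, but the visible changes show the same refactoring as Find: the pattern moves from a second exec parameter into the constructor, so it is compiled once per UDF instance instead of once per call. A hypothetical call under the new signature (the exact shape of the returned tuple is assumed from the tupleFactory field and the visible control flow, since the rest of exec is not shown):

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import com.mozilla.pig.eval.regex.FindAll;

public class FindAllSmokeTest {
    public static void main(String[] args) throws Exception {
        FindAll findAll = new FindAll("[a-z]+");
        Tuple t = TupleFactory.getInstance().newTuple(1);
        t.set(0, "foo1bar2baz");
        // Presumably a tuple of every match, e.g. (foo,bar,baz).
        System.out.println(findAll.exec(t));
    }
}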