Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Sync'ing latest changes

  • Loading branch information...
commit cd1b50d2fb0d55b25171d43880b24a916718a84d 1 parent 5177124
@xstevens xstevens authored
View
5 README.md
@@ -11,6 +11,8 @@ This code is built with the following assumptions. You may get mixed results if
* [HBase](http://hbase.apache.org) 0.90+
* [Pig](http://pig.apache.org) 0.9+
* [Hive](https://github.com/xstevens/hive) 0.7 with [automatic promotion of certain types](https://github.com/xstevens/hive/commit/566ca633546e5231cf5ea20d554c1f61784f39e4)
+* [Jackson](https://github.com/FasterXML) 2.x (for all things JSON)
+ * We don't use anything fancy but fasterxml switch broke from 1.x packaging. You can probably fork and compile fairly easily if you want to use Jackson 1.x since that's what ships with Hadoop projects.
### Building ###
To make a jar you can do:
@@ -28,4 +30,5 @@ All aspects of this software written in Java are distributed under Apache Softwa
### Contributors ###
* Xavier Stevens ([@xstevens](http://twitter.com/xstevens))
-* Daniel Einspanjer ([@deinspanjer](http://twitter/deinspanjer))
+* Daniel Einspanjer ([@deinspanjer](http://twitter/deinspanjer))
+* Mark Reid ([@reid_write](http://twitter.com/reid_write))
View
126 src/main/java/com/mozilla/hadoop/fs/TextFileDirectoryReader.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2012 Mozilla Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mozilla.hadoop.fs;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+public class TextFileDirectoryReader {
+
+ private static final Logger LOG = Logger.getLogger(TextFileDirectoryReader.class);
+
+ private Configuration conf = new Configuration();
+ private FileSystem fs;
+ private List<Path> paths;
+ private Iterator<Path> pathIter;
+ private Path curPath;
+ private BufferedReader curReader;
+
+ public TextFileDirectoryReader(Path inputPath) throws IOException {
+ fs = FileSystem.get(inputPath.toUri(), conf);
+ paths = new ArrayList<Path>();
+ for(FileStatus status : fs.listStatus(inputPath)) {
+ Path p = status.getPath();
+ if (!status.isDir() && !p.getName().startsWith("_")) {
+ paths.add(p);
+ }
+ }
+
+ pathIter = paths.iterator();
+ }
+
+ private boolean nextReader() throws IOException {
+ if (curReader != null) {
+ curReader.close();
+ }
+
+ if (!pathIter.hasNext()) {
+ return false;
+ }
+
+ curPath = pathIter.next();
+ curReader = new BufferedReader(new InputStreamReader(fs.open(curPath)));
+
+ return true;
+ }
+
+ public void close() {
+ if (curReader != null) {
+ try {
+ curReader.close();
+ } catch (IOException e) {
+ LOG.error("Error closing reader", e);
+ }
+ }
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch (IOException e) {
+ LOG.error("Error closing filesystem", e);
+ }
+ }
+ }
+
+ public String next() throws IOException {
+ if (curReader == null) {
+ if (!nextReader()) {
+ return null;
+ }
+ }
+
+ String line = curReader.readLine();
+ if (line == null) {
+ boolean success = nextReader();
+ if (success) {
+ line = curReader.readLine();
+ }
+ }
+
+ return line;
+ }
+
+ public static void main(String[] args) {
+ String inputPath = args[0];
+ TextFileDirectoryReader tfdr = null;
+ try {
+ tfdr = new TextFileDirectoryReader(new Path(inputPath));
+ String line = null;
+ while ((line = tfdr.next()) != null) {
+ System.out.println(line);
+ }
+ } catch (IOException e) {
+ LOG.error("IO error while reading directory", e);
+ } finally {
+ if (tfdr != null) {
+ tfdr.close();
+ }
+ }
+ }
+}
View
63 src/main/java/com/mozilla/pig/eval/date/DaysAgo.java
@@ -1,63 +0,0 @@
-/*
- * Copyright 2012 Mozilla Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.mozilla.pig.eval.date;
-
-import static java.util.Calendar.DATE;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.Tuple;
-
-import com.mozilla.util.DateUtil;
-
-public class DaysAgo extends EvalFunc<Integer> {
-
- public static enum ERRORS { DateParseError };
-
- private SimpleDateFormat sdf;
- private long currentDay;
-
- public DaysAgo(String dateFormat) {
- sdf = new SimpleDateFormat(dateFormat);
- currentDay = DateUtil.getTimeAtResolution(System.currentTimeMillis(), DATE);
- }
-
- @Override
- public Integer exec(Tuple input) throws IOException {
- if (input == null || input.size() == 0) {
- return null;
- }
-
- Integer daysAgo = null;
- try {
- Date d = sdf.parse((String)input.get(0));
- daysAgo = (int)DateUtil.getTimeDelta(d.getTime(), currentDay, DATE);
- } catch (ParseException e) {
- pigLogger.warn(this, "Date parsing error", ERRORS.DateParseError);
- }
-
- return daysAgo;
- }
-
-}
View
2  src/main/java/com/mozilla/pig/eval/date/FormatDate.java
@@ -38,7 +38,7 @@ public FormatDate(String format) {
@Override
public String exec(Tuple input) throws IOException {
- if (input == null || input.size() == 0) {
+ if (input == null || input.size() == 0 || input.get(0) == null) {
return null;
}
View
3  src/main/java/com/mozilla/pig/eval/date/TimeDelta.java
@@ -58,7 +58,8 @@ public TimeDelta(String deltaUnitStr, String dateFormat) throws ParseException {
@Override
public Long exec(Tuple input) throws IOException {
- if (input == null || input.size() == 0) {
+ if (input == null || input.size() < 2 ||
+ input.get(0) == null || input.get(1) == null) {
return null;
}
View
2  src/main/java/com/mozilla/pig/eval/geoip/GeoIpLookup.java
@@ -51,7 +51,7 @@
*
* This will expect a file in hdfs in /user/you/GeoIPCity.dat
*
- * Using the getCacheFiles approach,y ou no longer need to specify the
+ * Using the getCacheFiles approach, you no longer need to specify the
* -Dmapred.cache.archives
* -Dmapred.create.symlink
* options to pig.
View
13 src/test/java/com/mozilla/pig/eval/ExampleTest.java
@@ -19,24 +19,15 @@
*/
package com.mozilla.pig.eval;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
import java.io.IOException;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.BagFactory;
import org.junit.Test;
public class ExampleTest {
-
- private TupleFactory tupleFactory = TupleFactory.getInstance();
- private BagFactory bagFactory = BagFactory.getInstance();
@Test
- public void testInstantiate() throws IOException {
+ public void testConstructor() throws IOException {
Example e = new Example();
}
+
}
View
25 ...t/java/com/mozilla/pig/eval/date/DaysAgoTest.java → ...java/com/mozilla/pig/eval/date/TimeDeltaTest.java
@@ -1,4 +1,4 @@
-/*
+/**
* Copyright 2012 Mozilla Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -23,6 +23,7 @@
import static org.junit.Assert.assertNull;
import java.io.IOException;
+import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
@@ -30,36 +31,38 @@
import org.apache.pig.data.TupleFactory;
import org.junit.Test;
-public class DaysAgoTest {
+public class TimeDeltaTest {
private static final String TIME_FORMAT = "yyyyMMdd";
- private DaysAgo daysAgo = new DaysAgo(TIME_FORMAT);
private TupleFactory tupleFactory = TupleFactory.getInstance();
@Test
- public void testExec1() throws IOException {
- Integer deltaDays = daysAgo.exec(null);
+ public void testExec1() throws IOException, ParseException {
+ TimeDelta daysAgo = new TimeDelta("5", TIME_FORMAT);
+ Long deltaDays = daysAgo.exec(null);
assertNull(deltaDays);
}
@Test
- public void testExec2() throws IOException {
+ public void testExec2() throws IOException, ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(TIME_FORMAT);
Calendar cal = Calendar.getInstance();
cal.add(Calendar.DATE, -1);
- Tuple input = tupleFactory.newTuple(1);
-
+ Tuple input = tupleFactory.newTuple(2);
input.set(0, sdf.format(cal.getTime()));
- Integer deltaDays = daysAgo.exec(input);
- assertEquals(1, (int)deltaDays);
+ input.set(1, sdf.format(Calendar.getInstance().getTime()));
+
+ TimeDelta daysAgo = new TimeDelta("5", TIME_FORMAT);
+ Long deltaDays = daysAgo.exec(input);
+ assertEquals(1, (long)deltaDays);
cal = Calendar.getInstance();
cal.add(Calendar.DATE, -30);
input.set(0, sdf.format(cal.getTime()));
deltaDays = daysAgo.exec(input);
- assertEquals(30, (int)deltaDays);
+ assertEquals(30, (long)deltaDays);
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.