Permalink
Browse files

Changed dependencies.properties to more recent versions of the libraries.

  • Loading branch information...
1 parent 9f43528 commit 56b26dcc7a923dfd395c73fffb5375d9d660b2a9 Gabor Szabo committed Jan 12, 2012
Showing with 31 additions and 30 deletions.
  1. +2 −2 java/dependencies.properties
  2. +2 −1 java/src/com/twitter/pycascading/Util.java
  3. +27 −27 python/pycascading/tap.py
@@ -1,11 +1,11 @@
# The folder where Cascading was downloaded to
# http://www.concurrentinc.com/downloads/
-cascading=/opt/cascading-1.2.4-hadoop-0.19.2+
+cascading=/opt/cascading-1.2.5-hadoop-0.19.2+
# At least Jython version 2.5.2 required
# Download from http://www.jython.org/downloads.html
jython=/opt/jython
# Hadoop's folder
# Download from http://www.apache.org/dyn/closer.cgi/hadoop/common/
-hadoop=/opt/hadoop-0.20.2
+hadoop=/opt/hadoop-0.20.203.0
@@ -81,7 +81,8 @@ public static void run(int numReducers, Map<String, Object> config, Map<String,
// See https://github.com/twitter/pycascading/issues/2
// TODO: find the reason for this
properties.put("io.serializations",
- "com.twitter.pycascading.bigintegerserialization.BigIntegerSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
+ "com.twitter.pycascading.bigintegerserialization.BigIntegerSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
properties.put("mapred.jobtracker.completeuserjobs.maximum", 50000);
properties.put("mapred.input.dir.recursive", "true");
FlowConnector.setApplicationJarClass(properties, Util.class);
View
@@ -41,10 +41,10 @@
def expand_path_with_home(output_folder):
"""Prepend the home folder to a relative location on HDFS if necessary.
-
+
If we specified a relative path, prepend it with the home folder
of the user on HDFS. If we are running in local mode, don't do anything.
-
+
Arguments:
output_folder -- the absolute or relative path of the output HDFS folder
"""
@@ -84,10 +84,10 @@ def _connect_source(self, pipe_name, cascading_tap):
taps/pipelines being connected up to the flow.
"""
self.source_map[pipe_name] = cascading_tap
-
+
def source(self, cascading_tap):
"""A generic source using Cascading taps.
-
+
Arguments:
cascading_tap -- the Cascading Scheme object to store data into
"""
@@ -101,26 +101,26 @@ def source(self, cascading_tap):
def meta_source(self, input_path):
"""Use data files in a folder and read the scheme from the meta file.
-
+
Defines a source tap using files in input_path, which should be a
(HDFS) folder. Takes care of using the appropriate scheme that was
used to store the data, using meta data in the data folder.
-
+
Arguments:
input_path -- the HDFS folder to store data into
"""
input_path = expand_path_with_home(input_path)
source_scheme = MetaScheme.getSourceScheme(input_path)
return self.source(cascading.tap.Hfs(source_scheme, input_path))
-
+
def sink(self, cascading_scheme):
"""A Cascading sink using a Cascading Scheme.
Arguments:
cascading_scheme -- the Cascading Scheme used to store the data
"""
return _Sink(self, cascading_scheme)
-
+
def meta_sink(self, cascading_scheme, output_path):
"""Store data together with meta information about the scheme used.
@@ -138,27 +138,27 @@ def meta_sink(self, cascading_scheme, output_path):
sink_scheme = MetaScheme.getSinkScheme(cascading_scheme, output_path)
return self.sink(cascading.tap.Hfs(sink_scheme, output_path,
cascading.tap.SinkMode.REPLACE))
-
+
def tsv_sink(self, output_path, fields=Fields.ALL):
# TODO: in local mode, do not prepend the home folder to the path
"""A sink to store the tuples as tab-separated values in text files.
-
+
Arguments:
output_path -- the folder for the output
fields -- the fields to store. Defaults to all fields.
"""
output_path = expand_path_with_home(output_path)
return self.meta_sink(cascading.scheme.TextDelimited(fields, '\t'),
output_path)
-
+
def binary_sink(self, output_path, fields=Fields.ALL):
"""A sink to store binary sequence files to store the output.
-
+
This is a sink that uses the efficient Cascading SequenceFile scheme to
store data. This is a serialized version of all tuples and is
recommended when we want to store intermediate results for fast access
later.
-
+
Arguments:
output_path -- the (HDFS) folder to store data into
fields -- the Cascading Fields field selector of which tuple fields to
@@ -170,24 +170,24 @@ def binary_sink(self, output_path, fields=Fields.ALL):
def cache(self, identifier, refresh=False):
"""A sink for temporary results.
-
+
This caches results into a temporary folder if the folder does not
exist yet. If we need to run slightly modified versions of the
PyCascading script several times during testing for instance, this is
very useful to store some results that can be reused without having to
go through the part of the flow that generated them again.
-
+
Arguments:
identifier -- the unique identifier for this cache. This is used as
part of the path where the temporary files are stored.
refresh -- if True, we will regenerate the cache data as if it was
the first time creating it
"""
return _Cache(self, identifier, refresh)
-
+
def run(self, num_reducers=50, config=None):
"""Start the Cascading job.
-
+
We call this when we are done building the pipeline and explicitly want
to start the flow process.
"""
@@ -206,31 +206,31 @@ def run(self, num_reducers=50, config=None):
class Source(Chainable):
"""A PyCascading source that can be used as the head of a pipeline.
-
+
Used internally.
"""
def __init__(self, taps, cascading_tap):
Chainable.__init__(self)
self.__cascading_tap = cascading_tap
self.__taps = taps
-
+
def _apply(self, parent):
raise Exception('A source cannot be chained into')
-
-
+
+
class _Sink(Chainable):
"""A PyCascading sink that can be used as the tail in a pipeline.
-
+
Used internally.
"""
def __init__(self, taps, cascading_tap):
Chainable.__init__(self)
self.__cascading_tap = cascading_tap
self.__taps = taps
-
+
def _create_with_parent(self, parent):
# We need to name every tail differently so that Cascading can assign
# a tail map to all sinks.
@@ -243,17 +243,17 @@ def _create_with_parent(self, parent):
class _Cache:
-
+
"""Act as a source or sink to store and retrieve temporary data."""
-
+
def __init__(self, taps, hdfs_folder, refresh=False):
tmp_folder = 'pycascading.cache/' + hdfs_folder
self.__cache_folder = expand_path_with_home(tmp_folder)
self.__hdfs_folder_exists = \
self.hdfs_folder_exists(self.__cache_folder)
self.__taps = taps
self.__refresh = refresh
-
+
def hdfs_folder_exists(self, folder):
path = Path(folder)
fs = path.getFileSystem(Configuration())
@@ -263,7 +263,7 @@ def hdfs_folder_exists(self, folder):
return status.isDir()
except:
return False
-
+
def __or__(self, pipe):
if not self.__refresh and self.__hdfs_folder_exists:
# We remove all sources that are replaced by this cache, otherwise

0 comments on commit 56b26dc

Please sign in to comment.