Welcome to PyCascading! PyCascading is a light wrapper around Cascading that lets you write Python code suitable for MapReduce-like execution. Data may reside on any backend that Cascading supports, most importantly Hadoop and the local filesystem. The workflow is defined in Python, and the functions that operate on the data are also written in Python. PyCascading is open source, and I'd like to thank Twitter for supporting my work on it.
The equivalent of the "Hello, world!" program in MapReduce is counting the words in a text file, and this is what it looks like in PyCascading:
```python
from pycascading.helpers import *

def main():
    flow = Flow()
    input = flow.source(Hfs(TextLine(), 'input_file.txt'))
    output = flow.sink(Hfs(TextDelimited(), 'output_folder'))

    @udf
    def split_words(tuple):
        for word in tuple.get(1).split():
            yield [word]

    input | map_replace(split_words, 'word') | group_by('word', native.count()) | output

    flow.run(num_reducers=10)
```
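To make the semantics concrete, here is what the same word count computes in plain Python (in-memory, no Hadoop or PyCascading involved; `collections.Counter` stands in for the grouping and counting steps of the flow):

```python
from collections import Counter

def word_count(lines):
    # Split each line on whitespace and count every word, mirroring
    # split_words + group_by + count in the flow above.
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return dict(counts)

result = word_count(["to be or not", "to be"])
# result == {"to": 2, "be": 2, "or": 1, "not": 1}
```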
PyCascading works with data flows by applying various operations to the flow. The operations can broadly be characterized into three classes. Map-like operations apply a Python function to each record of the data stream, join-like operations join two or more streams à la SQL JOIN, and group-like operations apply aggregations to groups of records that share the same value in a given field (GROUP BY in SQL).
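As a plain-Python sketch (not the PyCascading API; the function names, dict-shaped records, and sample data are made up for illustration), the three classes of operations can be pictured like this:

```python
from collections import defaultdict

def map_op(records, fn):
    # Map-like: apply fn to every record independently.
    return [fn(r) for r in records]

def join_op(left, right, key):
    # Join-like: SQL-style inner join on a common field.
    index = defaultdict(list)
    for r in right:
        index[r[key]].append(r)
    return [{**l, **r} for l in left for r in index[l[key]]]

def group_op(records, key, agg):
    # Group-like: aggregate records sharing the same key value.
    groups = defaultdict(list)
    for r in records:
        groups[r[key]].append(r)
    return {k: agg(v) for k, v in groups.items()}

users  = [{"id": 1, "name": "ann"}, {"id": 2, "name": "bob"}]
visits = [{"id": 1, "page": "/a"}, {"id": 1, "page": "/b"}]

joined = join_op(users, visits, "id")
counts = group_op(joined, "name", len)
# counts == {"ann": 2}
```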
Since PyCascading is built on Cascading, it is well worth knowing how Cascading works. The user guide covers everything. You can get started without reading the full Cascading guide, but if you are new to Cascading, at the very least read the chapter on [data processing](http://www.cascading.org/1.2/userguide/htmlsingle/#N20106), with particular attention to pipes, field algebra, and sources and sinks. It's interesting!
PyCascading needs to be built before it can be used (it's not too hard, though). The requirements for building the 'master' branch are:
- Cascading 2.0 or Cascading 1.2.*
- Jython 2.5.2+
- Hadoop 0.20.2+, the version you build with preferably matching the Hadoop runtime
- A Java JDK (PyCascading is developed with Sun's version)
After cloning the repo, edit the configuration file in the `java` folder to point the variables to where the Cascading, Jython, and Hadoop frameworks were downloaded. Note that only Jython needs to be "installed"; the Cascading and Hadoop archives can simply be extracted. After this, still in the `java` folder, invoke `ant`. If the Java JDK is set up correctly, this should result in a successful build within a few seconds. The result is two archive files created in the `build` directory. One of them is a Java jar (`pycascading.jar`) with the PyCascading, Cascading, and Jython classes and jars needed for running a script. The other is a tarball (`build/pycascading.tgz`) with the PyCascading sources and other Python system libraries.
## Running PyCascading jobs
PyCascading scripts can be run in remote or local mode. The two modes of execution are described below.
In remote mode the PyCascading scripts are run on a Hadoop cluster that the user has SSH access to. In general, a Hadoop job is a jar file that needs to be submitted to Hadoop for execution. In the case of PyCascading, this jar contains the PyCascading and Cascading classes, and even if we make changes to the Python scripts, this jar file does not change. So we can create the jar once, copy it to the server, and just use it locally on the server whenever we submit a new job.
The `remote_deploy.sh` script, included with PyCascading, can copy this master jar to the user's home directory on the Hadoop server. It is also used to copy the PyCascading scripts to the server to be run, after you have edited them on your computer.
After building PyCascading, deploy the master jar to the Hadoop server like this:
```shell
remote_deploy.sh -m -s <user@hadoop_server>
```
`user@hadoop_server` is the SSH account on the Hadoop server. The `-m` option specifies that the master archives should be copied to the server. They are approximately 12 MB, so this shouldn't take much time to complete. The master archives will be placed in `~/pycascading/master` by default, but this and other default options (Hadoop server, SSH keys, etc.) are defined at the beginning of the `remote_deploy.sh` script and may be changed there.
Once the master archives are on the Hadoop server, the deployment of PyCascading scripts will be very fast and use very little bandwidth.
To run a PyCascading script, we first invoke
```shell
remote_deploy.sh -s <user@hadoop_server> <script.py>
```
to copy the sources to the server and set up a few helper scripts. If all went well, a temporary folder is created for the script in `~/pycascading/deploys` by default (this location can be changed in `remote_deploy.sh`), and you will also see the path of a `run.sh` shell script that was generated on the server. This shell script can be run to submit the PyCascading job to Hadoop. Note that you may pass command line parameters to the script, and they can be read in the usual way from `sys.argv`.
After this you can SSH into the server and start `run.sh` to invoke Hadoop with the PyCascading framework. I use `screen` to run the jobs, so even if the network connection goes down, the job won't be killed. `run.sh` also assumes that the `hadoop` command is on the search path, so if it isn't, it is good practice to set the `PATH` so that `hadoop` will be found.
For testing, an alternative to using a full-blown Hadoop cluster is to run Hadoop in pseudo-distributed mode on your laptop/workstation. Performance is not great, as jobs won't benefit from parallel processing, but for quick tests and smallish files it's OK. For more information on setting up Hadoop in pseudo-distributed mode, see the Hadoop single-node setup documentation.
Local mode means that we are not using HDFS at all, and all source and sink taps are bound to local files. By default, if you do not specify `hdfs://` prefixes for your taps, the files/folders are looked up on the default file system, which in the case of local execution is the local file system. I tend not to use any prefixes when specifying input/output paths, so the same PyCascading script can be run without changes in either local or remote mode.
Local mode is great for experimenting and testing scripts quickly, as execution is very fast compared to the time it takes to set up proper MR jobs on a Hadoop cluster. In particular, if you are trying out the PyCascading examples, local mode is the best way to see results in the shortest time. Scripts are run in local mode with `local_run.sh`, which takes the script as its argument.
## How to write PyCascading scripts
PyCascading scripts are simple Python scripts that use the PyCascading libraries, and are executed with Hadoop. The script's purpose is two-fold: 1) it creates the workflow for the Cascading job; 2) it defines all the Python functions that will be executed by the mappers and reducers as filters, aggregators, etc.
They won't run directly with `python`, as you would expect from normal Python scripts, but are instead executed through `local_run.sh`. One reason for this is that some setup and bookkeeping needs to happen before the script can start. The other reason is that PyCascading scripts contain the definitions of the functions applied to the data, so the source is imported by the map/reduce processes on the Hadoop cluster. Thus the scripts cannot contain anything substantial in their main body (outside of function and class definitions), otherwise execution would start there every time.
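The idea can be sketched in plain Python (no PyCascading; the `events` list, `my_udf`, and the exec'd source string are made up for illustration): a worker importing the script sees only the definitions, while the driver explicitly calls `main()`.

```python
# A module whose top level contains only definitions: importing it has
# no side effects, so workers can safely load the UDFs.
script_source = '''
events = []

def my_udf(record):          # a function the workers need
    return record.upper()

def main():                  # only the driver invokes this
    events.append("flow built and run")
'''

# Worker side: an import-like exec of the source; main() is never called.
worker_ns = {}
exec(script_source, worker_ns)
assert worker_ns["events"] == []                 # no flow was built
assert worker_ns["my_udf"]("foo") == "FOO"       # but the UDF is available

# Driver side: the framework explicitly invokes main() exactly once.
driver_ns = {}
exec(script_source, driver_ns)
driver_ns["main"]()
assert driver_ns["events"] == ["flow built and run"]
```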
In particular, the Cascading flow is created and executed in the `main()` function of the script. This way, when the mappers/reducers import the script source, they won't accidentally try to build and run the flow again. But these are technical details; it suffices to know that building the flow should happen in the `main()` function. When we have finished building the flow, we submit it to the Cascading executor with `flow.run()`.
## Flows and operations
Cascading works with flows: we choose some source files as input, apply a series of atomic operations to them in sequence, and write the resulting output to other (HDFS) files. This process is best described as building data processing pipelines: data flows through the pipes from sources to sinks, and along the way it is modified, and pipes are split and merged according to certain rules.
We will use the word count example above to see how we construct pipelines from basic operations, and then take a look at what kinds of operations we have available in detail a bit later.
First, we need to import everything from the `pycascading.helpers` module. This is the easiest way to have everything available for the script without having to use module names. It imports frequently used Cascading classes such as `TupleEntry`, as well as the primitive types from Java (`Integer`, etc.). These are useful to have handy when reading/writing data.
As mentioned, PyCascading executes the `main()` function of your script, and thus that is where we define and run the Cascading flow. The first statement of `main()` creates a `Flow` object, which corresponds to a Cascading flow. In a flow, data comes from sources, goes through various pipe operations, and the results are finally written into one or more sinks. Sources and sinks are bound to flows, and they are created next with:
```python
input = flow.source(Hfs(TextLine(), 'input_file.txt'))
output = flow.sink(Hfs(TextDelimited(), 'output_folder'))
```
`Flow.source` and `Flow.sink` both take Cascading taps as arguments, which in this case we specify using `Hfs`. `Hfs` is a standard Cascading Java class, and we pass in its two required parameters: a `Scheme` describing the formatting of the data, and the location of the files (`input_file.txt` is a normal file, and `output_folder` is the folder where the `part-*` files will be generated by the reducers). Note that we didn't specify resource locators in the file names: if we run the script with `remote_deploy.sh`, they default to HDFS, and if we run it with `local_run.sh`, they will be local.
`input` and `output` are the head and tail of the data flow pipeline: `input` will be used as the source where data comes from, and the results will be written into `output`. The operations are chained together with the pipe operator `|`:

```python
input | map_replace(split_words, 'word') | group_by('word', native.count()) | output
```
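To see how such `|` chaining can work mechanically, here is a minimal sketch (not the PyCascading implementation; the `Stage` class and the sample stages are invented for illustration) of stages that overload `__or__` to build a pipeline:

```python
class Stage:
    """One step in a pipeline; transforms a list of records."""

    def __init__(self, fn):
        self.fn = fn
        self.next = None

    def __or__(self, other):
        # Chain: find the current tail of the pipeline and attach `other`.
        tail = self
        while tail.next is not None:
            tail = tail.next
        tail.next = other
        return self

    def run(self, records):
        out = self.fn(records)
        return self.next.run(out) if self.next else out

source = Stage(lambda recs: recs)
upper  = Stage(lambda recs: [r.upper() for r in recs])
dedup  = Stage(lambda recs: sorted(set(recs)))

pipeline = source | upper | dedup
result = pipeline.run(["b", "a", "b"])
# result == ["A", "B"]
```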
This means that records from `input` are mapped to new records using the Python function `split_words`. `split_words` splits up the incoming lines on whitespace using Python's `split()` string method, and it produces a single column, which we will call `word`. `map_replace` takes some fields from the input tuples (in the example it takes all the fields), and replaces these fields with the fields in the UDF's output. In the example, the incoming fields are the offset and the line produced by `TextLine`, and these are going to be mapped to the output of `split_words`, which we call `word`.
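In plain Python (no PyCascading; raw strings stand in for the input tuples, and the single-field records are simplified), the behavior of the `split_words` generator looks like this:

```python
def split_words(line):
    # Yield one single-field record per word, as the UDF does for each
    # incoming line of text.
    for word in line.split():
        yield [word]

records = ["hello world", "hello again"]
words = [w for line in records for w in split_words(line)]
# words == [["hello"], ["world"], ["hello"], ["again"]]
```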
We then group the words (that's what `group_by` does on the `word` field), and count how many times each word occurs. The second argument of `group_by` is the aggregator we are applying to each grouping, in this case the built-in Cascading `Count` aggregator. The aggregator's output field will be appended to the grouping field, and this is what we are piping into `output`: the words and the number of times each appeared in the text.