
PyCascading

Welcome to PyCascading! PyCascading is a light wrapper around Cascading that lets you write Python code suitable for MapReduce-like execution. Data may reside on any backend that Cascading supports, most importantly Hadoop and the local filesystem. The workflow is defined in Python, and the functions that operate on the data are also written in Python. PyCascading is open source, and I'd like to thank Twitter for supporting me in this work.

The equivalent of the "Hello, world!" program in MapReduce is counting the words in a text file; this is what it looks like in PyCascading:

from pycascading.helpers import *

def main():
    flow = Flow()
    # Source and sink taps; paths live on HDFS when run remotely, on the local
    # file system when run with local_run.sh
    input = flow.source(Hfs(TextLine(), 'input_file.txt'))
    output = flow.sink(Hfs(TextDelimited(), 'output_folder'))

    # User-defined function: emit one output tuple per word in the 'line' field
    @udf
    def split_words(tuple):
        for word in tuple.get(1).split():
            yield [word]

    # Split the lines into words, group by word, and count each group
    input | map_replace(split_words, 'word') | group_by('word', native.count()) | output

    flow.run(num_reducers=10)
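
To try it out quickly, you can run it in local mode (described in the "Running PyCascading jobs" section below); the repository ships this example as examples/word_count.py, so the invocation is simply:

local_run.sh examples/word_count.py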

Introduction

PyCascading works with data flows by applying various operations to the flow. The operations can broadly be grouped into three classes. Map-like operations apply a Python function to each record of the data stream, join-like operations join two or more streams à la SQL JOIN, and group-like operations apply aggregations to groups of records sharing the same value in a given field (GROUP BY in SQL).

Since PyCascading is built on Cascading, it is well worth knowing how Cascading works. The user guide covers everything. You can get started without reading the full Cascading guide, but if you are new to Cascading, at the very least read the chapter on [data processing](http://www.cascading.org/1.2/userguide/htmlsingle/#N20106), with particular attention to pipes, field algebra, and sources and sinks. It's interesting!

Building PyCascading

PyCascading needs to be built before it can be used (it's not too hard, though). To build the 'master' branch you will need the Cascading, Jython, and Hadoop distributions, a Java JDK, and Apache Ant.

After cloning the repo, edit the java/dependencies.properties file so that its variables point to the locations where the Cascading, Jython, and Hadoop frameworks were downloaded. Note that only Jython needs to be "installed"; the Cascading and Hadoop archives can simply be extracted somewhere.

After this, still in the java folder, invoke ant. If the Java JDK is set up correctly, this should result in a successful build within a few seconds. Two archive files are created in the build directory: a Java jar (pycascading.jar) with the PyCascading, Cascading, and Jython classes and jars needed for running a script, and a tarball (build/pycascading.tgz) with the PyCascading sources and other Python system libraries.
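
For reference, assuming the dependencies are downloaded and dependencies.properties points to them, the build boils down to:

cd java
ant

The pycascading.jar and pycascading.tgz archives then appear in the build directory.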

Running PyCascading jobs

PyCascading scripts can be run in remote or local mode. The two modes of execution are described below.

Remote mode

In remote mode the PyCascading scripts are run on a Hadoop cluster that the user has SSH access to. In general, a Hadoop job is a jar file that needs to be submitted to Hadoop for execution. In the case of PyCascading, this jar contains the PyCascading and Cascading classes, and even if we make changes to the Python scripts, this jar file does not change. So we can create the jar once, copy it to the server, and just use it locally on the server whenever we submit a new job.

The remote_deploy.sh script, included with PyCascading, can copy this master jar to the user's home directory on the Hadoop server. It is also used to copy the PyCascading scripts to the server to be run, after you have edited them on your computer.

After building PyCascading, deploy the master jar to the Hadoop server like this:

remote_deploy.sh -m -s <user@hadoop_server>

user@hadoop_server is the SSH account for the Hadoop server. The -m option specifies that the master archives should be copied to the server. They are approximately 12MB, so the copy shouldn't take long to complete. The master archives are stored in ~/pycascading/master by default; this and the other default options (Hadoop server, SSH keys, etc.) are defined at the beginning of the remote_deploy.sh script and may be changed there.

Once the master archives are on the Hadoop server, deploying PyCascading scripts is fast and uses very little bandwidth.

To run a PyCascading script, we first invoke

remote_deploy.sh -s <user@hadoop_server> <script.py>

to copy the sources to the server and set up a few helper scripts. If all goes well, a temporary folder is created for the script, by default under ~/pycascading/deploys (this location can be changed in remote_deploy.sh), and you will also see the path to the run.sh shell script that was generated on the server. Running this shell script submits the PyCascading job to Hadoop.

Note that you may pass command line parameters to the script; they can be read in the usual way from sys.argv.
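
A minimal sketch of reading such parameters inside the script; the exact argv layout may differ, and input_path/output_path are hypothetical names used here only for illustration:

import sys
from pycascading.helpers import *

def main():
    # Parameters given after the script name on the remote_deploy.sh /
    # local_run.sh command line are expected in sys.argv (assumed layout:
    # sys.argv[1] is the first user-supplied argument).
    input_path = sys.argv[1]
    output_path = sys.argv[2]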

After this you can SSH into the server and start run.sh to invoke Hadoop with the PyCascading framework. I use screen to run the jobs, so even if the network connection goes down, the job won't get killed. run.sh also assumes that the hadoop command is on the search path, so if it isn't, it is good practice to add the directory containing hadoop to your PATH.
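
Put together, a typical remote session might look like this (a sketch only; the deploy folder name is generated per run, so use the path that remote_deploy.sh printed):

ssh user@hadoop_server
screen                                     # optional, keeps the job alive if the connection drops
cd ~/pycascading/deploys/<deploy_folder>   # the path printed by remote_deploy.sh
./run.sh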

For testing, an alternative to using a full-blown Hadoop cluster is to run Hadoop in pseudo-distributed mode on your laptop or workstation. Performance is not great, as jobs won't benefit from parallel processing, but it's fine for quick tests and smallish files. For more information, see the Hadoop documentation on single-node (pseudo-distributed) setup.

Local mode

Local mode means that we are not using HDFS at all, and all source and sink taps are bound to local files. By default, if you do not specify a file:// or hdfs:// prefix for your taps, the files and folders are looked up on the default file system, which in the case of local execution is the local file system. I tend not to use any prefixes when specifying input/output paths, so the same PyCascading script can be run without changes in either local or remote mode.
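
If you do want to pin a tap to a particular file system, you can spell out the prefix in the path; a sketch with illustrative paths:

# explicit prefixes override the default file system (paths are illustrative)
hdfs_input = flow.source(Hfs(TextLine(), 'hdfs:///user/me/input_file.txt'))
local_input = flow.source(Hfs(TextLine(), 'file:///tmp/input_file.txt'))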

Local mode is great for experimenting and testing scripts quickly, as execution is very fast compared to the time it takes to set up proper MapReduce jobs on a Hadoop cluster. In particular, if you are trying out the PyCascading examples, local mode is the quickest way to see results. Scripts are run in local mode with local_run.sh, like this:

local_run.sh examples/word_count.py

How to write PyCascading scripts

PyCascading scripts are simple Python scripts that use the PyCascading libraries, and are executed with Hadoop. The script's purpose is two-fold: 1) it creates the workflow for the Cascading job; 2) it defines all the Python functions that will be executed by the mappers and reducers as filters, aggregators, etc.

They won't run directly with python, as you would expect from normal Python scripts, but are instead executed through remote_deploy.sh or local_run.sh. One reason for this is that some setup and bookkeeping has to happen before the script can start. The other reason is that PyCascading scripts contain the definitions of the functions applied to the data, so the source is imported by the map/reduce processes in the Hadoop cluster. Thus the scripts cannot contain anything substantial in their main body (outside of function and class definitions), otherwise that code would be executed on every import.

In particular, the Cascading flow is created and executed in the main() function of the script. This way, when the mappers/reducers import the script source, they won't accidentally try to build and run the flow again. But these are technical details; it suffices to know that building the flow should happen in the main() function. When we have finished building the flow, we submit it to the Cascading executor with flow.run().
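
A bare-bones script skeleton following these rules, with the pipeline left out:

from pycascading.helpers import *

# The module-level body is also imported by the mappers/reducers, so keep it
# free of side effects; only define functions and classes here.

def main():
    flow = Flow()
    # define sources, UDFs, pipe operations, and sinks here
    flow.run(num_reducers=10)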

Flows and operations

The way Cascading works is with flows: we choose some source files as input, apply a series of atomic operations to them in sequence, and write the resulting output to some other (HDFS) files. The process is best described as building data processing pipelines: data flows through the pipes from sources to sinks, and along the way it is modified, and pipes are split and merged according to certain rules.

We will use the word count example above to see how pipelines are constructed from basic operations, and we will take a more detailed look at the available operations a bit later.

First, we need to import everything from the pycascading.helpers module. This is the easiest way to have everything available for the script without having to use module names. pycascading.helpers also imports frequently used Cascading classes such as Fields, Tuple, and TupleEntry, as well as the primitive types from java.lang (Integer etc.). These are useful to have handy when reading/writing HDFS files.
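
In other words, the first line of a script is typically the following (the comment lists only the names used elsewhere on this page; the module exports more):

from pycascading.helpers import *
# among others, this brings in Flow, Hfs, TextLine, TextDelimited, udf,
# map_replace, group_by and native, plus the Cascading Fields, Tuple and
# TupleEntry classes and java.lang primitives such as Integer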

As mentioned, PyCascading executes the main() function of your script, and that is where we define and run the Cascading flow. The first statement of main() creates a Flow object, which corresponds to a Cascading flow. In a flow, data comes from sources, goes through various pipe operations, and the results are finally written into one or more sinks. Sources and sinks are bound to flows, and they are created next with

input = flow.source(Hfs(TextLine(), 'input_file.txt'))
output = flow.sink(Hfs(TextDelimited(), 'output_folder'))

Flow.source and Flow.sink both take Cascading taps as arguments, which in this case we specify using Hfs. Hfs is a standard Cascading Java class, and we pass in its two required parameters: a Scheme describing the formatting of the data, and the location of the files (input_file.txt is a normal file, and output_folder is the folder where the part-* files will be generated by the reducers). Note that we didn't specify resource locators in the file names: if we run the script with remote_deploy.sh, they default to HDFS, and if we run it with local_run.sh, they will be local.

input and output are the head and tail of the data flow pipeline, respectively. input will be used as the source the data comes from, and the results will be written into output. The operations are chained together with the pipe operator |:

input | map_replace(split_words, 'word') | group_by('word', native.count()) | output

This means that records from input are mapped to new records using the Python function split_words. split_words splits the incoming lines on whitespace using Python's split(), and produces a single column, which we call word. map_replace takes some fields from the input tuples (in the example it takes all of them) and replaces those fields with the fields of the output. In the example, the incoming fields are offset and line, and they are replaced by the output of split_words, which we call word.
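
To make the field algebra concrete, here is a record traced through this step (field names as in the example, values made up for illustration):

# a record produced by the TextLine source:
#   offset=0, line='hello world hello'
# after map_replace(split_words, 'word') it becomes three records, with the
# offset and line fields replaced by the single field word:
#   word='hello'
#   word='world'
#   word='hello'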

We then group the words (that's what group_by does on the word field), and count how many times each word occurs. The second parameter to group_by is the aggregator applied to each group, in this case the built-in Cascading Count aggregator. The aggregator's output field is appended to the grouping field, and this is what we pipe into output: the words and the number of times they appear in the text.
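
Continuing the trace above, the grouping and counting step would produce something like this (assuming Cascading's Count aggregator keeps its default output field name, count):

# records entering group_by('word', native.count()):
#   word='hello', word='world', word='hello'
# records written to the sink, one per group, with the count appended to the
# grouping field:
#   word='hello'  count=2
#   word='world'  count=1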

Getting help