Switch branches/tags
Nothing to show
Find file History
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Type Name Latest commit message Commit time
Failed to load latest commit information.



Workflow checkpointing for Cascalog.


Add the following to project.clj:

[cascalog-checkpoint "0.1.1"]


The workflow macro in the checkpoint module allows you to break complicated workflows out into small, checkpointed steps. If one of these steps causes a job to fail and you restart the job, the workflow macro will skip every step up to the previous point of failure. Fault-tolerant MapReduce topologies ftw!

Let’s look at the workflow macro in action. The following function takes an input-path to some existing Twitter data and an output-path, and executes a tweet-processing workflow with five steps:

(defn -main
  [input-path output-path]
  (workflow ["/tmp/example-checkpoint"]          
            step-1     ([:tmp-dirs [staging-path]]
                          (transfer-tweets input-path staging-path))

            step-2     ([:deps :last :tmp-dirs user-path]
                          (harvest-users staging-path user-path))

            step-3a    ([:deps step-2 :tmp-dirs [cluster-path friend-path]]
                          (cluster-users user-path cluster-path)
                          (count-friends user-path friend-path))

            step-3b    ([:deps step-2 :tmp-dirs age-path]
                          (examine-ages user-path age-path))

            final-step ([:deps :all]
                          (big-analysis cluster-path

Let’s look at this one piece at a time. The first argument to workflow is a vector with some path that the workflow can use to stage temporary files.

(workflow ["/tmp/example-checkpoint"] ...)

It doesn’t matter what path you choose; just make sure that Hadoop has access and can write data to the folder.

Following this vector, workflow expects pairs of the form

step-name ([:deps <optional-deps, defaults to :last>]
             :tmp-dirs [<optional, creates temp-dirs if supplied>]
             ...<body, same as inside let>...)

Steps can identify other steps as dependencies by referencing their step-names with the :deps keyword argument.

The first step creates a temporary directory by supplying the symbol staging-path to the :tmp-dirs keyword argument. It then transfers tweets from the input directory into this staging directory, where they will remain available for future steps to consume.

step-1 ([:tmp-dirs [staging-path]]
          (transfer-tweets input-path staging-path))

Step 2 marks :last as a dependency. :last is the default, and marks the step as dependent only on the step directly above. A step will not execute until all of its dependencies have completed successfully.

step-2 uses staging-path defined in step-1 and creates a new temp directory (user-path) for its results.

If step-2 fails for any reason and you restart the workflow, the workflow macro will skip step-1, destroy any temporary directories created in the previous run of step-2, and start step-2 afresh.

step-2 ([:deps :last :tmp-dirs user-path]
          (harvest-users staging-path user-path))

The next two steps, step-3a and step-3b, each mark step-2 as a dependency. Once step-2 completes, step-3a and step-3b will run in parallel.

step-3a ([:deps step-2 :tmp-dirs [cluster-path friend-path]]
           (cluster-users user-path cluster-path)
           (count-friends user-path friend-path))
step-3b ([:deps step-2 :tmp-dirs age-path]
           (examine-ages user-path age-path))

The final step marks its dependencies as :all. This signifies that the step must wait for every step defined above it to complete before running. Again, if final-step fails and the workflow restarts, all previous successful steps will be skipped.

final-step ([:deps :all]
              (big-analysis cluster-path