Skip to content

Commit

Permalink
Document Cascading module and Taps
Browse files Browse the repository at this point in the history
  • Loading branch information
mrwalker committed Apr 24, 2013
1 parent 9ba7db1 commit 3814101
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 24 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -30,6 +30,6 @@ cascading.jruby provides a clean Ruby interface to Cascading, but doesn't attemp

For operations you can apply to your dataflow within a pipe assembly, see the [Assembly](http://rubydoc.info/gems/cascading.jruby/0.0.10/Cascading/Assembly) class. For operations available within a block passed to a group_by, union, or join, see the [Aggregations](http://rubydoc.info/gems/cascading.jruby/0.0.10/Cascading/Aggregations) class.

Note that the Ruby code you write merely constructs a Cascading job, so no JRuby runtime is required on your cluster. This stands in contrast with writing [Hadoop streaming jobs in Ruby](http://www.quora.com/How-do-the-different-options-for-Ruby-on-Hadoop-compare). To run cascading.jruby applications on a Hadoop cluster, you must use [Jading](https://github.com/etsy/jading) to package them into a job jar.
Note that the Ruby code you write merely constructs a Cascading job, so no JRuby runtime is required on your cluster. This stands in contrast with writing [Hadoop streaming jobs in Ruby](http://www.quora.com/How-do-the-different-options-for-Ruby-on-Hadoop-compare). To run cascading.jruby applications on a Hadoop cluster, you must use [Jading](https://github.com/mrwalker/jading) to package them into a job jar.

cascading.jruby has been tested on JRuby versions 1.2.0, 1.4.0, 1.5.3, 1.6.5, 1.6.7.2, 1.7.0, and 1.7.3.
2 changes: 1 addition & 1 deletion cascading.jruby.gemspec
Expand Up @@ -8,7 +8,7 @@ Gem::Specification.new do |s|
s.authors = ["Matt Walker", "Gr\303\251goire Marabout"]
s.description = "cascading.jruby is a small DSL above Cascading, written in JRuby"
s.email = "mwalker@etsy.com"
s.extra_rdoc_files = ["LICENSE.txt"]
s.extra_rdoc_files = ["README.md", "LICENSE.txt"]
s.files = Dir.glob("lib/**/*.rb")
s.homepage = "http://github.com/etsy/cascading.jruby"
s.rdoc_options = ["--main", "README.md"]
Expand Down
108 changes: 99 additions & 9 deletions lib/cascading/cascading.rb
@@ -1,5 +1,30 @@
require 'cascading/cascade'
require 'cascading/flow'
require 'cascading/expr_stub'

# The Cascading module contains all of the cascading.jruby DSL. Inserting the
# following into your script:
# require 'rubygems'
# require 'cascading'
# includes this module at the top level, making all of its features available.
#
# To build a dataflow like the one in the README.md or
# {samples}[http://github.com/mrwalker/cascading.jruby/tree/master/samples],
# start by looking at Cascade or Flow. These are the
# highest level structures you'll use to put together your job.
#
# Within a flow, you'll connect sources to sinks by way of Assembly, which
# refers to "pipe assemblies" from Cascading. Within an Assembly, you'll use
# functions and filters (see Operations, IdentityOperations, RegexOperations,
# FilterOperations, and TextOperations) as well as Assembly#group_by,
# Assembly#union, and Assembly#join. You can provide those last pipes with a
# block that can select operations from Aggregations.
#
# Finally, you'll want to address the execution of your job, whether it be
# locally testing or running remotely on a Hadoop cluster. See the Mode class
# for the available modes, and parameterize your script such that it can operate
# in Cascading local mode locally and in Hadoop mode when run in a jar produced
# with {Jading}[http://github.com/mrwalker/jading].
module Cascading
# Mapping that defines a convenient syntax for specifying Java classes, used
# in Janino expressions and elsewhere.
Expand All @@ -26,8 +51,21 @@ module Cascading
# directly building their own cascades and flows so that jading can send them
# default properties.

# Builds a top-level cascade given a name and a block. Optionally accepts a
# :mode, as explained in Cascading::Cascade#initialize.
# Builds a top-level cascade given a name and a block.
#
# The named options are:
# [mode] See Cascade#initialize
#
# Example:
# cascade 'wordcount', :mode => :local do
# flow 'first_step' do
# ...
# end
#
# flow 'second_step' do
# ...
# end
# end
def cascade(name, options = {}, &block)
raise "Could not build cascade '#{name}'; block required" unless block_given?
raise 'Cascading::cascade does not accept the :properties param only the global $jobconf_properties' if options[:properties]
Expand All @@ -40,8 +78,21 @@ def cascade(name, options = {}, &block)
end

# Builds a top-level flow given a name and block for applications built of
# flows with no cascades. Optionally accepts a :mode, as explained in
# Cascading::Flow#initialize.
# flows with no cascades.
#
# The named options are:
# [mode] See Cascade#initialize
#
# Example:
# flow 'wordcount', :mode => :local do
# assembly 'first_step' do
# ...
# end
#
# assembly 'second_step' do
# ...
# end
# end
def flow(name, options = {}, &block)
raise "Could not build flow '#{name}'; block required" unless block_given?
raise 'Cascading::flow does not accept the :properties param only the global $jobconf_properties' if options[:properties]
Expand All @@ -53,6 +104,11 @@ def flow(name, options = {}, &block)
flow
end

# Produces a textual description of all Cascades in the global registry. The
# description details structure, sources, sinks, and the input and output
# fields of each assembly.
#
# NOTE: will be moved to Job in later version
def describe
Cascade.all.map{ |cascade| cascade.describe }.join("\n")
end
Expand All @@ -63,7 +119,14 @@ def expr(expression, options = {})
ExprStub.expr(expression, options)
end

# Creates a cascading.tuple.Fields instance from a string or an array of strings.
# Utility method for creating Cascading c.t.Fields from a field name (string)
# or list of field names (array of strings). If the input fields is already a
# c.t.Fields or nil, it is passed through. This allows for flexible use of
# the method at multiple layers in the DSL.
#
# Example:
# cascading_fields = fields(['first', 'second', 'third'])
# # cascading_fields.to_a == ['first', 'second', 'third']
def fields(fields)
if fields.nil?
return nil
Expand All @@ -78,16 +141,24 @@ def fields(fields)
return Java::CascadingTuple::Fields.new([fields].flatten.map{ |f| f.kind_of?(Fixnum) ? java.lang.Integer.new(f) : f }.to_java(java.lang.Comparable))
end

# Convenience method wrapping c.t.Fields::ALL
def all_fields
Java::CascadingTuple::Fields::ALL
end

# Convenience method wrapping c.t.Fields::VALUES
def last_grouping_fields
Java::CascadingTuple::Fields::VALUES
end

# Computes fields formed by removing remove_fields from base_fields. Operates
# only on named fields, not positional fields.
#
# Example:
# base_fields = fields(['a', 'b', 'c'])
# remove_fields = fields(['b'])
# result_fields = difference_fields(base_fields, remove_fields)
# # results_fields.to_a == ['a', 'c']
def difference_fields(base_fields, remove_fields)
fields(base_fields.to_a - remove_fields.to_a)
end
Expand All @@ -102,6 +173,13 @@ def dedup_fields(*fields)

# Helper used by dedup_fields that operates on arrays of field names rather
# than fields objects.
#
# Example:
# left_names = ['a', 'b']
# mid_names = ['a', 'c']
# right_names = ['a', 'd']
# deduped_names = dedup_field_names(left_names, mid_names, right_names)
# # deduped_names == ['a', 'b', 'a_', 'c', 'a__', 'd']
def dedup_field_names(*names)
names.inject([]) do |acc, arr|
acc + arr.map{ |e| search_field_name(acc, e) }
Expand All @@ -114,15 +192,14 @@ def search_field_name(names, candidate)
private :search_field_name

# Creates a TextLine scheme (can be used in both Cascading local and hadoop
# modes). Positional args are used if <tt>:source_fields</tt> is not
# provided.
# modes). Positional args are used if :source_fields is not provided.
#
# The named options are:
# [source_fields] Fields to be read from a source with this scheme. Defaults
# to ['offset', 'line'].
# [sink_fields] Fields to be written to a sink with this scheme. Defaults to
# all_fields.
# [compression] A symbol, either <tt>:enable</tt> or <tt>:disable</tt>, that
# [compression] A symbol, either :enable or :disable, that
# governs the TextLine scheme's compression. Defaults to the
# default TextLine compression (only applies to c.s.h.TextLine).
def text_line_scheme(*args_with_options)
Expand Down Expand Up @@ -151,15 +228,28 @@ def sequence_file_scheme(*fields)
}
end

# Convenience access to MultiTap.multi_source_tap. This constructor is more
# "DSL-like" because it allows you to pass taps directly as actual args rather
# than in an array:
# multi_source_tap tap1, tap2, tap3, ..., tapn
#
# See MultiTap.multi_source_tap for more details.
def multi_source_tap(*taps)
MultiTap.multi_source_tap(taps)
end

# Convenience access to MultiTap.multi_sink_tap. This constructor is more
# "DSL-like" because it allows you to pass taps directly as actual args rather
# than in an array:
# multi_sink_tap tap1, tap2, tap3, ..., tapn
#
# See MultiTap.multi_sink_tap for more details.
def multi_sink_tap(*taps)
MultiTap.multi_sink_tap(taps)
end

# Creates a Cascading::Tap given a path and optional :scheme and :sink_mode.
# Convenience constructor for a Tap, that accepts the same options as that
# class' constructor. See Tap for more details.
def tap(path, options = {})
Tap.new(path, options)
end
Expand Down
50 changes: 37 additions & 13 deletions lib/cascading/tap.rb
@@ -1,37 +1,52 @@
module Cascading
# A Cascading::BaseTap wraps up a pair of Cascading taps, one for Cascading
# local mode and the other for Hadoop mode.
# A BaseTap wraps up a pair of Cascading taps, one for Cascading local mode
# and the other for Hadoop mode. Note that these are optional, but at least
# one must be provided for most taps. A SequenceFile is a notable example of
# a Scheme for which their is no Cascading local mode version, so a Tap you
# build with it will have no local_tap.
class BaseTap
attr_reader :local_tap, :hadoop_tap

# Constructor that accepts the local_tap and hadoop_tap, which may be nil
def initialize(local_tap, hadoop_tap)
@local_tap = local_tap
@hadoop_tap = hadoop_tap
end

# Passes through printing the local_tap and hadoop_tap
def to_s
"Local: #{local_tap}, Hadoop: #{hadoop_tap}"
end

# Returns false if the local_tap is nil, true otherwise
def local?
!local_tap.nil?
end

# Returns false if the hadoop_tap is nil, true otherwise
def hadoop?
!hadoop_tap.nil?
end
end

# A Cascading::Tap represents a non-aggregate tap with a scheme, path, and
# optional sink_mode. c.t.l.FileTap is used in Cascading local mode and
# c.t.h.Hfs is used in Hadoop mode. Whether or not these can be created is
# governed by the :scheme parameter, which must contain at least one of
# :local_scheme or :hadoop_scheme. Schemes like TextLine are supported in
# both modes (by Cascading), but SequenceFile is only supported in Hadoop
# mode.
# A Tap represents a non-aggregate tap with a scheme, path, and optional
# sink_mode. c.t.l.FileTap is used in Cascading local mode and c.t.h.Hfs is
# used in Hadoop mode. Whether or not these can be created is governed by the
# :scheme parameter, which must contain at least one of :local_scheme or
# :hadoop_scheme. Schemes like TextLine are supported in both modes (by
# Cascading), but SequenceFile is only supported in Hadoop mode.
class Tap < BaseTap
attr_reader :scheme, :path, :sink_mode

# Builds a Tap given a required path
#
# The named options are:
# [scheme] A Hash which must contain at least one of :local_scheme or
# :hadoop_scheme but may contain both. Default is
# text_line_scheme, which works in both modes.
# [sink_mode] A symbol or string that may be :keep, :replace, or :append,
# and corresponds to the c.t.SinkMode enumeration. The default
# is :keep, which matches Cascading's default.
def initialize(path, options = {})
@path = path

Expand All @@ -53,19 +68,28 @@ def initialize(path, options = {})
end
end

# A Cascading::MultiTap represents one of Cascading's aggregate taps and is
# built via static constructors that accept an array of Cascading::Taps. In
# order for a mode (Cascading local or Hadoop) to be supported, all provided
# taps must support it.
# A MultiTap represents one of Cascading's aggregate taps and is built via
# static constructors that accept an array of Taps. In order for a mode
# (Cascading local or Hadoop) to be supported, all provided taps must support
# it.
class MultiTap < BaseTap
# Do not call this constructor directly; instead, use one of
# MultiTap.multi_source_tap or MultiTap.multi_sink_tap.
def initialize(local_tap, hadoop_tap)
super(local_tap, hadoop_tap)
end

# Static constructor that builds a MultiTap wrapping a c.t.MultiSourceTap
# from the given array of Taps. The resulting MultiTap will only be
# available in Cascading local mode or Hadoop mode if all input taps support
# them.
def self.multi_source_tap(taps)
multi_tap(taps, Java::CascadingTap::MultiSourceTap)
end

# Static constructor that builds a MultiTap wrapping a c.t.MultiSinkTap from
# the given array of Taps. The resulting MultiTap will only be available in
# Cascading local mode or Hadoop mode if all input taps support them.
def self.multi_sink_tap(taps)
multi_tap(taps, Java::CascadingTap::MultiSinkTap)
end
Expand Down

0 comments on commit 3814101

Please sign in to comment.