Skip to content
Browse files

Some doc and a new sample : join.rb

  • Loading branch information...
1 parent 2667e41 commit 2f36f9a8385ce9edcce248e5ef3c9023788679c6 @gmarabout gmarabout committed Mar 24, 2009
View
18 HACKING
@@ -1,2 +1,18 @@
-= Hacking on cascading.jruby
+Some hacking info on cascading.jruby
+ * cascading.jruby can be packaged as a gem. To do so, you must generate the necessary packaging files:
+
+ jruby -S rake gem
+
+will produce the gem in the pkg/ dub-directory.
+After that, just cd this directory and:
+
+ jruby -S rake install cascading.jruby-xxx.gem
+
+ * The "Cascading::Operations" module is mixed-in the Cascading::Assembly class to provide some shortcuts
+ for common operations. I don't know if this is the best way to do it, though...
+
+ * The high level commands for creating new pipes are defined in cascading/helpers.rb.
+
+ * The fil cascading/cascading.rb defines global helper methods for cascading like tap creation, fields
+creation, etc.
View
41 lib/cascading/assembly.rb
@@ -13,7 +13,9 @@
module Cascading
class AssemblyFactory
-
+
+
+ # Builds a join (CoGroup) pipe.
def join(node, *args)
options = args.extract_options!
@@ -37,8 +39,7 @@ def join(node, *args)
group_fields << fields(v)
end
end
-
-
+
group_fields = group_fields.to_java(Java::CascadingTuple::Fields)
declared_fields = options[:declared_fields]
@@ -50,10 +51,7 @@ def join(node, *args)
joiner = Java::CascadingPipeCogroup::InnerJoin.new
end
-
-
- parameters = [pipes.to_java(Java::CascadingPipe::Pipe), group_fields, declared_fields, joiner].compact
-
+ parameters = [pipes.to_java(Java::CascadingPipe::Pipe), group_fields, declared_fields, joiner].compact
node.make_pipe(Java::CascadingPipe::CoGroup, *parameters)
end
@@ -76,18 +74,22 @@ def group_by(node, *args)
reverse = options[:reverse]
parameters = [node.tail_pipe, group_fields, sort_fields, reverse].compact
- node.make_pipe(Java::CascadingPipe::GroupBy, *parameters)
+ node.make_pipe(Java::CascadingPipe::GroupBy, *parameters)
end
+ # Unifies several pipes sharing the same field structure.
+ # This actually creates a GroupBy pipe.
+ # It expects a list of assemblies as parameter.
def union_pipes(node, *args)
pipes = args[0].map do |pipe|
- #puts pipe.class
- pipe.tail_pipe
+ assembly = Assembly.get(pipe)
+ assembly.tail_pipe
end
node.make_pipe(Java::CascadingPipe::GroupBy, pipes.to_java(Java::CascadingPipe::Pipe))
end
+ # Builds an basic _every_ pipe, and adds it to the current assembly.
def every(node, *args)
# puts "Create every pipe"
options = args.extract_options!
@@ -100,6 +102,12 @@ def every(node, *args)
node.make_pipe(Java::CascadingPipe::Every, *parameters)
end
+ # Builds a basic _each_ pipe, and adds it to the current assembly.
+ # --
+ # Example:
+ # each "line", :filter=>regex_splitter(["name", "val1", "val2", "id"],
+ # :pattern => /[.,]*\s+/),
+ # :output=>["id", "name", "val1", "val2"]
def each(node, *args)
# puts "Create each pipe"
options = args.extract_options!
@@ -112,16 +120,16 @@ def each(node, *args)
node.make_pipe(Java::CascadingPipe::Each, *parameters)
end
- def co_group(node, *args)
- raise "not implemented yet"
- end
-
- # Keeps only the specified fields in the assembly:
+ # Restricts the current assembly to the specified fields.
+ # --
+ # Example:
+ # restrict_to "field1", "field2"
def restrict_to(node, *args)
operation = Java::CascadingOperation::Identity.new()
node.make_pipe(Java::CascadingPipe::Each, node.tail_pipe, Cascading.fields(args), operation)
end
+ # Renames the first list of fields to the second one.
def rename(node, *args)
old_names = args[0]
new_names = args[1]
@@ -151,6 +159,9 @@ def assert(node, *args)
node.make_pipe(Java::CascadingPipe::Each, node.tail_pipe, assertion_level, assertion)
end
+
+
+ alias co_group join
end # class Assembly
View
8 lib/cascading/helpers.rb
@@ -175,12 +175,8 @@ def replace(*args)
# The methods outputs all fields.
# The named options are:
def insert(args)
- keys = []
- values = []
- args.each do |k, v|
- keys << k
- values << v
- end
+ keys = args.keys
+ values = args.values
each all_fields, :function => insert_function(keys, :values => values), :output => all_fields
end
View
13 lib/cascading/operations.rb
@@ -128,19 +128,6 @@ def expression_filter(*args)
Java::CascadingOperationExpression::ExpressionFilter.new(*arguments)
end
- # def json_generator(names)
- # fields = []
- # paths = []
- # names.each do |k,v|
- # fields << k
- # paths << v
- # end
- # fields = Cascading.fields(fields)
- #
- # parameters = [fields, paths.to_java(java.lang.String)].compact
- # Java::OrgCascadingJson::JSONGenerator.new(*parameters)
- # end
-
def date_parser(field, format)
fields = fields(field)
Java::CascadingOperationText::DateParser.new(fields, format)
View
3 samples/data/data_join1.txt
@@ -0,0 +1,3 @@
+1 Grégoire
+2 Mathias
+3 Stéphane
View
3 samples/data/data_join2.txt
@@ -0,0 +1,3 @@
+1 33
+2 30
+3 25
View
3 samples/data/data_join3.txt
@@ -0,0 +1,3 @@
+1 Cannes
+2 Boston
+3 Paris
View
34 samples/join.rb
@@ -0,0 +1,34 @@
+require "cascading"
+
+input1 = "samples/data/data_join1.txt"
+input2 = "samples/data/data_join2.txt"
+input3 = "samples/data/data_join3.txt"
+
+output = "output/joined"
+
+flow = Cascading::Flow.new("Join sample") do
+ source "extract1", tap(input1)
+ source "extract2", tap(input2)
+ source "extract3", tap(input3)
+
+ sink "join", tap(output, :replace=>true)
+
+ assembly "extract1" do
+ split "line", :pattern => /[.,]*\s+/, :into=> ["id", "name"], :output => ["id", "name"]
+ end
+
+ assembly "extract2" do
+ split "line", :pattern => /[.,]*\s+/, :into=> ["id", "age"], :output => ["id", "age"]
+ end
+
+ assembly "extract3" do
+ split "line", :pattern => /[.,]*\s+/, :into=> ["id", "city"], :output => ["id", "city"]
+ end
+
+ assembly "join" do
+ join "extract1", "extract2", "extract3", :group_fields => ["id"], :declared_fields => ["id", "name", "id2", "age", "id3", "city"]
+ restrict_to "id", "name", "age", "city"
+ end
+end
+
+flow.complete
View
1 samples/splitter.rb
@@ -10,7 +10,6 @@
assembly "copy" do
- # Split "line" using a JSONSplitter
split "line", :pattern => /[.,]*\s+/, :into=>["name", "score1", "score2", "id"], :output => ["name", "score1", "score2", "id"]
group_by "score1"
View
4 samples/union.rb
@@ -27,10 +27,8 @@
count
rename ["score2"], ["score"]
end
-
-
+
union b1, b2
-
end
end
View
5 test/test_assembly.rb
@@ -228,7 +228,6 @@ def test_splitter
assembly "copy" do
- # Split "line" using a JSONSplitter
split "line", :pattern => /[.,]*\s+/, :into=>["name", "score1", "score2", "id"], :output => ["name", "score1", "score2", "id"]
assert_size_equals 4
@@ -251,7 +250,6 @@ def test_join1
assembly1 = assembly "data1" do
- # Split "line" using a JSONSplitter
split "line", :pattern => /[.,]*\s+/, :into=>["name", "score1", "score2", "id"], :output => ["name", "score1", "score2", "id"]
assert_size_equals 4
@@ -263,7 +261,6 @@ def test_join1
assembly2 = assembly "data2" do
- # Split "line" using a JSONSplitter
split "line", :pattern => /[.,]*\s+/, :into=>["name", "id", "town"], :output => ["name", "id", "town"]
assert_size_equals 3
@@ -293,7 +290,6 @@ def test_join2
assembly1 = assembly "data1" do
- # Split "line" using a JSONSplitter
split "line", :pattern => /[.,]*\s+/, :into=>["name", "score1", "score2", "id"], :output => ["name", "score1", "score2", "id"]
debug :print_fields=>true
@@ -302,7 +298,6 @@ def test_join2
assembly2 = assembly "data2" do
- # Split "line" using a JSONSplitter
split "line", :pattern => /[.,]*\s+/, :into=>["name", "code", "town"], :output => ["name", "code", "town"]
debug :print_fields=>true

0 comments on commit 2f36f9a

Please sign in to comment.
Something went wrong with that request. Please try again.