Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Start building multi-key joins

  • Loading branch information...
commit 8de783e15165a9cf0645b119cf115bd7ac217938 1 parent 79e9948
Darrick Wiebe authored
Showing with 12 additions and 8 deletions.
  1. +12 −8 lib/pacer/transform/join.rb
View
20 lib/pacer/transform/join.rb
@@ -12,7 +12,7 @@ def join(name = nil, options = {}, &block)
graph: options.fetch(:multi_graph, Pacer::MultiGraph.new),
from_graph: graph
}
- args[:multi_graph] = options[:multi_graph] if options[:multi_graph]
+ args[:existing_multi_graph] = options[:multi_graph] if options[:multi_graph]
route = chain_route(args)
route = route.key { |v| v } unless name == :key
if block and name == :key
@@ -37,7 +37,7 @@ class CombinePipe < Pacer::Pipes::RubyPipe
include SideEffectPipe rescue nil # may raise exception on reload.
attr_accessor :multi_graph, :current_keys, :current_values, :join_on
- attr_reader :key_expando, :key_end, :values_pipes, :from_graph
+ attr_reader :key_names, :keys, :values_pipes, :from_graph
def initialize(from_graph, multi_graph)
super()
@@ -46,10 +46,13 @@ def initialize(from_graph, multi_graph)
@values_pipes = []
@current_keys = []
@current_values = []
+ @key_names = []
+ @keys = []
end
- def setKeyPipe(from_pipe, to_pipe)
- @key_expando, @key_end = prepare_aggregate_pipe(from_pipe, to_pipe)
+ def addKeyPipe(name, from_pipe, to_pipe)
+ key_names << name
+ keys << prepare_aggregate_pipe(from_pipe, to_pipe)
end
def addValuesPipe(name, from_pipe, to_pipe)
@@ -94,11 +97,12 @@ def processNextStart
def get_keys(element)
array = LinkedList.new
+ keys.each do |key_expando, key_end|
if key_expando
array.addAll next_results(key_expando, key_end, element)
- else
- array.add nil
end
+ array.add nil if array.empty?
+
array
end
@@ -134,7 +138,7 @@ def prepare_aggregate_pipe(from_pipe, to_pipe)
attr_accessor :existing_multi_graph, :key_route, :values_routes, :from_graph
attr_writer :join_on
- def key(&block)
+ def key(name = :key, &block)
self.key_route = Pacer::Route.block_branch(self, block)
self
end
@@ -167,7 +171,7 @@ def after_initialize
def attach_pipe(end_pipe)
pipe = CombinePipe.new(from_graph, existing_multi_graph)
self.graph = pipe.multi_graph
- pipe.setKeyPipe *key_route.send(:build_pipeline) if key_route
+ pipe.addKeyPipe :key, *key_route.send(:build_pipeline) if key_route
pipe.join_on = @join_on
values_routes.each do |name, route|
pipe.addValuesPipe name, *route.send(:build_pipeline)
Please sign in to comment.
Something went wrong with that request. Please try again.