Skip to content

Commit

Permalink
Fix bug in outer joins w/ multiple predicates.
Browse files Browse the repository at this point in the history
In the previous coding, we only used the first join predicate to probe the
join's hash table; the rest of the predicates were checked manually
(`test_locals`). The outer join code incorrectly assumed that finding a match on
the hash table probe was equivalent to finding a matching tuple for a given join
input.

To fix this, we could apply `test_locals` in the outer join code. However, the
previous coding also imposes a performance penalty on join operators with
multiple predicates. So instead, check all the join predicates by probing the
hash table.

One consequence is that the keys of the join hash table are now arrays, rather
than scalar values. This results in a ~10% slowdown for a join microbenchmark
(test/perf/join_bench.rb), which just does a ton of hash table probes. I'll look
into addressing that shortly.

Fixes bloom-lang#315.
  • Loading branch information
neilconway committed Apr 19, 2013
1 parent 7456d6d commit f211b10
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 49 deletions.
7 changes: 4 additions & 3 deletions History.txt
Expand Up @@ -12,13 +12,14 @@
or <~ for all insertions from outside Bloom rules.
* Fix bug in join operators whose qualifiers reference collections defined
inside imported modules (#301)
* Fix bug in join operators when the same table/column name appears on the LHS
of multiple join qualifiers (#313)
* Fix bug in join operators when the same table and column name appears on the
LHS of multiple join predicates (#313)
* Fix bug in outer joins with multiple predicates (#315)
* Fix several bugs in rescan/invalidation logic for tables in the presence of
upstream deletions (#303)
* Fix regression in file_reader collections (#304)
* Improve chaining of notin operators
* Optimize join evaluation
* Optimize join evaluation with multiple predicates
* Optimize notin self joins
* Improve error reporting for syntax errors on temp collections (#310)
* Add Travis CI integration hooks (#300, Josh Rosen)
Expand Down
63 changes: 23 additions & 40 deletions lib/bud/executor/join.rb
Expand Up @@ -121,17 +121,10 @@ def setup_preds(preds) # :nodoc: all
@rels[0].setup_preds(otherpreds)
end

if @localpreds.length > 0
# We evaluate the first predicate via probing the hash table, so remove
# it from the list of @localpreds to be checked separately.
first_pred = @localpreds.shift
@right_offset = first_pred[1][1]
@left_subtuple, @left_offset = join_offset(first_pred[0])
@keys = [[@left_subtuple, @left_offset], [1, @right_offset]]

if @left_is_array
@left_join_offsets = @localpreds.map {|p| join_offset(p[0])}
end
@localpreds.each do |lp|
right_offset = lp[1][1]
left_subtuple, left_offset = join_offset(lp[0])
@keys << [[left_subtuple, left_offset], [1, right_offset]]
end
end

Expand Down Expand Up @@ -239,25 +232,6 @@ def canonicalize_localpreds(rel_list, preds) # :nodoc:all
end
end

private
# right is a tuple
# left is a tuple or an array (combo) of joined tuples.
def test_locals(left, right)
@localpreds.each_with_index do |pred,i|
# assumption of left-deep joins here
rfield = right[pred[1][1]]
if @left_is_array
ix, off = @left_join_offsets[i]
lfield = left[ix][off]
else
lfield = left[pred[0][1]]
end
return false if lfield != rfield
end

return true
end

undef do_insert

public
Expand Down Expand Up @@ -285,12 +259,17 @@ def insert_item(item, offset)
if @keys.empty?
the_key = nil
else
the_key = []
# assumes left-deep trees
if @left_is_array and offset == 0
left_subtuple, left_offset = @keys.first
the_key = item[left_subtuple][left_offset]
@keys.each do |k|
left_subtuple, left_offset = k.first
the_key << item[left_subtuple][left_offset]
end
else
the_key = item[@keys[offset][1]]
@keys.each do |k|
the_key << item[k[offset][1]]
end
end
end
#build
Expand Down Expand Up @@ -341,10 +320,9 @@ def process_matches(item, the_matches, offset)
right = item
end

if test_locals(left, right)
result = @left_is_array ? left + [right] : [left, right] # FIX: reduce arrays being created.
push_out(result)
end
# FIX: reduce arrays being created
result = @left_is_array ? left + [right] : [left, right]
push_out(result)
end
end

Expand Down Expand Up @@ -463,11 +441,16 @@ def insert_item(item, offset)
if @keys.empty?
the_key = nil
else
the_key = []
if @left_is_array and offset == 1
left_subtuple, left_offset = keys.first
the_key = item[left_subtuple][left_offset]
@keys.each do |k|
left_subtuple, left_offset = k.first
the_key << item[left_subtuple][left_offset]
end
else
the_key = item[@keys[offset][1]]
@keys.each do |k|
the_key << item[k[offset][1]]
end
end
end
#build
Expand Down
33 changes: 27 additions & 6 deletions test/tc_joins.rb
Expand Up @@ -255,9 +255,7 @@ class PartlyQualifiedCombo
end

bloom do
# result is never populated
result1 <= (tee * arr * ess).combos(arr.key => ess.key)
# but it is when the join is specified in this order
result2 <= (arr * ess * tee).combos(arr.key => ess.key)
end
end
Expand Down Expand Up @@ -406,7 +404,7 @@ class FlattenJoins

def test_flatten_joins
p = FlattenJoins.new
p.tick; #p.tick
p.tick
assert_equal(2, p.out.length)
assert_equal(1, p.out2.length)
end
Expand Down Expand Up @@ -479,7 +477,7 @@ class SharedJoin

def test_shared_join
p = SharedJoin.new
p.tick;#p.tick
p.tick
assert_equal([[1, 1, 1], [2, 1, 1], [3, 2, 2], [3, 3, 2]], p.out1.to_a.sort)
assert_equal([[1, 1], [2, 1], [3, 2]], p.out2.to_a.sort)
end
Expand Down Expand Up @@ -601,7 +599,7 @@ def test_192
p = Issue192.new
p.intab1 << [-1]
p.intab2 << [-1]
p.tick;
p.tick
assert_equal([[0]], p.outtab1.to_a)
assert_equal([[0]], p.outtab2.to_a)
end
Expand Down Expand Up @@ -643,7 +641,21 @@ class OjChannel
end
end

class OjChannelTest < MiniTest::Unit::TestCase
class OjMultiplePreds
include Bud

state do
table :t1
table :t2
scratch :t3
end

bloom do
t3 <= (t1 * t2).outer(:key => :key, :val => :val) {|x,y| x}
end
end

class TestOuterJoins < MiniTest::Unit::TestCase
def test_oj_channel
o = OjChannel.new
o.run_bg
Expand All @@ -660,6 +672,15 @@ def test_oj_channel
assert_equal([[o.ip_port, "franklin", true]], rv.to_a.sort)
o.stop
end

def test_oj_multi_preds
i = OjMultiplePreds.new
i.t1 <+ [[5, 10]]
i.t2 <+ [[5, 11]]
i.tick

assert_equal([[5, 10]], i.t3.to_a.sort)
end
end

class TestScanReplay
Expand Down

0 comments on commit f211b10

Please sign in to comment.