Skip to content
Browse files

flush for sigmod evals

  • Loading branch information...
1 parent 15e0a40 commit 9ebb64681bc6db83950bd50a9c0ca821628f1408 @palvaro committed Oct 20, 2012
Showing with 855 additions and 23 deletions.
  1. +1 −3 bin/budlabel
  2. +26 −1 lib/bud.rb
  3. +1 −0 lib/bud/bud_meta.rb
  4. +66 −10 lib/bud/labeling/labeling.rb
  5. +287 −0 lib/bud/labeling/labeling.rb.orig
  6. +363 −1 lib/bud/rewrite.rb
  7. +17 −7 lib/bud/storage/zookeeper.rb
  8. +93 −0 test/tc_meta.rb
  9. +1 −1 test/tc_zookeeper.rb
View
4 bin/budlabel
@@ -36,8 +36,6 @@ end
require @opts["r"]
c = Label.new(@opts["i"])
-puts "--- Report for module #{@opts["i"]} ---"
-
if @opts["C"]
puts "---------------"
puts "Output\t\tLabel"
@@ -48,11 +46,11 @@ if @opts["C"]
end
if @opts["P"]
- c.path_report.each_pair do |output, inpaths|
puts ""
puts "--------------------"
puts "Output\tInput\tLabel"
puts "--------------------"
+ c.path_report.each_pair do |output, inpaths|
puts output
inpaths.each_pair do |inp, lbl|
puts "\t#{inp}\t#{hreadable[lbl]}"
View
27 lib/bud.rb
@@ -268,6 +268,11 @@ def resolve_imports
self.t_depends << [imp_dep.bud_obj, imp_dep.rule_id, qlname,
imp_dep.op, qrname, imp_dep.nm, imp_dep.in_body]
end
+ mod_inst.t_lineage.each do |imp_lin|
+ qlsrc = "#{local_name}.#{imp_lin.src_tab}"
+ self.t_lineage << [imp_lin.bud_obj, imp_lin.rule_id, imp_lin.target_col,
+ qlsrc, imp_lin.src_col, imp_lin.func]
+ end
mod_inst.t_provides.each do |imp_pro|
qintname = "#{local_name}.#{imp_pro.interface}"
self.t_provides << [qintname, imp_pro.input]
@@ -307,8 +312,24 @@ def call_state_methods
meth_map[id] << self.method(m)
end
+ #tabs_pre = self.tables.keys
+ tabs_pre = {}
+ self.tables.keys.each{|t| tabs_pre[t] = true}
meth_map.keys.sort.each do |i|
- meth_map[i].each {|m| m.call}
+
+ meth_map[i].each do |m|
+ tabs_pre = {}
+ self.tables.keys.each{|t| tabs_pre[t] = true}
+ m.call
+
+ self.tables.keys.each do |k|
+ unless tabs_pre[k]
+ next unless m.to_s =~ /__state\d+__([^>]+)/
+ mod = Regexp.last_match.captures.first
+ t_module_defines << [k.to_s, mod]
+ end
+ end
+ end
end
end
@@ -683,6 +704,7 @@ def stop(stop_em=false, do_shutdown_cb=true)
Bud.stop_em_loop
EventMachine::reactor_thread.join
end
+
report_metrics if options[:metrics]
end
alias :stop_bg :stop
@@ -924,6 +946,7 @@ def do_shutdown(do_shutdown_cb=true)
end
@timers.each {|t| t.cancel}
@tables.each_value {|t| t.close}
+
if EventMachine::reactor_running? and @bud_started
@dsock.close_connection
end
@@ -1142,6 +1165,8 @@ def builtin_state
table :t_cycle, [:predicate, :via, :neg, :temporal]
table :t_table_info, [:tab_name, :tab_type]
table :t_table_schema, [:tab_name, :col_name, :ord, :loc]
+ table :t_lineage, [:bud_obj, :rule_id, :target_col, :src_tab, :src_col, :func]
+ table :t_module_defines, [:tab_name, :defined_by]
# Identify builtin tables as such
@builtin_tables = @tables.clone if toplevel
View
1 lib/bud/bud_meta.rb
@@ -65,6 +65,7 @@ def shred_rules
rulebag.each_value do |v|
v.rules.each {|r| @bud_instance.t_rules << r}
v.depends.each {|d| @bud_instance.t_depends << d}
+ v.lineage.each {|l| @bud_instance.t_lineage << l}
end
end
View
76 lib/bud/labeling/labeling.rb
@@ -3,25 +3,40 @@
module Validate
state do
- scratch :dep, [:body, :head, :label]
+ scratch :dep, [:body, :head, :label, :subscript]
scratch :dep_tc, [:body, :head, :members]
scratch :scc, [:pred, :cluster]
scratch :scc_raw, scc.schema
- scratch :new_dep, [:body, :head, :label]
+ scratch :new_dep, [:body, :head, :label, :subscript]
scratch :labeled_path, [:body, :head, :path, :label]
scratch :full_path, labeled_path.schema
scratch :ndn, new_dep.schema
scratch :iinterface, t_provides.schema
scratch :ointerface, t_provides.schema
scratch :iin, t_provides.schema
scratch :iout, t_provides.schema
+ scratch :dnot, t_depends.schema
+
+ scratch :group_rollup_tmp, [:tag, :subscript]
+ scratch :group_rollup, [:bud_obj, :rule_id, :subscript]
+ scratch :lin, t_lineage.schema
+ scratch :lin1, t_lineage.schema
end
bloom do
- dep <= t_depends do |d|
- [d.body, d.lhs, labelof(d.op, d.nm)]
+
+ dep <= (t_depends * group_rollup).pairs(:rule_id => :rule_id, :bud_obj => :bud_obj) do |d, l|
+ [d.body, d.lhs, labelof(d.op, d.nm), l.subscript]
end
+ dnot <= t_depends.notin(group_rollup, :rule_id => :rule_id, :bud_obj => :bud_obj)
+ dnot <= t_depends.notin(group_rollup, :rule_id => :rule_id)
+ dep <= dnot{|d| [d.body, d.lhs, labelof(d.op, d.nm), nil]}
+
+ #dep <= t_depends do |d|
+ # [d.body, d.lhs, labelof(d.op, d.nm)]
+ #end
+
dep_tc <= dep do |d|
[d.body, d.head, Set.new([d.body, d.head])]
end
@@ -43,14 +58,14 @@ module Validate
new_dep <= (dep * scc * scc).combos do |d, s1, s2|
if d.head == s1.pred and d.body == s2.pred
- ["#{s2.cluster.join(",")}_IN", "#{s1.cluster.join(",")}_OUT", d.label]
+ ["#{s2.cluster.join(",")}_IN", "#{s1.cluster.join(",")}_OUT", d.label, d.subscript]
end
end
new_dep <= (dep * scc).pairs(:body => :pred) do |d, s|
- ["#{s.cluster.join(",")}_OUT", d.head, d.label] unless s.cluster.include? d.head
+ ["#{s.cluster.join(",")}_OUT", d.head, d.label, d.subscript] unless s.cluster.include? d.head
end
new_dep <= (dep * scc).pairs(:head => :pred) do |d, s|
- [d.body, "#{s.cluster.join(",")}_IN", d.label] unless s.cluster.include? d.body
+ [d.body, "#{s.cluster.join(",")}_IN", d.label, d.subscript] unless s.cluster.include? d.body
end
ndn <= dep.notin(scc, :body => :pred)
@@ -75,13 +90,44 @@ module Validate
ointerface <= iout.notin(new_dep, :interface => :body)
labeled_path <= (new_dep * iinterface).pairs(:body => :interface) do |d, p|
- [d.body, d.head, [d.body, d.head], [d.label]]
+ [d.body, d.head, [d.body, d.head], [subscripted_label(d.label, d.subscript)]]
end
labeled_path <= (labeled_path * new_dep).pairs(:head => :body) do |p, d|
- [p.body, d.head, p.path + [d.head], p.label + [d.label]]
+ [p.body, d.head, p.path + [d.head], p.label + [subscripted_label(d.label, d.subscript)]]
end
full_path <= (labeled_path * ointerface).lefts(:head => :interface)
+
+ #stdio <~ full_path.inspected
+ end
+
+ bloom :nm_context do
+ group_rollup_tmp <= t_lineage.reduce({}) do |memo, i|
+ if i.func == :group
+ memo[[i.bud_obj, i.rule_id]] ||= []
+ #memo[[i.bud_obj, i.rule_id]] << i.src_col
+ memo[[i.bud_obj, i.rule_id]] = memo[[i.bud_obj, i.rule_id]] | [i.src_col]
+ end
+ memo
+ end
+
+ lin1 <= (t_lineage * t_lineage).pairs(:rule_id => :rule_id, :target_col => :target_col) do |l1, l2|
+ if l2.func == :qual and l1.func != :qual
+ l1
+ end
+ end
+
+ lin <= (lin1 * t_depends).pairs(:rule_id => :rule_id) do |l, d|
+ l if d.nm
+ end
+
+ group_rollup_tmp <= lin.reduce({}) do |memo, i|
+ memo[[i.bud_obj, i.rule_id]] ||= []
+ #memo[[i.bud_obj, i.rule_id]] << i.src_col
+ memo[[i.bud_obj, i.rule_id]] = memo[[i.bud_obj, i.rule_id]] | [i.src_col]
+ memo
+ end
+ group_rollup <= group_rollup_tmp{|g| [g.tag[0], g.tag[1], g.subscript]}
end
def validate
@@ -109,6 +155,14 @@ def validate
return report
end
+ def subscripted_label(label, script)
+ if label == "N"
+ "N_#{script}"
+ else
+ label
+ end
+ end
+
def do_collapse(left, right)
l = left.pop
r = right.shift
@@ -162,7 +216,9 @@ module GuardedAsync
meet_stg <= (dep_tc_type * dep_tc_type).pairs(:head => :head) do |l, r|
ltab = self.tables[l.body.to_sym]
rtab = self.tables[r.body.to_sym]
- if ltab.class == Bud::BudChannel and rtab.class == Bud::BudChannel and l.body != r.body
+ #if ltab.class == Bud::BudChannel and rtab.class == Bud::BudChannel and l.body != r.body
+ # paa, establish ordering
+ if ltab.class == Bud::BudChannel and rtab.class == Bud::BudChannel and l.body < r.body
[l.body, r.body, l.head, l.types, r.types]
end
end
View
287 lib/bud/labeling/labeling.rb.orig
@@ -0,0 +1,287 @@
+require 'rubygems'
+require 'bud'
+
+module Validate
+ state do
+ scratch :dep, [:body, :head, :label]
+ scratch :dep_tc, [:body, :head, :members]
+ scratch :scc, [:pred, :cluster]
+ scratch :scc_raw, scc.schema
+ scratch :new_dep, [:body, :head, :label]
+ scratch :labeled_path, [:body, :head, :path, :label]
+ scratch :full_path, labeled_path.schema
+ scratch :ndn, new_dep.schema
+ scratch :iinterface, t_provides.schema
+ scratch :ointerface, t_provides.schema
+ scratch :iin, t_provides.schema
+ scratch :iout, t_provides.schema
+ end
+
+ bloom do
+ dep <= t_depends do |d|
+ [d.body, d.lhs, labelof(d.op, d.nm)]
+ end
+
+ dep_tc <= dep do |d|
+ [d.body, d.head, Set.new([d.body, d.head])]
+ end
+ dep_tc <= (dep * dep_tc).pairs(:head => :body) do |d, t|
+ [d.body, t.head, t.members | [d.head]]
+ end
+
+ scc_raw <= dep_tc do |d|
+ if d.head == d.body
+ [d.head, d.members.to_a.sort]
+ end
+ end
+
+ scc <= scc_raw.reduce({}) do |memo, i|
+ memo[i.pred] ||= []
+ memo[i.pred] = memo[i.pred] | i.cluster
+ memo
+ end
+
+ new_dep <= (dep * scc * scc).combos do |d, s1, s2|
+ if d.head == s1.pred and d.body == s2.pred
+ ["#{s2.cluster.join(",")}_IN", "#{s1.cluster.join(",")}_OUT", d.label]
+ end
+ end
+ new_dep <= (dep * scc).pairs(:body => :pred) do |d, s|
+ ["#{s.cluster.join(",")}_OUT", d.head, d.label] unless s.cluster.include? d.head
+ end
+ new_dep <= (dep * scc).pairs(:head => :pred) do |d, s|
+ [d.body, "#{s.cluster.join(",")}_IN", d.label] unless s.cluster.include? d.body
+ end
+
+ ndn <= dep.notin(scc, :body => :pred)
+ new_dep <= ndn.notin(scc, :head => :pred)
+ end
+
+ bloom :channel_inputs do
+ temp :dummy_input <= t_provides do |p|
+ if p.input and coll_type(p.interface) == Bud::BudChannel
+ [p.interface]
+ end
+ end
+ dep <= dummy_input{|i| ["#{i.first}_INPUT", i.first, "A"]}
+ dep <= dummy_input{|i| ["#{i.first}_INPUT", i.first, "A"]}
+ t_provides <= dummy_input{|i| ["#{i.first}_INPUT", true]}
+ end
+
+ bloom :full_paths do
+ iin <= t_provides{|p| p if p.input}
+ iout <= t_provides{|p| p if !p.input}
+ iinterface <= iin.notin(new_dep, :interface => :head)
+ ointerface <= iout.notin(new_dep, :interface => :body)
+
+ labeled_path <= (new_dep * iinterface).pairs(:body => :interface) do |d, p|
+ [d.body, d.head, [d.body, d.head], [d.label]]
+ end
+ labeled_path <= (labeled_path * new_dep).pairs(:head => :body) do |p, d|
+ [p.body, d.head, p.path + [d.head], p.label + [d.label]]
+ end
+
+ full_path <= (labeled_path * ointerface).lefts(:head => :interface)
+ end
+
+ def validate
+ dp = Set.new
+ divergent_preds.each do |p|
+ dp.add(p.coll)
+ end
+ report = []
+ full_path.to_a.each do |p|
+ state = ["Bot"]
+ start_a = -1
+ p.label.each_with_index do |lbl, i|
+ if lbl == "A"
+ start_a = i + 1
+ end
+ os = state.first
+ state = do_collapse(state, [lbl])
+ end
+ if dp.include? p.head
+ report << (p.to_a + [:unguarded, ["D"]])
+ else
+ report << (p.to_a + [:path, state])
+ end
+ end
+ return report
+ end
+
+ def do_collapse(left, right)
+ l = left.pop
+ r = right.shift
+ left + collapse(l, r) + right
+ end
+
+ def labelof(op, nm)
+ if op == "<~"
+ "A"
+ elsif nm
+ "N"
+ else
+ "Bot"
+ end
+ end
+
+ def collapse(left, right)
+ return [right] if left == 'Bot'
+ return [left] if right == 'Bot'
+ return [left] if left == right
+ return ['D'] if left == 'D' or right == 'D'
+ # CALM
+ return ['D'] if left == 'A' and right =~ /N/
+ # sometimes we cannot reduce
+ return [left, right]
+ end
+end
+
+
+module GuardedAsync
+ include Validate
+ state do
+ scratch :meet, [:chan, :partner, :at, :lpath, :rpath]
+ scratch :meet_stg, meet.schema
+ scratch :channel_race, [:chan, :partner, :to, :guarded]
+ scratch :dep_tc_type, [:body, :head, :types]
+ scratch :divergent_preds, [:coll]
+ end
+
+ bloom do
+ dep_tc_type <= dep do |d|
+ btab = coll_type(d.body)
+ htab = coll_type(d.head)
+ [d.body, d.head, Set.new([btab, htab])]
+ end
+ dep_tc_type <= (dep * dep_tc_type).pairs(:head => :body) do |d, t|
+ htab = coll_type(d.head)
+ [d.body, t.head, t.types | [htab]]
+ end
+
+ meet_stg <= (dep_tc_type * dep_tc_type).pairs(:head => :head) do |l, r|
+ ltab = self.tables[l.body.to_sym]
+ rtab = self.tables[r.body.to_sym]
+ if ltab.class == Bud::BudChannel and rtab.class == Bud::BudChannel and l.body != r.body
+ [l.body, r.body, l.head, l.types, r.types]
+ end
+ end
+
+ meet <= meet_stg.notin(dep_tc_type, :chan => :body, :partner => :head)
+ channel_race <= meet{|m| [m.chan, m.partner, m.at, guarded(m.lpath, m.rpath)]}
+ divergent_preds <= channel_race{|r| [r.to] unless r.guarded}
+ divergent_preds <= (channel_race * dep_tc_type).pairs(:to => :body){|r, t| [t.head] unless r.guarded}
+ end
+
+ def coll_type(nm)
+ tab = self.tables[nm.to_sym]
+ if tab.nil?
+ tab = self.lattices[nm.to_sym]
+ end
+ tab.class
+ end
+
+ def guarded(lpath, rpath)
+ if lpath.include? Bud::BudTable or lpath.include? Bud::LatticeWrapper
+ if rpath.include? Bud::BudTable or rpath.include? Bud::LatticeWrapper
+ return true
+ end
+ end
+ false
+ end
+end
+
+require 'bud/labeling/bloomgraph'
+require 'bud/labeling/budplot_style'
+
+module MetaMods
+ include Validate
+ include GuardedAsync
+ include BloomGraph
+ include PDG
+end
+
+class Label
+ attr_reader :f
+ def initialize(mod)
+ @report = nil
+ @mod = Object.const_get(mod)
+ if @mod.class == Class
+ nc = new_class_from_class(@mod)
+ elsif @mod.class == Module
+ nc = new_class(@mod)
+ else
+ raise "#{mod} neither class nor module"
+ end
+ @f = nc.new
+ @f.tick
+ end
+
+ def validate
+ @report = @f.validate if @report.nil?
+ end
+
+ def output_report
+ validate
+ rep = {}
+ @report.each do |from, to, path, labels, reason, final|
+ rep[to] ||= "Bot"
+ rep[to] = disjunction(rep[to], final.last)
+ end
+ rep
+ end
+
+ def path_report
+ validate
+ zips = {}
+ @report.each do |from, to, path, labels, reason, final|
+ zips[to] ||= {}
+ zips[to][from] ||= "Bot"
+ zips[to][from] = disjunction(zips[to][from], final.last)
+ end
+ zips
+ end
+
+ def disjunction(l, r)
+ both = [l, r]
+ if both.include? "D"
+ "D"
+ elsif both.include? "N"
+ if both.include? "A"
+ return "D"
+ else
+ return "N"
+ end
+ elsif both.include? "A"
+ return "A"
+ else
+ return "Bot"
+ end
+ end
+
+ def new_class(mod)
+ Class.new do
+ include Bud
+ include MetaMods
+ include mod
+ end
+ end
+
+ def new_class_from_class(cls)
+ Class.new(cls) do
+ include MetaMods
+ end
+ end
+
+ def internal_tabs
+ cls = Class.new do
+ include Bud
+ include MetaMods
+ end
+ cls.new.tables.keys
+ end
+
+ def write_graph(fmt=:pdf)
+ f.finish(internal_tabs, "#{@mod.to_s}.#{fmt}", fmt)
+ end
+end
View
364 lib/bud/rewrite.rb
@@ -2,8 +2,358 @@
require 'ruby2ruby'
require 'set'
+
+class Lineage
+ def initialize(bud_instance, lhs, rid)
+ @froms = []
+ @grouping_cols = []
+ @quals = {}
+ @cquals = {}
+ @proj = nil
+ @bud_instance = bud_instance
+ @lhs = lhs
+ @rule_indx = rid
+ @has_body = false
+ end
+
+ def startup(ast)
+ @orig_ast = ast
+ if ast.first == :call and [:-@, :~, :+@].include? ast[2]
+ startup(ast[1])
+ else
+ process_rhs(ast)
+ end
+ end
+
+ def process_rhs(ast)
+ if ast.first == :call
+ fromlist(ast)
+ else
+ process_body(ast)
+ end
+ end
+
+ def process_body(ast)
+ @has_body = true
+ # there is a body
+ _, from, binds, ret = ast
+ if from.nil?
+ # bizarre tc_collections legacy; foo <= true
+ return
+ end
+ fromlist(from)
+ unless binds.nil?
+ @binds = get_bindings(binds)
+ exprs = get_expr(ret)
+ @rets = clean_exprs(exprs)
+ else
+ @binds = []
+ @rets = []
+ end
+ end
+
+ def clean_exprs(exprs)
+ bmap = make_bmap
+ rets = []
+ exprs.each do |r|
+ if r.class == Symbol
+ rets << bmap[r]
+ else
+ real_r = r.map do |m|
+ rh = {}
+ if m.class == Hash and !m[:bind].nil?
+ m[:tab] = bmap[m[:bind]]
+ end
+ m
+ end
+ rets << real_r
+ end
+ end
+ rets
+ end
+
+ def make_bmap
+ bmap = {}
+ if @binds.class == Symbol
+ bmap[@binds] = @froms.first
+ else
+ @binds.each_with_index do |b, i|
+ bmap[b] = @froms[i]
+ end
+ end
+ bmap
+ end
+
+ def schemaof(tab, second)
+ # unprincipled. return the schema of tab, unless tab is a temp,
+ # in which case return the schema of second. punt if second is a temp.
+ tobj = @bud_instance.tables[tab.to_sym]
+ if tobj.nil?
+ []
+ elsif tobj.class == Bud::BudTemp
+ sobj = @bud_instance.tables[second.to_sym]
+ if sobj.nil? or sobj.class == Bud::BudTemp
+ # no schema, or christknows
+ []
+ else
+ (sobj.key_cols + sobj.val_cols)
+ end
+ else
+ (tobj.key_cols + tobj.val_cols)
+ end
+ end
+
+ def lhs_schema
+ lhs = @bud_instance.tables[@lhs.to_sym]
+ # this is a huge hack to get around the problem that temp tables
+ # create. they have no schemas!
+ if lhs.class == Bud::BudTemp
+ # arg. well, then its schema is uniquely determined by a single body subgoal, no?
+ if @froms.length == 1 or @proj == :lefts
+ s = @froms.first
+ elsif @proj == :rights
+ s = @froms.last
+ else
+ #raise "hm, what is that temp's schema?? #{@lhs}"
+ #puts "hm, what is that temp's schema?? #{@lhs}"
+ return []
+ end
+ stab = @bud_instance.tables[s.to_sym]
+ if stab.nil? or stab.class == Bud::BudTemp
+ []
+ else
+ (stab.key_cols + stab.val_cols)
+ end
+ else
+ (lhs.key_cols + lhs.val_cols)
+ end
+ end
+
+ def fillin(tab)
+ rets = []
+ rightcol = schemaof(tab, @lhs)
+ schemaof(@lhs, tab).each_with_index do |c, i|
+ if @grouping_cols.length == 0
+ rightcol = schemaof(tab, @lhs)
+ rets <<[@bud_instance, @rule_indx, c, tab, rightcol[i], nil]
+ else
+ break if i == @grouping_cols.length
+ rets << [@bud_instance, @rule_indx, c, tab, @grouping_cols[i], :group]
+ end
+ end
+ rets
+ end
+
+ def records
+ # get the lhs schema out of the way right away.
+ ls = lhs_schema
+ return nil if @froms.length == 0
+
+ rets = []
+ if !@has_body
+ # if a rule has no body, a unique other collection should define
+ unless @froms.length == 1 or (@proj == :lefts or @proj == :rights)
+ ###raise "how can you have no projection and have more than 1 table? #{@orig_ast}"
+ #puts "how can you have no projection and have more than 1 table? #{@orig_ast}"
+ #return
+ end
+
+ tab = @froms.first
+ rets = fillin(tab)
+ else
+ @rets.each do |ret|
+ if ret.class == Symbol
+ rets = fillin(ret)
+ else
+ ret.each_with_index do |r, i|
+ if r.has_key? :pos
+ sname = schemaof(r[:tab], @lhs)[r[:pos]]
+ rets << [@bud_instance, @rule_indx, ls[i], r[:tab], sname, nil]
+ elsif r.has_key? :key
+ rets << [@bud_instance, @rule_indx, ls[i], r[:tab], r[:key], nil]
+ elsif r.has_key? :func
+
+ end
+ end
+ end
+ end
+ end
+
+ rets.each do |r|
+ yield r
+ bi, rid, lhs, tn, cn, f = r
+ if @cquals[[tn, cn]]
+ t, c = @cquals[[tn, cn]]
+ yield [bi, rid, lhs, t, c, :qual]
+ end
+ end
+
+ end
+
+ def to_s
+ "FROMLIST: #{@froms.inspect}, quals #{@quals.inspect}, binds #{@binds.inspect}, proj #{@proj.inspect}, rets #{@rets.inspect}"
+ end
+
+ def process_proj(ast)
+ if ast.nil?
+ {}
+ elsif ast.first == :lit
+ {:lit => ast[1]}
+ elsif ast.first == :call
+ if ast[1].nil?
+ # looks like a self.function call
+ {:func => ast[2], :args => process_proj(ast[3][1])}
+ elsif ast[1].first == :lvar
+ # common case of a call on a variable.
+ b = ast[1][1]
+ if ast[2] == :[]
+ indx = ast[3][1][1]
+ {:bind => b, :pos => indx}
+ else
+ {:bind => b, :key => ast[2]}
+ end
+ else
+ #puts "UM I DUNNO #{ast.inspect}"
+ {:func => ast}
+ end
+ else
+ #puts "OTHER AST #{ast}"
+ {:blob => ast[1]}
+ end
+ end
+
+ def get_expr(ast)
+ # the goal is to return all possible return expressions of a rhs code block.
+ # we special-case some common expression types.
+ if ast.nil?
+ []
+ elsif ast.first == :lvar
+ [ast[1]]
+ elsif ast.first == :array
+ [ast[1..-1].map{|a| process_proj(a)}]
+ elsif ast.first == :if or ast.first == :unless
+ get_expr(ast[2]) + get_expr(ast[3])
+ elsif ast.first == :block
+ ret = []
+ ast[1..-1].each do |node|
+ ret = ret + get_expr(node)
+ end
+ ret
+ else
+ ##puts "don't grok expression #{ast}"
+ []
+ end
+ end
+
+ def get_bindings(ast)
+ if ast.first == :lasgn
+ ast[1]
+ elsif ast.first == :masgn and ast[1].first == :array
+ ast[1][1..-1].map{|a| get_bindings(a)}
+ else
+ #puts "HUH bindings #{ast}"
+ []
+ end
+ end
+
+ def get_quals(ast)
+ if ast.first == :arglist and !ast[1].nil? and ast[1].first == :hash
+ @quals = Hash[*ast[1][1..-1].map{|a| a[1]}]
+ end
+ end
+
+ def tab_handle(ast)
+ ast
+ end
+
+ def get_grouping_col(ast)
+ if ast.first == :lit
+ ast[1]
+ end
+ end
+
+ def get_group(ast)
+ if ast.first == :arglist and ast[1].first == :array
+ @grouping_cols = ast[1][1..-1].map{|g| get_grouping_col(g)}
+ end
+ end
+
+ def handle_notin(ast)
+ if ast.first == :arglist
+ @froms << ast[1][2]
+ if ast[2] and ast[2].first == :hash
+ @quals = Hash[*ast[2][1..-1].map{|a| a[1]}]
+ end
+
+ end
+ end
+
+ def fromlist(ast)
+ #pp ast
+ if ast[1].nil?
+ @froms << tab_handle(ast[2])
+ else
+ froms = ast[1]
+ proj = ast[2]
+ if proj == :*
+ # there is deeper still to go.
+ fromlist(froms)
+ @froms << tab_handle(ast[3][1][2])
+ elsif [:group, :argagg].include? proj
+ fromlist(froms)
+ get_group(ast[3])
+ elsif proj == :notin
+ # special syntax here.
+ @froms << tab_handle(ast[1][2])
+ handle_notin(ast[3])
+ elsif [:lefts, :rights, :matches, :pairs, :combos, :outer].include? proj
+ @proj = proj
+ fromlist(froms)
+ get_quals(ast[3])
+ else
+ # let's assume that this is a dot-notation situation.
+ if proj == :inspected
+ @froms << froms[2]
+ else
+ @froms << "#{froms[2]}.#{proj}".to_sym
+ end
+
+ end
+ end
+
+ # canonicalize those dang quals. we need them because they tell us about equivalence constraints
+ # between lineages we know something about and lineages about which we know less.
+ canonicalize_quals
+ end
+
+ def tab_assoc(col, starting=0)
+ @froms[starting..-1].each_with_index do |tn, i|
+ if schemaof(tn, tn).include? col
+ return [tn, i]
+ end
+ end
+ return [nil, nil]
+ end
+
+ def canonicalize_quals
+ # XXX total hack.
+ # need to review the bud code to see how this works in practice but for now...
+ @quals.each_pair do |k, v|
+ # assume k is associated with the first table with a column k... and v with whatever table after
+ # that has a column v
+ l,i = tab_assoc(k)
+ unless i.nil?
+ r,_ = tab_assoc(v, i + 1)
+ @cquals[[l, k]] = [r, v]
+ @cquals[[r, v]] = [l, k]
+ end
+ end
+ end
+end
+
+
class RuleRewriter < Ruby2Ruby # :nodoc: all
- attr_accessor :rule_indx, :rules, :depends
+ attr_accessor :rule_indx, :rules, :depends, :lineage
OP_LIST = Set.new([:<<, :<, :<=])
TEMP_OP_LIST = Set.new([:-@, :~, :+@])
@@ -22,6 +372,7 @@ def initialize(seed, bud_instance)
@collect = false
@rules = []
@depends = []
+ @lineage = []
@iter_stack = []
@refs_in_body = Set.new
super()
@@ -262,6 +613,17 @@ def do_rule(exp)
ufr = UnsafeFuncRewriter.new
rhs_ast = ufr.process(rhs_ast)
+ #info = call_info(rhs_ast)
+ begin
+ l = Lineage.new(@bud_instance, lhs, @rule_indx)
+ l.startup(rhs_ast)
+
+ l.records do |rec|
+ @lineage << rec
+ end
+ rescue
+ ###puts "something went wrong with lineage: #{$!}"
+ end
if @bud_instance.options[:no_attr_rewrite]
rhs = collect_rhs(rhs_ast)
rhs_pos = rhs
View
24 lib/bud/storage/zookeeper.rb
@@ -13,7 +13,8 @@ def initialize(name, zk_path, zk_addr, bud_instance)
end
# schema = {[:key] => [:val]}
- super(name, bud_instance, nil)
+ #super(name, bud_instance, nil)
+ super(name, bud_instance, {[:key] => [:val, :notes]})
zk_path = zk_path.chomp("/") unless zk_path == "/"
@zk = Zookeeper.new(zk_addr)
@@ -31,8 +32,8 @@ def initialize(name, zk_path, zk_addr, bud_instance)
# EM startup to start watching for Zk events.
def start_watchers
# NB: Watcher callbacks are invoked in a separate Ruby thread.
- @child_watcher = Zookeeper::WatcherCallback.new { get_and_watch }
- @stat_watcher = Zookeeper::WatcherCallback.new { stat_and_watch }
+ @child_watcher = Zookeeper::Callbacks::WatcherCallback.new { get_and_watch }
+ @stat_watcher = Zookeeper::Callbacks::WatcherCallback.new { stat_and_watch }
stat_and_watch
end
@@ -56,14 +57,20 @@ def stat_and_watch
def cancel_child_watch
if @child_watch_id
- @zk.unregister_watcher(@child_watch_id)
+ #puts "zk is #{@zk.class}"
+ #@zk.methods.sort.each{|m| puts "METH: #{m}"}
+ # not sure what to do about this.
+ #@zk.unregister_watcher(@child_watch_id)
+
+
@child_watch_id = nil
end
end
def cancel_stat_watch
if @stat_watch_id
- @zk.unregister_watcher(@stat_watch_id)
+ # again, problematic
+ #@zk.unregister_watcher(@stat_watch_id)
@stat_with_id = nil
end
end
@@ -128,8 +135,11 @@ def flush
ephemeral = false
sequence = false
- if t.length > 2
- opts = t.last.first
+ #if t.length > 2
+ unless t.notes.nil?
+ puts "t was #{t}"
+ #opts = t.last.first
+ opts = t.notes
if opts[:ephemeral] == true
ephemeral = true
end
View
93 test/tc_meta.rb
@@ -4,6 +4,7 @@
require 'bud/depanalysis'
require 'bud/meta_algebra'
+
include VizUtil
class LocalShortestPaths
@@ -148,6 +149,62 @@ class InterfaceAlternate
end
end
+module LHelper
+ state do
+ scratch :rlin, [:rhs, :rcol, :lhs, :lcol, :tag]
+ end
+ bloom do
+ rlin <= (t_rules * t_lineage).pairs(:rule_id => :rule_id, :bud_obj => :bud_obj) do |r, l|
+ [l.src_tab, l.src_col, r.lhs.to_sym, l.target_col, l.func]
+ end
+ end
+end
+
+
+module LineageTestM
+ state do
+ scratch :t1, [:a, :b, :c]
+ scratch :t2, [:d, :e, :f]
+ scratch :t3, [:g, :h, :i]
+ scratch :t4, t3.schema
+ scratch :t5, t3.schema
+ scratch :t6, t1.schema
+ scratch :t7, t3.schema
+ scratch :t8, t3.schema
+ end
+
+ bloom do
+ t2 <= t1
+ t3 <= (t2 * t1).pairs{|t, w| t if true}
+ t4 <= t3{|t| [t[0], t[1], false]}
+ t7 <= (t5 * t6).lefts(:g => :a)
+ t8 <= t5.notin(t6, :g => :a)
+ end
+end
+
+
+class TestLineage
+ include Bud
+ include LHelper
+ include LineageTestM
+end
+
+class TestNestedLineage
+ include Bud
+ include LHelper
+ import LineageTestM => :tm
+
+ state do
+ scratch :nt1, [:x, :yy, :z]
+ scratch :ont1, nt1.schema
+ end
+
+ bloom do
+ tm.t1 <= nt1
+ ont1 <= tm.t8
+ end
+end
+
class TestMeta < MiniTest::Unit::TestCase
def test_paths
program = LocalShortestPaths.new
@@ -375,3 +432,39 @@ def test_theta
end
end
end
+
+class TestLineageMeta < Minitest::Unit::TestCase
+
+ def test_lineage
+ p = TestLineage.new
+ p.tick
+ #p.rlin.each do |l|
+ # puts "RLIN #{l}"
+ #end
+ expected = Set.new([
+ [:t3, :h, :t4, :h, nil],
+ [:t3, :g, :t4, :g, nil],
+ [:t2, :d, :t3, :g, nil],
+ [:t2, :e, :t3, :h, nil],
+ [:t1, :c, :t2, :f, nil],
+ [:t5, :g, :t7, :g, nil],
+ [:t5, :h, :t7, :h, nil],
+ [:t6, :a, :t7, :g, :qual],
+ [:t5, :h, :t8, :h, nil],
+ [:t5, :g, :t8, :g, nil],
+ [:t6, :a, :t8, :g, :qual]
+ ])
+ s = Set.new(p.rlin.map{|l| l.to_a})
+ assert((expected.subset? s), "subset mismatch #{expected.inspect} and #{s.inspect}")
+ puts "s #{s.inspect}"
+ end
+
+ def test_nested_lineage
+ p = TestNestedLineage.new
+ p.tick
+
+ #p.rlin.each{|l| puts "RLIN #{l}"}
+ #p.t_lineage.each{|l| puts "TLIN #{l}"}
+ #p.t_rules.each{|l| puts "RUL #{l}"}
+ end
+end
View
2 test/tc_zookeeper.rb
@@ -43,7 +43,7 @@ class ZkMirror
end
end
-class TestZk < Test::Unit::TestCase
+class TestZk < MiniTest::Unit::TestCase
def setup
zk_delete(ZK_ROOT)
end

0 comments on commit 9ebb646

Please sign in to comment.
Something went wrong with that request. Please try again.