Navigation Menu

Skip to content

Commit

Permalink
roll back tcp stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
palvaro committed Oct 30, 2012
1 parent 8676879 commit aac3914
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 40 deletions.
20 changes: 7 additions & 13 deletions lib/bud.rb
Expand Up @@ -62,7 +62,7 @@
#
# :main: Bud
module Bud
attr_reader :budtime, :inbound, :options, :meta_parser, :viz, :rtracer, :dsock, :connections
attr_reader :budtime, :inbound, :options, :meta_parser, :viz, :rtracer, :dsock
attr_reader :tables, :builtin_tables, :channels, :zk_tables, :dbm_tables, :app_tables, :lattices
attr_reader :push_sources, :push_elems, :push_joins, :scanners, :merge_targets
attr_reader :this_stratum, :this_rule, :rule_orig_src, :done_bootstrap
Expand Down Expand Up @@ -119,7 +119,6 @@ def initialize(options={})
@push_elems = {}
@callbacks = {}
@callback_id = 0
@connections = {}
@shutdown_callbacks = {}
@shutdown_callback_id = 0
@post_shutdown_callbacks = []
Expand Down Expand Up @@ -947,13 +946,9 @@ def do_shutdown(do_shutdown_cb=true)
end
@timers.each {|t| t.cancel}
@tables.each_value {|t| t.close}
@connections.each_value do |c|
c.close_connection
end

if EventMachine::reactor_running? and @bud_started
#@dsock.close_connection
EventMachine::stop_server @server
@dsock.close_connection
end
@bud_started = false
@running_async = false
Expand All @@ -963,12 +958,11 @@ def do_shutdown(do_shutdown_cb=true)
end

def do_start_server
#@dsock = EventMachine::open_datagram_socket(@ip, @options[:port],
# BudServer, self,
# @options[:channel_filter])
@server = EventMachine::start_server(@ip, @options[:port], BudServer, self, @options[:channel_filter])

@port = Socket.unpack_sockaddr_in( EventMachine.get_sockname( @server))[0]
# N.B. we bind to address '' (empty string) to get EM to use INADDR_ANY (so that UDP works)
@dsock = EventMachine::open_datagram_socket('', @options[:port],
BudServer, self,
@options[:channel_filter])
@port = Socket.unpack_sockaddr_in(@dsock.get_sockname)[0]
end

public
Expand Down
11 changes: 1 addition & 10 deletions lib/bud/collections.rb
Expand Up @@ -973,20 +973,11 @@ def flush # :nodoc: all
end
end
wire_str = [qualified_tabname.to_s, wire_tuple, marshall_indexes].to_msgpack
###toplevel.dsock.send_datagram(wire_str, the_locspec[0], the_locspec[1])
establish_connection(the_locspec) if toplevel.connections[the_locspec].nil? || toplevel.connections[the_locspec].error?
#toplevel.connections[the_locspec].send_data [@tabname, t].to_msgpack
toplevel.connections[the_locspec].send_data wire_str
toplevel.dsock.send_datagram(wire_str, the_locspec[0], the_locspec[1])
end
@pending.clear
end

def establish_connection(l)
toplevel = @bud_instance.toplevel
toplevel.connections[l] = EventMachine::connect l[0], l[1], BudServer, @bud_instance, toplevel.options[:channel_filter]
toplevel.connections.delete(l) if toplevel.connections[l].error?
end

public
# project to the non-address fields
def payloads
Expand Down
16 changes: 1 addition & 15 deletions lib/bud/server.rb
Expand Up @@ -9,20 +9,6 @@ def initialize(bud, channel_filter)
super
end

def post_init
pname = get_peername
if pname
@port, @ip = Socket.unpack_sockaddr_in(pname)
# puts "-- server inbound connection from #{@ip}:#{@port}"
else
@port, @ip = Socket.unpack_sockaddr_in(get_sockname)
# puts "-- server connection to #{@ip}:#{@port}"
end
@bud.connections[[@ip, @port]] = self
rescue
puts "An error occurred post_init on BudServer: #{$!}"
end

def receive_data(data)
# Feed the received data to the deserializer
@pac.feed data
Expand Down Expand Up @@ -72,7 +58,7 @@ def message_received(obj)
unless (obj.class <= Array and obj.length == 3 and
@bud.tables.include?(obj[0].to_sym) and
obj[1].class <= Array and obj[2].class <= Array)
raise Bud::Error, "bad inbound message of class #{obj.class}: #{obj.inspect} #{obj.length} tab #{obj[0]} #{obj[1].class}, #{obj[2].class}"
raise Bud::Error, "bad inbound message of class #{obj.class}: #{obj.inspect}"
end

# Deserialize any nested marshalled values
Expand Down
28 changes: 28 additions & 0 deletions lib/bud/state.rb.rej
@@ -0,0 +1,28 @@
***************
*** 155,160 ****
@channels[name] = @tables[name]
end

# Define methods to implement the state declarations for every registered kind
# of lattice.
def load_lattice_defs
--- 155,173 ----
@channels[name] = @tables[name]
end

+ # an alternative approach to declaring interfaces
+ def interfaces(direction, collections)
+ mode = case direction
+ when :input then true
+ when :output then false
+ else
+ raise "unrecognized interface type #{direction}"
+ end
+ collections.each do |tab|
+ t_provides << [tab.to_s, mode]
+ end
+ end
+
# Define methods to implement the state declarations for every registered kind
# of lattice.
def load_lattice_defs
4 changes: 2 additions & 2 deletions test/ts_bud.rb
Expand Up @@ -13,7 +13,7 @@
require 'tc_dbm'
require 'tc_delta'
require 'tc_errors'
#require 'tc_execmodes' unless $quick_mode
require 'tc_execmodes' unless $quick_mode
require 'tc_exists'
require 'tc_halt'
require 'tc_inheritance'
Expand All @@ -32,6 +32,6 @@
require 'tc_schemafree'
require 'tc_sort'
require 'tc_temp'
#require 'tc_terminal'
require 'tc_terminal'
require 'tc_timer'
require 'tc_wc'

0 comments on commit aac3914

Please sign in to comment.