Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

The C extension works correctly now.

  • Loading branch information...
commit 685e9c95fc4691795f565e742fcdad9a77738668 1 parent fb41a9d
Jeff Rose authored
View
60 ext/event_queue/event_queue.c
@@ -5,9 +5,10 @@
#include "fib.h"
typedef struct Event {
- VALUE event_id;
- VALUE dest_id;
+ VALUE method;
+ VALUE receiver;
VALUE data;
+ VALUE id;
long time;
} Event;
@@ -27,6 +28,11 @@ int compare_events(void *x, void *y)
static struct fibheap *event_queue = NULL;
static ID time_id = Qnil;
static ID send_id = Qnil;
+static ID data_hash_id = Qnil;
+static ID entities_id = Qnil;
+static ID object_id = Qnil;
+VALUE rb_gosim;
+VALUE rb_simulation;
static VALUE reset_event_queue(
VALUE self)
@@ -43,20 +49,38 @@ static VALUE reset_event_queue(
static VALUE schedule_event(
VALUE self,
VALUE event_id,
- VALUE dest_id,
+ VALUE dest_sid,
VALUE time,
VALUE data)
{
Event *event;
+ VALUE entities = rb_ivar_get(self, entities_id);
+ VALUE receiver = rb_hash_aref(entities, dest_sid);
+ VALUE data_hash = rb_cvar_get(rb_simulation, data_hash_id);
+
+ if(NIL_P(receiver) || !rb_respond_to(receiver, SYM2ID(event_id)))
+ {
+ rb_raise(rb_eRuntimeError,
+ "Cannot schedule %s.%s with sid=(%d), invalid method name!",
+ rb_class2name(CLASS_OF(receiver)),
+ rb_id2name(SYM2ID(event_id)),
+ FIX2INT(dest_sid));
+ }
+
event = malloc(sizeof(Event));
- event->event_id = event_id;
- event->dest_id = dest_id;
+ event->method = SYM2ID(event_id);
+ event->receiver = receiver;
event->data = data;
+ event->id = rb_funcall(data, object_id, 0);
event->time = NUM2LONG(time) + NUM2LONG(rb_ivar_get(self, time_id));
fh_insert(event_queue, (void *)event);
+ // Store a reference to the data in the hash so it doesn't get garbage
+ // collected.
+ rb_hash_aset(data_hash, event->id, data);
+
return Qnil;
}
@@ -65,7 +89,7 @@ static VALUE run_main_loop(
VALUE end_time)
{
VALUE running = rb_ivar_get(self, rb_intern("@running"));
- VALUE entities = rb_ivar_get(self, rb_intern("@entities"));
+ VALUE data_hash = rb_cvar_get(rb_simulation, data_hash_id);
Event *cur_event = fh_min(event_queue);
long end = NUM2LONG(end_time);
@@ -75,8 +99,18 @@ static VALUE run_main_loop(
cur_event = fh_extractmin(event_queue);
rb_ivar_set(self, time_id, LONG2NUM(cur_event->time));
- rb_funcall(rb_hash_aref(entities, cur_event->dest_id), send_id, 2,
- cur_event->event_id, cur_event->data);
+ /*
+ printf("%ld: %s.%s\n",
+ cur_event->time,
+ rb_class2name(CLASS_OF(cur_event->receiver)),
+ rb_id2name(cur_event->method));
+ */
+
+ //rb_funcall(cur_event->receiver, send_id, 2, cur_event->method, cur_event->data);
+ rb_funcall(cur_event->receiver, cur_event->method, 1, cur_event->data);
+
+ rb_hash_delete(data_hash, cur_event->id);
+ free(cur_event);
cur_event = fh_min(event_queue);
}
@@ -92,16 +126,16 @@ static VALUE queue_size(
void Init_event_queue()
{
- VALUE rb_gosim;
- VALUE rb_simulation;
-
- //rb_gosim = rb_const_get(rb_cObject, rb_intern("GoSim"));
- //rb_simulation = rb_const_get(rb_gosim, rb_intern("Simulation"));
rb_gosim = rb_define_module("GoSim");
rb_simulation = rb_define_class_under(rb_gosim, "Simulation", rb_cObject);
time_id = rb_intern("@time");
send_id = rb_intern("send");
+ data_hash_id = rb_intern("@data_hash");
+ entities_id = rb_intern("@entities");
+ object_id = rb_intern("object_id");
+
+ rb_cvar_set(rb_simulation, data_hash_id, rb_hash_new(), Qfalse);
rb_define_method(rb_simulation, "schedule_event", schedule_event, 4);
rb_define_method(rb_simulation, "reset_event_queue", reset_event_queue, 0);
View
12 lib/gosim.rb
@@ -1,8 +1,12 @@
+$LOAD_PATH.unshift(File.expand_path(File.dirname(__FILE__)))
+
+require 'rational'
require 'logger'
require 'singleton'
require 'observer'
begin
+ #require 'pqueue'
require 'event_queue'
rescue RuntimeError => e
warn "event_queue extension not loaded! Loading ruby pqueue instead."
@@ -18,12 +22,12 @@ module Base
@@log.level = Logger::FATAL
# So that all derived classes have an easy accessor
- def log(*args)
- @@log.debug(*args)
+ def log(*args, &block)
+ @@log.debug(*args, &block)
end
- def error(*args)
- @@log.fatal(*args)
+ def error(*args, &block)
+ @@log.fatal(*args, &block)
end
# Turn down logging all the way (nice for unit tests etc...)
View
40 lib/gosim/network.rb
@@ -5,7 +5,6 @@ module Net
GSNetworkPacket = Struct.new(:id, :src, :dest, :data)
FailedPacket = Struct.new(:dest, :data)
- LivenessPacket = Struct.new(:alive)
ERROR_NODE_FAILURE = 0
@@ -69,10 +68,10 @@ def set_latency(latency_mean = LATENCY_MEAN, latency_dev = LATENCY_DEV)
# Called by simulation when a reset occurs
def update
- log "Resetting Topology..."
+ log {"Resetting Topology..."}
reset
@nodes = {}
- log "Topology now has sid #{sid}"
+ log {"Topology now has sid #{sid}"}
end
def register_node(node)
@@ -123,7 +122,9 @@ def rpc_request(src, dest, method, args)
# Dispatches an RPC request to a specific method, and return a result
# unless the method returns nil.
def handle_rpc_request(request)
+ #puts "top of request"
if @nodes[request.dest].alive?
+ #puts "1 request...#{request.inspect}"
# If there is no response delete the deferred.
# TODO: Maybe we want to signal something to the deferred here also?
@@ -134,6 +135,7 @@ def handle_rpc_request(request)
rand(@mean_latency) + LATENCY_DEV,
RPCResponse.new(request.uid, result))
else
+ #puts "2 request..."
if @rpc_deferreds.has_key?(request.uid)
@rpc_deferreds[request.uid].errback(Failure.new(request))
end
@@ -141,6 +143,7 @@ def handle_rpc_request(request)
end
def handle_rpc_response(response)
+ #puts "response...#{response}"
if @rpc_deferreds.has_key?(response.uid)
@rpc_deferreds[response.uid].callback(response.result)
end
@@ -150,15 +153,18 @@ def handle_rpc_response(response)
class RPCInvalidMethodError < Exception; end
class Peer
- def initialize(local_node, remote_node)
- @local_node = local_node
- @remote_node = remote_node
+ attr_reader :addr
+ def initialize(local_node, remote_addr)
@topo = Topology.instance
- end
- def addr
- @remote_node.addr
+ @local_node = local_node
+ @remote_node = @topo.get_node(remote_addr)
+ if @remote_node
+ @addr = @remote_node.addr
+ else
+ @addr = nil
+ end
end
def method_missing(method, *args)
@@ -171,8 +177,8 @@ def method_missing(method, *args)
class Node < Entity
attr_reader :addr
- def initialize()
- super()
+ def initialize
+ super
@addr = @sid
@topo = Topology.instance
@alive = true
@@ -188,7 +194,7 @@ def alive?
@alive
end
- def alive=(status)
+ def alive(status)
@alive = status
end
@@ -198,20 +204,16 @@ def send_packet(receivers, pkt)
# Override this in your subclass to do custom demuxing.
def recv_packet(pkt)
- log "default recv_packet handler..."
- end
-
- def get_peer(addr)
- Peer.new(self, @topo.get_node(addr))
+ log {"default recv_packet handler..."}
end
# Implement this method to do something specific for your application.
def handle_failed_packet(pkt)
- log "Got a failed packet! (#{pkt.data.class})"
+ log {"Got a failed packet! (#{pkt.data.class})"}
end
def handle_failed_rpc(method, data)
- log "Got a failed rpc call: #{method}(#{data.join(', ')})"
+ log {"Got a failed rpc call: #{method}(#{data.join(', ')})"}
end
end
end # module Net
View
5 lib/gosim/simulation.rb
@@ -131,6 +131,7 @@ def start_client(ip, port = PORT_NUMBER)
def initialize
@trace = Logger.new(STDOUT)
+ #GC.disable
reset
end
@@ -193,8 +194,8 @@ def trace_log(device)
# Schedule a new event by putting it into the event queue
if not method_defined?(:schedule_event)
- def schedule_event(event_id, dest_id, time, data)
- @event_queue.push(Event.new(event_id, dest_id, @time + time, data))
+ def schedule_event(event_id, dest_sid, time, data)
+ @event_queue.push(Event.new(event_id, dest_sid, @time + time, data))
end
end
View
7 test/network_test.rb
@@ -17,7 +17,7 @@ def initialize
end
def add_neighbor(addr)
- @neighbors[addr] = get_peer(addr)
+ @neighbors[addr] = GoSim::Net::Peer.new(self, addr)
end
def start_flood(pkt)
@@ -90,10 +90,7 @@ def test_liveness_and_failure
node_a.add_neighbor(node_b.addr)
10.times {|i| @sim.schedule_event(:handle_packet, node_a.sid, i*1000, Packet.new(i)) }
- @sim.schedule_event(:handle_liveness_packet,
- node_b.sid,
- 5000,
- GoSim::Net::LivenessPacket.new(false))
+ @sim.schedule_event(:alive, node_b.sid, 5000, false)
@sim.run
assert_equal(5, node_a.failed_packets)
View
8 test/simulation_test.rb
@@ -21,6 +21,7 @@ def initialize(neighbor, num_items)
end
def new_item(event)
+ log {"got new #{event.class} event"}
@sim.schedule_event(:new_item, @neighbor, 5, event)
@dataset.log(@sid, event.name)
end
@@ -72,12 +73,13 @@ def test_logging
end
def test_scheduler
+ num_items = 10000
consumer = Consumer.new
- producer = Producer.new(consumer.sid, 10)
- assert_equal(10, @sim.queue_size, "Schedule event not correctly adding to queue.")
+ producer = Producer.new(consumer.sid, num_items)
+ assert_equal(num_items, @sim.queue_size, "Schedule event not correctly adding to queue.")
@sim.run
- assert_equal(10, consumer.received)
+ assert_equal(num_items, consumer.received)
end
def test_single_step
View
40 tools/benchmark.rb
@@ -0,0 +1,40 @@
+$:.unshift(File.dirname(__FILE__) + '/../lib')
+
+require 'rational'
+
+require 'gosim'
+
+require 'rubygems'
+require 'benchmark'
+
+class Benchmarker < GoSim::Entity
+ attr_reader :counter
+
+ def initialize(n)
+ super()
+
+ @counter = 0
+
+ n.times do |t|
+ @sim.schedule_event(:handle_item, @sid, t * 10 + 1, t)
+ end
+ end
+
+ def handle_item(t)
+ @counter += 1
+ end
+end
+
+def run_benchmark(num_events)
+ sim = GoSim::Simulation::instance
+
+ puts "Starting benchmark for #{num_events} events:\n"
+ Benchmark.bm do |stat|
+ b = Benchmarker.new(num_events)
+ stat.report { sim.run }
+ end
+end
+
+num_events = ARGV.first.to_i || 1000000
+
+run_benchmark(num_events)
View
118 tools/rakehelp.rb
@@ -0,0 +1,118 @@
+def make(makedir)
+ Dir.chdir(makedir) do
+ sh(PLATFORM =~ /win32/ ? 'nmake' : 'make')
+ end
+end
+
+
+def extconf(dir)
+ Dir.chdir(dir) do ruby "extconf.rb" end
+end
+
+
+def setup_tests
+ desc "Run the unit tests in test"
+ Rake::TestTask.new(:test) do |t|
+ t.libs << "test"
+ t.test_files = FileList['test/*_test.rb']
+ t.verbose = true
+ end
+end
+
+
+def setup_clean otherfiles
+ files = ['build/*', '**/*.o', '**/*.so', '**/*.a', 'lib/*-*', '**/*.log'] + otherfiles
+ CLEAN.include(files)
+end
+
+
+def setup_rdoc files
+ Rake::RDocTask.new do |rdoc|
+ rdoc.rdoc_dir = 'doc/rdoc'
+ rdoc.options << '--line-numbers'
+ rdoc.rdoc_files.add(files)
+ end
+end
+
+
+def setup_extension(dir, extension)
+ ext = "ext/#{dir}"
+ ext_so = "#{ext}/#{extension}.#{Config::CONFIG['DLEXT']}"
+ ext_files = FileList[
+ "#{ext}/*.c",
+ "#{ext}/*.h",
+ "#{ext}/extconf.rb",
+ "#{ext}/Makefile",
+ "lib"
+ ]
+
+ task "lib" do
+ directory "lib"
+ end
+
+ desc "Builds just the #{extension} extension"
+ task extension.to_sym => ["#{ext}/Makefile", ext_so ]
+
+ file "#{ext}/Makefile" => ["#{ext}/extconf.rb"] do
+ extconf "#{ext}"
+ end
+
+ file ext_so => ext_files do
+ make "#{ext}"
+ cp ext_so, "lib"
+ end
+end
+
+
+def base_gem_spec(pkg_name, pkg_version)
+ rm_rf "test/coverage"
+ pkg_version = pkg_version
+ pkg_name = pkg_name
+ pkg_file_name = "#{pkg_name}-#{pkg_version}"
+ Gem::Specification.new do |s|
+ s.name = pkg_name
+ s.version = pkg_version
+ s.platform = Gem::Platform::RUBY
+ s.has_rdoc = true
+ s.extra_rdoc_files = [ "README" ]
+
+ s.files = %w(COPYING LICENSE README Rakefile) +
+ Dir.glob("{bin,doc/rdoc,test}/**/*") +
+ Dir.glob("ext/**/*.{h,c,rb,rl}") +
+ Dir.glob("{examples,tools,lib}/**/*.rb")
+
+ s.require_path = "lib"
+ s.extensions = FileList["ext/**/extconf.rb"].to_a
+ s.bindir = "bin"
+ end
+end
+
+def setup_gem(pkg_name, pkg_version)
+ spec = base_gem_spec(pkg_name, pkg_version)
+ yield spec if block_given?
+
+ Rake::GemPackageTask.new(spec) do |p|
+ p.gem_spec = spec
+ p.need_tar = true if RUBY_PLATFORM !~ /mswin/
+ end
+end
+
+def sub_project(project, *targets)
+ targets.each do |target|
+ Dir.chdir "projects/#{project}" do
+ sh %{rake --trace #{target.to_s} }
+ end
+ end
+end
+
+# Conditional require rcov/rcovtask if present
+begin
+ require 'rcov/rcovtask'
+
+ Rcov::RcovTask.new do |t|
+ t.test_files = FileList['test/test*.rb']
+ t.rcov_opts << "-x /usr"
+ t.output_dir = "test/coverage"
+ end
+rescue Object
+end
Please sign in to comment.
Something went wrong with that request. Please try again.