diff --git a/lib/pregel.rb b/lib/pregel.rb index b6e37a7..7edc9f1 100644 --- a/lib/pregel.rb +++ b/lib/pregel.rb @@ -2,6 +2,32 @@ require 'pregel/worker' require 'pregel/coordinator' -module Pregel - def deliver(to, msg); end -end \ No newline at end of file +require 'singleton' + +class PostOffice + include Singleton + + def initialize + @mailboxes = Hash.new([]) + @mutex = Mutex.new + end + + def deliver(to, msg) + @mutex.synchronize do + if @mailboxes[to] + @mailboxes[to].push msg + else + @mailboxes[to] = [msg] + end + end + end + + def read(box) + @mutex.synchronize do + msgs = @mailboxes[box] + @mailboxes.clear + end + + msgs + end +end diff --git a/lib/pregel/vertex.rb b/lib/pregel/vertex.rb index eb8c6e8..323568b 100644 --- a/lib/pregel/vertex.rb +++ b/lib/pregel/vertex.rb @@ -1,6 +1,6 @@ module Pregel class Vertex - attr_reader :id, :superstep + attr_reader :id attr_accessor :value def initialize(id, value, *outedges) @@ -15,6 +15,15 @@ def edges block_given? ? @outedges.each {|e| yield e} : @outedges end + def deliver_to_all_neighbors(msg) + edges.each {|e| deliver(e, msg)} + end + + def deliver(to, msg) + p [:deliver_to, to, msg] + PostOffice.instance.deliver(to, msg) + end + def step @superstep += 1 compute @@ -24,5 +33,8 @@ def compute; end def halt; @active = false; end def active?; @active; end + def superstep; @superstep; end + def neighbors; @outedges; end + end end diff --git a/spec/coordinator_spec.rb b/spec/coordinator_spec.rb index c6331f7..fc5fbb0 100644 --- a/spec/coordinator_spec.rb +++ b/spec/coordinator_spec.rb @@ -32,6 +32,26 @@ end end + it 'should perform simple PageRank calculation on the graph' do + pending + + class PageRankVertex < Vertex + def compute(msgs) + if superstep >= 1 + sum = msgs.collect(0) {|total,msg| total += msg; total } + @value = (0.15 / 3) + 0.85 * sum + end + + if superstep < 30 + send_to_all_neighbors(@value / neighbors.size) + else + halt + end + end + end + + end + it 'should parition nodes by hashing the node id' it 'should allow scheduling multiple partitions to a single worker' end