Permalink
Browse files

Merge pull request #2 from seshness/leader_election

Leader election
  • Loading branch information...
2 parents 6db7f57 + 5269144 commit 28d6162eca50b661640fb0c3d22e18db584ecca6 @palvaro palvaro committed Dec 23, 2011
@@ -6,51 +6,51 @@
module SequencesProtocol
state do
# increment the counter for an id. If the id does not exist, initialize it.
- # @param [String] ident the unique identifier of a row in a collection of counts
+ # @param [String] ident the unique identifier of a row in a collection of counts
interface input, :increment_count, [:ident]
-
+
# reset the counter for an id. In any implementation, the row identified by ident should either be set to
- # its initial value again, or no longer be in the collection of counters.
- # @param [String] ident the unique identifier of a row in a collection of counts
+ # its initial value again, or no longer be in the collection of counters.
+ # @param [String] ident the unique identifier of a row in a collection of counts
interface input, :clear_ident, [:ident]
-
+
# request the count of an id. If the id is non-existant, then get_count will initialize the given id.
- # @param [String] ident the unique identifier of a row in a collection of counts
+ # @param [String] ident the unique identifier of a row in a collection of counts
interface input, :get_count, [:ident]
-
+
# output the count of an id. A get_count invocation should never result in return_count being empty.
- # @param [String] ident the unique identifier of a row in a collection of counts
- # @param [Number] the count associated with the ident
+ # @param [String] ident the unique identifier of a row in a collection of counts
+ # @param [Number] the count associated with the ident
interface output, :return_count, [:ident]=>[:tally]
end
end
# Counter is a simple implementation of SequencesProtocol
# @see SequencesProtocol implements SequencesProtocol
module Counter
- include SequencesProtocol
-
+ include SequencesProtocol
+
state do
- # used to keep state for all counters in Counter
+ # used to keep state for all counters in Counter
table :total_counts, [:ident] => [:tally]
end
-
+
bloom do
# when first count for an ident comes in, set up new count tuple for ident
total_counts <+ increment_count do |u|
- [u.ident, 1] if not total_counts.exists? do |t|
+ [u.ident, 0] if not total_counts.exists? do |t|
u.ident==t.ident
end
end
- # when get count for nonexistent ident comes in, set up new count tuple for ident
- total_counts <+ get_count do |u|
+ # when get count for nonexistent ident comes in, set up new count tuple for ident
+ total_counts <+ get_count do |u|
[u.ident, 0] if not total_counts.exists? do |t|
u.ident==t.ident
end
end
- return_count <= get_count do |u|
+ return_count <= get_count do |u|
[u.ident, 0] if not total_counts.exists? do |t|
u.ident==t.ident
end
@@ -63,7 +63,7 @@ module Counter
# return count when get request comes in
return_count <= (get_count*total_counts).rights(:ident=>:ident)
-
+
# clear count when clear request comes in
total_counts <- (clear_ident*total_counts).rights(:ident=>:ident)
end
@@ -0,0 +1,56 @@
+require 'delivery/delivery'
+
+#randomly reorders messages, but reports success upon first send
+#may delay messages up until max_delay timesteps if no other messages are sent
+
+module DastardlyDeliveryControl
+ state do
+ interface input, :set_max_delay, [] => [:delay]
+ end
+end
+
+module DastardlyDelivery
+ include DeliveryProtocol
+ include DastardlyDeliveryControl
+
+ state do
+ table :max_delay, [] => [:delay]
+ table :buf, [:msg, :whenbuf]
+
+ channel :pipe_chan, [:@dst, :src, :ident] => [:payload]
+
+ periodic :clock, 1
+ end
+
+ bootstrap do
+ max_delay <= [[5]]
+ end
+
+ bloom :control do
+ max_delay <+- set_max_delay
+ end
+
+ bloom :queue do
+ buf <+ pipe_in { |m| [m, @budtime] }
+ end
+
+ bloom :done do
+ # Report success immediately
+ pipe_sent <= pipe_in
+ end
+
+ bloom :snd do
+ temp :do_send <= (buf.argagg(:choose_rand, [], :whenbuf)*max_delay).pairs do |b,d|
+ if (buf.length != 1) || (@budtime - b.whenbuf >= d.delay)
+ b
+ end
+ end
+
+ buf <- do_send
+ pipe_chan <~ do_send { |s| s.msg }
+ end
+
+ bloom :rcv do
+ pipe_out <= pipe_chan
+ end
+end
@@ -0,0 +1,36 @@
+module DeliveryProtocol
+ state do
+ # At the sender side, used to request that a new message be delivered. The
+ # recipient address is given by the "dst" field.
+ interface input, :pipe_in, [:dst, :src, :ident] => [:payload]
+
+ # At the sender side, the transport protocol will insert a corresponding
+ # "pipe_sent" fact when a message has been delivered.
+ interface output, :pipe_sent, [:dst, :src, :ident] => [:payload]
+
+ # At the recipient side, this indicates that a new message has been delivered.
+ interface output, :pipe_out, [:dst, :src, :ident] => [:payload]
+ end
+end
+
+module BestEffortDelivery
+ include DeliveryProtocol
+
+ state do
+ channel :pipe_chan, [:@dst, :src, :ident] => [:payload]
+ end
+
+ bloom :snd do
+ pipe_chan <~ pipe_in
+ end
+
+ bloom :rcv do
+ pipe_out <= pipe_chan
+ end
+
+ bloom :done do
+ # Report success immediately -- this implementation of "best effort" is more
+ # like "an effort".
+ pipe_sent <= pipe_in
+ end
+end
@@ -0,0 +1,44 @@
+require 'delivery/delivery'
+
+#intentionally drops messages, but reports success
+
+module DemonicDeliveryControl
+ state do
+ #percentage chance of message loss, 0 to 100
+ interface input, :set_drop_pct, [] => [:pct]
+ end
+end
+
+module DemonicDelivery
+ include DeliveryProtocol
+ include DemonicDeliveryControl
+
+ state do
+ table :drop_pct, [] => [:pct]
+ channel :pipe_chan, [:@dst, :src, :ident] => [:payload]
+ end
+
+ bootstrap do
+ drop_pct <= [[50]]
+ end
+
+ bloom :control do
+ drop_pct <+- set_drop_pct
+ end
+
+ bloom :snd do
+ pipe_chan <~ (pipe_in * drop_pct).pairs do |i, p|
+ if p.pct <= rand(100)
+ i
+ end
+ end
+ end
+
+ bloom :rcv do
+ pipe_out <= pipe_chan
+ end
+
+ bloom :done do
+ pipe_sent <= pipe_in
+ end
+end
@@ -0,0 +1,32 @@
+require 'delivery/delivery'
+
+# Note that this provides at-least-once delivery. If you need exactly-once, the
+# receiver-side can record the message IDs that have been received to avoid
+# processing duplicate messages.
+module ReliableDelivery
+ include DeliveryProtocol
+ import BestEffortDelivery => :bed
+
+ state do
+ table :buf, pipe_in.schema
+ channel :ack, [:@src, :dst, :ident]
+ periodic :clock, 2
+ end
+
+ bloom :remember do
+ buf <= pipe_in
+ bed.pipe_in <= pipe_in
+ bed.pipe_in <= (buf * clock).lefts
+ end
+
+ bloom :rcv do
+ pipe_out <= bed.pipe_out
+ ack <~ bed.pipe_out {|p| [p.src, p.dst, p.ident]}
+ end
+
+ bloom :done do
+ temp :msg_acked <= (buf * ack).lefts(:ident => :ident)
+ pipe_sent <= msg_acked
+ buf <- msg_acked
+ end
+end
@@ -0,0 +1,29 @@
+require 'rubygems'
+require 'bud'
+
+module MembershipProtocol
+ state do
+ interface input, :add_member, [:ident] => [:host]
+ interface input, :remove_member, [:ident]
+ interface output, :member, [:ident] => [:host]
+
+ interface output, :added_member, [:ident] => [:host]
+# interface output, :removed_member, [:ident] => [:host]
+ end
+end
+
+module Membership
+ state do
+ table :private_members, [:ident] => [:host]
+ end
+
+ bloom do
+ private_members <= add_member
+ private_members <- (remove_member * private_members).pairs(:ident => :ident)
+ member <= private_members
+ end
+
+ bloom :report_status do
+ added_member <= (add_member * private_members).pairs(:ident => :ident)
+ end
+end
@@ -36,8 +36,8 @@ module Multicast
# a given mcast id.
scratch :acked_count, [:ident] => [:num]
end
-
- bloom :snd_mcast do
+
+ bloom :snd_mcast do
pipe_in <= (mcast_send * member).pairs do |s, m|
[m.host, ip_port, s.ident, s.payload] unless m.host == ip_port
end
@@ -52,7 +52,7 @@ module Multicast
bloom :done_mcast do
acked_count <= pipe_sent.group([:ident], count(:ident))
- unacked_count <+- (acked_count * unacked_count).pairs do |a, u|
+ unacked_count <+- (acked_count * unacked_count).pairs(:ident => :ident) do |a, u|
[a.ident, u.num - a.num]
end
mcast_done <= unacked_count {|u| [u.ident] if u.num == 0}
@@ -63,11 +63,9 @@ module Multicast
module BestEffortMulticast
include BestEffortDelivery
include Multicast
- include StaticMembership
end
module ReliableMulticast
include ReliableDelivery
include Multicast
- include StaticMembership
end
Oops, something went wrong.

0 comments on commit 28d6162

Please sign in to comment.