Skip to content
This repository
Browse code

[delivery] Delivery protocol and implementations

Summary:
The delivery modules from the old sandbox.
  • Loading branch information...
commit aa6f4cf9af1cdcdbd3fa8a4d6137cdcc95897567 1 parent 8975b71
Seshadri Mahalingam authored
56  new_sandbox/delivery/dastardly.rb
... ...
@@ -0,0 +1,56 @@
  1
+require 'delivery/delivery'
  2
+
  3
+#randomly reorders messages, but reports success upon first send
  4
+#may delay messages up until max_delay timesteps if no other messages are sent
  5
+
  6
+module DastardlyDeliveryControl
  7
+  state do
  8
+    interface input, :set_max_delay, [] => [:delay]
  9
+  end
  10
+end
  11
+
  12
+module DastardlyDelivery
  13
+  include DeliveryProtocol
  14
+  include DastardlyDeliveryControl
  15
+
  16
+  state do
  17
+    table :max_delay, [] => [:delay]
  18
+    table :buf, [:msg, :whenbuf]
  19
+
  20
+    channel :pipe_chan, [:@dst, :src, :ident] => [:payload]
  21
+
  22
+    periodic :clock, 1
  23
+  end
  24
+
  25
+  bootstrap do
  26
+    max_delay <= [[5]]
  27
+  end
  28
+
  29
+  bloom :control do
  30
+    max_delay <+- set_max_delay
  31
+  end
  32
+
  33
+  bloom :queue do
  34
+    buf <+ pipe_in { |m| [m, @budtime] }
  35
+  end
  36
+
  37
+  bloom :done do
  38
+    # Report success immediately
  39
+    pipe_sent <= pipe_in
  40
+  end
  41
+
  42
+  bloom :snd do
  43
+    temp :do_send <= (buf.argagg(:choose_rand, [], :whenbuf)*max_delay).pairs do |b,d|
  44
+      if (buf.length != 1) || (@budtime - b.whenbuf >= d.delay)
  45
+        b
  46
+      end
  47
+    end
  48
+
  49
+    buf <- do_send
  50
+    pipe_chan <~ do_send { |s| s.msg }
  51
+  end
  52
+
  53
+  bloom :rcv do
  54
+    pipe_out <= pipe_chan
  55
+  end
  56
+end
36  new_sandbox/delivery/delivery.rb
... ...
@@ -0,0 +1,36 @@
  1
+module DeliveryProtocol
  2
+  state do
  3
+    # At the sender side, used to request that a new message be delivered. The
  4
+    # recipient address is given by the "dst" field.
  5
+    interface input, :pipe_in, [:dst, :src, :ident] => [:payload]
  6
+
  7
+    # At the sender side, the transport protocol will insert a corresponding
  8
+    # "pipe_sent" fact when a message has been delivered.
  9
+    interface output, :pipe_sent, [:dst, :src, :ident] => [:payload]
  10
+
  11
+    # At the recipient side, this indicates that a new message has been delivered.
  12
+    interface output, :pipe_out, [:dst, :src, :ident] => [:payload]
  13
+  end
  14
+end
  15
+
  16
+module BestEffortDelivery
  17
+  include DeliveryProtocol
  18
+
  19
+  state do
  20
+    channel :pipe_chan, [:@dst, :src, :ident] => [:payload]
  21
+  end
  22
+
  23
+  bloom :snd do
  24
+    pipe_chan <~ pipe_in
  25
+  end
  26
+
  27
+  bloom :rcv do
  28
+    pipe_out <= pipe_chan
  29
+  end
  30
+
  31
+  bloom :done do
  32
+    # Report success immediately -- this implementation of "best effort" is more
  33
+    # like "an effort".
  34
+    pipe_sent <= pipe_in
  35
+  end
  36
+end
44  new_sandbox/delivery/demonic.rb
... ...
@@ -0,0 +1,44 @@
  1
+require 'delivery/delivery'
  2
+
  3
+#intentionally drops messages, but reports success
  4
+
  5
+module DemonicDeliveryControl
  6
+  state do
  7
+    #percentage chance of message loss, 0 to 100
  8
+    interface input, :set_drop_pct, [] => [:pct]
  9
+  end
  10
+end
  11
+
  12
+module DemonicDelivery
  13
+  include DeliveryProtocol
  14
+  include DemonicDeliveryControl
  15
+
  16
+  state do
  17
+    table :drop_pct, [] => [:pct]
  18
+    channel :pipe_chan, [:@dst, :src, :ident] => [:payload]
  19
+  end
  20
+
  21
+  bootstrap do
  22
+    drop_pct <= [[50]]
  23
+  end
  24
+
  25
+  bloom :control do
  26
+    drop_pct <+- set_drop_pct
  27
+  end
  28
+
  29
+  bloom :snd do
  30
+    pipe_chan <~ (pipe_in * drop_pct).pairs do |i, p|
  31
+      if p.pct <= rand(100)
  32
+        i
  33
+      end
  34
+    end
  35
+  end
  36
+
  37
+  bloom :rcv do
  38
+    pipe_out <= pipe_chan
  39
+  end
  40
+
  41
+  bloom :done do
  42
+    pipe_sent <= pipe_in
  43
+  end
  44
+end
32  new_sandbox/delivery/reliable.rb
... ...
@@ -0,0 +1,32 @@
  1
+require 'delivery/delivery'
  2
+
  3
+# Note that this provides at-least-once delivery. If you need exactly-once, the
  4
+# receiver-side can record the message IDs that have been received to avoid
  5
+# processing duplicate messages.
  6
+module ReliableDelivery
  7
+  include DeliveryProtocol
  8
+  import BestEffortDelivery => :bed
  9
+
  10
+  state do
  11
+    table :buf, pipe_in.schema
  12
+    channel :ack, [:@src, :dst, :ident]
  13
+    periodic :clock, 2
  14
+  end
  15
+
  16
+  bloom :remember do
  17
+    buf <= pipe_in
  18
+    bed.pipe_in <= pipe_in
  19
+    bed.pipe_in <= (buf * clock).lefts
  20
+  end
  21
+
  22
+  bloom :rcv do
  23
+    pipe_out <= bed.pipe_out
  24
+    ack <~ bed.pipe_out {|p| [p.src, p.dst, p.ident]}
  25
+  end
  26
+
  27
+  bloom :done do
  28
+    temp :msg_acked <= (buf * ack).lefts(:ident => :ident)
  29
+    pipe_sent <= msg_acked
  30
+    buf <- msg_acked
  31
+  end
  32
+end

0 notes on commit aa6f4cf

Please sign in to comment.
Something went wrong with that request. Please try again.