
Commit fd861b6
added final example from "inter-broker routing"; minor code clean-up for guide chapter 3
pblasucci committed Dec 1, 2011
1 parent 1250673 commit fd861b6
Showing 3 changed files with 268 additions and 27 deletions.
41 changes: 21 additions & 20 deletions examples/F#/peering2.fsx
@@ -66,6 +66,11 @@ let main args =
 
     let rand = srandom()
     let fePort,bePort = let port = int self in port + 1,port + 2
+    //NOTE: to run this example on Windows, we must use tcp...
+    // so when we do, assume inputs are port numbers, and we use
+    // them as the basis for additional (internal to the cluster)
+    // port numbers. On non-Windows systems, we can use ipc (as per
+    // the guide), so in *that* case, inputs are alphanumeric IDs
 
     // prepare our context and sockets
     use ctx = new Context(1)
@@ -88,15 +93,15 @@ let main args =
     use localbe = ctx |> route
     bind localbe (sprintf "tcp://*:%i" bePort)
 
-    // get user to tell us when we can start
+    // get user to tell us when we can start...
     printf' "Press Enter when all brokers are started: "
     scanln() |> ignore
 
     // start local workers
-    for _ in 1 .. NBR_WORKERS do (t_spawnp worker_task bePort) |> ignore
+    for _ in 1 .. NBR_WORKERS do ignore (t_spawnp worker_task bePort)
 
     // start local clients
-    for _ in 1 .. NBR_CLIENTS do (t_spawnp client_task fePort) |> ignore
+    for _ in 1 .. NBR_CLIENTS do ignore (t_spawnp client_task fePort)
 
     (* Interesting part
     -------------------------------------------------------------
@@ -107,7 +112,8 @@ let main args =
     // queue of available workers
    let workers = Queue<byte array>()
 
-    let msg = ref Array.empty<_>
+    // holds values collected/generated during polling
+    let msg = ref Array.empty<_>
     let reroutable = ref false
 
     let backends =
@@ -138,28 +144,23 @@ let main args =
       if backends |> poll timeout && (!msg).Length > 0 then
         let address = (!msg).[0] |> decode
         // route reply to cloud if it's addressed to a broker
-        if peers |> Array.exists (fun peer -> address = peer)
-          then !msg |> sendAll cloudfe
-          else // otherwise route reply to client
-               !msg |> sendAll localfe
+        // otherwise route reply to client
+        !msg |> sendAll ( if peers |> Array.exists ((=) address)
+                            then cloudfe
+                            else localfe )
 
       // Now route as many clients requests as we can handle
      while workers.Count > 0 && frontends |> poll 0L do
         // if reroutable, send to cloud 20% of the time
         // here we'd normally use cloud status information
-        if !reroutable && peers.Length > 0 && rand.Next(0,5) = 0
-          then // route to random broker peer
-               let address = peers.[rand.Next peers.Length] |> encode
-               !msg
-               |> Array.append [| address; Array.empty |]
-               |> sendAll cloudbe
-          else // route to local worker
-               let address = workers.Dequeue()
-               !msg
-               |> Array.append [| address; Array.empty |]
-               |> sendAll localbe
+        let address,backend =
+          if !reroutable && peers.Length > 0 && rand.Next(0,5) = 0
+            then peers.[rand.Next peers.Length] |> encode , cloudbe
+            else workers.Dequeue() , localbe
+        !msg
+        |> Array.append [| address; Array.empty |]
+        |> sendAll backend
         // else ... No work, go back to backends
 
     EXIT_SUCCESS
   | _ ->
     printfn "syntax: peering2 me {you}..."
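The NOTE above amounts to a simple endpoint scheme: on Windows the broker's command-line name is a TCP port number, and the cluster-internal frontend/backend ports are derived from it by fixed offsets (port + 1, port + 2); on other systems the name is an alphanumeric ID used to build ipc paths. A minimal sketch of that derivation, in plain F# with no fszmq calls (endpointsFor and useIpc are hypothetical names, not part of the example):

// Sketch only: derive the local frontend/backend endpoints from the broker's
// command-line name, per the NOTE above; endpointsFor/useIpc are hypothetical.
let endpointsFor (useIpc:bool) (name:string) =
  if useIpc then
    // non-Windows: name is an alphanumeric ID, so build ipc paths (as per the guide)
    sprintf "ipc://%s-localfe.ipc" name, sprintf "ipc://%s-localbe.ipc" name
  else
    // Windows: name is a base port; internal ports are fixed offsets from it
    let port = int name
    sprintf "tcp://*:%i" (port + 1), sprintf "tcp://*:%i" (port + 2)

// e.g. endpointsFor false "5550" evaluates to ("tcp://*:5551", "tcp://*:5552")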
247 changes: 247 additions & 0 deletions examples/F#/peering3.fsx
@@ -0,0 +1,247 @@
(*
Broker peering simulation (part 3)
Prototypes the full flow of status and tasks
While this example runs in a single process, that is just to make
it easier to start and stop the example. Each thread has its own
context and conceptually acts as a separate process.
*)
#r @"bin/fszmq.dll"
#r @"bin/fszmq.devices.dll"
open fszmq
open fszmq.Context
open fszmq.devices
open fszmq.Polling
open fszmq.Socket

#load "zhelpers.fs"

open System.Collections.Generic

let [<Literal>] NBR_CLIENTS = 10
let [<Literal>] NBR_WORKERS = 5

let LRU_READY = "\001"B // signals worker is ready

let rand = srandom()

// request-reply client using REQ socket
// to simulate load, clients issue a burst of requests
// and then sleep for a random period.
let client_task (o:obj) =
let frontPort,monitorPort = o :?> (int * int)

use ctx = new Context(1)
use client = ctx |> req
connect client (sprintf "tcp://localhost:%i" frontPort)
use monitor = ctx |> push
connect monitor (sprintf "tcp://localhost:%i" monitorPort)

let shouldLoop = ref true
while !shouldLoop do

sleep ((rand.Next 5) * 1000)

let burst = ref (rand.Next 15)
while !shouldLoop && !burst <> 0 do
decr burst

let taskID = sprintf "%04X" (rand.Next 0x10000)

// send request with random hex ID
taskID |> encode |>> client

let items =
[ Poll( ZMQ.POLLIN,client,fun _ ->
let reply = recv client
// worker is supposed to answer us with our task ID
reply |> dumpFrame (Some "Client: ")
assert (reply = encode taskID) ) ]

// wait max ten seconds for a reply, then complain
if not (items |> poll 10000L) then
shouldLoop := false
(sprintf "E: CLIENT EXIT - lost task %s" taskID)
|> encode
|>> monitor

// worker using REQ socket to do LRU routing
let worker_task (o:obj) =
let backPort = o :?> int

use ctx = new Context(1)
use worker = ctx |> req
connect worker (sprintf "tcp://localhost:%i" backPort)

// tell broker we're ready for work
LRU_READY |>> worker

while true do
// workers are busy for 0/1/2 seconds
let msg = recvAll worker
sleep ((rand.Next 3) * 1000)
msg |> sendAll worker

let main args =
// first argument is this broker's name
// other arguments are our peers' names
match args |> Array.length with
| argc when argc > 1 ->
let cloudPort,self = let n = args.[1] in (int n),(encode n)
let peers = if argc > 2 then args.[2..] else [||]
let frontPort,backPort,statePort,monitorPort = cloudPort + 1,
cloudPort + 2,
cloudPort + 3,
cloudPort + 4
//NOTE: to run this example on Windows, we must use tcp...
// so when we do, assume inputs are port numbers, and we use
// them as the basis for additional (internal to the cluster)
// port numbers. On non-Windows systems, we can use ipc (as per
// the guide), so in *that* case, inputs are alphanumeric IDs
printfn' "I: preparing broker at %i..." cloudPort

// prepare our context and sockets
use ctx = new Context(1)

// bind cloud frontend to endpoint
use cloudfe = ctx |> route
(ZMQ.IDENTITY,self) |> set cloudfe
bind cloudfe (sprintf "tcp://*:%i" cloudPort)

// bind state backend / publisher to endpoint
use statebe = ctx |> pub
bind statebe (sprintf "tcp://*:%i" statePort)

// connect cloud backend to all peers
use cloudbe = ctx |> route
(ZMQ.IDENTITY,self) |> set cloudbe
peers |> Array.iter (fun peer ->
printfn' "I: connecting to cloud frontend at '%s'" peer
connect cloudbe (sprintf "tcp://localhost:%s" peer))

// connect statefe to all peers
use statefe = ctx |> sub
[""B] |> subscribe statefe
peers |> Array.iter (fun peer ->
let peerPort = (int peer) + 3
printfn' "I: connecting to state backend at '%i'" peerPort
connect statefe (sprintf "tcp://localhost:%i" peerPort))

// prepare local frontend and backend
use localfe = ctx |> route
bind localfe (sprintf "tcp://*:%i" frontPort)

use localbe = ctx |> route
bind localbe (sprintf "tcp://*:%i" backPort)

// prepare monitor socket
use monitor = ctx |> pull
bind monitor (sprintf "tcp://*:%i" monitorPort)

// get user to tell us when we can start...
printf' "Press Enter when all brokers are started: "
scanln() |> ignore

// start local workers
for _ in 1 .. NBR_WORKERS do
ignore (t_spawnp worker_task backPort)

// start local clients
for _ in 1 .. NBR_CLIENTS do
ignore (t_spawnp client_task (frontPort,monitorPort))

(* Interesting part
-------------------------------------------------------------
Publish-subscribe flow
- Poll statefe and process capacity updates
- Each time capacity changes, broadcast new value
Request-reply flow
- Poll primary and process local/cloud replies
- While worker available, route localfe to local or cloud *)

// queue of available workers
let workers = Queue()

// holds values collected/generated during polling
let msg = ref Array.empty
let localCapacity = ref 0
let cloudCapacity = ref 0

let primary =
[ Poll( ZMQ.POLLIN,localbe,fun _ ->
// handle reply from local worker
let msg' = recvAll localbe
msg'.[0] |> workers.Enqueue
incr localCapacity
// if it's READY, don't route the message any further
msg := if msg'.[2] = LRU_READY then [||] else msg'.[2 ..] )

Poll( ZMQ.POLLIN,cloudbe,fun _ ->
// handle reply from peer broker
let msg' = recvAll cloudbe
// we don't use peer broker address for anything
msg := msg'.[2 ..] )

Poll( ZMQ.POLLIN,statefe,fun _ ->
// handle capacity updates
cloudCapacity := statefe |> recv |> decode |> int )

Poll( ZMQ.POLLIN,monitor,fun _ ->
// handle monitor message
monitor |> recv |> decode |> (printfn' "%s") ) ]

let secondary =
let fetch socket = msg := recvAll socket
[ Poll(ZMQ.POLLIN,localfe,fetch)
Poll(ZMQ.POLLIN,cloudfe,fetch) ]

while true do
let previous = !localCapacity
let timeout = if workers.Count = 0 then -1L else 100000L

if primary |> poll timeout && (!msg).Length > 0 then
let address = (!msg).[0] |> decode
// route reply to cloud if it's addressed to a broker
// otherwise route reply to client
!msg |> sendAll ( if peers |> Array.exists ((=) address)
then cloudfe
else localfe )

// Now route as many clients requests as we can handle
// - If we have local capacity we poll both localfe and cloudfe
// - If we have cloud capacity only, we poll just localfe
// - Route any request locally if we can, else to cloud
let shouldLoop = ref true
while !localCapacity + !cloudCapacity > 0 && !shouldLoop do

shouldLoop := ( match !localCapacity with
| 0 -> secondary.Head :: []
| _ -> secondary )
|> poll 0L

if !shouldLoop then
let address,backend =
match !localCapacity with
| 0 -> // route to random broker peer
let random = rand.Next peers.Length
peers.[random] |> string |> encode, cloudbe
| _ -> // route to local worker
decr localCapacity
workers.Dequeue(), localbe
!msg
|> Array.append [| address; Array.empty |]
|> sendAll backend
// else ... No work, go back to backends

if !localCapacity <> previous then
// we stick our own address onto the envelope
statebe <~| (string >> encode) cloudPort
// broadcast new capacity
<<| (string >> encode) !localCapacity

EXIT_SUCCESS
| _ ->
printfn "syntax: peering3 me {you}..."
EXIT_FAILURE

main fsi.CommandLineArgs
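One pattern worth calling out, since it appears in every routing branch of both examples: a request is re-routed by prepending [| address; Array.empty |] to the message frames before sendAll. The ROUTER socket consumes the first frame to choose the destination, and the empty frame preserves the REQ-style envelope so the reply can find its way back. A minimal illustration in plain F# (addressed is a hypothetical helper, not part of the code above):

// Sketch only: build the [ identity; empty delimiter; payload ... ] frames
// a ROUTER socket expects on send; `addressed` is a hypothetical helper.
let addressed (address:byte[]) (frames:byte[][]) =
  Array.append [| address; Array.empty<byte> |] frames

// e.g. addressed "W1"B [| "0042"B |] yields [| "W1"B; [||]; "0042"B |]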
7 changes: 0 additions & 7 deletions examples/F#/zhelpers.fs
@@ -81,10 +81,3 @@ let t_spawnp fn p =
   let t = System.Threading.Thread(fn')
   t.Start(p)
   t
-
-let assert' cond =
-#if COMPILED
-  assert cond
-#else
-  if not cond then failwith "assertion failed"
-#endif