Skip to content

Commit

Permalink
Merge pull request booksbyus#139 from pblasucci/master
Browse files Browse the repository at this point in the history
Updated F# examples
  • Loading branch information
hintjens committed Dec 1, 2011
2 parents a1d4b64 + fd861b6 commit 945b863
Show file tree
Hide file tree
Showing 26 changed files with 1,382 additions and 48 deletions.
108 changes: 106 additions & 2 deletions examples/F#/asyncsrv.fsx
@@ -1,8 +1,112 @@
(*
Asynchronous client-to-server (DEALER to ROUTER)
While this example runs in a single process, that is just to make
it easier to start and stop the example. Each task has its own
context and conceptually acts as a separate process.
*)
#r @"bin/fszmq.dll"
#r @"bin/fszmq.devices.dll"
open fszmq
open fszmq.Context
open fszmq.devices
open fszmq.Polling
open fszmq.Socket

let main () =
((* put code here *))
#load "zhelpers.fs"

// this is our client task
// it connects to the server, and then sends a request once per second
// it collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
let client_task () =
use ctx = new Context(1)
use client = deal ctx

// set random identity to make tracing easier
s_setID client
let identity = ZMQ.IDENTITY |> get<byte[]> client |> decode
connect client "tcp://localhost:5570"

let printMsg socket =
let content = match socket |> recvAll with
| [| content |] -> decode content
| _ -> "<NULL>"
printfn' "(%s) %s" identity content

let request_nbr = ref 0
while true do
// tick once per second, pulling in arriving messages
for _ in 1 .. 100 do
[Poll(ZMQ.POLLIN,client,printMsg)] |> poll 10000L |> ignore
incr request_nbr
(sprintf "request %d" !request_nbr) |> s_send client

// accept a request and reply with the same text
// a random number of times, with random delays between replies.
let rand = srandom()
let server_worker (ctx:obj) =
use worker = (ctx :?> Context) |> deal
connect worker "tcp://localhost:5600"

while true do
// The DEALER socket gives us the address envelope and message
let message = worker |> recvAll
// Send 0..4 replies back
let replies = rand.Next(0,5)
for _ in 1 .. replies do
sleep (rand.Next 1000)
message |> sendAll worker

// this is our server task
// it uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.
let server_task () =
use ctx = new Context(1)

// frontend socket talks to clients over TCP
use frontend = ctx |> route
bind frontend "tcp://*:5570"

// backend socket talks to workers over inproc
use backend = ctx |> deal
// bind backend "inproc://backend"
// ... except on Windows where 0MQ doesn't have a binding
// for named pipes, so we use TCP instead
bind backend "tcp://*:5600"

// launch pool of worker threads, precise number is not critical
for _ in 1 .. 5 do
ctx |> s_spawnp server_worker |> ignore

// connect backend to frontend via a queue device
// we could do this:
// Devices.queue(frontend,backend)
// but doing it ourselves means we can debug this more easily

// switch messages between frontend and backend
let items =
[ Poll(ZMQ.POLLIN,frontend,
fun _ -> let msg = frontend |> recvAll
//printfn' "request from client:"
//dumpMsg msg
msg |> sendAll backend)
Poll(ZMQ.POLLIN,backend ,
fun _ -> let msg = backend |> recvAll
//printfn' "reply from worker:"
//dumpMsg msg
msg |> sendAll frontend) ]
while items |> poll -1L do ((* loop *))

let main () =
s_spawn client_task |> ignore
s_spawn client_task |> ignore
s_spawn client_task |> ignore
s_spawn server_task |> ignore
// run for 5 seconds then quit
sleep 5000
EXIT_SUCCESS

main ()
30 changes: 28 additions & 2 deletions examples/F#/durapub.fsx
@@ -1,8 +1,34 @@

(*
Publisher for durable subscriber
*)
#r @"bin/fszmq.dll"
open fszmq
open fszmq.Context
open fszmq.Socket

#load "zhelpers.fs"

let main () =
((* put code here *))
use context = new Context(1)

// subscriber tells us when it's ready here
use sync = pull context
"tcp://*:5564" |> bind sync

// we send updates via this socket
use publisher = pub context
"tcp://*:5565" |> bind publisher

// wait for a synchronization request
sync |> s_recv |> ignore

// now broadcast exactly 10 updates with pause
for update_nbr in 0 .. 9 do
let message = sprintf "Update %d" update_nbr
message |> s_send publisher
sleep 1000
"END" |> s_send publisher

EXIT_SUCCESS

main ()
36 changes: 34 additions & 2 deletions examples/F#/durapub2.fsx
@@ -1,8 +1,40 @@

(*
Publisher for durable subscriber
*)
#r @"bin/fszmq.dll"
open fszmq
open fszmq.Context
open fszmq.Socket

#load "zhelpers.fs"

let main () =
((* put code here *))
use context = new Context(1)

// subscriber tells us when it's ready here
use sync = pull context
"tcp://*:5564" |> bind sync

// we send updates via this socket
use publisher = pub context

// prevent publisher overflow from slow subscribers
(ZMQ.HWM,1UL) |> set publisher

// specify swap space in bytes, this covers all subscribers
(ZMQ.SWAP,25000000UL) |> set publisher
"tcp://*:5565" |> bind publisher

// wait for a synchronization request
sync |> s_recv |> ignore

// now broadcast exactly 10 updates with pause
for update_nbr in 0 .. 9 do
let message = sprintf "Update %d" update_nbr
message |> s_send publisher
sleep 1000
"END" |> s_send publisher

EXIT_SUCCESS

main ()
30 changes: 28 additions & 2 deletions examples/F#/durasub.fsx
@@ -1,8 +1,34 @@

(*
Durable subscriber
*)
#r @"bin/fszmq.dll"
open fszmq
open fszmq.Context
open fszmq.Socket

#load "zhelpers.fs"

let main () =
((* put code here *))
use context = new Context(1)

// connect our subscriber socket
use subscriber = sub context
(ZMQ.IDENTITY,"Hello"B) |> set subscriber
[ ""B ] |> subscribe subscriber
"tcp://localhost:5565" |> connect subscriber

// synchronize with publisher
use sync = push context
"tcp://localhost:5564" |> connect sync
"" |> s_send sync

// get updates, exit when told to do so
let rec loop () =
let message = s_recv subscriber
printfn "%s" message
if message <> "END" then loop()
loop()

EXIT_SUCCESS

main ()
30 changes: 28 additions & 2 deletions examples/F#/identity.fsx
@@ -1,8 +1,34 @@

(*
Demonstrate identities as used by the request-reply pattern. Run this
program by itself. Note that the utility functions s_ are provided by
zhelpers.fs. It gets boring for everyone to keep repeating this code.
*)
#r @"bin/fszmq.dll"
open fszmq
open fszmq.Context
open fszmq.Socket

#load "zhelpers.fs"

let main () =
((* put code here *))
use context = new Context(1)

use sink = route context
"inproc://example" |> bind sink

// first allow 0MQ to set the identity
use anonymous = req context
"inproc://example" |> connect anonymous
"ROUTER uses a generated UUID" |> s_send anonymous
s_dump sink

// then set the identity ourself
use identified = req context
(ZMQ.IDENTITY,"Hello"B) |> set identified
"inproc://example" |> connect identified
"ROUTER socket uses REQ's socket identity" |> s_send identified
s_dump sink

EXIT_SUCCESS

main ()

0 comments on commit 945b863

Please sign in to comment.