Skip to content
This repository has been archived by the owner on Feb 10, 2021. It is now read-only.

Commit

Permalink
added (!!) operator for accessing handles; made Frame class public; m…
Browse files Browse the repository at this point in the history
…iscellaneous code clean-up
  • Loading branch information
pblasucci committed Aug 23, 2013
1 parent 9058dc6 commit 1f9e39c
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 30 deletions.
8 changes: 4 additions & 4 deletions fszmq/Context.fs
Expand Up @@ -40,7 +40,7 @@ module Context =
/// Creates a Socket, of the given type, within the given context
[<Extension;CompiledName("Socket")>]
let newSocket (context:Context) socketType =
new Socket(context.Handle,socketType)
new Socket(!!context,socketType)

/// <summary>
/// Creates a peer connected to exactly one other peer.
Expand Down Expand Up @@ -134,13 +134,13 @@ module Context =
/// Gets the value of the given option for the given Context
[<Extension;CompiledName("GetOption")>]
let get (context:Context) contextOption =
let okay = C.zmq_ctx_get(context.Handle,contextOption)
let okay = C.zmq_ctx_get(!!context,contextOption)
if okay = -1 then ZMQ.error()

/// Sets the given option value for the given Context
[<Extension;CompiledName("SetOption")>]
let set (context:Context) (contextOption,value) =
let okay = C.zmq_ctx_set(context.Handle,contextOption,value)
let okay = C.zmq_ctx_set(!!context,contextOption,value)
if okay <> 0 then ZMQ.error()

/// Sets the given block of option values for the given Context
Expand All @@ -161,5 +161,5 @@ module Context =
[<Extension;CompiledName("SetMonitor")>]
let monitor (context:Context) callback =
let bind = (fun s e d -> ZMQEvent.Build(s,e,d) |> callback)
let okay = C.zmq_ctx_set_monitor(context.Handle,C.zmq_monitor_fn(bind))
let okay = C.zmq_ctx_set_monitor(!!context,C.zmq_monitor_fn(bind))
if okay <> 0 then ZMQ.error()
3 changes: 1 addition & 2 deletions fszmq/Core.fs
Expand Up @@ -17,9 +17,8 @@ open System.Runtime.InteropServices

/// <summary>
/// Provides a memory-managed wrapper over ZMQ message operations
/// <remarks>NOTE: For internal use only.</remarks>
/// </summary>
type internal Frame(?source:byte array) =
type Frame(?source:byte array) =
let mutable disposed = false
let mutable _memory = Marshal.AllocHGlobal(C.ZMQ_MSG_T_SIZE)

Expand Down
4 changes: 4 additions & 0 deletions fszmq/Marshal.fs
Expand Up @@ -18,6 +18,10 @@ open System.Runtime.InteropServices
[<AutoOpen>]
module internal Marshal =

//NOTE: this operator isn't really related to Marshaling,
// but this is a convenient location for it's definition
let inline (!!) (w : ^w) = (^w : (member Handle : nativeint) w)

(* reading native values *)
let inline readInt32 pointer = Marshal.ReadInt32(pointer)
let inline readInt64 pointer = Marshal.ReadInt64(pointer)
Expand Down
2 changes: 1 addition & 1 deletion fszmq/Polling.fs
Expand Up @@ -51,7 +51,7 @@ module Polling =
[<CompiledName("PollInOut")>]
let pollIO fn socket = Poll(ZMQ.POLLIN ||| ZMQ.POLLOUT,socket,fn)

let private poller (Poll(v,s,_)) = C.zmq_pollitem_t(s.Handle,v)
let private poller (Poll(v,s,_)) = C.zmq_pollitem_t(!!s,v)
let private invoke (Poll(_,s,f)) = f s

/// <summary>
Expand Down
10 changes: 3 additions & 7 deletions fszmq/Proxy.fs
Expand Up @@ -20,17 +20,13 @@ open System.Runtime.CompilerServices
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Proxying =

//MAYBE: replace this with inline operator generalized to anything with get_Handle?
let private (|Handle|_|) = Option.map (fun (s:Socket) -> s.Handle)

/// creates a proxy connection passing messages between two sockets,
/// with an (optional) third socket for supplemental data capture
[<CompiledName("Proxy")>]
let proxy (frontend:Socket) (backend:Socket) capture =
let frontend,backend = frontend.Handle,backend.Handle
let proxy (frontend:Socket) (backend:Socket) (capture:Socket option) =
match capture with
| Handle(capture) -> C.zmq_proxy(frontend,backend,capture)
| _ -> C.zmq_proxy(frontend,backend, 0n)
| Some(capture) -> C.zmq_proxy(!!frontend,!!backend,!!capture)
| _ -> C.zmq_proxy(!!frontend,!!backend, 0n)

/// Utilities for working with Polling from languages other than F#
[<Extension>]
Expand Down
32 changes: 16 additions & 16 deletions fszmq/Socket.fs
Expand Up @@ -27,26 +27,26 @@ module Socket =
/// connections at the given address
[<Extension;CompiledName("Bind")>]
let bind (socket:Socket) address =
let okay = C.zmq_bind(socket.Handle,address)
let okay = C.zmq_bind(!!socket,address)
if okay <> 0 then ZMQ.error()

/// Causes an endpoint to stop accepting
/// connections at the given address
[<Extension;CompiledName("Unbind")>]
let unbind (socket:Socket) address =
let okay = C.zmq_unbind(socket.Handle,address)
let okay = C.zmq_unbind(!!socket,address)
if okay <> 0 then ZMQ.error()

/// Connects to an endpoint to the given address
[<Extension;CompiledName("Connect")>]
let connect (socket:Socket) address =
let okay = C.zmq_connect(socket.Handle,address)
let okay = C.zmq_connect(!!socket,address)
if okay <> 0 then ZMQ.error()

/// Disconnects to an endpoint from the given address
[<Extension;CompiledName("Disconnect")>]
let disconnect (socket:Socket) address =
let okay = C.zmq_disconnect(socket.Handle,address)
let okay = C.zmq_disconnect(!!socket,address)
if okay <> 0 then ZMQ.error()

(* socket options *)
Expand All @@ -66,7 +66,7 @@ module Socket =
let buffer = Marshal.AllocHGlobal(size)
try
let mutable size' = unativeint size
let okay = C.zmq_getsockopt(socket.Handle,socketOption,buffer,&size')
let okay = C.zmq_getsockopt(!!socket,socketOption,buffer,&size')
if okay <> 0 then ZMQ.error()
downcast read (size',buffer)
finally
Expand All @@ -87,7 +87,7 @@ module Socket =
let buffer = Marshal.AllocHGlobal(size)
try
write(buffer)
let okay = C.zmq_setsockopt(socket.Handle
let okay = C.zmq_setsockopt(!!socket
,socketOption
,buffer
,unativeint size)
Expand Down Expand Up @@ -125,8 +125,8 @@ module Socket =
/// if the send was successful (or should be re-tried)
[<Extension;CompiledName("TrySend")>]
let trySend (socket:Socket) flags frame =
use frm = new Frame(frame)
match C.zmq_msg_send(frm.Handle,socket.Handle,flags) with
use frame = new Frame(frame)
match C.zmq_msg_send(!!frame,!!socket,flags) with
| Okay -> true
| Busy -> false
| Fail -> ZMQ.error()
Expand Down Expand Up @@ -175,11 +175,11 @@ module Socket =
/// where None indicates the operation should be re-attempted
[<Extension;CompiledName("TryRecv")>]
let tryRecv (socket:Socket) flags =
use frm = new Frame()
match C.zmq_msg_recv(frm.Handle,socket.Handle,flags) with
| Okay -> let mutable frame = Array.empty
frame <- frm.Data
Some(frame)
use frame = new Frame()
match C.zmq_msg_recv(!!frame,!!socket,flags) with
| Okay -> let mutable frame' = Array.empty
frame' <- frame.Data
Some(frame')
| Busy -> None
| Fail -> ZMQ.error()

Expand All @@ -201,15 +201,15 @@ module Socket =
/// first marshalling the message part into the managed code space
[<Extension;CompiledName("Transfer")>]
let transfer (socket:Socket) (target:Socket) =
use frm = new Frame()
use frame = new Frame()
let rec send' flags =
match C.zmq_msg_send(frm.Handle,target.Handle,flags) with
match C.zmq_msg_send(!!frame,!!target,flags) with
| Okay -> ((* pass *))
| Busy -> send' flags
| Fail -> ZMQ.error()
let loop = ref true
while !loop do
match C.zmq_msg_recv(frm.Handle,socket.Handle,ZMQ.WAIT) with
match C.zmq_msg_recv(!!frame,!!socket,ZMQ.WAIT) with
| Okay -> loop := socket |> recvMore
send' (if !loop then ZMQ.SNDMORE else ZMQ.DONTWAIT)
| _ -> ZMQ.error()
Expand Down

0 comments on commit 1f9e39c

Please sign in to comment.