Permalink
Browse files

added capability to build up frames using push and pull zmq methods

  • Loading branch information...
1 parent c330db4 commit 86e3d1db998a01039ee884776a24b5e152d23711 John Murphy committed Sep 30, 2012
View
@@ -17,16 +17,24 @@ libzmq.dll libmdp.dll and mdpwrapper.dll have to be in your applications search
Usage Example:
// instantiates client, sends a message and cleans up
-using( Client c = new Client( "tcp://192.168.1.7:5555" ) )
+using (Client client = new Client("tcp://192.168.1.7:5555", true))
{
- c.Send( "my service", "I like cheese" );
- Response r = c.Recv();
-
- if( r != null )
- {
- r.Print();
- }
-
+ Request request = client.CreateRequest("echo") ;
+ request.PushMem(Encoding.Unicode.GetBytes("foo bar baz"));
+ request.PushString("Tasty cheese!");
+ request.Send();
+
+ // reads response, using calls Dispose to clean up resources when
+ // response goes out of scope
+ using (Response response = client.Recv())
+ {
+ string stringcontents = response.PopString();
+ byte[] buff = response.PopMem();
+ string buffcontents = Encoding.Unicode.GetString(buff);
+ }
+
+
+
}
zeromq.majordomo.test.clientconsole is an application to test the zeromq.majordomo.csharp assembly
@@ -49,33 +49,14 @@ void client_send_string( mdp_client_t *self, char *service, char *msg )
}
-void client_recv (mdp_client_t *self, char **service_p, char **response_p )
+zmsg_t* client_recv (mdp_client_t *self, char **service_p )
{
- *response_p = NULL;
- zmsg_t* reply = mdp_client_recv( self, service_p );
- if( reply ) {
- size_t buffer_size = zmsg_content_size( reply ) + 1;
- *response_p = (char*)malloc( buffer_size );
- memset( *response_p, 0, buffer_size );
-
- char* p = *response_p;
-
- while(true) {
- zframe_t* frame = zmsg_pop( reply );
- if( frame ) {
- size_t frame_size = zframe_size(frame);
- memcpy( p, zframe_data(frame), frame_size );
- p += frame_size;
- zframe_destroy(&frame);
- } else {
- break;
- }
- }
- }
+ return mdp_client_recv( self, service_p );
}
void client_send( mdp_client_t *self, char *service, zmsg_t **msg_p )
{
+ mdp_client_send( self, service, msg_p );
}
zmsg_t* msg_new()
@@ -91,4 +72,25 @@ int push_str( zmsg_t* msg, char* str )
int push_mem( zmsg_t* msg, const void* buffer, int length )
{
return zmsg_pushmem( msg, buffer, length );
+}
+
+void msg_destroy( zmsg_t **msg_p )
+{
+ zmsg_destroy( msg_p );
+}
+void frame_destroy( zframe_t **frame_p )
+{
+ zframe_destroy( frame_p );
+}
+
+int pop_mem( zmsg_t* msg, void** buffer )
+{
+ int buffer_size = 0;
+ *buffer = NULL;
+ zframe_t* frame = zmsg_pop( msg );
+ buffer_size = zframe_size( frame );
+ *buffer = malloc( buffer_size );
+ memcpy( *buffer, zframe_data( frame ), buffer_size );
+ zframe_destroy( &frame );
+ return buffer_size;
}
@@ -32,8 +32,12 @@ MDPWRAPPER_API int client_setsockopt (mdp_client_t *self, int option, const void
MDPWRAPPER_API int client_getsockopt (mdp_client_t *self, int option, void *optval, size_t *optvallen);
MDPWRAPPER_API void client_send_data (mdp_client_t *self, char *service, char *data, int size );
MDPWRAPPER_API void client_send_string (mdp_client_t *self, char *service, char *msg );
-MDPWRAPPER_API void client_recv (mdp_client_t *self, char **service_p, char **response_p);
+MDPWRAPPER_API zmsg_t* client_recv (mdp_client_t *self, char **service_p);
MDPWRAPPER_API void client_send( mdp_client_t *self, char *service, zmsg_t **msg_p );
MDPWRAPPER_API zmsg_t* msg_new();
MDPWRAPPER_API int push_str( zmsg_t* msg, char* str );
MDPWRAPPER_API int push_mem( zmsg_t* msg, const void* buffer, int length );
+MDPWRAPPER_API void msg_destroy( zmsg_t **msg_p );
+MDPWRAPPER_API void frame_destroy( zframe_t **frame_p );
+MDPWRAPPER_API int pop_mem( zmsg_t* msg, void** buffer ) ;
+
@@ -15,28 +15,82 @@ public struct Wrapper
[DllImport("mdpwrapper.dll", CharSet=CharSet.Ansi, CallingConvention = CallingConvention.Cdecl, EntryPoint = "#7")]
static extern void _client_send_string(IntPtr handle, String service, String message );
[DllImport("mdpwrapper.dll", CallingConvention = CallingConvention.Cdecl, EntryPoint = "#4")]
- static extern void _client_recv(IntPtr handle, IntPtr service, IntPtr message );
+ static extern IntPtr _client_recv(IntPtr handle, IntPtr service );
[DllImport("mdpwrapper.dll", CallingConvention = CallingConvention.Cdecl, EntryPoint = "#8")]
static extern void _client_set_timeout(IntPtr handle, int timeout);
- [DllImport("mdpwrapper.dll", CallingConvention = CallingConvention.Cdecl, EntryPoint = "#10")]
+ [DllImport("mdpwrapper.dll", CallingConvention = CallingConvention.Cdecl, EntryPoint = "#12")]
static extern IntPtr _msg_new();
- [DllImport("mdpwrapper.dll", CallingConvention = CallingConvention.Cdecl, EntryPoint = "#11")]
+ [DllImport("mdpwrapper.dll", CallingConvention = CallingConvention.Cdecl, EntryPoint = "#14")]
static extern int _push_mem(IntPtr msg_handle, byte[] buffer, int length );
[DllImport("mdpwrapper.dll", CallingConvention = CallingConvention.Cdecl, EntryPoint = "#5")]
static extern void _client_send( IntPtr handle, String service, out IntPtr msg_handle );
+ [DllImport("mdpwrapper.dll", CharSet=CharSet.Ansi, CallingConvention = CallingConvention.Cdecl, EntryPoint = "#15")]
+ static extern int _push_str(IntPtr msg_handle, String buffer);
+ [DllImport("mdpwrapper.dll", CallingConvention = CallingConvention.Cdecl, EntryPoint = "#13")]
+ static extern int _pop_mem(IntPtr msg_handle, IntPtr buffer );
+ [DllImport("mdpwrapper.dll", CallingConvention = CallingConvention.Cdecl, EntryPoint = "#11")]
+ static extern void _msg_destroy(out IntPtr msg_handle);
+
+ internal static void msg_destroy(IntPtr msg_handle)
+ {
+ _msg_destroy(out msg_handle);
+ }
- internal static void client_send(Request request)
+ internal static byte[] pop_mem(IntPtr msg_handle)
{
- // todo implement me
+ byte[] response = null;
+ IntPtr ptrptr = Marshal.AllocHGlobal(Marshal.SizeOf(typeof(IntPtr)));
+ int size = _pop_mem(msg_handle, ptrptr);
+ IntPtr buffptr = (IntPtr)Marshal.PtrToStructure(ptrptr, typeof(IntPtr));
+ if (size > 0)
+ {
+ response = new byte[size];
+ Marshal.Copy(buffptr, response, 0, size);
+ }
+ Marshal.FreeHGlobal(ptrptr);
+ return response;
+
}
+ internal static String pop_str(IntPtr msg_handle)
+ {
+ string response = null;
+ byte[] buff = pop_mem(msg_handle);
+ if (buff != null)
+ {
+ response = System.Text.ASCIIEncoding.ASCII.GetString(buff);
+
+ }
+ return response;
+
+ }
+
+ internal static void client_send(IntPtr handle, String service, IntPtr msg_handle )
+ {
+ _client_send(handle, service, out msg_handle);
+ }
+
+ internal static void push_str(IntPtr msg_handle, String buffer)
+ {
+ int rc = _push_str(msg_handle, buffer);
+ if (rc != 0)
+ {
+ throw new System.Exception("Attempt to push string onto message failed.");
+ }
+
+ }
internal static void push_mem(IntPtr msg_handle, byte[] buffer)
{
int rc = _push_mem(msg_handle, buffer, buffer.Length);
+ if (rc != 0)
+ {
+ throw new System.Exception("Attempt to add buffer to message failed");
+ }
}
+
internal static IntPtr msg_new()
{
return _msg_new();
@@ -62,21 +116,20 @@ internal static void client_send_string(IntPtr handle, string service, string me
_client_send_string(handle, service, message);
}
- internal static void client_recv(IntPtr handle, ref string service, ref string message)
+
+ internal static Response client_recv(IntPtr handle )
{
+ string service = string.Empty;
IntPtr ppservice = Marshal.AllocHGlobal(Marshal.SizeOf(typeof(IntPtr)));
- IntPtr ppmessage = Marshal.AllocHGlobal(Marshal.SizeOf(typeof(IntPtr)));
-
- _client_recv(handle, ppservice, ppmessage);
- IntPtr pmessage = (IntPtr)Marshal.PtrToStructure(ppmessage, typeof(IntPtr));
- message = Marshal.PtrToStringAnsi(pmessage);
- IntPtr pservice = (IntPtr)Marshal.PtrToStructure(ppservice, typeof(IntPtr));
+ IntPtr msg_handle = _client_recv(handle, ppservice);
+ IntPtr pservice = (IntPtr)Marshal.PtrToStructure(ppservice, typeof(IntPtr));
+
if (pservice != IntPtr.Zero)
{
service = Marshal.PtrToStringAnsi(pservice);
}
Marshal.FreeHGlobal(ppservice);
- Marshal.FreeHGlobal(ppmessage);
+ return new Response( msg_handle, service );
}
@@ -98,34 +151,32 @@ public Client(String broker, bool verbose)
handle = Wrapper.client_new(broker, verbose ? 1 : 0);
}
+ public Request CreateRequest(String service)
+ {
+ Request request = new Request(handle, service);
+ return request;
+ }
+
/*
* Send a message to worker identified by service argument
*
* ex. c.Send( "echo", "I like cheese" );
+ *
+ * Extended send functionality is provided in the Request object.
*/
public void Send( String service, String message )
{
Wrapper.client_send_string( handle, service, message );
}
- public void Send(Request request)
- {
- }
/*
- * Handles response (if any from worker) response may be null
+ * Handles response (if any from worker)
*/
public Response Recv()
{
- Response response = null;
- string service = string.Empty;
- string message = string.Empty;
- Wrapper.client_recv(handle, ref service, ref message);
- if( !string.IsNullOrEmpty( message ) )
- {
- response = new Response(service, message);
- }
- return response;
+
+ return Wrapper.client_recv(handle);
}
public int Timeout
@@ -0,0 +1,56 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace zeromq.majordomo.csharp
+{
+ public class Request
+ {
+ IntPtr handle;
+ IntPtr msg_handle;
+ String service;
+ bool sent = false;
+
+ private void CheckSent()
+ {
+ if( sent )
+ {
+ throw new Exception( @"Adding information to request is not allowed once the request has been sent" );
+ }
+ }
+
+ internal Request(IntPtr handle, String service)
+ {
+ this.service = service;
+ this.handle = handle;
+ msg_handle = Wrapper.msg_new();
+ }
+
+ public void PushString(String data)
+ {
+ CheckSent();
+ Wrapper.push_str(msg_handle, data);
+
+ }
+
+ public void PushMem(byte[] buffer)
+ {
+ CheckSent();
+ Wrapper.push_mem(msg_handle, buffer);
+ }
+
+ public void Send()
+ {
+ CheckSent();
+ Wrapper.client_send(handle, this.service, this.msg_handle);
+ sent = true;
+ }
+
+
+
+
+
+
+ }
+}
@@ -5,40 +5,52 @@
namespace zeromq.majordomo.csharp
{
- public class Response
+ public class Response :IDisposable
{
private string service ;
- private string message;
+ private IntPtr msg_handle;
- internal Response( string service, string message )
+ internal Response( IntPtr handle, string service )
{
this.service = service;
- this.message = message;
+ this.msg_handle = handle;
+
}
- public string Service
+ public string PopString()
{
- get
- {
- return this.service;
- }
+ return Wrapper.pop_str(msg_handle);
+ }
+
+ public byte[] PopMem()
+ {
+ return Wrapper.pop_mem(msg_handle);
}
- public string Message
+ public string Service
{
get
{
- return this.message;
+ return this.service;
}
}
+
public void Print()
{
Console.WriteLine("Response");
Console.WriteLine(string.Format(" Service: {0}", this.service ));
- Console.WriteLine(string.Format(" Message: {0}", this.message ));
+ // Console.WriteLine(string.Format(" Message: {0}", this.message ));
}
+ // ALWAYS call to clean up
+ public void Dispose()
+ {
+ if (msg_handle != IntPtr.Zero)
+ {
+ Wrapper.msg_destroy(msg_handle);
+ }
+ }
}
}
Oops, something went wrong.

0 comments on commit 86e3d1d

Please sign in to comment.