Skip to content

Commit

Permalink
Changes to threading API
Browse files Browse the repository at this point in the history
- attached and detatched thread model
- attached threads get explicit arguments (not via thread_t block)
- detached threads look like POSIX threads
- this fixed a problem of dangling pipe sockets in some cases
Also fixed zframe_dump to print newlines & tabs
  • Loading branch information
hintjens committed Apr 7, 2011
1 parent bcd734b commit 22b5d5b
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 78 deletions.
2 changes: 2 additions & 0 deletions NEWS
Expand Up @@ -4,6 +4,8 @@ libzapi version 1.2.2 (beta), released on 2011/04/xx
Changes
-------

* Threading API now supports attached and detached threads.

* In zframe class, added print, reset, strdup, streq, strhex, dup methods.

* In zmsg class, added last, wrap, unwrap, popstr, pushstr, addstr, dup
Expand Down
40 changes: 26 additions & 14 deletions include/zctx.h
Expand Up @@ -29,31 +29,34 @@
extern "C" {
#endif


// Opaque class structure
typedef struct _zctx_t zctx_t;

// @interface
// Structure passed to threads created via this class
typedef struct {
zctx_t *ctx; // Context shared with parent thread
void *pipe; // Pipe to parent thread (PAIR)
void *arg; // Application argument
} zthread_t;
// Detached threads follow POSIX pthreads API
typedef void *(zthread_detached_fn) (void *args);
// Attached threads get context and pipe from parent
typedef void (zthread_attached_fn) (void *args, zctx_t *ctx, void *pipe);

// Create new context, returns context object, replaces zmq_init
zctx_t *
zctx_new (void);

// Create new shadow context, returns context object
zctx_t *
zctx_shadow (zctx_t *self);

// Destroy context and all sockets in it, replaces zmq_term
void
zctx_destroy (zctx_t **self_p);
// Raise default I/O threads from 1, for crazy heavy applications

// Raise default I/O threads from 1, for crazy heavy applications
void
zctx_set_iothreads (zctx_t *self, int iothreads);

// Set msecs to flush sockets when closing them
void
void
zctx_set_linger (zctx_t *self, int linger);

// Create socket within this context, replaces zmq_socket
Expand All @@ -64,11 +67,20 @@ void *
void
zctx_socket_destroy (zctx_t *self, void *socket);

// Create thread, return PAIR socket to talk to thread. The child thread
// receives a (zthread_t *) object including a zctx, a pipe back to the
// creating thread, and the arg passed in this call.
// --- SHOULD MOVE TO zthread class
// Create an attached thread. An attached thread gets a ctx and a PAIR
// pipe back to its parent. It must monitor its pipe, and exit if the
// pipe becomes unreadable.
void *
zctx_thread_new (zctx_t *self, void *(*thread_fn) (void *), void *arg);
zctx_attach_thread (zctx_t *self, zthread_attached_fn *thread_fn,
void *args);

// Create a detached thread. A detached thread operates autonomously
// and is used to simulate a separate process. It gets no ctx, and no
// pipe.
void
zctx_detach_thread (zctx_t *self, zthread_detached_fn *thread_fn,
void *args);

// Self test of this class
int
Expand Down
21 changes: 21 additions & 0 deletions notes.txt
@@ -0,0 +1,21 @@
zsocket_

- subscribe
- unsubscribe
- hwm / set_
- swap / set_
- affinity / set_
- identity / set_
- rate / set_
- recovery_ivl / set_
- recovery_ivl_msec / set_
- mcast_loop / set_
- sndbuf / set_
- rcvbuf / set_
- linger / set_
- reconnect_ivl / set_
- reconnect_ivl_max / set_
- backlog / set_
- fd
- events
- type
156 changes: 93 additions & 63 deletions src/zctx.c
Expand Up @@ -226,101 +226,72 @@ zctx_socket_destroy (zctx_t *self, void *socket)


// --------------------------------------------------------------------------
// Thread creation code, taken from ZFL's zfl_thread class and customized.
// Thread creation code, wrapping POSIX and Win32 thread APIs

typedef struct {
void *(*thread_fn) (void *);
void *args;
// Two thread handlers, one will be set, one NULL
zthread_attached_fn *attached;
zthread_detached_fn *detached;
void *args; // Application arguments
zctx_t *ctx; // Context object if any
void *pipe; // Pipe, if any, back to parent
#if defined (__WINDOWS__)
HANDLE handle;
HANDLE handle; // Win32 thread handle
#endif
} shim_t;

#if defined (__UNIX__)
// Thread shim for UNIX calls the real thread and cleans up afterwards.

void *
s_call_thread_fn (void *args)
s_thread_shim (void *args)
{
assert (args);
shim_t *shim = (shim_t *) args;
shim->thread_fn (shim->args);
if (shim->attached)
shim->attached (shim->args, shim->ctx, shim->pipe);
else
shim->detached (shim->args);

zthread_t *zthread = (zthread_t *) shim->args;
zctx_destroy (&zthread->ctx);
free (zthread);
zctx_destroy (&shim->ctx);
free (shim);
return NULL;
}

#elif defined (__WINDOWS__)
// Thread shim for Windows that wraps a POSIX-style thread handler
// and does the _endthreadex for us automatically.

unsigned __stdcall
s_call_thread_fn (void *args)
s_thread_shim (void *args)
{
assert (args);
shim_t *shim = (shim_t *) args;
shim->thread_fn (shim->args);
if (shim->attached)
shim->attached (shim->args, shim->ctx, shim->pipe);
else
shim->detached (shim->args);

_endthreadex (0);
CloseHandle (shim->handle);

zthread_t *zthread = (zthread_t *) shim->args;
zctx_destroy (&zthread->ctx);
free (zthread);
zctx_destroy (&shim->ctx);
free (shim);
return 0;
}
#endif


// --------------------------------------------------------------------------
// Create a child thread able to speak to this thread over inproc sockets.
// The child thread receives a zthread_t structure as argument. Returns a
// PAIR socket that is connected to the child thread. You can ignore the
// socket if you don't need it.

void *
zctx_thread_new (zctx_t *self, void *(*thread_fn) (void *), void *arg)
static void
s_thread_start (shim_t *shim)
{
// Create our end of the pipe
void *pipe = zctx_socket_new (self, ZMQ_PAIR);
char endpoint [64];
int rc = snprintf (endpoint, 64, "inproc://zctx-pipe-%p", pipe);
assert (rc < 64);
rc = zmq_bind (pipe, endpoint);
assert (rc == 0);

// Child thread gets a zthread_t arguments block
zthread_t *args = (zthread_t *) zmalloc (sizeof (zthread_t));
args->arg = arg; // Application arguments

// Create new zctx_t for child, and new pipe in that
args->ctx = (zctx_t *) zmalloc (sizeof (zctx_t));
args->ctx->context = self->context;
args->ctx->sockets = zlist_new ();

// Create child end of pipe, and connect to ours
args->pipe = zctx_socket_new (args->ctx, ZMQ_PAIR);
rc = zmq_connect (args->pipe, endpoint);
assert (rc == 0);

// Now start child thread, passing our arguments
shim_t *shim = (shim_t *) zmalloc (sizeof (shim_t));
shim->thread_fn = thread_fn;
shim->args = args;

#if defined (__UNIX__)
pthread_t thread;
pthread_create (&thread, NULL, s_call_thread_fn, shim);
pthread_create (&thread, NULL, s_thread_shim, shim);
pthread_detach (thread);

#elif defined (__WINDOWS__)
shim->handle = (HANDLE)_beginthreadex(
NULL, // Handle is private to this process
0, // Use a default stack size for new thread
&s_call_thread_fn, // Start real thread function via this shim
&s_thread_shim, // Start real thread function via this shim
shim, // Which gets the current object as argument
CREATE_SUSPENDED, // Set thread priority before starting it
NULL); // We don't use the thread ID
Expand All @@ -332,28 +303,84 @@ zctx_thread_new (zctx_t *self, void *(*thread_fn) (void *), void *arg)
// Now start thread
ResumeThread (shim->handle);
#endif
}


// --------------------------------------------------------------------------
// Create an attached thread. An attached thread gets a ctx and a PAIR
// pipe back to its parent. It must monitor its pipe, and exit if the
// pipe becomes unreadable.

void *
zctx_attach_thread (zctx_t *self, zthread_attached_fn *thread_fn, void *args)
{
// Create our end of the pipe
void *pipe = zctx_socket_new (self, ZMQ_PAIR);
char endpoint [64];
int rc = snprintf (endpoint, 64, "inproc://zctx-pipe-%p", pipe);
assert (rc < 64);
rc = zmq_bind (pipe, endpoint);
assert (rc == 0);

// Prepare argument shim for child thread
shim_t *shim = (shim_t *) zmalloc (sizeof (shim_t));
shim->attached = thread_fn;
shim->args = args;

// Create shadow zctx_t object for child thread
shim->ctx = (zctx_t *) zmalloc (sizeof (zctx_t));
shim->ctx->context = self->context;
shim->ctx->sockets = zlist_new ();

// Connect child pipe to our pipe
shim->pipe = zctx_socket_new (shim->ctx, ZMQ_PAIR);
rc = zmq_connect (shim->pipe, endpoint);
assert (rc == 0);

s_thread_start (shim);
return pipe;
}


// --------------------------------------------------------------------------
// Create a detached thread. A detached thread operates autonomously
// and is used to simulate a separate process. It gets no ctx, and no
// pipe.

void
zctx_detach_thread (zctx_t *self, zthread_detached_fn *thread_fn, void *args)
{
// Prepare argument shim for child thread
shim_t *shim = (shim_t *) zmalloc (sizeof (shim_t));
shim->detached = thread_fn;
shim->args = args;
s_thread_start (shim);
}


// --------------------------------------------------------------------------
// Selftest

// @selftest
static void *
s_test_thread (void *args_ptr)
s_test_detached (void *args)
{
zthread_t *args = (zthread_t *) args_ptr;
zctx_t *ctx = zctx_new ();
// Create a socket to check it'll be automatically deleted
void *push = zctx_socket_new (ctx, ZMQ_PUSH);
zctx_destroy (&ctx);
return NULL;
}

// Create a socket to check it'll be properly deleted at exit
zctx_socket_new (args->ctx, ZMQ_PUSH);
static void
s_test_attached (void *args, zctx_t *ctx, void *pipe)
{
// Create a socket to check it'll be automatically deleted
zctx_socket_new (ctx, ZMQ_PUSH);

// Wait for our parent to ping us, and pong back
char *ping = zstr_recv (args->pipe);
free (ping);
zstr_send (args->pipe, "pong");
return NULL;
free (zstr_recv (pipe));
zstr_send (pipe, "pong");
}

// @end
Expand Down Expand Up @@ -388,12 +415,15 @@ zctx_test (Bool verbose)
zmq_connect (s6, "tcp://127.0.0.1:5555");

// Create a child thread, check it's safely alive
void *pipe = zctx_thread_new (ctx, s_test_thread, NULL);

void *pipe = zctx_attach_thread (ctx, s_test_attached, NULL);
zstr_send (pipe, "ping");
char *pong = zstr_recv (pipe);
assert (streq (pong, "pong"));
free (pong);

zctx_detach_thread (ctx, s_test_detached, NULL);

// Everything should be cleanly closed now
zctx_destroy (&ctx);
// @end
Expand Down
2 changes: 1 addition & 1 deletion src/zframe.c
Expand Up @@ -248,7 +248,7 @@ zframe_print (zframe_t *self, char *prefix)
int is_text = 1;
int char_nbr;
for (char_nbr = 0; char_nbr < size; char_nbr++)
if (data [char_nbr] < 32 || data [char_nbr] > 127)
if (data [char_nbr] < 9 || data [char_nbr] > 127)
is_text = 0;

fprintf (stderr, "[%03d] ", (int) size);
Expand Down

0 comments on commit 22b5d5b

Please sign in to comment.