diff --git a/src/CZMQ-ZWSSock/CZMQ-ZWSSock.vcxproj b/src/CZMQ-ZWSSock/CZMQ-ZWSSock.vcxproj index ee69580..eb9c91d 100644 --- a/src/CZMQ-ZWSSock/CZMQ-ZWSSock.vcxproj +++ b/src/CZMQ-ZWSSock/CZMQ-ZWSSock.vcxproj @@ -1,5 +1,5 @@  - + Debug @@ -14,18 +14,19 @@ {D2D01459-3588-4911-916E-FEC6D89E7AC4} Win32Proj CZMQ-ZWSSock + 10.0.16299.0 Application true - v120 + v141 Unicode Application false - v120 + v141 true Unicode diff --git a/src/CZMQ-ZWSSock/main.c b/src/CZMQ-ZWSSock/main.c index 82f4195..255cae3 100644 --- a/src/CZMQ-ZWSSock/main.c +++ b/src/CZMQ-ZWSSock/main.c @@ -2,31 +2,31 @@ #include "zwssock.h" +#define ZMQ_STATIC + static char *listen_on = "tcp://127.0.0.1:8000"; int main(int argc, char **argv) { - zctx_t *ctx; zwssock_t *sock; char *l = argc > 1 ? argv[1] : listen_on; int major, minor, patch; - zmq_version (&major, &minor, &patch); + zsys_version (&major, &minor, &patch); printf("built with: ØMQ=%d.%d.%d czmq=%d.%d.%d\n", major, minor, patch, CZMQ_VERSION_MAJOR, CZMQ_VERSION_MINOR,CZMQ_VERSION_PATCH); - ctx = zctx_new(); - sock = zwssock_new_router(ctx); + sock = zwssock_new_router(); zwssock_bind(sock, l); zmsg_t* msg; zframe_t *id; - while (!zctx_interrupted) + while (!zsys_interrupted) { msg = zwssock_recv(sock); @@ -58,5 +58,4 @@ int main(int argc, char **argv) } zwssock_destroy(&sock); - zctx_destroy(&ctx); } diff --git a/src/CZMQ-ZWSSock/zwshandshake.c b/src/CZMQ-ZWSSock/zwshandshake.c index 594eb18..7592ee2 100644 --- a/src/CZMQ-ZWSSock/zwshandshake.c +++ b/src/CZMQ-ZWSSock/zwshandshake.c @@ -293,7 +293,7 @@ bool zwshandshake_validate(zwshandshake_t *self) return true; } -int encode_base64(uint8_t *in, int in_len, char* out, int out_len) +int encode_base64(const uint8_t *in, int in_len, char* out, int out_len) { static const uint8_t base64enc_tab[64] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; @@ -345,7 +345,7 @@ zframe_t* zwshandshake_get_response(zwshandshake_t *self, unsigned char *client_ zdigest_t* digest = zdigest_new(); zdigest_update(digest, (byte *) plain, len); - byte* hash = zdigest_data(digest); + const byte* hash = zdigest_data(digest); char accept_key[150]; diff --git a/src/CZMQ-ZWSSock/zwssock.c b/src/CZMQ-ZWSSock/zwssock.c index 6936157..ac07805 100644 --- a/src/CZMQ-ZWSSock/zwssock.c +++ b/src/CZMQ-ZWSSock/zwssock.c @@ -8,32 +8,30 @@ struct _zwssock_t { - zctx_t *ctx; // Our parent context - void *control; // Control to/from agent - void *data; // Data to/from agent + zactor_t *control_actor; // Control to/from agent + zsock_t *data; // Data to/from agent }; // This background thread does all the real work -static void s_agent_task(void *args, zctx_t *ctx, void *control); +static void s_agent_task(zsock_t *control, void *args); // -------------------------------------------------------------------------- // Constructor -zwssock_t* zwssock_new_router(zctx_t *ctx) +zwssock_t* zwssock_new_router() { zwssock_t *self = (zwssock_t *)zmalloc(sizeof(zwssock_t)); assert(self); - self->ctx = ctx; - self->control = zthread_fork(self->ctx, s_agent_task, NULL); + self->control_actor = zactor_new(s_agent_task, NULL); // Create separate data socket, send address on control socket - self->data = zsocket_new(self->ctx, ZMQ_PAIR); + self->data = zsock_new(ZMQ_PAIR); assert(self->data); - int rc = zsocket_bind(self->data, "inproc://data-%p", self->data); + int rc = zsock_bind(self->data, "inproc://data-%p", self->data); assert(rc != -1); - zstr_sendf(self->control, "inproc://data-%p", self->data); + zstr_sendf(self->control_actor, "inproc://data-%p", self->data); return self; } @@ -46,9 +44,11 @@ void zwssock_destroy(zwssock_t **self_p) assert(self_p); if (*self_p) { zwssock_t *self = *self_p; - zstr_send(self->control, "TERMINATE"); + zactor_destroy(&self->control_actor); - free(zstr_recv(self->control)); + zsock_destroy(&self->data); + + // free(zstr_recv(self->control_actor)); free(self); *self_p = NULL; } @@ -57,7 +57,7 @@ void zwssock_destroy(zwssock_t **self_p) int zwssock_bind(zwssock_t *self, char *endpoint) { assert(self); - return zstr_sendx(self->control, "BIND", endpoint, NULL); + return zstr_sendx(self->control_actor, "BIND", endpoint, NULL); } int zwssock_send(zwssock_t *self, zmsg_t **msg_p) @@ -85,26 +85,23 @@ void* zwssock_handle(zwssock_t *self) // ************************* BACK END AGENT ************************* typedef struct { - zctx_t *ctx; // CZMQ context - void *control; // Control socket back to application - void *data; // Data socket to application - void *stream; // stream socket to server + zsock_t *control; // Control socket back to application + zsock_t *data; // Data socket to application + zsock_t *stream; // stream socket to server zhash_t *clients; // Clients known so far - bool terminated; // Agent terminated by API } agent_t; static agent_t * -s_agent_new(zctx_t *ctx, void *control) +s_agent_new(zsock_t *control) { agent_t *self = (agent_t *)zmalloc(sizeof(agent_t)); - self->ctx = ctx; self->control = control; - self->stream = zsocket_new(ctx, ZMQ_STREAM); + self->stream = zsock_new(ZMQ_STREAM); // Connect our data socket to caller's endpoint - self->data = zsocket_new(ctx, ZMQ_PAIR); + self->data = zsock_new(ZMQ_PAIR); char *endpoint = zstr_recv(self->control); - int rc = zsocket_connect(self->data, "%s", endpoint); + int rc = zsock_connect(self->data, "%s", endpoint); assert(rc != -1); free(endpoint); @@ -119,6 +116,8 @@ s_agent_destroy(agent_t **self_p) if (*self_p) { agent_t *self = *self_p; zhash_destroy(&self->clients); + zsock_destroy(&self->stream); + zsock_destroy(&self->data); free(self); *self_p = NULL; } @@ -252,7 +251,8 @@ void router_message_received(void *tag, byte* payload, int length) self->state = exception; zframe_t *address = zframe_dup(self->address); zframe_send(&address, self->agent->stream, ZFRAME_MORE); - zmq_send(self->agent->stream, NULL, 0, 0); + zframe_t *empty = zframe_new_empty(); + zframe_send(&empty, self->agent->stream, 0); return; } } @@ -281,12 +281,27 @@ void router_message_received(void *tag, byte* payload, int length) void close_received(void *tag, byte* payload, int length) { - // TODO: close received + client_t *self = (client_t *)tag; + zframe_t *address = zframe_dup(self->address); + zframe_send(&address, self->agent->stream, ZFRAME_MORE); + zframe_t *empty = zframe_new_empty(); + zframe_send(&empty, self->agent->stream, 0); } void ping_received(void *tag, byte* payload, int length) { - // TODO: implement ping + client_t *self = (client_t *)tag; + + byte* pong = (byte*)zmalloc(2 + length); + pong[0] = 0x8A; // Pong and Final + pong[1] = (byte)(length & 127); + memcpy(pong + 2, payload, length); + + zframe_t *address = zframe_dup(self->address); + zframe_send(&address, self->agent->stream, ZFRAME_MORE); + zframe_t *pongf = zframe_new(pong, length + 2); + zframe_send(&pongf, self->agent->stream, 0); + free(pong); } void pong_received(void *tag, byte* payload, int length) @@ -300,7 +315,8 @@ static void not_acceptable(zframe_t *_address, void *dest) { zstr_send (dest, "HTTP/1.1 406 Not Acceptable\r\n\r\n"); zframe_send(&address, dest, ZFRAME_MORE); - zmq_send(dest, NULL, 0, 0); + zframe_t *empty = zframe_new_empty(); + zframe_send(&empty, dest, 0); } static void client_data_ready(client_t * self) @@ -401,19 +417,18 @@ s_agent_handle_control(agent_t *self) if (streq(command, "BIND")) { char *endpoint = zmsg_popstr(request); puts(endpoint); - int rc = zsocket_bind(self->stream, "%s", endpoint); + int rc = zsock_bind(self->stream, "%s", endpoint); assert(rc != -1); free(endpoint); } else if (streq(command, "UNBIND")) { char *endpoint = zmsg_popstr(request); - int rc = zsocket_unbind(self->stream, "%s", endpoint); + int rc = zsock_unbind(self->stream, "%s", endpoint); assert(rc != -1); free(endpoint); } - else if (streq(command, "TERMINATE")) { - self->terminated = true; - zstr_send(self->control, "OK"); + else if (streq(command, "$TERM")) { + return -1; } free(command); zmsg_destroy(&request); @@ -550,7 +565,9 @@ s_agent_handle_data(agent_t *self) address = zframe_dup(client->address); zframe_send(&address, self->stream, ZFRAME_MORE); - zsocket_sendmem(self->stream, outgoingData, frameSize, 0); + + zframe_t *data = zframe_new(outgoingData, frameSize); + zframe_send(&data, self->stream, 0); free(compressedPayload); zframe_destroy(&receivedFrame); @@ -573,7 +590,9 @@ s_agent_handle_data(agent_t *self) address = zframe_dup(client->address); zframe_send(&address, self->stream, ZFRAME_MORE); - zsocket_sendmem(self->stream, outgoingData, frameSize, 0); + + zframe_t *data = zframe_new(outgoingData, frameSize); + zframe_send(&data, self->stream, 0); free(outgoingData); zframe_destroy(&receivedFrame); @@ -588,33 +607,41 @@ s_agent_handle_data(agent_t *self) return 0; } -void s_agent_task(void *args, zctx_t *ctx, void *control) +void s_agent_task(zsock_t *control, void *args) { + // let main thread continue + zsock_signal(control, 0); + // Create agent instance as we start this task - agent_t *self = s_agent_new(ctx, control); + agent_t *self = s_agent_new(control); if (!self) // Interrupted return; - // We always poll all three sockets - zmq_pollitem_t pollitems[] = { - { self->control, 0, ZMQ_POLLIN, 0 }, - { self->stream, 0, ZMQ_POLLIN, 0 }, - { self->data, 0, ZMQ_POLLIN, 0 } - }; - while (!zctx_interrupted) { - if (zmq_poll(pollitems, 3, -1) == -1) - break; // Interrupted - - if (pollitems[0].revents & ZMQ_POLLIN) - s_agent_handle_control(self); - if (pollitems[1].revents & ZMQ_POLLIN) - s_agent_handle_router(self); - if (pollitems[2].revents & ZMQ_POLLIN) - s_agent_handle_data(self); + zpoller_t *poller = zpoller_new(self->control, self->stream, self->data, NULL); + assert(poller); + + void *which; - if (self->terminated) + while ((which = zpoller_wait(poller, -1)) != NULL) { + if (zpoller_terminated(poller)) { break; + } + if (which == self->control) { + // Something went wrong + // TODO: use modern CZMQ patterns for handling control pipe + if (s_agent_handle_control(self) == -1) { + break; + } + } + else if (which == self->stream) { + s_agent_handle_router(self); + } + else if (which == self->data) { + s_agent_handle_data(self); + } } + // Done, free all agent resources + zpoller_destroy(&poller); s_agent_destroy(&self); } diff --git a/src/CZMQ-ZWSSock/zwssock.h b/src/CZMQ-ZWSSock/zwssock.h index 8c813e9..6077977 100644 --- a/src/CZMQ-ZWSSock/zwssock.h +++ b/src/CZMQ-ZWSSock/zwssock.h @@ -1,5 +1,6 @@ #ifndef __ZWSSOCK_H_INCLUDED__ #define __ZWSSOCK_H_INCLUDED__ +#define LIBCZMQ_EXPORTS #include @@ -10,7 +11,7 @@ extern "C" { typedef struct _zwssock_t zwssock_t; -CZMQ_EXPORT zwssock_t* zwssock_new_router(zctx_t *ctx); +CZMQ_EXPORT zwssock_t* zwssock_new_router(); CZMQ_EXPORT void zwssock_destroy(zwssock_t **self_p);