Skip to content

Commit

Permalink
Merge pull request #20 from pedromellofh/master
Browse files Browse the repository at this point in the history
api fixes to czmq 4
  • Loading branch information
bluca committed Apr 20, 2018
2 parents 1f13cfb + 36e6ed8 commit 01ef07f
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 65 deletions.
7 changes: 4 additions & 3 deletions src/CZMQ-ZWSSock/CZMQ-ZWSSock.vcxproj
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Project DefaultTargets="Build" ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
Expand All @@ -14,18 +14,19 @@
<ProjectGuid>{D2D01459-3588-4911-916E-FEC6D89E7AC4}</ProjectGuid>
<Keyword>Win32Proj</Keyword>
<RootNamespace>CZMQ-ZWSSock</RootNamespace>
<WindowsTargetPlatformVersion>10.0.16299.0</WindowsTargetPlatformVersion>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v120</PlatformToolset>
<PlatformToolset>v141</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v120</PlatformToolset>
<PlatformToolset>v141</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
Expand Down
11 changes: 5 additions & 6 deletions src/CZMQ-ZWSSock/main.c
Expand Up @@ -2,31 +2,31 @@

#include "zwssock.h"

#define ZMQ_STATIC

static char *listen_on = "tcp://127.0.0.1:8000";

int main(int argc, char **argv)
{
zctx_t *ctx;
zwssock_t *sock;

char *l = argc > 1 ? argv[1] : listen_on;

int major, minor, patch;
zmq_version (&major, &minor, &patch);
zsys_version (&major, &minor, &patch);
printf("built with: ØMQ=%d.%d.%d czmq=%d.%d.%d\n",
major, minor, patch,
CZMQ_VERSION_MAJOR, CZMQ_VERSION_MINOR,CZMQ_VERSION_PATCH);


ctx = zctx_new();
sock = zwssock_new_router(ctx);
sock = zwssock_new_router();

zwssock_bind(sock, l);

zmsg_t* msg;
zframe_t *id;

while (!zctx_interrupted)
while (!zsys_interrupted)
{
msg = zwssock_recv(sock);

Expand Down Expand Up @@ -58,5 +58,4 @@ int main(int argc, char **argv)
}

zwssock_destroy(&sock);
zctx_destroy(&ctx);
}
4 changes: 2 additions & 2 deletions src/CZMQ-ZWSSock/zwshandshake.c
Expand Up @@ -293,7 +293,7 @@ bool zwshandshake_validate(zwshandshake_t *self)
return true;
}

int encode_base64(uint8_t *in, int in_len, char* out, int out_len)
int encode_base64(const uint8_t *in, int in_len, char* out, int out_len)
{
static const uint8_t base64enc_tab[64] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

Expand Down Expand Up @@ -345,7 +345,7 @@ zframe_t* zwshandshake_get_response(zwshandshake_t *self, unsigned char *client_
zdigest_t* digest = zdigest_new();
zdigest_update(digest, (byte *) plain, len);

byte* hash = zdigest_data(digest);
const byte* hash = zdigest_data(digest);

char accept_key[150];

Expand Down
133 changes: 80 additions & 53 deletions src/CZMQ-ZWSSock/zwssock.c
Expand Up @@ -8,32 +8,30 @@

struct _zwssock_t
{
zctx_t *ctx; // Our parent context
void *control; // Control to/from agent
void *data; // Data to/from agent
zactor_t *control_actor; // Control to/from agent
zsock_t *data; // Data to/from agent
};

// This background thread does all the real work
static void s_agent_task(void *args, zctx_t *ctx, void *control);
static void s_agent_task(zsock_t *control, void *args);

// --------------------------------------------------------------------------
// Constructor

zwssock_t* zwssock_new_router(zctx_t *ctx)
zwssock_t* zwssock_new_router()
{
zwssock_t *self = (zwssock_t *)zmalloc(sizeof(zwssock_t));

assert(self);

self->ctx = ctx;
self->control = zthread_fork(self->ctx, s_agent_task, NULL);
self->control_actor = zactor_new(s_agent_task, NULL);

// Create separate data socket, send address on control socket
self->data = zsocket_new(self->ctx, ZMQ_PAIR);
self->data = zsock_new(ZMQ_PAIR);
assert(self->data);
int rc = zsocket_bind(self->data, "inproc://data-%p", self->data);
int rc = zsock_bind(self->data, "inproc://data-%p", self->data);
assert(rc != -1);
zstr_sendf(self->control, "inproc://data-%p", self->data);
zstr_sendf(self->control_actor, "inproc://data-%p", self->data);

return self;
}
Expand All @@ -46,9 +44,11 @@ void zwssock_destroy(zwssock_t **self_p)
assert(self_p);
if (*self_p) {
zwssock_t *self = *self_p;
zstr_send(self->control, "TERMINATE");
zactor_destroy(&self->control_actor);

free(zstr_recv(self->control));
zsock_destroy(&self->data);

// free(zstr_recv(self->control_actor));
free(self);
*self_p = NULL;
}
Expand All @@ -57,7 +57,7 @@ void zwssock_destroy(zwssock_t **self_p)
int zwssock_bind(zwssock_t *self, char *endpoint)
{
assert(self);
return zstr_sendx(self->control, "BIND", endpoint, NULL);
return zstr_sendx(self->control_actor, "BIND", endpoint, NULL);
}

int zwssock_send(zwssock_t *self, zmsg_t **msg_p)
Expand Down Expand Up @@ -85,26 +85,23 @@ void* zwssock_handle(zwssock_t *self)
// ************************* BACK END AGENT *************************

typedef struct {
zctx_t *ctx; // CZMQ context
void *control; // Control socket back to application
void *data; // Data socket to application
void *stream; // stream socket to server
zsock_t *control; // Control socket back to application
zsock_t *data; // Data socket to application
zsock_t *stream; // stream socket to server
zhash_t *clients; // Clients known so far
bool terminated; // Agent terminated by API
} agent_t;

static agent_t *
s_agent_new(zctx_t *ctx, void *control)
s_agent_new(zsock_t *control)
{
agent_t *self = (agent_t *)zmalloc(sizeof(agent_t));
self->ctx = ctx;
self->control = control;
self->stream = zsocket_new(ctx, ZMQ_STREAM);
self->stream = zsock_new(ZMQ_STREAM);

// Connect our data socket to caller's endpoint
self->data = zsocket_new(ctx, ZMQ_PAIR);
self->data = zsock_new(ZMQ_PAIR);
char *endpoint = zstr_recv(self->control);
int rc = zsocket_connect(self->data, "%s", endpoint);
int rc = zsock_connect(self->data, "%s", endpoint);
assert(rc != -1);
free(endpoint);

Expand All @@ -119,6 +116,8 @@ s_agent_destroy(agent_t **self_p)
if (*self_p) {
agent_t *self = *self_p;
zhash_destroy(&self->clients);
zsock_destroy(&self->stream);
zsock_destroy(&self->data);
free(self);
*self_p = NULL;
}
Expand Down Expand Up @@ -252,7 +251,8 @@ void router_message_received(void *tag, byte* payload, int length)
self->state = exception;
zframe_t *address = zframe_dup(self->address);
zframe_send(&address, self->agent->stream, ZFRAME_MORE);
zmq_send(self->agent->stream, NULL, 0, 0);
zframe_t *empty = zframe_new_empty();
zframe_send(&empty, self->agent->stream, 0);
return;
}
}
Expand Down Expand Up @@ -281,12 +281,27 @@ void router_message_received(void *tag, byte* payload, int length)

void close_received(void *tag, byte* payload, int length)
{
// TODO: close received
client_t *self = (client_t *)tag;
zframe_t *address = zframe_dup(self->address);
zframe_send(&address, self->agent->stream, ZFRAME_MORE);
zframe_t *empty = zframe_new_empty();
zframe_send(&empty, self->agent->stream, 0);
}

void ping_received(void *tag, byte* payload, int length)
{
// TODO: implement ping
client_t *self = (client_t *)tag;

byte* pong = (byte*)zmalloc(2 + length);
pong[0] = 0x8A; // Pong and Final
pong[1] = (byte)(length & 127);
memcpy(pong + 2, payload, length);

zframe_t *address = zframe_dup(self->address);
zframe_send(&address, self->agent->stream, ZFRAME_MORE);
zframe_t *pongf = zframe_new(pong, length + 2);
zframe_send(&pongf, self->agent->stream, 0);
free(pong);
}

void pong_received(void *tag, byte* payload, int length)
Expand All @@ -300,7 +315,8 @@ static void not_acceptable(zframe_t *_address, void *dest) {
zstr_send (dest, "HTTP/1.1 406 Not Acceptable\r\n\r\n");

zframe_send(&address, dest, ZFRAME_MORE);
zmq_send(dest, NULL, 0, 0);
zframe_t *empty = zframe_new_empty();
zframe_send(&empty, dest, 0);
}

static void client_data_ready(client_t * self)
Expand Down Expand Up @@ -401,19 +417,18 @@ s_agent_handle_control(agent_t *self)
if (streq(command, "BIND")) {
char *endpoint = zmsg_popstr(request);
puts(endpoint);
int rc = zsocket_bind(self->stream, "%s", endpoint);
int rc = zsock_bind(self->stream, "%s", endpoint);
assert(rc != -1);
free(endpoint);
}
else if (streq(command, "UNBIND")) {
char *endpoint = zmsg_popstr(request);
int rc = zsocket_unbind(self->stream, "%s", endpoint);
int rc = zsock_unbind(self->stream, "%s", endpoint);
assert(rc != -1);
free(endpoint);
}
else if (streq(command, "TERMINATE")) {
self->terminated = true;
zstr_send(self->control, "OK");
else if (streq(command, "$TERM")) {
return -1;
}
free(command);
zmsg_destroy(&request);
Expand Down Expand Up @@ -550,7 +565,9 @@ s_agent_handle_data(agent_t *self)
address = zframe_dup(client->address);

zframe_send(&address, self->stream, ZFRAME_MORE);
zsocket_sendmem(self->stream, outgoingData, frameSize, 0);

zframe_t *data = zframe_new(outgoingData, frameSize);
zframe_send(&data, self->stream, 0);

free(compressedPayload);
zframe_destroy(&receivedFrame);
Expand All @@ -573,7 +590,9 @@ s_agent_handle_data(agent_t *self)
address = zframe_dup(client->address);

zframe_send(&address, self->stream, ZFRAME_MORE);
zsocket_sendmem(self->stream, outgoingData, frameSize, 0);

zframe_t *data = zframe_new(outgoingData, frameSize);
zframe_send(&data, self->stream, 0);

free(outgoingData);
zframe_destroy(&receivedFrame);
Expand All @@ -588,33 +607,41 @@ s_agent_handle_data(agent_t *self)
return 0;
}

void s_agent_task(void *args, zctx_t *ctx, void *control)
void s_agent_task(zsock_t *control, void *args)
{
// let main thread continue
zsock_signal(control, 0);

// Create agent instance as we start this task
agent_t *self = s_agent_new(ctx, control);
agent_t *self = s_agent_new(control);
if (!self) // Interrupted
return;

// We always poll all three sockets
zmq_pollitem_t pollitems[] = {
{ self->control, 0, ZMQ_POLLIN, 0 },
{ self->stream, 0, ZMQ_POLLIN, 0 },
{ self->data, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
if (zmq_poll(pollitems, 3, -1) == -1)
break; // Interrupted

if (pollitems[0].revents & ZMQ_POLLIN)
s_agent_handle_control(self);
if (pollitems[1].revents & ZMQ_POLLIN)
s_agent_handle_router(self);
if (pollitems[2].revents & ZMQ_POLLIN)
s_agent_handle_data(self);
zpoller_t *poller = zpoller_new(self->control, self->stream, self->data, NULL);
assert(poller);

void *which;

if (self->terminated)
while ((which = zpoller_wait(poller, -1)) != NULL) {
if (zpoller_terminated(poller)) {
break;
}
if (which == self->control) {
// Something went wrong
// TODO: use modern CZMQ patterns for handling control pipe
if (s_agent_handle_control(self) == -1) {
break;
}
}
else if (which == self->stream) {
s_agent_handle_router(self);
}
else if (which == self->data) {
s_agent_handle_data(self);
}
}

// Done, free all agent resources
zpoller_destroy(&poller);
s_agent_destroy(&self);
}
3 changes: 2 additions & 1 deletion src/CZMQ-ZWSSock/zwssock.h
@@ -1,5 +1,6 @@
#ifndef __ZWSSOCK_H_INCLUDED__
#define __ZWSSOCK_H_INCLUDED__
#define LIBCZMQ_EXPORTS

#include <czmq.h>

Expand All @@ -10,7 +11,7 @@ extern "C" {

typedef struct _zwssock_t zwssock_t;

CZMQ_EXPORT zwssock_t* zwssock_new_router(zctx_t *ctx);
CZMQ_EXPORT zwssock_t* zwssock_new_router();

CZMQ_EXPORT void zwssock_destroy(zwssock_t **self_p);

Expand Down

0 comments on commit 01ef07f

Please sign in to comment.