Skip to content

Commit

Permalink
Enforce use of random port numbers for local endpoints.
Browse files Browse the repository at this point in the history
This might seem like a limitation, but the more I think about it,
the more I am convinced that this is the right decision and that
the restrictions it puts in place will eventually enable quite
interesting backend architectures.

While there, add "zeromq_single" directive to disable this
enforcement (only for testing and development).

Change-Id: I6cb72c227c8581a907047b1c70edd2d5774cfbdd
Signed-off-by: Piotr Sikora <piotr.sikora@frickle.com>
  • Loading branch information
PiotrSikora committed Jan 13, 2012
1 parent 99b534a commit 4a1bc9d
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 39 deletions.
22 changes: 17 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Configuration directives
========================
zeromq_threads
--------------
* **syntax**: `zeromq_threads num`
* **syntax**: `zeromq_threads <number>`
* **default**: `1`
* **context**: `main`

Expand All @@ -50,20 +50,32 @@ Configure number of ZeroMQ I/O threads to be used by each worker process.

zeromq_local
------------
* **syntax**: `zeromq_local socket_type local_endpoint`
* **syntax**: `zeromq_local <socket_type> <local_endpoint>`
* **default**: `none`
* **context**: `upstream`

Configure local ZeroMQ endpoint.
Configure local ZeroMQ endpoint (must use random port numbers).


zeromq_remote
-------------
* **syntax**: `zeromq_remote socket_type remote_endpoint`
* **syntax**: `zeromq_remote <socket_type> <remote_endpoint>`
* **default**: `none`
* **context**: `upstream`

Configure remote ZeroMQ endpoint.
Configure remote ZeroMQ endpoint (cannot use random port numbers).


zeromq_single
-------------
* **syntax**: `zeromq_single on|off`
* **default**: `off`
* **context**: `upstream`

Enable `single` mode which allows use of predefined port numbers for local
endpoints. It comes at a price (only one worker can bind to such endpoint
and it will stop working after `nginx` reload) and it's supposed to be used
only during testing and development. Do not use this mode in production.


Sample configuration
Expand Down
80 changes: 72 additions & 8 deletions src/ngx_event_zeromq.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ typedef struct {

static void *ngx_zeromq_get_socket(ngx_connection_t *c);
static void ngx_zeromq_log_error(ngx_log_t *log, const char *text);
static void ngx_zeromq_randomized_endpoint_regen(ngx_str_t *addr);

static ngx_int_t ngx_zeromq_ready(void *zmq, ngx_event_t *ev, const char *what,
uint32_t want);
Expand Down Expand Up @@ -137,6 +138,49 @@ ngx_zeromq_log_error(ngx_log_t *log, const char *text)
}


ngx_zeromq_endpoint_t *
ngx_zeromq_randomized_endpoint(ngx_zeromq_endpoint_t *zep, ngx_pool_t *pool)
{
ngx_zeromq_endpoint_t *rand;

rand = ngx_palloc(pool, sizeof(ngx_zeromq_endpoint_t));
if (rand == NULL) {
return NULL;
}

ngx_memcpy(rand, zep, sizeof(ngx_zeromq_endpoint_t));

rand->addr.data = ngx_pnalloc(pool, zep->addr.len + sizeof("65535"));
ngx_memcpy(rand->addr.data, zep->addr.data, zep->addr.len);

return rand;
}


static void
ngx_zeromq_randomized_endpoint_regen(ngx_str_t *addr)
{
in_port_t port;
u_char *p;

p = addr->data + addr->len;

while (p > addr->data) {
if (*p == ':') {
break;
}

p--;
}

port = 1024 + ngx_pid + ngx_random();

addr->len = ngx_snprintf(p + 1, sizeof("65535") - 1, "%d", port)
- addr->data;
addr->data[addr->len] = '\0';
}


ngx_int_t
ngx_zeromq_connect(ngx_peer_connection_t *pc)
{
Expand All @@ -147,6 +191,7 @@ ngx_zeromq_connect(ngx_peer_connection_t *pc)
void *zmq;
int fd, zero;
size_t fdsize;
ngx_uint_t i;

if (zc == NULL || zc->endpoint == NULL) {
return NGX_ERROR;
Expand Down Expand Up @@ -215,26 +260,45 @@ ngx_zeromq_connect(ngx_peer_connection_t *pc)
}

if (zep->bind) {
if (zmq_bind(zmq, (const char *) zep->addr->data) == -1) {
ngx_zeromq_log_error(pc->log, "zmq_bind()");
goto failed;
if (zep->rand) {
for (i = 0; ; i++) {
ngx_zeromq_randomized_endpoint_regen(&zep->addr);

if (zmq_bind(zmq, (const char *) zep->addr.data) == -1) {

if (ngx_errno == NGX_EADDRINUSE && i < 65535) {
continue;
}

ngx_zeromq_log_error(pc->log, "zmq_bind()");
goto failed;
}

break;
}

} else {
if (zmq_bind(zmq, (const char *) zep->addr.data) == -1) {
ngx_zeromq_log_error(pc->log, "zmq_bind()");
goto failed;
}
}

ev = wev;

} else {
if (zmq_connect(zmq, (const char *) zep->addr->data) == -1) {
if (zmq_connect(zmq, (const char *) zep->addr.data) == -1) {
ngx_zeromq_log_error(pc->log, "zmq_connect()");
goto failed;
}

ev = rev;
}

ngx_log_debug6(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"zmq_connect: lazily %s to %V (%V), zmq:%p fd:%d #%d",
zep->bind ? "bound" : "connected",
zep->addr, &zep->type->name, zmq, fd, c->number);
ngx_log_debug7(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"zmq_connect: %s to %V (%V), fd:%d #%d zc:%p zmq:%p",
zep->bind ? "bound" : "lazily connected",
&zep->addr, &zep->type->name, fd, c->number, zc, zmq);

if (ngx_add_conn) {
/* rtsig */
Expand Down
6 changes: 5 additions & 1 deletion src/ngx_event_zeromq.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ typedef struct {

typedef struct {
ngx_zeromq_socket_t *type;
ngx_str_t *addr;
ngx_str_t addr;
unsigned bind:1;
unsigned rand:1;
} ngx_zeromq_endpoint_t;


Expand All @@ -57,6 +58,9 @@ typedef struct {
} ngx_zeromq_connection_t;


ngx_zeromq_endpoint_t *ngx_zeromq_randomized_endpoint(
ngx_zeromq_endpoint_t *zep, ngx_pool_t *pool);

ngx_int_t ngx_zeromq_connect(ngx_peer_connection_t *pc);
void ngx_zeromq_close(ngx_connection_t *c);

Expand Down
98 changes: 73 additions & 25 deletions src/ngx_http_upstream_zeromq_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@


typedef struct {
ngx_zeromq_endpoint_t send;
ngx_zeromq_endpoint_t recv;
ngx_zeromq_endpoint_t *send;
ngx_zeromq_endpoint_t *recv;
ngx_flag_t single;
} ngx_http_upstream_zeromq_srv_conf_t;


Expand Down Expand Up @@ -75,6 +76,13 @@ static ngx_command_t ngx_http_upstream_zeromq_commands[] = {
0,
NULL },

{ ngx_string("zeromq_single"),
NGX_HTTP_UPS_CONF|NGX_CONF_FLAG,
ngx_conf_set_flag_slot,
NGX_HTTP_SRV_CONF_OFFSET,
offsetof(ngx_http_upstream_zeromq_srv_conf_t, single),
NULL },

ngx_null_command
};

Expand Down Expand Up @@ -117,20 +125,31 @@ ngx_http_upstream_init_zeromq(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us)

zcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_upstream_zeromq_module);

if (zcf->send.addr == NULL) {
if (zcf->send == NULL) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"missing sending endpoint in upstream \"%V\"",
&us->host);
return NGX_ERROR;
}

if (zcf->recv.addr == NULL) {
if (zcf->recv == NULL) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"missing receiving endpoint in upstream \"%V\"",
&us->host);
return NGX_ERROR;
}

if ((zcf->single != 1)
&& ((zcf->send->bind && !zcf->send->rand)
|| (zcf->recv->bind && !zcf->recv->rand)))
{
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"local endpoint must use random port numbers,"
" use \"tcp://A.B.C.D:*\" in upstream \"%V\"",
&us->host);
return NGX_ERROR;
}

us->peer.init = ngx_http_upstream_init_zeromq_peer;

return NGX_OK;
Expand All @@ -144,15 +163,22 @@ ngx_http_upstream_init_zeromq_peer(ngx_http_request_t *r,
ngx_http_upstream_zeromq_peer_data_t *zp;
ngx_http_upstream_zeromq_srv_conf_t *zcf;

zp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_zeromq_peer_data_t));
zp = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_zeromq_peer_data_t));
if (zp == NULL) {
return NGX_ERROR;
}

zcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_upstream_zeromq_module);

zp->zc.endpoint = &zcf->send;
zp->zc.socket = NULL;
if (zcf->send->rand) {
zp->zc.endpoint = ngx_zeromq_randomized_endpoint(zcf->send, r->pool);
if (zp->zc.endpoint == NULL) {
return NGX_ERROR;
}

} else {
zp->zc.endpoint = zcf->send;
}

zp->request = r;
ngx_http_set_ctx(r, NULL, ngx_zeromq_module);
Expand Down Expand Up @@ -219,15 +245,12 @@ ngx_http_upstream_zeromq_create_conf(ngx_conf_t *cf)
/*
* set by ngx_pcalloc():
*
* conf->send.type = NULL;
* conf->send.addr = NULL;
* conf->send.bind = 0;
*
* conf->recv.type = NULL;
* conf->recv.addr = NULL;
* conf->recv.bind = 0;
* conf->send = NULL;
* conf->recv = NULL;
*/

conf->single = NGX_CONF_UNSET;

return conf;
}

Expand All @@ -240,6 +263,7 @@ ngx_http_upstream_zeromq_endpoint(ngx_conf_t *cf, ngx_command_t *cmd,
ngx_http_upstream_zeromq_srv_conf_t *zcf = conf;
ngx_http_upstream_srv_conf_t *uscf;
ngx_zeromq_socket_t *type;
ngx_zeromq_endpoint_t *zep;
ngx_uint_t i;

uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);
Expand All @@ -259,33 +283,57 @@ ngx_http_upstream_zeromq_endpoint(ngx_conf_t *cf, ngx_command_t *cmd,
}

if (type[i].can_send) {
if (zcf->send.addr) {
if (zcf->send) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"sending endpoint already set to"
" \"%V\" (%V) in upstream \"%V\"",
zcf->send.addr, &zcf->send.type->name,
&zcf->send->addr, &zcf->send->type->name,
&uscf->host);
return NGX_CONF_ERROR;
}

zcf->send.type = &type[i];
zcf->send.addr = &value[2];
zcf->send.bind = cmd->offset;
}

if (type[i].can_recv) {
if (zcf->recv.addr) {
if (zcf->recv) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"receivng endpoint already set to"
" \"%V\" (%V) in upstream \"%V\"",
zcf->recv.addr, &zcf->recv.type->name,
&zcf->recv->addr, &zcf->recv->type->name,
&uscf->host);
return NGX_CONF_ERROR;
}
}

zcf->recv.type = &type[i];
zcf->recv.addr = &value[2];
zcf->recv.bind = cmd->offset;
zep = ngx_pcalloc(cf->pool, sizeof(ngx_zeromq_endpoint_t));
if (zep == NULL) {
return NGX_CONF_ERROR;
}

zep->type = &type[i];
zep->addr = value[2];
zep->bind = cmd->offset;

if ((ngx_strncmp(zep->addr.data, "tcp://", sizeof("tcp://") - 1) == 0)
&& (ngx_strncmp(zep->addr.data + zep->addr.len - (sizeof(":*") - 1),
":*", sizeof(":*") - 1) == 0))
{
zep->rand = 1;
zep->addr.len -= sizeof("*") - 1;
}

if (zep->rand && !zep->bind) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"random port numbers don't make sense for remote"
" endpoint in upstream \"%V\"", &uscf->host);
return NGX_CONF_ERROR;
}

if (type[i].can_send) {
zcf->send = zep;
}

if (type[i].can_recv) {
zcf->recv = zep;
}

uscf->servers = (ngx_array_t *) -1;
Expand Down

0 comments on commit 4a1bc9d

Please sign in to comment.