Skip to content

Commit

Permalink
able to set arbitery commands as 'broadcast' command.
Browse files Browse the repository at this point in the history
  • Loading branch information
umegaya committed Oct 21, 2015
1 parent 8158d2e commit cb50953
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 11 deletions.
25 changes: 25 additions & 0 deletions src/nc_message.c
Expand Up @@ -230,6 +230,7 @@ _msg_get(void)
msg->result = MSG_PARSE_OK;

msg->fragment = NULL;
msg->broadcast = NULL;
msg->reply = NULL;
msg->pre_coalesce = NULL;
msg->post_coalesce = NULL;
Expand Down Expand Up @@ -295,6 +296,7 @@ msg_get(struct conn *conn, bool request, bool redis)
}
msg->add_auth = redis_add_auth;
msg->fragment = redis_fragment;
msg->broadcast = redis_broadcast;
msg->reply = redis_reply;
msg->failure = redis_failure;
msg->pre_coalesce = redis_pre_coalesce;
Expand Down Expand Up @@ -888,3 +890,26 @@ msg_send(struct context *ctx, struct conn *conn)

return NC_OK;
}

struct msg *
msg_clone(struct msg *r) {
struct msg *ret = msg_get(r->owner, r->request, r->redis);
struct mbuf *mbuf, *nbuf; /* current and next mbuf */
rstatus_t status; uint32_t mlen;

for (mbuf = STAILQ_FIRST(&r->mhdr); mbuf != NULL; mbuf = nbuf) {
nbuf = STAILQ_NEXT(mbuf, next);

if (mbuf_empty(mbuf)) {
continue;
}

mlen = mbuf_length(mbuf);
status = msg_append(ret, mbuf->pos, mlen);
if (status != NC_OK) {
msg_put(ret);
return NULL;
}
}
return ret;
}
3 changes: 3 additions & 0 deletions src/nc_message.h
Expand Up @@ -25,6 +25,7 @@ typedef rstatus_t (*msg_add_auth_t)(struct context *ctx, struct conn *c_conn, st
typedef rstatus_t (*msg_fragment_t)(struct msg *, uint32_t, struct msg_tqh *);
typedef void (*msg_coalesce_t)(struct msg *r);
typedef rstatus_t (*msg_reply_t)(struct msg *r);
typedef bool (*msg_broadcast_t)(struct msg *r);
typedef bool (*msg_failure_t)(struct msg *r);

typedef enum msg_parse_result {
Expand Down Expand Up @@ -224,6 +225,7 @@ struct msg {

msg_fragment_t fragment; /* message fragment */
msg_reply_t reply; /* generate message reply (example: ping) */
msg_broadcast_t broadcast; /* check this message need broadcast */
msg_add_auth_t add_auth; /* add auth message when we forward msg */
msg_failure_t failure; /* transient failure response? */

Expand Down Expand Up @@ -274,6 +276,7 @@ void msg_deinit(void);
struct string *msg_type_string(msg_type_t type);
struct msg *msg_get(struct conn *conn, bool request, bool redis);
void msg_put(struct msg *msg);
struct msg *msg_clone(struct msg *src);
struct msg *msg_get_error(bool redis, err_t err);
void msg_dump(struct msg *msg, int level);
bool msg_empty(struct msg *msg);
Expand Down
72 changes: 62 additions & 10 deletions src/nc_request.c
Expand Up @@ -553,14 +553,14 @@ req_forward_stats(struct context *ctx, struct server *server, struct msg *msg)
}

static void
req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)
req_forward_base(struct context *ctx, struct conn *c_conn, struct msg *msg, struct conn *s_conn)
{
rstatus_t status;
struct conn *s_conn;
struct server_pool *pool;
uint8_t *key;
uint32_t keylen;
struct keypos *kpos;
bool fixed_dest = !s_conn;

ASSERT(c_conn->client && !c_conn->proxy);

Expand All @@ -571,12 +571,14 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)

pool = c_conn->owner;

ASSERT(array_n(msg->keys) > 0);
kpos = array_get(msg->keys, 0);
key = kpos->start;
keylen = (uint32_t)(kpos->end - kpos->start);
if (s_conn == NULL) {
ASSERT(array_n(msg->keys) > 0);
kpos = array_get(msg->keys, 0);
key = kpos->start;
keylen = (uint32_t)(kpos->end - kpos->start);

s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen);
s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen);
}
if (s_conn == NULL) {
req_forward_error(ctx, c_conn, msg);
return;
Expand Down Expand Up @@ -606,9 +608,47 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)

req_forward_stats(ctx, s_conn->owner, msg);

log_debug(LOG_VERB, "forward from c %d to s %d req %"PRIu64" len %"PRIu32
" type %d with key '%.*s'", c_conn->sd, s_conn->sd, msg->id,
msg->mlen, msg->type, keylen, key);
if (fixed_dest) {
log_debug(LOG_VERB, "forward from c %d to s %d req %"PRIu64" len %"PRIu32
" type %d", c_conn->sd, s_conn->sd, msg->id,
msg->mlen, msg->type);
}
else {
log_debug(LOG_VERB, "forward from c %d to s %d req %"PRIu64" len %"PRIu32
" type %d with key '%.*s'", c_conn->sd, s_conn->sd, msg->id,
msg->mlen, msg->type, keylen, key);
}
}

static void
req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)
{
req_forward_base(ctx, c_conn, msg, NULL);
}

struct broadcast_data {
struct msg *msg;
struct context *ctx;
};

static rstatus_t req_server_pool_iter(void *elem, void *data)
{
struct broadcast_data *p = data;
struct conn *s_conn = server_established_conn(p->ctx, elem);
struct msg *copied = msg_clone(p->msg);
if (!copied) {
return NC_ENOMEM;
}
req_forward_base(p->ctx, p->msg->owner, copied, s_conn);
return NC_OK;
}

static rstatus_t
broadcast(struct context *ctx, struct conn *c_conn, struct msg *msg)
{
struct server_pool *pool = c_conn->owner;
struct broadcast_data data = { msg, ctx };
return array_each(&(pool->server), req_server_pool_iter, &data);
}

void
Expand Down Expand Up @@ -655,6 +695,18 @@ req_recv_done(struct context *ctx, struct conn *conn, struct msg *msg,
return;
}

/* if required, do broadcast */
if (msg->broadcast && msg->broadcast(msg)) {
status = broadcast(ctx, conn, msg);
if (status != NC_OK) {
if (!msg->noreply) {
conn->enqueue_outq(ctx, conn, msg);
}
req_forward_error(ctx, conn, msg);
}
return;
}

/* do fragment */
pool = conn->owner;
TAILQ_INIT(&frag_msgq);
Expand Down
11 changes: 11 additions & 0 deletions src/nc_server.c
Expand Up @@ -215,6 +215,17 @@ server_conn(struct server *server)
return conn;
}

struct conn *
server_established_conn(struct context *ctx, struct server *server) {
struct conn *conn = server_conn(server);
rstatus_t status = server_connect(ctx, server, conn);
if (status != NC_OK) {
server_close(ctx, conn);
return NULL;
}
return conn;
}

static rstatus_t
server_each_preconnect(void *elem, void *data)
{
Expand Down
1 change: 1 addition & 0 deletions src/nc_server.h
Expand Up @@ -130,6 +130,7 @@ bool server_active(struct conn *conn);
rstatus_t server_init(struct array *server, struct array *conf_server, struct server_pool *sp);
void server_deinit(struct array *server);
struct conn *server_conn(struct server *server);
struct conn *server_established_conn(struct context *ctx, struct server *server);
rstatus_t server_connect(struct context *ctx, struct server *server, struct conn *conn);
void server_close(struct context *ctx, struct conn *conn);
void server_connected(struct context *ctx, struct conn *conn);
Expand Down
1 change: 1 addition & 0 deletions src/proto/nc_proto.h
Expand Up @@ -156,6 +156,7 @@ void redis_pre_coalesce(struct msg *r);
void redis_post_coalesce(struct msg *r);
rstatus_t redis_add_auth(struct context *ctx, struct conn *c_conn, struct conn *s_conn);
rstatus_t redis_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq);
bool redis_broadcast(struct msg *r);
rstatus_t redis_reply(struct msg *r);
void redis_post_connect(struct context *ctx, struct conn *conn, struct server *server);
void redis_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg);
Expand Down
7 changes: 6 additions & 1 deletion src/proto/nc_redis.c
Expand Up @@ -324,7 +324,7 @@ redis_argeval(struct msg *r)
*
*/
static bool
redis_need_bcast(struct msg *r)
redis_need_broadcast(struct msg *r)
{
switch (r->type) {
case MSG_REQ_REDIS_SCRIPT:
Expand Down Expand Up @@ -2701,6 +2701,11 @@ redis_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq)
}
}

bool
redis_broadcast(struct msg *r) {
return redis_need_broadcast(r);
}

rstatus_t
redis_reply(struct msg *r)
{
Expand Down

0 comments on commit cb50953

Please sign in to comment.