Permalink
Browse files

setup pre/post splitcopy and pre/post coalesce handlers in msg struct

  • Loading branch information...
1 parent 448e3ef commit 1377dcb5bcf5a0376daf9bb8b02d597268b7ddeb Manju Rajashekhar committed Nov 21, 2012
Showing with 81 additions and 65 deletions.
  1. +2 −2 scripts/pipelined_write.sh
  2. +11 −2 src/nc_message.c
  3. +66 −59 src/nc_message.h
  4. +1 −1 src/nc_request.c
  5. +1 −1 src/nc_response.c
@@ -1,6 +1,6 @@
#!/bin/sh
-socatopt="-t 1 -T 1 -b 16384 -d -d"
+socatopt="-t 1 -T 1 -b 16384"
val=`echo 6^6^6 | bc`
val=`printf "%s" "${val}"`
@@ -25,5 +25,5 @@ printf "%b" "$set_commands" > /tmp/socat.input
# write
for i in `seq 1 16`; do
- cat /tmp/socat.input | socat ${socatopt} - TCP:localhost:22123,nodelay,shut-none,nonblock=1 &
+ cat /tmp/socat.input | socat ${socatopt} - TCP:localhost:22123,nodelay,shut-down,nonblock=1 &
done
View
@@ -220,6 +220,11 @@ _msg_get(void)
msg->parser = NULL;
msg->result = MSG_PARSE_OK;
+ msg->pre_splitcopy = NULL;
+ msg->post_splitcopy = NULL;
+ msg->pre_coalesce = NULL;
+ msg->post_coalesce = NULL;
+
msg->type = MSG_UNKNOWN;
msg->key_start = NULL;
@@ -270,6 +275,10 @@ msg_get(struct conn *conn, bool request, bool redis)
} else {
msg->parser = memcache_parse_rsp;
}
+ msg->pre_splitcopy = memcache_pre_splitcopy;
+ msg->post_splitcopy = memcache_post_splitcopy;
+ msg->pre_coalesce = memcache_pre_coalesce;
+ msg->post_coalesce = memcache_post_coalesce;
}
log_debug(LOG_VVERB, "get msg %p id %"PRIu64" request %d owner sd %d",
@@ -439,12 +448,12 @@ msg_fragment(struct context *ctx, struct conn *conn, struct msg *msg)
ASSERT(conn->client && !conn->proxy);
ASSERT(msg->request);
- nbuf = mbuf_split(&msg->mhdr, msg->pos, memcache_pre_splitcopy, msg);
+ nbuf = mbuf_split(&msg->mhdr, msg->pos, msg->pre_splitcopy, msg);
if (nbuf == NULL) {
return NC_ENOMEM;
}
- status = memcache_post_splitcopy(msg);
+ status = msg->post_splitcopy(msg);
if (status != NC_OK) {
mbuf_put(nbuf);
return status;
View
@@ -21,88 +21,95 @@
#include <nc_core.h>
typedef void (*msg_parse_t)(struct msg *);
+typedef rstatus_t (*msg_post_splitcopy_t)(struct msg *);
+typedef void (*msg_coalesce_t)(struct msg *r);
typedef enum msg_parse_result {
- MSG_PARSE_OK, /* parsing ok */
- MSG_PARSE_ERROR, /* parsing error */
- MSG_PARSE_REPAIR, /* more to parse -> repair parsed & unparsed data */
- MSG_PARSE_FRAGMENT, /* multi-vector request -> fragment */
- MSG_PARSE_AGAIN, /* incomplete -> parse again */
+ MSG_PARSE_OK, /* parsing ok */
+ MSG_PARSE_ERROR, /* parsing error */
+ MSG_PARSE_REPAIR, /* more to parse -> repair parsed & unparsed data */
+ MSG_PARSE_FRAGMENT, /* multi-vector request -> fragment */
+ MSG_PARSE_AGAIN, /* incomplete -> parse again */
} msg_parse_result_t;
typedef enum msg_type {
MSG_UNKNOWN,
- MSG_REQ_MC_GET, /* memcache retrieval requests */
+ MSG_REQ_MC_GET, /* memcache retrieval requests */
MSG_REQ_MC_GETS,
- MSG_REQ_MC_DELETE, /* memcache delete request */
- MSG_REQ_MC_CAS, /* memcache cas request and storage request */
- MSG_REQ_MC_SET, /* memcache storage request */
+ MSG_REQ_MC_DELETE, /* memcache delete request */
+ MSG_REQ_MC_CAS, /* memcache cas request and storage request */
+ MSG_REQ_MC_SET, /* memcache storage request */
MSG_REQ_MC_ADD,
MSG_REQ_MC_REPLACE,
MSG_REQ_MC_APPEND,
MSG_REQ_MC_PREPEND,
- MSG_REQ_MC_INCR, /* memcache arithmetic request */
+ MSG_REQ_MC_INCR, /* memcache arithmetic request */
MSG_REQ_MC_DECR,
- MSG_REQ_MC_QUIT, /* memcache quit request */
- MSG_RSP_MC_NUM, /* memcache arithmetic response */
- MSG_RSP_MC_STORED, /* memcache cas and storage response */
+ MSG_REQ_MC_QUIT, /* memcache quit request */
+ MSG_RSP_MC_NUM, /* memcache arithmetic response */
+ MSG_RSP_MC_STORED, /* memcache cas and storage response */
MSG_RSP_MC_NOT_STORED,
MSG_RSP_MC_EXISTS,
MSG_RSP_MC_NOT_FOUND,
MSG_RSP_MC_END,
MSG_RSP_MC_VALUE,
- MSG_RSP_MC_DELETED, /* memcache delete response */
- MSG_RSP_MC_ERROR, /* memcache error responses */
+ MSG_RSP_MC_DELETED, /* memcache delete response */
+ MSG_RSP_MC_ERROR, /* memcache error responses */
MSG_RSP_MC_CLIENT_ERROR,
MSG_RSP_MC_SERVER_ERROR,
MSG_SENTINEL
} msg_type_t;
struct msg {
- TAILQ_ENTRY(msg) c_tqe; /* link in client q */
- TAILQ_ENTRY(msg) s_tqe; /* link in server q */
- TAILQ_ENTRY(msg) m_tqe; /* link in send q / free q */
-
- uint64_t id; /* message id */
- struct msg *peer; /* message peer */
- struct conn *owner; /* message owner - client | server */
-
- struct rbnode tmo_rbe; /* entry in rbtree */
-
- struct mhdr mhdr; /* message mbuf header */
- uint32_t mlen; /* message length */
-
- int state; /* current parser state */
- uint8_t *pos; /* parser position marker */
- uint8_t *token; /* token marker */
-
- msg_parse_t parser; /* message parser */
- msg_parse_result_t result; /* message parsing result */
-
- msg_type_t type; /* message type */
-
- uint8_t *key_start; /* key start */
- uint8_t *key_end; /* key end */
-
- uint32_t vlen; /* value length (memcache) */
- uint8_t *end; /* end marker (memcache) */
-
- struct msg *frag_owner; /* owner of fragment message */
- uint32_t nfrag; /* # fragment */
- uint64_t frag_id; /* id of fragmented message */
-
- err_t err; /* errno on error? */
- unsigned error:1; /* error? */
- unsigned ferror:1; /* one or more fragments are in error? */
- unsigned request:1; /* request? or response? */
- unsigned quit:1; /* quit request? */
- unsigned noreply:1; /* noreply? */
- unsigned done:1; /* done? */
- unsigned fdone:1; /* all fragments are done? */
- unsigned first_fragment:1;/* first fragment? */
- unsigned last_fragment:1; /* last fragment? */
- unsigned swallow:1; /* swallow response? */
- unsigned redis:1; /* redis? */
+ TAILQ_ENTRY(msg) c_tqe; /* link in client q */
+ TAILQ_ENTRY(msg) s_tqe; /* link in server q */
+ TAILQ_ENTRY(msg) m_tqe; /* link in send q / free q */
+
+ uint64_t id; /* message id */
+ struct msg *peer; /* message peer */
+ struct conn *owner; /* message owner - client | server */
+
+ struct rbnode tmo_rbe; /* entry in rbtree */
+
+ struct mhdr mhdr; /* message mbuf header */
+ uint32_t mlen; /* message length */
+
+ int state; /* current parser state */
+ uint8_t *pos; /* parser position marker */
+ uint8_t *token; /* token marker */
+
+ msg_parse_t parser; /* message parser */
+ msg_parse_result_t result; /* message parsing result */
+
+ mbuf_copy_t pre_splitcopy; /* message pre-split copy */
+ msg_post_splitcopy_t post_splitcopy; /* message post-split copy */
+ msg_coalesce_t pre_coalesce; /* message pre-coalesce */
+ msg_coalesce_t post_coalesce; /* message post-coalesce */
+
+ msg_type_t type; /* message type */
+
+ uint8_t *key_start; /* key start */
+ uint8_t *key_end; /* key end */
+
+ uint32_t vlen; /* value length (memcache) */
+ uint8_t *end; /* end marker (memcache) */
+
+ struct msg *frag_owner; /* owner of fragment message */
+ uint32_t nfrag; /* # fragment */
+ uint64_t frag_id; /* id of fragmented message */
+
+ err_t err; /* errno on error? */
+ unsigned error:1; /* error? */
+ unsigned ferror:1; /* one or more fragments are in error? */
+ unsigned request:1; /* request? or response? */
+ unsigned quit:1; /* quit request? */
+ unsigned noreply:1; /* noreply? */
+ unsigned done:1; /* done? */
+ unsigned fdone:1; /* all fragments are done? */
+ unsigned first_fragment:1;/* first fragment? */
+ unsigned last_fragment:1; /* last fragment? */
+ unsigned swallow:1; /* swallow response? */
+ unsigned redis:1; /* redis? */
};
TAILQ_HEAD(msg_tqh, msg);
View
@@ -136,7 +136,7 @@ req_done(struct conn *conn, struct msg *msg)
ASSERT(msg->frag_owner->nfrag == nfragment);
- memcache_post_coalesce(msg->frag_owner);
+ msg->post_coalesce(msg->frag_owner);
log_debug(LOG_DEBUG, "req from c %d with fid %"PRIu64" and %"PRIu32" "
"fragments is done", conn->sd, id, nfragment);
View
@@ -216,7 +216,7 @@ rsp_forward(struct context *ctx, struct conn *s_conn, struct msg *msg)
pmsg->peer = msg;
msg->peer = pmsg;
- memcache_pre_coalesce(msg);
+ msg->pre_coalesce(msg);
c_conn = pmsg->owner;
ASSERT(c_conn->client && !c_conn->proxy);

0 comments on commit 1377dcb

Please sign in to comment.