Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Binary protocol updates from the fourth hackathon.

Also separated much of the headers that may be interesting to client
implementors into a separate header file.
  • Loading branch information...
commit a85a6e15d38fc9a0c15d23073a8b38fc54edb1c1 1 parent 98a629b
Trond Norbye authored
Showing with 814 additions and 279 deletions.
  1. +467 −232 memcached.c
  2. +9 −43 memcached.h
  3. +334 −0 protocol_binary.h
  4. +4 −4 thread.c
View
699 memcached.c
@@ -94,6 +94,7 @@ static int transmit(conn *c);
static int ensure_iov_space(conn *c);
static int add_iov(conn *c, const void *buf, int len);
static int add_msghdr(conn *c);
+static uint64_t swap64(uint64_t in);
/* time handling */
static void set_current_time(void); /* update the global variable holding
@@ -840,7 +841,7 @@ static void complete_nread_ascii(conn *c) {
if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
out_string(c, "CLIENT_ERROR bad data chunk");
} else {
- ret = store_item(it, comm);
+ ret = store_item(it, comm, c);
if (ret == 1)
out_string(c, "STORED");
else if(ret == 2)
@@ -855,9 +856,25 @@ static void complete_nread_ascii(conn *c) {
c->item = 0;
}
-static void add_bin_header(conn *c, int err, int hdr_len, int body_len) {
- int i = 0;
- uint32_t *res_header;
+/**
+ * get a pointer to the start of the request struct for the current command
+ */
+static void* binary_get_request(conn *c) {
+ char *ret = c->rcurr;
+ ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
+ c->binary_header.request.extlen);
+ return ret;
+}
+
+/**
+ * get a pointer to the key in this request
+ */
+static char* binary_get_key(conn *c) {
+ return c->rcurr - (c->binary_header.request.keylen);
+}
+
+static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) {
+ protocol_binary_response_header* header;
assert(c);
assert(body_len >= 0);
@@ -871,64 +888,76 @@ static void add_bin_header(conn *c, int err, int hdr_len, int body_len) {
return;
}
- res_header = (uint32_t *)c->wbuf;
+ header = (protocol_binary_response_header *)c->wbuf;
- res_header[0] = ((uint32_t)BIN_RES_MAGIC) << 24;
- res_header[0] |= ((0xff & c->cmd) << 16);
- res_header[0] |= err & 0xffff;
+ header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
+ header->response.opcode = c->cmd;
+ header->response.keylen = (uint16_t)htons(key_len);
- res_header[1] = hdr_len << 24;
- /* TODO: Support datatype */
- res_header[2] = body_len;
- res_header[3] = c->opaque;
+ header->response.extlen = (uint8_t)hdr_len;
+ header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
+ header->response.status = (uint16_t)htons(err);
- if(settings.verbose > 1) {
- fprintf(stderr, "Writing bin response: %08x %08x %08x %08x\n",
- res_header[0], res_header[1], res_header[2], res_header[3]);
- }
+ header->response.bodylen = htonl(body_len);
+ header->response.opaque = c->opaque;
+ header->response.cas = swap64(c->cas);
- for(i = 0; i<BIN_PKT_HDR_WORDS; i++) {
- res_header[i] = htonl(res_header[i]);
+ if (settings.verbose > 1) {
+ int ii;
+ fprintf(stderr, ">%d Writing bin response:", c->sfd);
+ for (ii = 0; ii < sizeof(header->bytes); ++ii) {
+ if (ii % 4 == 0) {
+ fprintf(stderr, "\n>%d ", c->sfd);
+ }
+ fprintf(stderr, " 0x%02x", header->bytes[ii]);
+ }
+ fprintf(stderr, "\n");
}
- assert(c->wsize >= MIN_BIN_PKT_LENGTH);
- add_iov(c, c->wbuf, MIN_BIN_PKT_LENGTH);
+ add_iov(c, c->wbuf, sizeof(header->response));
}
-static void write_bin_error(conn *c, int err, int swallow) {
+static void write_bin_error(conn *c, protocol_binary_response_status err, int swallow) {
const char *errstr = "Unknown error";
- switch(err) {
- case ERR_OUT_OF_MEMORY:
+ size_t len;
+
+ switch (err) {
+ case PROTOCOL_BINARY_RESPONSE_ENOMEM:
errstr = "Out of memory";
break;
- case ERR_UNKNOWN_CMD:
+ case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
errstr = "Unknown command";
break;
- case ERR_NOT_FOUND:
+ case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
errstr = "Not found";
break;
- case ERR_INVALID_ARGUMENTS:
+ case PROTOCOL_BINARY_RESPONSE_EINVAL:
errstr = "Invalid arguments";
break;
- case ERR_EXISTS:
+ case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
errstr = "Data exists for key.";
break;
- case ERR_TOO_LARGE:
+ case PROTOCOL_BINARY_RESPONSE_E2BIG:
errstr = "Too large.";
break;
- case ERR_NOT_STORED:
+ case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
errstr = "Not stored.";
break;
default:
+ assert(false);
errstr = "UNHANDLED ERROR";
- fprintf(stderr, "UNHANDLED ERROR: %d\n", err);
+ fprintf(stderr, ">%d UNHANDLED ERROR: %d\n", c->sfd, err);
}
- if(settings.verbose > 0) {
- fprintf(stderr, "Writing an error: %s\n", errstr);
+
+ if (settings.verbose > 0) {
+ fprintf(stderr, ">%d Writing an error: %s\n", c->sfd, errstr);
}
- add_bin_header(c, err, 0, strlen(errstr));
- add_iov(c, errstr, strlen(errstr));
+ len = strlen(errstr);
+ add_bin_header(c, err, 0, 0, len);
+ if (len > 0) {
+ add_iov(c, errstr, len);
+ }
conn_set_state(c, conn_mwrite);
if(swallow > 0) {
c->sbytes = swallow;
@@ -939,8 +968,8 @@ static void write_bin_error(conn *c, int err, int swallow) {
}
/* Form and send a response to a command over the binary protocol */
-static void write_bin_response(conn *c, void *d, int hlen, int dlen) {
- add_bin_header(c, 0, hlen, dlen);
+static void write_bin_response(conn *c, void *d, int hlen, int keylen, int dlen) {
+ add_bin_header(c, 0, hlen, keylen, dlen);
if(dlen > 0) {
add_iov(c, d, dlen);
}
@@ -949,7 +978,7 @@ static void write_bin_response(conn *c, void *d, int hlen, int dlen) {
}
/* Byte swap a 64-bit number */
-static int64_t swap64(int64_t in) {
+static uint64_t swap64(uint64_t in) {
#ifdef ENDIAN_LITTLE
/* Little endian, flip the bytes around until someone makes a faster/better
* way to do this. */
@@ -968,64 +997,73 @@ static int64_t swap64(int64_t in) {
static void complete_incr_bin(conn *c) {
item *it;
- int64_t delta;
- uint64_t initial;
- int32_t exptime;
char *key;
size_t nkey;
- int i;
- uint64_t *response_buf = (uint64_t*) c->wbuf + BIN_INCR_HDR_LEN;
-
- assert(c != NULL);
-
- key = c->rbuf + BIN_INCR_HDR_LEN;
- nkey = c->keylen;
- key[nkey] = 0x00;
+#define INCR_MAX_STORAGE_LEN 24
- delta = swap64(*((int64_t*)(c->rbuf)));
- initial = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
- exptime = ntohl(*((int*)(c->rbuf + 16)));
+ protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->wbuf;
+ protocol_binary_request_incr* req = binary_get_request(c);
- if(settings.verbose) {
+ assert(c != NULL);
+ assert(c->rbytes >= sizeof(*req));
+ assert(c->wsize >= sizeof(*rsp));
+
+ /* fix byteorder in the request */
+ req->message.body.delta = swap64(req->message.body.delta);
+ req->message.body.initial = swap64(req->message.body.initial);
+ req->message.body.expiration = ntohl(req->message.body.expiration);
+ key = binary_get_key(c);
+ nkey = c->binary_header.request.keylen;
+
+ if (settings.verbose) {
+ int i;
fprintf(stderr, "incr ");
- for(i = 0; i<nkey; i++) {
+
+ for (i = 0; i < nkey; i++) {
fprintf(stderr, "%c", key[i]);
}
- fprintf(stderr, " %lld, %llu, %d\n", delta, initial, exptime);
+ fprintf(stderr, " %lld, %llu, %d\n", req->message.body.delta,
+ req->message.body.initial, req->message.body.expiration);
}
it = item_get(key, nkey);
- if (it) {
+ if (it && (c->binary_header.request.cas == 0 || c->binary_header.request.cas == it->cas_id)) {
/* Weird magic in add_delta forces me to pad here */
char tmpbuf[INCR_MAX_STORAGE_LEN];
uint64_t l = 0;
- memset(tmpbuf, ' ', INCR_MAX_STORAGE_LEN);
- tmpbuf[INCR_MAX_STORAGE_LEN] = 0x00;
- add_delta(it, c->cmd == CMD_INCR, delta, tmpbuf);
- *response_buf = swap64(strtoull(tmpbuf, NULL, 10));
-
- write_bin_response(c, response_buf, BIN_INCR_HDR_LEN, INCR_RES_LEN);
+ add_delta(it, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
+ req->message.body.delta, tmpbuf);
+ rsp->message.body.value = swap64(strtoull(tmpbuf, NULL, 10));
+ write_bin_response(c, &rsp->message.body, 0, 0,
+ sizeof(rsp->message.body.value));
item_remove(it); /* release our reference */
- } else {
- if(exptime >= 0) {
- /* Save some room for the response */
- assert(c->wsize > BIN_INCR_HDR_LEN + BIN_DEL_HDR_LEN);
- *response_buf = swap64(initial);
- it = item_alloc(key, nkey, 0, realtime(exptime),
- INCR_MAX_STORAGE_LEN);
- snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu", initial);
-
- if(store_item(it, NREAD_SET)) {
- write_bin_response(c, response_buf, BIN_INCR_HDR_LEN,
- INCR_RES_LEN);
+ } else if (!it && req->message.body.expiration != 0xffffffff) {
+ /* Save some room for the response */
+ rsp->message.body.value = swap64(req->message.body.initial);
+ it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
+ INCR_MAX_STORAGE_LEN);
+
+ if (it != NULL) {
+ snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu",
+ req->message.body.initial);
+
+ if (store_item(it, NREAD_SET, c)) {
+ write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
} else {
- write_bin_error(c, ERR_NOT_STORED, 0);
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
}
item_remove(it); /* release our reference */
} else {
- write_bin_error(c, ERR_NOT_FOUND, 0);
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
}
+ } else if (it) {
+ /* incorrect CAS */
+ item_remove(it); /* release our reference */
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
+ } else {
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
}
+#undef INCR_MAX_STORAGE_LEN
}
static void complete_update_bin(conn *c) {
@@ -1043,24 +1081,24 @@ static void complete_update_bin(conn *c) {
*(ITEM_data(it) + it->nbytes - 2) = '\r';
*(ITEM_data(it) + it->nbytes - 1) = '\n';
- switch (store_item(it, c->item_comm)) {
+ switch (store_item(it, c->item_comm, c)) {
case 1:
/* Stored */
- write_bin_response(c, NULL, BIN_SET_HDR_LEN, 0);
+ write_bin_response(c, NULL, 0, 0, 0);
break;
case 2:
- write_bin_error(c, ERR_EXISTS, 0);
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
break;
case 3:
- write_bin_error(c, ERR_NOT_FOUND, 0);
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
break;
default:
- if(c->item_comm == NREAD_ADD) {
- eno = ERR_EXISTS;
+ if (c->item_comm == NREAD_ADD) {
+ eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
} else if(c->item_comm == NREAD_REPLACE) {
- eno = ERR_NOT_FOUND;
+ eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
} else {
- eno = ERR_NOT_STORED;
+ eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
}
write_bin_error(c, eno, 0);
}
@@ -1072,40 +1110,61 @@ static void complete_update_bin(conn *c) {
static void process_bin_get(conn *c) {
item *it;
- it = item_get(c->rbuf, c->keylen);
- if (it) {
- int *flags;
- uint64_t* identifier;
+ protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
+ protocol_binary_request_get* req = binary_get_request(c);
+ char* key = binary_get_key(c);
+ size_t nkey = c->binary_header.request.keylen;
- assert(c->rsize >= MIN_BIN_PKT_LENGTH + 4);
+ if (settings.verbose) {
+ int ii;
+ fprintf(stderr, "<%d GET ", c->sfd);
+ for (ii = 0; ii < nkey; ++ii) {
+ fprintf(stderr, "%c", key[ii]);
+ }
+ fprintf(stderr, "\n");
+ }
- /* the length has two unnecessary bytes, and then we write four more */
- add_bin_header(c, 0, GET_RES_HDR_LEN, it->nbytes - 2 + GET_RES_HDR_LEN);
+ it = item_get(key, nkey);
+ if (it) {
+ /* the length has two unnecessary bytes ("\r\n") */
+ uint16_t keylen = 0;
+ uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);
- /* Add the "extras" field: CAS-id followed by flags. The cas is a 64-
- bit datatype and require alignment to 8-byte boundaries on some
- architechtures. Verify that the size of the packet header is of
- the correct size (if not the following code generates SIGBUS on
- sparc hardware).
- */
- assert(MIN_BIN_PKT_LENGTH % 8 == 0);
- identifier = (uint64_t*)(c->wbuf + MIN_BIN_PKT_LENGTH);
- *identifier = swap64(it->cas_id);
- add_iov(c, identifier, 8);
+ if (c->cmd == PROTOCOL_BINARY_CMD_GETK ||
+ c->cmd == PROTOCOL_BINARY_CMD_GETKQ) {
+ bodylen += nkey;
+ keylen = nkey;
+ }
+ add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
+ rsp->message.header.response.cas = swap64(it->cas_id);
+
+ // add the flags
+ rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
+ add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
- /* Add the flags */
- flags = (int*)(c->wbuf + MIN_BIN_PKT_LENGTH + 8);
- *flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
- add_iov(c, flags, 4);
+ if (c->cmd == PROTOCOL_BINARY_CMD_GETK ||
+ c->cmd == PROTOCOL_BINARY_CMD_GETKQ) {
+ add_iov(c, ITEM_key(it), nkey);
+ }
/* Add the data minus the CRLF */
add_iov(c, ITEM_data(it), it->nbytes - 2);
conn_set_state(c, conn_mwrite);
} else {
- if(c->cmd == CMD_GETQ) {
+ if (c->cmd == PROTOCOL_BINARY_CMD_GETQ ||
+ c->cmd == PROTOCOL_BINARY_CMD_GETKQ) {
conn_set_state(c, conn_new_cmd);
} else {
- write_bin_error(c, ERR_NOT_FOUND, 0);
+ if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
+ char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
+ add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
+ 0, nkey, nkey);
+ memcpy(ofs, key, nkey);
+ add_iov(c, ofs, nkey);
+ conn_set_state(c, conn_mwrite);
+ } else {
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
+ }
}
}
}
@@ -1115,50 +1174,99 @@ static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
c->substate = next_substate;
c->rlbytes = c->keylen + extra;
assert(c->rsize >= c->rlbytes);
- c->ritem = c->rbuf;
+ /* preserve the header in the buffer.. */
+ c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
conn_set_state(c, conn_nread);
}
static void dispatch_bin_command(conn *c) {
time_t exptime = 0;
+ int protocol_error = 0;
+
+ int extlen = c->binary_header.request.extlen;
+ int keylen = c->binary_header.request.keylen;
+ uint32_t bodylen = c->binary_header.request.bodylen;
+
switch(c->cmd) {
- case CMD_VERSION:
- write_bin_response(c, VERSION, 0, strlen(VERSION));
+ case PROTOCOL_BINARY_CMD_VERSION:
+ if (extlen == 0 && keylen == 0 && bodylen == 0) {
+ write_bin_response(c, VERSION, 0, 0, strlen(VERSION));
+ } else {
+ protocol_error = 1;
+ }
break;
- case CMD_FLUSH:
- set_current_time();
-
- settings.oldest_live = current_time - 1;
- item_flush_expired();
- write_bin_response(c, NULL, 0, 0);
+ case PROTOCOL_BINARY_CMD_FLUSH:
+ if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
+ bin_read_key(c, bin_read_flush_exptime, extlen);
+ } else {
+ protocol_error = 1;
+ }
break;
- case CMD_NOOP:
- write_bin_response(c, NULL, 0, 0);
+ case PROTOCOL_BINARY_CMD_NOOP:
+ if (extlen == 0 && keylen == 0 && bodylen == 0) {
+ write_bin_response(c, NULL, 0, 0, 0);
+ } else {
+ protocol_error = 1;
+ }
break;
- case CMD_SET:
- /* Fallthrough */
- case CMD_ADD:
- /* Fallthrough */
- case CMD_REPLACE:
- bin_read_key(c, bin_reading_set_header, BIN_SET_HDR_LEN);
+ case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
+ case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */
+ case PROTOCOL_BINARY_CMD_REPLACE:
+ if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
+ bin_read_key(c, bin_reading_set_header, 8);
+ } else {
+ protocol_error = 1;
+ }
break;
- case CMD_GETQ:
- case CMD_GET:
- bin_read_key(c, bin_reading_get_key, 0);
+ case PROTOCOL_BINARY_CMD_GETQ: /* FALLTHROUGH */
+ case PROTOCOL_BINARY_CMD_GET: /* FALLTHROUGH */
+ case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
+ case PROTOCOL_BINARY_CMD_GETK:
+ if (extlen == 0 && bodylen == keylen && keylen > 0) {
+ bin_read_key(c, bin_reading_get_key, 0);
+ } else {
+ protocol_error = 1;
+ }
break;
- case CMD_DELETE:
- bin_read_key(c, bin_reading_del_header, BIN_DEL_HDR_LEN);
+ case PROTOCOL_BINARY_CMD_DELETE:
+ if (keylen > 0 && (extlen == 0 || extlen == 4) && bodylen == (keylen + extlen)) {
+ bin_read_key(c, bin_reading_del_header, extlen);
+ } else {
+ protocol_error = 1;
+ }
break;
- case CMD_INCR:
- case CMD_DECR:
- bin_read_key(c, bin_reading_incr_header, BIN_INCR_HDR_LEN);
+ case PROTOCOL_BINARY_CMD_INCREMENT:
+ case PROTOCOL_BINARY_CMD_DECREMENT:
+ if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) {
+ bin_read_key(c, bin_reading_incr_header, 20);
+ } else {
+ protocol_error = 1;
+ }
break;
- case CMD_QUIT:
- write_bin_response(c, NULL, 0, 0);
- c->write_and_go = conn_closing;
+ case PROTOCOL_BINARY_CMD_APPEND:
+ case PROTOCOL_BINARY_CMD_PREPEND:
+ if (keylen > 0 && extlen == 0) {
+ bin_read_key(c, bin_reading_set_header, 0);
+ } else {
+ protocol_error = 1;
+ }
+ break;
+ case PROTOCOL_BINARY_CMD_QUIT:
+ if (keylen == 0 && extlen == 0 && bodylen == 0) {
+ write_bin_response(c, NULL, 0, 0, 0);
+ c->write_and_go = conn_closing;
+ } else {
+ protocol_error = 1;
+ }
break;
default:
- write_bin_error(c, ERR_UNKNOWN_CMD, c->bin_header[2]);
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, bodylen);
+ }
+
+ if (protocol_error) {
+ /* Just write an error message and disconnect the client */
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
+ c->write_and_go = conn_closing;
}
}
@@ -1166,53 +1274,68 @@ static void process_bin_update(conn *c) {
char *key;
int nkey;
int vlen;
- int flags;
- int exptime;
item *it;
- int comm;
- int hdrlen = BIN_SET_HDR_LEN;
+ protocol_binary_request_set* req = binary_get_request(c);
assert(c != NULL);
+ assert(c->rbytes >= sizeof(*req));
- key = c->rbuf + hdrlen;
- nkey = c->keylen;
- key[nkey] = 0x00;
+ key = binary_get_key(c);
+ nkey = c->binary_header.request.keylen;
- flags = ntohl(*((int*)(c->rbuf + 8)));
- exptime = ntohl(*((int*)(c->rbuf + 12)));
- vlen = c->bin_header[2] - (nkey + hdrlen);
+ /* fix byteorder in the request */
+ req->message.body.flags = ntohl(req->message.body.flags);
+ req->message.body.expiration = ntohl(req->message.body.expiration);
- if(settings.verbose > 1) {
- fprintf(stderr, "Value len is %d\n", vlen);
+ vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen);
+
+ if (settings.verbose) {
+ int ii;
+ if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
+ fprintf(stderr, "<%d ADD ", c->sfd);
+ } else if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
+ fprintf(stderr, "<%d SET ", c->sfd);
+ } else {
+ fprintf(stderr, "<%d REPLACE ", c->sfd);
+ }
+ for (ii = 0; ii < nkey; ++ii) {
+ fprintf(stderr, "%c", key[ii]);
+ }
+
+ if (settings.verbose > 1) {
+ fprintf(stderr, " Value len is %d", vlen);
+ }
+ fprintf(stderr, "\n");
}
if (settings.detail_enabled) {
stats_prefix_record_set(key);
}
- it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
+ it = item_alloc(key, nkey, req->message.body.flags,
+ realtime(req->message.body.expiration), vlen+2);
if (it == 0) {
- if (! item_size_ok(nkey, flags, vlen + 2)) {
- write_bin_error(c, ERR_TOO_LARGE, vlen);
+ if (! item_size_ok(nkey, req->message.body.flags, vlen + 2)) {
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
} else {
- write_bin_error(c, ERR_OUT_OF_MEMORY, vlen);
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
}
/* swallow the data line */
c->write_and_go = conn_swallow;
return;
}
- it->cas_id = (uint64_t)swap64(*((int64_t*)(c->rbuf)));
+ it->cas_id = c->binary_header.request.cas;
- switch(c->cmd) {
- case CMD_ADD:
+ switch (c->cmd) {
+ case PROTOCOL_BINARY_CMD_ADD:
c->item_comm = NREAD_ADD;
break;
- case CMD_SET:
+ case PROTOCOL_BINARY_CMD_SET:
c->item_comm = NREAD_SET;
break;
- case CMD_REPLACE:
+ case PROTOCOL_BINARY_CMD_REPLACE:
c->item_comm = NREAD_REPLACE;
break;
default:
@@ -1230,19 +1353,101 @@ static void process_bin_update(conn *c) {
c->substate = bin_read_set_value;
}
-static void process_bin_delete(conn *c) {
+static void process_bin_append_prepend(conn *c) {
char *key;
- size_t nkey;
+ int nkey;
+ int vlen;
item *it;
- time_t exptime = 0;
+ protocol_binary_request_append* req = binary_get_request(c);
assert(c != NULL);
+ assert(c->rbytes >= sizeof(*req));
+
+ key = binary_get_key(c);
+ nkey = c->binary_header.request.keylen;
+ vlen = c->binary_header.request.bodylen - nkey;
+
+ if (settings.verbose > 1) {
+ fprintf(stderr, "Value len is %d\n", vlen);
+ }
+
+ if (settings.detail_enabled) {
+ stats_prefix_record_set(key);
+ }
+
+ it = item_alloc(key, nkey, 0, 0, vlen+2);
+
+ if (it == 0) {
+ if (! item_size_ok(nkey, 0, vlen + 2)) {
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
+ } else {
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
+ }
+ /* swallow the data line */
+ c->write_and_go = conn_swallow;
+ return;
+ }
+
+ it->cas_id = c->binary_header.request.cas;
+
+ switch (c->cmd) {
+ case PROTOCOL_BINARY_CMD_APPEND:
+ c->item_comm = NREAD_APPEND;
+ break;
+ case PROTOCOL_BINARY_CMD_PREPEND:
+ c->item_comm = NREAD_PREPEND;
+ break;
+ default:
+ assert(0);
+ }
+
+ c->item = it;
+ c->ritem = ITEM_data(it);
+ c->rlbytes = vlen;
+ conn_set_state(c, conn_nread);
+ c->substate = bin_read_set_value;
+}
- exptime = ntohl(*((int*)(c->rbuf)));
- key = c->rbuf + 4;
- nkey = c->keylen;
- key[nkey] = 0x00;
+static void process_bin_flush(conn *c) {
+ time_t exptime = 0;
+ protocol_binary_request_flush* req = binary_get_request(c);
+
+ if (c->binary_header.request.extlen == sizeof(req->message.body)) {
+ exptime = ntohl(req->message.body.expiration);
+ }
+
+ set_current_time();
+
+ settings.oldest_live = current_time - 1;
+
+ if (exptime != 0) {
+ settings.oldest_live = realtime(exptime) - 1;
+ } else {
+ settings.oldest_live = current_time - 1;
+ }
+ item_flush_expired();
+
+ write_bin_response(c, NULL, 0, 0, 0);
+}
+static void process_bin_delete(conn *c) {
+ item *it;
+ time_t exptime = 0;
+
+ protocol_binary_response_delete* rsp = (protocol_binary_response_delete*)c->wbuf;
+ protocol_binary_request_delete* req = binary_get_request(c);
+
+ char* key = binary_get_key(c);
+ size_t nkey = c->binary_header.request.keylen;
+
+ assert(c != NULL);
+ assert(c->rbytes >= sizeof(*req));
+ assert(c->wsize >= sizeof(*rsp));
+
+ if (c->binary_header.request.extlen == sizeof(req->message.body)) {
+ /* fix byteorder in the request */
+ exptime = ntohl(req->message.body.expiration);
+ }
if(settings.verbose) {
fprintf(stderr, "Deleting %s with a timeout of %d\n", key, exptime);
}
@@ -1256,18 +1461,16 @@ static void process_bin_delete(conn *c) {
if (exptime == 0) {
item_unlink(it);
item_remove(it); /* release our reference */
- write_bin_response(c, NULL, 0, 0);
+ write_bin_response(c, NULL, 0, 0, 0);
} else {
- /* XXX: This is really lame, but defer_delete returns a string */
- char *res = defer_delete(it, exptime);
- if(res[0] == 'D') {
- write_bin_response(c, NULL, 0, 0);
+ if (defer_delete(it, exptime) != -1) {
+ write_bin_response(c, NULL, 0, 0, 0);
} else {
- write_bin_error(c, ERR_OUT_OF_MEMORY, 0);
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
}
}
} else {
- write_bin_error(c, ERR_NOT_FOUND, 0);
+ write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
}
}
@@ -1277,7 +1480,12 @@ static void complete_nread_binary(conn *c) {
switch(c->substate) {
case bin_reading_set_header:
- process_bin_update(c);
+ if (c->cmd == PROTOCOL_BINARY_CMD_APPEND ||
+ c->cmd == PROTOCOL_BINARY_CMD_PREPEND) {
+ process_bin_append_prepend(c);
+ } else {
+ process_bin_update(c);
+ }
break;
case bin_read_set_value:
complete_update_bin(c);
@@ -1291,6 +1499,9 @@ static void complete_nread_binary(conn *c) {
case bin_reading_incr_header:
complete_incr_bin(c);
break;
+ case bin_read_flush_exptime:
+ process_bin_flush(c);
+ break;
default:
fprintf(stderr, "Not handling substate %d\n", c->substate);
assert(0);
@@ -1325,7 +1536,7 @@ static void complete_nread(conn *c) {
*
* Returns true if the item was stored.
*/
-int do_store_item(item *it, int comm) {
+int do_store_item(item *it, int comm, conn *c) {
char *key = ITEM_key(it);
bool delete_locked = false;
item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
@@ -1370,48 +1581,62 @@ int do_store_item(item *it, int comm) {
* Append - combine new and old record into single one. Here it's
* atomic and thread-safe.
*/
-
if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
+ /*
+ * Validate CAS
+ */
+ if (it->cas_id != 0) {
+ // CAS much be equal
+ if (it->cas_id != old_it->cas_id) {
+ stored = 2;
+ }
+ }
- /* we have it and old_it here - alloc memory to hold both */
- /* flags was already lost - so recover them from ITEM_suffix(it) */
+ if (stored == 0) {
+ /* we have it and old_it here - alloc memory to hold both */
+ /* flags was already lost - so recover them from ITEM_suffix(it) */
- flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
+ flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
- new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
+ new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
- if (new_it == NULL) {
- /* SERVER_ERROR out of memory */
- return 0;
- }
+ if (new_it == NULL) {
+ /* SERVER_ERROR out of memory */
+ return 0;
+ }
- /* copy data from it and old_it to new_it */
+ /* copy data from it and old_it to new_it */
- if (comm == NREAD_APPEND) {
- memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
- memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
- } else {
- /* NREAD_PREPEND */
- memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
- memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
- }
+ if (comm == NREAD_APPEND) {
+ memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
+ memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
+ } else {
+ /* NREAD_PREPEND */
+ memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
+ memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
+ }
- it = new_it;
+ it = new_it;
+ }
}
- /* "set" commands can override the delete lock
- window... in which case we have to find the old hidden item
- that's in the namespace/LRU but wasn't returned by
- item_get.... because we need to replace it */
- if (delete_locked)
- old_it = do_item_get_nocheck(key, it->nkey);
+ if (stored == 0) {
+ /* "set" commands can override the delete lock
+ window... in which case we have to find the old hidden item
+ that's in the namespace/LRU but wasn't returned by
+ item_get.... because we need to replace it */
+ if (delete_locked)
+ old_it = do_item_get_nocheck(key, it->nkey);
- if (old_it != NULL)
- do_item_replace(old_it, it);
- else
- do_item_link(it);
+ if (old_it != NULL)
+ do_item_replace(old_it, it);
+ else
+ do_item_link(it);
+
+ c->cas = it->cas_id;
- stored = 1;
+ stored = 1;
+ }
}
if (old_it != NULL)
@@ -2115,7 +2340,11 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken
out_string(c, "DELETED");
} else {
/* our reference will be transfered to the delete queue */
- out_string(c, defer_delete(it, exptime));
+ if (defer_delete(it, exptime) == -1) {
+ out_string(c, "SERVER_ERROR out of memory expanding delete queue");
+ } else {
+ out_string(c, "DELETED");
+ }
}
} else {
out_string(c, "NOT_FOUND");
@@ -2127,7 +2356,7 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken
*
* Returns the result to send to the client.
*/
-char *do_defer_delete(item *it, time_t exptime)
+int do_defer_delete(item *it, time_t exptime)
{
if (delcurr >= deltotal) {
item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
@@ -2140,7 +2369,7 @@ char *do_defer_delete(item *it, time_t exptime)
* but we ran out of memory for the delete queue
*/
item_remove(it); /* release reference */
- return "SERVER_ERROR out of memory expanding delete queue";
+ return -1;
}
}
@@ -2149,7 +2378,7 @@ char *do_defer_delete(item *it, time_t exptime)
it->it_flags |= ITEM_DELETED;
todelete[delcurr++] = it;
- return "DELETED";
+ return 0;
}
static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
@@ -2378,7 +2607,7 @@ static int try_read_command(conn *c) {
assert(c->rbytes > 0);
if (c->protocol == negotiating_prot) {
- if ((c->rbuf[0] & 0xff) == BIN_REQ_MAGIC) {
+ if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
c->protocol = binary_prot;
} else {
c->protocol = ascii_prot;
@@ -2392,28 +2621,35 @@ static int try_read_command(conn *c) {
if (c->protocol == binary_prot) {
/* Do we have the complete packet header? */
- if (c->rbytes < MIN_BIN_PKT_LENGTH) {
+ if (c->rbytes < sizeof(c->binary_header)) {
/* need more data! */
return 0;
} else {
- int i = 0;
- memcpy(c->bin_header, c->rcurr, sizeof(c->bin_header));
- assert(BIN_PKT_HDR_WORDS == 4);
- for (i = 0; i<BIN_PKT_HDR_WORDS; i++) {
- c->bin_header[i] = ntohl(c->bin_header[i]);
- }
+ protocol_binary_request_header* req;
+ req = (protocol_binary_request_header*)c->rcurr;
- if (settings.verbose) {
- fprintf(stderr,
- "Read binary protocol data: %08x %08x %08x %08x\n",
- c->bin_header[0], c->bin_header[1], c->bin_header[2],
- c->bin_header[3]);
+ if (settings.verbose > 1) {
+ /* Dump the packet before we convert it to host order */
+ int ii;
+ fprintf(stderr, "<%d Read binary protocol data:", c->sfd);
+ for (ii = 0; ii < sizeof(req->bytes); ++ii) {
+ if (ii % 4 == 0) {
+ fprintf(stderr, "\n<%d ", c->sfd);
+ }
+ fprintf(stderr, " 0x%02x", req->bytes[ii]);
+ }
+ fprintf(stderr, "\n");
}
- if ((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) {
+ c->binary_header = *req;
+ c->binary_header.request.keylen = ntohs(req->request.keylen);
+ c->binary_header.request.bodylen = ntohl(req->request.bodylen);
+ c->binary_header.request.cas = swap64(req->request.cas);
+
+ if (c->binary_header.request.magic != PROTOCOL_BINARY_REQ) {
if (settings.verbose) {
fprintf(stderr, "Invalid magic: %x\n",
- c->bin_header[0] >> 24);
+ c->binary_header.request.magic);
}
conn_set_state(c, conn_closing);
return 0;
@@ -2427,19 +2663,16 @@ static int try_read_command(conn *c) {
return 0;
}
- c->cmd = (c->bin_header[0] >> 16) & 0xff;
- c->keylen = c->bin_header[0] & 0xffff;
- c->opaque = c->bin_header[3];
- if (settings.verbose > 1) {
- fprintf(stderr,
- "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n",
- c->cmd, c->opaque, c->keylen, c->bin_header[2]);
- }
+ c->cmd = c->binary_header.request.opcode;
+ c->keylen = c->binary_header.request.keylen;
+ c->opaque = c->binary_header.request.opaque;
+ /* clear the returned cas value */
+ c->cas = 0;
dispatch_bin_command(c);
- c->rbytes -= MIN_BIN_PKT_LENGTH;
- c->rcurr += MIN_BIN_PKT_LENGTH;
+ c->rbytes -= sizeof(c->binary_header);
+ c->rcurr += sizeof(c->binary_header);
}
} else {
char *el, *cont;
@@ -2768,7 +3001,9 @@ static void drive_machine(conn *c) {
/* first check if we have leftovers in the conn_read buffer */
if (c->rbytes > 0) {
int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
- memmove(c->ritem, c->rcurr, tocopy);
+ if (c->ritem != c->rcurr) {
+ memmove(c->ritem, c->rcurr, tocopy);
+ }
c->ritem += tocopy;
c->rlbytes -= tocopy;
c->rcurr += tocopy;
View
52 memcached.h
@@ -12,6 +12,8 @@
#include <event.h>
#include <netdb.h>
+#include "protocol_binary.h"
+
#define DATA_BUFFER_SIZE 2048
#define UDP_READ_BUFFER_SIZE 65536
#define UDP_MAX_PAYLOAD_SIZE 1400
@@ -41,46 +43,8 @@
/* Binary protocol stuff */
#define MIN_BIN_PKT_LENGTH 16
-/* flags:32, expiration:32, cas:64 */
-#define BIN_SET_HDR_LEN 16
-/* incr:64, initial:64, expiration:32 */
-#define BIN_INCR_HDR_LEN 20
-/* flags:32, cas:64 */
-#define GET_RES_HDR_LEN (4+8)
-/* timeout:32 */
-#define BIN_DEL_HDR_LEN 4
#define BIN_PKT_HDR_WORDS (MIN_BIN_PKT_LENGTH/sizeof(uint32_t))
-/* Body is a single 64-bit int */
-#define INCR_RES_LEN 8
-/* len(18446744073709551616) + 2 (or so) */
-#define INCR_MAX_STORAGE_LEN 24
-
-#define BIN_REQ_MAGIC 0x80
-#define BIN_RES_MAGIC 0x81
-
-#define CMD_GET 0
-#define CMD_SET 1
-#define CMD_ADD 2
-#define CMD_REPLACE 3
-#define CMD_DELETE 4
-#define CMD_INCR 5
-#define CMD_DECR 6
-#define CMD_QUIT 7
-#define CMD_FLUSH 8
-#define CMD_GETQ 9
-#define CMD_NOOP 10
-#define CMD_VERSION 11
-
-#define ERR_UNKNOWN_CMD 0x81
-#define ERR_OUT_OF_MEMORY 0x82
-
-#define ERR_NOT_FOUND 0x1
-#define ERR_EXISTS 0x2
-#define ERR_TOO_LARGE 0x3
-#define ERR_INVALID_ARGUMENTS 0x4
-#define ERR_NOT_STORED 0x5
-
/* Get a consistent bool type */
#if HAVE_STDBOOL_H
# include <stdbool.h>
@@ -200,6 +164,7 @@ enum bin_substates {
bin_reading_get_key,
bin_reading_del_header,
bin_reading_incr_header,
+ bin_read_flush_exptime
};
enum protocol {
@@ -293,7 +258,8 @@ struct conn {
bool noreply; /* True if the reply should not be sent. */
/* Binary protocol stuff */
/* This is where the binary header goes */
- uint32_t bin_header[MIN_BIN_PKT_LENGTH/sizeof(uint32_t)];
+ protocol_binary_request_header binary_header;
+ uint64_t cas; /* the cas to return */
short cmd;
int opaque;
int keylen;
@@ -315,10 +281,10 @@ conn *do_conn_from_freelist();
bool do_conn_add_to_freelist(conn *c);
char *do_suffix_from_freelist();
bool do_suffix_add_to_freelist(char *s);
-char *do_defer_delete(item *item, time_t exptime);
+int do_defer_delete(item *item, time_t exptime);
void do_run_deferred_deletes(void);
char *do_add_delta(item *item, const bool incr, const int64_t delta, char *buf);
-int do_store_item(item *item, int comm);
+int do_store_item(item *item, int comm, conn* c);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum protocol prot, struct event_base *base);
@@ -346,7 +312,7 @@ conn *conn_from_freelist(void);
bool conn_add_to_freelist(conn *c);
char *suffix_from_freelist(void);
bool suffix_add_to_freelist(char *s);
-char *defer_delete(item *it, time_t exptime);
+int defer_delete(item *it, time_t exptime);
int is_listen_thread(void);
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes);
@@ -366,7 +332,7 @@ int slabs_reassign(unsigned char srcid, unsigned char dstid);
char *slabs_stats(int *buflen);
void STATS_LOCK(void);
void STATS_UNLOCK(void);
-int store_item(item *item, int comm);
+int store_item(item *item, int comm, conn *c);
/* If supported, give compiler hints for branch prediction. */
#if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
View
334 protocol_binary.h
@@ -0,0 +1,334 @@
+/*
+ * Copyright (c) <2008>, Sun Microsystems, Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of the nor the
+ * names of its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY SUN MICROSYSTEMS, INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS, INC. BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/*
+ * Summary: Constants used by to implement the binary protocol.
+ *
+ * Copy: See Copyright for the status of this software.
+ *
+ * Author: Trond Norbye <trond.norbye@sun.com>
+ */
+
+#ifndef PROTOCOL_BINARY_H
+#define PROTOCOL_BINARY_H
+
+/**
+ * This file contains definitions of the constants and packet formats
+ * defined in the binary specification. Please note that you _MUST_ remember
+ * to convert each multibyte field to / from network byte order to / from
+ * host order.
+ */
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+ /**
+ * Definition of the legal "magic" values used in a packet.
+ * See section 3.1 Magic byte
+ */
+ typedef enum {
+ PROTOCOL_BINARY_REQ = 0x80,
+ PROTOCOL_BINARY_RES = 0x81,
+ } protocol_binary_magic;
+
+ /**
+ * Definition of the valid response status numbers.
+ * See section 3.2 Response Status
+ */
+ typedef enum {
+ PROTOCOL_BINARY_RESPONSE_SUCCESS = 0x00,
+ PROTOCOL_BINARY_RESPONSE_KEY_ENOENT = 0x01,
+ PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS = 0x02,
+ PROTOCOL_BINARY_RESPONSE_E2BIG = 0x03,
+ PROTOCOL_BINARY_RESPONSE_EINVAL = 0x04,
+ PROTOCOL_BINARY_RESPONSE_NOT_STORED = 0x05,
+ PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND = 0x81,
+ PROTOCOL_BINARY_RESPONSE_ENOMEM = 0x82,
+ } protocol_binary_response_status;
+
+ /**
+ * Defintion of the different command opcodes.
+ * See section 3.3 Command Opcodes
+ */
+ typedef enum {
+ PROTOCOL_BINARY_CMD_GET = 0x00,
+ PROTOCOL_BINARY_CMD_SET = 0x01,
+ PROTOCOL_BINARY_CMD_ADD = 0x02,
+ PROTOCOL_BINARY_CMD_REPLACE = 0x03,
+ PROTOCOL_BINARY_CMD_DELETE = 0x04,
+ PROTOCOL_BINARY_CMD_INCREMENT = 0x05,
+ PROTOCOL_BINARY_CMD_DECREMENT = 0x06,
+ PROTOCOL_BINARY_CMD_QUIT = 0x07,
+ PROTOCOL_BINARY_CMD_FLUSH = 0x08,
+ PROTOCOL_BINARY_CMD_GETQ = 0x09,
+ PROTOCOL_BINARY_CMD_NOOP = 0x0a,
+ PROTOCOL_BINARY_CMD_VERSION = 0x0b,
+ PROTOCOL_BINARY_CMD_GETK = 0x0c,
+ PROTOCOL_BINARY_CMD_GETKQ = 0x0d,
+ PROTOCOL_BINARY_CMD_APPEND = 0x0e,
+ PROTOCOL_BINARY_CMD_PREPEND = 0x0f,
+ } protocol_binary_command;
+
+ /**
+ * Definition of the data types in the packet
+ * See section 3.4 Data Types
+ */
+ typedef enum {
+ PROTOCOL_BINARY_RAW_BYTES = 0x00,
+ } protocol_binary_datatypes;
+
+ /**
+ * Definition of the header structure for a request packet.
+ * See section 2
+ */
+ typedef union {
+ struct {
+ uint8_t magic;
+ uint8_t opcode;
+ uint16_t keylen;
+ uint8_t extlen;
+ uint8_t datatype;
+ uint16_t reserved;
+ uint32_t bodylen;
+ uint32_t opaque;
+ uint64_t cas;
+ } request;
+ uint8_t bytes[24];
+ } protocol_binary_request_header;
+
+ /**
+ * Definition of the header structure for a response packet.
+ * See section 2
+ */
+ typedef union {
+ struct {
+ uint8_t magic;
+ uint8_t opcode;
+ uint16_t keylen;
+ uint8_t extlen;
+ uint8_t datatype;
+ uint16_t status;
+ uint32_t bodylen;
+ uint32_t opaque;
+ uint64_t cas;
+ } response;
+ uint8_t bytes[24];
+ } protocol_binary_response_header;
+
+ /**
+ * Definition of a request-packet containing no extras
+ */
+ typedef union {
+ struct {
+ protocol_binary_request_header header;
+ } message;
+ uint8_t bytes[sizeof(protocol_binary_request_header)];
+ } protocol_binary_request_no_extras;
+
+ /**
+ * Definition of a response-packet containing no extras
+ */
+ typedef union {
+ struct {
+ protocol_binary_response_header header;
+ } message;
+ uint8_t bytes[sizeof(protocol_binary_response_header)];
+ } protocol_binary_response_no_extras;
+
+ /**
+ * Definition of the packet used by the get, getq, getk and getkq command.
+ * See section 4.1
+ */
+ typedef protocol_binary_request_no_extras protocol_binary_request_get;
+ typedef protocol_binary_request_no_extras protocol_binary_request_getq;
+ typedef protocol_binary_request_no_extras protocol_binary_request_getk;
+ typedef protocol_binary_request_no_extras protocol_binary_request_getkq;
+
+ /**
+ * Definition of the packet returned from a successful get, getq, getk and
+ * getkq.
+ * See section 4.1
+ */
+ typedef union {
+ struct {
+ protocol_binary_response_header header;
+ struct {
+ uint32_t flags;
+ } body;
+ } message;
+ uint8_t bytes[sizeof(protocol_binary_response_header) + 4];
+ } protocol_binary_response_get;
+
+ typedef protocol_binary_response_get protocol_binary_response_getq;
+ typedef protocol_binary_response_get protocol_binary_response_getk;
+ typedef protocol_binary_response_get protocol_binary_response_getkq;
+
+ /**
+ * Definition of the packet used by the delete command
+ * See section 4.2
+ * Please note that the expiration field is optional, so remember to see
+ * check the header.bodysize to see if it is present.
+ */
+ typedef union {
+ struct {
+ protocol_binary_request_header header;
+ struct {
+ uint32_t expiration;
+ } body;
+ } message;
+ uint8_t bytes[sizeof(protocol_binary_request_header) + 4];
+ } protocol_binary_request_delete;
+
+ /**
+ * Definition of the packet returned by the delete command
+ * See section 4.2
+ */
+ typedef protocol_binary_response_no_extras protocol_binary_response_delete;
+
+ /**
+ * Definition of the packet used by the flush command
+ * See section 4.3
+ * Please note that the expiration field is optional, so remember to see
+ * check the header.bodysize to see if it is present.
+ */
+ typedef union {
+ struct {
+ protocol_binary_request_header header;
+ struct {
+ uint32_t expiration;
+ } body;
+ } message;
+ uint8_t bytes[sizeof(protocol_binary_request_header) + 4];
+ } protocol_binary_request_flush;
+
+ /**
+ * Definition of the packet returned by the flush command
+ * See section 4.3
+ */
+ typedef protocol_binary_response_no_extras protocol_binary_response_flush;
+
+ /**
+ * Definition of the packet used by set, add and replace
+ * See section 4.4
+ */
+ typedef union {
+ struct {
+ protocol_binary_request_header header;
+ struct {
+ uint32_t flags;
+ uint32_t expiration;
+ } body;
+ } message;
+ uint8_t bytes[sizeof(protocol_binary_request_header) + 8];
+ } protocol_binary_request_set;
+ typedef protocol_binary_request_set protocol_binary_request_add;
+ typedef protocol_binary_request_set protocol_binary_request_replace;
+
+ /**
+ * Definition of the packet returned by set, add and replace
+ * See section 4.4
+ */
+ typedef protocol_binary_response_no_extras protocol_binary_response_set;
+ typedef protocol_binary_response_no_extras protocol_binary_response_add;
+ typedef protocol_binary_response_no_extras protocol_binary_response_replace;
+
+ /**
+ * Definition of the noop packet
+ * See section 4.5
+ */
+ typedef protocol_binary_request_no_extras protocol_binary_request_noop;
+
+ /**
+ * Definition of the packet returned by the noop command
+ * See section 4.5
+ */
+ typedef protocol_binary_response_no_extras protocol_binary_response_nnoop;
+
+ /**
+ * Definition of the structure used by the increment and decrement
+ * command.
+ * See section 4.6
+ */
+ typedef union {
+ struct {
+ protocol_binary_request_header header;
+ struct {
+ uint64_t delta;
+ uint64_t initial;
+ uint32_t expiration;
+ } body;
+ } message;
+ uint8_t bytes[sizeof(protocol_binary_request_header) + 20];
+ } protocol_binary_request_incr;
+ typedef protocol_binary_request_incr protocol_binary_request_decr;
+
+ /**
+ * Definition of the response from an incr or decr command
+ * command.
+ * See section 4.6
+ */
+ typedef union {
+ struct {
+ protocol_binary_response_header header;
+ struct {
+ uint64_t value;
+ } body;
+ } message;
+ uint8_t bytes[sizeof(protocol_binary_response_header) + 8];
+ } protocol_binary_response_incr;
+ typedef protocol_binary_response_incr protocol_binary_response_decr;
+
+ /**
+ * Definition of the quit
+ * See section 4.7
+ */
+ typedef protocol_binary_request_no_extras protocol_binary_request_quit;
+
+ /**
+ * Definition of the packet returned by the quit command
+ * See section 4.7
+ */
+ typedef protocol_binary_response_no_extras protocol_binary_response_quit;
+
+ /**
+ * Definition of the packet used by append and prepend command
+ * See section 4.8
+ */
+ typedef protocol_binary_request_no_extras protocol_binary_request_append;
+ typedef protocol_binary_request_no_extras protocol_binary_request_prepend;
+
+ /**
+ * Definition of the packet returned from a successful append or prepend
+ * See section 4.8
+ */
+ typedef protocol_binary_response_no_extras protocol_binary_response_append;
+ typedef protocol_binary_response_no_extras protocol_binary_response_prepend;
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* PROTOCOL_BINARY_H */
View
8 thread.c
@@ -482,8 +482,8 @@ void item_update(item *item) {
/*
* Adds an item to the deferred-delete list so it can be reaped later.
*/
-char *defer_delete(item *item, time_t exptime) {
- char *ret;
+int defer_delete(item *item, time_t exptime) {
+ int ret;
pthread_mutex_lock(&cache_lock);
ret = do_defer_delete(item, exptime);
@@ -506,11 +506,11 @@ char *add_delta(item *item, int incr, const int64_t delta, char *buf) {
/*
* Stores an item in the cache (high level, obeys set/add/replace semantics)
*/
-int store_item(item *item, int comm) {
+int store_item(item *item, int comm, conn* c) {
int ret;
pthread_mutex_lock(&cache_lock);
- ret = do_store_item(item, comm);
+ ret = do_store_item(item, comm, c);
pthread_mutex_unlock(&cache_lock);
return ret;
}
Please sign in to comment.
Something went wrong with that request. Please try again.