Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

avva's bucket/generation patch

git-svn-id: http://code.sixapart.com/svn/memcached/trunk@252 b0b603af-a30f-0410-a34e-baf09ae79d0b
  • Loading branch information...
commit 7a308025661a49a5e19f98d2c5b8df04d96b4642 1 parent c252f6e
Anatoly Vorobey authored
Showing with 187 additions and 31 deletions.
  1. +4 −0 ChangeLog
  2. +1 −1  configure.ac
  3. +169 −25 memcached.c
  4. +13 −5 memcached.h
View
4 ChangeLog
@@ -1,3 +1,7 @@
+2006-03-04
+ * avva: bucket/generation patch (old, but Brad's just finally
+ committing it)
+
2006-01-01
* Brad Fitzpatrick <brad@danga.com>: allocate 1 slab per class
on start-up, to avoid confusing users with out-of-memory errors
View
2  configure.ac
@@ -1,5 +1,5 @@
AC_PREREQ(2.52)
-AC_INIT(memcached, 1.1.12, brad@danga.com)
+AC_INIT(memcached, 1.1.13-cvs, brad@danga.com)
AC_CANONICAL_SYSTEM
AC_CONFIG_SRCDIR(memcached.c)
AM_INIT_AUTOMAKE(AC_PACKAGE_NAME, AC_PACKAGE_VERSION)
View
194 memcached.c
@@ -56,6 +56,8 @@ static item **todelete = 0;
static int delcurr;
static int deltotal;
+int *buckets = 0; /* bucket->generation array for a managed instance */
+
time_t realtime(time_t exptime) {
time_t now;
@@ -93,6 +95,7 @@ void settings_init(void) {
settings.verbose = 0;
settings.oldest_live = 0;
settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
+ settings.managed = 0;
}
conn **freeconns;
@@ -141,6 +144,7 @@ conn *conn_new(int sfd, int init_state, int event_flags) {
return 0;
}
c->rsize = c->wsize = DATA_BUFFER_SIZE;
+ c->rcurr = c->rbuf;
c->isize = 200;
stats.conn_structs++;
}
@@ -157,7 +161,7 @@ conn *conn_new(int sfd, int init_state, int event_flags) {
c->rlbytes = 0;
c->rbytes = c->wbytes = 0;
c->wcurr = c->wbuf;
- c->rcurr = c->rbuf;
+ c->ritem = 0;
c->icurr = c->ilist;
c->ileft = 0;
c->iptr = c->ibuf;
@@ -166,6 +170,8 @@ conn *conn_new(int sfd, int init_state, int event_flags) {
c->write_and_go = conn_read;
c->write_and_free = 0;
c->item = 0;
+ c->bucket = -1;
+ c->gen = 0;
c->is_corked = 0;
@@ -522,6 +528,20 @@ void process_command(conn *c, char *command) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
+
+ if (settings.managed) {
+ int bucket = c->bucket;
+ if (bucket == -1) {
+ out_string(c, "CLIENT_ERROR no BG data in managed mode");
+ return;
+ }
+ c->bucket = -1;
+ if (buckets[bucket] != c->gen) {
+ out_string(c, "ERROR_NOT_OWNER");
+ return;
+ }
+ }
+
expire = realtime(expire);
it = item_alloc(key, flags, expire, len+2);
if (it == 0) {
@@ -534,7 +554,7 @@ void process_command(conn *c, char *command) {
c->item_comm = comm;
c->item = it;
- c->rcurr = ITEM_data(it);
+ c->ritem = ITEM_data(it);
c->rlbytes = it->nbytes;
c->state = conn_nread;
return;
@@ -556,7 +576,20 @@ void process_command(conn *c, char *command) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
-
+
+ if (settings.managed) {
+ int bucket = c->bucket;
+ if (bucket == -1) {
+ out_string(c, "CLIENT_ERROR no BG data in managed mode");
+ return;
+ }
+ c->bucket = -1;
+ if (buckets[bucket] != c->gen) {
+ out_string(c, "ERROR_NOT_OWNER");
+ return;
+ }
+ }
+
it = assoc_find(key);
if (it && (it->it_flags & ITEM_DELETED)) {
it = 0;
@@ -612,6 +645,19 @@ void process_command(conn *c, char *command) {
item *it;
time_t now = time(0);
+ if (settings.managed) {
+ int bucket = c->bucket;
+ if (bucket == -1) {
+ out_string(c, "CLIENT_ERROR no BG data in managed mode");
+ return;
+ }
+ c->bucket = -1;
+ if (buckets[bucket] != c->gen) {
+ out_string(c, "ERROR_NOT_OWNER");
+ return;
+ }
+ }
+
while(sscanf(start, " %250s%n", key, &next) >= 1) {
start+=next;
stats.get_cmds++;
@@ -663,6 +709,19 @@ void process_command(conn *c, char *command) {
int res;
time_t exptime = 0;
+ if (settings.managed) {
+ int bucket = c->bucket;
+ if (bucket == -1) {
+ out_string(c, "CLIENT_ERROR no BG data in managed mode");
+ return;
+ }
+ c->bucket = -1;
+ if (buckets[bucket] != c->gen) {
+ out_string(c, "ERROR_NOT_OWNER");
+ return;
+ }
+ }
+
res = sscanf(command, "%*s %250s %ld", key, &exptime);
it = assoc_find(key);
if (!it) {
@@ -701,7 +760,75 @@ void process_command(conn *c, char *command) {
out_string(c, "DELETED");
return;
}
-
+
+ if (strncmp(command, "own ", 4) == 0) {
+ int bucket, gen;
+ char *start = command+4;
+ if (!settings.managed) {
+ out_string(c, "CLIENT_ERROR not a managed instance");
+ return;
+ }
+ if (sscanf(start, "%u:%u\r\n", &bucket,&gen) == 2) {
+ if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
+ out_string(c, "CLIENT_ERROR bucket number out of range");
+ return;
+ }
+ buckets[bucket] = gen;
+ out_string(c, "OWNED");
+ return;
+ } else {
+ out_string(c, "CLIENT_ERROR bad format");
+ return;
+ }
+ }
+
+ if (strncmp(command, "disown ", 7) == 0) {
+ int bucket;
+ char *start = command+7;
+ if (!settings.managed) {
+ out_string(c, "CLIENT_ERROR not a managed instance");
+ return;
+ }
+ if (sscanf(start, "%u\r\n", &bucket) == 1) {
+ if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
+ out_string(c, "CLIENT_ERROR bucket number out of range");
+ return;
+ }
+ buckets[bucket] = 0;
+ out_string(c, "DISOWNED");
+ return;
+ } else {
+ out_string(c, "CLIENT_ERROR bad format");
+ return;
+ }
+ }
+
+ if (strncmp(command, "bg ", 3) == 0) {
+ int bucket, gen;
+ char *start = command+3;
+ if (!settings.managed) {
+ out_string(c, "CLIENT_ERROR not a managed instance");
+ return;
+ }
+ if (sscanf(start, "%u:%u\r\n", &bucket,&gen) == 2) {
+ /* we never write anything back, even if input's wrong */
+ if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen<=0)) {
+ /* do nothing, bad input */
+ } else {
+ c->bucket = bucket;
+ c->gen = gen;
+ }
+ c->state = conn_read;
+ /* normally conn_write uncorks the connection, but this
+ is the only time we accept a command w/o writing anything */
+ set_cork(c,0);
+ return;
+ } else {
+ out_string(c, "CLIENT_ERROR bad format");
+ return;
+ }
+ }
+
if (strncmp(command, "stats", 5) == 0) {
process_stat(c, command);
return;
@@ -765,40 +892,47 @@ void process_command(conn *c, char *command) {
}
/*
- * if we have a complete line in the buffer, process it and move whatever
- * remains in the buffer to its beginning.
+ * if we have a complete line in the buffer, process it.
*/
int try_read_command(conn *c) {
char *el, *cont;
if (!c->rbytes)
return 0;
- el = memchr(c->rbuf, '\n', c->rbytes);
+ el = memchr(c->rcurr, '\n', c->rbytes);
if (!el)
return 0;
cont = el + 1;
- if (el - c->rbuf > 1 && *(el - 1) == '\r') {
+ if (el - c->rcurr > 1 && *(el - 1) == '\r') {
el--;
}
*el = '\0';
- process_command(c, c->rbuf);
+ process_command(c, c->rcurr);
+
+ c->rbytes -= (cont - c->rcurr);
+ c->rcurr = cont;
- if (cont - c->rbuf < c->rbytes) { /* more stuff in the buffer */
- memmove(c->rbuf, cont, c->rbytes - (cont - c->rbuf));
- }
- c->rbytes -= (cont - c->rbuf);
return 1;
}
/*
* read from network as much as we can, handle buffer overflow and connection
* close.
+ * before reading, move the remaining incomplete fragment of a command
+ * (if any) to the beginning of the buffer.
* return 0 if there's nothing to read on the first read.
*/
int try_read_network(conn *c) {
int gotdata = 0;
int res;
+
+ if (c->rcurr != c->rbuf) {
+ if (c->rbytes != 0) /* otherwise there's nothing to copy */
+ memmove(c->rbuf, c->rcurr, c->rbytes);
+ c->rcurr = c->rbuf;
+ }
+
while (1) {
if (c->rbytes >= c->rsize) {
char *new_rbuf = realloc(c->rbuf, c->rsize*2);
@@ -899,7 +1033,7 @@ void drive_machine(conn *c) {
break;
case conn_nread:
- /* we are reading rlbytes into rcurr; */
+ /* we are reading rlbytes into ritem; */
if (c->rlbytes == 0) {
complete_nread(c);
break;
@@ -907,21 +1041,19 @@ void drive_machine(conn *c) {
/* first check if we have leftovers in the conn_read buffer */
if (c->rbytes > 0) {
int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
- memcpy(c->rcurr, c->rbuf, tocopy);
- c->rcurr += tocopy;
+ memcpy(c->ritem, c->rcurr, tocopy);
+ c->ritem += tocopy;
c->rlbytes -= tocopy;
- if (c->rbytes > tocopy) {
- memmove(c->rbuf, c->rbuf+tocopy, c->rbytes - tocopy);
- }
+ c->rcurr += tocopy;
c->rbytes -= tocopy;
break;
}
/* now try reading from the socket */
- res = read(c->sfd, c->rcurr, c->rlbytes);
+ res = read(c->sfd, c->ritem, c->rlbytes);
if (res > 0) {
stats.bytes_read += res;
- c->rcurr += res;
+ c->ritem += res;
c->rlbytes -= res;
break;
}
@@ -956,9 +1088,7 @@ void drive_machine(conn *c) {
if (c->rbytes > 0) {
int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
c->sbytes -= tocopy;
- if (c->rbytes > tocopy) {
- memmove(c->rbuf, c->rbuf+tocopy, c->rbytes - tocopy);
- }
+ c->rcurr += tocopy;
c->rbytes -= tocopy;
break;
}
@@ -1248,6 +1378,7 @@ void usage(void) {
printf("-vv very verbose (also print client commands/reponses)\n");
printf("-h print this help and exit\n");
printf("-i print memcached and libevent license\n");
+ printf("-b run a managed instanced (mnemonic: buckets)\n");
printf("-P <file> save PID in <file>, only used with -d option\n");
return;
}
@@ -1372,8 +1503,11 @@ int main (int argc, char **argv) {
setbuf(stderr, NULL);
/* process arguments */
- while ((c = getopt(argc, argv, "p:m:Mc:khirvdl:u:P:")) != -1) {
+ while ((c = getopt(argc, argv, "bp:m:Mc:khirvdl:u:P:")) != -1) {
switch (c) {
+ case 'b':
+ settings.managed = 1;
+ break;
case 'p':
settings.port = atoi(optarg);
break;
@@ -1521,6 +1655,16 @@ int main (int argc, char **argv) {
conn_init();
slabs_init(settings.maxbytes);
+ /* managed instance? alloc and zero a bucket array */
+ if (settings.managed) {
+ buckets = malloc(sizeof(int)*MAX_BUCKETS);
+ if (buckets == 0) {
+ fprintf(stderr, "failed to allocate the bucket array");
+ exit(1);
+ }
+ memset(buckets, 0, sizeof(int)*MAX_BUCKETS);
+ }
+
/* lock paged memory if needed */
if (lock_memory) {
#ifdef HAVE_MLOCKALL
View
18 memcached.h
@@ -29,6 +29,7 @@ struct settings {
int port;
struct in_addr interface;
int verbose;
+ int managed; /* if 1, a tracker manages virtual buckets */
time_t oldest_live; /* ignore existing items older than this */
int evict_to_free;
};
@@ -83,11 +84,12 @@ typedef struct {
int state;
struct event event;
short ev_flags;
- short which; /* which events were just triggered */
+ short which; /* which events were just triggered */
- char *rbuf;
- int rsize;
- int rbytes;
+ char *rbuf; /* buffer to read commands into */
+ char *rcurr; /* but if we parsed some already, this is where we stopped */
+ int rsize; /* total allocated size of rbuf */
+ int rbytes; /* how much data, starting from rcurr, do we have unparsed */
char *wbuf;
char *wcurr;
@@ -97,7 +99,7 @@ typedef struct {
void *write_and_free; /* free this memory after finishing writing */
char is_corked; /* boolean, connection is corked */
- char *rcurr;
+ char *ritem; /* when we read in an item's value, it goes here */
int rlbytes;
/* data for the nread state */
@@ -123,9 +125,15 @@ typedef struct {
char ibuf[300]; /* for VALUE lines */
char *iptr;
int ibytes;
+ int bucket; /* bucket number for the next command, if running as
+ a managed instance. -1 (_not_ 0) means invalid. */
+ int gen; /* generation requested for the bucket */
} conn;
+/* number of virtual buckets for a managed instance */
+#define MAX_BUCKETS 32768
+
/* listening socket */
extern int l_socket;
Please sign in to comment.
Something went wrong with that request. Please try again.