Skip to content

Commit

Permalink
Nasty bug of the new DB format fixed, objects sharing implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
antirez committed Mar 25, 2009
1 parent d9f650b commit 10c4361
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 31 deletions.
1 change: 1 addition & 0 deletions TODO
Expand Up @@ -8,5 +8,6 @@
- check 'server.dirty' everywere
- replication automated tests
- a command, or an external tool, to perform the MD5SUM of the whole dataset, so that if the dataset between two servers is identical, so will be the MD5SUM
- objects sharing, "objectsharing yes", "objectsharingpool 1024"

* Include Lua and Perl bindings
119 changes: 88 additions & 31 deletions redis.c
Expand Up @@ -95,10 +95,10 @@
* 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
* 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte
* 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
* 11|000000 [64 bit integer] => if it's 11, a full 64 bit len will follow
* 11|000000 reserved for future uses
*
* 64 bit lengths are not used currently. Lenghts up to 63 are stored using
* a single byte, most DB keys, and may values, will fit inside. */
* Lenghts up to 63 are stored using a single byte, most DB keys, and may
* values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
Expand Down Expand Up @@ -173,6 +173,8 @@ struct redisServer {
int port;
int fd;
dict **dict;
dict *sharingpool;
unsigned int sharingpoolsize;
long long dirty; /* changes to DB from the last save */
list *clients;
list *slaves, *monitors;
Expand All @@ -199,6 +201,7 @@ struct redisServer {
char *logfile;
char *bindaddr;
char *dbfilename;
int shareobjects;
/* Replication related */
int isslave;
char *masterhost;
Expand Down Expand Up @@ -258,6 +261,7 @@ static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);

static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
Expand Down Expand Up @@ -627,7 +631,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %d bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
server.usedmemory);
server.usedmemory,
dictGetHashTableUsed(server.sharingpool));
}

/* Close connections of timedout clients */
Expand Down Expand Up @@ -739,6 +744,7 @@ static void initServerConfig() {
server.daemonize = 0;
server.pidfile = "/var/run/redis.pid";
server.dbfilename = "dump.rdb";
server.shareobjects = 0;
ResetServerSaveParams();

appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
Expand All @@ -765,18 +771,17 @@ static void initServer() {
createSharedObjects();
server.el = aeCreateEventLoop();
server.dict = zmalloc(sizeof(dict*)*server.dbnum);
server.sharingpool = dictCreate(&setDictType,NULL);
server.sharingpoolsize = 1024;
if (!server.dict || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
oom("server initialization"); /* Fatal OOM */
server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
if (server.fd == -1) {
redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
exit(1);
}
for (j = 0; j < server.dbnum; j++) {
for (j = 0; j < server.dbnum; j++)
server.dict[j] = dictCreate(&hashDictType,NULL);
if (!server.dict[j])
oom("dictCreate"); /* Fatal OOM */
}
server.cronloops = 0;
server.bgsaveinprogress = 0;
server.lastsave = time(NULL);
Expand Down Expand Up @@ -895,6 +900,13 @@ static void loadServerConfig(char *filename) {
else {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else if (!strcmp(argv[0],"shareobjects") && argc == 2) {
sdstolower(argv[1]);
if (!strcmp(argv[1],"yes")) server.shareobjects = 1;
else if (!strcmp(argv[1],"no")) server.shareobjects = 0;
else {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else if (!strcmp(argv[0],"daemonize") && argc == 2) {
sdstolower(argv[1]);
if (!strcmp(argv[1],"yes")) server.daemonize = 1;
Expand Down Expand Up @@ -1102,6 +1114,12 @@ static int processCommand(redisClient *c) {
return 1;
}
}
/* Let's try to share objects on the command arguments vector */
if (server.shareobjects) {
int j;
for(j = 1; j < c->argc; j++)
c->argv[j] = tryObjectSharing(c->argv[j]);
}
/* Exec the command */
dirty = server.dirty;
cmd->proc(c);
Expand Down Expand Up @@ -1412,6 +1430,51 @@ static void decrRefCount(void *obj) {
}
}

/* Try to share an object against the shared objects pool */
static robj *tryObjectSharing(robj *o) {
struct dictEntry *de;
unsigned long c;

if (server.shareobjects == 0) return o;

assert(o->type == REDIS_STRING);
de = dictFind(server.sharingpool,o);
if (de) {
robj *shared = dictGetEntryKey(de);

c = ((unsigned long) dictGetEntryVal(de))+1;
dictGetEntryVal(de) = (void*) c;
incrRefCount(shared);
decrRefCount(o);
return shared;
} else {
/* Here we are using a stream algorihtm: Every time an object is
* shared we increment its count, everytime there is a miss we
* recrement the counter of a random object. If this object reaches
* zero we remove the object and put the current object instead. */
if (dictGetHashTableUsed(server.sharingpool) >=
server.sharingpoolsize) {
de = dictGetRandomKey(server.sharingpool);
assert(de != NULL);
c = ((unsigned long) dictGetEntryVal(de))-1;
dictGetEntryVal(de) = (void*) c;
if (c == 0) {
dictDelete(server.sharingpool,de->key);
}
} else {
c = 0; /* If the pool is empty we want to add this object */
}
if (c == 0) {
int retval;

retval = dictAdd(server.sharingpool,o,(void*)1);
assert(retval == DICT_OK);
incrRefCount(o);
}
return o;
}
}

/*============================ DB saving/loading ============================ */

static int rdbSaveType(FILE *fp, unsigned char type) {
Expand All @@ -1424,23 +1487,31 @@ static int rdbSaveLen(FILE *fp, uint32_t len) {

if (len < (1<<6)) {
/* Save a 6 bit len */
buf[0] = (len&0xFF)|REDIS_RDB_6BITLEN;
buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
if (fwrite(buf,1,1,fp) == 0) return -1;
} else if (len < (1<<14)) {
/* Save a 14 bit len */
buf[0] = ((len>>8)&0xFF)|REDIS_RDB_14BITLEN;
buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
buf[1] = len&0xFF;
if (fwrite(buf,4,1,fp) == 0) return -1;
} else {
/* Save a 32 bit len */
buf[0] = REDIS_RDB_32BITLEN;
buf[0] = (REDIS_RDB_32BITLEN<<6);
if (fwrite(buf,1,1,fp) == 0) return -1;
len = htonl(len);
if (fwrite(&len,4,1,fp) == 0) return -1;
}
return 0;
}

static int rdbSaveStringObject(FILE *fp, robj *obj) {
size_t len = sdslen(obj->ptr);

if (rdbSaveLen(fp,len) == -1) return -1;
if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
return 0;
}

/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
static int rdbSave(char *filename) {
dictIterator *di = NULL;
Expand Down Expand Up @@ -1475,15 +1546,10 @@ static int rdbSave(char *filename) {
robj *o = dictGetEntryVal(de);

if (rdbSaveType(fp,o->type) == -1) goto werr;
if (rdbSaveLen(fp,sdslen(key->ptr)) == -1) goto werr;
if (fwrite(key->ptr,sdslen(key->ptr),1,fp) == 0) goto werr;
if (rdbSaveStringObject(fp,key) == -1) goto werr;
if (o->type == REDIS_STRING) {
/* Save a string value */
sds sval = o->ptr;

if (rdbSaveLen(fp,sdslen(sval)) == -1) goto werr;
if (sdslen(sval) &&
fwrite(sval,sdslen(sval),1,fp) == 0) goto werr;
if (rdbSaveStringObject(fp,o) == -1) goto werr;
} else if (o->type == REDIS_LIST) {
/* Save a list value */
list *list = o->ptr;
Expand All @@ -1493,10 +1559,7 @@ static int rdbSave(char *filename) {
while(ln) {
robj *eleobj = listNodeValue(ln);

if (rdbSaveLen(fp,sdslen(eleobj->ptr)) == -1) goto werr;
if (sdslen(eleobj->ptr) &&
fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0)
goto werr;
if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
ln = ln->next;
}
} else if (o->type == REDIS_SET) {
Expand All @@ -1508,13 +1571,9 @@ static int rdbSave(char *filename) {
if (!set) oom("dictGetIteraotr");
if (rdbSaveLen(fp,dictGetHashTableUsed(set)) == -1) goto werr;
while((de = dictNext(di)) != NULL) {
robj *eleobj;
robj *eleobj = dictGetEntryKey(de);

eleobj = dictGetEntryKey(de);
if (rdbSaveLen(fp,sdslen(eleobj->ptr)) == -1) goto werr;
if (sdslen(eleobj->ptr) &&
fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0)
goto werr;
if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
}
dictReleaseIterator(di);
} else {
Expand Down Expand Up @@ -1600,7 +1659,6 @@ static uint32_t rdbLoadLen(FILE *fp, int rdbver) {
return ntohl(len);
}
}
return 0;
}

static robj *rdbLoadStringObject(FILE*fp,int rdbver) {
Expand All @@ -1613,7 +1671,7 @@ static robj *rdbLoadStringObject(FILE*fp,int rdbver) {
sdsfree(val);
return NULL;
}
return createObject(REDIS_STRING,val);
return tryObjectSharing(createObject(REDIS_STRING,val));
}

static int rdbLoad(char *filename) {
Expand All @@ -1625,7 +1683,6 @@ static int rdbLoad(char *filename) {
dict *d = server.dict[0];
char buf[1024];
int rdbver;

fp = fopen(filename,"r");
if (!fp) return REDIS_ERR;
if (fread(buf,9,1,fp) == 0) goto eoferr;
Expand Down
6 changes: 6 additions & 0 deletions redis.conf
Expand Up @@ -68,3 +68,9 @@ databases 16
# single TCP packet. Uses a bit more CPU but most of the times it is a win
# in terms of number of queries per second. Use 'yes' if unsure.
glueoutputbuf yes

# Use object sharing. Can save a lot of memory if you have many common
# string in your dataset, but performs lookups against the shared objects
# pool so it uses more CPU and can be a bit slower. Usually it's a good
# idea.
shareobjects no

0 comments on commit 10c4361

Please sign in to comment.