Skip to content

Commit

Permalink
Experimental: new keyspace and expire algorithm.
Browse files Browse the repository at this point in the history
This is an alpha quality implementation of a new keyspace representation
and a new expire algorithm for Redis.

This work is described here:

    https://gist.github.com/antirez/b2eb293819666ee104c7fcad71986eb7
  • Loading branch information
antirez committed Jun 18, 2019
1 parent fd0ee46 commit e9bb30f
Show file tree
Hide file tree
Showing 34 changed files with 998 additions and 758 deletions.
69 changes: 36 additions & 33 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ int rioWriteBulkObject(rio *r, robj *obj) {

/* Emit the commands needed to rebuild a list object.
* The function returns 0 on error, 1 on success. */
int rewriteListObject(rio *r, robj *key, robj *o) {
int rewriteListObject(rio *r, rkey *key, robj *o) {
long long count = 0, items = listTypeLength(o);

if (o->encoding == OBJ_ENCODING_QUICKLIST) {
Expand All @@ -937,7 +937,7 @@ int rewriteListObject(rio *r, robj *key, robj *o) {
AOF_REWRITE_ITEMS_PER_CMD : items;
if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}

if (entry.value) {
Expand All @@ -957,7 +957,7 @@ int rewriteListObject(rio *r, robj *key, robj *o) {

/* Emit the commands needed to rebuild a set object.
* The function returns 0 on error, 1 on success. */
int rewriteSetObject(rio *r, robj *key, robj *o) {
int rewriteSetObject(rio *r, rkey *key, robj *o) {
long long count = 0, items = setTypeSize(o);

if (o->encoding == OBJ_ENCODING_INTSET) {
Expand All @@ -971,7 +971,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {

if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteBulkLongLong(r,llval) == 0) return 0;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
Expand All @@ -989,7 +989,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {

if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
Expand All @@ -1004,7 +1004,7 @@ int rewriteSetObject(rio *r, robj *key, robj *o) {

/* Emit the commands needed to rebuild a sorted set object.
* The function returns 0 on error, 1 on success. */
int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
int rewriteSortedSetObject(rio *r, rkey *key, robj *o) {
long long count = 0, items = zsetLength(o);

if (o->encoding == OBJ_ENCODING_ZIPLIST) {
Expand All @@ -1030,7 +1030,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {

if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteBulkDouble(r,score) == 0) return 0;
if (vstr != NULL) {
Expand All @@ -1057,7 +1057,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {

if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}
if (rioWriteBulkDouble(r,*score) == 0) return 0;
if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
Expand Down Expand Up @@ -1099,7 +1099,7 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {

/* Emit the commands needed to rebuild a hash object.
* The function returns 0 on error, 1 on success. */
int rewriteHashObject(rio *r, robj *key, robj *o) {
int rewriteHashObject(rio *r, rkey *key, robj *o) {
hashTypeIterator *hi;
long long count = 0, items = hashTypeLength(o);

Expand All @@ -1111,7 +1111,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {

if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
}

if (rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) == 0) return 0;
Expand Down Expand Up @@ -1140,14 +1140,14 @@ int rioWriteBulkStreamID(rio *r,streamID *id) {
* add the message described by 'nack' having the id 'rawid', into the pending
* list of the specified consumer. All this in the context of the specified
* key and group. */
int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) {
int rioWriteStreamPendingEntry(rio *r, rkey *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) {
/* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
RETRYCOUNT <count> JUSTID FORCE. */
streamID id;
streamDecodeID(rawid,&id);
if (rioWriteBulkCount(r,'*',12) == 0) return 0;
if (rioWriteBulkString(r,"XCLAIM",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0;
if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0;
if (rioWriteBulkString(r,"0",1) == 0) return 0;
Expand All @@ -1163,7 +1163,7 @@ int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t

/* Emit the commands needed to rebuild a stream object.
* The function returns 0 on error, 1 on success. */
int rewriteStreamObject(rio *r, robj *key, robj *o) {
int rewriteStreamObject(rio *r, rkey *key, robj *o) {
stream *s = o->ptr;
streamIterator si;
streamIteratorStart(&si,s,NULL,NULL,0);
Expand All @@ -1179,7 +1179,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
/* Emit the XADD <key> <id> ...fields... command. */
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
while(numfields--) {
unsigned char *field, *value;
Expand All @@ -1195,7 +1195,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
* for the Stream type. */
if (rioWriteBulkCount(r,'*',7) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0;
if (rioWriteBulkString(r,"0",1) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
Expand All @@ -1207,7 +1207,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
* in case of XDEL lastid. */
if (rioWriteBulkCount(r,'*',3) == 0) return 0;
if (rioWriteBulkString(r,"XSETID",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;


Expand All @@ -1222,7 +1222,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
if (rioWriteBulkCount(r,'*',5) == 0) return 0;
if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0;
if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,key->name,key->len) == 0) return 0;
if (rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0) return 0;
if (rioWriteBulkStreamID(r,&group->last_id) == 0) return 0;

Expand Down Expand Up @@ -1262,12 +1262,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
/* Call the module type callback in order to rewrite a data type
* that is exported by a module and is not handled by Redis itself.
* The function returns 0 on error, 1 on success. */
int rewriteModuleObject(rio *r, robj *key, robj *o) {
int rewriteModuleObject(rio *r, rkey *key, robj *o) {
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
moduleInitIOContext(io,mt,r,key);
mt->aof_rewrite(&io,key,mv->value);
robj *keyname = createStringObject(key->name,key->len);
moduleInitIOContext(io,mt,r,keyname);
mt->aof_rewrite(&io,keyname,mv->value);
decrRefCount(keyname);
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
Expand Down Expand Up @@ -1309,44 +1311,45 @@ int rewriteAppendOnlyFileRio(rio *aof) {

/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr;
robj key, *o;
rkey *key;
robj *o;
long long expiretime;

keystr = dictGetKey(de);
key = dictGetKey(de);
o = dictGetVal(de);
initStaticStringObject(key,keystr);

expiretime = getExpire(db,&key);
expiretime = getExpire(key);

/* Save the key and associated value */
if (o->type == OBJ_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkString(aof,key->name,key->len) == 0)
goto werr;
if (rioWriteBulkObject(aof,o) == 0) goto werr;
} else if (o->type == OBJ_LIST) {
if (rewriteListObject(aof,&key,o) == 0) goto werr;
if (rewriteListObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_SET) {
if (rewriteSetObject(aof,&key,o) == 0) goto werr;
if (rewriteSetObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_ZSET) {
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
if (rewriteSortedSetObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_HASH) {
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
if (rewriteHashObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_STREAM) {
if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
if (rewriteStreamObject(aof,key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
if (rewriteModuleObject(aof,key,o) == 0) goto werr;
} else {
serverPanic("Unknown object type");
}
/* Save the expire time */
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkString(aof,key->name,key->len) == 0)
goto werr;
if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
}
/* Read some diff from the parent process from time to time. */
Expand Down
2 changes: 1 addition & 1 deletion src/bio.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ struct bio_job {

void *bioProcessBackgroundJobs(void *arg);
void lazyfreeFreeObjectFromBioThread(robj *o);
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2);
void lazyfreeFreeDatabaseFromBioThread(dict *ht, rax *tree);
void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl);

/* Make sure we have enough stack to perform all the things we do in the
Expand Down
12 changes: 6 additions & 6 deletions src/bitops.c
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ int getBitfieldTypeFromArgument(client *c, robj *o, int *sign, int *bits) {
* an error is sent to the client. */
robj *lookupStringForBitCommand(client *c, size_t maxbit) {
size_t byte = maxbit >> 3;
robj *o = lookupKeyWrite(c->db,c->argv[1]);
robj *o = lookupKeyWrite(c->db,c->argv[1],NULL);

if (o == NULL) {
o = createObject(OBJ_STRING,sdsnewlen(NULL, byte+1));
Expand Down Expand Up @@ -571,7 +571,7 @@ void getbitCommand(client *c) {
if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)
return;

if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL ||
checkType(c,o,OBJ_STRING)) return;

byte = bitoffset >> 3;
Expand Down Expand Up @@ -625,7 +625,7 @@ void bitopCommand(client *c) {
len = zmalloc(sizeof(long) * numkeys);
objects = zmalloc(sizeof(robj*) * numkeys);
for (j = 0; j < numkeys; j++) {
o = lookupKeyRead(c->db,c->argv[j+3]);
o = lookupKeyRead(c->db,c->argv[j+3],NULL);
/* Handle non-existing keys as empty strings. */
if (o == NULL) {
objects[j] = NULL;
Expand Down Expand Up @@ -773,7 +773,7 @@ void bitcountCommand(client *c) {
char llbuf[LONG_STR_SIZE];

/* Lookup, check for type, and return 0 for non existing keys. */
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
if ((o = lookupKeyReadOrReply(c,c->argv[1],NULL,shared.czero)) == NULL ||
checkType(c,o,OBJ_STRING)) return;
p = getObjectReadOnlyString(o,&strlen,llbuf);

Expand Down Expand Up @@ -834,7 +834,7 @@ void bitposCommand(client *c) {
/* If the key does not exist, from our point of view it is an infinite
* array of 0 bits. If the user is looking for the first clear bit return 0,
* If the user is looking for the first set bit, return -1. */
if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
if ((o = lookupKeyRead(c->db,c->argv[1],NULL)) == NULL) {
addReplyLongLong(c, bit ? -1 : 0);
return;
}
Expand Down Expand Up @@ -993,7 +993,7 @@ void bitfieldCommand(client *c) {
if (readonly) {
/* Lookup for read is ok if key doesn't exist, but errors
* if it's not a string. */
o = lookupKeyRead(c->db,c->argv[1]);
o = lookupKeyRead(c->db,c->argv[1],NULL);
if (o != NULL && checkType(c,o,OBJ_STRING)) {
zfree(ops);
return;
Expand Down
2 changes: 1 addition & 1 deletion src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ void handleClientsBlockedOnKeys(void) {
dictDelete(rl->db->ready_keys,rl->key);

/* Serve clients blocked on list key. */
robj *o = lookupKeyWrite(rl->db,rl->key);
robj *o = lookupKeyWrite(rl->db,rl->key,NULL);
if (o != NULL && o->type == OBJ_LIST) {
dictEntry *de;

Expand Down

0 comments on commit e9bb30f

Please sign in to comment.