Skip to content

Commit

Permalink
Merge branch 'custom-db'
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolasff committed Jun 23, 2012
2 parents 66e4e19 + 7ad2c87 commit c26e777
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 15 deletions.
3 changes: 1 addition & 2 deletions README.markdown
Expand Up @@ -42,6 +42,7 @@ curl -d "GET/hello" http://127.0.0.1:7379/
* Optional daemonize.
* Default root object: Add `"default_root": "/GET/index.html"` in webdis.json to substitute the request to `/` with a Redis request.
* HTTP request limit with `http_max_request_size` (in bytes, set to 128MB by default).
* Database selection in the URL, using e.g. `/7/GET/key` to run the command on DB 7.

# Ideas, TODO...
* Add better support for PUT, DELETE, HEAD, OPTIONS? How? For which commands?
Expand All @@ -51,8 +52,6 @@ curl -d "GET/hello" http://127.0.0.1:7379/
* Enrich config file:
* Provide timeout (maybe for some commands only?). What should the response be? 504 Gateway Timeout? 503 Service Unavailable?
* Multi-server support, using consistent hashing.
* Database selection in the URL? e.g. `/7/GET/key` to run the command on DB 7.
* This might not be very useful, databases will be deprecated from Redis at some point.
* SSL?
* Not sure if this is such a good idea.
* SPDY?
Expand Down
40 changes: 35 additions & 5 deletions cmd.c
Expand Up @@ -136,7 +136,7 @@ cmd_run(struct worker *w, struct http_client *client,

char *qmark = memchr(uri, '?', uri_len);
char *slash;
const char *p;
const char *p, *cmd_name = uri;
int cmd_len;
int param_count = 0, cur_param = 1;

Expand All @@ -160,6 +160,7 @@ cmd_run(struct worker *w, struct http_client *client,

cmd = cmd_new(param_count);
cmd->fd = client->fd;
cmd->database = w->s->cfg->database;

/* get output formatting function */
uri_len = cmd_select_format(client, cmd, uri, uri_len, &f_format);
Expand All @@ -170,14 +171,40 @@ cmd_run(struct worker *w, struct http_client *client,
/* check if we only have one command or more. */
slash = memchr(uri, '/', uri_len);
if(slash) {
cmd_len = slash - uri;

/* detect DB number by checking if first arg is only numbers */
int has_db = 1;
int db_num = 0;
for(p = uri; p < slash; ++p) {
if(*p < '0' || *p > '9') {
has_db = 0;
break;
}
db_num = db_num * 10 + (*p - '0');
}

/* shift to next arg if a db was set up */
if(has_db) {
char *next;
cmd->database = db_num;
cmd->count--; /* overcounted earlier */
cmd_name = slash + 1;

if((next = memchr(cmd_name, '/', uri_len - (slash - uri)))) {
cmd_len = next - cmd_name;
} else {
cmd_len = uri_len - (slash - uri + 1);
}
} else {
cmd_len = slash - uri;
}
} else {
cmd_len = uri_len;
}

/* there is always a first parameter, it's the command name */
cmd->argv[0] = malloc(cmd_len);
memcpy(cmd->argv[0], uri, cmd_len);
memcpy(cmd->argv[0], cmd_name, cmd_len);
cmd->argv_len[0] = cmd_len;

/* check that the client is able to run this command */
Expand All @@ -188,11 +215,14 @@ cmd_run(struct worker *w, struct http_client *client,

if(cmd_is_subscribe(cmd)) {
/* create a new connection to Redis */
cmd->ac = (redisAsyncContext*)pool_connect(w->pool, 0);
cmd->ac = (redisAsyncContext*)pool_connect(w->pool, cmd->database, 0);

/* register with the client, used upon disconnection */
client->pub_sub = cmd;
cmd->pub_sub_client = client;
} else if(cmd->database != w->s->cfg->database) {
/* create a new connection to Redis for custom DBs */
cmd->ac = (redisAsyncContext*)pool_connect(w->pool, cmd->database, 0);
} else {
/* get a connection from the pool */
cmd->ac = (redisAsyncContext*)pool_get_context(w->pool);
Expand All @@ -208,7 +238,7 @@ cmd_run(struct worker *w, struct http_client *client,
(const char **)cmd->argv, cmd->argv_len);
return CMD_SENT;
}
p = slash + 1;
p = cmd_name + cmd_len + 1;
while(p < uri + uri_len) {

const char *arg = p;
Expand Down
1 change: 1 addition & 0 deletions cmd.h
Expand Up @@ -41,6 +41,7 @@ struct cmd {
int started_responding;
int is_websocket;
int http_version;
int database;

struct http_client *pub_sub_client;
redisAsyncContext *ac;
Expand Down
10 changes: 5 additions & 5 deletions pool.c
Expand Up @@ -57,7 +57,7 @@ pool_can_connect(int fd, short event, void *ptr) {

free(pr);

pool_connect(p, 1);
pool_connect(p, p->cfg->database, 1);
}
static void
pool_schedule_reconnect(struct pool *p) {
Expand Down Expand Up @@ -104,7 +104,7 @@ pool_on_disconnect(const redisAsyncContext *ac, int status) {
* Create new connection.
*/
redisAsyncContext *
pool_connect(struct pool *p, int attach) {
pool_connect(struct pool *p, int db_num, int attach) {

struct redisAsyncContext *ac;
if(p->cfg->redis_host[0] == '/') { /* unix socket */
Expand Down Expand Up @@ -134,11 +134,11 @@ pool_connect(struct pool *p, int attach) {
redisAsyncSetConnectCallback(ac, pool_on_connect);
redisAsyncSetDisconnectCallback(ac, pool_on_disconnect);

if (p->cfg->redis_auth) { /* authenticate. */
if(p->cfg->redis_auth) { /* authenticate. */
redisAsyncCommand(ac, NULL, NULL, "AUTH %s", p->cfg->redis_auth);
}
if (p->cfg->database) { /* change database. */
redisAsyncCommand(ac, NULL, NULL, "SELECT %d", p->cfg->database);
if(db_num) { /* change database. */
redisAsyncCommand(ac, NULL, NULL, "SELECT %d", db_num);
}
return ac;
}
Expand Down
2 changes: 1 addition & 1 deletion pool.h
Expand Up @@ -22,7 +22,7 @@ struct pool *
pool_new(struct worker *w, int count);

redisAsyncContext *
pool_connect(struct pool *p, int attach);
pool_connect(struct pool *p, int db_num, int attach);

const redisAsyncContext *
pool_get_context(struct pool *p);
Expand Down
22 changes: 22 additions & 0 deletions tests/basic.py
Expand Up @@ -270,5 +270,27 @@ def test_etag_fail(self):
f = self.query('GET/hello.txt', None, {'If-None-Match': '"'+ h +'"'})
self.assertTrue(f.read() == 'world')

class TestDbSwitch(TestWebdis):
def test_db(self):
"Test database change"
self.query('0/SET/key/val0')
self.query('1/SET/key/val1')
f = self.query('0/GET/key.txt')
self.assertTrue(f.read() == "val0")
f = self.query('1/GET/key.txt')
self.assertTrue(f.read() == "val1")
f = self.query('GET/key.txt')
self.assertTrue(f.read() == "val0")

def test_max_db(self):
"test large number for the DB"

try:
f = self.query('4096/GET/hello.txt')
except urllib2.HTTPError as e:
self.assertTrue(e.code == 400)
return
self.assertTrue(False) # we should have received a 400.

if __name__ == '__main__':
unittest.main()
2 changes: 1 addition & 1 deletion websocket.c
Expand Up @@ -200,7 +200,7 @@ ws_execute(struct http_client *c, const char *frame, size_t frame_len) {
} else if (cmd_is_subscribe(cmd)) {
/* New subscribe command; make new Redis context
* for this client */
cmd->ac = pool_connect(c->w->pool, 0);
cmd->ac = pool_connect(c->w->pool, cmd->database, 0);
c->pub_sub = cmd;
cmd->pub_sub_client = c;
} else {
Expand Down
2 changes: 1 addition & 1 deletion worker.c
Expand Up @@ -130,7 +130,7 @@ worker_pool_connect(struct worker *w) {
int i;
/* create connections */
for(i = 0; i < w->pool->count; ++i) {
pool_connect(w->pool, 1);
pool_connect(w->pool, w->s->cfg->database, 1);
}

}
Expand Down

0 comments on commit c26e777

Please sign in to comment.