Skip to content

Commit

Permalink
Merge pull request #199 from jupenur/master
Browse files Browse the repository at this point in the history
Redis connection pooling
  • Loading branch information
s-ludwig committed Mar 30, 2013
2 parents 0bf7ccb + f08c1a4 commit 808b372
Showing 1 changed file with 65 additions and 30 deletions.
95 changes: 65 additions & 30 deletions source/vibe/db/redis/redis.d
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module vibe.db.redis.redis;

public import vibe.core.net;

import vibe.core.connectionpool;
import vibe.core.log;
import vibe.stream.operations;
import std.string;
Expand Down Expand Up @@ -81,7 +82,7 @@ final class RedisReply {
}
}

final class RedisClient {
private final class RedisConnection : EventedObject {

private {
string m_host;
Expand All @@ -96,18 +97,69 @@ final class RedisClient {
m_port = port;
}

private {
ubyte[][] argsToUbyte(ARGS...)(ARGS args) {
static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value");
foreach( i, T; ARGS ){
static assert(i % 2 != 0 || is(T == string), "Keys must be strings.");
static assert(i % 2 != 1 || isArray!T, "Values must be arrays.");
}
ubyte[][] ret;
foreach( i, arg; args) ret ~= cast(ubyte[])arg;
return ret;
override void acquire() {
if (m_conn && m_conn.connected)
m_conn.acquire();
}

override void release() {
if (m_conn && m_conn.connected)
m_conn.release();
}

override bool isOwner() {
return m_conn ? m_conn.isOwner() : true;
}

T request(T=RedisReply)(string command, in ubyte[][] args...) {
if( !m_conn || !m_conn.connected ){
m_conn = connectTcp(m_host, m_port);
}
m_conn.write(format("*%d\r\n$%d\r\n%s\r\n", args.length + 1, command.length, command));
foreach( arg; args ) {
m_conn.write(format("$%d\r\n", arg.length));
m_conn.write(arg);
m_conn.write("\r\n");
}
auto reply = new RedisReply(m_conn);
static if( is(T == bool) ) {
return reply.next!(ubyte[])()[0] == '1';
} else static if ( is(T == int) || is(T == long) || is(T == size_t) || is(T == double) ) {
auto str = reply.next!string();
return parse!T(str);
} else static if ( is(T == string) ) {
return cast(string)reply.next!T();
} else return reply;
}
}

/** A redis client with connection pooling. */
final class RedisClient {

private ConnectionPool!RedisConnection m_connections;

this() { }

/** Initializes the connection pool. */
void connect(string host = "127.0.0.1", ushort port = 6379) {
m_connections = new ConnectionPool!RedisConnection({
auto connection = new RedisConnection;
connection.connect(host, port);
return connection;
});
}

private static ubyte[][] argsToUbyte(ARGS...)(ARGS args) {
static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value");
foreach( i, T; ARGS ){
static assert(i % 2 != 0 || is(T == string), "Keys must be strings.");
static assert(i % 2 != 1 || isArray!T, "Values must be arrays.");
}
ubyte[][] ret;
foreach( i, arg; args) ret ~= cast(ubyte[])arg;
return ret;
}

size_t del(string[] keys...) {
return request!size_t("DEL", cast(ubyte[][])keys);
}
Expand Down Expand Up @@ -577,23 +629,6 @@ final class RedisClient {
//TODO sync

T request(T=RedisReply)(string command, in ubyte[][] args...) {
if( !m_conn /*|| !m_conn.connected*/ ){
m_conn = connectTcp(m_host, m_port);
}
m_conn.write(format("*%d\r\n$%d\r\n%s\r\n", args.length + 1, command.length, command));
foreach( arg; args ) {
m_conn.write(format("$%d\r\n", arg.length));
m_conn.write(arg);
m_conn.write("\r\n");
}
auto reply = new RedisReply(m_conn);
static if( is(T == bool) ) {
return reply.next!(ubyte[])()[0] == '1';
} else static if ( is(T == int) || is(T == long) || is(T == size_t) || is(T == double) ) {
auto str = reply.next!string();
return parse!T(str);
} else static if ( is(T == string) ) {
return cast(string)reply.next!T();
} else return reply;
return m_connections.lockConnection().request!T(command, args);
}
}
}

0 comments on commit 808b372

Please sign in to comment.