Skip to content

Commit

Permalink
Generic interface for executing commands. This can be used for pipeli…
Browse files Browse the repository at this point in the history
…ng and MULTI/EXEC transactions.

Examples can be found in 'tests/test_generic.cpp'.
  • Loading branch information
mrpi committed Nov 14, 2010
1 parent 2e0789e commit fa63f7f
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 5 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ CLIENTOBJS = anet.o
LIBNAME = libredisclient.a

TESTAPP = test_client
TESTAPPOBJS = test_client.o test_lists.o test_sets.o test_zsets.o test_hashes.o test_cluster.o test_distributed_strings.o test_distributed_ints.o test_distributed_mutexes.o benchmark.o functions.o
TESTAPPOBJS = test_client.o test_lists.o test_sets.o test_zsets.o test_hashes.o test_cluster.o test_distributed_strings.o test_distributed_ints.o test_distributed_mutexes.o test_generic.o benchmark.o functions.o
TESTAPPLIBS = $(LIBNAME) -lstdc++ -lpthread -lboost_thread-mt

all: $(LIBNAME) $(TESTAPP)
Expand Down Expand Up @@ -51,4 +51,5 @@ test_cluster.o: redisclient.h tests/test_cluster.cpp tests/functions
test_distributed_strings.o: redisclient.h tests/test_distributed_strings.cpp tests/functions.h
test_distributed_ints.o: redisclient.h tests/test_distributed_ints.cpp tests/functions.h
test_distributed_mutexes.o: redisclient.h tests/test_distributed_mutexes.cpp tests/functions.h
test_generic.o: redisclient.h tests/test_generic.cpp
benchmark.o: redisclient.h tests/benchmark.cpp tests/functions.h
236 changes: 233 additions & 3 deletions redisclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,22 @@
#include <boost/thread/condition_variable.hpp>
#include <boost/random.hpp>
#include <boost/cstdint.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/optional.hpp>
#include <boost/variant.hpp>

#include "anet.h"

#define REDIS_LBR "\r\n"
#define REDIS_STATUS_REPLY_OK "OK"
#define REDIS_PREFIX_STATUS_REPLY_ERROR "-ERR "
#define REDIS_PREFIX_STATUS_REPLY_ERR_C '-'
#define REDIS_PREFIX_STATUS_REPLY_VALUE '+'
#define REDIS_PREFIX_SINGLE_BULK_REPLY '$'
#define REDIS_PREFIX_MULTI_BULK_REPLY '*'
#define REDIS_PREFIX_INT_REPLY ':'
#define REDIS_WHITESPACE " \f\n\r\t\v"
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/optional.hpp>

template<class Object >
struct make;
Expand All @@ -72,6 +74,16 @@ namespace redis
template<typename CONSISTENT_HASHER>
class base_client;

enum reply_t
{
no_reply,
status_code_reply,
error_reply,
int_reply,
bulk_reply,
multi_bulk_reply
};

struct connection_data
{
connection_data(const std::string & host = "localhost", uint16_t port = 6379, int dbindex = 0)
Expand Down Expand Up @@ -165,6 +177,16 @@ namespace redis
public:
value_error(const std::string & err) : redis_error(err) {};
};

struct key
{
explicit key(const std::string & name)
: name(name)
{
}

std::string name;
};

class makecmd
{
Expand All @@ -175,6 +197,23 @@ namespace redis
//if (!finalize)
// buffer_ << " ";
}

const std::string & key_name() const
{
if(!key_name_)
throw std::runtime_error("No key defined!");
return *key_name_;
}

inline makecmd & operator<<(const key & datum)
{
if(key_name_)
throw std::runtime_error("You could not add a second key");
else
key_name_ = datum.name;
append(datum.name);
return *this;
}

inline makecmd & operator<<(const std::string & datum)
{
Expand Down Expand Up @@ -231,8 +270,78 @@ namespace redis
}

std::vector<std::string> lines_;
boost::optional<std::string> key_name_;
};

template<typename CONSISTENT_HASHER>
class base_client;

typedef boost::variant< std::string, int, std::vector<std::string> > reply_val_t;
typedef std::pair<reply_t, reply_val_t> reply_data_t;

class command
{
private:
std::string request_;
std::string hash_key_;
reply_data_t reply_;

void check_reply_t(reply_t reply_type) const
{
if( reply_.first != reply_type )
throw std::runtime_error("invalid reply type");
}

void set_reply(const reply_data_t & reply)
{
reply_ = reply;
}

template<typename CONSISTENT_HASHER>
friend class base_client;

public:
command( const makecmd & cmd_input )
: request_(cmd_input), hash_key_(cmd_input.key_name())
{
reply_.first = no_reply;
}

reply_t reply_type() const
{
return reply_.first;
}

const std::string & get_status_code_reply() const
{
check_reply_t(status_code_reply);
return boost::get<std::string>(reply_.second);
}

const std::string & get_error_reply() const
{
check_reply_t(error_reply);
return boost::get<std::string>(reply_.second);
}

int get_int_reply() const
{
check_reply_t(int_reply);
return boost::get<int>(reply_.second);
}

const std::string & get_bulk_reply() const
{
check_reply_t(bulk_reply);
return boost::get<std::string>(reply_.second);
}

const std::vector<std::string> & get_multi_bulk_reply() const
{
check_reply_t(multi_bulk_reply);
return boost::get< std::vector<std::string> >(reply_.second);
}
};

struct server_info
{
Expand Down Expand Up @@ -530,6 +639,71 @@ namespace redis
};

public:
void exec(command & cmd)
{
int socket = get_socket(cmd.hash_key_);
send_( socket, cmd.request_ );
cmd.set_reply( recv_generic_reply_(socket) );
}

void exec(std::vector<command> & commands)
{
std::map< int, std::string > socket_commands;

for(size_t i=0; i < commands.size(); i++)
{
int socket = get_socket( commands[i].hash_key_ );
socket_commands[socket] += commands[i].request_;
}

typedef std::pair< int, std::string > sock_pair;
BOOST_FOREACH(const sock_pair & sp, socket_commands)
{
send_(sp.first, sp.second);
}

for(size_t i=0; i < commands.size(); i++)
{
int socket = get_socket( commands[i].hash_key_ );
commands[i].set_reply( recv_generic_reply_(socket) );
}
}

void exec_transaction(std::vector<command> & commands)
{
int cmd_socket = -1;
std::string cmd_str = makecmd("MULTI");

for(size_t i=0; i < commands.size(); i++)
{
int socket = get_socket( commands[i].hash_key_ );
if( cmd_socket == -1 )
cmd_socket = socket;
else if( cmd_socket != socket )
throw std::runtime_error("calls in transaction map to different server!");

cmd_str += commands[i].request_;
}

cmd_str += makecmd("EXEC");

send_(cmd_socket, cmd_str);
recv_ok_reply_(cmd_socket); // MULTI => +OK
for(size_t i=0; i < commands.size(); i++)
{
std::string resp = recv_single_line_reply_(cmd_socket);
if( resp != "QUEUED" )
throw std::runtime_error("invalid state (expected 'QUEUED' in transaction but got '" + resp + "')");
}
if( read_line(cmd_socket)[0] != REDIS_PREFIX_MULTI_BULK_REPLY )
throw std::runtime_error("EXEC does not return a multi bulk reply");

for(size_t i=0; i < commands.size(); i++)
{
commands[i].set_reply( recv_generic_reply_(cmd_socket) );
}
}

void mget(const string_vector & keys, string_vector & out)
{
out = string_vector( keys.size() );
Expand Down Expand Up @@ -2264,6 +2438,62 @@ namespace redis
delete [] buffer;
return str;
}

reply_t next_reply_type(int socket)
{
char reply_prefix[1];
ssize_t bytes_received = recv(socket, reply_prefix, 1, MSG_PEEK);

if (bytes_received == 0)
throw connection_error("connection was closed");

switch( reply_prefix[0] )
{
case REDIS_PREFIX_STATUS_REPLY_VALUE:
return status_code_reply;
case REDIS_PREFIX_STATUS_REPLY_ERR_C:
return error_reply;
case REDIS_PREFIX_INT_REPLY:
return int_reply;
case REDIS_PREFIX_SINGLE_BULK_REPLY:
return bulk_reply;
case REDIS_PREFIX_MULTI_BULK_REPLY:
return multi_bulk_reply;
}

throw std::runtime_error("invalid/unknown rely type from redis server");
}

reply_data_t recv_generic_reply_(int socket)
{
reply_data_t res;
res.first = next_reply_type(socket);
switch( res.first )
{
case status_code_reply:
res.second = read_line(socket).substr(1);
break;
case error_reply:
res.second = read_line(socket).substr(strlen(REDIS_PREFIX_STATUS_REPLY_ERROR));
break;
case int_reply:
res.second = recv_int_reply_(socket);
break;
case bulk_reply:
res.second = recv_bulk_reply_(socket);
break;
case multi_bulk_reply:
{
string_vector v;
recv_multi_bulk_reply_( socket, v );
res.second = v;
break;
}
case no_reply:
assert(false);
}
return res;
}

// Reads a single line of character data from the given blocking socket.
// Returns the line that was read, not including EOL delimiter(s). Both LF
Expand Down
5 changes: 4 additions & 1 deletion test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ void test_lists(redis::client & c);
void test_sets(redis::client & c);
void test_zsets(redis::client & c);
void test_hashes(redis::client & c);
void test_generic(redis::client & c);

// High level API
void test_distributed_strings(redis::client & c);
Expand Down Expand Up @@ -322,7 +323,9 @@ int main()
test_distributed_ints(c);
//test_distributed_mutexes(c);

benchmark(c, 10000);
//benchmark(c, 10000);

test_generic(c);

test("save");
{
Expand Down

0 comments on commit fa63f7f

Please sign in to comment.