diff --git a/.cproject b/.cproject
index 6788210..0a61ec6 100644
--- a/.cproject
+++ b/.cproject
@@ -30,15 +30,15 @@
-
@@ -427,15 +427,15 @@
-
@@ -825,15 +825,15 @@
-
@@ -1226,15 +1226,15 @@
-
diff --git a/Makefile b/Makefile
index 010b6c1..a901d99 100644
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ REDIS_HOME=$(CURDIR)
libredis: src/alloc.o src/batch.o src/connection.o src/ketama.o src/md5.o src/module.o src/parser.o src/buffer.o
mkdir -p lib
- gcc -shared -o"lib/libredis.so" ./src/alloc.o ./src/batch.o ./src/buffer.o ./src/connection.o ./src/ketama.o ./src/md5.o ./src/module.o ./src/parser.o -levent -lm -lrt
+ gcc -shared -o"lib/libredis.so" ./src/alloc.o ./src/batch.o ./src/buffer.o ./src/connection.o ./src/ketama.o ./src/md5.o ./src/module.o ./src/parser.o -lm -lrt
php_ext:
rm -rf /tmp/libredis_php
diff --git a/redis.py b/redis.py
index 308e334..1dfb6a1 100644
--- a/redis.py
+++ b/redis.py
@@ -5,29 +5,36 @@
import sys
if '--debug' in sys.argv:
libredis = cdll.LoadLibrary("Debug/libredis.so")
-else:
+elif '--release' in sys.argv:
libredis = cdll.LoadLibrary("Release/libredis.so")
+else:
+ libredis = cdll.LoadLibrary("lib/libredis.so")
-libredis.Module_init()
+libredis.Module_init(0)
atexit.register(libredis.Module_free)
+DEFAULT_TIMEOUT_MS = 3000
+
class Connection(object):
def __init__(self, addr):
self._connection = libredis.Connection_new(addr)
- def get(self, key):
+ def get(self, key, timeout_ms = DEFAULT_TIMEOUT_MS):
batch = Batch()
- batch.writef("GET %s\r\n", key)
- batch.add_command()
- self.execute(batch)
- reply = batch.pop_reply()
- return reply.value
+ batch.write("GET %s\r\n" % key, 1)
+ return self._simple_exec(batch, timeout_ms)
- def execute(self, batch, dispatch = True):
- libredis.Connection_execute(self._connection, batch._batch)
- if dispatch:
- libredis.Module_dispatch()
-
+ def _simple_exec(self, batch, timeout_ms):
+ executor = libredis.Executor_new()
+ try:
+ libredis.Executor_add(executor, self._connection, batch._batch)
+ libredis.Executor_execute(executor, timeout_ms)
+ finally:
+ libredis.Executor_free(executor)
+ #reply = batch.pop_reply()
+ #return reply.value
+ return "BlaatTODO"
+
def free(self):
libredis.Connection_free(self._connection)
self._connection = None
@@ -128,12 +135,9 @@ class Batch(object):
def __init__(self):
self._batch = libredis.Batch_new()
- def writef(self, format, *args):
- libredis.Batch_writef(self._batch, format, *args)
+ def write(self, cmd, nr_commands):
+ libredis.Batch_write(self._batch, cmd, len(cmd), nr_commands)
- def add_command(self):
- libredis.Batch_add_command(self._batch)
-
def next_reply(self):
return Reply.from_next(self)
diff --git a/src/connection.c b/src/connection.c
index c7fde84..a32b034 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -8,6 +8,7 @@
#include
#include
+#include
#include
#include
#include
@@ -21,7 +22,6 @@
#include
#include "redis.h"
-#include "event.h"
#include "common.h"
#include "buffer.h"
#include "connection.h"
@@ -40,6 +40,12 @@ typedef enum _ConnectionState
CS_ABORTED = 3
} ConnectionState;
+typedef enum _EventType
+{
+ EVENT_READ = 1,
+ EVENT_WRITE = 2,
+ EVENT_TIMEOUT = 4,
+} EventType;
struct _Connection
{
@@ -47,8 +53,6 @@ struct _Connection
struct sockaddr_in sa; //parsed addres
int sockfd;
ConnectionState state;
- struct event event_read;
- struct event event_write;
Batch *current_batch;
Executor *current_executor;
ReplyParser *parser;
@@ -56,13 +60,11 @@ struct _Connection
//forward decls.
void Connection_abort(Connection *connection, const char *format, ...);
-void Connection_execute(Connection *connection, Executor *executor, Batch *batch);
-void Connection_event_add(Connection *connection, struct event *event);
+void Connection_execute_start(Connection *connection, Executor *executor, Batch *batch);
void Connection_write_data(Connection *connection);
void Connection_read_data(Connection *connection);
-void Connection_handle_event(int fd, short flags, void *data);
-void Executor_set_timeout(Executor *executor, struct timeval *tv);
+void Executor_notify_event(Executor *executor, Connection *connection, EventType event);
Connection *Connection_new(const char *in_addr)
{
@@ -112,7 +114,6 @@ Connection *Connection_new(const char *in_addr)
connection->sockfd = 0;
- //
return connection;
}
@@ -140,10 +141,6 @@ int Connection_create_socket(Connection *connection)
return -1;
}
- //prepare libevent structures
- event_set(&connection->event_read, connection->sockfd, EV_READ, &Connection_handle_event, (void *)connection);
- event_set(&connection->event_write, connection->sockfd, EV_WRITE, &Connection_handle_event, (void *)connection);
-
//set socket in non-blocking mode
int flags;
if ((flags = fcntl(connection->sockfd, F_GETFL, 0)) < 0)
@@ -184,10 +181,6 @@ void Connection_abort(Connection *connection, const char *format, ...)
CS_CONNECTING == connection->state) {
assert(connection->sockfd > 0);
- //unset outstanding events
- event_del(&connection->event_read);
- event_del(&connection->event_write);
-
//close the socket
close(connection->sockfd);
}
@@ -198,7 +191,7 @@ void Connection_abort(Connection *connection, const char *format, ...)
DEBUG(("Connection aborted\n"));
}
-void Connection_execute(Connection *connection, Executor *executor, Batch *batch)
+void Connection_execute_start(Connection *connection, Executor *executor, Batch *batch)
{
DEBUG(("Connection exec\n"));
@@ -222,27 +215,10 @@ void Connection_execute(Connection *connection, Executor *executor, Batch *batch
//kick off reading when socket becomes readable
if(CS_CONNECTED == connection->state) {
- Connection_event_add(connection, &connection->event_read);
+ Executor_notify_event(connection->current_executor, connection, EVENT_READ);
}
}
-void Connection_event_add(Connection *connection, struct event *event)
-{
- if(CS_ABORTED == connection->state) {
- return;
- }
- assert(connection->current_batch != NULL);
- assert(connection->current_executor != NULL);
-
- struct timeval tv;
- Executor_set_timeout(connection->current_executor, &tv);
- if(-1 == event_add(event, &tv)) {
- Connection_abort(connection, "could not add event");
- return;
- }
- DEBUG(("connection ev add: fd: %d, type: %c\n", connection->sockfd, event == &connection->event_read ? 'R' : 'W'));
-}
-
void Connection_write_data(Connection *connection)
{
DEBUG(("connection write_data fd: %d\n", connection->sockfd));
@@ -264,7 +240,7 @@ void Connection_write_data(Connection *connection)
//normal async connect
connection->state = CS_CONNECTING;
DEBUG(("async connecting, adding write event\n"));
- Connection_event_add(connection, &connection->event_write);
+ Executor_notify_event(connection->current_executor, connection, EVENT_WRITE);
DEBUG(("write event added, now returning\n"));
return;
}
@@ -294,7 +270,7 @@ void Connection_write_data(Connection *connection)
}
else {
connection->state = CS_CONNECTED;
- Connection_event_add(connection, &connection->event_read);
+ Executor_notify_event(connection->current_executor, connection, EVENT_READ);
}
}
@@ -308,7 +284,7 @@ void Connection_write_data(Connection *connection)
DEBUG(("bfr send res: %d\n", res));
if(res == -1) {
if(errno == EAGAIN) {
- Connection_event_add(connection, &connection->event_write);
+ Executor_notify_event(connection->current_executor, connection, EVENT_WRITE);
return;
}
else {
@@ -350,7 +326,7 @@ void Connection_read_data(Connection *connection)
#endif
if(res == -1) {
if(errno == EAGAIN) {
- Connection_event_add(connection, &connection->event_read);
+ Executor_notify_event(connection->current_executor, connection, EVENT_READ);
return;
}
else {
@@ -375,18 +351,16 @@ void Connection_read_data(Connection *connection)
}
}
-void Connection_handle_event(int fd, short flags, void *data)
+void Connection_handle_event(Connection *connection, EventType event)
{
- Connection *connection = (Connection *)data;
-
if(CS_ABORTED == connection->state) {
return;
}
- DEBUG(("con event, fd: %d, state: %d, flags: %d, readable: %d, writeable: %d, timeout: %d\n", connection->sockfd,
- connection->state, flags, (flags & EV_READ) ? 1 : 0, (flags & EV_WRITE) ? 1 : 0, (flags & EV_TIMEOUT) ? 1 : 0 ));
+ DEBUG(("con event, fd: %d, state: %d, event: %d, readable: %d, writeable: %d, timeout: %d\n", connection->sockfd,
+ connection->state, event, (event & EVENT_READ) ? 1 : 0, (event & EVENT_WRITE) ? 1 : 0, (event & EVENT_TIMEOUT) ? 1 : 0 ));
- if(flags & EV_TIMEOUT) {
+ if(event & EVENT_TIMEOUT) {
if(CS_CONNECTING == connection->state) {
Connection_abort(connection, "connect timeout");
}
@@ -396,17 +370,16 @@ void Connection_handle_event(int fd, short flags, void *data)
return;
}
- if(flags & EV_WRITE) {
+ if(event & EVENT_WRITE) {
Connection_write_data(connection);
}
- if(flags & EV_READ) {
+ if(event & EVENT_READ) {
Connection_read_data(connection);
}
}
-
#define MAX_PAIRS 1024
struct _Pair
@@ -418,6 +391,9 @@ struct _Pair
struct _Executor
{
int numpairs;
+ int max_fd;
+ fd_set readfds;
+ fd_set writefds;
struct _Pair pairs[MAX_PAIRS];
double end_tm_ms;
};
@@ -431,6 +407,9 @@ Executor *Executor_new()
return NULL;
}
executor->numpairs = 0;
+ executor->max_fd = 0;
+ FD_ZERO(&executor->readfds);
+ FD_ZERO(&executor->writefds);
return executor;
}
@@ -445,6 +424,11 @@ void Executor_free(Executor *executor)
int Executor_add(Executor *executor, Connection *connection, Batch *batch)
{
+ assert(executor != NULL);
+ assert(connection != NULL);
+ assert(connection->state != CS_ABORTED);
+ assert(batch != NULL);
+
if(executor->numpairs >= MAX_PAIRS) {
SETERROR(("executor is full"));
return -1;
@@ -464,33 +448,83 @@ int Executor_execute(Executor *executor, int timeout_ms)
DEBUG(("Executor execute start\n"));
struct timespec tm;
clock_gettime(CLOCK_MONOTONIC, &tm);
+ DEBUG(("Executor start_tm_ms: %3.2f\n", TIMESPEC_TO_MS(tm)));
executor->end_tm_ms = TIMESPEC_TO_MS(tm) + ((float)timeout_ms);
DEBUG(("Executor end_tm_ms: %3.2f\n", executor->end_tm_ms));
for(int i = 0; i < executor->numpairs; i++) {
struct _Pair *pair = &executor->pairs[i];
- Connection_execute(pair->connection, executor, pair->batch);
+ Connection_execute_start(pair->connection, executor, pair->batch);
+ }
+
+ while(executor->max_fd > 0) { //for as long there are outstanding events
+
+ //copy filedes. sets, because select is going to modify them
+ fd_set readfds;
+ fd_set writefds;
+ readfds = executor->readfds;
+ writefds = executor->writefds;
+
+ //figure out how many ms left for this execution
+ struct timespec tm;
+ clock_gettime(CLOCK_MONOTONIC, &tm);
+ double cur_tm_ms = TIMESPEC_TO_MS(tm);
+ DEBUG(("Executor cur_tm: %3.2f\n", cur_tm_ms));
+ double left_ms = executor->end_tm_ms - cur_tm_ms;
+ DEBUG(("Time left: %3.2f\n", left_ms));
+ struct timeval tv;
+ tv.tv_sec = (time_t)left_ms / 1000.0;
+ tv.tv_usec = (left_ms - (tv.tv_sec * 1000.0)) * 1000.0;
+ DEBUG(("Timeout: %d sec, %d usec\n", (int)tv.tv_sec, (int)tv.tv_usec));
+
+ //do the select
+ DEBUG(("Executor start select max_fd %d\n", executor->max_fd));
+ int res = select(executor->max_fd + 1, &readfds, &writefds, NULL, &tv);
+ DEBUG(("Executor select res %d\n", res));
+
+ executor->max_fd = 0;
+
+ for(int i = 0; i < executor->numpairs; i++) {
+ struct _Pair *pair = &executor->pairs[i];
+ Connection *connection = pair->connection;
+ EventType event = 0;
+ if(FD_ISSET(connection->sockfd, &readfds)) {
+ event |= EVENT_READ;
+ FD_CLR(connection->sockfd, &executor->readfds);
+ }
+ if(FD_ISSET(connection->sockfd, &writefds)) {
+ event |= EVENT_WRITE;
+ FD_CLR(connection->sockfd, &executor->writefds);
+ }
+ if(event > 0) {
+ Connection_handle_event(connection, event);
+ }
+ }
}
- Module_dispatch();
+
DEBUG(("Executor execute done\n"));
return 0;
}
-void Executor_set_timeout(Executor *executor, struct timeval *tv)
+void Executor_notify_event(Executor *executor, Connection *connection, EventType event)
{
- //figure out how many ms left for this execution
- struct timespec tm;
- clock_gettime(CLOCK_MONOTONIC, &tm);
- double cur_tm_ms = TIMESPEC_TO_MS(tm);
- DEBUG(("Executor cur_tm: %3.2f\n", cur_tm_ms));
- double left_ms = executor->end_tm_ms - cur_tm_ms;
- DEBUG(("Time left: %3.2f\n", left_ms));
+ assert(executor != NULL);
+ assert(connection != NULL);
+ assert(connection->sockfd != 0);
+
+ if(connection->sockfd > executor->max_fd) {
+ executor->max_fd = connection->sockfd;
+ }
+
+ if(event & EVENT_READ) {
+ FD_SET(connection->sockfd, &executor->readfds);
+ }
- //set timeout based on that
- tv->tv_sec = (time_t)left_ms / 1000.0;
- tv->tv_usec = (left_ms - (tv->tv_sec * 1000.0)) * 1000.0;
+ if(event & EVENT_WRITE) {
+ FD_SET(connection->sockfd, &executor->writefds);
+ }
- DEBUG(("Timeout: %d sec, %d usec\n", (int)tv->tv_sec, (int)tv->tv_usec));
+ DEBUG(("executor notify event added: fd: %d, type: %c, max_fd: %d\n", connection->sockfd, event == EVENT_READ ? 'R' : 'W', executor->max_fd));
}
diff --git a/src/event.h b/src/event.h
deleted file mode 100644
index f0de6d5..0000000
--- a/src/event.h
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
-* Copyright (C) 2010, Hyves (Startphone Ltd.)
-*
-* This module is part of Libredis (http://github.com/toymachine/libredis) and is released under
-* the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-*
-*/
-#ifndef EVENT99_H_
-#define EVENT99_H_
-
-#include
-#include
-
-typedef uint8_t u_char;
-
-#include
-
-#endif /* EVENT99_H_ */
diff --git a/src/module.c b/src/module.c
index 509a780..4ee6f2e 100644
--- a/src/module.c
+++ b/src/module.c
@@ -7,7 +7,6 @@
*/
#include "module.h"
-#include "event.h"
#include "reply.h"
#include "batch.h"
@@ -32,15 +31,7 @@ void Module_init(Module *module)
if(g_module->alloc_free == NULL) {
g_module->alloc_free = free;
}
-
- event_init();
-}
-
-void Module_dispatch()
-{
- DEBUG(("Module before dispatch\n"));
- event_dispatch();
- DEBUG(("Module after dispatch\n"));
+ DEBUG(("start alloc: %d\n", g_module->allocated));
}
void Module_free()
diff --git a/src/module.h b/src/module.h
index 59c6600..91198e5 100644
--- a/src/module.h
+++ b/src/module.h
@@ -12,6 +12,4 @@
extern Module *g_module;
-void Module_dispatch();
-
#endif /* MODULE_H_ */
diff --git a/test.py b/test.py
index 8763ed3..3ee8e12 100644
--- a/test.py
+++ b/test.py
@@ -22,7 +22,8 @@ def timer():
def test_simple():
connection = Connection("127.0.0.1:6379")
- for j in range(10):
+ N = 10
+ for j in range(N):
print repr(connection.get('blaat'))
def test_ketama():
@@ -93,7 +94,7 @@ def profile(f = None):
if __name__ == '__main__':
#test_ketama()
- #test_simple()
+ test_simple()
#profile(test_mget)
- test_mget()
+ #test_mget()