diff --git a/META.json b/META.json index 8edcab6..d01a451 100644 --- a/META.json +++ b/META.json @@ -4,7 +4,7 @@ "plainbanana " ], "dynamic_config" : 0, - "generated_by" : "Minilla/v3.1.25, CPAN::Meta::Converter version 2.150010", + "generated_by" : "Minilla/v3.1.26, CPAN::Meta::Converter version 2.150010", "license" : [ "perl_5" ], diff --git a/README.md b/README.md index e04fd3d..a1119fd 100644 --- a/README.md +++ b/README.md @@ -167,31 +167,19 @@ do not execute fork() without issuing `disconnect` if all callbacks are not exec ## run\_event\_loop() -This method allows you to issue commands without waiting for their responses. -You can then perform a blocking wait for those responses later, if needed. +This method is nonblocking and allows you to issue commands without waiting for their responses. -Executes one iteration of the event loop to process any pending commands that have not yet been sent -and any incoming responses from Redis. - -If there are events that can be triggered immediately, they will all be processed. -In other words, if there are unsent commands, they will be pipelined and sent, +If there are unsent commands, they will be pipelined and sent, and if there are already-received responses, their corresponding callbacks will be executed. - -If there are no events that can be triggered immediately: there are neither unsent commands nor any Redis responses available to read, -but unprocessed callbacks remain, then this method will block for up to `command_timeout` while waiting for a response from Redis. When a timeout occurs, an error will be propagated to the corresponding callback(s). -The return value can be either 1 for success (e.g., commands sent or responses read), +The return value can be either 1 for success +(e.g., commands sent, responses read, or exit without waiting for any responses), 0 for no callbacks remained, or undef for other errors. ### Notes -- Be aware that the timeout check will only be triggered when there are neither unsent commands nor Redis responses available to read. -If a timeout occurs, all remaining commands on that node will time out as well. -- Internally, this method calls `event_base_loop(..., EVLOOP_ONCE)`, which -performs a single iteration of the event loop. A command will not be fully processed in a single call. -- If you need to process multiple commands or wait for all responses, call -this method repeatedly or use `wait_all_responses`. +- If a timeout occurs, all remaining commands on that node will time out as well. - For a simpler, synchronous-like usage where you need at least one response, refer to `wait_one_response`. If you only need to block until all pending commands are processed, see `wait_all_responses`. @@ -205,17 +193,17 @@ pending commands are processed, see `wait_all_responses`. # Send commands to Redis without waiting for responses $redis->run_event_loop(); - # Possibly wait for responses + # If any responses are available, read them immediately without waiting for the rest $redis->run_event_loop(); ## wait\_one\_response() -If there are any unexcuted callbacks, it will block until at least one is executed. +If there are any unexecuted callbacks, it will block until at least one is executed. The return value can be either 1 for success, 0 for no callbacks remained, or undef for other errors. ## wait\_all\_responses() -If there are any unexcuted callbacks, it will block until all of them are executed. +If there are any unexecuted callbacks, it will block until all of them are executed. The return value can be either 1 for success, 0 for no callbacks remained, or undef for other errors. ## disconnect() diff --git a/lib/Redis/Cluster/Fast.pm b/lib/Redis/Cluster/Fast.pm index 37bae37..fecd157 100644 --- a/lib/Redis/Cluster/Fast.pm +++ b/lib/Redis/Cluster/Fast.pm @@ -301,21 +301,14 @@ do not execute fork() without issuing C if all callbacks are not exe =head2 run_event_loop() -This method allows you to issue commands without waiting for their responses. -You can then perform a blocking wait for those responses later, if needed. +This method is nonblocking and allows you to issue commands without waiting for their responses. -Executes one iteration of the event loop to process any pending commands that have not yet been sent -and any incoming responses from Redis. - -If there are events that can be triggered immediately, they will all be processed. -In other words, if there are unsent commands, they will be pipelined and sent, +If there are unsent commands, they will be pipelined and sent, and if there are already-received responses, their corresponding callbacks will be executed. - -If there are no events that can be triggered immediately: there are neither unsent commands nor any Redis responses available to read, -but unprocessed callbacks remain, then this method will block for up to C while waiting for a response from Redis. When a timeout occurs, an error will be propagated to the corresponding callback(s). -The return value can be either 1 for success (e.g., commands sent or responses read), +The return value can be either 1 for success +(e.g., commands sent, responses read, or exit without waiting for any responses), 0 for no callbacks remained, or undef for other errors. =head3 Notes @@ -324,21 +317,10 @@ The return value can be either 1 for success (e.g., commands sent or responses r =item * -Be aware that the timeout check will only be triggered when there are neither unsent commands nor Redis responses available to read. If a timeout occurs, all remaining commands on that node will time out as well. =item * -Internally, this method calls C, which -performs a single iteration of the event loop. A command will not be fully processed in a single call. - -=item * - -If you need to process multiple commands or wait for all responses, call -this method repeatedly or use C. - -=item * - For a simpler, synchronous-like usage where you need at least one response, refer to C. If you only need to block until all pending commands are processed, see C. @@ -354,17 +336,17 @@ pending commands are processed, see C. # Send commands to Redis without waiting for responses $redis->run_event_loop(); - # Possibly wait for responses + # If any responses are available, read them immediately without waiting for the rest $redis->run_event_loop(); =head2 wait_one_response() -If there are any unexcuted callbacks, it will block until at least one is executed. +If there are any unexecuted callbacks, it will block until at least one is executed. The return value can be either 1 for success, 0 for no callbacks remained, or undef for other errors. =head2 wait_all_responses() -If there are any unexcuted callbacks, it will block until all of them are executed. +If there are any unexecuted callbacks, it will block until all of them are executed. The return value can be either 1 for success, 0 for no callbacks remained, or undef for other errors. =head2 disconnect() diff --git a/src/Fast.xs b/src/Fast.xs index 27ef84c..61b2768 100644 --- a/src/Fast.xs +++ b/src/Fast.xs @@ -11,7 +11,7 @@ extern "C" { #include #include #include -#include "hiredis_cluster/adapters/libevent.h" +#include "adapters/libevent.h" #include "hiredis_cluster/hircluster.h" #ifdef __cplusplus @@ -27,6 +27,13 @@ extern "C" { #define MIN_ATTEMPT_TO_GET_RESULT 2 +/* libevent adapter priority configuration + Uses 2 priority levels to ensure I/O events are processed before timeouts: + - Priority 0: I/O events (Redis responses) - highest priority + - Priority 1: Timer events (timeouts) - lower priority + EVENT_BASE_PRIORITY_NUMBER sets the total priorities for event_base_priority_init() */ +#define EVENT_BASE_PRIORITY_NUMBER 2 + #define DEBUG_MSG(fmt, ...) \ if (self->debug) { \ fprintf(stderr, "[%d][%d][%s:%d:%s]: ", getpid(), getppid(), __FILE__, __LINE__, __func__); \ @@ -272,6 +279,11 @@ SV *Redis__Cluster__Fast_connect(pTHX_ Redis__Cluster__Fast self) { } self->cluster_event_base = event_base_new(); + + if (event_base_priority_init(self->cluster_event_base, EVENT_BASE_PRIORITY_NUMBER) != 0) { + return newSVpvf("%s", "failed to initialize event base priorities"); + } + if (redisClusterLibeventAttach(self->acc, self->cluster_event_base) != REDIS_OK) { return newSVpvf("%s", "failed to attach event base"); } @@ -471,7 +483,7 @@ int Redis__Cluster__Fast_run_event_loop(pTHX_ Redis__Cluster__Fast self) { return 0; } DEBUG_EVENT_BASE(); - event_loop_error = event_base_loop(self->cluster_event_base, EVLOOP_ONCE); + event_loop_error = event_base_loop(self->cluster_event_base, EVLOOP_NONBLOCK); if (event_loop_error != 0) { return -1; } diff --git a/src/adapters/libevent.h b/src/adapters/libevent.h new file mode 100644 index 0000000..837a532 --- /dev/null +++ b/src/adapters/libevent.h @@ -0,0 +1,224 @@ +/* + * Copyright (c) 2010-2011, Pieter Noordhuis + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __HIREDIS_LIBEVENT_H__ +#define __HIREDIS_LIBEVENT_H__ +#include +#include "hiredis/hiredis.h" +#include "hiredis/async.h" + +#define REDIS_LIBEVENT_DELETED 0x01 +#define REDIS_LIBEVENT_ENTERED 0x02 + +typedef struct redisLibeventEvents { + redisAsyncContext *context; + struct event *ev_io; + struct event *ev_timer; + struct event_base *base; + short flags; + short state; +} redisLibeventEvents; + +static void redisLibeventDestroy(redisLibeventEvents *e) { + hi_free(e); +} + +static void redisLibeventHandler(evutil_socket_t fd, short event, void *arg) { + redisLibeventEvents *e = (redisLibeventEvents*)arg; + ((void)fd); + e->state |= REDIS_LIBEVENT_ENTERED; + + #define CHECK_DELETED() if (e->state & REDIS_LIBEVENT_DELETED) {\ + redisLibeventDestroy(e);\ + return; \ + } + + if ((event & EV_READ) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) { + redisAsyncHandleRead(e->context); + CHECK_DELETED(); + } + + if ((event & EV_WRITE) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) { + redisAsyncHandleWrite(e->context); + CHECK_DELETED(); + } + + e->state &= ~REDIS_LIBEVENT_ENTERED; + #undef CHECK_DELETED +} + +static void redisLibeventTimerHandler(evutil_socket_t fd, short event, void *arg) { + redisLibeventEvents *e = (redisLibeventEvents*)arg; + ((void)fd); + ((void)event); + e->state |= REDIS_LIBEVENT_ENTERED; + + #define CHECK_DELETED() if (e->state & REDIS_LIBEVENT_DELETED) {\ + redisLibeventDestroy(e);\ + return; \ + } + + if (e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) { + redisAsyncHandleTimeout(e->context); + CHECK_DELETED(); + } + + e->state &= ~REDIS_LIBEVENT_ENTERED; + #undef CHECK_DELETED +} + +static void redisLibeventUpdate(void *privdata, short flag, int isRemove) { + redisLibeventEvents *e = (redisLibeventEvents *)privdata; + + if (isRemove) { + if ((e->flags & flag) == 0) { + return; + } else { + e->flags &= ~flag; + } + } else { + if (e->flags & flag) { + return; + } else { + e->flags |= flag; + } + } + + event_del(e->ev_io); + event_assign(e->ev_io, e->base, e->context->c.fd, e->flags | EV_PERSIST, + redisLibeventHandler, privdata); + event_priority_set(e->ev_io, 0); + event_add(e->ev_io, NULL); +} + +static void redisLibeventAddRead(void *privdata) { + redisLibeventUpdate(privdata, EV_READ, 0); +} + +static void redisLibeventDelRead(void *privdata) { + redisLibeventUpdate(privdata, EV_READ, 1); +} + +static void redisLibeventAddWrite(void *privdata) { + redisLibeventUpdate(privdata, EV_WRITE, 0); +} + +static void redisLibeventDelWrite(void *privdata) { + redisLibeventUpdate(privdata, EV_WRITE, 1); +} + +static void redisLibeventCleanup(void *privdata) { + redisLibeventEvents *e = (redisLibeventEvents*)privdata; + if (!e) { + return; + } + if (e->ev_io) { + event_del(e->ev_io); + event_free(e->ev_io); + e->ev_io = NULL; + } + if (e->ev_timer) { + evtimer_del(e->ev_timer); + event_free(e->ev_timer); + e->ev_timer = NULL; + } + + if (e->state & REDIS_LIBEVENT_ENTERED) { + e->state |= REDIS_LIBEVENT_DELETED; + } else { + redisLibeventDestroy(e); + } +} + +static void redisLibeventSetTimeout(void *privdata, struct timeval tv) { + redisLibeventEvents *e = (redisLibeventEvents *)privdata; + evtimer_del(e->ev_timer); + evtimer_add(e->ev_timer, &tv); +} + +static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { + redisContext *c = &(ac->c); + redisLibeventEvents *e; + + /* Nothing should be attached when something is already attached */ + if (ac->ev.data != NULL) + return REDIS_ERR; + + /* Create container for context and r/w events */ + e = (redisLibeventEvents*)hi_calloc(1, sizeof(*e)); + if (e == NULL) + return REDIS_ERR; + + e->context = ac; + + /* Register functions to start/stop listening for events */ + ac->ev.addRead = redisLibeventAddRead; + ac->ev.delRead = redisLibeventDelRead; + ac->ev.addWrite = redisLibeventAddWrite; + ac->ev.delWrite = redisLibeventDelWrite; + ac->ev.cleanup = redisLibeventCleanup; + ac->ev.scheduleTimer = redisLibeventSetTimeout; + ac->ev.data = e; + + /* Initialize and install read/write events */ + e->ev_io = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e); + event_priority_set(e->ev_io, 0); + + /* Initialize and install timer events */ + e->ev_timer = evtimer_new(base, redisLibeventTimerHandler, e); + event_priority_set(e->ev_timer, 1); + + e->base = base; + return REDIS_OK; +} +#endif + +#ifndef __HIREDIS_CLUSTER_LIBEVENT_H__ +#define __HIREDIS_CLUSTER_LIBEVENT_H__ +#include "hiredis_cluster/hircluster.h" + +static int redisLibeventAttach_link(redisAsyncContext *ac, void *base) { + return redisLibeventAttach(ac, (struct event_base *)base); +} + +static int redisClusterLibeventAttach(redisClusterAsyncContext *acc, + struct event_base *base) { + + if (acc == NULL || base == NULL) { + return REDIS_ERR; + } + + acc->adapter = base; + acc->attach_fn = redisLibeventAttach_link; + + return REDIS_OK; +} + +#endif diff --git a/xt/01_simple.t b/xt/01_simple.t index 219b7b4..29a6c1f 100644 --- a/xt/01_simple.t +++ b/xt/01_simple.t @@ -149,7 +149,7 @@ for my $case ( is $result, 12345; }); ok $redis->run_event_loop; - ok $redis->run_event_loop; + ok $redis->wait_one_response; is $redis->run_event_loop, 0; }