Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion META.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"plainbanana <plainbanana@mustardon.tokyo>"
],
"dynamic_config" : 0,
"generated_by" : "Minilla/v3.1.25, CPAN::Meta::Converter version 2.150010",
"generated_by" : "Minilla/v3.1.26, CPAN::Meta::Converter version 2.150010",
"license" : [
"perl_5"
],
Expand Down
28 changes: 8 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,31 +167,19 @@ do not execute fork() without issuing `disconnect` if all callbacks are not exec

## run\_event\_loop()

This method allows you to issue commands without waiting for their responses.
You can then perform a blocking wait for those responses later, if needed.
This method is nonblocking and allows you to issue commands without waiting for their responses.

Executes one iteration of the event loop to process any pending commands that have not yet been sent
and any incoming responses from Redis.

If there are events that can be triggered immediately, they will all be processed.
In other words, if there are unsent commands, they will be pipelined and sent,
If there are unsent commands, they will be pipelined and sent,
and if there are already-received responses, their corresponding callbacks will be executed.

If there are no events that can be triggered immediately: there are neither unsent commands nor any Redis responses available to read,
but unprocessed callbacks remain, then this method will block for up to `command_timeout` while waiting for a response from Redis.
When a timeout occurs, an error will be propagated to the corresponding callback(s).

The return value can be either 1 for success (e.g., commands sent or responses read),
The return value can be either 1 for success
(e.g., commands sent, responses read, or exit without waiting for any responses),
0 for no callbacks remained, or undef for other errors.

### Notes

- Be aware that the timeout check will only be triggered when there are neither unsent commands nor Redis responses available to read.
If a timeout occurs, all remaining commands on that node will time out as well.
- Internally, this method calls `event_base_loop(..., EVLOOP_ONCE)`, which
performs a single iteration of the event loop. A command will not be fully processed in a single call.
- If you need to process multiple commands or wait for all responses, call
this method repeatedly or use `wait_all_responses`.
- If a timeout occurs, all remaining commands on that node will time out as well.
- For a simpler, synchronous-like usage where you need at least one response,
refer to `wait_one_response`. If you only need to block until all
pending commands are processed, see `wait_all_responses`.
Expand All @@ -205,17 +193,17 @@ pending commands are processed, see `wait_all_responses`.
# Send commands to Redis without waiting for responses
$redis->run_event_loop();

# Possibly wait for responses
# If any responses are available, read them immediately without waiting for the rest
$redis->run_event_loop();

## wait\_one\_response()

If there are any unexcuted callbacks, it will block until at least one is executed.
If there are any unexecuted callbacks, it will block until at least one is executed.
The return value can be either 1 for success, 0 for no callbacks remained, or undef for other errors.

## wait\_all\_responses()

If there are any unexcuted callbacks, it will block until all of them are executed.
If there are any unexecuted callbacks, it will block until all of them are executed.
The return value can be either 1 for success, 0 for no callbacks remained, or undef for other errors.

## disconnect()
Expand Down
32 changes: 7 additions & 25 deletions lib/Redis/Cluster/Fast.pm
Original file line number Diff line number Diff line change
Expand Up @@ -301,21 +301,14 @@ do not execute fork() without issuing C<disconnect> if all callbacks are not exe

=head2 run_event_loop()

This method allows you to issue commands without waiting for their responses.
You can then perform a blocking wait for those responses later, if needed.
This method is nonblocking and allows you to issue commands without waiting for their responses.

Executes one iteration of the event loop to process any pending commands that have not yet been sent
and any incoming responses from Redis.

If there are events that can be triggered immediately, they will all be processed.
In other words, if there are unsent commands, they will be pipelined and sent,
If there are unsent commands, they will be pipelined and sent,
and if there are already-received responses, their corresponding callbacks will be executed.

If there are no events that can be triggered immediately: there are neither unsent commands nor any Redis responses available to read,
but unprocessed callbacks remain, then this method will block for up to C<command_timeout> while waiting for a response from Redis.
When a timeout occurs, an error will be propagated to the corresponding callback(s).

The return value can be either 1 for success (e.g., commands sent or responses read),
The return value can be either 1 for success
(e.g., commands sent, responses read, or exit without waiting for any responses),
0 for no callbacks remained, or undef for other errors.

=head3 Notes
Expand All @@ -324,21 +317,10 @@ The return value can be either 1 for success (e.g., commands sent or responses r

=item *

Be aware that the timeout check will only be triggered when there are neither unsent commands nor Redis responses available to read.
If a timeout occurs, all remaining commands on that node will time out as well.

=item *

Internally, this method calls C<event_base_loop(..., EVLOOP_ONCE)>, which
performs a single iteration of the event loop. A command will not be fully processed in a single call.

=item *

If you need to process multiple commands or wait for all responses, call
this method repeatedly or use C<wait_all_responses>.

=item *

For a simpler, synchronous-like usage where you need at least one response,
refer to C<wait_one_response>. If you only need to block until all
pending commands are processed, see C<wait_all_responses>.
Expand All @@ -354,17 +336,17 @@ pending commands are processed, see C<wait_all_responses>.
# Send commands to Redis without waiting for responses
$redis->run_event_loop();

# Possibly wait for responses
# If any responses are available, read them immediately without waiting for the rest
$redis->run_event_loop();

=head2 wait_one_response()

If there are any unexcuted callbacks, it will block until at least one is executed.
If there are any unexecuted callbacks, it will block until at least one is executed.
The return value can be either 1 for success, 0 for no callbacks remained, or undef for other errors.

=head2 wait_all_responses()

If there are any unexcuted callbacks, it will block until all of them are executed.
If there are any unexecuted callbacks, it will block until all of them are executed.
The return value can be either 1 for success, 0 for no callbacks remained, or undef for other errors.

=head2 disconnect()
Expand Down
16 changes: 14 additions & 2 deletions src/Fast.xs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ extern "C" {
#include <unistd.h>
#include <string.h>
#include <time.h>
#include "hiredis_cluster/adapters/libevent.h"
#include "adapters/libevent.h"
#include "hiredis_cluster/hircluster.h"

#ifdef __cplusplus
Expand All @@ -27,6 +27,13 @@ extern "C" {

#define MIN_ATTEMPT_TO_GET_RESULT 2

/* libevent adapter priority configuration
Uses 2 priority levels to ensure I/O events are processed before timeouts:
- Priority 0: I/O events (Redis responses) - highest priority
- Priority 1: Timer events (timeouts) - lower priority
EVENT_BASE_PRIORITY_NUMBER sets the total priorities for event_base_priority_init() */
#define EVENT_BASE_PRIORITY_NUMBER 2

#define DEBUG_MSG(fmt, ...) \
if (self->debug) { \
fprintf(stderr, "[%d][%d][%s:%d:%s]: ", getpid(), getppid(), __FILE__, __LINE__, __func__); \
Expand Down Expand Up @@ -272,6 +279,11 @@ SV *Redis__Cluster__Fast_connect(pTHX_ Redis__Cluster__Fast self) {
}

self->cluster_event_base = event_base_new();

if (event_base_priority_init(self->cluster_event_base, EVENT_BASE_PRIORITY_NUMBER) != 0) {
return newSVpvf("%s", "failed to initialize event base priorities");
}

if (redisClusterLibeventAttach(self->acc, self->cluster_event_base) != REDIS_OK) {
return newSVpvf("%s", "failed to attach event base");
}
Expand Down Expand Up @@ -471,7 +483,7 @@ int Redis__Cluster__Fast_run_event_loop(pTHX_ Redis__Cluster__Fast self) {
return 0;
}
DEBUG_EVENT_BASE();
event_loop_error = event_base_loop(self->cluster_event_base, EVLOOP_ONCE);
event_loop_error = event_base_loop(self->cluster_event_base, EVLOOP_NONBLOCK);
if (event_loop_error != 0) {
return -1;
}
Expand Down
Loading