Skip to content

Commit

Permalink
[Feature] Initial support of subscribe command in lua_redis
Browse files Browse the repository at this point in the history
  • Loading branch information
vstakhov committed May 19, 2020
1 parent 5d09c6f commit 48036e4
Showing 1 changed file with 54 additions and 24 deletions.
78 changes: 54 additions & 24 deletions src/lua/lua_redis.c
Expand Up @@ -122,6 +122,7 @@ INIT_LOG_MODULE(lua_redis)
#define LUA_REDIS_TEXTDATA (1 << 1)
#define LUA_REDIS_TERMINATED (1 << 2)
#define LUA_REDIS_NO_POOL (1 << 3)
#define LUA_REDIS_SUBSCRIBED (1 << 4)
#define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC)

struct lua_redis_request_specific_userdata {
Expand Down Expand Up @@ -263,7 +264,9 @@ lua_redis_fin (void *arg)
ctx = sp_ud->ctx;
ud = sp_ud->c;

ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
if (ev_can_stop (&sp_ud->timeout_ev)) {
ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
}

msg_debug_lua_redis ("finished redis query %p from session %p; refcount=%d",
sp_ud, ctx, ctx->ref.refcount);
Expand Down Expand Up @@ -383,7 +386,8 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
struct lua_callback_state cbs;
lua_State *L;

if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED)) ||
(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
if (sp_ud->cbref != -1) {
lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
L = cbs.L;
Expand All @@ -409,17 +413,29 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
lua_thread_pool_restore_callback (&cbs);
}

if (sp_ud->flags & LUA_REDIS_SUBSCRIBED) {
if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
if (ev_can_stop (&sp_ud->timeout_ev)) {
ev_timer_stop (sp_ud->ctx->async.event_loop,
&sp_ud->timeout_ev);
}
}
}

sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;

if (ud->s) {
if (ud->item) {
rspamd_symcache_item_async_dec_check (ud->task, ud->item, M);
}
if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
if (ud->s) {
if (ud->item) {
rspamd_symcache_item_async_dec_check (ud->task,
ud->item, M);
}

rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
}
else {
lua_redis_fin (sp_ud);
rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
}
else {
lua_redis_fin (sp_ud);
}
}
}
}
Expand Down Expand Up @@ -453,7 +469,8 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
REDIS_RETAIN (ctx);

/* If session is finished, we cannot call lua callbacks */
if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) ||
(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
if (c->err == 0) {
if (r != NULL) {
if (reply->type != REDIS_REPLY_ERROR) {
Expand All @@ -477,20 +494,22 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
}
}

ctx->cmds_pending --;
if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
ctx->cmds_pending--;

if (ctx->cmds_pending == 0 && !ud->terminated) {
/* Disconnect redis early as we don't need it anymore */
ud->terminated = 1;
ac = ud->ctx;
ud->ctx = NULL;
if (ctx->cmds_pending == 0 && !ud->terminated) {
/* Disconnect redis early as we don't need it anymore */
ud->terminated = 1;
ac = ud->ctx;
ud->ctx = NULL;

if (ac) {
msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d",
ud, ctx, ctx->ref.refcount);
rspamd_redis_pool_release_connection (ud->pool, ac,
(ctx->flags & LUA_REDIS_NO_POOL) ?
RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
if (ac) {
msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d",
ud, ctx, ctx->ref.refcount);
rspamd_redis_pool_release_connection (ud->pool, ac,
(ctx->flags & LUA_REDIS_NO_POOL) ?
RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
}
}
}

Expand Down Expand Up @@ -586,7 +605,10 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
return;
}

ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
if (ev_can_stop ( &sp_ud->timeout_ev)) {
ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
}

msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);

struct lua_redis_result *result = g_malloc0 (sizeof *result);
Expand Down Expand Up @@ -617,6 +639,7 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
lua_pushstring (L, ac->errstr);
}
}

/* if error happened, we should terminate the connection,
and release it */

Expand Down Expand Up @@ -1125,10 +1148,17 @@ lua_redis_make_request (lua_State *L)

REDIS_RETAIN (ctx); /* Cleared by fin event */
ctx->cmds_pending ++;

if (ud->ctx->c.flags & REDIS_SUBSCRIBED) {
msg_debug_lua_redis ("subscribe command, never unref/timeout");
sp_ud->flags |= LUA_REDIS_SUBSCRIBED;
}

sp_ud->timeout_ev.data = sp_ud;
ev_now_update_if_cheap ((struct ev_loop *)ud->event_loop);
ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0);
ev_timer_start (ud->event_loop, &sp_ud->timeout_ev);

ret = TRUE;
}
else {
Expand Down

0 comments on commit 48036e4

Please sign in to comment.