Skip to content
Browse files

Disconnect broken SUBSCRIBE clients.

  • Loading branch information...
1 parent 21c4413 commit b01cb75db7764f3b942e99f98eab0cccbe17a981 @nicolasff committed Apr 13, 2011
Showing with 29 additions and 12 deletions.
  1. +10 −0 client.c
  2. +3 −0 client.h
  3. +11 −9 cmd.c
  4. +3 −1 cmd.h
  5. +2 −2 websocket.c
View
10 client.c
@@ -4,11 +4,14 @@
#include "server.h"
#include "worker.h"
#include "websocket.h"
+#include "cmd.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
+#include <hiredis/hiredis.h>
+#include <hiredis/async.h>
static int
http_client_on_url(struct http_parser *p, const char *at, size_t sz) {
@@ -245,7 +248,14 @@ http_client_read(struct http_client *c) {
ret = read(c->fd, buffer, sizeof(buffer));
if(ret <= 0) {
/* broken link, free buffer and client object */
+
+ /* disconnect pub/sub client if there is one. */
+ if(c->pub_sub) {
+ redisAsyncDisconnect(c->pub_sub->ac);
+ }
+
close(c->fd);
+
http_client_free(c);
return -1;
}
View
3 client.h
@@ -7,6 +7,7 @@
struct http_header;
struct server;
+struct cmd;
typedef enum {
LAST_CB_NONE = 0,
@@ -49,6 +50,8 @@ struct http_client {
char *type; /* forced output content-type */
char *jsonp; /* jsonp wrapper */
+
+ struct cmd *pub_sub;
};
struct http_client *
View
20 cmd.c
@@ -126,7 +126,6 @@ cmd_run(struct worker *w, struct http_client *client,
int param_count = 0, cur_param = 1;
struct cmd *cmd;
- redisAsyncContext *ac = NULL;
formatting_fun f_format;
/* count arguments */
@@ -174,19 +173,22 @@ cmd_run(struct worker *w, struct http_client *client,
if(cmd_is_subscribe(cmd)) {
/* create a new connection to Redis */
- ac = (redisAsyncContext*)pool_connect(w->pool, 0);
+ cmd->ac = (redisAsyncContext*)pool_connect(w->pool, 0);
+
+ /* register with the client, used upon disconnection */
+ client->pub_sub = cmd;
} else {
/* get a connection from the pool */
- ac = (redisAsyncContext*)pool_get_context(w->pool);
+ cmd->ac = (redisAsyncContext*)pool_get_context(w->pool);
}
/* no args (e.g. INFO command) */
if(!slash) {
- if(!ac) {
+ if(!cmd->ac) {
cmd_free(cmd);
return CMD_REDIS_UNAVAIL;
}
- redisAsyncCommandArgv(ac, f_format, cmd, 1,
+ redisAsyncCommandArgv(cmd->ac, f_format, cmd, 1,
(const char **)cmd->argv, cmd->argv_len);
return CMD_SENT;
}
@@ -216,8 +218,8 @@ cmd_run(struct worker *w, struct http_client *client,
}
/* send it off! */
- if(ac) {
- cmd_send(ac, f_format, cmd);
+ if(cmd->ac) {
+ cmd_send(cmd, f_format);
return CMD_SENT;
}
/* failed to find a suitable connection to Redis. */
@@ -226,8 +228,8 @@ cmd_run(struct worker *w, struct http_client *client,
}
void
-cmd_send(redisAsyncContext *ac, formatting_fun f_format, struct cmd *cmd) {
- redisAsyncCommandArgv(ac, f_format, cmd, cmd->count,
+cmd_send(struct cmd *cmd, formatting_fun f_format) {
+ redisAsyncCommandArgv(cmd->ac, f_format, cmd, cmd->count,
(const char **)cmd->argv, cmd->argv_len);
}
View
4 cmd.h
@@ -37,6 +37,8 @@ struct cmd {
int started_responding;
int is_websocket;
int http_version;
+
+ redisAsyncContext *ac;
};
struct subscription {
@@ -63,7 +65,7 @@ int
cmd_is_subscribe(struct cmd *cmd);
void
-cmd_send(redisAsyncContext *ac, formatting_fun f_format, struct cmd *cmd);
+cmd_send(struct cmd *cmd, formatting_fun f_format);
void
cmd_setup(struct cmd *cmd, struct http_client *client);
View
4 websocket.c
@@ -173,10 +173,10 @@ ws_execute(struct http_client *c, const char *frame, size_t frame_len) {
cmd->is_websocket = 1;
/* get Redis connection from pool */
- redisAsyncContext *ac = (redisAsyncContext*)pool_get_context(c->w->pool);
+ cmd->ac = (redisAsyncContext*)pool_get_context(c->w->pool);
/* send it off */
- cmd_send(ac, fun_reply, cmd);
+ cmd_send(cmd, fun_reply);
return 0;
}

0 comments on commit b01cb75

Please sign in to comment.
Something went wrong with that request. Please try again.