Permalink
Browse files

implemented deferred session close

  • Loading branch information...
1 parent a43412e commit 90f985fa2f015e8bdfe92a9b57a5d04b779d7be1 @arut arut committed Apr 7, 2012
Showing with 66 additions and 37 deletions.
  1. +2 −1 ngx_rtmp.h
  2. +63 −35 ngx_rtmp_handler.c
  3. +1 −1 ngx_rtmp_netcall_module.c
View
@@ -175,6 +175,7 @@ typedef struct {
uint32_t signature; /* "RTMP" */ /* <-- FIXME wtf */
ngx_connection_t *connection;
+ ngx_event_t close;
void **ctx;
void **main_conf;
@@ -334,7 +335,7 @@ char* ngx_rtmp_user_message_type(uint16_t evt);
#endif
void ngx_rtmp_init_connection(ngx_connection_t *c);
-void ngx_rtmp_close_connection(ngx_connection_t *c);
+void ngx_rtmp_finalize_session(ngx_rtmp_session_t *s);
u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len);
uint32_t ngx_rtmp_get_timestamp();
ngx_int_t ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size);
View
@@ -14,6 +14,7 @@
static void ngx_rtmp_init_session(ngx_connection_t *c);
+static void ngx_rtmp_close_connection(ngx_connection_t *c);
static void ngx_rtmp_handshake_recv(ngx_event_t *rev);
static void ngx_rtmp_handshake_send(ngx_event_t *rev);
@@ -241,10 +242,7 @@ ngx_rtmp_init_session(ngx_connection_t *c)
size = NGX_RTMP_HANDSHAKE_SIZE + 1;
ngx_rtmp_set_chunk_size(s, NGX_RTMP_DEFAULT_CHUNK_SIZE);
-/*
- s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE;
- s->in_pool = ngx_create_pool(4096, c->log);
-*/
+
/* start handshake */
b = &s->hs_in_buf;
b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size);
@@ -267,7 +265,7 @@ ngx_rtmp_init_session(ngx_connection_t *c)
for(n = 0; n < ch->nelts; ++n, ++h) {
if (*h) {
if ((*h)(s, NULL, NULL) != NGX_OK) {
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
}
@@ -310,7 +308,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
if (rev->timedout) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
c->timedout = 1;
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
@@ -327,14 +325,14 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
n = c->recv(c, b->last, b->end - b->last);
if (n == NGX_ERROR || n == 0) {
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
if (n == NGX_AGAIN) {
ngx_add_timer(rev, cscf->timeout);
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
}
return;
}
@@ -351,7 +349,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
if (*b->pos != NGX_RTMP_VERSION) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"invalid handshake signature");
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
@@ -428,7 +426,7 @@ ngx_rtmp_handshake_send(ngx_event_t *wev)
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
"client timed out");
c->timedout = 1;
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
@@ -447,14 +445,14 @@ ngx_rtmp_handshake_send(ngx_event_t *wev)
n = c->send(c, b->pos, b->last - b->pos);
if (n == NGX_ERROR) {
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
if (n == NGX_AGAIN || n == 0) {
ngx_add_timer(c->write, cscf->timeout);
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
}
@@ -540,7 +538,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
if (st->in == NULL) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"in buf alloc failed");
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
}
@@ -570,13 +568,13 @@ ngx_rtmp_recv(ngx_event_t *rev)
n = c->recv(c, b->last, b->end - b->last);
if (n == NGX_ERROR || n == 0) {
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
if (n == NGX_AGAIN) {
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
}
return;
}
@@ -592,7 +590,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
"sending RTMP ACK(%D)", s->in_bytes);
if (ngx_rtmp_send_ack(s, s->in_bytes)) {
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
}
@@ -631,7 +629,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"RTMP in chunk stream too big: %D >= %D",
csid, cscf->max_streams);
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
@@ -733,7 +731,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
if (h->mlen > cscf->max_message) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"too big message: %uz", cscf->max_message);
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
}
@@ -763,7 +761,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
st->len = 0;
if (ngx_rtmp_receive_message(s, h, head) != NGX_OK) {
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
@@ -809,7 +807,7 @@ ngx_rtmp_send(ngx_event_t *wev)
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
"client timed out");
c->timedout = 1;
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
@@ -854,15 +852,15 @@ ngx_rtmp_send(ngx_event_t *wev)
out = c->send_chain(c, s->out, limit);
if (out == NGX_CHAIN_ERROR) {
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
if (out == s->out && out->buf->pos == p) {
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ngx_add_timer(c->write, cscf->timeout);
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
}
return;
}
@@ -913,7 +911,7 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"RTMP out chunk stream too big: %D >= %D",
h->csid, cscf->max_streams);
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
return;
}
@@ -1257,28 +1255,37 @@ ngx_rtmp_finalize_set_chunk_size(ngx_rtmp_session_t *s)
}
-void
+static void
ngx_rtmp_close_connection(ngx_connection_t *c)
{
- ngx_rtmp_session_t *s;
ngx_pool_t *pool;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection");
+
+ pool = c->pool;
+ ngx_close_connection(c);
+ ngx_destroy_pool(pool);
+}
+
+
+static void
+ngx_rtmp_close_session_handler(ngx_event_t *e)
+{
+ ngx_rtmp_session_t *s;
+ ngx_connection_t *c;
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_handler_pt *h;
ngx_array_t *dh;
size_t n;
- if (c->destroyed) {
- return;
- }
-
- c->destroyed = 1;
+ s = e->data;
+ c = s->connection;
- s = c->data;
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
- ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection");
+ ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close session");
if (s) {
dh = &cmcf->events[NGX_RTMP_DISCONNECT];
@@ -1305,9 +1312,30 @@ ngx_rtmp_close_connection(ngx_connection_t *c)
ngx_rtmp_free_shared_buf(cscf, s->out->buf);
}
- pool = c->pool;
- ngx_close_connection(c);
- ngx_destroy_pool(pool);
+ ngx_rtmp_close_connection(c);
+}
+
+
+void
+ngx_rtmp_finalize_session(ngx_rtmp_session_t *s)
+{
+ ngx_event_t *e;
+ ngx_connection_t *c;
+
+ /* deferred session finalize;
+ * schedule handler here */
+
+ c = s->connection;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "finalize session");
+
+ c->destroyed = 1;
+ e = &s->close;
+ e->data = s;
+ e->handler = ngx_rtmp_close_session_handler;
+ e->log = c->log;
+
+ ngx_post_event(e, &ngx_posted_events);
}
@@ -293,7 +293,7 @@ ngx_rtmp_netcall_close(ngx_connection_t *cc)
if (cs->handle &&
cs->handle(s, cs->arg, cs->in) != NGX_OK)
{
- ngx_rtmp_close_connection(c);
+ ngx_rtmp_finalize_session(s);
}
}

0 comments on commit 90f985f

Please sign in to comment.