Skip to content

Commit

Permalink
change the way the message template is parsed to be faster when aplly…
Browse files Browse the repository at this point in the history
…ing it to a published message
  • Loading branch information
wandenberg committed Apr 1, 2015
1 parent e8a6de6 commit be0ae92
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 39 deletions.
28 changes: 27 additions & 1 deletion include/ngx_http_push_stream_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,40 @@ typedef struct {
ngx_uint_t message_min_len;
} ngx_http_push_stream_padding_t;

typedef enum {
PUSH_STREAM_TEMPLATE_PART_TYPE_ID = 0,
PUSH_STREAM_TEMPLATE_PART_TYPE_TAG,
PUSH_STREAM_TEMPLATE_PART_TYPE_TIME,
PUSH_STREAM_TEMPLATE_PART_TYPE_EVENT_ID,
PUSH_STREAM_TEMPLATE_PART_TYPE_EVENT_TYPE,
PUSH_STREAM_TEMPLATE_PART_TYPE_CHANNEL,
PUSH_STREAM_TEMPLATE_PART_TYPE_TEXT,
PUSH_STREAM_TEMPLATE_PART_TYPE_LITERAL
} ngx_http_push_stream_template_part_type;

typedef struct {
ngx_queue_t queue;
ngx_http_push_stream_template_part_type kind;
ngx_str_t text;
} ngx_http_push_stream_template_parts_t;

// template queue
typedef struct {
ngx_queue_t queue;
ngx_str_t *template;
ngx_uint_t index;
ngx_flag_t eventsource;
ngx_flag_t websocket;
} ngx_http_push_stream_template_queue_t;
ngx_queue_t parts;
ngx_uint_t qtd_message_id;
ngx_uint_t qtd_event_id;
ngx_uint_t qtd_event_type;
ngx_uint_t qtd_channel;
ngx_uint_t qtd_text;
ngx_uint_t qtd_tag;
ngx_uint_t qtd_time;
size_t literal_len;
} ngx_http_push_stream_template_t;

typedef struct ngx_http_push_stream_msg_s ngx_http_push_stream_msg_t;
typedef struct ngx_http_push_stream_shm_data_s ngx_http_push_stream_shm_data_t;
Expand Down
4 changes: 2 additions & 2 deletions include/ngx_http_push_stream_module_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ static ngx_int_t ngx_http_push_stream_send_only_header_response(ngx_h
static ngx_int_t ngx_http_push_stream_send_only_header_response_and_finalize(ngx_http_request_t *r, ngx_int_t status, const ngx_str_t *explain_error_message);
static ngx_str_t * ngx_http_push_stream_str_replace(const ngx_str_t *org, const ngx_str_t *find, const ngx_str_t *replace, off_t offset, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_websocket_frame(const u_char *opcode, off_t opcode_len, const u_char *text, off_t text_len, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_str_t *message_template, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_get_formatted_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg);
static ngx_str_t * ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *message, ngx_str_t *text, ngx_http_push_stream_template_t *template, ngx_pool_t *temp_pool);
static ngx_str_t * ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_t *message_template, ngx_pool_t *temp_pool);
static ngx_int_t ngx_http_push_stream_send_response_content_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf);
static ngx_int_t ngx_http_push_stream_send_response(ngx_http_request_t *r, ngx_str_t *text, const ngx_str_t *content_type, ngx_int_t status_code);
Expand Down
2 changes: 1 addition & 1 deletion misc/spec/subscriber/comunication_properties_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@

if lines.length >= 3
lines[0].should eql("#{conf.header_template}")
lines[1].should eql("{\"channel\":\"ch_test_message_and_channel_with_same_pattern_of_the_template~channel~~channel~~channel~~channel~~channel~~channel~~text~~text~~text~~channel~~channel~~channel~~text~~text~~text~~channel~~channel~~channel~~text~~text~~text~\", \"message\":\"~channel~~channel~~channel~~text~~text~~text~\", \"message_id\":\"1\"}")
lines[1].should eql("{\"channel\":\"ch_test_message_and_channel_with_same_pattern_of_the_template~channel~~channel~~channel~~text~~text~~text~\", \"message\":\"~channel~~channel~~channel~~text~~text~~text~\", \"message_id\":\"1\"}")
lines[2].should eql("{\"channel\":\"\", \"message\":\" \", \"message_id\":\"-1\"}")
EventMachine.stop
end
Expand Down
109 changes: 105 additions & 4 deletions src/ngx_http_push_stream_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,52 @@ ngx_http_push_stream_send_response_channels_info_detailed(ngx_http_request_t *r,
}

static ngx_int_t
ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource, ngx_flag_t websocket) {
ngx_http_push_stream_check_and_parse_template_pattern(ngx_conf_t *cf, ngx_http_push_stream_template_t *template, u_char *last, u_char *start, const ngx_str_t *token, ngx_http_push_stream_template_part_type part_type)
{
ngx_http_push_stream_template_parts_t *part;

if (ngx_strncasecmp(start, token->data, token->len) == 0) {
if ((start - last) > 0) {
part = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_template_parts_t));
if (part == NULL) {
ngx_log_error(NGX_LOG_ERR, cf->log, 0, "push stream module: unable to allocate memory for add template part");
return NGX_ERROR;
}
part->kind = PUSH_STREAM_TEMPLATE_PART_TYPE_LITERAL;
part->text.data = last;
part->text.len = start - last;
template->literal_len += part->text.len;
ngx_queue_insert_tail(&template->parts, &part->queue);
}

part = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_template_parts_t));
if (part == NULL) {
ngx_log_error(NGX_LOG_ERR, cf->log, 0, "push stream module: unable to allocate memory for add template part");
return NGX_ERROR;
}
part->kind = part_type;
ngx_queue_insert_tail(&template->parts, &part->queue);

return NGX_OK;
}

return NGX_DECLINED;
}

static ngx_int_t
ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, ngx_flag_t eventsource, ngx_flag_t websocket)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module);
ngx_queue_t *q;
ngx_http_push_stream_template_queue_t *cur;
ngx_http_push_stream_template_t *cur;
ngx_str_t *aux = NULL;
u_char *start = NULL, *last = NULL;
size_t len = 0;
ngx_http_push_stream_template_parts_t *part;
ngx_int_t rc;

for (q = ngx_queue_head(&mcf->msg_templates); q != ngx_queue_sentinel(&mcf->msg_templates); q = ngx_queue_next(q)) {
cur = ngx_queue_data(q, ngx_http_push_stream_template_queue_t, queue);
cur = ngx_queue_data(q, ngx_http_push_stream_template_t, queue);
if ((ngx_memn2cmp(cur->template->data, template.data, cur->template->len, template.len) == 0) &&
(cur->eventsource == eventsource) && (cur->websocket == websocket)) {
return cur->index;
Expand All @@ -292,7 +330,7 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, n

mcf->qtd_templates++;

cur = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_template_queue_t));
cur = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_template_t));
aux = ngx_http_push_stream_create_str(cf->pool, template.len);
if ((cur == NULL) || (aux == NULL)) {
ngx_log_error(NGX_LOG_ERR, cf->log, 0, "push stream module: unable to allocate memory for add template to main configuration");
Expand All @@ -302,7 +340,70 @@ ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf, ngx_str_t template, n
cur->eventsource = eventsource;
cur->websocket = websocket;
cur->index = mcf->qtd_templates;
cur->qtd_message_id = 0;
cur->qtd_event_id = 0;
cur->qtd_event_type = 0;
cur->qtd_channel = 0;
cur->qtd_text = 0;
cur->qtd_tag = 0;
cur->qtd_time = 0;
cur->literal_len = 0;
ngx_queue_init(&cur->parts);
ngx_memcpy(cur->template->data, template.data, template.len);
ngx_queue_insert_tail(&mcf->msg_templates, &cur->queue);

len = cur->template->len;
last = start = cur->template->data;
while ((start = ngx_strnstr(start, "~", len)) != NULL) {
if ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID, PUSH_STREAM_TEMPLATE_PART_TYPE_ID)) == NGX_OK) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_ID.len;
last = start;
cur->qtd_message_id++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID, PUSH_STREAM_TEMPLATE_PART_TYPE_EVENT_ID)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_ID.len;
last = start;
cur->qtd_event_id++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE, PUSH_STREAM_TEMPLATE_PART_TYPE_EVENT_TYPE)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_EVENT_TYPE.len;
last = start;
cur->qtd_event_type++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL, PUSH_STREAM_TEMPLATE_PART_TYPE_CHANNEL)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL.len;
last = start;
cur->qtd_channel++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT, PUSH_STREAM_TEMPLATE_PART_TYPE_TEXT)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TEXT.len;
last = start;
cur->qtd_text++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG, PUSH_STREAM_TEMPLATE_PART_TYPE_TAG)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TAG.len;
last = start;
cur->qtd_tag++;
} else if ((rc == NGX_DECLINED) && ((rc = ngx_http_push_stream_check_and_parse_template_pattern(cf, cur, last, start, &NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME, PUSH_STREAM_TEMPLATE_PART_TYPE_TIME)) == NGX_OK)) {
start += NGX_HTTP_PUSH_STREAM_TOKEN_MESSAGE_TIME.len;
last = start;
cur->qtd_time++;
} else {
start += 1;
}

if (rc == NGX_ERROR) {
return -1;
}
}

if (last < (cur->template->data + cur->template->len)) {
part = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_stream_template_parts_t));
if (part == NULL) {
ngx_log_error(NGX_LOG_ERR, cf->log, 0, "push stream module: unable to allocate memory for add template part");
return -1;
}
part->kind = PUSH_STREAM_TEMPLATE_PART_TYPE_LITERAL;
part->text.data = last;
part->text.len = (cur->template->data + cur->template->len) - last;
cur->literal_len += part->text.len;
ngx_queue_insert_tail(&cur->parts, &part->queue);
}

return cur->index;
}
6 changes: 5 additions & 1 deletion src/ngx_http_push_stream_module_setup.c
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,11 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_STREAMING) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE) ||
(conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET)) {
conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template, (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE), (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET));
if ((conf->message_template_index = ngx_http_push_stream_find_or_add_template(cf, conf->message_template, (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_EVENTSOURCE), (conf->location_type == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET))) < 0) {
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push stream module: unable to parse message template: %V", &conf->message_template);
return NGX_CONF_ERROR;
}


if (conf->padding_by_user_agent.len > 0) {
if ((conf->paddings = ngx_http_push_stream_parse_paddings(cf, &conf->padding_by_user_agent)) == NULL) {
Expand Down
Loading

0 comments on commit be0ae92

Please sign in to comment.