/* ngx_http_push_module.h */
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
//type definitions, constants and function declarations for the push module
// Location-level configuration of the push module (see the
// ngx_http_push_create_loc_conf / ngx_http_push_merge_loc_conf prototypes
// declared later in this header).
// NOTE(review): the per-field notes below are inferred from the field names
// and from the constants defined in this header; the code that reads and
// writes them is not in this file -- confirm against the directive handlers.
typedef struct {
ngx_int_t index; // presumably the index of the nginx variable that carries the channel id -- TODO confirm
time_t buffer_timeout; // presumably a lifetime in seconds; compare NGX_HTTP_PUSH_DEFAULT_BUFFER_TIMEOUT
ngx_int_t min_message_queue_size; // compare NGX_HTTP_PUSH_DEFAULT_MIN_MESSAGE_QUEUE_SIZE
ngx_int_t max_message_queue_size; // compare NGX_HTTP_PUSH_DEFAULT_MAX_MESSAGE_QUEUE_SIZE
ngx_int_t listener_concurrency; // presumably one of NGX_HTTP_PUSH_LISTENER_LASTIN / _FIRSTIN / _BROADCAST
ngx_int_t listener_poll_mechanism; // presumably NGX_HTTP_PUSH_LISTENER_LONGPOLL or _INTERVALPOLL
ngx_int_t authorize_channel; // integer setting; looks like an on/off flag -- confirm
ngx_int_t store_messages; // integer setting; looks like an on/off flag -- confirm
ngx_int_t min_message_recipients; // compare NGX_HTTP_PUSH_MIN_MESSAGE_RECIPIENTS
} ngx_http_push_loc_conf_t;
// Default values and enumerated settings for the push module.
// 3145728 == 3 * 1024 * 1024 bytes.
#define NGX_HTTP_PUSH_DEFAULT_SHM_SIZE 3145728 //3 megs
// presumably seconds, since it pairs with the time_t buffer_timeout setting -- confirm
#define NGX_HTTP_PUSH_DEFAULT_BUFFER_TIMEOUT 3600
#define NGX_HTTP_PUSH_DEFAULT_MIN_MESSAGE_QUEUE_SIZE 5
#define NGX_HTTP_PUSH_DEFAULT_MAX_MESSAGE_QUEUE_SIZE 255
// NOTE(review): this is a non-static object *definition* inside a header,
// whereas the other ngx_str_t constants at the bottom of this file are
// static. Including this header from more than one translation unit would
// produce duplicate-symbol errors at link time -- confirm it is only ever
// included once before changing the linkage.
ngx_str_t NGX_HTTP_PUSH_CACHE_CONTROL_VALUE = ngx_string("no-cache");
// listener concurrency modes (values are distinct: 0, 1, 2)
#define NGX_HTTP_PUSH_LISTENER_LASTIN 0
#define NGX_HTTP_PUSH_LISTENER_FIRSTIN 1
#define NGX_HTTP_PUSH_LISTENER_BROADCAST 2
// listener poll mechanisms (a separate value space from the modes above)
#define NGX_HTTP_PUSH_LISTENER_LONGPOLL 0
#define NGX_HTTP_PUSH_LISTENER_INTERVALPOLL 1
#define NGX_HTTP_PUSH_MIN_MESSAGE_RECIPIENTS 0
// Main-level (http {} scope) configuration of the push module; see the
// ngx_http_push_create_main_conf prototype declared later in this header.
typedef struct {
size_t shm_size; // size in bytes; compare NGX_HTTP_PUSH_DEFAULT_SHM_SIZE and ngx_http_push_set_up_shm()
} ngx_http_push_main_conf_t;
//message queue
// One message in a channel's message queue. The embedded ngx_queue_t is the
// first member, so a message can be linked into an nginx intrusive queue.
typedef struct {
ngx_queue_t queue; // intrusive queue link
ngx_str_t content_type;
ngx_str_t charset;
ngx_buf_t *buf; // presumably the message body -- confirm ownership/allocator in the .c file
time_t expires; // presumably the absolute time after which the message may be discarded -- confirm
ngx_uint_t received; // presumably a count of listeners that received the message -- confirm
time_t message_time; //tag message by time
ngx_int_t message_tag; //used in conjunction with message_time if more than one message have the same time.
} ngx_http_push_msg_t;
// Forward typedefs: the cleanup struct below refers to both types by pointer
// before their struct bodies are defined.
typedef struct ngx_http_push_listener_s ngx_http_push_listener_t;
typedef struct ngx_http_push_node_s ngx_http_push_node_t;
//cleaning supplies
// Argument bundle taken by ngx_http_push_listener_cleanup(): the listener
// entry, the channel node it is attached to, and a slab pool.
// NOTE(review): presumably shpool is the pool the listener entry was
// allocated from -- confirm in the .c file.
typedef struct {
ngx_http_push_listener_t *listener;
ngx_http_push_node_t *node;
ngx_slab_pool_t *shpool;
} ngx_http_push_listener_cleanup_t;
//listener request queue
// One entry in a channel's listener queue. The embedded ngx_queue_t is the
// first member, so an entry can be linked into an nginx intrusive queue.
struct ngx_http_push_listener_s {
ngx_queue_t queue; // intrusive queue link
ngx_http_request_t *request; // the listener's HTTP request
ngx_http_push_listener_cleanup_t *cleanup; // cleanup data associated with this entry
};
//our typecast-friendly rbtree node
// Per-channel state. The ngx_rbtree_node_t is deliberately the first member:
// a pointer to the struct and a pointer to its first member are
// interconvertible in C, which is what makes the node "typecast-friendly".
struct ngx_http_push_node_s {
ngx_rbtree_node_t node; // must stay first (see above)
ngx_str_t id; // channel id (compare ngx_http_push_set_channel_id)
ngx_http_push_msg_t *message_queue; // presumably the head/sentinel of the message queue -- confirm
ngx_uint_t message_queue_size;
ngx_http_push_listener_t *listener_queue; // presumably the head/sentinel of the listener queue -- confirm
ngx_uint_t listener_queue_size;
time_t last_seen;
};
//sender stuff
// All prototypes here are static, so their definitions must be in the same
// translation unit that includes this header.
// Directive callback: has the (cf, cmd, conf) signature nginx uses for
// ngx_command_t set handlers.
static char * ngx_http_push_sender(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); //push_sender hook
// Request handler; its type matches the handler parameter accepted by
// ngx_http_push_setup_handler().
static ngx_int_t ngx_http_push_sender_handler(ngx_http_request_t * r);
// NOTE(review): the void(ngx_http_request_t *) shape matches nginx's client
// body handler callback type; presumably invoked once the request body has
// been read -- confirm in the .c file.
static void ngx_http_push_sender_body_handler(ngx_http_request_t * r);
// Takes the same three counters that struct ngx_http_push_node_s stores
// (message_queue_size, listener_queue_size, last_seen).
static ngx_int_t ngx_http_push_node_info(ngx_http_request_t *r, ngx_uint_t message_queue_size, ngx_uint_t listener_queue_size, time_t last_seen);
//listener stuff
// Directive callback: has the (cf, cmd, conf) signature nginx uses for
// ngx_command_t set handlers.
static char * ngx_http_push_listener(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); //push_listener hook
// Request handler; its type matches the handler parameter accepted by
// ngx_http_push_setup_handler().
static ngx_int_t ngx_http_push_listener_handler(ngx_http_request_t * r);
// Two accessors for the request's etag: one returns a string pointer, the
// other an integer. NOTE(review): presumably both read the If-None-Match
// request header (see ngx_http_push_If_None_Match) -- confirm.
static ngx_str_t * ngx_http_push_listener_get_etag(ngx_http_request_t * r);
static ngx_int_t ngx_http_push_listener_get_etag_int(ngx_http_request_t * r);
// NOTE(review): concurrency is presumably one of the
// NGX_HTTP_PUSH_LISTENER_LASTIN / _FIRSTIN / _BROADCAST values -- confirm.
static ngx_int_t ngx_http_push_handle_listener_concurrency_setting(ngx_int_t concurrency, ngx_http_push_node_t *node, ngx_http_request_t *r, ngx_slab_pool_t *shpool);
//response generating stuff
// Helpers that build the response sent to a listener from a stored message.
// Header part: takes the request and the message.
static ngx_int_t ngx_http_push_set_listener_header(ngx_http_request_t *r, ngx_http_push_msg_t *msg);
// Returns an ngx_chain_t built from buf. NOTE(review): presumably copies the
// buffer out of the slab pool into request-owned memory -- confirm.
static ngx_chain_t *ngx_http_push_create_output_chain(ngx_http_request_t *r, ngx_buf_t *buf, ngx_slab_pool_t *shpool);
// NOTE(review): the name suggests cbuf is an already-allocated destination
// for a copy of buf; the direction of the copy is not visible here -- confirm.
static void ngx_http_push_copy_preallocated_buffer(ngx_buf_t *buf, ngx_buf_t *cbuf);
// Body part: takes the request and an output chain.
static ngx_int_t ngx_http_push_set_listener_body(ngx_http_request_t *r, ngx_chain_t *out);
//misc stuff
// NOTE(review): this is a non-static object *definition* inside a header.
// Including this header from more than one translation unit would produce
// duplicate-symbol errors at link time -- confirm it is only included once.
ngx_shm_zone_t * ngx_http_push_shm_zone = NULL;
// Takes a request-handler function pointer; both ngx_http_push_sender_handler
// and ngx_http_push_listener_handler have the matching type.
static char * ngx_http_push_setup_handler(ngx_conf_t *cf, void * conf, ngx_int_t (*handler)(ngx_http_request_t *));
// Configuration lifecycle callbacks (create / merge), shaped like the
// corresponding ngx_http_module_t hooks.
static void * ngx_http_push_create_main_conf(ngx_conf_t *cf);
static void * ngx_http_push_create_loc_conf(ngx_conf_t *cf);
static char * ngx_http_push_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);
// Shared-memory setup; shm_size is a byte count (compare
// ngx_http_push_main_conf_t.shm_size).
static ngx_int_t ngx_http_push_set_up_shm(ngx_conf_t *cf, size_t shm_size);
// Has the (shm_zone, data) signature of an nginx shared-zone init callback.
static ngx_int_t ngx_http_push_init_shm_zone(ngx_shm_zone_t * shm_zone, void * data);
static ngx_int_t ngx_http_push_postconfig(ngx_conf_t *cf);
// Directive callback with the (cf, cmd, conf) set-handler signature.
static char * ngx_http_push_set_listener_concurrency(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
// Response helpers.
static void ngx_http_push_reply_status_only(ngx_http_request_t *r, ngx_int_t code, ngx_str_t *statusline);
static ngx_table_elt_t * ngx_http_push_add_response_header(ngx_http_request_t *r, ngx_str_t *header_name, ngx_str_t *header_value);
// Writes the channel id through the id out-parameter.
// NOTE(review): presumably looked up via cf->index -- confirm.
static ngx_int_t ngx_http_push_set_channel_id(ngx_str_t *id, ngx_http_request_t *r, ngx_http_push_loc_conf_t *cf);
// Listener queue management.
// NOTE(review): the "_locked" suffix presumably means the caller must
// already hold the slab pool mutex -- confirm in the .c file.
static void ngx_http_push_listener_cleanup(ngx_http_push_listener_cleanup_t *data);
static ngx_http_push_listener_t * ngx_http_push_dequeue_listener_locked(ngx_http_push_node_t * node); //doesn't free associated memory
static ngx_inline ngx_http_push_listener_t *ngx_http_push_queue_listener_request(ngx_http_push_node_t * node, ngx_http_request_t *r, ngx_slab_pool_t *shpool);
static ngx_inline ngx_http_push_listener_t *ngx_http_push_queue_listener_request_locked(ngx_http_push_node_t * node, ngx_http_request_t *r, ngx_slab_pool_t *shpool);
//message stuff
// Message queue management for a channel node.
// NOTE(review): the "_locked" suffix presumably means the caller must
// already hold the slab pool mutex -- confirm in the .c file.
static ngx_http_push_msg_t * ngx_http_push_dequeue_message_locked(ngx_http_push_node_t * node); // doesn't free associated memory
// Returns a message for request r and reports an additional status code
// through the status out-parameter.
static ngx_http_push_msg_t * ngx_http_push_find_message_locked(ngx_http_push_node_t * node, ngx_http_request_t *r, ngx_int_t *status);
// NOTE(review): unlike get_oldest, get_last also takes the slab pool;
// presumably it may free messages while searching -- confirm.
static ngx_http_push_msg_t *ngx_http_push_get_last_message_locked(ngx_http_push_node_t * node, ngx_slab_pool_t *shpool);
static ngx_http_push_msg_t *ngx_http_push_get_oldest_message_locked(ngx_http_push_node_t * node);
// Delete a message from a node; one plain variant and one "_locked" variant.
static ngx_inline void ngx_http_push_delete_message(ngx_slab_pool_t *shpool, ngx_http_push_node_t *node, ngx_http_push_msg_t *msg);
static ngx_inline void ngx_http_push_delete_message_locked(ngx_slab_pool_t *shpool, ngx_http_push_node_t *node, ngx_http_push_msg_t *msg);
//missing in nginx < 0.7.?
#ifndef ngx_queue_insert_tail
/*
 * Fallback for nginx versions whose ngx_queue.h lacks ngx_queue_insert_tail.
 * Links element x in just before sentinel h, i.e. at the tail of the
 * circular doubly-linked queue headed by h.
 *
 * h, x: pointers to objects with `prev` and `next` pointer members.
 *
 * The body is wrapped in do { ... } while (0) so the macro expands to a
 * single statement: without it, only the first assignment would be guarded
 * by an unbraced if/else/for, and `if (c) ngx_queue_insert_tail(h, x); else`
 * would not compile. Every use of an argument is parenthesized.
 * Both arguments are still evaluated more than once, so do not pass
 * expressions with side effects.
 */
#define ngx_queue_insert_tail(h, x) \
do { \
(x)->prev = (h)->prev; \
(x)->prev->next = (x); \
(x)->next = (h); \
(h)->prev = (x); \
} while (0)
#endif
//string constants
//headers
// HTTP header names as static ngx_str_t constants, each initialized from a
// string literal with nginx's ngx_string() macro. Being static, each
// translation unit that includes this header gets its own copy.
static ngx_str_t ngx_http_push_Etag = ngx_string("Etag");
static ngx_str_t ngx_http_push_If_None_Match = ngx_string("If-None-Match");
static ngx_str_t ngx_http_push_Vary = ngx_string("Vary");
static ngx_str_t ngx_http_push_Allow = ngx_string("Allow");