// ngx_http_push_module.h
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <ngx_channel.h>
// Built-in defaults. Where each one is applied is not visible in this header.
#define NGX_HTTP_PUSH_DEFAULT_SHM_SIZE        (32 * 1024 * 1024) // 33554432, i.e. 32 megs
#define NGX_HTTP_PUSH_DEFAULT_BUFFER_TIMEOUT  3600               // presumably seconds -- TODO confirm
#define NGX_HTTP_PUSH_DEFAULT_MIN_MESSAGES    1
#define NGX_HTTP_PUSH_DEFAULT_MAX_MESSAGES    10
#define NGX_HTTP_PUSH_MIN_MESSAGE_RECIPIENTS  0
#define NGX_HTTP_PUSH_MAX_CHANNEL_ID_LENGTH   1024               // bytes

// Subscriber concurrency modes.
#define NGX_HTTP_PUSH_SUBSCRIBER_CONCURRENCY_LASTIN     0
#define NGX_HTTP_PUSH_SUBSCRIBER_CONCURRENCY_FIRSTIN    1
#define NGX_HTTP_PUSH_SUBSCRIBER_CONCURRENCY_BROADCAST  2

// Subscriber delivery mechanisms.
#define NGX_HTTP_PUSH_MECHANISM_LONGPOLL      0
#define NGX_HTTP_PUSH_MECHANISM_INTERVALPOLL  1
// HTTP status codes referenced by this module. Each one is defined here only
// if the headers included above have not already supplied it.
#if !defined(NGX_HTTP_CREATED)
#define NGX_HTTP_CREATED   201
#endif

#if !defined(NGX_HTTP_ACCEPTED)
#define NGX_HTTP_ACCEPTED  202
#endif

#if !defined(NGX_HTTP_CONFLICT)
#define NGX_HTTP_CONFLICT  409
#endif

#if !defined(NGX_HTTP_GONE)
#define NGX_HTTP_GONE      410
#endif
// Module-internal result codes. Their use sites are not visible in this
// header; the FOUND/EXPECTED/EXPIRED group is presumably what a message
// lookup reports -- TODO confirm against the .c file.
#define NGX_HTTP_PUSH_MESSAGE_FOUND     1000
#define NGX_HTTP_PUSH_MESSAGE_EXPECTED  1001
#define NGX_HTTP_PUSH_MESSAGE_EXPIRED   1002

#define NGX_HTTP_PUSH_MESSAGE_RECEIVED  9000
#define NGX_HTTP_PUSH_MESSAGE_QUEUED    9001
//on with the declarations

// Main-level configuration for the module.
typedef struct {
    size_t  shm_size;  // requested shared memory size (see NGX_HTTP_PUSH_DEFAULT_SHM_SIZE)
} ngx_http_push_main_conf_t;
// Location-level configuration for the module.
// Field meanings below are inferred from names and the constants defined in
// this header; the code that fills them in is not visible here.
typedef struct {
    ngx_int_t  index;
    time_t     buffer_timeout;
    ngx_int_t  min_messages;
    ngx_int_t  max_messages;
    ngx_int_t  subscriber_concurrency;     // presumably one of NGX_HTTP_PUSH_SUBSCRIBER_CONCURRENCY_*
    ngx_int_t  subscriber_poll_mechanism;  // presumably one of NGX_HTTP_PUSH_MECHANISM_*
    ngx_int_t  authorize_channel;
    ngx_int_t  store_messages;
    ngx_int_t  min_message_recipients;
    ngx_str_t  channel_group;
    ngx_int_t  max_channel_id_length;      // presumably bytes, cf. NGX_HTTP_PUSH_MAX_CHANNEL_ID_LENGTH
} ngx_http_push_loc_conf_t;
//message queue
// One queued message. The ngx_queue_t link is the first member so that a
// pointer to the link and a pointer to the message are interchangeable.
typedef struct {
    ngx_queue_t   queue;         // this MUST be first.
    ngx_str_t     content_type;
    // ngx_str_t  charset;       (disabled field, kept from the original)
    ngx_buf_t    *buf;
    time_t        expires;
    ngx_uint_t    received;
    time_t        message_time;  // tag message by time
    ngx_int_t     message_tag;   // used together with message_time when several messages share the same time
    ngx_int_t     refcount;
} ngx_http_push_msg_t;
// Forward declaration: the subscriber struct below points at its cleanup
// record, whose full definition appears later in this header.
typedef struct ngx_http_push_subscriber_cleanup_s  ngx_http_push_subscriber_cleanup_t;

//subscriber request queue
// One waiting subscriber request, linked into a queue through its first member.
typedef struct {
    ngx_queue_t                          queue;    // this MUST be first.
    ngx_http_request_t                  *request;
    ngx_http_push_subscriber_cleanup_t  *clndata;
} ngx_http_push_subscriber_t;
// Queue entry that ties a process (pid and slot) to a subscriber sentinel.
// NOTE(review): pid is declared as pid_t here while ngx_http_push_worker_msg_t
// uses ngx_pid_t for its pid field -- consider unifying on ngx_pid_t.
typedef struct {
    ngx_queue_t                  queue;
    pid_t                        pid;
    ngx_int_t                    slot;
    ngx_http_push_subscriber_t  *subscriber_sentinel;
} ngx_http_push_pid_queue_t;
//our typecast-friendly rbtree node (channel)
// A channel. The rbtree node is the first member so that a pointer to the
// node can be cast to a pointer to the channel.
typedef struct {
    ngx_rbtree_node_t          node;                      // this MUST be first.
    ngx_str_t                  id;
    ngx_http_push_msg_t       *message_queue;
    ngx_uint_t                 messages;
    ngx_http_push_pid_queue_t  workers_with_subscribers;  // embedded by value, not a pointer
    ngx_uint_t                 subscribers;
    time_t                     last_seen;
} ngx_http_push_channel_t;
//cleaning supplies
// Full definition of the cleanup record forward-declared earlier in this
// header: it pairs a subscriber with a channel.
struct ngx_http_push_subscriber_cleanup_s {
    ngx_http_push_subscriber_t  *subscriber;
    ngx_http_push_channel_t     *channel;
};
//garbage collecting goodness
// Queue entry that references a channel.
typedef struct {
    ngx_queue_t               queue;
    ngx_http_push_channel_t  *channel;
} ngx_http_push_channel_queue_t;
//messages to worker processes
// One message addressed to a worker process, kept in a queue.
typedef struct {
    ngx_queue_t                  queue;
    ngx_http_push_msg_t         *msg;                  // ->shared memory
    ngx_int_t                    status_code;
    ngx_pid_t                    pid;
    ngx_http_push_channel_t     *channel;              // ->shared memory
    ngx_http_push_subscriber_t  *subscriber_sentinel;  // ->a worker's local pool
} ngx_http_push_worker_msg_t;
//shared memory
// Root record of the module's shared data: the tree, a channel counter and
// the inter-process message storage.
typedef struct {
    ngx_rbtree_t                 tree;
    ngx_uint_t                   channels;  // # of channels being used
    ngx_http_push_worker_msg_t  *ipc;       // interprocess stuff
} ngx_http_push_shm_data_t;
// Module-wide state.
// NOTE(review): these are definitions rather than `extern` declarations
// (ngx_http_push_shm_zone even carries an initializer), so including this
// header from more than one translation unit can produce duplicate-symbol
// link errors. Confirm that the header is meant for a single .c file only.
ngx_int_t ngx_http_push_worker_processes;
ngx_pool_t *ngx_http_push_pool;
ngx_slab_pool_t *ngx_http_push_shm_shpool;
ngx_shm_zone_t *ngx_http_push_shm_zone = NULL;
//garbage-collecting shared memory slab allocation
// Both take a byte count and return a pointer to the allocation.
// NOTE(review): the _locked variant presumably expects the caller to already
// hold the slab pool lock -- confirm against the implementation.
void *ngx_http_push_slab_alloc(size_t size);
void *ngx_http_push_slab_alloc_locked(size_t size);
//channel messages
// NOTE(review): the _locked suffix presumably means the caller must already
// hold the shared memory lock -- confirm against the implementation.
static ngx_http_push_msg_t *ngx_http_push_get_latest_message_locked(
    ngx_http_push_channel_t *channel);

static ngx_http_push_msg_t *ngx_http_push_get_oldest_message_locked(
    ngx_http_push_channel_t *channel);

static ngx_inline void ngx_http_push_general_delete_message_locked(
    ngx_http_push_channel_t *channel,
    ngx_http_push_msg_t     *msg,
    ngx_int_t                force,
    ngx_slab_pool_t         *shpool);

// Convenience wrappers over the general delete: force = 0 and force = 1.
#define ngx_http_push_delete_message_locked(channel, msg, shpool) \
    ngx_http_push_general_delete_message_locked(channel, msg, 0, shpool)
#define ngx_http_push_force_delete_message_locked(channel, msg, shpool) \
    ngx_http_push_general_delete_message_locked(channel, msg, 1, shpool)

static ngx_inline void ngx_http_push_free_message_locked(
    ngx_http_push_msg_t *msg,
    ngx_slab_pool_t     *shpool);

// Returns a message pointer and also writes through the status out-parameter.
static ngx_http_push_msg_t *ngx_http_push_find_message_locked(
    ngx_http_push_channel_t *channel,
    ngx_http_request_t      *r,
    ngx_int_t               *status);
//channel
static ngx_str_t *ngx_http_push_get_channel_id(
    ngx_http_request_t       *r,
    ngx_http_push_loc_conf_t *cf);

static ngx_int_t ngx_http_push_channel_info(
    ngx_http_request_t *r,
    ngx_uint_t          message_queue_size,
    ngx_uint_t          subscriber_queue_size,
    time_t              last_seen);
//subscriber
static ngx_int_t ngx_http_push_subscriber_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_handle_subscriber_concurrency_setting(ngx_int_t concurrency, ngx_http_push_channel_t *channel, ngx_http_request_t *r, ngx_slab_pool_t *shpool);
static ngx_int_t ngx_http_push_broadcast_locked(ngx_http_push_channel_t *channel, ngx_http_push_msg_t *msg, ngx_int_t status_code, const ngx_str_t *status_line, ngx_log_t *log, ngx_slab_pool_t *shpool);
#define ngx_http_push_broadcast_status_locked(channel, status_code, status_line, log, shpool) ngx_http_push_broadcast_locked(channel, NULL, status_code, status_line, log, shpool)
#define ngx_http_push_broadcast_message_locked(channel, msg, log, shpool) ngx_http_push_broadcast_locked(channel, msg, 0, NULL, log, shpool)
static ngx_int_t ngx_http_push_respond_to_subscribers(ngx_http_push_channel_t *channel, ngx_http_push_subscriber_t *sentinel, ngx_http_push_msg_t *msg, ngx_int_t status_code, const ngx_str_t *status_line);
static ngx_int_t ngx_http_push_subscriber_get_etag_int(ngx_http_request_t * r);
static ngx_str_t * ngx_http_push_subscriber_get_etag(ngx_http_request_t * r);
static void ngx_http_push_subscriber_cleanup(ngx_http_push_subscriber_cleanup_t *data);
static ngx_int_t ngx_http_push_prepare_response_to_subscriber_request(ngx_http_request_t *r, ngx_chain_t *chain, ngx_str_t *content_type, ngx_str_t *etag, time_t last_modified);
//publisher
// The handler returns an ngx_int_t; the body handler returns nothing.
static ngx_int_t ngx_http_push_publisher_handler(ngx_http_request_t *r);
static void      ngx_http_push_publisher_body_handler(ngx_http_request_t *r);
//utilities
//general request handling
static void ngx_http_push_copy_preallocated_buffer(
    ngx_buf_t *buf,
    ngx_buf_t *cbuf);

static ngx_table_elt_t *ngx_http_push_add_response_header(
    ngx_http_request_t *r,
    const ngx_str_t    *header_name,
    const ngx_str_t    *header_value);

static void ngx_http_push_respond_status_only(
    ngx_http_request_t *r,
    ngx_int_t           status_code,
    const ngx_str_t    *statusline);

static ngx_chain_t *ngx_http_push_create_output_chain_general(
    ngx_buf_t       *buf,
    ngx_pool_t      *pool,
    ngx_log_t       *log,
    ngx_slab_pool_t *shpool);

// Two entry points onto the general chain builder: the plain form passes a
// NULL shpool, the _locked form forwards the caller's shpool.
#define ngx_http_push_create_output_chain(buf, pool, log) \
    ngx_http_push_create_output_chain_general(buf, pool, log, NULL)
#define ngx_http_push_create_output_chain_locked(buf, pool, log, shpool) \
    ngx_http_push_create_output_chain_general(buf, pool, log, shpool)
//missing in nginx < 0.7.?
// Fallback for ngx_queue_insert_tail(h, x): links node x in directly before
// the sentinel h, i.e. at the tail of the circular doubly-linked queue.
// The four assignments are wrapped in do { } while (0) so the macro acts as
// one statement (for example as the body of an unbraced if/else), and every
// use of an argument is parenthesized.
// Note: h and x are each evaluated more than once, so pass expressions
// without side effects.
#ifndef ngx_queue_insert_tail
#define ngx_queue_insert_tail(h, x)                                          \
    do {                                                                     \
        (x)->prev = (h)->prev;                                               \
        (x)->prev->next = (x);                                               \
        (x)->next = (h);                                                     \
        (h)->prev = (x);                                                     \
    } while (0)
#endif
// Fallback accessor: yields the `next` link of queue node q (usable as an lvalue).
#if !defined(ngx_queue_next)
#define ngx_queue_next(q)  ((q)->next)
#endif
// Fallback file mode (octal 0600) for builds where it is not already defined.
#if !defined(NGX_FILE_OWNER_ACCESS)
#define NGX_FILE_OWNER_ACCESS  0600
#endif
//string constants
// NOTE(review): these are object definitions in a header, so the header is
// presumably meant to be included by exactly one .c file -- confirm.
//headers
// HTTP header names as ngx_str_t constants.
const ngx_str_t NGX_HTTP_PUSH_HEADER_ETAG = ngx_string("Etag");
const ngx_str_t NGX_HTTP_PUSH_HEADER_IF_NONE_MATCH = ngx_string("If-None-Match");
const ngx_str_t NGX_HTTP_PUSH_HEADER_VARY = ngx_string("Vary");
const ngx_str_t NGX_HTTP_PUSH_HEADER_ALLOW = ngx_string("Allow");
//header values
// Value string "no-cache"; presumably sent as Cache-Control, judging by the name.
const ngx_str_t NGX_HTTP_PUSH_CACHE_CONTROL_VALUE = ngx_string("no-cache");
//status strings
// Status lines matching NGX_HTTP_CONFLICT (409) and NGX_HTTP_GONE (410).
const ngx_str_t NGX_HTTP_PUSH_HTTP_STATUS_409 = ngx_string("409 Conflict");
const ngx_str_t NGX_HTTP_PUSH_HTTP_STATUS_410 = ngx_string("410 Gone");
//other stuff
// Method lists (presumably values for the Allow header) and the Vary header value.
const ngx_str_t NGX_HTTP_PUSH_ALLOW_GET_POST_PUT_DELETE= ngx_string("GET, POST, PUT, DELETE");
const ngx_str_t NGX_HTTP_PUSH_ALLOW_GET= ngx_string("GET");
const ngx_str_t NGX_HTTP_PUSH_VARY_HEADER_VALUE = ngx_string("If-None-Match, If-Modified-Since");
// Plain-text format template for channel info. Placeholders, in order:
// %ui queued messages, %d seconds since last requested (-1 = never),
// %ui active subscribers.
// NOTE(review): the literal ends with an explicit "\0"; since ngx_string()
// derives the length from sizeof, that byte is presumably counted in .len --
// confirm that callers expect this.
const ngx_str_t NGX_HTTP_PUSH_CHANNEL_INFO_PLAIN = ngx_string(
"queued messages: %ui" CRLF
"last requested: %d sec. ago (-1=never)" CRLF
"active subscribers: %ui"
"\0");
// JSON format template for channel info: one object with the keys
// "messages" (%ui), "requested" (%d) and "subscribers" (%ui).
// The literal ends with an explicit "\0".
const ngx_str_t NGX_HTTP_PUSH_CHANNEL_INFO_JSON = ngx_string(
"{\"messages\": %ui, "
"\"requested\": %d, "
"\"subscribers\": %ui }"
"\0");
// XML format template for channel info: a <channel> element containing
// <messages> (%ui), <requested> (%d) and <subscribers> (%ui).
// The literal ends with an explicit "\0".
const ngx_str_t NGX_HTTP_PUSH_CHANNEL_INFO_XML = ngx_string(
"<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" CRLF
"<channel>" CRLF
" <messages>%ui</messages>" CRLF
" <requested>%d</requested>" CRLF
" <subscribers>%ui</subscribers>" CRLF
"</channel>"
"\0");
// YAML format template for channel info: a document with the keys
// "messages" (%ui), "requested" (%d) and "subscribers" (%ui).
// Every key must be followed by ": " for the line to be a valid YAML mapping
// entry; the "subscribers" line needs its colon like the other two.
// The literal ends with an explicit "\0".
const ngx_str_t NGX_HTTP_PUSH_CHANNEL_INFO_YAML = ngx_string(
    "---" CRLF
    "messages: %ui" CRLF
    "requested: %d" CRLF
    "subscribers: %ui" CRLF
    "\0");
// Associates a content subtype name with a format template.
typedef struct {
    char             *subtype;
    size_t            len;      // presumably the length of subtype -- TODO confirm
    const ngx_str_t  *format;   // presumably one of the NGX_HTTP_PUSH_CHANNEL_INFO_* templates
} ngx_http_push_content_subtype_t;