Skip to content
This repository
Newer
Older
100644 434 lines (367 sloc) 13.997 kb
ea4b8724 »
2009-02-02 checkpoint work on big bufferevent refactoring
1 /*
2 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #include <sys/types.h>
29
30 #ifdef HAVE_CONFIG_H
3502a472 »
2009-02-02 convert new bufferevent_*.c files to event-config.h only.
31 #include "event-config.h"
ea4b8724 »
2009-02-02 checkpoint work on big bufferevent refactoring
32 #endif
33
3502a472 »
2009-02-02 convert new bufferevent_*.c files to event-config.h only.
34 #ifdef _EVENT_HAVE_SYS_TIME_H
ea4b8724 »
2009-02-02 checkpoint work on big bufferevent refactoring
35 #include <sys/time.h>
36 #endif
37
38 #include <errno.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <string.h>
42 #include <assert.h>
3502a472 »
2009-02-02 convert new bufferevent_*.c files to event-config.h only.
43 #ifdef _EVENT_HAVE_STDARG_H
ea4b8724 »
2009-02-02 checkpoint work on big bufferevent refactoring
44 #include <stdarg.h>
45 #endif
46
47 #ifdef WIN32
48 #include <winsock2.h>
49 #endif
50
51 #include "event2/util.h"
52 #include "event2/bufferevent.h"
53 #include "event2/buffer.h"
54 #include "event2/buffer_compat.h"
55 #include "event2/bufferevent_struct.h"
56 #include "event2/bufferevent_compat.h"
57 #include "event2/event.h"
58 #include "log-internal.h"
59 #include "mm-internal.h"
60 #include "bufferevent-internal.h"
61 #include "util-internal.h"
62
/* Forward declarations. */

/* Hooks referenced from the bufferevent_ops_filter table. */
static int be_filter_enable(struct bufferevent *bev, short event);
static int be_filter_disable(struct bufferevent *bev, short event);
static void be_filter_destruct(struct bufferevent *bev);
static void be_filter_adj_timeouts(struct bufferevent *bev);
static int be_filter_flush(struct bufferevent *bufev,
    short iotype, enum bufferevent_flush_mode mode);

/* Callbacks that the filter installs on the underlying bufferevent. */
static void be_filter_readcb(struct bufferevent *underlying, void *_me);
static void be_filter_writecb(struct bufferevent *underlying, void *_me);
static void be_filter_errorcb(struct bufferevent *underlying, short what,
    void *_me);

/* Callback that the filter installs on its own output evbuffer. */
static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo, void *arg);
ea4b8724 »
2009-02-02 checkpoint work on big bufferevent refactoring
77
78 struct bufferevent_filtered {
79 struct bufferevent bev;
80
81 /** The bufferevent that we read/write filterd data from/to. */
82 struct bufferevent *underlying;
83 /** A callback on our outbuf to notice when somebody adds data */
84 struct evbuffer_cb_entry *outbuf_cb;
85 /** True iff we have received an EOF callback from the underlying
86 * bufferevent. */
87 unsigned got_eof;
88
89 /** Function to free context when we're done. */
90 void (*free_context)(void *);
91 /** Input filter */
92 bufferevent_filter_cb process_in;
93 /** Output filter */
94 bufferevent_filter_cb process_out;
95
96 /** User-supplied argument to the filters. */
97 void *context;
98 };
99
23085c92 »
2009-04-10 Add a linked-pair abstraction to bufferevents.
100 const struct bufferevent_ops bufferevent_ops_filter = {
ea4b8724 »
2009-02-02 checkpoint work on big bufferevent refactoring
101 "filter",
102 evutil_offsetof(struct bufferevent_filtered, bev),
103 be_filter_enable,
104 be_filter_disable,
105 be_filter_destruct,
106 be_filter_adj_timeouts,
107 be_filter_flush,
108 };
109
110 /* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
111 * return that bufferevent_filtered. Returns NULL otherwise.*/
112 static inline struct bufferevent_filtered *
113 upcast(struct bufferevent *bev)
114 {
115 struct bufferevent_filtered *bev_f;
116 if (bev->be_ops != &bufferevent_ops_filter)
117 return NULL;
118 bev_f = (void*)( ((char*)bev) -
119 evutil_offsetof(struct bufferevent_filtered, bev) );
120 assert(bev_f->bev.be_ops == &bufferevent_ops_filter);
121 return bev_f;
122 }
123
/* Yield a pointer to the generic bufferevent embedded in a
 * bufferevent_filtered (the inverse of upcast()). */
#define downcast(bev_f) (&((bev_f)->bev))
125
126 /** Return 1 iff bevf's underlying bufferevent's output buffer is at or
127 * over its high watermark such that we should not write to it in a given
128 * flush mode. */
129 static int
130 be_underlying_writebuf_full(struct bufferevent_filtered *bevf,
131 enum bufferevent_flush_mode state)
132 {
133 struct bufferevent *u = bevf->underlying;
134 return state == BEV_NORMAL &&
135 u->wm_write.high &&
136 EVBUFFER_LENGTH(u->output) >= u->wm_write.high;
137 }
138
139 /** Return 1 if our input buffer is at or over its high watermark such that we
140 * should not write to it in a given flush mode. */
141 static int
142 be_readbuf_full(struct bufferevent_filtered *bevf,
143 enum bufferevent_flush_mode state)
144 {
145 struct bufferevent *bufev = downcast(bevf);
146 return state == BEV_NORMAL &&
147 bufev->wm_read.high &&
148 EVBUFFER_LENGTH(bufev->input) >= bufev->wm_read.high;
149 }
150
151
152 /* Filter to use when we're created with a NULL filter. */
153 static enum bufferevent_filter_result
154 be_null_filter(struct evbuffer *src, struct evbuffer *dst, ssize_t lim,
155 enum bufferevent_flush_mode state, void *ctx)
156 {
157 (void)state;
ac36f404 »
2009-02-11 oops; coding too quickly on nil-filter patch. Caught by niels.
158 if (evbuffer_remove_buffer(src, dst, lim) == 0)
ea4b8724 »
2009-02-02 checkpoint work on big bufferevent refactoring
159 return BEV_OK;
160 else
161 return BEV_ERROR;
162 }
163
164 struct bufferevent *
165 bufferevent_filter_new(struct bufferevent *underlying,
166 bufferevent_filter_cb input_filter,
167 bufferevent_filter_cb output_filter,
168 enum bufferevent_options options,
169 void (*free_context)(void *),
170 void *ctx)
171 {
172 struct bufferevent_filtered *bufev_f;
173
174 if (!input_filter)
175 input_filter = be_null_filter;
176 if (!output_filter)
177 output_filter = be_null_filter;
178
179 bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered));
180 if (!bufev_f)
181 return NULL;
182
183 if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base,
184 &bufferevent_ops_filter, options) < 0) {
185 mm_free(bufev_f);
186 return NULL;
187 }
188
189 bufev_f->underlying = underlying;
190 bufev_f->process_in = input_filter;
191 bufev_f->process_out = output_filter;
192 bufev_f->free_context = free_context;
193 bufev_f->context = ctx;
194
195 bufferevent_setcb(bufev_f->underlying,
196 be_filter_readcb, be_filter_writecb, be_filter_errorcb, bufev_f);
197
198 bufev_f->outbuf_cb = evbuffer_add_cb(bufev_f->bev.output,
199 bufferevent_filtered_outbuf_cb, bufev_f);
200
201 return &bufev_f->bev;
202 }
203
204 static void
205 be_filter_destruct(struct bufferevent *bev)
206 {
207 struct bufferevent_filtered *bevf = upcast(bev);
208 assert(bevf);
209 if (bevf->free_context)
210 bevf->free_context(bevf->context);
211
212 if (bev->options & BEV_OPT_CLOSE_ON_FREE)
213 bufferevent_free(bevf->underlying);
214 }
215
216 static int
217 be_filter_enable(struct bufferevent *bev, short event)
218 {
219 struct bufferevent_filtered *bevf = upcast(bev);
220 return bufferevent_enable(bevf->underlying, event);
221 }
222
223 static int
224 be_filter_disable(struct bufferevent *bev, short event)
225 {
226 struct bufferevent_filtered *bevf = upcast(bev);
227 return bufferevent_disable(bevf->underlying, event);
228 }
229
230 static void
231 be_filter_adj_timeouts(struct bufferevent *bev)
232 {
233 struct bufferevent_filtered *bevf = upcast(bev);
234 struct timeval *r = NULL, *w = NULL;
235
236 if (bev->timeout_read.tv_sec >= 0)
237 r = &bev->timeout_read;
238 if (bev->timeout_write.tv_sec >= 0)
239 w = &bev->timeout_write;
240
241 bufferevent_set_timeouts(bevf->underlying, r, w);
242 }
243
244 static enum bufferevent_filter_result
245 be_filter_process_input(struct bufferevent_filtered *bevf,
246 enum bufferevent_flush_mode state,
247 int *processed_out)
248 {
249 enum bufferevent_filter_result res;
250
251 if (state == BEV_NORMAL) {
252 /* If we're in 'normal' mode, don't urge data on the filter
253 * unless we're reading data and under our high-water mark.*/
254 if (!(bevf->bev.enabled & EV_READ) ||
255 be_readbuf_full(bevf, state))
256 return BEV_OK;
257 }
258
259 do {
260 ssize_t limit = -1;
261 if (state == BEV_NORMAL && bevf->bev.wm_read.high)
262 limit = bevf->bev.wm_read.high -
263 EVBUFFER_LENGTH(bevf->bev.input);
264
265 res = bevf->process_in(bevf->underlying->input,
266 bevf->bev.input, limit, state, bevf->context);
267
268 if (res == BEV_OK)
269 *processed_out = 1;
270 } while (res == BEV_OK &&
271 (bevf->bev.enabled & EV_READ) &&
272 EVBUFFER_LENGTH(bevf->underlying->input) &&
273 !be_readbuf_full(bevf, state));
274
275 return res;
276 }
277
278
279 static enum bufferevent_filter_result
280 be_filter_process_output(struct bufferevent_filtered *bevf,
281 enum bufferevent_flush_mode state,
282 int *processed_out)
283 {
284 enum bufferevent_filter_result res = BEV_OK;
285 struct bufferevent *bufev = downcast(bevf);
286 int again = 0;
287
288 if (state == BEV_NORMAL) {
289 /* If we're in 'normal' mode, don't urge data on the
290 * filter unless we're writing data, and the underlying
291 * bufferevent is accepting data, and we have data to
292 * give the filter. If we're in 'flush' or 'finish',
293 * call the filter no matter what. */
294 if (!(bufev->enabled & EV_WRITE) ||
295 be_underlying_writebuf_full(bevf, state) ||
296 !EVBUFFER_LENGTH(bufev->output))
297 return BEV_OK;
298 }
299
300 /* disable the callback that calls this function
301 when the user adds to the output buffer. */
302 evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0);
303
304 do {
305 int processed = 0;
306 again = 0;
307
308 do {
309 ssize_t limit = -1;
310 if (state == BEV_NORMAL &&
311 bevf->underlying->wm_write.high)
312 limit = bevf->underlying->wm_write.high -
313 EVBUFFER_LENGTH(bevf->underlying->output);
314
315 res = bevf->process_out(bevf->bev.output,
316 bevf->underlying->output,
317 limit,
318 state,
319 bevf->context);
320
321 if (res == BEV_OK)
322 processed = *processed_out = 1;
323 } while (/* Stop if the filter wasn't successful...*/
324 res == BEV_OK &&
325 /* Or if we aren't writing any more. */
326 (bufev->enabled & EV_WRITE) &&
327 /* Of if we have nothing more to write and we are
328 * not flushing. */
329 EVBUFFER_LENGTH(bufev->output) &&
330 /* Or if we have filled the underlying output buffer. */
331 !be_underlying_writebuf_full(bevf,state));
332
333 if (processed && bufev->writecb &&
334 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) {
335 /* call the write callback.*/
336 (*bufev->writecb)(bufev, bufev->cbarg);
337
338 if (res == BEV_OK &&
339 (bufev->enabled & EV_WRITE) &&
340 EVBUFFER_LENGTH(bufev->output) &&
341 !be_underlying_writebuf_full(bevf, state)) {
342 again = 1;
343 }
344 }
345 } while (again);
346
347 /* reenable the outbuf_cb */
348 evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
349 EVBUFFER_CB_ENABLED);
350
351 return res;
352 }
353
354 /* Called when the size of our outbuf changes. */
355 static void
356 bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
f1b1bad4 »
2009-04-03 Make the new evbuffer callbacks use a new struct-based interface.
357 const struct evbuffer_cb_info *cbinfo, void *arg)
ea4b8724 »
2009-02-02 checkpoint work on big bufferevent refactoring
358 {
359 struct bufferevent_filtered *bevf = arg;
360
f1b1bad4 »
2009-04-03 Make the new evbuffer callbacks use a new struct-based interface.
361 if (cbinfo->n_added) {
ea4b8724 »
2009-02-02 checkpoint work on big bufferevent refactoring
362 int processed_any = 0;
363 /* Somebody added more data to the output buffer. Try to
364 * process it, if we should. */
365 be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
366 }
367 }
368
369 /* Called when the underlying socket has read. */
370 static void
371 be_filter_readcb(struct bufferevent *underlying, void *_me)
372 {
373 struct bufferevent_filtered *bevf = _me;
374 enum bufferevent_filter_result res;
375 enum bufferevent_flush_mode state;
376 struct bufferevent *bufev = downcast(bevf);
377 int processed_any = 0;
378
379 if (bevf->got_eof)
380 state = BEV_FINISHED;
381 else
382 state = BEV_NORMAL;
383
384 res = be_filter_process_input(bevf, state, &processed_any);
385
386 if (processed_any &&
387 EVBUFFER_LENGTH(bufev->input) >= bufev->wm_read.low &&
388 bufev->readcb != NULL)
389 (*bufev->readcb)(bufev, bufev->cbarg);
390 }
391
392 /* Called when the underlying socket has drained enough that we can write to
393 it. */
394 static void
395 be_filter_writecb(struct bufferevent *underlying, void *_me)
396 {
397 struct bufferevent_filtered *bevf = _me;
398 int processed_any = 0;
399
400 be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
401 }
402
403 /* Called when the underlying socket has given us an error */
404 static void
405 be_filter_errorcb(struct bufferevent *underlying, short what, void *_me)
406 {
407 struct bufferevent_filtered *bevf = _me;
408
409 /* All we can really to is tell our own errorcb. */
410 if (bevf->bev.errorcb)
411 bevf->bev.errorcb(&bevf->bev, what, bevf->bev.cbarg);
412 }
413
414 static int
415 be_filter_flush(struct bufferevent *bufev,
416 short iotype, enum bufferevent_flush_mode mode)
417 {
418 struct bufferevent_filtered *bevf = upcast(bufev);
419 int processed_any = 0;
420 assert(bevf);
421
422 if (iotype & EV_READ) {
423 be_filter_process_input(bevf, mode, &processed_any);
424 }
425 if (iotype & EV_WRITE) {
426 be_filter_process_output(bevf, mode, &processed_any);
427 }
428 /* XXX check the return value? */
429 /* XXX does this want to recursively call lower-level flushes? */
430 bufferevent_flush(bevf->underlying, iotype, mode);
431
432 return processed_any;
433 }
Something went wrong with that request. Please try again.