/
outstream.c
114 lines (102 loc) · 3.2 KB
/
outstream.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
#include "outstream.h"
#include "memory.h"
#include "ev_helpers.h"
#include "helpers.h"
static void outstream_writable_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) {
struct outstream *s = CASTUP(watcher, struct outstream, watcher);
assert(s->first_buf != NULL);
struct buffer *next;
while (s->first_buf != NULL) {
ssize_t written = write(watcher->fd, s->first_buf->buf, s->first_buf->len);
if (written == -1 && errno == EAGAIN)
return;
if (written == -1 || written == 0) {
s->error_cb(s); return;
}
// if we reach this point, stuff has actually been written
s->pressure -= written;
if (written < s->first_buf->len) {
s->first_buf->buf += written;
s->first_buf->len -= written;
return;
}
assert(written == s->first_buf->len);
next = s->first_buf->next;
free(s->first_buf->free_ptr);
free(s->first_buf);
s->first_buf = next;
}
// buffer flushed!
assert(s->pressure == 0);
ev_io_stop(ev_default_loop(0), watcher);
}
void outstream_init(struct outstream *s, int fd, void (*error_cb)(struct outstream *)) {
ev_io_init(&s->watcher, outstream_writable_cb, fd, EV_WRITE);
// ev_io_start(ev_default_loop(0), &s->watcher);
s->input_watcher = NULL;
s->first_buf = NULL;
s->last_buf = NULL;
s->pressure = 0;
s->error_cb = error_cb;
}
void outstream_input_set(struct outstream *s, struct ev_io *w) {
// remove pressure on old path
if (s->input_watcher && (s->input_watcher->events & EV_READ) == 0)
alter_ev_io_events(s->input_watcher, 1, EV_READ);
s->input_watcher = w;
if (w == NULL) return;
assert(w->events & EV_READ);
// maybe add pressure on new path
if (s->pressure > OUTSTREAM_HIGH)
alter_ev_io_events(s->input_watcher, 0, EV_READ);
}
void outstream_nuke(struct outstream *s) {
struct buffer *buf = s->first_buf;
struct buffer *next_buf;
while (buf != NULL) {
next_buf = buf->next;
free(buf->free_ptr);
free(buf);
buf = next_buf;
}
outstream_input_set(s, NULL);
ev_io_stop(ev_default_loop(0), &s->watcher);
}
// 0 means success, 1 means failure
int outstream_send(struct outstream *s, char *buf, size_t len) {
char *free_ptr = buf;
if (s->first_buf == NULL) {
ssize_t written = write(s->watcher.fd, buf, len);
if (written == -1 && errno != EAGAIN) {
s->error_cb(s); return 1;
} else if (written == 0) {
s->error_cb(s); return 1;
}
if (written == len) { free(buf); return; } // yaaay, no buffer magic needed!
// unlikely: empty->filled transition
if (written == -1) written = 0;
buf += written;
len -= written;
}
struct buffer *buffer = malloc(sizeof(struct buffer));
buffer->buf = buf;
buffer->free_ptr = free_ptr;
buffer->len = len;
buffer->next = NULL;
if (s->last_buf == NULL) {
s->first_buf = buffer;
ev_io_start(ev_default_loop(0), &s->watcher);
} else {
s->last_buf->next = buffer;
}
s->last_buf = buffer;
s->pressure += len;
if (s->pressure >= OUTSTREAM_HIGH && s->pressure - len < OUTSTREAM_HIGH && s->input_watcher != NULL) {
alter_ev_io_events(s->input_watcher, 0, EV_READ);
}
return 0;
}