Skip to content

Commit

Permalink
Replace reserve/commit with new iovec-based interface. Add a new evbu…
Browse files Browse the repository at this point in the history
…ffer_peek.

svn:r1296
  • Loading branch information
nmathewson committed May 19, 2009
1 parent ed1bbc7 commit 23243b8
Show file tree
Hide file tree
Showing 6 changed files with 512 additions and 113 deletions.
177 changes: 134 additions & 43 deletions buffer.c
Expand Up @@ -134,7 +134,6 @@ static int use_mmap = 1;
#define CHAIN_SPACE_LEN(ch) ((ch)->flags & EVBUFFER_IMMUTABLE ? \
0 : (ch)->buffer_len - ((ch)->misalign + (ch)->off))


#define CHAIN_PINNED(ch) (((ch)->flags & EVBUFFER_MEM_PINNED_ANY) != 0)
#define CHAIN_PINNED_R(ch) (((ch)->flags & EVBUFFER_MEM_PINNED_R) != 0)

Expand Down Expand Up @@ -463,55 +462,87 @@ evbuffer_get_contiguous_space(const struct evbuffer *buf)
return result;
}

unsigned char *
evbuffer_reserve_space(struct evbuffer *buf, size_t size)
int
evbuffer_reserve_space(struct evbuffer *buf, ssize_t size,
struct evbuffer_iovec *vec, int n_vecs)
{
struct evbuffer_chain *chain;
unsigned char *result = NULL;

EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
int n = -1;

EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
if (buf->freeze_end)
goto done;
if (n_vecs < 1)
goto done;
if (n_vecs == 1) {
if (evbuffer_expand(buf, size) == -1)
goto done;
chain = buf->last;

if (evbuffer_expand(buf, size) == -1)
goto done;

chain = buf->last;

result = (chain->buffer + chain->misalign + chain->off);
vec[0].iov_base = CHAIN_SPACE_PTR(chain);
vec[0].iov_len = CHAIN_SPACE_LEN(chain);
n = 1;
} else {
if (_evbuffer_expand_fast(buf, size)<0)
goto done;
n = _evbuffer_read_setup_vecs(buf, size, vec, &chain, 0);
}

done:
EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
return n;

return result;
}

int
evbuffer_commit_space(struct evbuffer *buf, size_t size)
evbuffer_commit_space(struct evbuffer *buf,
struct evbuffer_iovec *vec, int n_vecs)
{
struct evbuffer_chain *chain;
int result = -1;

EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
if (buf->freeze_end) {
goto done;
}
struct evbuffer_chain *prev = buf->previous_to_last;
struct evbuffer_chain *last = buf->last;
int result = -1;
size_t added;

chain = buf->last;

if (chain == NULL ||
chain->buffer_len - chain->off - chain->misalign < size)
EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
if (buf->freeze_end)
goto done;
if (n_vecs < 1 || n_vecs > 2)
goto done;
if (n_vecs == 2) {
if (!prev || !last ||
vec[0].iov_base != CHAIN_SPACE_PTR(prev) ||
vec[1].iov_base != CHAIN_SPACE_PTR(last) ||
vec[0].iov_len > CHAIN_SPACE_LEN(prev) ||
vec[1].iov_len > CHAIN_SPACE_LEN(last))
goto done;

prev->off += vec[0].iov_len;
last->off += vec[1].iov_len;
added = vec[0].iov_len + vec[1].iov_len;
} else {
/* n_vecs == 1 */
struct evbuffer_chain *chain;
if (prev && vec[0].iov_base == CHAIN_SPACE_PTR(prev))
chain = prev;
else if (last && vec[0].iov_base == CHAIN_SPACE_PTR(last))
chain = last;
else
goto done;
if (vec[0].iov_len > CHAIN_SPACE_LEN(chain))
goto done;

chain->off += size;
buf->total_len += size;
buf->n_add_for_cb += size;
chain->off += vec[0].iov_len;
added = vec[0].iov_len;
}

buf->total_len += added;
buf->n_add_for_cb += added;
result = 0;
evbuffer_invoke_callbacks(buf);

done:
EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
return result;
}

Expand Down Expand Up @@ -1444,6 +1475,12 @@ _evbuffer_expand_fast(struct evbuffer *buf, size_t datlen)
#define IOV_PTR_FIELD buf
#define IOV_LEN_FIELD len
#endif

#define IOV_TYPE_FROM_EVBUFFER_IOV(i,ei) do { \
(i)->IOV_PTR_FIELD = (ei)->iov_base; \
(i)->IOV_LEN_FIELD = (ei)->iov_len; \
} while(0)

#endif

#define EVBUFFER_MAX_READ 4096
Expand All @@ -1457,11 +1494,12 @@ _evbuffer_expand_fast(struct evbuffer *buf, size_t datlen)
@param vecs An array of two iovecs or WSABUFs.
@param chainp A pointer to a variable to hold the first chain we're
reading into.
@param exact DOCDOC
@return The number of buffers we're using.
*/
int
_evbuffer_read_setup_vecs(struct evbuffer *buf, ssize_t howmuch,
IOV_TYPE *vecs, struct evbuffer_chain **chainp)
struct evbuffer_iovec *vecs, struct evbuffer_chain **chainp, int exact)
{
struct evbuffer_chain *chain;
int nvecs;
Expand All @@ -1477,11 +1515,11 @@ _evbuffer_read_setup_vecs(struct evbuffer *buf, ssize_t howmuch,
use the space in the next-to-last chain.
*/
struct evbuffer_chain *prev = buf->previous_to_last;
vecs[0].IOV_PTR_FIELD = CHAIN_SPACE_PTR(prev);
vecs[0].IOV_LEN_FIELD = CHAIN_SPACE_LEN(prev);
vecs[1].IOV_PTR_FIELD = CHAIN_SPACE_PTR(chain);
vecs[1].IOV_LEN_FIELD = CHAIN_SPACE_LEN(chain);
if (vecs[0].IOV_LEN_FIELD >= (size_t)howmuch) {
vecs[0].iov_base = CHAIN_SPACE_PTR(prev);
vecs[0].iov_len = CHAIN_SPACE_LEN(prev);
vecs[1].iov_base = CHAIN_SPACE_PTR(chain);
vecs[1].iov_len = CHAIN_SPACE_LEN(chain);
if (vecs[0].iov_len >= (size_t)howmuch) {
/* The next-to-last chain has enough
* space on its own. */
chain = prev;
Expand All @@ -1490,18 +1528,19 @@ _evbuffer_read_setup_vecs(struct evbuffer *buf, ssize_t howmuch,
/* We'll need both chains. */
chain = prev;
nvecs = 2;
if (vecs[0].IOV_LEN_FIELD + vecs[1].IOV_LEN_FIELD > (size_t)howmuch) {
vecs[1].IOV_LEN_FIELD = howmuch - vecs[0].IOV_LEN_FIELD;
if (exact &&
(vecs[0].iov_len + vecs[1].iov_len > (size_t)howmuch)) {
vecs[1].iov_len = howmuch - vecs[0].iov_len;
}
}
} else {
/* There's data in the last chain, so we're
* not allowed to use the next-to-last. */
nvecs = 1;
vecs[0].IOV_PTR_FIELD = CHAIN_SPACE_PTR(chain);
vecs[0].IOV_LEN_FIELD = CHAIN_SPACE_LEN(chain);
if (vecs[0].IOV_LEN_FIELD > (size_t)howmuch)
vecs[0].IOV_LEN_FIELD = howmuch;
vecs[0].iov_base = CHAIN_SPACE_PTR(chain);
vecs[0].iov_len = CHAIN_SPACE_LEN(chain);
if (exact && (vecs[0].iov_len > (size_t)howmuch))
vecs[0].iov_len = howmuch;
}

*chainp = chain;
Expand Down Expand Up @@ -1566,8 +1605,16 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
goto done;
} else {
IOV_TYPE vecs[2];
nvecs = _evbuffer_read_setup_vecs(buf, howmuch, vecs,
&chain);
struct evbuffer_iovec ev_vecs[2];
nvecs = _evbuffer_read_setup_vecs(buf, howmuch, ev_vecs,
&chain, 1);

if (nvecs == 2) {
IOV_TYPE_FROM_EVBUFFER_IOV(&vecs[1], &ev_vecs[1]);
IOV_TYPE_FROM_EVBUFFER_IOV(&vecs[0], &ev_vecs[0]);
} else if (nvecs == 1) {
IOV_TYPE_FROM_EVBUFFER_IOV(&vecs[0], &ev_vecs[0]);
}

#ifdef WIN32
{
Expand Down Expand Up @@ -1937,6 +1984,50 @@ evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const str
return pos;
}

int
evbuffer_peek(struct evbuffer *buffer, ev_ssize_t len,
struct evbuffer_ptr *start_at,
struct evbuffer_iovec *vec, int n_vec)
{
struct evbuffer_chain *chain;
int idx = 0;
size_t len_so_far = 0;

EVBUFFER_LOCK(buffer, EVTHREAD_READ);

if (start_at) {
chain = start_at->_internal.chain;
len_so_far = chain->off
- start_at->_internal.pos_in_chain;
idx = 1;
if (n_vec > 0) {
vec[0].iov_base = chain->buffer + chain->misalign
+ start_at->_internal.pos_in_chain;
vec[0].iov_len = len_so_far;
}
chain = chain->next;
} else {
chain = buffer->first;
}

while (chain) {
if (len >= 0 && len_so_far >= len)
break;
if (idx<n_vec) {
vec[idx].iov_base = chain->buffer + chain->misalign;
vec[idx].iov_len = chain->off;
} else if (len<0)
break;
++idx;
len_so_far += chain->off;
chain = chain->next;
}

EVBUFFER_UNLOCK(buffer, EVTHREAD_READ);

return idx;
}


int
evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap)
Expand Down
41 changes: 27 additions & 14 deletions buffer_iocp.c
Expand Up @@ -118,26 +118,27 @@ read_completed(struct event_overlapped *eo, uintptr_t _, ssize_t nBytes)
struct evbuffer *evbuf = &buf->buffer;

struct evbuffer_chain *chain = buf_o->first_pinned;
struct evbuffer_iovec iov[2];
int n_vec;

EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE);
buf->read_in_progress = 0;
evbuffer_unfreeze(evbuf, 0);

if (chain == evbuf->previous_to_last) {
ssize_t n = chain->buffer_len - (chain->misalign + chain->off);
if (n>nBytes)
n=nBytes;
chain->off += n;
nBytes -= n;
evbuf->n_add_for_cb += n;

evbuffer_commit_space(evbuf, nBytes);
} else if (chain == evbuf->last) {
evbuffer_commit_space(evbuf, nBytes);
iov[0].iov_base = buf_o->buffers[0].buf;
if (nBytes <= buf_o->buffers[0].len) {
iov[0].iov_len = nBytes;
n_vec = 1;
} else {
assert(0);
iov[0].iov_len = buf_o->buffers[0].len;
iov[1].iov_base = buf_o->buffers[1].buf;
iov[1].iov_len = nBytes - iov[0].iov_len;
n_vec = 2;
}

if (evbuffer_commit_space(evbuf, iov, n_vec) < 0)
assert(0); /* XXXX fail nicer. */

pin_release(eo, EVBUFFER_MEM_PINNED_R);

_evbuffer_decref_and_unlock(evbuf);
Expand Down Expand Up @@ -244,16 +245,22 @@ evbuffer_launch_write(struct evbuffer *buf, ssize_t at_most)
return r;
}

#define IOV_TYPE_FROM_EVBUFFER_IOV(i,ei) do { \
(i)->buf = (ei)->iov_base; \
(i)->len = (ei)->iov_len; \
} while(0)

int
evbuffer_launch_read(struct evbuffer *buf, size_t at_most)
{
struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
int r = -1;
int r = -1, i;
int nvecs;
int npin=0;
struct evbuffer_chain *chain=NULL;
DWORD bytesRead;
DWORD flags = 0;
struct evbuffer_iovec vecs[MAX_VECS];

if (!buf_o)
return -1;
Expand All @@ -271,7 +278,13 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most)
buf_o->read_info.event_overlapped.cb = read_completed;

nvecs = _evbuffer_read_setup_vecs(buf, at_most,
buf_o->read_info.buffers, &chain);
vecs, &chain, 1);
for (i=0;i<nvecs;++i) {
IOV_TYPE_FROM_EVBUFFER_IOV(
&buf_o->read_info.buffers[i],
&vecs[i]);
}

buf_o->read_info.n_buffers = nvecs;
buf_o->read_info.first_pinned = chain;
npin=0;
Expand Down
7 changes: 1 addition & 6 deletions evbuffer-internal.h
Expand Up @@ -256,19 +256,14 @@ void _evbuffer_decref_and_unlock(struct evbuffer *buffer);
* is contiguous. Instead, it may be split across two chunks. */
int _evbuffer_expand_fast(struct evbuffer *, size_t);

#ifdef _EVENT_HAVE_SYS_UIO_H
/** Helper: prepares for a readv/WSARecv call by expanding the buffer to
* hold enough memory to read 'howmuch' bytes in possibly noncontiguous memory.
* Sets up the one or two iovecs in 'vecs' to point to the free memory and its
* extent, and *chainp to poitn to the first chain that we'll try to read into.
* Returns the number of vecs used.
*/
int _evbuffer_read_setup_vecs(struct evbuffer *buf, ssize_t howmuch,
struct iovec *vecs, struct evbuffer_chain **chainp);
#elif defined(WIN32)
int _evbuffer_read_setup_vecs(struct evbuffer *buf, ssize_t howmuch,
WSABUF *vecs, struct evbuffer_chain **chainp);
#endif
struct evbuffer_iovec *vecs, struct evbuffer_chain **chainp, int exact);

#ifdef __cplusplus
}
Expand Down

0 comments on commit 23243b8

Please sign in to comment.