Permalink
Browse files

Use lazy buffers for reads

  • Loading branch information...
1 parent d839e70 commit a6a93d84ec09139d4de3c922276b912f9990768c @kerneis committed Jan 2, 2011
Showing with 92 additions and 74 deletions.
  1. +48 −10 io.cpc
  2. +10 −2 io.h
  3. +34 −62 server.cpc
View
@@ -60,28 +60,47 @@ cpc_io_wait_2(int fd1, int fd2, cpc_condvar *c)
}
cps int
-cpc_full_read(int fd, char *buf, int len)
-{
- return cpc_partial_read(fd, buf, len, len);
-}
-
-cps int
-cpc_partial_read(int fd, unsigned char *buf, int min_len, int max_len)
+cpc_buffer_read(int fd, cpc_buffer *b, int len)
{
size_t pos;
ssize_t rc;
+ assert(b->end - b->start >= 0);
+ pos = b->end - b->start;
+
+ /* No need to read more data */
+ if(pos >= len)
+ return pos;
+
cpc_timeout *t = cpc_timeout_get(io_timeout, 0);
cpc_condvar *c = cpc_timeout_condvar(t);
- pos = 0;
+ /* Lazy allocation of the buffer the first time */
+ if(!b->buf) {
+ if(cpc_io_wait(fd, CPC_IO_IN, c) == CPC_CONDVAR) {
+ goto timeout;
+ } else {
+ assert(b->start == 0 && b->end == 0);
+ b->buf = malloc(b->size);
+ goto first_read;
+ }
+ }
+
+ /* Move data to the beginning of buffer */
+ if(b->start > 0) {
+ if(pos > 0)
+ memmove(b->buf, b->buf + b->start, pos);
+ b->start = 0;
+ }
+
while(1) {
/* io_wait the first time, it's necessary 95% of the time */
if(cpc_timeout_expired(t) ||
cpc_io_wait(fd, CPC_IO_IN, c) == CPC_CONDVAR) {
goto timeout;
}
- rc = read(fd, buf + pos, max_len - pos);
+first_read:
+ rc = read(fd, b->buf + pos, b->size - pos);
if(rc == 0) {
errno = EPIPE;
goto fail;
@@ -90,11 +109,12 @@ cpc_partial_read(int fd, unsigned char *buf, int min_len, int max_len)
goto fail;
} else {
pos += rc;
- if(pos >= min_len)
+ if(pos >= len)
break;
}
}
+ b->end = pos;
cpc_timeout_destroy(t);
return pos;
@@ -103,6 +123,7 @@ timeout:
errno = EINTR;
fail:
cpc_timeout_destroy(t);
+ b->end = pos;
return -1;
}
@@ -188,3 +209,20 @@ cpc_full_writev(int fd, const struct iovec *iov, int iovcnt)
return pos;
}
+
+cpc_buffer *
+cpc_buffer_get(int size) {
+ cpc_buffer *b = malloc(sizeof(cpc_buffer));
+ b->size = size;
+ b->start = 0;
+ b->end = 0;
+ b->buf = NULL;
+ return b;
+}
+
+void
+cpc_buffer_destroy(cpc_buffer *b) {
+ if(b->buf)
+ free(b->buf);
+ free(b);
+}
View
@@ -20,9 +20,17 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
+typedef struct cpc_buffer {
+ int size;
+ int start;
+ int end;
+ unsigned char *buf;
+} cpc_buffer;
+
cps void cpc_io_wait_2(int fd1, int fd2, cpc_condvar *c);
-cps int cpc_full_read(int fd, char *buf, int len);
-cps int cpc_partial_read(int fd, unsigned char *buf, int min_len, int max_len);
+cps int cpc_buffer_read(int fd, cpc_buffer *b, int len);
cps int cpc_full_write(int fd, const char *buf, int len);
cps int cpc_full_writev(int fd, const struct iovec *iov, int iovcnt);
+cpc_buffer *cpc_buffer_get(int size);
+void cpc_buffer_destroy(cpc_buffer *b);
View
@@ -543,32 +543,33 @@ handshake(hashtable *ht, int fd)
{
int rc, numread, i;
const char *protocol = "\023BitTorrent protocol";
- char *res = NULL, t[68], id[41];
+ char *res = NULL, id[41];
struct torrent *torrent = NULL;
+ cpc_buffer *b = cpc_buffer_get(68);
/* The wikified spec implies that we must send our handshake as soon as
we get the info_hash, or risk a deadlock when the tracker performs
its ``NAT check''. No, I don't claim to understand it either. */
- numread = cpc_partial_read(fd, t, 48, 68);
+ numread = cpc_buffer_read(fd, b, 48);
if(numread < 48) {
perror("(handshake)cpc_partial_read");
goto fail;
}
- if(memcmp(t, protocol, 20) != 0) {
+ if(memcmp(b->buf, protocol, 20) != 0) {
for(i = 0; i<20; i++) {
- unsigned char c = t[i];
+ unsigned char c = b->buf[i];
snprintf(id + i, 2, "%c", c >= 0x21 && c <= 0x7E ? c : '.');
}
debugf(1, "%3d bad protocol: %s\n", fd, id);
goto fail;
}
- torrent = ht_get(ht, t + 28);
+ torrent = ht_get(ht, b->buf + 28);
for(i = 0; i<20; i++)
- snprintf(id + 2 * i, 3, "%.2x", (unsigned char)*(t + 28 + i));
+ snprintf(id + 2 * i, 3, "%.2x", (unsigned char)*(b->buf + 28 + i));
if(!torrent) {
debugf(2, "%3d cannot find hash: %s\n", fd, id);
@@ -599,26 +600,28 @@ handshake(hashtable *ht, int fd)
if(numread < 68) {
/* Read the rest of the client handshake. */
- rc = cpc_full_read(fd, t + numread, 68 - numread);
+ rc = cpc_buffer_read(fd, b, 68 - numread);
if(rc < 68 - numread)
goto fail;
}
- peers[fd].dht = !!(t[1 + 19 + 7] & 1);
- peers[fd].ltep = !!(t[1 + 19 + 5] & 0x10);
+ peers[fd].dht = !!(b->buf[1 + 19 + 7] & 1);
+ peers[fd].ltep = !!(b->buf[1 + 19 + 5] & 0x10);
for(i = 0; i < 20; i++) {
- unsigned char c = t[48 + i];
+ unsigned char c = b->buf[48 + i];
snprintf(id + i, 2, "%c", c >= 0x21 && c <= 0x7E ? c : '.');
}
debugf(2, "%3d peer-id %s %s%s\n",
fd, id,
peers[fd].dht ? " (DHT)" : "",
peers[fd].ltep ? " (LTEP)" : "");
+ cpc_buffer_destroy(b);
return 0;
fail:
+ cpc_buffer_destroy(b);
return -1;
}
@@ -754,71 +757,40 @@ cps int
stream_reader(int fd)
{
uint32_t length = 0;
- unsigned char *buf = NULL;
+ cpc_buffer *b = cpc_buffer_get(BUF_LENGTH);
uint32_t begin, index, chunk_len;
- int start = 0, end = 0, size = 0, rc, type;
-
- buf = malloc(BUF_LENGTH);
- if(!buf)
- return -1;
+ int rc, type;
while(1) {
if(peers[fd].dying)
goto fail;
- size = end - start;
- assert(size >= 0);
-
- if(size < 4) {
- if(start > 0) {
- if(end > start)
- memmove(buf, buf + start, size);
- end -= start;
- start = 0;
- }
- rc = cpc_partial_read(fd, buf + size,
- 4 - size, BUF_LENGTH - size);
- if(rc < 4 - size)
- goto fail;
- end += rc;
- size = end;
- }
+ rc = cpc_buffer_read(fd, b, 4);
+ if(rc < 4)
+ goto fail;
- assert(size == end - start && size >= 4);
+ assert(b->end - b->start >= 4);
- DO_NTOHL(length, buf + start);
+ DO_NTOHL(length, b->buf + b->start);
+ b->start += 4;
- if(length + 4 > BUF_LENGTH) {
+ if(length > BUF_LENGTH) {
debugf(1, "%3d unbelievably long message (%d bytes)\n", fd, length);
goto fail;
}
- if(size < length + 4) {
- if(start > 0) {
- if(end > start)
- memmove(buf, buf + start, size);
- end -= start;
- start = 0;
- }
-
- rc = cpc_partial_read(fd, buf + size,
- length + 4 - size, BUF_LENGTH - size);
- if(rc < ((int32_t) length + 4 - size))
- goto fail;
- end += rc;
- size = end;
- }
+ rc = cpc_buffer_read(fd, b, length);
+ if(rc < (int32_t) length)
+ goto fail;
- assert(size == end - start && size >= length + 4);
+ assert(b->end - b->start >= length);
- if(length == 0) {
- /* keep-alive */
- start += 4;
+ /* keep-alive */
+ if(length == 0)
continue;
- }
- type = buf[start + 4];
+ type = b->buf[b->start];
debugf(4, "%3d <- type = %d, length = %d\n", fd, type, length);
@@ -865,9 +837,9 @@ stream_reader(int fd)
break;
}
- DO_NTOHL(index, buf + start + 4 + 1);
- DO_NTOHL(begin, buf + start + 4 + 5);
- DO_NTOHL(chunk_len, buf + start + 4 + 9);
+ DO_NTOHL(index, b->buf + b->start + 1);
+ DO_NTOHL(begin, b->buf + b->start + 5);
+ DO_NTOHL(chunk_len, b->buf + b->start + 9);
if(chunk_len == 0){
debugf(1, "%3d request length is zero\n", fd);
goto fail;
@@ -916,10 +888,10 @@ stream_reader(int fd)
debugf(1, "%3d unexpected message\n", fd);
goto fail;
}
- start += 4 + length;
+ b->start += length;
}
fail:
- free(buf);
+ cpc_buffer_destroy(b);
return -1;
}

0 comments on commit a6a93d8

Please sign in to comment.