Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

...

  • Loading branch information...
commit 4fdf0276b32a8487386304a6981436508bdb982f 1 parent b17dcd5
@ry ry authored
Showing with 184 additions and 24 deletions.
  1. +2 −2 Makefile
  2. +165 −19 ol-unix.c
  3. +2 −1  ol-unix.h
  4. +1 −0  ol.h
  5. +14 −2 test/echo-server.c
View
4 Makefile
@@ -1,11 +1,11 @@
test/echo-server: test/echo-server.c ol.a
- $(CC) -o test/echo-server test/echo-server.c ol.a -lm
+ $(CC) -g -o test/echo-server test/echo-server.c ol.a -lm
ol.a: ol-unix.o ev/ev.o
$(AR) rcs ol.a ol-unix.o ev/ev.o
ol-unix.o: ol-unix.c ol.h ol-unix.h
- $(CC) -c ol-unix.c -o ol-unix.o -lm
+ $(CC) -g -c ol-unix.c -o ol-unix.o -lm
ev/ev.o: ev/config.h ev/ev.c
$(MAKE) -C ev
View
184 ol-unix.c
@@ -11,10 +11,15 @@ void ol_tcp_connect(ol_handle* handle, ol_req* req);
int ol_close_error(ol_handle* handle, ol_err err);
-static int ol_err_new(int e) {
+static ol_err ol_err_new(ol_handle* handle, int e) {
+ handle->_.err = e;
return e;
}
+ol_err ol_err_last(ol_handle *handle) {
+ return handle->_.err;
+}
+
struct sockaddr_in ol_ip4_addr(char *ip, int port) {
struct sockaddr_in addr;
@@ -32,8 +37,13 @@ int ol_close(ol_handle* handle) {
}
+void ol_init() {
+ ev_default_loop(0);
+}
+
+
int ol_run() {
- ev_run(0);
+ ev_run(EV_DEFAULT_ 0);
}
@@ -42,6 +52,8 @@ ol_handle* ol_handle_new(ol_close_cb close_cb, void* data) {
handle->close_cb = close_cb;
handle->data = data;
+ handle->_.fd = -1;
+
ev_init(&handle->_.read_watcher, ol_tcp_io);
ev_init(&handle->_.write_watcher, ol_tcp_io);
@@ -49,33 +61,114 @@ ol_handle* ol_handle_new(ol_close_cb close_cb, void* data) {
}
+int ol_tcp_lazy_open(ol_handle* handle, int domain) {
+ assert(handle->_.fd < 0);
+
+ /* Lazily allocate a file descriptor for this handle */
+ handle->_.fd = socket(domain, SOCK_STREAM, 0);
+ int yes = 1;
+ setsockopt(handle->_.fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
+ fcntl(handle->_.fd, F_SETFL, O_NONBLOCK);
+
+ return 0;
+}
+
+
int ol_bind(ol_handle* handle, struct sockaddr* addr) {
int addrsize;
+ int domain;
if (addr->sa_family == AF_INET) {
addrsize = sizeof(struct sockaddr_in);
+ domain = AF_INET;
} else if (addr->sa_family == AF_INET6) {
addrsize = sizeof(struct sockaddr_in6);
+ domain = AF_INET6;
} else {
assert(0);
return -1;
}
- int r = bind(handle->_.fd, addr, addrsize);
+ int r = 0;
+
+ if (handle->_.fd < 0) {
+ r = ol_tcp_lazy_open(handle, domain);
+ if (r) {
+ return ol_err_new(handle, r);
+ }
+ }
+
+ r = bind(handle->_.fd, addr, addrsize);
- return ol_err_new(r);
+ return ol_err_new(handle, r);
+}
+
+
+ol_handle* ol_tcp_open(ol_handle* parent, int fd) {
+ ol_handle* h = ol_handle_new(NULL, NULL);
+ h->_.fd = fd;
+ return h;
+}
+
+
+void ol_server_io(EV_P_ ev_io* watcher, int revents) {
+ ol_handle* handle = watcher->data;
+
+ assert(revents == EV_READ);
+
+ while (1) {
+ struct sockaddr addr;
+ socklen_t addrlen;
+ int fd = accept(handle->_.fd, &addr, &addrlen);
+
+ if (fd < 0) {
+ if (errno == EAGAIN) {
+ return; // No problem.
+ } else if (errno == EMFILE) {
+ // TODO special trick. unlock reserved socket, accept, close.
+ return;
+ } else {
+ ol_close_error(handle, ol_err_new(handle, errno));
+ }
+ } else {
+ if (handle->accept_cb) {
+ ol_handle* new_client = ol_tcp_open(handle, fd);
+ if (!new_client) {
+ ol_close_error(handle, ol_err_last(handle));
+ return;
+ }
+
+ handle->accept_cb(handle, new_client);
+ } else {
+ close(fd);
+ }
+ }
+ }
}
int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb) {
+ if (handle->_.fd < 0) {
+ /* Lazily allocate a file descriptor for this handle */
+ handle->_.fd = socket(AF_INET, SOCK_STREAM, 0);
+ }
+
int r = listen(handle->_.fd, backlog);
+ if (r < 0) {
+ return ol_err_new(handle, errno);
+ }
+
handle->accept_cb = cb;
- return ol_err_new(r);
+ ev_io_init(&handle->_.read_watcher, ol_server_io, handle->_.fd, EV_READ);
+ ev_io_start(EV_DEFAULT_ &handle->_.read_watcher);
+ handle->_.read_watcher.data = handle;
+
+ return 0;
}
int ol_close_error(ol_handle* handle, ol_err err) {
- ev_io_stop(&handle->_.read_watcher);
+ ev_io_stop(EV_DEFAULT_ &handle->_.read_watcher);
close(handle->_.fd);
handle->_.fd = -1;
@@ -121,9 +214,9 @@ void ol_tcp_connect(ol_handle* handle, ol_req* req) {
ol_connect_cb connect_cb = req->connect_cb;
if (connect_cb) {
if (req->_.local) {
- connect_cb(NULL, ol_err_new(0));
+ connect_cb(NULL, ol_err_new(handle, 0));
} else {
- connect_cb(req, ol_err_new(0));
+ connect_cb(req, ol_err_new(handle, 0));
}
}
@@ -139,11 +232,13 @@ void ol_tcp_connect(ol_handle* handle, ol_req* req) {
return;
} else {
+ ol_err err = ol_err_new(handle, error);
+
if (req->_.connect_cb) {
- req->_.connect_cb(req, ol_err_new(error));
+ req->_.connect_cb(req, err);
}
- ol_close_error(handle, ol_err_new(error));
+ ol_close_error(handle, err);
}
}
@@ -165,16 +260,16 @@ ol_req* ol_req_maybe_alloc(ol_handle* handle, ol_req* in_req) {
int ol_connect(ol_handle* handle, ol_req *req_in, struct sockaddr* addr) {
if (handle->_.connect_req) {
- return ol_err_new(EALREADY);
+ return ol_err_new(handle, EALREADY);
}
if (handle->type != OL_TCP) {
- return ol_err_new(ENOTSOCK);
+ return ol_err_new(handle, ENOTSOCK);
}
ol_req *req = ol_req_maybe_alloc(handle, req_in);
if (!req) {
- return ol_err_new(ENOMEM);
+ return ol_err_new(handle, ENOMEM);
}
handle->_.connect_req = req;
@@ -195,16 +290,16 @@ int ol_connect(ol_handle* handle, ol_req *req_in, struct sockaddr* addr) {
/* socket(2) failed */
if (handle->_.fd < 0) {
- return ol_err_new(errno);
+ return ol_err_new(handle, errno);
}
int r = connect(handle->_.fd, addr, addrsize);
ev_io_init(&handle->_.read_watcher, ol_tcp_io, handle->_.fd, EV_READ);
ev_io_init(&handle->_.write_watcher, ol_tcp_io, handle->_.fd, EV_WRITE);
- ev_io_start(&handle->_.read_watcher);
+ ev_io_start(EV_DEFAULT_ &handle->_.read_watcher);
- return ol_err_new(r);
+ return ol_err_new(handle, r);
}
int ol_write(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) {
@@ -223,9 +318,60 @@ int ol_write2(ol_handle* handle, const char* msg) {
}
-int ol_read(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) {
- // stub
- assert(0);
+void ol_req_append(ol_handle* handle, ol_req *req) {
+ ngx_queue_insert_tail(&handle->read_reqs, &req->read_queue);
+}
+
+
+int ol_read(ol_handle* handle, ol_req *req_in, ol_buf* bufs, int bufcnt) {
+ assert(handle->_.fd >= 0);
+
+ if (!ngx_queue_empty(&handle->read_reqs)) {
+ /* There are already pending read_reqs. We must get in line. */
+ assert(ev_is_active(&handle->read_watcher));
+
+ ol_req *req = ol_req_maybe_alloc(handle, req_in);
+ if (!req) {
+ return ol_err_new(handle, ENOMEM);
+ }
+
+ ol_req_append(handle, req);
+
+ return ol_err_new(handle, EINPROGRESS);
+
+ } else {
+ /* Attempt to read immediately */
+ ssize_t nread = readv(handle->_.fd, (struct iovec) bufs, bufcnt);
+
+ if (nread < 0) {
+
+ }
+
+ ssize_t buftotal;
+ for (int i = 0; i < bufcnt; i++) {
+ buftotal += bufs.len;
+ if (buftotal == nread) {
+ /* We read everything */
+ if (req_in.read_cb) {
+ req_in.read_cb(handle, req_in, nread, 0);
+ }
+ }
+
+ if (buftotal > nread) break;
+ }
+
+
+ ol_req *req = ol_req_maybe_alloc(handle, req_in);
+ if (!req) {
+ return ol_err_new(handle, ENOMEM);
+ }
+
+ // blah
+ }
+
+
+ ngx_queue_insert_tail();
+
return 0;
}
View
3  ol-unix.h
@@ -3,7 +3,6 @@
#include "ngx-queue.h"
-#define EV_MULTIPLICITY 0
#include "ev/ev.h"
#include <sys/types.h>
@@ -42,6 +41,8 @@ typedef struct {
typedef struct {
int fd;
+ ol_err err;
+
ol_read_cb read_cb;
ol_close_cb close_cb;
View
1  ol.h
@@ -73,6 +73,7 @@ struct ol_req_s {
};
+void ol_init();
int ol_run();
ol_handle* ol_handle_new(ol_close_cb close_cb, void* data);
View
16 test/echo-server.c
@@ -80,11 +80,23 @@ void on_accept(ol_handle* server, ol_handle* new_client) {
int main(int argc, char** argv) {
+ ol_init();
+
ol_handle* server = ol_handle_new(on_close, NULL);
struct sockaddr_in addr = ol_ip4_addr("0.0.0.0", 8000);
- ol_bind(server, (struct sockaddr*) &addr);
- ol_listen(server, 128, on_accept);
+
+ int r = ol_bind(server, (struct sockaddr*) &addr);
+ if (r) {
+ fprintf(stderr, "Bind error\n");
+ return 1;
+ }
+
+ r = ol_listen(server, 128, on_accept);
+ if (r) {
+ fprintf(stderr, "Listen error\n");
+ return 1;
+ }
ol_run();
Please sign in to comment.
Something went wrong with that request. Please try again.