Permalink
Browse files

Sync evcom after refactor; fix binding issues

  • Loading branch information...
1 parent 4253baf commit 3b0408ec1cbd7872aaf347c93674c33deb36f03f @ry ry committed Aug 13, 2009
Showing with 652 additions and 693 deletions.
  1. +486 −511 deps/evcom/evcom.c
  2. +92 −89 deps/evcom/evcom.h
  3. +23 −19 deps/evcom/test/test.c
  4. +2 −4 src/http.js
  5. +30 −49 src/net.cc
  6. +10 −13 src/net.h
  7. +9 −8 test/mjsunit/test-http-cat.js
View
View
@@ -42,10 +42,10 @@ extern "C" {
# include <gnutls/gnutls.h>
#endif
-typedef struct evcom_queue evcom_queue;
-typedef struct evcom_buf evcom_buf;
-typedef struct evcom_server evcom_server;
-typedef struct evcom_stream evcom_stream;
+/* The maximum evcom_stream will try to read in one callback */
+#ifndef EVCOM_CHUNKSIZE
+# define EVCOM_CHUNKSIZE (8*1024)
+#endif
/* flags for stream and server */
#define EVCOM_ATTACHED 0x0001
@@ -54,8 +54,91 @@ typedef struct evcom_stream evcom_stream;
#define EVCOM_SECURE 0x0008
#define EVCOM_GOT_HALF_CLOSE 0x0010
#define EVCOM_GOT_FULL_CLOSE 0x0020
-#define EVCOM_TOO_MANY_CONN 0x0040
-#define EVCOM_READ_PAUSED 0x0080
+#define EVCOM_PAUSED 0x0040
+#define EVCOM_READABLE 0x0080
+#define EVCOM_WRITABLE 0x0100
+#define EVCOM_GOT_WRITE_EVENT 0x0200
+
+enum evcom_stream_state { EVCOM_INITIALIZED
+ , EVCOM_CONNECTING
+ , EVCOM_CONNECTED_RW /* read write */
+ , EVCOM_CONNECTED_RO /* read only */
+ , EVCOM_CONNECTED_WO /* write only */
+ , EVCOM_CLOSING
+ , EVCOM_CLOSED
+ };
+
+typedef struct evcom_queue {
+ struct evcom_queue *prev;
+ struct evcom_queue *next;
+} evcom_queue;
+
+typedef struct evcom_buf {
+ /* public */
+ char *base;
+ size_t len;
+ void (*release) (struct evcom_buf *); /* called when oi is done with the object */
+ void *data;
+
+ /* private */
+ size_t written;
+ evcom_queue queue;
+} evcom_buf;
+
+#if EV_MULTIPLICITY
+# define EVCOM_LOOP struct ev_loop *loop;
+#else
+# define EVCOM_LOOP
+#endif
+
+#define EVCOM_DESCRIPTOR(type) \
+ unsigned int flags; /* private */ \
+ int (*action) (struct evcom_descriptor*); /* private */ \
+ int errorno; /* read-only */ \
+ int fd; /* read-only */ \
+ EVCOM_LOOP /* read-only */ \
+ void *data; /* public */ \
+ void (*on_close) (struct type*); /* public */
+
+typedef struct evcom_descriptor {
+ EVCOM_DESCRIPTOR(evcom_descriptor)
+} evcom_descriptor;
+
+typedef struct evcom_server {
+ EVCOM_DESCRIPTOR(evcom_server)
+
+ /* PRIVATE */
+ ev_io watcher;
+
+ /* PUBLIC */
+ struct evcom_stream*
+ (*on_connection)(struct evcom_server *, struct sockaddr *remote_addr);
+} evcom_server;
+
+typedef struct evcom_stream {
+ EVCOM_DESCRIPTOR(evcom_stream)
+
+ /* PRIVATE */
+ ev_io write_watcher;
+ ev_io read_watcher;
+ ev_timer timeout_watcher;
+#if EVCOM_HAVE_GNUTLS
+ gnutls_session_t session;
+#endif
+
+ /* READ-ONLY */
+ struct evcom_server *server;
+ evcom_queue out;
+#if EVCOM_HAVE_GNUTLS
+ int gnutls_errorno;
+#endif
+
+ /* PUBLIC */
+ void (*on_connect) (struct evcom_stream *);
+ void (*on_read) (struct evcom_stream *, const void *buf, size_t count);
+ void (*on_drain) (struct evcom_stream *);
+ void (*on_timeout) (struct evcom_stream *);
+} evcom_stream;
void evcom_server_init (evcom_server *);
int evcom_server_listen (evcom_server *, struct sockaddr *address, int backlog);
@@ -97,7 +180,7 @@ void evcom_stream_full_close (evcom_stream *);
/* The most extreme measure.
* Will not wait for the write queue to complete.
*/
-void evcom_stream_force_close (evcom_stream *);
+void evcom_stream_force_close (evcom_stream *);
#if EVCOM_HAVE_GNUTLS
@@ -110,92 +193,12 @@ void evcom_stream_force_close (evcom_stream *);
void evcom_stream_set_secure_session (evcom_stream *, gnutls_session_t);
#endif
+enum evcom_stream_state evcom_stream_state (evcom_stream *stream);
+
evcom_buf * evcom_buf_new (const char* base, size_t len);
evcom_buf * evcom_buf_new2 (size_t len);
void evcom_buf_destroy (evcom_buf *);
-
-struct evcom_queue {
- evcom_queue *prev;
- evcom_queue *next;
-};
-
-struct evcom_buf {
- /* public */
- char *base;
- size_t len;
- void (*release) (evcom_buf *); /* called when oi is done with the object */
- void *data;
-
- /* private */
- size_t written;
- evcom_queue queue;
-};
-
-struct evcom_server {
- /* read only */
- int fd;
-#if EV_MULTIPLICITY
- struct ev_loop *loop;
-#endif
- unsigned flags;
-
- /* PRIVATE */
- ev_io connection_watcher;
-
- /* PUBLIC */
-
- evcom_stream* (*on_connection) (evcom_server *, struct sockaddr *remote_addr);
-
- /* Executed when a server is closed.
- * If evcom_server_close() was called errorno will be 0.
- * An libev error is indicated with errorno == 1
- * Otherwise errorno is a stdlib errno from a system call, e.g. accept()
- */
- void (*on_close) (evcom_server *, int errorno);
-
- void *data;
-};
-
-struct evcom_stream {
- /* read only */
- int fd;
-#if EV_MULTIPLICITY
- struct ev_loop *loop;
-#endif
- evcom_server *server;
- evcom_queue out_stream;
- size_t written;
- unsigned flags;
-
- /* NULL = that end of the stream is closed. */
- int (*read_action) (evcom_stream *);
- int (*write_action) (evcom_stream *);
-
- /* ERROR CODES. 0 = no error. Check on_close. */
- int errorno;
-#if EVCOM_HAVE_GNUTLS
- int gnutls_errorno;
-#endif
-
- /* private */
- ev_io write_watcher;
- ev_io read_watcher;
- ev_timer timeout_watcher;
-#if EVCOM_HAVE_GNUTLS
- gnutls_session_t session;
-#endif
-
- /* public */
- size_t chunksize; /* the maximum chunk that on_read() will return */
- void (*on_connect) (evcom_stream *);
- void (*on_read) (evcom_stream *, const void *buf, size_t count);
- void (*on_drain) (evcom_stream *);
- void (*on_close) (evcom_stream *);
- void (*on_timeout) (evcom_stream *);
- void *data;
-};
-
EV_INLINE void
evcom_queue_init (evcom_queue *q)
{
@@ -27,22 +27,26 @@ static int use_tls;
static int got_server_close;
static void
-common_on_server_close (evcom_server *server, int errorno)
+common_on_server_close (evcom_server *s)
{
- assert(server);
- assert(errorno == 0);
+ printf("server on_close\n");
+ assert(s == &server);
+ assert(s->errorno == 0);
got_server_close = 1;
+ evcom_server_detach(s);
}
static void
common_on_peer_close (evcom_stream *stream)
{
+ assert(EVCOM_CLOSED == evcom_stream_state(stream));
assert(stream->errorno == 0);
printf("server connection closed\n");
#if EVCOM_HAVE_GNUTLS
assert(stream->gnutls_errorno == 0);
if (use_tls) gnutls_deinit(stream->session);
#endif
+ evcom_stream_detach(stream);
free(stream);
}
@@ -106,7 +110,7 @@ void anon_tls_client (evcom_stream *stream)
#define PING "PING"
#define PONG "PONG"
-#define EXCHANGES 5000
+#define EXCHANGES 500
#define PINGPONG_TIMEOUT 5.0
static int successful_ping_count;
@@ -130,9 +134,11 @@ pingpong_on_peer_read (evcom_stream *stream, const void *base, size_t len)
static void
pingpong_on_client_close (evcom_stream *stream)
{
+ assert(EVCOM_CLOSED == evcom_stream_state(stream));
assert(stream);
printf("client connection closed\n");
evcom_server_close(&server);
+ evcom_stream_detach(stream);
}
static evcom_stream*
@@ -147,6 +153,8 @@ pingpong_on_server_connection (evcom_server *_server, struct sockaddr *addr)
stream->on_close = common_on_peer_close;
stream->on_timeout = common_on_peer_timeout;
+ assert(EVCOM_INITIALIZED == evcom_stream_state(stream));
+
nconnections++;
#if EVCOM_HAVE_GNUTLS
@@ -163,6 +171,7 @@ pingpong_on_client_connect (evcom_stream *stream)
{
printf("client connected. sending ping\n");
evcom_stream_write_simple(stream, PING, sizeof PING);
+ assert(EVCOM_CONNECTED_RW == evcom_stream_state(stream));
}
static void
@@ -219,6 +228,8 @@ pingpong (struct sockaddr *address)
client.on_close = pingpong_on_client_close;
client.on_timeout = common_on_client_timeout;
+ assert(EVCOM_INITIALIZED == evcom_stream_state(&client));
+
#if EVCOM_HAVE_GNUTLS
if (use_tls) anon_tls_client(&client);
#endif
@@ -239,35 +250,28 @@ pingpong (struct sockaddr *address)
-
-#define NCONN 100
-#define CONNINT_TIMEOUT 1000.0
+#define NCONN 50
+#define CONNINT_TIMEOUT 10.0
static void
-connint_on_peer_read(evcom_stream *stream, const void *base, size_t len)
+send_bye_and_close(evcom_stream *stream, const void *base, size_t len)
{
assert(base);
assert(len == 0);
evcom_stream_write_simple(stream, "BYE", 3);
printf("server wrote bye\n");
-}
-
-static void
-connint_on_peer_drain(evcom_stream *stream)
-{
evcom_stream_close(stream);
}
static evcom_stream*
-connint_on_server_connection(evcom_server *_server, struct sockaddr *addr)
+connint_on_connection(evcom_server *_server, struct sockaddr *addr)
{
assert(_server == &server);
assert(addr);
evcom_stream *stream = malloc(sizeof(evcom_stream));
evcom_stream_init(stream, CONNINT_TIMEOUT);
- stream->on_read = connint_on_peer_read;
- stream->on_drain = connint_on_peer_drain;
+ stream->on_read = send_bye_and_close;
stream->on_close = common_on_peer_close;
stream->on_timeout = common_on_peer_timeout;
@@ -300,6 +304,8 @@ connint_on_client_close (evcom_stream *stream)
evcom_server_close(&server);
printf("closing server\n");
}
+
+ evcom_stream_detach(stream);
}
static void
@@ -329,10 +335,9 @@ connint (struct sockaddr *address)
got_server_close = 0;
evcom_server_init(&server);
- server.on_connection = connint_on_server_connection;
+ server.on_connection = connint_on_connection;
server.on_close = common_on_server_close;
-
evcom_server_listen(&server, address, 1000);
evcom_server_attach(EV_DEFAULT_ &server);
@@ -443,6 +448,5 @@ main (void)
free_unix_address(unix_address);
#endif
-
return 0;
}
View
@@ -423,8 +423,6 @@ node.http.createClient = function (port, host) {
};
client.addListener("connect", function () {
- //node.debug("HTTP CLIENT onConnect. readyState = " + client.readyState);
- //node.debug("client.requests[0].uri = '" + client.requests[0].uri + "'");
requests[0].flush();
});
@@ -439,11 +437,11 @@ node.http.createClient = function (port, host) {
return;
}
- //node.debug("HTTP CLIENT onDisconnect. readyState = " + client.readyState);
+ node.debug("HTTP CLIENT onDisconnect. readyState = " + client.readyState);
// If there are more requests to handle, reconnect.
if (requests.length > 0 && client.readyState != "opening") {
- //node.debug("HTTP CLIENT: reconnecting readyState = " + client.readyState);
+ node.debug("HTTP CLIENT: reconnecting readyState = " + client.readyState);
client.connect(port, host); // reconnect
}
});
Oops, something went wrong. Retry.

0 comments on commit 3b0408e

Please sign in to comment.