Permalink
Browse files

win: guarantee write callback ordering

  • Loading branch information...
1 parent 2640aae commit f0044c007a24ea4dd18f979c59b16c5f1b02fa8f @pietern pietern committed with piscisaureus Sep 16, 2011
Showing with 79 additions and 23 deletions.
  1. +1 −0 AUTHORS
  2. +3 −1 include/uv-private/uv-win.h
  3. +1 −0 src/win/internal.h
  4. +31 −12 src/win/pipe.c
  5. +14 −0 src/win/stream.c
  6. +29 −10 src/win/tcp.c
View
@@ -21,3 +21,4 @@ Clifford Heath <clifford.heath@gmail.com>
Jorge Chamorro Bieling <jorge@jorgechamorro.com>
Luis Lavena <luislavena@gmail.com>
Matthew Sporleder <msporleder@gmail.com>
+Pieter Noordhuis <pcnoordhuis@gmail.com>
@@ -99,7 +99,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
struct uv_req_s* next_req;
#define UV_WRITE_PRIVATE_FIELDS \
- /* empty */
+ int done; \
+ uv_write_t* next_write;
#define UV_CONNECT_PRIVATE_FIELDS \
/* empty */
@@ -125,6 +126,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
#define uv_stream_connection_fields \
unsigned int write_reqs_pending; \
+ uv_write_t* write_reqs_tail; \
uv_shutdown_t* shutdown_req;
#define uv_stream_server_fields \
View
@@ -110,6 +110,7 @@ void uv_process_reqs(uv_loop_t* loop);
*/
void uv_stream_init(uv_loop_t* loop, uv_stream_t* handle);
void uv_connection_init(uv_stream_t* handle);
+void uv_insert_pending_write_req(uv_stream_t* handle, uv_write_t* req);
size_t uv_count_bufs(uv_buf_t bufs[], int count);
View
@@ -747,8 +747,11 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
req->type = UV_WRITE;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
+ req->done = 0;
memset(&req->overlapped, 0, sizeof(req->overlapped));
+ uv_insert_pending_write_req((uv_stream_t*)handle, req);
+
result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
@@ -886,22 +889,38 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
handle->write_queue_size -= req->queued_bytes;
- if (req->cb) {
- if (!REQ_SUCCESS(req)) {
- loop->last_error = GET_REQ_UV_ERROR(req);
- ((uv_write_cb)req->cb)(req, -1);
+ req->done = 1;
+
+ while (handle->write_reqs_tail) {
+ req = handle->write_reqs_tail->next_write;
+
+ if (!req->done) {
+ break;
+ }
+
+ if (req == handle->write_reqs_tail) {
+ handle->write_reqs_tail = NULL;
} else {
- ((uv_write_cb)req->cb)(req, 0);
+ handle->write_reqs_tail->next_write = req->next_write;
}
- }
- handle->write_reqs_pending--;
- if (handle->write_reqs_pending == 0 &&
- handle->flags & UV_HANDLE_SHUTTING) {
- uv_want_endgame(loop, (uv_handle_t*)handle);
- }
+ if (req->cb) {
+ if (!REQ_SUCCESS(req)) {
+ loop->last_error = GET_REQ_UV_ERROR(req);
+ ((uv_write_cb)req->cb)(req, -1);
+ } else {
+ ((uv_write_cb)req->cb)(req, 0);
+ }
+ }
- DECREASE_PENDING_REQ_COUNT(handle);
+ handle->write_reqs_pending--;
+ if (handle->flags & UV_HANDLE_SHUTTING &&
+ handle->write_reqs_pending == 0) {
+ uv_want_endgame(loop, (uv_handle_t*)handle);
+ }
+
+ DECREASE_PENDING_REQ_COUNT(handle);
+ }
}
View
@@ -41,13 +41,27 @@ void uv_stream_init(uv_loop_t* loop, uv_stream_t* handle) {
void uv_connection_init(uv_stream_t* handle) {
handle->flags |= UV_HANDLE_CONNECTION;
handle->write_reqs_pending = 0;
+ handle->write_reqs_tail = NULL;
uv_req_init(handle->loop, (uv_req_t*) &(handle->read_req));
handle->read_req.type = UV_READ;
handle->read_req.data = handle;
}
+void uv_insert_pending_write_req(uv_stream_t* handle, uv_write_t* req) {
+ req->next_write = NULL;
+ if (handle->write_reqs_tail) {
+ req->next_write = handle->write_reqs_tail->next_write;
+ handle->write_reqs_tail->next_write = req;
+ handle->write_reqs_tail = req;
+ } else {
+ req->next_write = req;
+ handle->write_reqs_tail = req;
+ }
+}
+
+
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
switch (stream->type) {
case UV_TCP:
View
@@ -644,8 +644,11 @@ int uv_tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle,
req->type = UV_WRITE;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
+ req->done = 0;
memset(&req->overlapped, 0, sizeof(req->overlapped));
+ uv_insert_pending_write_req((uv_stream_t*)handle, req);
+
result = WSASend(handle->socket,
(WSABUF*)bufs,
bufcnt,
@@ -781,18 +784,34 @@ void uv_process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
handle->write_queue_size -= req->queued_bytes;
- if (req->cb) {
- loop->last_error = GET_REQ_UV_SOCK_ERROR(req);
- ((uv_write_cb)req->cb)(req, loop->last_error.code == UV_OK ? 0 : -1);
- }
+ req->done = 1;
- handle->write_reqs_pending--;
- if (handle->flags & UV_HANDLE_SHUTTING &&
- handle->write_reqs_pending == 0) {
- uv_want_endgame(loop, (uv_handle_t*)handle);
- }
+ while (handle->write_reqs_tail) {
+ req = handle->write_reqs_tail->next_write;
- DECREASE_PENDING_REQ_COUNT(handle);
+ if (!req->done) {
+ break;
+ }
+
+ if (req == handle->write_reqs_tail) {
+ handle->write_reqs_tail = NULL;
+ } else {
+ handle->write_reqs_tail->next_write = req->next_write;
+ }
+
+ if (req->cb) {
+ loop->last_error = GET_REQ_UV_SOCK_ERROR(req);
+ ((uv_write_cb)req->cb)(req, loop->last_error.code == UV_OK ? 0 : -1);
+ }
+
+ handle->write_reqs_pending--;
+ if (handle->flags & UV_HANDLE_SHUTTING &&
+ handle->write_reqs_pending == 0) {
+ uv_want_endgame(loop, (uv_handle_t*)handle);
+ }
+
+ DECREASE_PENDING_REQ_COUNT(handle);
+ }
}

0 comments on commit f0044c0

Please sign in to comment.