diff --git a/configure b/configure index dbe7f243e16..401b370e923 100755 --- a/configure +++ b/configure @@ -27,6 +27,11 @@ parser.add_option("--without-npm", dest="without_npm", help="Don\'t install the bundled npm package manager") +parser.add_option("--without-isolates", + action="store_true", + dest="without_isolates", + help="Build without isolates (no threads, single loop) [Default: False]") + parser.add_option("--without-ssl", action="store_true", dest="without_ssl", @@ -163,6 +168,7 @@ def target_arch(): def configure_node(o): # TODO add gdb and dest_cpu + o['variables']['node_use_isolates'] = b(not options.without_isolates) o['variables']['node_debug'] = b(options.debug) o['variables']['node_prefix'] = options.prefix if options.prefix else '' o['variables']['node_use_dtrace'] = b(options.with_dtrace) diff --git a/deps/uv/AUTHORS b/deps/uv/AUTHORS index dd40b7af19c..f1fef1ca2e0 100644 --- a/deps/uv/AUTHORS +++ b/deps/uv/AUTHORS @@ -39,3 +39,4 @@ Bruce Mitchener Maciej MaƂecki Yasuhiro Matsumoto Daisuke Murase +Paddy Byers diff --git a/deps/uv/include/uv-private/eio.h b/deps/uv/include/uv-private/eio.h index 450df6ba299..aab9988b814 100644 --- a/deps/uv/include/uv-private/eio.h +++ b/deps/uv/include/uv-private/eio.h @@ -206,6 +206,28 @@ enum { EIO_PRI_DEFAULT = 0 }; +#define ETP_PRI_MIN EIO_PRI_MIN +#define ETP_PRI_MAX EIO_PRI_MAX + +#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) + +#define ETP_REQ eio_req + +/* + * a somewhat faster data structure might be nice, but + * with 8 priorities this actually needs <20 insns + * per shift, the most expensive operation. + */ +typedef struct { + ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ + int size; +} etp_reqq; + +typedef struct { + etp_reqq res_queue; /* queue of outstanding responses for this channel */ + void *data; /* use this for what you want */ +} eio_channel; + /* eio request structure */ /* this structure is mostly read-only */ /* when initialising it, all members must be zero-initialised */ @@ -227,6 +249,8 @@ struct eio_req long int3; /* chown, fchown: gid */ int errorno; /* errno value on syscall return */ + eio_channel *channel; /* data used to direct poll callbacks arising from this req */ + #if __i386 || __amd64 unsigned char cancelled; #else @@ -261,11 +285,14 @@ enum { * and eio_poll_cb needs to be invoked (it MUST NOT call eio_poll_cb itself). * done_poll is called when the need to poll is gone. */ -int eio_init (void (*want_poll)(void), void (*done_poll)(void)); +int eio_init (void (*want_poll)(eio_channel *), void (*done_poll)(eio_channel *)); + +/* initialises a channel */ +void eio_channel_init(eio_channel *, void *data); /* must be called regularly to handle pending requests */ /* returns 0 if all requests were handled, -1 if not, or the value of EIO_FINISH if != 0 */ -int eio_poll (void); +int eio_poll (eio_channel *channel); /* stop polling if poll took longer than duration seconds */ void eio_set_max_poll_time (eio_tstamp nseconds); @@ -289,55 +316,55 @@ unsigned int eio_nthreads (void); /* number of worker threads in use currently * /* convenience wrappers */ #ifndef EIO_NO_WRAPPERS -eio_req *eio_nop (int pri, eio_cb cb, void *data); /* does nothing except go through the whole process */ -eio_req *eio_busy (eio_tstamp delay, int pri, eio_cb cb, void *data); /* ties a thread for this long, simulating busyness */ -eio_req *eio_sync (int pri, eio_cb cb, void *data); -eio_req *eio_fsync (int fd, int pri, eio_cb cb, void *data); -eio_req *eio_fdatasync (int fd, int pri, eio_cb cb, void *data); -eio_req *eio_syncfs (int fd, int pri, eio_cb cb, void *data); -eio_req *eio_msync (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data); -eio_req *eio_mtouch (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data); -eio_req *eio_mlock (void *addr, size_t length, int pri, eio_cb cb, void *data); -eio_req *eio_mlockall (int flags, int pri, eio_cb cb, void *data); -eio_req *eio_sync_file_range (int fd, off_t offset, size_t nbytes, unsigned int flags, int pri, eio_cb cb, void *data); -eio_req *eio_fallocate (int fd, int mode, off_t offset, size_t len, int pri, eio_cb cb, void *data); -eio_req *eio_close (int fd, int pri, eio_cb cb, void *data); -eio_req *eio_readahead (int fd, off_t offset, size_t length, int pri, eio_cb cb, void *data); -eio_req *eio_read (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data); -eio_req *eio_write (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data); -eio_req *eio_fstat (int fd, int pri, eio_cb cb, void *data); /* stat buffer=ptr2 allocated dynamically */ -eio_req *eio_fstatvfs (int fd, int pri, eio_cb cb, void *data); /* stat buffer=ptr2 allocated dynamically */ -eio_req *eio_futime (int fd, eio_tstamp atime, eio_tstamp mtime, int pri, eio_cb cb, void *data); -eio_req *eio_ftruncate (int fd, off_t offset, int pri, eio_cb cb, void *data); -eio_req *eio_fchmod (int fd, eio_mode_t mode, int pri, eio_cb cb, void *data); -eio_req *eio_fchown (int fd, eio_uid_t uid, eio_gid_t gid, int pri, eio_cb cb, void *data); -eio_req *eio_dup2 (int fd, int fd2, int pri, eio_cb cb, void *data); -eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, int pri, eio_cb cb, void *data); -eio_req *eio_open (const char *path, int flags, eio_mode_t mode, int pri, eio_cb cb, void *data); -eio_req *eio_utime (const char *path, eio_tstamp atime, eio_tstamp mtime, int pri, eio_cb cb, void *data); -eio_req *eio_truncate (const char *path, off_t offset, int pri, eio_cb cb, void *data); -eio_req *eio_chown (const char *path, eio_uid_t uid, eio_gid_t gid, int pri, eio_cb cb, void *data); -eio_req *eio_chmod (const char *path, eio_mode_t mode, int pri, eio_cb cb, void *data); -eio_req *eio_mkdir (const char *path, eio_mode_t mode, int pri, eio_cb cb, void *data); -eio_req *eio_readdir (const char *path, int flags, int pri, eio_cb cb, void *data); /* result=ptr2 allocated dynamically */ -eio_req *eio_rmdir (const char *path, int pri, eio_cb cb, void *data); -eio_req *eio_unlink (const char *path, int pri, eio_cb cb, void *data); -eio_req *eio_readlink (const char *path, int pri, eio_cb cb, void *data); /* result=ptr2 allocated dynamically */ -eio_req *eio_realpath (const char *path, int pri, eio_cb cb, void *data); /* result=ptr2 allocated dynamically */ -eio_req *eio_stat (const char *path, int pri, eio_cb cb, void *data); /* stat buffer=ptr2 allocated dynamically */ -eio_req *eio_lstat (const char *path, int pri, eio_cb cb, void *data); /* stat buffer=ptr2 allocated dynamically */ -eio_req *eio_statvfs (const char *path, int pri, eio_cb cb, void *data); /* stat buffer=ptr2 allocated dynamically */ -eio_req *eio_mknod (const char *path, eio_mode_t mode, dev_t dev, int pri, eio_cb cb, void *data); -eio_req *eio_link (const char *path, const char *new_path, int pri, eio_cb cb, void *data); -eio_req *eio_symlink (const char *path, const char *new_path, int pri, eio_cb cb, void *data); -eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data); -eio_req *eio_custom (void (*execute)(eio_req *), int pri, eio_cb cb, void *data); +eio_req *eio_nop (int pri, eio_cb cb, void *data, eio_channel *channel); /* does nothing except go through the whole process */ +eio_req *eio_busy (eio_tstamp delay, int pri, eio_cb cb, void *data, eio_channel *channel); /* ties a thread for this long, simulating busyness */ +eio_req *eio_sync (int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_fsync (int fd, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_fdatasync (int fd, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_syncfs (int fd, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_msync (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_mtouch (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_mlock (void *addr, size_t length, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_mlockall (int flags, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_sync_file_range (int fd, off_t offset, size_t nbytes, unsigned int flags, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_fallocate (int fd, int mode, off_t offset, size_t len, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_close (int fd, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_readahead (int fd, off_t offset, size_t length, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_read (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_write (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_fstat (int fd, int pri, eio_cb cb, void *data, eio_channel *channel); /* stat buffer=ptr2 allocated dynamically */ +eio_req *eio_fstatvfs (int fd, int pri, eio_cb cb, void *data, eio_channel *channel); /* stat buffer=ptr2 allocated dynamically */ +eio_req *eio_futime (int fd, eio_tstamp atime, eio_tstamp mtime, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_ftruncate (int fd, off_t offset, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_fchmod (int fd, eio_mode_t mode, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_fchown (int fd, eio_uid_t uid, eio_gid_t gid, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_dup2 (int fd, int fd2, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_open (const char *path, int flags, eio_mode_t mode, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_utime (const char *path, eio_tstamp atime, eio_tstamp mtime, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_truncate (const char *path, off_t offset, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_chown (const char *path, eio_uid_t uid, eio_gid_t gid, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_chmod (const char *path, eio_mode_t mode, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_mkdir (const char *path, eio_mode_t mode, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_readdir (const char *path, int flags, int pri, eio_cb cb, void *data, eio_channel *channel); /* result=ptr2 allocated dynamically */ +eio_req *eio_rmdir (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_unlink (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_readlink (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel); /* result=ptr2 allocated dynamically */ +eio_req *eio_realpath (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel); /* result=ptr2 allocated dynamically */ +eio_req *eio_stat (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel); /* stat buffer=ptr2 allocated dynamically */ +eio_req *eio_lstat (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel); /* stat buffer=ptr2 allocated dynamically */ +eio_req *eio_statvfs (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel); /* stat buffer=ptr2 allocated dynamically */ +eio_req *eio_mknod (const char *path, eio_mode_t mode, dev_t dev, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_link (const char *path, const char *new_path, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_symlink (const char *path, const char *new_path, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data, eio_channel *channel); +eio_req *eio_custom (void (*execute)(eio_req *), int pri, eio_cb cb, void *data, eio_channel *channel); #endif /*****************************************************************************/ /* groups */ -eio_req *eio_grp (eio_cb cb, void *data); +eio_req *eio_grp (eio_cb cb, void *data, eio_channel *channel); void eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit); void eio_grp_limit (eio_req *grp, int limit); void eio_grp_add (eio_req *grp, eio_req *req); diff --git a/deps/uv/include/uv-private/uv-unix.h b/deps/uv/include/uv-private/uv-unix.h index 99537347f76..24ef37cb9d5 100644 --- a/deps/uv/include/uv-private/uv-unix.h +++ b/deps/uv/include/uv-private/uv-unix.h @@ -63,6 +63,8 @@ typedef void* uv_lib_t; * definition of ares_timeout(). \ */ \ ev_timer timer; \ + /* Poll result queue */ \ + eio_channel uv_eio_channel; \ struct ev_loop* ev; #define UV_REQ_BUFSML_SIZE (4) diff --git a/deps/uv/include/uv.h b/deps/uv/include/uv.h index 3f6df256873..7e089ef7243 100644 --- a/deps/uv/include/uv.h +++ b/deps/uv/include/uv.h @@ -1313,8 +1313,7 @@ UV_EXTERN uv_err_t uv_dlopen(const char* filename, uv_lib_t* library); UV_EXTERN uv_err_t uv_dlclose(uv_lib_t library); /* - * Retrieves a data pointer from a dynamic library. It is legal for a symbol to - * map to NULL. + * Retrieves a data pointer from a dynamic library. */ UV_EXTERN uv_err_t uv_dlsym(uv_lib_t library, const char* name, void** ptr); diff --git a/deps/uv/src/unix/core.c b/deps/uv/src/unix/core.c index a024f12b867..210f8fe3f42 100644 --- a/deps/uv/src/unix/core.c +++ b/deps/uv/src/unix/core.c @@ -167,6 +167,7 @@ static int uv__loop_init(uv_loop_t* loop, loop->ev = ev_loop_new(EVFLAG_AUTO); #endif ev_set_userdata(loop->ev, loop); + eio_channel_init(&loop->uv_eio_channel, loop); return 0; } @@ -709,7 +710,7 @@ int uv_getaddrinfo(uv_loop_t* loop, uv_ref(loop); req = eio_custom(getaddrinfo_thread_proc, EIO_PRI_DEFAULT, - uv_getaddrinfo_done, handle); + uv_getaddrinfo_done, handle, &loop->uv_eio_channel); assert(req); assert(req->data == handle); diff --git a/deps/uv/src/unix/dl.c b/deps/uv/src/unix/dl.c index 41c244d79ea..6c4ddff89e6 100644 --- a/deps/uv/src/unix/dl.c +++ b/deps/uv/src/unix/dl.c @@ -25,17 +25,11 @@ #include #include -/* The dl family of functions don't set errno. We need a good way to communicate - * errors to the caller but there is only dlerror() and that returns a string - - * a string that may or may not be safe to keep a reference to... - */ -static const uv_err_t uv_inval_ = { UV_EINVAL, EINVAL }; - uv_err_t uv_dlopen(const char* filename, uv_lib_t* library) { void* handle = dlopen(filename, RTLD_LAZY); if (handle == NULL) { - return uv_inval_; + return uv__new_sys_error(errno); } *library = handle; @@ -45,7 +39,7 @@ uv_err_t uv_dlopen(const char* filename, uv_lib_t* library) { uv_err_t uv_dlclose(uv_lib_t library) { if (dlclose(library) != 0) { - return uv_inval_; + return uv__new_sys_error(errno); } return uv_ok_; @@ -53,15 +47,9 @@ uv_err_t uv_dlclose(uv_lib_t library) { uv_err_t uv_dlsym(uv_lib_t library, const char* name, void** ptr) { - void* address; - - /* Reset error status. */ - dlerror(); - - address = dlsym(library, name); - - if (dlerror()) { - return uv_inval_; + void* address = dlsym(library, name); + if (address == NULL) { + return uv__new_sys_error(errno); } *ptr = (void*) address; diff --git a/deps/uv/src/unix/eio/eio.c b/deps/uv/src/unix/eio/eio.c index 75abd9bb69b..58300a65eb2 100644 --- a/deps/uv/src/unix/eio/eio.c +++ b/deps/uv/src/unix/eio/eio.c @@ -362,12 +362,8 @@ static int gettimeofday(struct timeval *tv, struct timezone *tz) #define EIO_TICKS ((1000000 + 1023) >> 10) -#define ETP_PRI_MIN EIO_PRI_MIN -#define ETP_PRI_MAX EIO_PRI_MAX - struct etp_worker; -#define ETP_REQ eio_req #define ETP_DESTROY(req) eio_destroy (req) static int eio_finish (eio_req *req); #define ETP_FINISH(req) eio_finish (req) @@ -376,8 +372,6 @@ static void eio_execute (struct etp_worker *self, eio_req *req); /*****************************************************************************/ -#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) - /* calculate time difference in ~1/EIO_TICKS of a second */ ecb_inline int tvdiff (struct timeval *tv1, struct timeval *tv2) @@ -388,8 +382,8 @@ tvdiff (struct timeval *tv1, struct timeval *tv2) static unsigned int started, idle, wanted = 4; -static void (*want_poll_cb) (void); -static void (*done_poll_cb) (void); +static void (*want_poll_cb) (eio_channel *); +static void (*done_poll_cb) (eio_channel *); static unsigned int max_poll_time; /* reslock */ static unsigned int max_poll_reqs; /* reslock */ @@ -506,18 +500,8 @@ etp_nthreads (void) return retval; } -/* - * a somewhat faster data structure might be nice, but - * with 8 priorities this actually needs <20 insns - * per shift, the most expensive operation. - */ -typedef struct { - ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ - int size; -} etp_reqq; - static etp_reqq req_queue; -static etp_reqq res_queue; +static eio_channel default_channel; static void ecb_noinline ecb_cold reqq_init (etp_reqq *q) @@ -574,7 +558,7 @@ reqq_shift (etp_reqq *q) } static int ecb_cold -etp_init (void (*want_poll)(void), void (*done_poll)(void)) +etp_init (void (*want_poll)(eio_channel *), void (*done_poll)(eio_channel *)) { X_MUTEX_CREATE (wrklock); X_MUTEX_CREATE (reslock); @@ -582,7 +566,7 @@ etp_init (void (*want_poll)(void), void (*done_poll)(void)) X_COND_CREATE (reqwait); reqq_init (&req_queue); - reqq_init (&res_queue); + eio_channel_init (&default_channel, 0); wrk_first.next = wrk_first.prev = &wrk_first; @@ -656,12 +640,19 @@ etp_end_thread (void) X_UNLOCK (wrklock); } +void +eio_channel_init(eio_channel *channel, void *data) { + reqq_init(&channel->res_queue); + channel->data = data; +} + static int -etp_poll (void) +etp_poll (eio_channel *channel) { unsigned int maxreqs; unsigned int maxtime; struct timeval tv_start, tv_now; + if(!channel) channel = &default_channel; X_LOCK (reslock); maxreqs = max_poll_reqs; @@ -678,14 +669,14 @@ etp_poll (void) etp_maybe_start_thread (); X_LOCK (reslock); - req = reqq_shift (&res_queue); + req = reqq_shift (&channel->res_queue); if (req) { --npending; - if (!res_queue.size && done_poll_cb) - done_poll_cb (); + if (!channel->res_queue.size && done_poll_cb) + done_poll_cb (channel); } X_UNLOCK (reslock); @@ -752,8 +743,8 @@ etp_submit (ETP_REQ *req) ++npending; - if (!reqq_push (&res_queue, req) && want_poll_cb) - want_poll_cb (); + if (!reqq_push (&req->channel->res_queue, req) && want_poll_cb) + want_poll_cb (req->channel); X_UNLOCK (reslock); } @@ -970,9 +961,9 @@ eio_set_max_parallel (unsigned int nthreads) etp_set_max_parallel (nthreads); } -int eio_poll (void) +int eio_poll (eio_channel *channel) { - return etp_poll (); + return etp_poll (channel); } /*****************************************************************************/ @@ -2092,8 +2083,8 @@ X_THREAD_PROC (etp_proc) ++npending; - if (!reqq_push (&res_queue, req) && want_poll_cb) - want_poll_cb (); + if (!reqq_push (&req->channel->res_queue, req) && want_poll_cb) + want_poll_cb (req->channel); self->req = 0; etp_worker_clear (self); @@ -2112,7 +2103,7 @@ X_THREAD_PROC (etp_proc) /*****************************************************************************/ int ecb_cold -eio_init (void (*want_poll)(void), void (*done_poll)(void)) +eio_init (void (*want_poll)(eio_channel *), void (*done_poll)(eio_channel *)) { #if !HAVE_PREADWRITE X_MUTEX_CREATE (preadwritelock); @@ -2138,7 +2129,8 @@ eio_api_destroy (eio_req *req) req->pri = pri; \ req->finish = cb; \ req->data = data; \ - req->destroy = eio_api_destroy; + req->destroy = eio_api_destroy; \ + req->channel = channel #define SEND eio_submit (req); return req @@ -2294,209 +2286,209 @@ eio_execute (etp_worker *self, eio_req *req) #ifndef EIO_NO_WRAPPERS -eio_req *eio_nop (int pri, eio_cb cb, void *data) +eio_req *eio_nop (int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_NOP); SEND; } -eio_req *eio_busy (double delay, int pri, eio_cb cb, void *data) +eio_req *eio_busy (double delay, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_BUSY); req->nv1 = delay; SEND; } -eio_req *eio_sync (int pri, eio_cb cb, void *data) +eio_req *eio_sync (int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_SYNC); SEND; } -eio_req *eio_fsync (int fd, int pri, eio_cb cb, void *data) +eio_req *eio_fsync (int fd, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_FSYNC); req->int1 = fd; SEND; } -eio_req *eio_msync (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data) +eio_req *eio_msync (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_MSYNC); req->ptr2 = addr; req->size = length; req->int1 = flags; SEND; } -eio_req *eio_fdatasync (int fd, int pri, eio_cb cb, void *data) +eio_req *eio_fdatasync (int fd, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_FDATASYNC); req->int1 = fd; SEND; } -eio_req *eio_syncfs (int fd, int pri, eio_cb cb, void *data) +eio_req *eio_syncfs (int fd, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_SYNCFS); req->int1 = fd; SEND; } -eio_req *eio_sync_file_range (int fd, off_t offset, size_t nbytes, unsigned int flags, int pri, eio_cb cb, void *data) +eio_req *eio_sync_file_range (int fd, off_t offset, size_t nbytes, unsigned int flags, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_SYNC_FILE_RANGE); req->int1 = fd; req->offs = offset; req->size = nbytes; req->int2 = flags; SEND; } -eio_req *eio_mtouch (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data) +eio_req *eio_mtouch (void *addr, size_t length, int flags, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_MTOUCH); req->ptr2 = addr; req->size = length; req->int1 = flags; SEND; } -eio_req *eio_mlock (void *addr, size_t length, int pri, eio_cb cb, void *data) +eio_req *eio_mlock (void *addr, size_t length, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_MLOCK); req->ptr2 = addr; req->size = length; SEND; } -eio_req *eio_mlockall (int flags, int pri, eio_cb cb, void *data) +eio_req *eio_mlockall (int flags, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_MLOCKALL); req->int1 = flags; SEND; } -eio_req *eio_fallocate (int fd, int mode, off_t offset, size_t len, int pri, eio_cb cb, void *data) +eio_req *eio_fallocate (int fd, int mode, off_t offset, size_t len, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_FALLOCATE); req->int1 = fd; req->int2 = mode; req->offs = offset; req->size = len; SEND; } -eio_req *eio_close (int fd, int pri, eio_cb cb, void *data) +eio_req *eio_close (int fd, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_CLOSE); req->int1 = fd; SEND; } -eio_req *eio_readahead (int fd, off_t offset, size_t length, int pri, eio_cb cb, void *data) +eio_req *eio_readahead (int fd, off_t offset, size_t length, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_READAHEAD); req->int1 = fd; req->offs = offset; req->size = length; SEND; } -eio_req *eio_read (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data) +eio_req *eio_read (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_READ); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND; } -eio_req *eio_write (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data) +eio_req *eio_write (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_WRITE); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND; } -eio_req *eio_fstat (int fd, int pri, eio_cb cb, void *data) +eio_req *eio_fstat (int fd, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_FSTAT); req->int1 = fd; SEND; } -eio_req *eio_fstatvfs (int fd, int pri, eio_cb cb, void *data) +eio_req *eio_fstatvfs (int fd, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_FSTATVFS); req->int1 = fd; SEND; } -eio_req *eio_futime (int fd, double atime, double mtime, int pri, eio_cb cb, void *data) +eio_req *eio_futime (int fd, double atime, double mtime, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_FUTIME); req->int1 = fd; req->nv1 = atime; req->nv2 = mtime; SEND; } -eio_req *eio_ftruncate (int fd, off_t offset, int pri, eio_cb cb, void *data) +eio_req *eio_ftruncate (int fd, off_t offset, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_FTRUNCATE); req->int1 = fd; req->offs = offset; SEND; } -eio_req *eio_fchmod (int fd, eio_mode_t mode, int pri, eio_cb cb, void *data) +eio_req *eio_fchmod (int fd, eio_mode_t mode, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_FCHMOD); req->int1 = fd; req->int2 = (long)mode; SEND; } -eio_req *eio_fchown (int fd, eio_uid_t uid, eio_gid_t gid, int pri, eio_cb cb, void *data) +eio_req *eio_fchown (int fd, eio_uid_t uid, eio_gid_t gid, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_FCHOWN); req->int1 = fd; req->int2 = (long)uid; req->int3 = (long)gid; SEND; } -eio_req *eio_dup2 (int fd, int fd2, int pri, eio_cb cb, void *data) +eio_req *eio_dup2 (int fd, int fd2, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_DUP2); req->int1 = fd; req->int2 = fd2; SEND; } -eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, int pri, eio_cb cb, void *data) +eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_SENDFILE); req->int1 = out_fd; req->int2 = in_fd; req->offs = in_offset; req->size = length; SEND; } -eio_req *eio_open (const char *path, int flags, eio_mode_t mode, int pri, eio_cb cb, void *data) +eio_req *eio_open (const char *path, int flags, eio_mode_t mode, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_OPEN); PATH; req->int1 = flags; req->int2 = (long)mode; SEND; } -eio_req *eio_utime (const char *path, double atime, double mtime, int pri, eio_cb cb, void *data) +eio_req *eio_utime (const char *path, double atime, double mtime, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_UTIME); PATH; req->nv1 = atime; req->nv2 = mtime; SEND; } -eio_req *eio_truncate (const char *path, off_t offset, int pri, eio_cb cb, void *data) +eio_req *eio_truncate (const char *path, off_t offset, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_TRUNCATE); PATH; req->offs = offset; SEND; } -eio_req *eio_chown (const char *path, eio_uid_t uid, eio_gid_t gid, int pri, eio_cb cb, void *data) +eio_req *eio_chown (const char *path, eio_uid_t uid, eio_gid_t gid, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_CHOWN); PATH; req->int2 = (long)uid; req->int3 = (long)gid; SEND; } -eio_req *eio_chmod (const char *path, eio_mode_t mode, int pri, eio_cb cb, void *data) +eio_req *eio_chmod (const char *path, eio_mode_t mode, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_CHMOD); PATH; req->int2 = (long)mode; SEND; } -eio_req *eio_mkdir (const char *path, eio_mode_t mode, int pri, eio_cb cb, void *data) +eio_req *eio_mkdir (const char *path, eio_mode_t mode, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_MKDIR); PATH; req->int2 = (long)mode; SEND; } static eio_req * -eio__1path (int type, const char *path, int pri, eio_cb cb, void *data) +eio__1path (int type, const char *path, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (type); PATH; SEND; } -eio_req *eio_readlink (const char *path, int pri, eio_cb cb, void *data) +eio_req *eio_readlink (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel) { - return eio__1path (EIO_READLINK, path, pri, cb, data); + return eio__1path (EIO_READLINK, path, pri, cb, data, channel); } -eio_req *eio_realpath (const char *path, int pri, eio_cb cb, void *data) +eio_req *eio_realpath (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel) { - return eio__1path (EIO_REALPATH, path, pri, cb, data); + return eio__1path (EIO_REALPATH, path, pri, cb, data, channel); } -eio_req *eio_stat (const char *path, int pri, eio_cb cb, void *data) +eio_req *eio_stat (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel) { - return eio__1path (EIO_STAT, path, pri, cb, data); + return eio__1path (EIO_STAT, path, pri, cb, data, channel); } -eio_req *eio_lstat (const char *path, int pri, eio_cb cb, void *data) +eio_req *eio_lstat (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel) { - return eio__1path (EIO_LSTAT, path, pri, cb, data); + return eio__1path (EIO_LSTAT, path, pri, cb, data, channel); } -eio_req *eio_statvfs (const char *path, int pri, eio_cb cb, void *data) +eio_req *eio_statvfs (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel) { - return eio__1path (EIO_STATVFS, path, pri, cb, data); + return eio__1path (EIO_STATVFS, path, pri, cb, data, channel); } -eio_req *eio_unlink (const char *path, int pri, eio_cb cb, void *data) +eio_req *eio_unlink (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel) { - return eio__1path (EIO_UNLINK, path, pri, cb, data); + return eio__1path (EIO_UNLINK, path, pri, cb, data, channel); } -eio_req *eio_rmdir (const char *path, int pri, eio_cb cb, void *data) +eio_req *eio_rmdir (const char *path, int pri, eio_cb cb, void *data, eio_channel *channel) { - return eio__1path (EIO_RMDIR, path, pri, cb, data); + return eio__1path (EIO_RMDIR, path, pri, cb, data, channel); } -eio_req *eio_readdir (const char *path, int flags, int pri, eio_cb cb, void *data) +eio_req *eio_readdir (const char *path, int flags, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_READDIR); PATH; req->int1 = flags; SEND; } -eio_req *eio_mknod (const char *path, eio_mode_t mode, dev_t dev, int pri, eio_cb cb, void *data) +eio_req *eio_mknod (const char *path, eio_mode_t mode, dev_t dev, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_MKNOD); PATH; req->int2 = (long)mode; req->offs = (off_t)dev; SEND; } static eio_req * -eio__2path (int type, const char *path, const char *new_path, int pri, eio_cb cb, void *data) +eio__2path (int type, const char *path, const char *new_path, int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (type); PATH; @@ -2511,29 +2503,29 @@ eio__2path (int type, const char *path, const char *new_path, int pri, eio_cb cb SEND; } -eio_req *eio_link (const char *path, const char *new_path, int pri, eio_cb cb, void *data) +eio_req *eio_link (const char *path, const char *new_path, int pri, eio_cb cb, void *data, eio_channel *channel) { - return eio__2path (EIO_LINK, path, new_path, pri, cb, data); + return eio__2path (EIO_LINK, path, new_path, pri, cb, data, channel); } -eio_req *eio_symlink (const char *path, const char *new_path, int pri, eio_cb cb, void *data) +eio_req *eio_symlink (const char *path, const char *new_path, int pri, eio_cb cb, void *data, eio_channel *channel) { - return eio__2path (EIO_SYMLINK, path, new_path, pri, cb, data); + return eio__2path (EIO_SYMLINK, path, new_path, pri, cb, data, channel); } -eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data) +eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data, eio_channel *channel) { - return eio__2path (EIO_RENAME, path, new_path, pri, cb, data); + return eio__2path (EIO_RENAME, path, new_path, pri, cb, data, channel); } -eio_req *eio_custom (void (*execute)(eio_req *), int pri, eio_cb cb, void *data) +eio_req *eio_custom (void (*execute)(eio_req *), int pri, eio_cb cb, void *data, eio_channel *channel) { REQ (EIO_CUSTOM); req->feed = execute; SEND; } #endif -eio_req *eio_grp (eio_cb cb, void *data) +eio_req *eio_grp (eio_cb cb, void *data, eio_channel *channel) { const int pri = EIO_PRI_MAX; diff --git a/deps/uv/src/unix/fs.c b/deps/uv/src/unix/fs.c index 436e54c680d..66155168850 100644 --- a/deps/uv/src/unix/fs.c +++ b/deps/uv/src/unix/fs.c @@ -44,7 +44,7 @@ uv_fs_req_init(loop, req, type, path, cb); \ if (cb) { \ /* async */ \ - req->eio = eiofunc(args, EIO_PRI_DEFAULT, uv__fs_after, req); \ + req->eio = eiofunc(args, EIO_PRI_DEFAULT, uv__fs_after, req, &loop->uv_eio_channel); \ if (!req->eio) { \ uv__set_sys_error(loop, ENOMEM); \ return -1; \ @@ -191,7 +191,7 @@ int uv_fs_open(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, if (cb) { /* async */ uv_ref(loop); - req->eio = eio_open(path, flags, mode, EIO_PRI_DEFAULT, uv__fs_after, req); + req->eio = eio_open(path, flags, mode, EIO_PRI_DEFAULT, uv__fs_after, req, &loop->uv_eio_channel); if (!req->eio) { uv__set_sys_error(loop, ENOMEM); return -1; @@ -222,7 +222,7 @@ int uv_fs_read(uv_loop_t* loop, uv_fs_t* req, uv_file fd, void* buf, /* async */ uv_ref(loop); req->eio = eio_read(fd, buf, length, offset, EIO_PRI_DEFAULT, - uv__fs_after, req); + uv__fs_after, req, &loop->uv_eio_channel); if (!req->eio) { uv__set_sys_error(loop, ENOMEM); @@ -260,7 +260,7 @@ int uv_fs_write(uv_loop_t* loop, uv_fs_t* req, uv_file file, void* buf, /* async */ uv_ref(loop); req->eio = eio_write(file, buf, length, offset, EIO_PRI_DEFAULT, - uv__fs_after, req); + uv__fs_after, req, &loop->uv_eio_channel); if (!req->eio) { uv__set_sys_error(loop, ENOMEM); return -1; @@ -307,7 +307,7 @@ int uv_fs_readdir(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, if (cb) { /* async */ uv_ref(loop); - req->eio = eio_readdir(path, flags, EIO_PRI_DEFAULT, uv__fs_after, req); + req->eio = eio_readdir(path, flags, EIO_PRI_DEFAULT, uv__fs_after, req, &loop->uv_eio_channel); if (!req->eio) { uv__set_sys_error(loop, ENOMEM); return -1; @@ -377,7 +377,7 @@ int uv_fs_stat(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb) { if (cb) { /* async */ uv_ref(loop); - req->eio = eio_stat(pathdup, EIO_PRI_DEFAULT, uv__fs_after, req); + req->eio = eio_stat(pathdup, EIO_PRI_DEFAULT, uv__fs_after, req, &loop->uv_eio_channel); free(pathdup); @@ -411,7 +411,7 @@ int uv_fs_fstat(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb) { if (cb) { /* async */ uv_ref(loop); - req->eio = eio_fstat(file, EIO_PRI_DEFAULT, uv__fs_after, req); + req->eio = eio_fstat(file, EIO_PRI_DEFAULT, uv__fs_after, req, &loop->uv_eio_channel); if (!req->eio) { uv__set_sys_error(loop, ENOMEM); @@ -550,7 +550,7 @@ int uv_fs_lstat(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb) { if (cb) { /* async */ uv_ref(loop); - req->eio = eio_lstat(pathdup, EIO_PRI_DEFAULT, uv__fs_after, req); + req->eio = eio_lstat(pathdup, EIO_PRI_DEFAULT, uv__fs_after, req, &loop->uv_eio_channel); free(pathdup); @@ -598,7 +598,7 @@ int uv_fs_readlink(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_req_init(loop, req, UV_FS_READLINK, path, cb); if (cb) { - if ((req->eio = eio_readlink(path, EIO_PRI_DEFAULT, uv__fs_after, req))) { + if ((req->eio = eio_readlink(path, EIO_PRI_DEFAULT, uv__fs_after, req, &loop->uv_eio_channel))) { uv_ref(loop); return 0; } else { @@ -692,7 +692,7 @@ int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, req->work_cb = work_cb; req->after_work_cb = after_work_cb; - req->eio = eio_custom(uv__work, EIO_PRI_DEFAULT, uv__after_work, req); + req->eio = eio_custom(uv__work, EIO_PRI_DEFAULT, uv__after_work, req, &loop->uv_eio_channel); if (!req->eio) { uv__set_sys_error(loop, ENOMEM); diff --git a/deps/uv/src/unix/uv-eio.c b/deps/uv/src/unix/uv-eio.c index 84afe09b747..8656ea6f5a0 100644 --- a/deps/uv/src/unix/uv-eio.c +++ b/deps/uv/src/unix/uv-eio.c @@ -27,16 +27,12 @@ #include -/* TODO remove me! */ -static uv_loop_t* main_loop; - - static void uv_eio_do_poll(uv_idle_t* watcher, int status) { assert(watcher == &(watcher->loop->uv_eio_poller)); /* printf("uv_eio_poller\n"); */ - if (eio_poll() != -1 && uv_is_active((uv_handle_t*) watcher)) { + if (eio_poll(&watcher->loop->uv_eio_channel) != -1 && uv_is_active((uv_handle_t*) watcher)) { /* printf("uv_eio_poller stop\n"); */ uv_idle_stop(watcher); uv_unref(watcher->loop); @@ -52,7 +48,7 @@ static void uv_eio_want_poll_notifier_cb(uv_async_t* watcher, int status) { /* printf("want poll notifier\n"); */ - if (eio_poll() == -1 && !uv_is_active((uv_handle_t*) &loop->uv_eio_poller)) { + if (eio_poll(&watcher->loop->uv_eio_channel) == -1 && !uv_is_active((uv_handle_t*) &loop->uv_eio_poller)) { /* printf("uv_eio_poller start\n"); */ uv_idle_start(&loop->uv_eio_poller, uv_eio_do_poll); uv_ref(loop); @@ -67,7 +63,7 @@ static void uv_eio_done_poll_notifier_cb(uv_async_t* watcher, int revents) { /* printf("done poll notifier\n"); */ - if (eio_poll() != -1 && uv_is_active((uv_handle_t*) &loop->uv_eio_poller)) { + if (eio_poll(&watcher->loop->uv_eio_channel) != -1 && uv_is_active((uv_handle_t*) &loop->uv_eio_poller)) { /* printf("uv_eio_poller stop\n"); */ uv_idle_stop(&loop->uv_eio_poller); uv_unref(loop); @@ -79,7 +75,7 @@ static void uv_eio_done_poll_notifier_cb(uv_async_t* watcher, int revents) { * uv_eio_want_poll() is called from the EIO thread pool each time an EIO * request (that is, one of the node.fs.* functions) has completed. */ -static void uv_eio_want_poll(void) { +static void uv_eio_want_poll(eio_channel *channel) { /* Signal the main thread that eio_poll need to be processed. */ /* @@ -87,25 +83,35 @@ static void uv_eio_want_poll(void) { * uv_eio_want_poll_notifier. */ - uv_async_send(&main_loop->uv_eio_want_poll_notifier); + uv_async_send(&((uv_loop_t *)channel->data)->uv_eio_want_poll_notifier); } -static void uv_eio_done_poll(void) { +static void uv_eio_done_poll(eio_channel *channel) { /* * Signal the main thread that we should stop calling eio_poll(). * from the idle watcher. */ - uv_async_send(&main_loop->uv_eio_done_poll_notifier); + uv_async_send(&((uv_loop_t *)channel->data)->uv_eio_done_poll_notifier); } +static void uv__eio_init(void) { + eio_init(uv_eio_want_poll, uv_eio_done_poll); + /* + * Don't handle more than 10 reqs on each eio_poll(). This is to avoid + * race conditions. See Node's test/simple/test-eio-race.js + */ + eio_set_max_poll_reqs(10); +} + +static uv_once_t uv__eio_init_once_guard = UV_ONCE_INIT; + + void uv_eio_init(uv_loop_t* loop) { if (loop->counters.eio_init == 0) { loop->counters.eio_init++; - main_loop = loop; - uv_idle_init(loop, &loop->uv_eio_poller); uv_idle_start(&loop->uv_eio_poller, uv_eio_do_poll); @@ -118,17 +124,6 @@ void uv_eio_init(uv_loop_t* loop) { uv_eio_done_poll_notifier_cb); uv_unref(loop); - eio_init(uv_eio_want_poll, uv_eio_done_poll); - /* - * Don't handle more than 10 reqs on each eio_poll(). This is to avoid - * race conditions. See Node's test/simple/test-eio-race.js - */ - eio_set_max_poll_reqs(10); - } else { - /* - * If this assertion breaks then Ryan hasn't implemented support for - * receiving thread pool requests back to multiple threads. - */ - assert(main_loop == loop); + uv_once(&uv__eio_init_once_guard, uv__eio_init); } } diff --git a/deps/uv/test/test-list.h b/deps/uv/test/test-list.h index 51b847291ed..f5f05412697 100644 --- a/deps/uv/test/test-list.h +++ b/deps/uv/test/test-list.h @@ -119,6 +119,7 @@ TEST_DECLARE (fs_readdir_empty_dir) TEST_DECLARE (fs_readdir_file) TEST_DECLARE (fs_open_dir) TEST_DECLARE (threadpool_queue_work_simple) +TEST_DECLARE (threadpool_multiple_event_loops) TEST_DECLARE (thread_mutex) TEST_DECLARE (thread_rwlock) TEST_DECLARE (thread_create) @@ -285,6 +286,7 @@ TASK_LIST_START TEST_ENTRY (fs_readdir_file) TEST_ENTRY (fs_open_dir) TEST_ENTRY (threadpool_queue_work_simple) + TEST_ENTRY (threadpool_multiple_event_loops) TEST_ENTRY (thread_mutex) TEST_ENTRY (thread_rwlock) TEST_ENTRY (thread_create) diff --git a/deps/uv/test/test-thread.c b/deps/uv/test/test-thread.c index 48b31b172e0..5c0bb75e61f 100644 --- a/deps/uv/test/test-thread.c +++ b/deps/uv/test/test-thread.c @@ -23,12 +23,123 @@ #include "task.h" #include +#include #include +#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0])) + +#define container_of(ptr, type, member) \ + ((type *) ((char *) (ptr) - offsetof(type, member))) + +struct getaddrinfo_req { + uv_thread_t thread_id; + unsigned int counter; + uv_loop_t* loop; + uv_getaddrinfo_t handle; +}; + + +struct fs_req { + uv_thread_t thread_id; + unsigned int counter; + uv_loop_t* loop; + uv_fs_t handle; +}; + +static void getaddrinfo_do(struct getaddrinfo_req* req); +static void getaddrinfo_cb(uv_getaddrinfo_t* handle, + int status, + struct addrinfo* res); +static void fs_do(struct fs_req* req); +static void fs_cb(uv_fs_t* handle); static volatile int thread_called; +static void getaddrinfo_do(struct getaddrinfo_req* req) { + int r; + + ASSERT(req->thread_id == uv_thread_self()); + + r = uv_getaddrinfo(req->loop, + &req->handle, + getaddrinfo_cb, + "localhost", + NULL, + NULL); + ASSERT(r == 0); +} + + +static void getaddrinfo_cb(uv_getaddrinfo_t* handle, + int status, + struct addrinfo* res) { + struct getaddrinfo_req* req; + + ASSERT(status == 0); + + req = container_of(handle, struct getaddrinfo_req, handle); + uv_freeaddrinfo(res); + + if (--req->counter) + getaddrinfo_do(req); +} + + +static void fs_do(struct fs_req* req) { + int r; + + ASSERT(req->thread_id == uv_thread_self()); + + r = uv_fs_stat(req->loop, &req->handle, ".", fs_cb); + ASSERT(r == 0); +} + + +static void fs_cb(uv_fs_t* handle) { + struct fs_req* req = container_of(handle, struct fs_req, handle); + + if (--req->counter) + fs_do(req); +} + + +static void do_work(void* arg) { + struct getaddrinfo_req getaddrinfo_reqs[16]; + struct fs_req fs_reqs[16]; + uv_thread_t self; + uv_loop_t* loop; + size_t i; + int r; + + self = uv_thread_self(); + + loop = uv_loop_new(); + ASSERT(loop != NULL); + + for (i = 0; i < ARRAY_SIZE(getaddrinfo_reqs); i++) { + struct getaddrinfo_req* req = getaddrinfo_reqs + i; + req->thread_id = self; + req->counter = 16; + req->loop = loop; + getaddrinfo_do(req); + } + + for (i = 0; i < ARRAY_SIZE(fs_reqs); i++) { + struct fs_req* req = fs_reqs + i; + req->thread_id = self; + req->counter = 16; + req->loop = loop; + fs_do(req); + } + + r = uv_run(loop); + ASSERT(r == 0); + + uv_loop_delete(loop); +} + + static void thread_entry(void* arg) { ASSERT(arg == (void *) 42); thread_called++; @@ -56,3 +167,25 @@ TEST_IMPL(thread_self) { tid = uv_thread_self(); return 0; } + + +/* Hilariously bad test name. Run a lot of tasks in the thread pool and verify + * that each "finished" callback is run in its originating thread. + */ +TEST_IMPL(threadpool_multiple_event_loops) { + uv_thread_t threads[8]; + size_t i; + int r; + + for (i = 0; i < ARRAY_SIZE(threads); i++) { + r = uv_thread_create(threads + i, do_work, NULL); + ASSERT(r == 0); + } + + for (i = 0; i < ARRAY_SIZE(threads); i++) { + r = uv_thread_join(threads + i); + ASSERT(r == 0); + } + + return 0; +} diff --git a/lib/child_process.js b/lib/child_process.js index cef093a0b74..b3e6383c752 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -153,6 +153,12 @@ exports.fork = function(modulePath, args, options) { args = args ? args.slice(0) : []; args.unshift(modulePath); + if (options.thread) { + if (!process.features.isolates) { + throw new Error('node compiled without isolate support'); + } + } + if (options.stdinStream) { throw new Error('stdinStream not allowed for fork()'); } diff --git a/node.gyp b/node.gyp index bc351219964..b4a3e461fa8 100644 --- a/node.gyp +++ b/node.gyp @@ -74,6 +74,7 @@ 'src/node.cc', 'src/node_vars.cc', 'src/node_buffer.cc', + 'src/node_isolate.cc', 'src/node_constants.cc', 'src/node_extensions.cc', 'src/node_file.cc', @@ -97,6 +98,7 @@ 'src/node.h', 'src/node_vars.h', 'src/node_buffer.h', + 'src/node_isolate.h', 'src/node_constants.h', 'src/node_crypto.h', 'src/node_extensions.h', @@ -123,11 +125,18 @@ ], 'defines': [ + 'NODE_WANT_INTERNALS=1', 'ARCH="<(target_arch)"', 'PLATFORM="<(OS)"', ], 'conditions': [ + [ 'node_use_isolates=="true"', { + 'defines': [ 'HAVE_ISOLATES=1' ], + }, { + 'defines': [ 'HAVE_ISOLATES=0' ], + }], + [ 'node_use_openssl=="true"', { 'defines': [ 'HAVE_OPENSSL=1' ], 'sources': [ 'src/node_crypto.cc' ], diff --git a/src/cares_wrap.cc b/src/cares_wrap.cc index 89ae4581dad..64e96581b0b 100644 --- a/src/cares_wrap.cc +++ b/src/cares_wrap.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -607,7 +608,7 @@ void AfterGetAddrInfo(uv_getaddrinfo_t* req, int status, struct addrinfo* res) { if (status) { // Error - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); argv[0] = Local::New(Null()); } else { // Success @@ -710,7 +711,7 @@ static Handle GetAddrInfo(const Arguments& args) { hints.ai_family = fam; hints.ai_socktype = SOCK_STREAM; - int r = uv_getaddrinfo(uv_default_loop(), + int r = uv_getaddrinfo(Loop(), &req_wrap->req_, AfterGetAddrInfo, *hostname, @@ -719,7 +720,7 @@ static Handle GetAddrInfo(const Arguments& args) { req_wrap->Dispatched(); if (r) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); delete req_wrap; return scope.Close(v8::Null()); } else { @@ -736,7 +737,7 @@ static void Initialize(Handle target) { assert(r == ARES_SUCCESS); struct ares_options options; - uv_ares_init_options(uv_default_loop(), &ares_channel, &options, 0); + uv_ares_init_options(Loop(), &ares_channel, &options, 0); assert(r == 0); NODE_SET_METHOD(target, "queryA", Query); diff --git a/src/fs_event_wrap.cc b/src/fs_event_wrap.cc index 48cdabc00e2..1f99e856a8d 100644 --- a/src/fs_event_wrap.cc +++ b/src/fs_event_wrap.cc @@ -21,6 +21,7 @@ #include #include +#include #include @@ -109,15 +110,15 @@ Handle FSEventWrap::Start(const Arguments& args) { String::Utf8Value path(args[0]->ToString()); - int r = uv_fs_event_init(uv_default_loop(), &wrap->handle_, *path, OnEvent, 0); + int r = uv_fs_event_init(Loop(), &wrap->handle_, *path, OnEvent, 0); if (r == 0) { // Check for persistent argument if (!args[1]->IsTrue()) { - uv_unref(uv_default_loop()); + uv_unref(Loop()); } wrap->initialized_ = true; } else { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); } return scope.Close(Integer::New(r)); @@ -145,7 +146,7 @@ void FSEventWrap::OnEvent(uv_fs_event_t* handle, const char* filename, // assumption that a rename implicitly means an attribute change. Not too // unreasonable, right? Still, we should revisit this before v1.0. if (status) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); eventStr = String::Empty(); } else if (events & UV_RENAME) { diff --git a/src/handle_wrap.cc b/src/handle_wrap.cc index 5b6594a3a90..f661885d9fa 100644 --- a/src/handle_wrap.cc +++ b/src/handle_wrap.cc @@ -21,6 +21,7 @@ #include #include +#include namespace node { @@ -70,7 +71,7 @@ Handle HandleWrap::Unref(const Arguments& args) { } wrap->unref = true; - uv_unref(uv_default_loop()); + uv_unref(Loop()); return v8::Undefined(); } @@ -102,7 +103,6 @@ Handle HandleWrap::Close(const Arguments& args) { assert(!wrap->object_.IsEmpty()); uv_close(wrap->handle__, OnClose); - HandleWrap::Ref(args); wrap->StateChange(); diff --git a/src/ngx-queue.h b/src/ngx-queue.h new file mode 100644 index 00000000000..8c5e461762b --- /dev/null +++ b/src/ngx-queue.h @@ -0,0 +1,106 @@ + +/* + * Copyright (C) Igor Sysoev + */ + + +#ifndef _NGX_QUEUE_H_INCLUDED_ +#define _NGX_QUEUE_H_INCLUDED_ + + +typedef struct ngx_queue_s ngx_queue_t; + +struct ngx_queue_s { + ngx_queue_t *prev; + ngx_queue_t *next; +}; + + +#define ngx_queue_init(q) \ + (q)->prev = q; \ + (q)->next = q + + +#define ngx_queue_empty(h) \ + (h == (h)->prev) + + +#define ngx_queue_insert_head(h, x) \ + (x)->next = (h)->next; \ + (x)->next->prev = x; \ + (x)->prev = h; \ + (h)->next = x + + +#define ngx_queue_insert_after ngx_queue_insert_head + + +#define ngx_queue_insert_tail(h, x) \ + (x)->prev = (h)->prev; \ + (x)->prev->next = x; \ + (x)->next = h; \ + (h)->prev = x + + +#define ngx_queue_head(h) \ + (h)->next + + +#define ngx_queue_last(h) \ + (h)->prev + + +#define ngx_queue_sentinel(h) \ + (h) + + +#define ngx_queue_next(q) \ + (q)->next + + +#define ngx_queue_prev(q) \ + (q)->prev + + +#if (NGX_DEBUG) + +#define ngx_queue_remove(x) \ + (x)->next->prev = (x)->prev; \ + (x)->prev->next = (x)->next; \ + (x)->prev = NULL; \ + (x)->next = NULL + +#else + +#define ngx_queue_remove(x) \ + (x)->next->prev = (x)->prev; \ + (x)->prev->next = (x)->next + +#endif + + +#define ngx_queue_split(h, q, n) \ + (n)->prev = (h)->prev; \ + (n)->prev->next = n; \ + (n)->next = q; \ + (h)->prev = (q)->prev; \ + (h)->prev->next = h; \ + (q)->prev = n; + + +#define ngx_queue_add(h, n) \ + (h)->prev->next = (n)->next; \ + (n)->next->prev = (h)->prev; \ + (h)->prev = (n)->prev; \ + (h)->prev->next = h; + + +#define ngx_queue_data(q, type, link) \ + (type *) ((unsigned char *) q - offsetof(type, link)) + + +#define ngx_queue_foreach(q, h) \ + for ((q) = ngx_queue_head(h); (q) != (h); (q) = ngx_queue_next(q)) + + +#endif /* _NGX_QUEUE_H_INCLUDED_ */ diff --git a/src/node.cc b/src/node.cc index f18f754cd3b..94b8c44680d 100644 --- a/src/node.cc +++ b/src/node.cc @@ -20,6 +20,8 @@ // USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include +#include #include @@ -97,12 +99,9 @@ extern char **environ; // use the variables as they were being used before. #define check_tick_watcher NODE_VAR(check_tick_watcher) #define code_symbol NODE_VAR(code_symbol) -#define debug_port NODE_VAR(debug_port) -#define debug_wait_connect NODE_VAR(debug_wait_connect) #define emit_symbol NODE_VAR(emit_symbol) #define errno_symbol NODE_VAR(errno_symbol) #define errpath_symbol NODE_VAR(errpath_symbol) -#define eval_string NODE_VAR(eval_string) #define gc_check NODE_VAR(gc_check) #define gc_idle NODE_VAR(gc_idle) #define gc_timer NODE_VAR(gc_timer) @@ -110,11 +109,8 @@ extern char **environ; #define heap_total_symbol NODE_VAR(heap_total_symbol) #define heap_used_symbol NODE_VAR(heap_used_symbol) #define listeners_symbol NODE_VAR(listeners_symbol) -#define max_stack_size NODE_VAR(max_stack_size) #define need_tick_cb NODE_VAR(need_tick_cb) -#define option_end_index NODE_VAR(option_end_index) #define prepare_tick_watcher NODE_VAR(prepare_tick_watcher) -#define print_eval NODE_VAR(print_eval) #define process NODE_VAR(process) #define rss_symbol NODE_VAR(rss_symbol) #define syscall_symbol NODE_VAR(syscall_symbol) @@ -123,7 +119,6 @@ extern char **environ; #define tick_time_head NODE_VAR(tick_time_head) #define tick_times NODE_VAR(tick_times) #define uncaught_exception_symbol NODE_VAR(uncaught_exception_symbol) -#define use_debug_agent NODE_VAR(use_debug_agent) #define use_npn NODE_VAR(use_npn) #define use_sni NODE_VAR(use_sni) #define uncaught_exception_counter NODE_VAR(uncaught_exception_counter) @@ -136,13 +131,27 @@ extern char **environ; namespace node { +#define TICK_TIME(n) tick_times[(tick_time_head - (n)) % RPM_SAMPLES] +static int option_end_index; +static unsigned long max_stack_size; +static unsigned short debug_port = 5858; +static bool debug_wait_connect; +static bool use_debug_agent; +static const char* eval_string; +static bool print_eval; -#define TICK_TIME(n) tick_times[(tick_time_head - (n)) % RPM_SAMPLES] +static void CheckStatus(uv_timer_t* watcher, int status); +uv_loop_t* Loop() { +#if defined(HAVE_ISOLATES) && HAVE_ISOLATES + return Isolate::GetCurrent()->GetLoop(); +#else + return uv_default_loop(); +#endif +} -static void CheckStatus(uv_timer_t* watcher, int status); static void StartGCTimer () { if (!uv_is_active((uv_handle_t*) &gc_timer)) { @@ -170,7 +179,7 @@ static void Idle(uv_idle_t* watcher, int status) { static void Check(uv_check_t* watcher, int status) { assert(watcher == &gc_check); - tick_times[tick_time_head] = uv_now(uv_default_loop()); + tick_times[tick_time_head] = uv_now(Loop()); tick_time_head = (tick_time_head + 1) % RPM_SAMPLES; StartGCTimer(); @@ -200,7 +209,7 @@ static void Tick(void) { need_tick_cb = false; if (uv_is_active((uv_handle_t*) &tick_spinner)) { uv_idle_stop(&tick_spinner); - uv_unref(uv_default_loop()); + uv_unref(Loop()); } HandleScope scope; @@ -242,7 +251,7 @@ static Handle NeedTickCallback(const Arguments& args) { // tick_spinner to keep the event loop alive long enough to handle it. if (!uv_is_active((uv_handle_t*) &tick_spinner)) { uv_idle_start(&tick_spinner, Spin); - uv_ref(uv_default_loop()); + uv_ref(Loop()); } return Undefined(); } @@ -1494,7 +1503,7 @@ static void CheckStatus(uv_timer_t* watcher, int status) { } } - double d = uv_now(uv_default_loop()) - TICK_TIME(3); + double d = uv_now(Loop()) - TICK_TIME(3); //printfb("timer d = %f\n", d); @@ -1523,7 +1532,7 @@ static Handle Uptime(const Arguments& args) { v8::Handle UVCounters(const v8::Arguments& args) { HandleScope scope; - uv_counters_t* c = &uv_default_loop()->counters; + uv_counters_t* c = &Loop()->counters; Local obj = Object::New(); @@ -1977,6 +1986,15 @@ static Handle GetFeatures() { obj->Set(String::NewSymbol("tls"), Boolean::New(get_builtin_module("crypto") != NULL)); + + obj->Set(String::NewSymbol("isolates"), +#if HAVE_ISOLATES + True() +#else + False() +#endif + ); + return scope.Close(obj); } @@ -1993,7 +2011,6 @@ Handle SetupProcessObject(int argc, char *argv[]) { process = Persistent::New(process_template->GetFunction()->NewInstance()); - process->SetAccessor(String::New("title"), ProcessTitleGetter, ProcessTitleSetter); @@ -2317,6 +2334,7 @@ static void EnableDebug(bool wait_connect) { #ifdef __POSIX__ +// FIXME this is positively unsafe with isolates/threads static void EnableDebugSignalHandler(int signal) { // Break once process will return execution to v8 v8::Debug::DebugBreak(node_isolate); @@ -2519,10 +2537,7 @@ static Handle DebugPause(const Arguments& args) { } -char** Init(int argc, char *argv[]) { - // Initialize prog_start_time to get relative uptime. - uv_uptime(&prog_start_time); - +char** ProcessInit(int argc, char *argv[]) { // Hack aroung with the argv pointer. Used for process.title = "blah". argv = uv_setup_args(argc, argv); @@ -2562,34 +2577,61 @@ char** Init(int argc, char *argv[]) { #ifdef __POSIX__ // Ignore SIGPIPE RegisterSignalHandler(SIGPIPE, SIG_IGN); + // TODO decide whether to handle these signals per-process or per-thread RegisterSignalHandler(SIGINT, SignalExit); RegisterSignalHandler(SIGTERM, SignalExit); #endif // __POSIX__ - uv_prepare_init(uv_default_loop(), &prepare_tick_watcher); + return argv; +} + + +void EmitExit(v8::Handle process_l) { + // process.emit('exit') + Local emit_v = process_l->Get(String::New("emit")); + assert(emit_v->IsFunction()); + Local emit = Local::Cast(emit_v); + Local args[] = { String::New("exit") }; + TryCatch try_catch; + emit->Call(process_l, 1, args); + if (try_catch.HasCaught()) { + FatalException(try_catch); + } +} + + +// Create a new isolate with node::Isolate::New() before you call this function +void StartThread(node::Isolate* isolate, + int argc, + char** argv) { + HandleScope scope; + + assert(node::Isolate::GetCurrent() == isolate); + + uv_loop_t* loop = isolate->GetLoop(); + uv_prepare_init(loop, &prepare_tick_watcher); uv_prepare_start(&prepare_tick_watcher, PrepareTick); - uv_unref(uv_default_loop()); + uv_unref(loop); - uv_check_init(uv_default_loop(), &check_tick_watcher); + uv_check_init(loop, &check_tick_watcher); uv_check_start(&check_tick_watcher, node::CheckTick); - uv_unref(uv_default_loop()); + uv_unref(loop); - uv_idle_init(uv_default_loop(), &tick_spinner); - uv_unref(uv_default_loop()); + uv_idle_init(loop, &tick_spinner); + uv_unref(loop); - uv_check_init(uv_default_loop(), &gc_check); + uv_check_init(loop, &gc_check); uv_check_start(&gc_check, node::Check); - uv_unref(uv_default_loop()); + uv_unref(loop); - uv_idle_init(uv_default_loop(), &gc_idle); - uv_unref(uv_default_loop()); + uv_idle_init(loop, &gc_idle); + uv_unref(loop); - uv_timer_init(uv_default_loop(), &gc_timer); - uv_unref(uv_default_loop()); + uv_timer_init(loop, &gc_timer); + uv_unref(loop); V8::SetFatalErrorHandler(node::OnFatalError); - // Set the callback DebugMessageDispatch which is called from the debug // thread. v8::Debug::SetDebugMessageDispatchHandler(node::DebugMessageDispatch); @@ -2598,13 +2640,13 @@ char** Init(int argc, char *argv[]) { // main thread to execute a random bit of javascript - which will give V8 // control so it can handle whatever new message had been received on the // debug thread. - uv_async_init(uv_default_loop(), &debug_watcher, node::DebugMessageCallback); + uv_async_init(loop, &debug_watcher, node::DebugMessageCallback); // unref it so that we exit the event loop despite it being active. - uv_unref(uv_default_loop()); + uv_unref(loop); // Fetch a reference to the main isolate, so we have a reference to it // even when we need it to access it from another (debugger) thread. - node_isolate = Isolate::GetCurrent(); + node_isolate = v8::Isolate::GetCurrent(); // If the --debug flag was specified then initialize the debug thread. if (use_debug_agent) { @@ -2617,37 +2659,16 @@ char** Init(int argc, char *argv[]) { #endif // __POSIX__ } - return argv; -} - - -void EmitExit(v8::Handle process_l) { - // process.emit('exit') - Local emit_v = process_l->Get(String::New("emit")); - assert(emit_v->IsFunction()); - Local emit = Local::Cast(emit_v); - Local args[] = { String::New("exit") }; - TryCatch try_catch; - emit->Call(process_l, 1, args); - if (try_catch.HasCaught()) { - FatalException(try_catch); - } -} - - -int Start(int argc, char *argv[]) { - // This needs to run *before* V8::Initialize() - argv = Init(argc, argv); + Handle process_l = SetupProcessObject(argc, argv); - v8::V8::Initialize(); - v8::HandleScope handle_scope; + process_l->Set(String::NewSymbol("tid"), + Integer::NewFromUnsigned(isolate->id_)); - // Create the one and only Context. - Persistent context = v8::Context::New(); - v8::Context::Scope context_scope(context); + // FIXME crashes with "CHECK(heap->isolate() == Isolate::Current()) failed" + //v8_typed_array::AttachBindings(v8::Context::GetCurrent()->Global()); - Handle process_l = SetupProcessObject(argc, argv); - v8_typed_array::AttachBindings(context->Global()); + // Initialize prog_start_time to get relative uptime. + uv_uptime(&prog_start_time); // Create all the objects, load modules, do everything. // so your next reading stop should be node::Load()! @@ -2658,13 +2679,36 @@ int Start(int argc, char *argv[]) { // there are no watchers on the loop (except for the ones that were // uv_unref'd) then this function exits. As long as there are active // watchers, it blocks. - uv_run(uv_default_loop()); + uv_run(loop); EmitExit(process_l); +} + + +int Start(int argc, char *argv[]) { + // This needs to run *before* V8::Initialize() + argv = ProcessInit(argc, argv); + + v8::V8::Initialize(); + v8::HandleScope handle_scope; + + // Get the id of the this, the main, thread. + uv_thread_t tid = uv_thread_self(); + + // Create the main node::Isolate object + node::Isolate::Initialize(); + Isolate* isolate = new node::Isolate(); + isolate->tid_ = tid; + isolate->Enter(); + StartThread(isolate, argc, argv); + isolate->Dispose(); + + // The main thread/isolate is done. Wait for all other thread/isolates to + // finish. + node::Isolate::JoinAll(); #ifndef NDEBUG // Clean up. - context.Dispose(); V8::Dispose(); #endif // NDEBUG diff --git a/src/node.h b/src/node.h index 38d3c621bfa..f17c3fcf815 100644 --- a/src/node.h +++ b/src/node.h @@ -66,20 +66,8 @@ #include -#ifndef offset_of -// g++ in strict mode complains loudly about the system offsetof() macro -// because it uses NULL as the base address. -#define offset_of(type, member) \ - ((intptr_t) ((char *) &(((type *) 8)->member) - 8)) -#endif - -#ifndef container_of -#define container_of(ptr, type, member) \ - ((type *) ((char *) (ptr) - offset_of(type, member))) -#endif - -#ifndef ARRAY_SIZE -#define ARRAY_SIZE(a) (sizeof((a)) / sizeof((a)[0])) +#if NODE_WANT_INTERNALS +# include "node_internals.h" #endif #ifndef NODE_STRINGIFY @@ -96,6 +84,10 @@ v8::Handle SetupProcessObject(int argc, char *argv[]); void Load(v8::Handle process); void EmitExit(v8::Handle process); +// Returns the loop for the current isolate. If compiled with +// --without-isolates then this will always return uv_default_loop(); +uv_loop_t* Loop(); + #define NODE_PSYMBOL(s) \ v8::Persistent::New(v8::String::NewSymbol(s)) diff --git a/src/node_crypto.cc b/src/node_crypto.cc index eb040ebc8c8..bcd3949ec07 100644 --- a/src/node_crypto.cc +++ b/src/node_crypto.cc @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -4117,7 +4118,8 @@ PBKDF2(const Arguments& args) { req = new uv_work_t(); req->data = request; - uv_queue_work(uv_default_loop(), req, EIO_PBKDF2, EIO_PBKDF2After); + uv_queue_work(Loop(), req, EIO_PBKDF2, EIO_PBKDF2After); + return Undefined(); err: @@ -4239,7 +4241,7 @@ Handle RandomBytes(const Arguments& args) { Local callback_v = Local(Function::Cast(*args[1])); req->callback_ = Persistent::New(callback_v); - uv_queue_work(uv_default_loop(), + uv_queue_work(Loop(), &req->work_req_, RandomBytesWork, RandomBytesAfter); diff --git a/src/node_extensions.h b/src/node_extensions.h index 39ddf1748fa..1d5b1322386 100644 --- a/src/node_extensions.h +++ b/src/node_extensions.h @@ -34,6 +34,10 @@ NODE_EXT_LIST_ITEM(node_signal_watcher) NODE_EXT_LIST_ITEM(node_os) NODE_EXT_LIST_ITEM(node_zlib) +#if defined(HAVE_ISOLATES) && HAVE_ISOLATES +NODE_EXT_LIST_ITEM(node_isolates) +#endif + // libuv rewrite NODE_EXT_LIST_ITEM(node_timer_wrap) NODE_EXT_LIST_ITEM(node_tcp_wrap) diff --git a/src/node_file.cc b/src/node_file.cc index 4913b699c38..f7d2d87ed11 100644 --- a/src/node_file.cc +++ b/src/node_file.cc @@ -22,6 +22,7 @@ #include "node.h" #include "node_file.h" #include "node_buffer.h" +#include #ifdef __POSIX__ # include "node_stat_watcher.h" #endif @@ -225,7 +226,7 @@ struct fs_req_wrap { #define ASYNC_CALL(func, callback, ...) \ FSReqWrap* req_wrap = new FSReqWrap(); \ - int r = uv_fs_##func(uv_default_loop(), &req_wrap->req_, \ + int r = uv_fs_##func(Loop(), &req_wrap->req_, \ __VA_ARGS__, After); \ assert(r == 0); \ req_wrap->object_->Set(oncomplete_sym, callback); \ @@ -234,9 +235,9 @@ struct fs_req_wrap { #define SYNC_CALL(func, path, ...) \ fs_req_wrap req_wrap; \ - int result = uv_fs_##func(uv_default_loop(), &req_wrap.req, __VA_ARGS__, NULL); \ + int result = uv_fs_##func(Loop(), &req_wrap.req, __VA_ARGS__, NULL); \ if (result < 0) { \ - int code = uv_last_error(uv_default_loop()).code; \ + int code = uv_last_error(Loop()).code; \ return ThrowException(UVException(code, #func, "", path)); \ } diff --git a/src/node_internals.h b/src/node_internals.h new file mode 100644 index 00000000000..08dbcb1f439 --- /dev/null +++ b/src/node_internals.h @@ -0,0 +1,106 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +#ifndef SRC_NODE_INTERNALS_H_ +#define SRC_NODE_INTERNALS_H_ + +namespace node { + +// This function starts an Isolate. This function is defined in node.cc +// currently so that we minimize the diff between master and v0.6 for easy +// merging. In the future, when v0.6 is extinct, StartThread should be moved +// to node_isolate.cc. +class Isolate; +void StartThread(Isolate* isolate, int argc, char** argv); + +#ifndef offset_of +// g++ in strict mode complains loudly about the system offsetof() macro +// because it uses NULL as the base address. +#define offset_of(type, member) \ + ((intptr_t) ((char *) &(((type *) 8)->member) - 8)) +#endif + +#ifndef container_of +#define container_of(ptr, type, member) \ + ((type *) ((char *) (ptr) - offset_of(type, member))) +#endif + +#ifndef ARRAY_SIZE +#define ARRAY_SIZE(a) (sizeof((a)) / sizeof((a)[0])) +#endif + +// +// isolates support +// +#if HAVE_ISOLATES + +# if _WIN32 +# define THREAD __declspec(thread) +# else +# define THREAD __thread +# endif + +# define TLS(type, name) THREAD type* __tls_##name +# define VAR(name) (*__tls_##name) +# define EMPTY(name) (__tls_##name == NULL) +# define ASSIGN(name, val) ((__tls_##name) = P(val)) + +# define LAZY_ASSIGN(name, val) \ + do if (!__tls_##name) ((__tls_##name) = P(val)); while (0) + +template inline v8::Persistent* P(v8::Handle v) +{ + return new v8::Persistent(v8::Persistent::New(v)); +} + +inline v8::Persistent* P(const char* symbol) +{ + return new v8::Persistent( + v8::Persistent::New( + v8::String::NewSymbol(symbol))); +} + +#else // !HAVE_ISOLATES + +# define THREAD /* nothing */ +# define TLS(type, name) type name +# define VAR(name) (name) +# define EMPTY(name) ((name).IsEmpty()) +# define ASSIGN(name, val) ((name) = P(val)) + +# define LAZY_ASSIGN(name, val) \ + do if ((name).IsEmpty()) (name) = P(val); while (0) + +template inline v8::Persistent P(v8::Handle v) +{ + return v8::Persistent(v); +} + +inline v8::Persistent P(const char* symbol) +{ + return v8::Persistent::New( + v8::String::NewSymbol(symbol)); +} +#endif // HAVE_ISOLATES + +} // namespace node + +#endif // SRC_NODE_INTERNALS_H_ diff --git a/src/node_isolate.cc b/src/node_isolate.cc new file mode 100644 index 00000000000..a05cbea3686 --- /dev/null +++ b/src/node_isolate.cc @@ -0,0 +1,278 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +#include +#include +#include +#include + +#include +#include +#include + + +namespace node { + +using v8::Arguments; +using v8::Array; +using v8::False; +using v8::Handle; +using v8::HandleScope; +using v8::Integer; +using v8::Local; +using v8::Null; +using v8::Object; +using v8::ObjectTemplate; +using v8::String; +using v8::True; +using v8::Value; + +static char magic_isolate_cookie_[] = "magic isolate cookie"; + + +static volatile bool initialized; +static volatile int id; +static volatile int isolate_count; +static uv_mutex_t list_lock; +static ngx_queue_t list_head; + + +void Isolate::Initialize() { + if (!initialized) { + initialized = true; + if (uv_mutex_init(&list_lock)) abort(); + ngx_queue_init(&list_head); + } +} + + +int Isolate::Count() { + return isolate_count; +} + + +void Isolate::JoinAll() { + uv_mutex_lock(&list_lock); + + while (ngx_queue_empty(&list_head) == false) { + ngx_queue_t* q = ngx_queue_head(&list_head); + assert(q); + Isolate* isolate = ngx_queue_data(q, Isolate, list_member_); + assert(isolate); + + // Unlock the list while we join the thread. + uv_mutex_unlock(&list_lock); + + uv_thread_join(&isolate->tid_); + + // Relock to check the next element in the list. + uv_mutex_lock(&list_lock); + } + + // Unlock the list finally. + uv_mutex_unlock(&list_lock); +} + + +Isolate::Isolate() { + uv_mutex_lock(&list_lock); + + assert(initialized && "node::Isolate::Initialize() hasn't been called"); + + id_ = ++id; + + if (id_ == 1) { + loop_ = uv_default_loop(); + } else { + loop_ = uv_loop_new(); + } + + ngx_queue_init(&at_exit_callbacks_); + + ngx_queue_init(&list_member_); + + // Add this isolate into the list of all isolates. + ngx_queue_insert_tail(&list_head, &list_member_); + isolate_count++; + + uv_mutex_unlock(&list_lock); + + v8_isolate_ = v8::Isolate::New(); + assert(v8_isolate_->GetData() == NULL); + v8_isolate_->SetData(this); + + globals_init_ = false; +} + + +struct globals* Isolate::Globals() { + return &globals_; +} + + +void Isolate::AtExit(AtExitCallback callback, void* arg) { + struct AtExitCallbackInfo* it = new AtExitCallbackInfo; + + NODE_ISOLATE_CHECK(this); + + it->callback_ = callback; + it->arg_ = arg; + + ngx_queue_insert_head(&at_exit_callbacks_, &it->at_exit_callbacks_); +} + + +void Isolate::Enter() { + v8_isolate_->Enter(); + + if (v8_context_.IsEmpty()) { + v8_context_ = v8::Context::New(); + } + v8_context_->Enter(); + + if (!globals_init_) { + globals_init_ = true; + globals_init(&globals_); + } + + NODE_ISOLATE_CHECK(this); +} + + +void Isolate::Dispose() { + uv_mutex_lock(&list_lock); + + NODE_ISOLATE_CHECK(this); + + struct AtExitCallbackInfo* it; + ngx_queue_t* q; + + + assert(v8_context_->InContext()); + v8_context_->Exit(); + v8_context_.Clear(); + v8_context_.Dispose(); + + v8_isolate_->Exit(); + v8_isolate_->Dispose(); + v8_isolate_ = NULL; + + ngx_queue_remove(&list_member_); + isolate_count--; + assert(isolate_count >= 0); + assert(isolate_count > 0 || ngx_queue_empty(&list_head)); + + uv_mutex_unlock(&list_lock); +} + + +static void RunIsolate(void* arg) { + node::Isolate* isolate = reinterpret_cast(arg); + isolate->Enter(); + + // TODO in the future when v0.6 is dead, move StartThread and related + // handles into node_isolate.cc. It is currently organized like this to + // minimize diff (and thus merge conflicts) between the legacy v0.6 + // branch. + StartThread(isolate, isolate->argc_, isolate->argv_); + + isolate->Dispose(); + delete isolate; +} + + +static Handle CreateIsolate(const Arguments& args) { + HandleScope scope; + + assert(args[0]->IsArray()); + + Local argv = args[0].As(); + assert(argv->Length() >= 2); + + // Note that isolate lock is aquired in the constructor here. It will not + // be unlocked until RunIsolate starts and calls isolate->Enter(). + Isolate* isolate = new node::Isolate(); + + // Copy over arguments into isolate + isolate->argc_ = argv->Length(); + isolate->argv_ = new char*[isolate->argc_ + 1]; + for (int i = 0; i < isolate->argc_; ++i) { + String::Utf8Value str(argv->Get(i)); + size_t size = 1 + strlen(*str); + isolate->argv_[i] = new char[size]; + memcpy(isolate->argv_[i], *str, size); + } + isolate->argv_[isolate->argc_] = NULL; + + if (uv_thread_create(&isolate->tid_, RunIsolate, isolate)) { + delete isolate; + return Null(); + } + + // TODO instead of ObjectTemplate - have a special wrapper. + Local tpl = ObjectTemplate::New(); + tpl->SetInternalFieldCount(2); + + Local obj = tpl->NewInstance(); + obj->SetPointerInInternalField(0, magic_isolate_cookie_); + obj->SetPointerInInternalField(1, isolate); + + return scope.Close(obj); +} + + +static Handle CountIsolate(const Arguments& args) { + HandleScope scope; + return scope.Close(Integer::New(Isolate::Count())); +} + + +static Handle JoinIsolate(const Arguments& args) { + HandleScope scope; + + assert(args[0]->IsObject()); + + Local obj = args[0]->ToObject(); + assert(obj->InternalFieldCount() == 2); + assert(obj->GetPointerFromInternalField(0) == magic_isolate_cookie_); + + Isolate* ti = reinterpret_cast( + obj->GetPointerFromInternalField(1)); + + if (uv_thread_join(&ti->tid_)) + return False(); // error + else + return True(); // ok +} + + +void InitIsolates(Handle target) { + HandleScope scope; + NODE_SET_METHOD(target, "create", CreateIsolate); + NODE_SET_METHOD(target, "count", CountIsolate); + NODE_SET_METHOD(target, "join", JoinIsolate); +} + + +} // namespace node + + +NODE_MODULE(node_isolates, node::InitIsolates) diff --git a/src/node_isolate.h b/src/node_isolate.h new file mode 100644 index 00000000000..a80e6ac1d2d --- /dev/null +++ b/src/node_isolate.h @@ -0,0 +1,124 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +#ifndef SRC_NODE_ISOLATE_H_ +#define SRC_NODE_ISOLATE_H_ + +#include "v8.h" +#include "uv.h" +#include "node_vars.h" +#include "ngx-queue.h" + +#ifdef NDEBUG +# define NODE_ISOLATE_CHECK(ptr) ((void) (ptr)) +#else +# include +# define NODE_ISOLATE_CHECK(ptr) \ + do { \ + node::Isolate* data_ = node::Isolate::GetCurrent(); \ + assert(data_ == (ptr)); \ + } \ + while (0) +#endif + + +namespace node { + +class Isolate { +public: + char** argv_; + int argc_; + uv_thread_t tid_; + + // Call this before instantiating any Isolate + static void Initialize(); + static int Count(); + + typedef void (*AtExitCallback)(void* arg); + + static void JoinAll(); + + static Isolate* GetCurrent() { + return reinterpret_cast(v8::Isolate::GetCurrent()->GetData()); + } + + uv_loop_t* GetLoop() { + NODE_ISOLATE_CHECK(this); + return loop_; + } + + v8::Isolate* GetV8Isolate() { + NODE_ISOLATE_CHECK(this); + return v8_isolate_; + } + + v8::Handle GetV8Context() { + NODE_ISOLATE_CHECK(this); + return v8_context_; + } + + /* Register a handler that should run when the current isolate exits. + * Handlers run in LIFO order. + */ + void AtExit(AtExitCallback callback, void *arg); + + struct globals* Globals(); + + unsigned int id_; + + // This constructor is used for every non-main thread + Isolate(); + + ~Isolate() { + if (argv_) { + delete argv_; + } + } + + void Enter(); + + /* Shutdown the isolate. Call this method at thread death. */ + void Dispose(); + +private: + + struct AtExitCallbackInfo { + ngx_queue_t at_exit_callbacks_; + AtExitCallback callback_; + void* arg_; + }; + + ngx_queue_t at_exit_callbacks_; + v8::Persistent v8_context_; + v8::Isolate* v8_isolate_; + uv_loop_t* loop_; + + // Each isolate is a member of the static list_head. + ngx_queue_t list_member_; + + // Global variables for this isolate. + struct globals globals_; + bool globals_init_; +}; + +} // namespace node + +#endif // SRC_NODE_ISOLATE_H_ diff --git a/src/node_vars.cc b/src/node_vars.cc index 28718042d0a..01bf4067e5c 100644 --- a/src/node_vars.cc +++ b/src/node_vars.cc @@ -1,4 +1,5 @@ #include +#include #if HAVE_OPENSSL # include #endif @@ -9,13 +10,8 @@ namespace node { // For now we just statically initialize the globals structure. Later there // will be one struct globals for each isolate. -static struct globals g_struct; -static struct globals* g_ptr; - - -static void globals_init(struct globals* g) { +void globals_init(struct globals* g) { memset(g, 0, sizeof(struct globals)); - g->debug_port = 5858; #ifdef OPENSSL_NPN_NEGOTIATED g->use_npn = true; @@ -31,6 +27,15 @@ static void globals_init(struct globals* g) { } +#if HAVE_ISOLATES +struct globals* globals_get() { + node::Isolate* isolate = node::Isolate::GetCurrent(); + return isolate->Globals(); +} +#else +static struct globals g_struct; +static struct globals* g_ptr; + struct globals* globals_get() { if (!g_ptr) { g_ptr = &g_struct; @@ -38,5 +43,6 @@ struct globals* globals_get() { } return g_ptr; } +#endif // HAVE_ISOLATES } // namespace node diff --git a/src/node_vars.h b/src/node_vars.h index 9b2dd8e8bc8..dbd362bb906 100644 --- a/src/node_vars.h +++ b/src/node_vars.h @@ -31,13 +31,6 @@ struct globals { v8::Persistent listeners_symbol; v8::Persistent uncaught_exception_symbol; v8::Persistent emit_symbol; - bool print_eval; - char *eval_string; - int option_end_index; - bool use_debug_agent; - bool debug_wait_connect; - int debug_port; - int max_stack_size; uv_check_t check_tick_watcher; uv_prepare_t prepare_tick_watcher; uv_idle_t tick_spinner; @@ -183,6 +176,11 @@ struct globals { ::ares_channel ares_channel; }; +// Initialize globals struct. +void globals_init(struct globals*); + +// Get the globals struct for the current Isolate. The returned pointer is +// already initialized. struct globals* globals_get(); } // namespace node diff --git a/src/node_zlib.cc b/src/node_zlib.cc index e16092f9285..228427b6400 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -29,6 +29,7 @@ #include #include +#include #include #include @@ -133,7 +134,7 @@ template class ZCtx : public ObjectWrap { uv_work_t* work_req = new uv_work_t(); work_req->data = req_wrap; - uv_queue_work(uv_default_loop(), + uv_queue_work(Loop(), work_req, ZCtx::Process, ZCtx::After); diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index 1870837bcae..c33de3858f6 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -123,7 +124,7 @@ Handle PipeWrap::New(const Arguments& args) { PipeWrap::PipeWrap(Handle object, bool ipc) : StreamWrap(object, (uv_stream_t*) &handle_) { - int r = uv_pipe_init(uv_default_loop(), &handle_, ipc); + int r = uv_pipe_init(Loop(), &handle_, ipc); assert(r == 0); // How do we proxy this error up to javascript? // Suggestion: uv_pipe_init() returns void. handle_.data = reinterpret_cast(this); @@ -141,7 +142,7 @@ Handle PipeWrap::Bind(const Arguments& args) { int r = uv_pipe_bind(&wrap->handle_, *name); // Error starting the pipe. - if (r) SetErrno(uv_last_error(uv_default_loop())); + if (r) SetErrno(uv_last_error(Loop())); return scope.Close(Integer::New(r)); } @@ -172,7 +173,7 @@ Handle PipeWrap::Listen(const Arguments& args) { int r = uv_listen((uv_stream_t*)&wrap->handle_, backlog, OnConnection); // Error starting the pipe. - if (r) SetErrno(uv_last_error(uv_default_loop())); + if (r) SetErrno(uv_last_error(Loop())); return scope.Close(Integer::New(r)); } @@ -225,7 +226,7 @@ void PipeWrap::AfterConnect(uv_connect_t* req, int status) { assert(wrap->object_.IsEmpty() == false); if (status) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); } Local argv[3] = { diff --git a/src/process_wrap.cc b/src/process_wrap.cc index 7a6a8518a2d..847a76352f8 100644 --- a/src/process_wrap.cc +++ b/src/process_wrap.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -175,7 +176,7 @@ class ProcessWrap : public HandleWrap { Get(String::NewSymbol("windowsVerbatimArguments"))->IsTrue(); #endif - int r = uv_spawn(uv_default_loop(), &wrap->process_, options); + int r = uv_spawn(Loop(), &wrap->process_, options); wrap->SetHandle((uv_handle_t*)&wrap->process_); assert(wrap->process_.data == wrap); @@ -195,7 +196,7 @@ class ProcessWrap : public HandleWrap { delete [] options.env; } - if (r) SetErrno(uv_last_error(uv_default_loop())); + if (r) SetErrno(uv_last_error(Loop())); return scope.Close(Integer::New(r)); } @@ -209,7 +210,7 @@ class ProcessWrap : public HandleWrap { int r = uv_process_kill(&wrap->process_, signal); - if (r) SetErrno(uv_last_error(uv_default_loop())); + if (r) SetErrno(uv_last_error(Loop())); return scope.Close(Integer::New(r)); } diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 7193f77c126..a64b4c356c5 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -132,7 +133,7 @@ Handle StreamWrap::ReadStart(const Arguments& args) { } // Error starting the tcp. - if (r) SetErrno(uv_last_error(uv_default_loop())); + if (r) SetErrno(uv_last_error(Loop())); return scope.Close(Integer::New(r)); } @@ -146,7 +147,7 @@ Handle StreamWrap::ReadStop(const Arguments& args) { int r = uv_read_stop(wrap->stream_); // Error starting the tcp. - if (r) SetErrno(uv_last_error(uv_default_loop())); + if (r) SetErrno(uv_last_error(Loop())); return scope.Close(Integer::New(r)); } @@ -225,7 +226,7 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread, slab_used -= buf.len; } - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); MakeCallback(wrap->object_, "onread", 0, NULL); return; } @@ -338,7 +339,7 @@ Handle StreamWrap::Write(const Arguments& args) { wrap->UpdateWriteQueueSize(); if (r) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); delete req_wrap; return scope.Close(v8::Null()); } else { @@ -358,7 +359,7 @@ void StreamWrap::AfterWrite(uv_write_t* req, int status) { assert(wrap->object_.IsEmpty() == false); if (status) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); } wrap->UpdateWriteQueueSize(); @@ -388,7 +389,7 @@ Handle StreamWrap::Shutdown(const Arguments& args) { req_wrap->Dispatched(); if (r) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); delete req_wrap; return scope.Close(v8::Null()); } else { @@ -408,7 +409,7 @@ void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { HandleScope scope; if (status) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); } Local argv[3] = { diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 9ac6aedfa3d..2871025d765 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -154,7 +155,7 @@ Handle TCPWrap::New(const Arguments& args) { TCPWrap::TCPWrap(Handle object) : StreamWrap(object, (uv_stream_t*) &handle_) { - int r = uv_tcp_init(uv_default_loop(), &handle_); + int r = uv_tcp_init(Loop(), &handle_); assert(r == 0); // How do we proxy this error up to javascript? // Suggestion: uv_tcp_init() returns void. UpdateWriteQueueSize(); @@ -182,7 +183,7 @@ Handle TCPWrap::GetSockName(const Arguments& args) { Local sockname = Object::New(); if (r != 0) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); } else { family = address.ss_family; @@ -224,7 +225,7 @@ Handle TCPWrap::GetPeerName(const Arguments& args) { Local sockname = Object::New(); if (r != 0) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); } else { family = address.ss_family; @@ -257,7 +258,7 @@ Handle TCPWrap::SetNoDelay(const Arguments& args) { int r = uv_tcp_nodelay(&wrap->handle_, 1); if (r) - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); return Undefined(); } @@ -273,7 +274,7 @@ Handle TCPWrap::SetKeepAlive(const Arguments& args) { int r = uv_tcp_keepalive(&wrap->handle_, enable, delay); if (r) - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); return Undefined(); } @@ -289,7 +290,7 @@ Handle TCPWrap::SetSimultaneousAccepts(const Arguments& args) { int r = uv_tcp_simultaneous_accepts(&wrap->handle_, enable ? 1 : 0); if (r) - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); return Undefined(); } @@ -308,7 +309,7 @@ Handle TCPWrap::Bind(const Arguments& args) { int r = uv_tcp_bind(&wrap->handle_, address); // Error starting the tcp. - if (r) SetErrno(uv_last_error(uv_default_loop())); + if (r) SetErrno(uv_last_error(Loop())); return scope.Close(Integer::New(r)); } @@ -326,7 +327,7 @@ Handle TCPWrap::Bind6(const Arguments& args) { int r = uv_tcp_bind6(&wrap->handle_, address); // Error starting the tcp. - if (r) SetErrno(uv_last_error(uv_default_loop())); + if (r) SetErrno(uv_last_error(Loop())); return scope.Close(Integer::New(r)); } @@ -342,7 +343,7 @@ Handle TCPWrap::Listen(const Arguments& args) { int r = uv_listen((uv_stream_t*)&wrap->handle_, backlog, OnConnection); // Error starting the tcp. - if (r) SetErrno(uv_last_error(uv_default_loop())); + if (r) SetErrno(uv_last_error(Loop())); return scope.Close(Integer::New(r)); } @@ -377,7 +378,7 @@ void TCPWrap::OnConnection(uv_stream_t* handle, int status) { // Successful accept. Call the onconnection callback in JavaScript land. argv[0] = client_obj; } else { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); argv[0] = v8::Null(); } @@ -396,7 +397,7 @@ void TCPWrap::AfterConnect(uv_connect_t* req, int status) { assert(wrap->object_.IsEmpty() == false); if (status) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); } Local argv[3] = { @@ -432,7 +433,7 @@ Handle TCPWrap::Connect(const Arguments& args) { req_wrap->Dispatched(); if (r) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); delete req_wrap; return scope.Close(v8::Null()); } else { @@ -459,7 +460,7 @@ Handle TCPWrap::Connect6(const Arguments& args) { req_wrap->Dispatched(); if (r) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); delete req_wrap; return scope.Close(v8::Null()); } else { diff --git a/src/timer_wrap.cc b/src/timer_wrap.cc index 470c2d6b98d..2b29e5f854a 100644 --- a/src/timer_wrap.cc +++ b/src/timer_wrap.cc @@ -21,6 +21,7 @@ #include #include +#include #define UNWRAP \ assert(!args.Holder().IsEmpty()); \ @@ -91,7 +92,7 @@ class TimerWrap : public HandleWrap { : HandleWrap(object, (uv_handle_t*) &handle_) { active_ = false; - int r = uv_timer_init(uv_default_loop(), &handle_); + int r = uv_timer_init(Loop(), &handle_); assert(r == 0); handle_.data = this; @@ -99,11 +100,11 @@ class TimerWrap : public HandleWrap { // uv_timer_init adds a loop reference. (That is, it calls uv_ref.) This // is not the behavior we want in Node. Timers should not increase the // ref count of the loop except when active. - uv_unref(uv_default_loop()); + uv_unref(Loop()); } ~TimerWrap() { - if (!active_) uv_ref(uv_default_loop()); + if (!active_) uv_ref(Loop()); } void StateChange() { @@ -113,11 +114,11 @@ class TimerWrap : public HandleWrap { if (!was_active && active_) { // If our state is changing from inactive to active, we // increase the loop's reference count. - uv_ref(uv_default_loop()); + uv_ref(Loop()); } else if (was_active && !active_) { // If our state is changing from active to inactive, we // decrease the loop's reference count. - uv_unref(uv_default_loop()); + uv_unref(Loop()); } } @@ -132,7 +133,7 @@ class TimerWrap : public HandleWrap { int r = uv_timer_start(&wrap->handle_, OnTimeout, timeout, repeat); // Error starting the timer. - if (r) SetErrno(uv_last_error(uv_default_loop())); + if (r) SetErrno(uv_last_error(Loop())); wrap->StateChange(); @@ -146,7 +147,7 @@ class TimerWrap : public HandleWrap { int r = uv_timer_stop(&wrap->handle_); - if (r) SetErrno(uv_last_error(uv_default_loop())); + if (r) SetErrno(uv_last_error(Loop())); wrap->StateChange(); @@ -160,7 +161,7 @@ class TimerWrap : public HandleWrap { int r = uv_timer_again(&wrap->handle_); - if (r) SetErrno(uv_last_error(uv_default_loop())); + if (r) SetErrno(uv_last_error(Loop())); wrap->StateChange(); @@ -186,7 +187,7 @@ class TimerWrap : public HandleWrap { int64_t repeat = uv_timer_get_repeat(&wrap->handle_); - if (repeat < 0) SetErrno(uv_last_error(uv_default_loop())); + if (repeat < 0) SetErrno(uv_last_error(Loop())); return scope.Close(Integer::New(repeat)); } diff --git a/src/tty_wrap.cc b/src/tty_wrap.cc index 486c2a70b4b..31949011f31 100644 --- a/src/tty_wrap.cc +++ b/src/tty_wrap.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -124,7 +125,7 @@ class TTYWrap : StreamWrap { int r = uv_tty_get_winsize(&wrap->handle_, &width, &height); if (r) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); return v8::Undefined(); } @@ -143,7 +144,7 @@ class TTYWrap : StreamWrap { int r = uv_tty_set_mode(&wrap->handle_, args[0]->IsTrue()); if (r) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); } return scope.Close(Integer::New(r)); @@ -169,7 +170,7 @@ class TTYWrap : StreamWrap { TTYWrap(Handle object, int fd, bool readable) : StreamWrap(object, (uv_stream_t*)&handle_) { - uv_tty_init(uv_default_loop(), &handle_, fd, readable); + uv_tty_init(Loop(), &handle_, fd, readable); } uv_tty_t handle_; diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc index cd4c58ecabd..dce45ada498 100644 --- a/src/udp_wrap.cc +++ b/src/udp_wrap.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -103,7 +104,7 @@ class UDPWrap: public HandleWrap { UDPWrap::UDPWrap(Handle object): HandleWrap(object, (uv_handle_t*)&handle_) { - int r = uv_udp_init(uv_default_loop(), &handle_); + int r = uv_udp_init(Loop(), &handle_); assert(r == 0); // can't fail anyway handle_.data = reinterpret_cast(this); } @@ -176,7 +177,7 @@ Handle UDPWrap::DoBind(const Arguments& args, int family) { } if (r) - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); return scope.Close(Integer::New(r)); } @@ -233,7 +234,7 @@ Handle UDPWrap::DoSend(const Arguments& args, int family) { req_wrap->Dispatched(); if (r) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); delete req_wrap; return Null(); } @@ -260,8 +261,8 @@ Handle UDPWrap::RecvStart(const Arguments& args) { // UV_EALREADY means that the socket is already bound but that's okay int r = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv); - if (r && uv_last_error(uv_default_loop()).code != UV_EALREADY) { - SetErrno(uv_last_error(uv_default_loop())); + if (r && uv_last_error(Loop()).code != UV_EALREADY) { + SetErrno(uv_last_error(Loop())); return False(); } @@ -297,7 +298,7 @@ Handle UDPWrap::GetSockName(const Arguments& args) { return scope.Close(sockname); } else { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); return Null(); } } @@ -316,7 +317,7 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) { assert(wrap->object_.IsEmpty() == false); if (status) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); } Local argv[4] = { @@ -364,7 +365,7 @@ void UDPWrap::OnRecv(uv_udp_t* handle, }; if (nread == -1) { - SetErrno(uv_last_error(uv_default_loop())); + SetErrno(uv_last_error(Loop())); } else { Local rinfo = Object::New(); diff --git a/test/addons/shared-buffer/binding.cc b/test/addons/shared-buffer/binding.cc new file mode 100644 index 00000000000..d81f1d4f22f --- /dev/null +++ b/test/addons/shared-buffer/binding.cc @@ -0,0 +1,55 @@ +#include +#include +#include + +using namespace v8; + +extern "C" { + void init(Handle target); +} + + +#define BUFSIZE 1024 +static uint8_t buf[BUFSIZE]; +static uv_mutex_t lock; + + +Handle Get(const Arguments& args) { + HandleScope scope; + + int index = args[0]->Uint32Value(); + + if (index < 0 || BUFSIZE <= index) { + return ThrowException(Exception::Error(String::New("out of bounds"))); + } + + return scope.Close(Integer::New(buf[index])); +} + + +Handle Set(const Arguments& args) { + uv_mutex_lock(&lock); + HandleScope scope; + + int index = args[0]->Uint32Value(); + + if (index < 0 || BUFSIZE <= index) { + return ThrowException(Exception::Error(String::New("out of bounds"))); + } + + buf[index] = args[1]->Uint32Value(); + + Local val = Integer::New(buf[index]); + + uv_mutex_unlock(&lock); + + return scope.Close(val); +} + + +void init(Handle target) { + NODE_SET_METHOD(target, "get", Get); + NODE_SET_METHOD(target, "set", Set); + target->Set(String::New("length"), Integer::New(BUFSIZE)); + uv_mutex_init(&lock); +} diff --git a/test/addons/shared-buffer/binding.gyp b/test/addons/shared-buffer/binding.gyp new file mode 100644 index 00000000000..3bfb84493f3 --- /dev/null +++ b/test/addons/shared-buffer/binding.gyp @@ -0,0 +1,8 @@ +{ + 'targets': [ + { + 'target_name': 'binding', + 'sources': [ 'binding.cc' ] + } + ] +} diff --git a/test/addons/shared-buffer/test.js b/test/addons/shared-buffer/test.js new file mode 100644 index 00000000000..9ba896f28a9 --- /dev/null +++ b/test/addons/shared-buffer/test.js @@ -0,0 +1,19 @@ +var assert = require('assert'); +var binding = require('./out/Release/binding'); +var isolates = process.binding('isolates'); + +console.log("binding.length =", binding.length); + +if (process.tid === 1) { + var isolate = isolates.create(process.argv); + for (var i = 0; i < binding.length; i++) { + console.log('parent', + 'binding.set(' + i + ', ' + i + ')', + binding.set(i, i)); + } +} else { + for (var i = 0; i < binding.length; i++) { + console.log('child', 'binding.get(' + i + ')', binding.get(i)); + } +} + diff --git a/test/simple/test-child-process-fork.js b/test/simple/test-child-process-fork.js index 41cc28c72e4..ea99ae7704b 100644 --- a/test/simple/test-child-process-fork.js +++ b/test/simple/test-child-process-fork.js @@ -24,7 +24,13 @@ var common = require('../common'); var fork = require('child_process').fork; var args = ['foo', 'bar']; -var n = fork(common.fixturesDir + '/child-process-spawn-node.js', args); +var options = { + thread: process.TEST_ISOLATE ? true : false +}; + +var n = fork(common.fixturesDir + '/child-process-spawn-node.js', + args, + options); assert.deepEqual(args, ['foo', 'bar']); var messageCount = 0; diff --git a/test/simple/test-isolates.js b/test/simple/test-isolates.js new file mode 100644 index 00000000000..fa2dccd3307 --- /dev/null +++ b/test/simple/test-isolates.js @@ -0,0 +1,39 @@ +var fs = require('fs'); +var http = require('http'); +var isolates = process.binding('isolates'); + +console.log("count: %d", isolates.count()); + +if (process.tid === 1) { + var isolate = isolates.create(process.argv); + //process._joinIsolate(isolate); + console.error("master"); + fs.stat(__dirname, function(err, stat) { + if (err) throw err; + console.error('thread 1', stat.mtime); + }); + + setTimeout(function() { + fs.stat(__dirname, function(err, stat) { + if (err) throw err; + console.error('thread 1', stat.mtime); + }); + }, 500); + + console.log("thread 1 count: %d", isolates.count()); +} else { + console.error("slave"); + fs.stat(__dirname, function(err, stat) { + if (err) throw err; + console.error('thread 2', stat.mtime); + }); + + setTimeout(function() { + fs.stat(__dirname, function(err, stat) { + if (err) throw err; + console.error('thread 2', stat.mtime); + }); + }, 500); + + console.error("thread 2 count: %d", isolates.count()); +} diff --git a/test/simple/test-isolates2.js b/test/simple/test-isolates2.js new file mode 100644 index 00000000000..1823b8fe76c --- /dev/null +++ b/test/simple/test-isolates2.js @@ -0,0 +1,13 @@ +// Skip this test if Node is not compiled with isolates support. +if (!process.features.isolates) return; + +var assert = require('assert'); + +// This is the same test as test-child-process-fork except it uses isolates +// instead of processes. process.TEST_ISOLATE is a ghetto method of passing +// some information into the other test. +process.TEST_ISOLATE = true; +require('./test-child-process-fork'); + +var numThreads = process.binding('isolates').count(); +assert.ok(numThreads > 1); diff --git a/vcbuild.bat b/vcbuild.bat index 38a1951b922..952c043e48c 100644 --- a/vcbuild.bat +++ b/vcbuild.bat @@ -76,7 +76,7 @@ goto run :msbuild-found @rem Build the sln with msbuild. -msbuild node.sln /t:%target% /p:Configuration=%config% /clp:NoSummary;NoItemAndPropertyList;Verbosity=minimal /nologo +msbuild node.sln /m /t:%target% /p:Configuration=%config% /clp:NoSummary;NoItemAndPropertyList;Verbosity=minimal /nologo if errorlevel 1 goto exit if defined nosign goto msi @@ -89,7 +89,7 @@ python "%~dp0tools\getnodeversion.py" > "%temp%\node_version.txt" if not errorlevel 0 echo Cannot determine current version of node.js & goto exit for /F "tokens=*" %%i in (%temp%\node_version.txt) do set NODE_VERSION=%%i heat dir deps\npm -var var.NPMSourceDir -dr NodeModulesFolder -cg NPMFiles -gg -template fragment -nologo -out npm.wxs -msbuild "%~dp0tools\msvs\msi\nodemsi.sln" /t:Clean,Build /p:Configuration=%config% /p:NodeVersion=%NODE_VERSION% /clp:NoSummary;NoItemAndPropertyList;Verbosity=minimal /nologo +msbuild "%~dp0tools\msvs\msi\nodemsi.sln" /m /t:Clean,Build /p:Configuration=%config% /p:NodeVersion=%NODE_VERSION% /clp:NoSummary;NoItemAndPropertyList;Verbosity=minimal /nologo if errorlevel 1 goto exit if defined nosign goto run