Skip to content

Commit

Permalink
unix: Release GIL during all system calls.
Browse files Browse the repository at this point in the history
Addition of GIL EXIT/ENTER pairs are:

- modos: release the GIL during system calls.  CPython does this as well.

- moduselect: release the GIL during the poll() syscall.  This call can be
  blocking, so it is important to allow other threads to run at this time.

- modusocket: release the GIL during system calls.  Many of these calls can
  be blocking, so it is important to allow other threads to run.

- unix_mphal: release the GIL during the read and write syscalls in
  mp_hal_stdin_rx_chr and mp_hal_stdout_tx_strn.  If we don't do this
  threads are blocked when the REPL or the builtin input function are used.

- file, main, mpconfigport.h: release GIL during syscalls in built-in
  functions that could block.
  • Loading branch information
dlech authored and dpgeorge committed Jan 26, 2020
1 parent 96716b4 commit fee7e56
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 4 deletions.
18 changes: 17 additions & 1 deletion ports/unix/file.c
Expand Up @@ -35,6 +35,7 @@
#include "py/stream.h"
#include "py/builtin.h"
#include "py/mphal.h"
#include "py/mpthread.h"
#include "fdfile.h"

#if MICROPY_PY_IO && !MICROPY_VFS
Expand Down Expand Up @@ -65,7 +66,9 @@ STATIC void fdfile_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kin
STATIC mp_uint_t fdfile_read(mp_obj_t o_in, void *buf, mp_uint_t size, int *errcode) {
mp_obj_fdfile_t *o = MP_OBJ_TO_PTR(o_in);
check_fd_is_open(o);
MP_THREAD_GIL_EXIT();
mp_int_t r = read(o->fd, buf, size);
MP_THREAD_GIL_ENTER();
if (r == -1) {
*errcode = errno;
return MP_STREAM_ERROR;
Expand All @@ -82,14 +85,18 @@ STATIC mp_uint_t fdfile_write(mp_obj_t o_in, const void *buf, mp_uint_t size, in
return size;
}
#endif
MP_THREAD_GIL_EXIT();
mp_int_t r = write(o->fd, buf, size);
MP_THREAD_GIL_ENTER();
while (r == -1 && errno == EINTR) {
if (MP_STATE_VM(mp_pending_exception) != MP_OBJ_NULL) {
mp_obj_t obj = MP_STATE_VM(mp_pending_exception);
MP_STATE_VM(mp_pending_exception) = MP_OBJ_NULL;
nlr_raise(obj);
}
MP_THREAD_GIL_EXIT();
r = write(o->fd, buf, size);
MP_THREAD_GIL_ENTER();
}
if (r == -1) {
*errcode = errno;
Expand All @@ -104,7 +111,9 @@ STATIC mp_uint_t fdfile_ioctl(mp_obj_t o_in, mp_uint_t request, uintptr_t arg, i
switch (request) {
case MP_STREAM_SEEK: {
struct mp_stream_seek_t *s = (struct mp_stream_seek_t*)arg;
MP_THREAD_GIL_EXIT();
off_t off = lseek(o->fd, s->offset, s->whence);
MP_THREAD_GIL_ENTER();
if (off == (off_t)-1) {
*errcode = errno;
return MP_STREAM_ERROR;
Expand All @@ -113,13 +122,18 @@ STATIC mp_uint_t fdfile_ioctl(mp_obj_t o_in, mp_uint_t request, uintptr_t arg, i
return 0;
}
case MP_STREAM_FLUSH:
if (fsync(o->fd) < 0) {
MP_THREAD_GIL_EXIT();
int ret = fsync(o->fd);
MP_THREAD_GIL_ENTER();
if (ret == -1) {
*errcode = errno;
return MP_STREAM_ERROR;
}
return 0;
case MP_STREAM_CLOSE:
MP_THREAD_GIL_EXIT();
close(o->fd);
MP_THREAD_GIL_ENTER();
#ifdef MICROPY_CPYTHON_COMPAT
o->fd = -1;
#endif
Expand Down Expand Up @@ -198,7 +212,9 @@ STATIC mp_obj_t fdfile_open(const mp_obj_type_t *type, mp_arg_val_t *args) {
}

const char *fname = mp_obj_str_get_str(fid);
MP_THREAD_GIL_EXIT();
int fd = open(fname, mode_x | mode_rw, 0644);
MP_THREAD_GIL_ENTER();
if (fd == -1) {
mp_raise_OSError(errno);
}
Expand Down
2 changes: 2 additions & 0 deletions ports/unix/main.c
Expand Up @@ -64,7 +64,9 @@ long heap_size = 1024*1024 * (sizeof(mp_uint_t) / 4);

STATIC void stderr_print_strn(void *env, const char *str, size_t len) {
(void)env;
MP_THREAD_GIL_EXIT();
ssize_t dummy = write(STDERR_FILENO, str, len);
MP_THREAD_GIL_ENTER();
mp_uos_dupterm_tx_strn(str, len);
(void)dummy;
}
Expand Down
20 changes: 20 additions & 0 deletions ports/unix/modos.c
Expand Up @@ -38,6 +38,7 @@
#include "py/runtime.h"
#include "py/objtuple.h"
#include "py/mphal.h"
#include "py/mpthread.h"
#include "extmod/vfs.h"
#include "extmod/misc.h"

Expand All @@ -49,7 +50,9 @@ STATIC mp_obj_t mod_os_stat(mp_obj_t path_in) {
struct stat sb;
const char *path = mp_obj_str_get_str(path_in);

MP_THREAD_GIL_EXIT();
int res = stat(path, &sb);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(res, errno);

mp_obj_tuple_t *t = MP_OBJ_TO_PTR(mp_obj_new_tuple(10, NULL));
Expand Down Expand Up @@ -89,7 +92,9 @@ STATIC mp_obj_t mod_os_statvfs(mp_obj_t path_in) {
STRUCT_STATVFS sb;
const char *path = mp_obj_str_get_str(path_in);

MP_THREAD_GIL_EXIT();
int res = STATVFS(path, &sb);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(res, errno);

mp_obj_tuple_t *t = MP_OBJ_TO_PTR(mp_obj_new_tuple(10, NULL));
Expand All @@ -116,7 +121,9 @@ STATIC mp_obj_t mod_os_remove(mp_obj_t path_in) {
// of that function. But Python remove() follows ANSI C, and explicitly
// required to raise exception on attempt to remove a directory. Thus,
// call POSIX unlink() here.
MP_THREAD_GIL_EXIT();
int r = unlink(path);
MP_THREAD_GIL_ENTER();

RAISE_ERRNO(r, errno);

Expand All @@ -128,7 +135,9 @@ STATIC mp_obj_t mod_os_rename(mp_obj_t old_path_in, mp_obj_t new_path_in) {
const char *old_path = mp_obj_str_get_str(old_path_in);
const char *new_path = mp_obj_str_get_str(new_path_in);

MP_THREAD_GIL_EXIT();
int r = rename(old_path, new_path);
MP_THREAD_GIL_ENTER();

RAISE_ERRNO(r, errno);

Expand All @@ -139,7 +148,9 @@ STATIC MP_DEFINE_CONST_FUN_OBJ_2(mod_os_rename_obj, mod_os_rename);
STATIC mp_obj_t mod_os_rmdir(mp_obj_t path_in) {
const char *path = mp_obj_str_get_str(path_in);

MP_THREAD_GIL_EXIT();
int r = rmdir(path);
MP_THREAD_GIL_ENTER();

RAISE_ERRNO(r, errno);

Expand All @@ -150,7 +161,9 @@ STATIC MP_DEFINE_CONST_FUN_OBJ_1(mod_os_rmdir_obj, mod_os_rmdir);
STATIC mp_obj_t mod_os_system(mp_obj_t cmd_in) {
const char *cmd = mp_obj_str_get_str(cmd_in);

MP_THREAD_GIL_EXIT();
int r = system(cmd);
MP_THREAD_GIL_ENTER();

RAISE_ERRNO(r, errno);

Expand All @@ -170,11 +183,13 @@ MP_DEFINE_CONST_FUN_OBJ_1(mod_os_getenv_obj, mod_os_getenv);
STATIC mp_obj_t mod_os_mkdir(mp_obj_t path_in) {
// TODO: Accept mode param
const char *path = mp_obj_str_get_str(path_in);
MP_THREAD_GIL_EXIT();
#ifdef _WIN32
int r = mkdir(path);
#else
int r = mkdir(path, 0777);
#endif
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(r, errno);
return mp_const_none;
}
Expand All @@ -192,13 +207,16 @@ STATIC mp_obj_t listdir_next(mp_obj_t self_in) {
if (self->dir == NULL) {
goto done;
}
MP_THREAD_GIL_EXIT();
struct dirent *dirent = readdir(self->dir);
if (dirent == NULL) {
closedir(self->dir);
MP_THREAD_GIL_ENTER();
self->dir = NULL;
done:
return MP_OBJ_STOP_ITERATION;
}
MP_THREAD_GIL_ENTER();

mp_obj_tuple_t *t = MP_OBJ_TO_PTR(mp_obj_new_tuple(3, NULL));
t->items[0] = mp_obj_new_str(dirent->d_name, strlen(dirent->d_name));
Expand Down Expand Up @@ -235,7 +253,9 @@ STATIC mp_obj_t mod_os_ilistdir(size_t n_args, const mp_obj_t *args) {
}
mp_obj_listdir_t *o = m_new_obj(mp_obj_listdir_t);
o->base.type = &mp_type_polymorph_iter;
MP_THREAD_GIL_EXIT();
o->dir = opendir(path);
MP_THREAD_GIL_ENTER();
o->iternext = listdir_next;
return MP_OBJ_FROM_PTR(o);
}
Expand Down
3 changes: 3 additions & 0 deletions ports/unix/moduselect.c
Expand Up @@ -39,6 +39,7 @@
#include "py/objlist.h"
#include "py/objtuple.h"
#include "py/mphal.h"
#include "py/mpthread.h"
#include "fdfile.h"

#define DEBUG 0
Expand Down Expand Up @@ -188,7 +189,9 @@ STATIC int poll_poll_internal(size_t n_args, const mp_obj_t *args) {

self->flags = flags;

MP_THREAD_GIL_EXIT();
int n_ready = poll(self->entries, self->len, timeout);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(n_ready, errno);
return n_ready;
}
Expand Down
43 changes: 41 additions & 2 deletions ports/unix/modusocket.c
Expand Up @@ -45,6 +45,7 @@
#include "py/stream.h"
#include "py/builtin.h"
#include "py/mphal.h"
#include "py/mpthread.h"

/*
The idea of this module is to implement reasonable minimum of
Expand Down Expand Up @@ -93,7 +94,9 @@ STATIC void socket_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kin

STATIC mp_uint_t socket_read(mp_obj_t o_in, void *buf, mp_uint_t size, int *errcode) {
mp_obj_socket_t *o = MP_OBJ_TO_PTR(o_in);
MP_THREAD_GIL_EXIT();
mp_int_t r = read(o->fd, buf, size);
MP_THREAD_GIL_ENTER();
if (r == -1) {
int err = errno;
// On blocking socket, we get EAGAIN in case SO_RCVTIMEO/SO_SNDTIMEO
Expand All @@ -110,7 +113,9 @@ STATIC mp_uint_t socket_read(mp_obj_t o_in, void *buf, mp_uint_t size, int *errc

STATIC mp_uint_t socket_write(mp_obj_t o_in, const void *buf, mp_uint_t size, int *errcode) {
mp_obj_socket_t *o = MP_OBJ_TO_PTR(o_in);
MP_THREAD_GIL_EXIT();
mp_int_t r = write(o->fd, buf, size);
MP_THREAD_GIL_ENTER();
if (r == -1) {
int err = errno;
// On blocking socket, we get EAGAIN in case SO_RCVTIMEO/SO_SNDTIMEO
Expand All @@ -137,7 +142,9 @@ STATIC mp_uint_t socket_ioctl(mp_obj_t o_in, mp_uint_t request, uintptr_t arg, i
// The rationale MicroPython follows is that close() just releases
// file descriptor. If you're interested to catch I/O errors before
// closing fd, fsync() it.
MP_THREAD_GIL_EXIT();
close(self->fd);
MP_THREAD_GIL_ENTER();
return 0;

case MP_STREAM_GET_FILENO:
Expand All @@ -159,7 +166,9 @@ STATIC mp_obj_t socket_connect(mp_obj_t self_in, mp_obj_t addr_in) {
mp_obj_socket_t *self = MP_OBJ_TO_PTR(self_in);
mp_buffer_info_t bufinfo;
mp_get_buffer_raise(addr_in, &bufinfo, MP_BUFFER_READ);
MP_THREAD_GIL_EXIT();
int r = connect(self->fd, (const struct sockaddr *)bufinfo.buf, bufinfo.len);
MP_THREAD_GIL_ENTER();
int err = errno;
if (r == -1 && self->blocking && err == EINPROGRESS) {
// EINPROGRESS on a blocking socket means the operation timed out
Expand All @@ -174,15 +183,19 @@ STATIC mp_obj_t socket_bind(mp_obj_t self_in, mp_obj_t addr_in) {
mp_obj_socket_t *self = MP_OBJ_TO_PTR(self_in);
mp_buffer_info_t bufinfo;
mp_get_buffer_raise(addr_in, &bufinfo, MP_BUFFER_READ);
MP_THREAD_GIL_EXIT();
int r = bind(self->fd, (const struct sockaddr *)bufinfo.buf, bufinfo.len);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(r, errno);
return mp_const_none;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_2(socket_bind_obj, socket_bind);

STATIC mp_obj_t socket_listen(mp_obj_t self_in, mp_obj_t backlog_in) {
mp_obj_socket_t *self = MP_OBJ_TO_PTR(self_in);
MP_THREAD_GIL_EXIT();
int r = listen(self->fd, MP_OBJ_SMALL_INT_VALUE(backlog_in));
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(r, errno);
return mp_const_none;
}
Expand All @@ -194,7 +207,9 @@ STATIC mp_obj_t socket_accept(mp_obj_t self_in) {
//struct sockaddr_storage addr;
byte addr[32];
socklen_t addr_len = sizeof(addr);
MP_THREAD_GIL_EXIT();
int fd = accept(self->fd, (struct sockaddr*)&addr, &addr_len);
MP_THREAD_GIL_ENTER();
int err = errno;
if (fd == -1 && self->blocking && err == EAGAIN) {
// EAGAIN on a blocking socket means the operation timed out
Expand Down Expand Up @@ -223,7 +238,9 @@ STATIC mp_obj_t socket_recv(size_t n_args, const mp_obj_t *args) {
}

byte *buf = m_new(byte, sz);
MP_THREAD_GIL_EXIT();
int out_sz = recv(self->fd, buf, sz, flags);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(out_sz, errno);

mp_obj_t ret = mp_obj_new_str_of_type(&mp_type_bytes, buf, out_sz);
Expand All @@ -245,7 +262,9 @@ STATIC mp_obj_t socket_recvfrom(size_t n_args, const mp_obj_t *args) {
socklen_t addr_len = sizeof(addr);

byte *buf = m_new(byte, sz);
MP_THREAD_GIL_EXIT();
int out_sz = recvfrom(self->fd, buf, sz, flags, (struct sockaddr*)&addr, &addr_len);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(out_sz, errno);

mp_obj_t buf_o = mp_obj_new_str_of_type(&mp_type_bytes, buf, out_sz);
Expand All @@ -272,7 +291,9 @@ STATIC mp_obj_t socket_send(size_t n_args, const mp_obj_t *args) {

mp_buffer_info_t bufinfo;
mp_get_buffer_raise(args[1], &bufinfo, MP_BUFFER_READ);
MP_THREAD_GIL_EXIT();
int out_sz = send(self->fd, bufinfo.buf, bufinfo.len, flags);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(out_sz, errno);

return MP_OBJ_NEW_SMALL_INT(out_sz);
Expand All @@ -292,8 +313,10 @@ STATIC mp_obj_t socket_sendto(size_t n_args, const mp_obj_t *args) {
mp_buffer_info_t bufinfo, addr_bi;
mp_get_buffer_raise(args[1], &bufinfo, MP_BUFFER_READ);
mp_get_buffer_raise(dst_addr, &addr_bi, MP_BUFFER_READ);
MP_THREAD_GIL_EXIT();
int out_sz = sendto(self->fd, bufinfo.buf, bufinfo.len, flags,
(struct sockaddr *)addr_bi.buf, addr_bi.len);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(out_sz, errno);

return MP_OBJ_NEW_SMALL_INT(out_sz);
Expand All @@ -319,7 +342,9 @@ STATIC mp_obj_t socket_setsockopt(size_t n_args, const mp_obj_t *args) {
optval = bufinfo.buf;
optlen = bufinfo.len;
}
MP_THREAD_GIL_EXIT();
int r = setsockopt(self->fd, level, option, optval, optlen);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(r, errno);
return mp_const_none;
}
Expand All @@ -328,14 +353,19 @@ STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(socket_setsockopt_obj, 4, 4, socket_s
STATIC mp_obj_t socket_setblocking(mp_obj_t self_in, mp_obj_t flag_in) {
mp_obj_socket_t *self = MP_OBJ_TO_PTR(self_in);
int val = mp_obj_is_true(flag_in);
MP_THREAD_GIL_EXIT();
int flags = fcntl(self->fd, F_GETFL, 0);
RAISE_ERRNO(flags, errno);
if (flags == -1) {
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(flags, errno);
}
if (val) {
flags &= ~O_NONBLOCK;
} else {
flags |= O_NONBLOCK;
}
flags = fcntl(self->fd, F_SETFL, flags);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(flags, errno);
self->blocking = val;
return mp_const_none;
Expand Down Expand Up @@ -368,9 +398,14 @@ STATIC mp_obj_t socket_settimeout(mp_obj_t self_in, mp_obj_t timeout_in) {

if (new_blocking) {
int r;
MP_THREAD_GIL_EXIT();
r = setsockopt(self->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval));
RAISE_ERRNO(r, errno);
if (r == -1) {
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(r, errno);
}
r = setsockopt(self->fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(struct timeval));
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(r, errno);
}

Expand Down Expand Up @@ -415,7 +450,9 @@ STATIC mp_obj_t socket_make_new(const mp_obj_type_t *type_in, size_t n_args, siz
}
}

MP_THREAD_GIL_EXIT();
int fd = socket(family, type, proto);
MP_THREAD_GIL_ENTER();
RAISE_ERRNO(fd, errno);
return MP_OBJ_FROM_PTR(socket_new(fd));
}
Expand Down Expand Up @@ -537,7 +574,9 @@ STATIC mp_obj_t mod_socket_getaddrinfo(size_t n_args, const mp_obj_t *args) {
}

struct addrinfo *addr_list;
MP_THREAD_GIL_EXIT();
int res = getaddrinfo(host, serv, &hints, &addr_list);
MP_THREAD_GIL_ENTER();

if (res != 0) {
// CPython: socket.gaierror
Expand Down

0 comments on commit fee7e56

Please sign in to comment.