Permalink
Browse files

Added a thin abstraction layer over sem_wait and sem_post, because se…

…m_wait was being interrupted by signals in the bytecode runtime; now I correctly call it in a loop
  • Loading branch information...
1 parent 4de896d commit 11901cbeecf44969100aeede82b19e4e7358e363 @lucasaiu committed Jul 25, 2013
View
Binary file not shown.
View
Binary file not shown.
View
Binary file not shown.
View
Binary file not shown.
View
Binary file not shown.
View
Binary file not shown.
View
Binary file not shown.
View
Binary file not shown.
View
Binary file not shown.
View
@@ -63,7 +63,7 @@ extern int caml_try_leave_blocking_section_default(void);
extern char caml_globals_map[];
#endif
-/* static */ /* !!!!!!!!!!!!!!! */ int already_initialized = 0;
+/* static */ /* !!!!!!!!!!!!!!! */ int caml_are_mutexes_already_initialized = 0;
/* The global lock: */
static pthread_mutex_t caml_global_mutex;
@@ -99,14 +99,28 @@ void caml_finalize_semaphore(sem_t *semaphore){
sem_destroy(semaphore);
}
+void caml_p_semaphore(sem_t* semaphore){
+ int sem_wait_result;
+ while((sem_wait_result = sem_wait(semaphore)) != 0){
+ assert(errno == EINTR);
+ INIT_CAML_R; DUMP("\a!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! sem_wait was interrupted by a signal");
+ errno = 0;
+ }
+ assert(sem_wait_result == 0);
+}
+void caml_v_semaphore(sem_t* semaphore){
+ int sem_post_result = sem_post(semaphore);
+ assert(sem_post_result == 0);
+}
+
caml_global_context *caml_initialize_first_global_context(void)
{
/* Maybe we should use partial contexts for specific tasks, that
will probably not be used by all threads. We should check the size of
each part of the context, to allocate only what is probably required
by all threads, and then allocate other sub-contexts on demand. */
- caml_global_context* ctx = (caml_global_context*)malloc( sizeof(caml_global_context) );
+ caml_global_context* ctx = (caml_global_context*)caml_stat_alloc( sizeof(caml_global_context) );
/*
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! IMPORTANT --Luca Saiu REENTRANTRUNTIME: BEGIN
FIXME: This is a pretty bad symptom. If I replace the 0 with a 1, the
@@ -788,21 +802,26 @@ CAMLprim value caml_context_is_remote_r(CAML_R, value descriptor)
CAMLprim value caml_cpu_no_r(CAML_R, value unit){
/* FIXME: this is a GNU extension. What should we do on non-GNU systems? */
int cpu_no =
- //get_nprocs_conf();
+ //get_nprocs_conf();
sysconf(_SC_NPROCESSORS_ONLN);
return Val_int(cpu_no);
}
+CAMLprim value caml_set_debugging(value bool){
+ caml_debugging = Int_val(bool);
+ return Val_unit;
+}
+
void caml_context_initialize_global_stuff(void){
/* Attempt to prevent multiple initialization. This will not always
work, because of missing synchronization: we can't use the global
mutex, since we're gonna initialize it here. */
- if(already_initialized){
+ if(caml_are_mutexes_already_initialized){
fprintf(stderr, "caml_initialize_global_stuff: called more than once\n");
fflush(stderr);
exit(EXIT_FAILURE);
}
- already_initialized = 1;
+ caml_are_mutexes_already_initialized = 1;
caml_enter_blocking_section_hook = &caml_enter_blocking_section_default;
caml_leave_blocking_section_hook = &caml_leave_blocking_section_default;
@@ -818,7 +837,7 @@ void caml_context_initialize_global_stuff(void){
/* This is a thin wrapper over pthread_mutex_lock and pthread_mutex_unlock: */
static void caml_call_on_mutex(int(*function)(pthread_mutex_t *), pthread_mutex_t *mutex){
INIT_CAML_R;
- if(! already_initialized){
+ if(! caml_are_mutexes_already_initialized){
/* INIT_CAML_R; */ fprintf(stderr, "global mutexes aren't initialized yet. Bailing out"); fflush(stderr);
exit(EXIT_FAILURE);
}
@@ -833,7 +852,7 @@ static void caml_call_on_mutex(int(*function)(pthread_mutex_t *), pthread_mutex_
/* void caml_acquire_global_lock(void){ */
/* INIT_CAML_R; */
-/* if(! already_initialized){ */
+/* if(! caml_are_mutexes_already_initialized){ */
/* /\* INIT_CAML_R; *\/ DUMP("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ caml_global_mutex is not yet initialized"); */
/* return; */
/* } */
@@ -855,7 +874,7 @@ static void caml_call_on_mutex(int(*function)(pthread_mutex_t *), pthread_mutex_
/* } */
/* void caml_release_global_lock(void){ */
-/* if(! already_initialized){ */
+/* if(! caml_are_mutexes_already_initialized){ */
/* INIT_CAML_R; DUMP("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ caml_global_mutex is not yet initialized"); */
/* return; */
/* } */
@@ -979,3 +998,5 @@ int TRIVIAL_caml_systhreads_get_thread_no_r(CAML_R){
return 0;
}
int caml_systhreads_get_thread_no_r (CAML_R) __attribute__ ((weak, alias ("TRIVIAL_caml_systhreads_get_thread_no_r")));
+
+int caml_debugging = 0; // !!!!!!!!!!!!!!!!!!!!!!!
View
@@ -1071,7 +1071,6 @@ void caml_release_channel_lock(void);
void caml_acquire_contextual_lock(CAML_R);
void caml_release_contextual_lock(CAML_R);
-
// FIXME: remove this after debugging
void caml_dump_global_mutex(void);
@@ -1080,6 +1079,8 @@ void caml_initialize_mutex(pthread_mutex_t *mutex);
void caml_finalize_mutex(pthread_mutex_t *mutex);
void caml_initialize_semaphore(sem_t *semaphore, int initial_value);
void caml_finalize_semaphore(sem_t *semaphore);
+void caml_p_semaphore(sem_t* semaphore); // signal-safe, differently from POSIX semaphores
+void caml_v_semaphore(sem_t* semaphore); // signal-safe, differently from POSIX semaphores
#define NOATTR "\033[0m"
@@ -1219,6 +1220,12 @@ extern __thread int caml_indentation_level;
#define QR(FORMAT, ...) /* nothing */
#define QBR(FORMAT, ...) /* nothing */
+#define DDUMP(FORMAT, ...) \
+ do{ \
+ if(caml_debugging) \
+ DUMP(FORMAT, ##__VA_ARGS__); \
+ } while(0)
+
#define DUMPUNLESSMAIN(FORMAT, ...) \
do{ \
if(ctx->descriptor->kind != caml_global_context_main) \
@@ -1227,7 +1234,20 @@ extern __thread int caml_indentation_level;
#define USLEEP(LABEL, FLOAT_SECONDS) \
do { \
- DUMP("before usleep'ing for %.2f seconds (%s)", (double)(FLOAT_SECONDS), LABEL); usleep((long)(FLOAT_SECONDS * 1000.0 * 1000.0)); DUMP("after usleep'ing for %.2f seconds (%s)", (double)(FLOAT_SECONDS), LABEL); \
+ double __float_seconds = FLOAT_SECONDS; \
+ long __float_seconds_integer_part = (long)__float_seconds ; \
+ double __float_seconds_frational_part = __float_seconds - __float_seconds_integer_part; \
+ long __float_seconds_nanoSECOndS_only = (long)(__float_seconds_frational_part * 1e9); \
+ struct timespec __time_spEC_ = { (time_t)__float_seconds_integer_part, __float_seconds_nanoSECOndS_only }; \
+ struct timespec __remainINg_; \
+ DUMP("before nanosleep'ing for %li.%li seconds (%s)", (long)(__float_seconds_integer_part), (long)(__float_seconds_nanoSECOndS_only), LABEL); \
+ while(nanosleep(&__time_spEC_, &__remainINg_) != 0){ \
+ assert(errno == EINTR); \
+ errno = 0; \
+ DUMP("nanosleep was interrupted by a signal"); \
+ __time_spEC_ = __remainINg_; \
+ }; \
+ DUMP("after nanosleep'ing (%s)", LABEL); \
} while(0)
/* int caml_get_thread_no_r(CAML_R); */
@@ -1245,4 +1265,5 @@ void caml_set_caml_can_split_r(CAML_R, int (*caml_can_split_r)(CAML_R));
/* The one and only main context: */
CAMLextern caml_global_context *the_main_context;
+extern int caml_debugging; // !!!!!!!!!!!!!!!!!!!!!!!
#endif
@@ -4,6 +4,8 @@
#include <string.h>
#include <limits.h> // FIXME: remove if not used in the end
#include <assert.h> // FIXME: remove if not used in the end
+#include <pthread.h> // FIXME: remove if not used in the end
+#include <errno.h> // FIXME: remove if not used in the end
#define CAML_CONTEXT_ROOTS /* GC-protection macros */
#include "mlvalues.h"
@@ -394,12 +396,13 @@ static int caml_deserialize_and_run_in_this_thread(caml_global_context *parent_c
*to_context = ctx;
caml_install_globals_and_data_as_c_byte_array_r(ctx, blob, &function);
+ DUMP("Done with the blob: index=%i\n", index);
- /* We're done with the blob: unpin it via the semaphore, so that it
- can be destroyed when all split threads have deserialized. */
-//fprintf(stderr, "W5.5context %p] [thread %p] (index %i) EEEEEEEEEEEEEEEEEEEEEEEEEE\n", ctx, (void*)(pthread_self()), index); fflush(stderr); caml_release_global_lock();
- DUMP("About to V the semaphore. index=%i\n", index);
- sem_post(semaphore);
+/* /\* We're done with the blob: unpin it via the semaphore, so that it */
+/* can be destroyed when all split threads have deserialized. *\/ */
+/* //fprintf(stderr, "W5.5context %p] [thread %p] (index %i) EEEEEEEEEEEEEEEEEEEEEEEEEE\n", ctx, (void*)(pthread_self()), index); fflush(stderr); caml_release_global_lock(); */
+/* DUMP("About to V the semaphore. index=%i\n", index); */
+/* sem_post(semaphore); */
#ifndef NATIVE_CODE
DUMP();
@@ -412,6 +415,15 @@ static int caml_deserialize_and_run_in_this_thread(caml_global_context *parent_c
ctx->caml_main_argv = parent_context->caml_main_argv;
DUMP();
+ /* We're done with the blob: unpin it via the semaphore, so that it
+ can be destroyed when all split threads have deserialized. */
+//fprintf(stderr, "W5.5context %p] [thread %p] (index %i) EEEEEEEEEEEEEEEEEEEEEEEEEE\n", ctx, (void*)(pthread_self()), index); fflush(stderr); caml_release_global_lock();
+ USLEEP("before V'ing the semaphore.", 3);
+ DUMP("Slept. About to V the semaphore. index=%i\n", index);
+ //int sem_post_result = sem_post(semaphore);
+ //assert(sem_post_result == 0);
+ caml_v_semaphore(semaphore);
+
/* Now do the actual work, in a function which correctly GC-protects its locals: */
did_we_fail = caml_run_function_this_thread_r(ctx, function, index);
if(did_we_fail){
@@ -470,13 +482,20 @@ static void caml_split_and_wait_r(CAML_R, char *blob, caml_global_context **spli
caml_failwith_r(ctx, "pthread_create failed"); // FIXME: blob is leaked is this case. Maybe we should just make this a fatal error
} /* for */
/* Wait for the last thread to use the blob: */
- //DUMP("waiting for every thread to deserialize");
+ DUMP("waiting for all %i threads to deserialize", (int)how_many);
for(i = 0; i < how_many; i ++){
DUMP("about to P");
caml_enter_blocking_section_r(ctx);
- sem_wait(semaphore);
+ caml_p_semaphore(semaphore);
+ //int sem_wait_result = sem_wait(semaphore);
caml_leave_blocking_section_r(ctx);
- DUMP("one child finished; waiting for %i more", (int)(how_many - i - 1));
+ /* DUMP("right after P: sem_wait returned %i, errno is %i", sem_wait_result, (int)errno); */
+ /* DUMP("is errno EINTR? %i", errno == EINTR); */
+ /* DUMP("is errno EINVAL? %i", errno == EINVAL); */
+ /* DUMP("is errno EAGAIN? %i", errno == EAGAIN); */
+ /* DUMP("is errno ETIMEDOUT? %i", errno == ETIMEDOUT); */
+ /* assert(sem_wait_result == 0); // !!!!!!!!!!!!!!!!!!!!!!!!!!! */
+ DUMP("one child finished with the blob; waiting for %i more", (int)(how_many - i - 1));
}
DUMP("every thread has deserialized");
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@@ -733,7 +752,9 @@ CAMLprim value caml_context_send_r(CAML_R, value receiver_mailbox_as_value, valu
//fprintf(stderr, "caml_context_send_r [%p, m %p]: OK-40 BEFORE UNLOCK; message_no is now %i\n", ctx, receiver_mailbox, (int)receiver_mailbox->message_no); fflush(stderr);
pthread_mutex_unlock(&receiver_mailbox->mutex);
//fprintf(stderr, "caml_context_send_r [%p, m %p]: OK-50 AFTER UNLOCK BEFORE V\n", ctx, receiver_mailbox); fflush(stderr);
- sem_post(&receiver_mailbox->message_no_semaphore);
+ //int sem_post_result = sem_post(&receiver_mailbox->message_no_semaphore);
+ //assert(sem_post_result == 0);
+ caml_v_semaphore(&receiver_mailbox->message_no_semaphore);
//fprintf(stderr, "caml_context_send_r [%p, m %p]: OK-60 AFTER V\n", ctx, receiver_mailbox); fflush(stderr);
//fprintf(stderr, "caml_context_send_r [%p, m %p]: OK-100\n", ctx, receiver_mailbox); fflush(stderr);
//fprintf(stderr, "caml_context_send_r [%p, m %p]: OK-100 END, message_no is %i\n", ctx, receiver_mailbox, (int)receiver_mailbox->message_no); fflush(stderr);
@@ -758,7 +779,9 @@ CAMLprim value caml_context_receive_r(CAML_R, value receiver_mailbox_as_value){
//fprintf(stderr, "caml_context_receive_r [%p, m %p]: OK-10 BEFORE P, message_no is %i\n", ctx, receiver_mailbox, (int)receiver_mailbox->message_no); fflush(stderr);
/* Wait until there is a message: */
caml_enter_blocking_section_r(ctx);
- sem_wait(&receiver_mailbox->message_no_semaphore);
+ //int sem_wait_result = sem_wait(&receiver_mailbox->message_no_semaphore);
+ //assert(sem_wait_result == 0);
+ caml_p_semaphore(&receiver_mailbox->message_no_semaphore);
caml_leave_blocking_section_r(ctx);
//fprintf(stderr, "RECEIVE: OK-2\n"); fflush(stderr);
View
@@ -111,9 +111,9 @@ extern struct custom_operations caml_int32_ops,
void caml_init_custom_operations(void)
{
- INIT_CAML_R; DUMP("Initializing file locking functions");
+ //INIT_CAML_R; DUMP("Initializing file locking functions");
caml_initialize_default_channel_mutex_functions();
- DUMP("Initializing custom operations, including channels");
+ //DUMP("Initializing custom operations, including channels");
caml_register_custom_operations(&caml_int32_ops);
caml_register_custom_operations(&caml_nativeint_ops);
caml_register_custom_operations(&caml_int64_ops);
View
@@ -34,6 +34,9 @@
CAMLexport void caml_raise_r(CAML_R, value v)
{
+ char *printed_exception = caml_format_exception_r(ctx, v);
+ DDUMP("raising an exception %s", printed_exception);
+ free(printed_exception);
Unlock_exn();
caml_exn_bucket = v;
if (caml_external_raise == NULL) caml_fatal_uncaught_exception_r(ctx, v);
View
@@ -17,6 +17,7 @@
/* Buffered input/output. */
+#include <assert.h> // !!!!!!!!!!!!!!
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
@@ -51,18 +52,24 @@
/* Hooks for locking channels */
/// Ugly and experimental: BEGIN --Luca Saiu REENTRANTRUNTIME
+static struct channel *the_currently_locked_channel = NULL;
static void caml_default_mutex_free(struct channel *c){
}
static void caml_default_mutex_lock(struct channel *c){
caml_acquire_channel_lock();
+ the_currently_locked_channel = c;
}
static void caml_default_mutex_unlock(struct channel *c){
+ the_currently_locked_channel = NULL;
caml_release_channel_lock();
//extern int already_initialized; if(already_initialized){ INIT_CAML_R; DUMP(); }
}
static void caml_default_mutex_unlock_exn(void){
-
+ if(the_currently_locked_channel != NULL){
+ INIT_CAML_R; DUMP("I have to unlock %p", the_currently_locked_channel);
+ caml_default_mutex_unlock(the_currently_locked_channel);
+ }
}
void caml_initialize_default_channel_mutex_functions(void){
caml_channel_mutex_free = caml_default_mutex_free;
@@ -165,6 +165,7 @@ static void thread_scan_roots(scanning_action action)
/// Ugly and experimental: BEGIN --Luca Saiu REENTRANTRUNTIME
static void caml_vmthreads_mutex_free(struct channel *c){
free(c->mutex);
+ INIT_CAML_R; DDUMP("destroying the channel at %p, with fd %i", c, c->fd);
}
value thread_yield_r(CAML_R, value unit);
static void caml_vmthreads_mutex_lock(struct channel *c){
@@ -178,30 +179,30 @@ caml_acquire_channel_lock();
}
caml_release_channel_lock();
INIT_CAML_R;
+ DDUMP("trying to lock channel %p with fd %i", c, c->fd);
int iteration_no = 0;
while(pthread_mutex_trylock(mutex_pointer) != 0){
iteration_no ++;
- //DUMP("Waiting for fd %i...", (int)c->fd);
thread_yield_r(ctx, Val_unit);
}
ctx->last_locked_channel = c;
- if(iteration_no > 0)
- DUMP("Got the channel with fd %i after %i attempts...", (int)c->fd, iteration_no);
- //caml_acquire_channel_lock();
+ //if(iteration_no > 0)
+ DDUMP("Got the channel with fd %i after %i failed attempts...", (int)c->fd, iteration_no);
}
static void caml_vmthreads_mutex_unlock(struct channel *c){
INIT_CAML_R;
ctx->last_locked_channel = NULL;
pthread_mutex_unlock(c->mutex);
- //caml_release_channel_lock();
- //extern int already_initialized; if(already_initialized){ INIT_CAML_R; DUMP(); }
+ DDUMP("unlocked channel %p with fd %i", c, c->fd);
}
static void caml_vmthreads_mutex_unlock_exn(void){
INIT_CAML_R;
struct channel *the_channel_to_unlock = ctx->last_locked_channel;
- DUMP("I have to unlock %p", the_channel_to_unlock);
- if(the_channel_to_unlock != NULL)
+ DDUMP("I have to unlock %p", the_channel_to_unlock);
+ if(the_channel_to_unlock != NULL){
caml_vmthreads_mutex_unlock(the_channel_to_unlock);
+ ctx->last_locked_channel = NULL;
+ }
}
/// Ugly and experimental: END --Luca Saiu REENTRANTRUNTIME
View
@@ -151,3 +151,6 @@ let split1 f =
(* FIXME: remove after debugging *)
external dump : string -> unit = "caml_dump_r" "reentrant"
+
+(* FIXME: remove after debugging *)
+external set_debugging : bool -> unit = "caml_set_debugging"
View
@@ -86,3 +86,7 @@ val global_index : 'a -> int
(* FIXME: remove after debugging *)
val dump : string -> unit
+
+(* FIXME: remove after debugging *)
+val set_debugging : bool -> unit
+

0 comments on commit 11901cb

Please sign in to comment.