diff --git a/include/thpool.h b/include/thpool.h index ab3063b..04d5af2 100644 --- a/include/thpool.h +++ b/include/thpool.h @@ -1,15 +1,15 @@ -/********************************** +/********************************** * @author Johan Hanssen Seferidis * License: MIT - * + * **********************************/ #ifndef _THPOOL_ #define _THPOOL_ - - - +#ifdef __cplusplus +extern "C" { +#endif /* =================================== API ======================================= */ @@ -19,17 +19,17 @@ typedef struct thpool_* threadpool; /** * @brief Initialize threadpool - * + * * Initializes a threadpool. This function will not return untill all * threads have initialized successfully. - * + * * @example - * + * * .. * threadpool thpool; //First we declare a threadpool * thpool = thpool_init(4); //then we initialize it to 4 threads * .. - * + * * @param num_threads number of threads to be created in the threadpool * @return threadpool created threadpool on success, * NULL on error @@ -39,49 +39,49 @@ threadpool thpool_init(int num_threads); /** * @brief Add work to the job queue - * + * * Takes an action and its argument and adds it to the threadpool's job queue. * If you want to add to work a function with more than one arguments then * a way to implement this is by passing a pointer to a structure. - * + * * NOTICE: You have to cast both the function and argument to not get warnings. - * + * * @example - * + * * void print_num(int num){ * printf("%d\n", num); * } - * + * * int main() { * .. * int a = 10; * thpool_add_work(thpool, (void*)print_num, (void*)a); * .. * } - * + * * @param threadpool threadpool to which the work will be added * @param function_p pointer to function to add as work * @param arg_p pointer to an argument - * @return nothing + * @return 0 on successs, -1 otherwise. */ -int thpool_add_work(threadpool, void *(*function_p)(void*), void* arg_p); +int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p); /** * @brief Wait for all queued jobs to finish - * + * * Will wait for all jobs - both queued and currently running to finish. * Once the queue is empty and all work has completed, the calling thread * (probably the main program) will continue. - * + * * Smart polling is used in wait. The polling is initially 0 - meaning that * there is virtually no polling at all. If after 1 seconds the threads - * haven't finished, the polling interval starts growing exponentially + * haven't finished, the polling interval starts growing exponentially * untill it reaches max_secs seconds. Then it jumps down to a maximum polling * interval assuming that heavy processing is being used in the threadpool. * * @example - * + * * .. * threadpool thpool = thpool_init(4); * .. @@ -90,7 +90,7 @@ int thpool_add_work(threadpool, void *(*function_p)(void*), void* arg_p); * thpool_wait(thpool); * puts("All added work has finished"); * .. - * + * * @param threadpool the threadpool to wait for * @return nothing */ @@ -99,22 +99,22 @@ void thpool_wait(threadpool); /** * @brief Pauses all threads immediately - * + * * The threads will be paused no matter if they are idle or working. * The threads return to their previous states once thpool_resume * is called. - * + * * While the thread is being paused, new work can be added. - * + * * @example - * + * * threadpool thpool = thpool_init(4); * thpool_pause(thpool); * .. * // Add a bunch of work * .. * thpool_resume(thpool); // Let the threads start their magic - * + * * @param threadpool the threadpool where the threads should be paused * @return nothing */ @@ -123,14 +123,14 @@ void thpool_pause(threadpool); /** * @brief Unpauses all threads if they are paused - * + * * @example * .. * thpool_pause(thpool); * sleep(10); // Delay execution 10 seconds * thpool_resume(thpool); * .. - * + * * @param threadpool the threadpool where the threads should be unpaused * @return nothing */ @@ -139,10 +139,10 @@ void thpool_resume(threadpool); /** * @brief Destroy the threadpool - * + * * This will wait for the currently active threads to finish and then 'kill' * the whole threadpool to free up memory. - * + * * @example * int main() { * threadpool thpool1 = thpool_init(2); @@ -152,13 +152,36 @@ void thpool_resume(threadpool); * .. * return 0; * } - * + * * @param threadpool the threadpool to destroy * @return nothing */ void thpool_destroy(threadpool); +/** + * @brief Show currently working threads + * + * Working threads are the threads that are performing work (not idle). + * + * @example + * int main() { + * threadpool thpool1 = thpool_init(2); + * threadpool thpool2 = thpool_init(2); + * .. + * printf("Working threads: %d\n", thpool_num_threads_working(thpool1)); + * .. + * return 0; + * } + * + * @param threadpool the threadpool of interest + * @return integer number of threads working + */ +int thpool_num_threads_working(threadpool); + +#ifdef __cplusplus +} +#endif #endif diff --git a/src/tag_duplicates.c b/src/tag_duplicates.c index deb88eb..e322b06 100644 --- a/src/tag_duplicates.c +++ b/src/tag_duplicates.c @@ -12,6 +12,7 @@ #include "error.h" #if MULTITHREADING == 1 + /** For each chunk following `parent`, add a cleanout_chunk() worker. */ static void tag_subchunks(threadpool thpool, const t_chunk *parent) @@ -27,7 +28,9 @@ static void tag_subchunks(threadpool thpool, const t_chunk *parent) if (heap_chunk == NULL) die("could not malloc() heap_chunk"); memcpy(heap_chunk, &chunk, sizeof(t_chunk)); - thpool_add_work(thpool, (void*)cleanout_chunk, heap_chunk); + /* thpool already prints error unless DISABLE_PRINT is defined */ + if (thpool_add_work(thpool, (void*)cleanout_chunk, heap_chunk) != 0) + exit(1); chunk_id ++; } } @@ -37,12 +40,16 @@ static void tag_subchunks(threadpool thpool, const t_chunk *parent) */ void tag_duplicates(void) { - threadpool thpool = thpool_init(g_conf.threads); + threadpool thpool; t_chunk main_chunk = { .ptr = NULL, .endptr = NULL }; + /* thpool already prints error unless DISABLE_PRINT is defined */ + if ((thpool = thpool_init(g_conf.threads)) == NULL) + exit(1); + while (get_next_chunk(&main_chunk, g_file)) { populate_hmap(&main_chunk); @@ -54,7 +61,8 @@ void tag_duplicates(void) } -#else +#else /* MULTITHREADING not defined */ + /** Cleanout each chunk following the parent */ static void tag_subchunks(const t_chunk *parent) @@ -94,4 +102,5 @@ void tag_duplicates(void) update_status(CHUNK_DONE); } } + #endif diff --git a/src/thpool.c b/src/thpool.c index 2270087..f5696f0 100644 --- a/src/thpool.c +++ b/src/thpool.c @@ -5,31 +5,40 @@ * work. For usage, check the thpool.h file or README.md * *//** @file thpool.h *//* - * + * ********************************/ - +#define _POSIX_C_SOURCE 200809L #include #include #include #include -#include #include #include -#include +#include +#if defined(__linux__) +#include +#endif #include "thpool.h" -#define MAX_NANOSEC 999999999 -#define CEIL(X) ((X-(int)(X)) > 0 ? (int)(X+1) : (int)(X)) +#ifdef THPOOL_DEBUG +#define THPOOL_DEBUG 1 +#else +#define THPOOL_DEBUG 0 +#endif + +#if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG) +#define err(str) fprintf(stderr, str) +#else +#define err(str) +#endif static volatile int threads_keepalive; static volatile int threads_on_hold; - - /* ========================== STRUCTURES ============================ */ @@ -44,7 +53,7 @@ typedef struct bsem { /* Job */ typedef struct job{ struct job* prev; /* pointer to previous job */ - void* (*function)(void* arg); /* function pointer */ + void (*function)(void* arg); /* function pointer */ void* arg; /* function's argument */ } job; @@ -73,7 +82,8 @@ typedef struct thpool_{ volatile int num_threads_alive; /* threads currently alive */ volatile int num_threads_working; /* threads currently working */ pthread_mutex_t thcount_lock; /* used for thread count etc */ - jobqueue* jobqueue_p; /* pointer to the job queue */ + pthread_cond_t threads_all_idle; /* signal to thpool_wait */ + jobqueue jobqueue; /* job queue */ } thpool_; @@ -83,16 +93,16 @@ typedef struct thpool_{ /* ========================== PROTOTYPES ============================ */ -static void thread_init(thpool_* thpool_p, struct thread** thread_p, int id); +static int thread_init(thpool_* thpool_p, struct thread** thread_p, int id); static void* thread_do(struct thread* thread_p); -static void thread_hold(); +static void thread_hold(int sig_id); static void thread_destroy(struct thread* thread_p); -static int jobqueue_init(thpool_* thpool_p); -static void jobqueue_clear(thpool_* thpool_p); -static void jobqueue_push(thpool_* thpool_p, struct job* newjob_p); -static struct job* jobqueue_pull(thpool_* thpool_p); -static void jobqueue_destroy(thpool_* thpool_p); +static int jobqueue_init(jobqueue* jobqueue_p); +static void jobqueue_clear(jobqueue* jobqueue_p); +static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p); +static struct job* jobqueue_pull(jobqueue* jobqueue_p); +static void jobqueue_destroy(jobqueue* jobqueue_p); static void bsem_init(struct bsem *bsem_p, int value); static void bsem_reset(struct bsem *bsem_p); @@ -113,40 +123,48 @@ struct thpool_* thpool_init(int num_threads){ threads_on_hold = 0; threads_keepalive = 1; - if ( num_threads < 0){ + if (num_threads < 0){ num_threads = 0; } /* Make new thread pool */ thpool_* thpool_p; thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_)); - if (thpool_p==NULL){ - fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n"); - exit(1); + if (thpool_p == NULL){ + err("thpool_init(): Could not allocate memory for thread pool\n"); + return NULL; } thpool_p->num_threads_alive = 0; thpool_p->num_threads_working = 0; /* Initialise the job queue */ - if (jobqueue_init(thpool_p)==-1){ - fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n"); - exit(1); + if (jobqueue_init(&thpool_p->jobqueue) == -1){ + err("thpool_init(): Could not allocate memory for job queue\n"); + free(thpool_p); + return NULL; } /* Make threads in pool */ - thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread)); - if (thpool_p->threads==NULL){ - fprintf(stderr, "thpool_init(): Could not allocate memory for threads\n"); - exit(1); + thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread *)); + if (thpool_p->threads == NULL){ + err("thpool_init(): Could not allocate memory for threads\n"); + jobqueue_destroy(&thpool_p->jobqueue); + free(thpool_p); + return NULL; } - + + pthread_mutex_init(&(thpool_p->thcount_lock), NULL); + pthread_cond_init(&thpool_p->threads_all_idle, NULL); + /* Thread init */ int n; for (n=0; nthreads[n], n); - printf("Created thread %d in pool \n", n); +#if THPOOL_DEBUG + printf("THPOOL_DEBUG: Created thread %d in pool \n", n); +#endif } - + /* Wait for threads to initialize */ while (thpool_p->num_threads_alive != num_threads) {} @@ -155,12 +173,12 @@ struct thpool_* thpool_init(int num_threads){ /* Add work to the thread pool */ -int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){ +int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){ job* newjob; newjob=(struct job*)malloc(sizeof(struct job)); if (newjob==NULL){ - fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n"); + err("thpool_add_work(): Could not allocate memory for new job\n"); return -1; } @@ -169,9 +187,7 @@ int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){ newjob->arg=arg_p; /* add job to queue */ - pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex); - jobqueue_push(thpool_p, newjob); - pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex); + jobqueue_push(&thpool_p->jobqueue, newjob); return 0; } @@ -179,78 +195,43 @@ int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){ /* Wait until all jobs have finished */ void thpool_wait(thpool_* thpool_p){ - - /* Continuous polling */ - double timeout = 1.0; - time_t start, end; - double tpassed = 0.0; - time (&start); - while (tpassed < timeout && - (thpool_p->jobqueue_p->len || thpool_p->num_threads_working)) - { - time (&end); - tpassed = difftime(end,start); - } - - /* Exponential polling */ - long init_nano = 1; /* MUST be above 0 */ - long new_nano; - double multiplier = 1.01; - int max_secs = 20; - - struct timespec polling_interval; - polling_interval.tv_sec = 0; - polling_interval.tv_nsec = init_nano; - - while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working) - { - nanosleep(&polling_interval, NULL); - if ( polling_interval.tv_sec < max_secs ){ - new_nano = CEIL(polling_interval.tv_nsec * multiplier); - polling_interval.tv_nsec = new_nano % MAX_NANOSEC; - if ( new_nano > MAX_NANOSEC ) { - polling_interval.tv_sec ++; - } - } - else break; - } - - /* Fall back to max polling */ - while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working){ - sleep(max_secs); + pthread_mutex_lock(&thpool_p->thcount_lock); + while (thpool_p->jobqueue.len || thpool_p->num_threads_working) { + pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock); } + pthread_mutex_unlock(&thpool_p->thcount_lock); } /* Destroy the threadpool */ void thpool_destroy(thpool_* thpool_p){ - + /* No need to destory if it's NULL */ + if (thpool_p == NULL) return ; + volatile int threads_total = thpool_p->num_threads_alive; /* End each thread 's infinite loop */ threads_keepalive = 0; - + /* Give one second to kill idle threads */ double TIMEOUT = 1.0; time_t start, end; double tpassed = 0.0; time (&start); while (tpassed < TIMEOUT && thpool_p->num_threads_alive){ - bsem_post_all(thpool_p->jobqueue_p->has_jobs); + bsem_post_all(thpool_p->jobqueue.has_jobs); time (&end); tpassed = difftime(end,start); } - + /* Poll remaining threads */ while (thpool_p->num_threads_alive){ - bsem_post_all(thpool_p->jobqueue_p->has_jobs); + bsem_post_all(thpool_p->jobqueue.has_jobs); sleep(1); } /* Job queue cleanup */ - jobqueue_destroy(thpool_p); - free(thpool_p->jobqueue_p); - + jobqueue_destroy(&thpool_p->jobqueue); /* Deallocs */ int n; for (n=0; n < threads_total; n++){ @@ -272,10 +253,20 @@ void thpool_pause(thpool_* thpool_p) { /* Resume all threads in threadpool */ void thpool_resume(thpool_* thpool_p) { + // resuming a single threadpool hasn't been + // implemented yet, meanwhile this supresses + // the warnings + (void)thpool_p; + threads_on_hold = 0; } +int thpool_num_threads_working(thpool_* thpool_p){ + return thpool_p->num_threads_working; +} + + @@ -283,17 +274,17 @@ void thpool_resume(thpool_* thpool_p) { /* Initialize a thread in the thread pool - * + * * @param thread address to the pointer of the thread to be created * @param id id to be given to the thread - * + * @return 0 on success, -1 otherwise. */ -static void thread_init (thpool_* thpool_p, struct thread** thread_p, int id){ - +static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){ + *thread_p = (struct thread*)malloc(sizeof(struct thread)); if (thread_p == NULL){ - fprintf(stderr, "thpool_init(): Could not allocate memory for thread\n"); - exit(1); + err("thread_init(): Could not allocate memory for thread\n"); + return -1; } (*thread_p)->thpool_p = thpool_p; @@ -301,12 +292,13 @@ static void thread_init (thpool_* thpool_p, struct thread** thread_p, int id){ pthread_create(&(*thread_p)->pthread, NULL, (void *)thread_do, (*thread_p)); pthread_detach((*thread_p)->pthread); - + return 0; } /* Sets the calling thread on hold */ -static void thread_hold () { +static void thread_hold(int sig_id) { + (void)sig_id; threads_on_hold = 1; while (threads_on_hold){ sleep(1); @@ -315,25 +307,40 @@ static void thread_hold () { /* What each thread is doing -* +* * In principle this is an endless loop. The only time this loop gets interuppted is once * thpool_destroy() is invoked or the program exits. -* +* * @param thread thread that will run this function * @return nothing */ static void* thread_do(struct thread* thread_p){ + /* Set thread name for profiling and debuging */ + char thread_name[128] = {0}; + sprintf(thread_name, "thread-pool-%d", thread_p->id); + +#if defined(__linux__) + /* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */ + prctl(PR_SET_NAME, thread_name); +#elif defined(__APPLE__) && defined(__MACH__) + pthread_setname_np(thread_name); +#else + err("thread_do(): pthread_setname_np is not supported on this system"); +#endif + /* Assure all threads have been created before starting serving */ thpool_* thpool_p = thread_p->thpool_p; - + /* Register signal handler */ struct sigaction act; + sigemptyset(&act.sa_mask); + act.sa_flags = 0; act.sa_handler = thread_hold; if (sigaction(SIGUSR1, &act, NULL) == -1) { - fprintf(stderr, "thread_do(): cannot handle SIGUSR1"); + err("thread_do(): cannot handle SIGUSR1"); } - + /* Mark thread as alive (initialized) */ pthread_mutex_lock(&thpool_p->thcount_lock); thpool_p->num_threads_alive += 1; @@ -341,30 +348,30 @@ static void* thread_do(struct thread* thread_p){ while(threads_keepalive){ - bsem_wait(thpool_p->jobqueue_p->has_jobs); + bsem_wait(thpool_p->jobqueue.has_jobs); if (threads_keepalive){ - + pthread_mutex_lock(&thpool_p->thcount_lock); thpool_p->num_threads_working++; pthread_mutex_unlock(&thpool_p->thcount_lock); - + /* Read job from queue and execute it */ - void*(*func_buff)(void* arg); + void (*func_buff)(void*); void* arg_buff; - job* job_p; - pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex); - job_p = jobqueue_pull(thpool_p); - pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex); + job* job_p = jobqueue_pull(&thpool_p->jobqueue); if (job_p) { func_buff = job_p->function; arg_buff = job_p->arg; func_buff(arg_buff); free(job_p); } - + pthread_mutex_lock(&thpool_p->thcount_lock); thpool_p->num_threads_working--; + if (!thpool_p->num_threads_working) { + pthread_cond_signal(&thpool_p->threads_all_idle); + } pthread_mutex_unlock(&thpool_p->thcount_lock); } @@ -390,102 +397,104 @@ static void thread_destroy (thread* thread_p){ /* Initialize queue */ -static int jobqueue_init(thpool_* thpool_p){ - thpool_p->jobqueue_p = (struct jobqueue*)malloc(sizeof(struct jobqueue)); - if (thpool_p->jobqueue_p == NULL){ - return -1; - } - memset(thpool_p->jobqueue_p, 0, sizeof(struct jobqueue)); - thpool_p->jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem)); - if (thpool_p->jobqueue_p->has_jobs == NULL){ +static int jobqueue_init(jobqueue* jobqueue_p){ + jobqueue_p->len = 0; + jobqueue_p->front = NULL; + jobqueue_p->rear = NULL; + + jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem)); + if (jobqueue_p->has_jobs == NULL){ return -1; } - memset(thpool_p->jobqueue_p->has_jobs, 0, sizeof(struct bsem)); - bsem_init(thpool_p->jobqueue_p->has_jobs, 0); - jobqueue_clear(thpool_p); + + pthread_mutex_init(&(jobqueue_p->rwmutex), NULL); + bsem_init(jobqueue_p->has_jobs, 0); + return 0; } /* Clear the queue */ -static void jobqueue_clear(thpool_* thpool_p){ +static void jobqueue_clear(jobqueue* jobqueue_p){ - while(thpool_p->jobqueue_p->len){ - free(jobqueue_pull(thpool_p)); + while(jobqueue_p->len){ + free(jobqueue_pull(jobqueue_p)); } - thpool_p->jobqueue_p->front = NULL; - thpool_p->jobqueue_p->rear = NULL; - bsem_reset(thpool_p->jobqueue_p->has_jobs); - thpool_p->jobqueue_p->len = 0; + jobqueue_p->front = NULL; + jobqueue_p->rear = NULL; + bsem_reset(jobqueue_p->has_jobs); + jobqueue_p->len = 0; } /* Add (allocated) job to queue - * - * Notice: Caller MUST hold a mutex */ -static void jobqueue_push(thpool_* thpool_p, struct job* newjob){ +static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){ + pthread_mutex_lock(&jobqueue_p->rwmutex); newjob->prev = NULL; - switch(thpool_p->jobqueue_p->len){ + switch(jobqueue_p->len){ case 0: /* if no jobs in queue */ - thpool_p->jobqueue_p->front = newjob; - thpool_p->jobqueue_p->rear = newjob; + jobqueue_p->front = newjob; + jobqueue_p->rear = newjob; break; default: /* if jobs in queue */ - thpool_p->jobqueue_p->rear->prev = newjob; - thpool_p->jobqueue_p->rear = newjob; - + jobqueue_p->rear->prev = newjob; + jobqueue_p->rear = newjob; + } - thpool_p->jobqueue_p->len++; - - bsem_post(thpool_p->jobqueue_p->has_jobs); + jobqueue_p->len++; + + bsem_post(jobqueue_p->has_jobs); + pthread_mutex_unlock(&jobqueue_p->rwmutex); } /* Get first job from queue(removes it from queue) - * +<<<<<<< HEAD + * * Notice: Caller MUST hold a mutex +======= +>>>>>>> da2c0fe45e43ce0937f272c8cd2704bdc0afb490 */ -static struct job* jobqueue_pull(thpool_* thpool_p){ +static struct job* jobqueue_pull(jobqueue* jobqueue_p){ - job* job_p; - job_p = thpool_p->jobqueue_p->front; + pthread_mutex_lock(&jobqueue_p->rwmutex); + job* job_p = jobqueue_p->front; + + switch(jobqueue_p->len){ - switch(thpool_p->jobqueue_p->len){ - case 0: /* if no jobs in queue */ - return NULL; - + break; + case 1: /* if one job in queue */ - thpool_p->jobqueue_p->front = NULL; - thpool_p->jobqueue_p->rear = NULL; + jobqueue_p->front = NULL; + jobqueue_p->rear = NULL; + jobqueue_p->len = 0; break; - + default: /* if >1 jobs in queue */ - thpool_p->jobqueue_p->front = job_p->prev; - - } - thpool_p->jobqueue_p->len--; - - /* Make sure has_jobs has right value */ - if (thpool_p->jobqueue_p->len > 0) { - bsem_post(thpool_p->jobqueue_p->has_jobs); + jobqueue_p->front = job_p->prev; + jobqueue_p->len--; + /* more than one job in queue -> post it */ + bsem_post(jobqueue_p->has_jobs); + } + pthread_mutex_unlock(&jobqueue_p->rwmutex); return job_p; } /* Free all queue resources back to the system */ -static void jobqueue_destroy(thpool_* thpool_p){ - jobqueue_clear(thpool_p); - free(thpool_p->jobqueue_p->has_jobs); +static void jobqueue_destroy(jobqueue* jobqueue_p){ + jobqueue_clear(jobqueue_p); + free(jobqueue_p->has_jobs); } @@ -498,9 +507,11 @@ static void jobqueue_destroy(thpool_* thpool_p){ /* Init semaphore to 1 or 0 */ static void bsem_init(bsem *bsem_p, int value) { if (value < 0 || value > 1) { - fprintf(stderr, "bsem_init(): Binary semaphore can take only values 1 or 0"); + err("bsem_init(): Binary semaphore can take only values 1 or 0"); exit(1); } + pthread_mutex_init(&(bsem_p->mutex), NULL); + pthread_cond_init(&(bsem_p->cond), NULL); bsem_p->v = value; }