Skip to content

Commit

Permalink
Integrate C-Thread-Pool v 430251c (2017-04)
Browse files Browse the repository at this point in the history
  • Loading branch information
nil0x42 committed Sep 4, 2020
1 parent 9f0a382 commit 2ce9349
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 193 deletions.
87 changes: 55 additions & 32 deletions include/thpool.h
@@ -1,15 +1,15 @@
/**********************************
/**********************************
* @author Johan Hanssen Seferidis
* License: MIT
*
*
**********************************/

#ifndef _THPOOL_
#define _THPOOL_




#ifdef __cplusplus
extern "C" {
#endif

/* =================================== API ======================================= */

Expand All @@ -19,17 +19,17 @@ typedef struct thpool_* threadpool;

/**
* @brief Initialize threadpool
*
*
* Initializes a threadpool. This function will not return untill all
* threads have initialized successfully.
*
*
* @example
*
*
* ..
* threadpool thpool; //First we declare a threadpool
* thpool = thpool_init(4); //then we initialize it to 4 threads
* ..
*
*
* @param num_threads number of threads to be created in the threadpool
* @return threadpool created threadpool on success,
* NULL on error
Expand All @@ -39,49 +39,49 @@ threadpool thpool_init(int num_threads);

/**
* @brief Add work to the job queue
*
*
* Takes an action and its argument and adds it to the threadpool's job queue.
* If you want to add to work a function with more than one arguments then
* a way to implement this is by passing a pointer to a structure.
*
*
* NOTICE: You have to cast both the function and argument to not get warnings.
*
*
* @example
*
*
* void print_num(int num){
* printf("%d\n", num);
* }
*
*
* int main() {
* ..
* int a = 10;
* thpool_add_work(thpool, (void*)print_num, (void*)a);
* ..
* }
*
*
* @param threadpool threadpool to which the work will be added
* @param function_p pointer to function to add as work
* @param arg_p pointer to an argument
* @return nothing
* @return 0 on successs, -1 otherwise.
*/
int thpool_add_work(threadpool, void *(*function_p)(void*), void* arg_p);
int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);


/**
* @brief Wait for all queued jobs to finish
*
*
* Will wait for all jobs - both queued and currently running to finish.
* Once the queue is empty and all work has completed, the calling thread
* (probably the main program) will continue.
*
*
* Smart polling is used in wait. The polling is initially 0 - meaning that
* there is virtually no polling at all. If after 1 seconds the threads
* haven't finished, the polling interval starts growing exponentially
* haven't finished, the polling interval starts growing exponentially
* untill it reaches max_secs seconds. Then it jumps down to a maximum polling
* interval assuming that heavy processing is being used in the threadpool.
*
* @example
*
*
* ..
* threadpool thpool = thpool_init(4);
* ..
Expand All @@ -90,7 +90,7 @@ int thpool_add_work(threadpool, void *(*function_p)(void*), void* arg_p);
* thpool_wait(thpool);
* puts("All added work has finished");
* ..
*
*
* @param threadpool the threadpool to wait for
* @return nothing
*/
Expand All @@ -99,22 +99,22 @@ void thpool_wait(threadpool);

/**
* @brief Pauses all threads immediately
*
*
* The threads will be paused no matter if they are idle or working.
* The threads return to their previous states once thpool_resume
* is called.
*
*
* While the thread is being paused, new work can be added.
*
*
* @example
*
*
* threadpool thpool = thpool_init(4);
* thpool_pause(thpool);
* ..
* // Add a bunch of work
* ..
* thpool_resume(thpool); // Let the threads start their magic
*
*
* @param threadpool the threadpool where the threads should be paused
* @return nothing
*/
Expand All @@ -123,14 +123,14 @@ void thpool_pause(threadpool);

/**
* @brief Unpauses all threads if they are paused
*
*
* @example
* ..
* thpool_pause(thpool);
* sleep(10); // Delay execution 10 seconds
* thpool_resume(thpool);
* ..
*
*
* @param threadpool the threadpool where the threads should be unpaused
* @return nothing
*/
Expand All @@ -139,10 +139,10 @@ void thpool_resume(threadpool);

/**
* @brief Destroy the threadpool
*
*
* This will wait for the currently active threads to finish and then 'kill'
* the whole threadpool to free up memory.
*
*
* @example
* int main() {
* threadpool thpool1 = thpool_init(2);
Expand All @@ -152,13 +152,36 @@ void thpool_resume(threadpool);
* ..
* return 0;
* }
*
*
* @param threadpool the threadpool to destroy
* @return nothing
*/
void thpool_destroy(threadpool);


/**
* @brief Show currently working threads
*
* Working threads are the threads that are performing work (not idle).
*
* @example
* int main() {
* threadpool thpool1 = thpool_init(2);
* threadpool thpool2 = thpool_init(2);
* ..
* printf("Working threads: %d\n", thpool_num_threads_working(thpool1));
* ..
* return 0;
* }
*
* @param threadpool the threadpool of interest
* @return integer number of threads working
*/
int thpool_num_threads_working(threadpool);


#ifdef __cplusplus
}
#endif

#endif
15 changes: 12 additions & 3 deletions src/tag_duplicates.c
Expand Up @@ -12,6 +12,7 @@
#include "error.h"

#if MULTITHREADING == 1

/** For each chunk following `parent`, add a cleanout_chunk() worker.
*/
static void tag_subchunks(threadpool thpool, const t_chunk *parent)
Expand All @@ -27,7 +28,9 @@ static void tag_subchunks(threadpool thpool, const t_chunk *parent)
if (heap_chunk == NULL)
die("could not malloc() heap_chunk");
memcpy(heap_chunk, &chunk, sizeof(t_chunk));
thpool_add_work(thpool, (void*)cleanout_chunk, heap_chunk);
/* thpool already prints error unless DISABLE_PRINT is defined */
if (thpool_add_work(thpool, (void*)cleanout_chunk, heap_chunk) != 0)
exit(1);
chunk_id ++;
}
}
Expand All @@ -37,12 +40,16 @@ static void tag_subchunks(threadpool thpool, const t_chunk *parent)
*/
void tag_duplicates(void)
{
threadpool thpool = thpool_init(g_conf.threads);
threadpool thpool;
t_chunk main_chunk = {
.ptr = NULL,
.endptr = NULL
};

/* thpool already prints error unless DISABLE_PRINT is defined */
if ((thpool = thpool_init(g_conf.threads)) == NULL)
exit(1);

while (get_next_chunk(&main_chunk, g_file))
{
populate_hmap(&main_chunk);
Expand All @@ -54,7 +61,8 @@ void tag_duplicates(void)
}


#else
#else /* MULTITHREADING not defined */

/** Cleanout each chunk following the parent
*/
static void tag_subchunks(const t_chunk *parent)
Expand Down Expand Up @@ -94,4 +102,5 @@ void tag_duplicates(void)
update_status(CHUNK_DONE);
}
}

#endif

0 comments on commit 2ce9349

Please sign in to comment.