Skip to content

Commit

Permalink
Batch vcd work item creation
Browse files Browse the repository at this point in the history
Rather then lock/unlock the work queue ring for every item, save
tons of pthread lock manipulation by allocating to the producer
in batches. Over the long run, this doesn't change the CPU balance
or hold up either thread, but it eliminates almost 3/4 of the
lock/unlock episodes.
  • Loading branch information
steveicarus committed Jan 9, 2010
1 parent 76ebde4 commit 7fc6b02
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 35 deletions.
5 changes: 0 additions & 5 deletions vpi/vcd_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ typedef enum vcd_work_item_type_e {

struct lxt2_wr_symbol;

# define VAL_CHAR_ARRAY_SIZE 64
struct vcd_work_item_s {
vcd_work_item_type_t type;
uint64_t time;
Expand All @@ -89,11 +88,7 @@ struct vcd_work_item_s {

union {
double val_double;
#ifdef VAL_CHAR_ARRAY_SIZE
char val_char[VAL_CHAR_ARRAY_SIZE];
#else
char*val_char;
#endif
} op_;
};

Expand Down
103 changes: 73 additions & 30 deletions vpi/vcd_priv2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,18 @@ extern "C" void vcd_scope_names_delete(void)
static pthread_t work_thread;

static const unsigned WORK_QUEUE_SIZE = 128*1024;
static const unsigned WORK_QUEUE_BATCH_MIN = 4*1024;
static const unsigned WORK_QUEUE_BATCH_MAX = 32*1024;

static struct vcd_work_item_s work_queue[WORK_QUEUE_SIZE];
static volatile unsigned work_queue_next = 0;
static volatile unsigned work_queue_fill = 0;

static pthread_mutex_t work_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t work_queue_is_empty_sig = PTHREAD_COND_INITIALIZER;
static pthread_cond_t work_queue_notempty_sig = PTHREAD_COND_INITIALIZER;
static pthread_cond_t work_queue_notfull_sig = PTHREAD_COND_INITIALIZER;
static pthread_cond_t work_queue_minfree_sig = PTHREAD_COND_INITIALIZER;

static uint64_t work_queue_next_time = 0;

struct vcd_work_item_s* vcd_work_thread_peek(void)
{
Expand All @@ -119,47 +121,64 @@ void vcd_work_thread_pop(void)
work_queue_fill = use_fill;

unsigned use_next = work_queue_next;
#ifndef VAL_CHAR_ARRAY_SIZE

struct vcd_work_item_s*cell = work_queue + use_next;
if (cell->type == WT_EMIT_BITS) {
free(cell->op_.val_char);
}
#endif

use_next += 1;
if (use_next >= WORK_QUEUE_SIZE)
use_next = 0;
work_queue_next = use_next;

if (use_fill == WORK_QUEUE_SIZE-1)
pthread_cond_signal(&work_queue_notfull_sig);
if (use_fill == WORK_QUEUE_SIZE-WORK_QUEUE_BATCH_MIN)
pthread_cond_signal(&work_queue_minfree_sig);
else if (use_fill == 0)
pthread_cond_signal(&work_queue_is_empty_sig);

pthread_mutex_unlock(&work_queue_mutex);
}

/*
* Work queue items are created in batches to reduce thread
* bouncing. When the producer gets a free work item, it actually
* locks the queue in order to produce a batch. The queue stays locked
* until the batch is complete. Then the releases the whole lot to the
* consumer.
*/
static uint64_t work_queue_next_time = 0;
static unsigned current_batch_cnt = 0;
static unsigned current_batch_alloc = 0;
static unsigned current_batch_base = 0;

void vcd_work_start( void* (*fun) (void*), void*arg )
{
pthread_create(&work_thread, 0, fun, arg);
}

void vcd_work_sync(void)
static struct vcd_work_item_s* grab_item(void)
{
if (work_queue_fill > 0) {
pthread_mutex_lock(&work_queue_mutex);
while (work_queue_fill > 0)
pthread_cond_wait(&work_queue_is_empty_sig, &work_queue_mutex);
pthread_mutex_unlock(&work_queue_mutex);
if (current_batch_alloc == 0) {
pthread_mutex_lock(&work_queue_mutex);
while ((WORK_QUEUE_SIZE-work_queue_fill) < WORK_QUEUE_BATCH_MIN)
pthread_cond_wait(&work_queue_minfree_sig, &work_queue_mutex);

current_batch_base = work_queue_next + work_queue_fill;
current_batch_alloc = WORK_QUEUE_SIZE - work_queue_fill;

pthread_mutex_unlock(&work_queue_mutex);

if (current_batch_base >= WORK_QUEUE_SIZE)
current_batch_base -= WORK_QUEUE_SIZE;
if (current_batch_alloc > WORK_QUEUE_BATCH_MAX)
current_batch_alloc = WORK_QUEUE_BATCH_MAX;
current_batch_cnt = 0;
}
}

static struct vcd_work_item_s* grab_item(void)
{
pthread_mutex_lock(&work_queue_mutex);
while (work_queue_fill >= WORK_QUEUE_SIZE)
pthread_cond_wait(&work_queue_notfull_sig, &work_queue_mutex);
assert(current_batch_cnt < current_batch_alloc);

unsigned cur = work_queue_next + work_queue_fill;
unsigned cur = current_batch_base + current_batch_cnt;
if (cur >= WORK_QUEUE_SIZE)
cur -= WORK_QUEUE_SIZE;

Expand All @@ -169,21 +188,50 @@ static struct vcd_work_item_s* grab_item(void)
return cell;
}

static void unlock_item(void)
static void end_batch(void)
{
unsigned use_fill = work_queue_fill + 1;
pthread_mutex_lock(&work_queue_mutex);

unsigned use_fill = work_queue_fill;
bool was_empty_flag = (use_fill==0) && (current_batch_cnt > 0);

use_fill += current_batch_cnt;
work_queue_fill = use_fill;
if (use_fill == 1)

current_batch_alloc = 0;
current_batch_cnt = 0;

if (was_empty_flag)
pthread_cond_signal(&work_queue_notempty_sig);

pthread_mutex_unlock(&work_queue_mutex);
}

static inline void unlock_item(bool flush_batch =false)
{
current_batch_cnt += 1;
if (current_batch_cnt == current_batch_alloc || flush_batch)
end_batch();
}

void vcd_work_sync(void)
{
if (current_batch_alloc > 0)
end_batch();

if (work_queue_fill > 0) {
pthread_mutex_lock(&work_queue_mutex);
while (work_queue_fill > 0)
pthread_cond_wait(&work_queue_is_empty_sig, &work_queue_mutex);
pthread_mutex_unlock(&work_queue_mutex);
}
}

void vcd_work_flush(void)
{
struct vcd_work_item_s*cell = grab_item();
cell->type = WT_FLUSH;
unlock_item();
unlock_item(true);
}

void vcd_work_dumpon(void)
Expand Down Expand Up @@ -220,20 +268,15 @@ void vcd_work_emit_bits(struct lxt2_wr_symbol*sym, const char* val)
struct vcd_work_item_s*cell = grab_item();
cell->type = WT_EMIT_BITS;
cell->sym_.lxt2 = sym;
#ifdef VAL_CHAR_ARRAY_SIZE
size_t need_len = strlen(val) + 1;
assert(need_len <= VAL_CHAR_ARRAY_SIZE);
memcpy(cell->op_.val_char, val, need_len);
#else
cell->op_.val_char = strdup(val);
#endif

unlock_item();
}

void vcd_work_terminate(void)
{
struct vcd_work_item_s*cell = grab_item();
cell->type = WT_TERMINATE;
unlock_item();
unlock_item(true);
pthread_join(work_thread, 0);
}

0 comments on commit 7fc6b02

Please sign in to comment.