Skip to content

Commit

Permalink
logthrdestdrv: make the thread ivykis based.
Browse files Browse the repository at this point in the history
It is easier to add timer, task, event etc. to the threaded destination,
and this re-factorizing eliminates the locking and waiting for conditions.
And the throttle timeout handling is also added to logthreaded destination.

Make this code is more maintainable.

Signed-off-by: Juhász Viktor <viktor.juhasz@balabit.com>
  • Loading branch information
juhaszviktor committed Jul 10, 2014
1 parent c648541 commit 470beca
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 55 deletions.
157 changes: 110 additions & 47 deletions lib/logthrdestdrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,65 +27,136 @@
void
log_threaded_dest_driver_suspend(LogThrDestDriver *self)
{
self->writer_thread_suspended = TRUE;
g_get_current_time(&self->writer_thread_suspend_target);
g_time_val_add(&self->writer_thread_suspend_target,
self->time_reopen * 1000000);
iv_validate_now();
self->timer_reopen.expires = iv_now;
self->timer_reopen.expires.tv_sec += self->time_reopen;
iv_timer_register(&self->timer_reopen);
}

static void
log_threaded_dest_driver_message_became_available_in_the_queue(gpointer user_data)
{
LogThrDestDriver *self = (LogThrDestDriver *) user_data;
iv_event_post(&self->wake_up_event);
}

g_mutex_lock(self->suspend_mutex);
g_cond_signal(self->writer_thread_wakeup_cond);
g_mutex_unlock(self->suspend_mutex);
static void
log_threaded_dest_driver_wake_up(gpointer data)
{
LogThrDestDriver *self = (LogThrDestDriver *)data;
if (!iv_task_registered(&self->do_work))
{
iv_task_register(&self->do_work);
}
}

static void
log_threaded_dest_driver_worker_thread_main(gpointer arg)
log_threaded_dest_driver_stop_watches(LogThrDestDriver* self)
{
LogThrDestDriver *self = (LogThrDestDriver *)arg;
if (iv_task_registered(&self->do_work))
{
iv_task_unregister(&self->do_work);
}
if (iv_timer_registered(&self->timer_reopen))
{
iv_timer_unregister(&self->timer_reopen);
}
if (iv_timer_registered(&self->timer_throttle))
{
iv_timer_unregister(&self->timer_throttle);
}
}

msg_debug("Worker thread started",
evt_tag_str("driver", self->super.super.id),
NULL);
static void
log_threaded_dest_driver_shutdown(gpointer data)
{
LogThrDestDriver *self = (LogThrDestDriver *)data;
log_threaded_dest_driver_stop_watches(self);
iv_quit();
}

if (self->worker.thread_init)
self->worker.thread_init(self);

while (!self->writer_thread_terminate)
static void
log_threaded_dest_driver_do_work(gpointer data)
{
LogThrDestDriver *self = (LogThrDestDriver *)data;
gint timeout_msec = 0;
log_threaded_dest_driver_stop_watches(self);
if (log_queue_check_items(self->queue, &timeout_msec,
log_threaded_dest_driver_message_became_available_in_the_queue,
self, NULL))
{
g_mutex_lock(self->suspend_mutex);
if (self->writer_thread_suspended)
{
g_cond_timed_wait(self->writer_thread_wakeup_cond,
self->suspend_mutex,
&self->writer_thread_suspend_target);
self->writer_thread_suspended = FALSE;
g_mutex_unlock(self->suspend_mutex);
}
else if (!log_queue_check_items(self->queue, NULL,
log_threaded_dest_driver_message_became_available_in_the_queue,
self, NULL))
{
g_cond_wait(self->writer_thread_wakeup_cond, self->suspend_mutex);
g_mutex_unlock(self->suspend_mutex);
}
else
g_mutex_unlock(self->suspend_mutex);

if (self->writer_thread_terminate)
break;

if (!self->worker.insert(self))
{
if (self->worker.disconnect)
self->worker.disconnect(self);
log_queue_reset_parallel_push(self->queue);
log_threaded_dest_driver_suspend(self);
}
else
{
iv_task_register(&self->do_work);
}
}
else if (timeout_msec != 0)
{
log_queue_reset_parallel_push(self->queue);
iv_validate_now();
self->timer_throttle.expires = iv_now;
timespec_add_msec(&self->timer_throttle.expires, timeout_msec);
iv_timer_register(&self->timer_throttle);
}
}

static void
log_threaded_dest_driver_init_watches(LogThrDestDriver* self)
{
IV_EVENT_INIT(&self->wake_up_event);
self->wake_up_event.cookie = self;
self->wake_up_event.handler = log_threaded_dest_driver_wake_up;
iv_event_register(&self->wake_up_event);

IV_EVENT_INIT(&self->shutdown_event);
self->shutdown_event.cookie = self;
self->shutdown_event.handler = log_threaded_dest_driver_shutdown;
iv_event_register(&self->shutdown_event);

IV_TIMER_INIT(&self->timer_reopen);
self->timer_reopen.cookie = self;
self->timer_reopen.handler = log_threaded_dest_driver_do_work;

IV_TIMER_INIT(&self->timer_throttle);
self->timer_throttle.cookie = self;
self->timer_throttle.handler = log_threaded_dest_driver_do_work;

IV_TASK_INIT(&self->do_work);
self->do_work.cookie = self;
self->do_work.handler = log_threaded_dest_driver_do_work;
}

static void
log_threaded_dest_driver_start_watches(LogThrDestDriver* self)
{
iv_task_register(&self->do_work);
}

static void
log_threaded_dest_driver_worker_thread_main(gpointer arg)
{
LogThrDestDriver *self = (LogThrDestDriver *)arg;

iv_init();

msg_debug("Worker thread started",
evt_tag_str("driver", self->super.super.id),
NULL);

if (self->worker.thread_init)
self->worker.thread_init(self);
log_threaded_dest_driver_init_watches(self);

log_threaded_dest_driver_start_watches(self);
iv_main();

if (self->worker.disconnect)
self->worker.disconnect(self);
Expand All @@ -96,17 +167,15 @@ log_threaded_dest_driver_worker_thread_main(gpointer arg)
msg_debug("Worker thread finished",
evt_tag_str("driver", self->super.super.id),
NULL);
iv_deinit();
}

static void
log_threaded_dest_driver_stop_thread(gpointer s)
{
LogThrDestDriver *self = (LogThrDestDriver *) s;

self->writer_thread_terminate = TRUE;
g_mutex_lock(self->suspend_mutex);
g_cond_signal(self->writer_thread_wakeup_cond);
g_mutex_unlock(self->suspend_mutex);
iv_event_post(&self->shutdown_event);
}

static void
Expand Down Expand Up @@ -181,9 +250,6 @@ log_threaded_dest_driver_free(LogPipe *s)
{
LogThrDestDriver *self = (LogThrDestDriver *)s;

g_mutex_free(self->suspend_mutex);
g_cond_free(self->writer_thread_wakeup_cond);

if (self->queue)
log_queue_unref(self->queue);

Expand Down Expand Up @@ -217,9 +283,6 @@ log_threaded_dest_driver_init_instance(LogThrDestDriver *self, GlobalConfig *cfg

self->worker_options.is_output_thread = TRUE;

self->writer_thread_wakeup_cond = g_cond_new();
self->suspend_mutex = g_mutex_new();

self->super.super.super.init = log_threaded_dest_driver_start;
self->super.super.super.deinit = log_threaded_dest_driver_deinit_method;
self->super.super.super.queue = log_threaded_dest_driver_queue;
Expand Down
15 changes: 7 additions & 8 deletions lib/logthrdestdrv.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "stats/stats-registry.h"
#include "logqueue.h"
#include "mainloop-worker.h"
#include <iv.h>
#include <iv_event.h>

typedef struct _LogThrDestDriver LogThrDestDriver;

Expand All @@ -42,14 +44,6 @@ struct _LogThrDestDriver

time_t time_reopen;

/* Thread related stuff; shared */
GMutex *suspend_mutex;
GCond *writer_thread_wakeup_cond;

gboolean writer_thread_terminate;
gboolean writer_thread_suspended;
GTimeVal writer_thread_suspend_target;

LogQueue *queue;

/* Worker stuff */
Expand All @@ -70,6 +64,11 @@ struct _LogThrDestDriver

void (*queue_method) (LogThrDestDriver *s);
WorkerOptions worker_options;
struct iv_event wake_up_event;
struct iv_event shutdown_event;
struct iv_timer timer_reopen;
struct iv_timer timer_throttle;
struct iv_task do_work;
};

gboolean log_threaded_dest_driver_deinit_method(LogPipe *s);
Expand Down

0 comments on commit 470beca

Please sign in to comment.