Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
added tasklets
  • Loading branch information
perexg committed Apr 24, 2015
1 parent c649ef2 commit c4d8b7a
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 4 deletions.
96 changes: 93 additions & 3 deletions src/main.c
Expand Up @@ -159,6 +159,7 @@ const tvh_caps_t tvheadend_capabilities[] = {
};

pthread_mutex_t global_lock;
pthread_mutex_t tasklet_lock;
pthread_mutex_t ffmpeg_lock;
pthread_mutex_t fork_lock;
pthread_mutex_t atomic_lock;
Expand All @@ -168,6 +169,9 @@ pthread_mutex_t atomic_lock;
*/
static LIST_HEAD(, gtimer) gtimers;
static pthread_cond_t gtimer_cond;
static TAILQ_HEAD(, tasklet) tasklets;
static pthread_cond_t tasklet_cond;
static pthread_t tasklet_tid;

static void
handle_sigpipe(int x)
Expand Down Expand Up @@ -248,8 +252,6 @@ gtimer_arm_abs2

LIST_INSERT_SORTED(&gtimers, gti, gti_link, gtimercmp);

//tvhdebug("gtimer", "%p @ %ld.%09ld", gti, when->tv_sec, when->tv_nsec);

if (LIST_FIRST(&gtimers) == gti)
pthread_cond_signal(&gtimer_cond); // force timer re-check
}
Expand Down Expand Up @@ -298,12 +300,90 @@ void
gtimer_disarm(gtimer_t *gti)
{
if(gti->gti_callback) {
//tvhdebug("gtimer", "%p disarm", gti);
LIST_REMOVE(gti, gti_link);
gti->gti_callback = NULL;
}
}

/**
*
*/
void
tasklet_arm(tasklet_t *tsk, tsk_callback_t *callback, void *opaque)
{
pthread_mutex_lock(&tasklet_lock);

if (tsk->tsk_callback != NULL)
TAILQ_REMOVE(&tasklets, tsk, tsk_link);

tsk->tsk_callback = callback;
tsk->tsk_opaque = opaque;

TAILQ_INSERT_TAIL(&tasklets, tsk, tsk_link);

if (TAILQ_FIRST(&tasklets) == tsk)
pthread_cond_signal(&tasklet_cond);

pthread_mutex_unlock(&tasklet_lock);
}

/**
*
*/
void
tasklet_disarm(tasklet_t *tsk)
{
pthread_mutex_lock(&tasklet_lock);

if(tsk->tsk_callback) {
TAILQ_REMOVE(&tasklets, tsk, tsk_link);
tsk->tsk_callback(tsk->tsk_opaque, 1);
tsk->tsk_callback = NULL;
}

pthread_mutex_unlock(&tasklet_lock);
}

static void
tasklet_flush()
{
tasklet_t *tsk;

pthread_mutex_lock(&tasklet_lock);

while ((tsk = TAILQ_FIRST(&tasklets)) != NULL) {
TAILQ_REMOVE(&tasklets, tsk, tsk_link);
tsk->tsk_callback(tsk->tsk_opaque, 1);
tsk->tsk_callback = NULL;
}

pthread_mutex_unlock(&tasklet_lock);
}

/**
*
*/
static void *
tasklet_thread ( void *aux )
{
tasklet_t *tsk;

pthread_mutex_lock(&tasklet_lock);
while (tvheadend_running) {
tsk = TAILQ_FIRST(&tasklets);
if (tsk == NULL) {
pthread_cond_wait(&tasklet_cond, &tasklet_lock);
continue;
}
if (tsk->tsk_callback)
tsk->tsk_callback(tsk->tsk_opaque, 0);
TAILQ_REMOVE(&tasklets, tsk, tsk_link);
}
pthread_mutex_unlock(&tasklet_lock);

return NULL;
}

/**
* Show version info
*/
Expand Down Expand Up @@ -454,8 +534,10 @@ main(int argc, char **argv)
pthread_mutex_init(&ffmpeg_lock, NULL);
pthread_mutex_init(&fork_lock, NULL);
pthread_mutex_init(&global_lock, NULL);
pthread_mutex_init(&tasklet_lock, NULL);
pthread_mutex_init(&atomic_lock, NULL);
pthread_cond_init(&gtimer_cond, NULL);
pthread_cond_init(&tasklet_cond, NULL);

/* Defaults */
tvheadend_webui_port = 9981;
Expand Down Expand Up @@ -823,6 +905,8 @@ main(int argc, char **argv)
* Initialize subsystems
*/

tvhthread_create(&tasklet_tid, NULL, tasklet_thread, NULL);

dbus_server_init(opt_dbus, opt_dbus_session);

intlconv_init();
Expand Down Expand Up @@ -973,6 +1057,12 @@ main(int argc, char **argv)
tvhftrace("main", idnode_done);
tvhftrace("main", spawn_done);

tvhtrace("main", "tasklet enter");
pthread_join(tasklet_tid, NULL);
tvhtrace("main", "tasklet thread end");
tasklet_flush();
tvhtrace("main", "tasklet leave");

tvhlog(LOG_NOTICE, "STOP", "Exiting HTS Tvheadend");
tvhlog_end();

Expand Down
17 changes: 16 additions & 1 deletion src/tvheadend.h
Expand Up @@ -150,7 +150,6 @@ typedef enum {
* global timer
*/


typedef void (gti_callback_t)(void *opaque);

typedef struct gtimer {
Expand All @@ -175,6 +174,22 @@ void gtimer_arm_abs2(gtimer_t *gti, gti_callback_t *callback, void *opaque,
void gtimer_disarm(gtimer_t *gti);


/*
* tasklet
*/

typedef void (tsk_callback_t)(void *opaque, int disarmed);

typedef struct tasklet {
TAILQ_ENTRY(tasklet) tsk_link;
tsk_callback_t *tsk_callback;
void *tsk_opaque;
} tasklet_t;

void tasklet_arm(tasklet_t *tsk, tsk_callback_t *callback, void *opaque);
void tasklet_disarm(tasklet_t *gti);


/*
* List / Queue header declarations
*/
Expand Down

0 comments on commit c4d8b7a

Please sign in to comment.