Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
subscription: add proper raw service cleanup, fixes instabilities
  • Loading branch information
perexg committed Oct 18, 2015
1 parent ecd402e commit bd55042
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 19 deletions.
46 changes: 39 additions & 7 deletions src/service.c
Expand Up @@ -55,6 +55,9 @@ static void service_class_save(struct idnode *self);

struct service_queue service_all;
struct service_queue service_raw_all;
struct service_queue service_raw_remove;

static gtimer_t service_raw_remove_timer;

static void
service_class_notify_enabled ( void *obj, const char *lang )
Expand Down Expand Up @@ -349,9 +352,6 @@ service_remove_subscriber(service_t *t, th_subscription_t *s,
} else {
subscription_unlink_service(s, reason);
}

if(LIST_FIRST(&t->s_subscriptions) == NULL)
service_stop(t);
}


Expand Down Expand Up @@ -811,8 +811,7 @@ service_destroy(service_t *t, int delconf)

idnode_unlink(&t->s_id);

if(t->s_status != SERVICE_IDLE)
service_stop(t);
assert(t->s_status == SERVICE_IDLE);

t->s_status = SERVICE_ZOMBIE;

Expand All @@ -824,14 +823,39 @@ service_destroy(service_t *t, int delconf)

avgstat_flush(&t->s_rate);

if (t->s_type == STYPE_RAW)
switch (t->s_type) {
case STYPE_RAW:
TAILQ_REMOVE(&service_raw_all, t, s_all_link);
else
break;
case STYPE_RAW_REMOVED:
TAILQ_REMOVE(&service_raw_remove, t, s_all_link);
break;
default:
TAILQ_REMOVE(&service_all, t, s_all_link);
}

service_unref(t);
}

static void
service_remove_raw_timer_cb(void *aux)
{
service_t *t;
while ((t = TAILQ_FIRST(&service_raw_remove)) != NULL)
service_destroy(t, 0);
}

void
service_remove_raw(service_t *t)
{
if (t == NULL) return;
assert(t->s_type == STYPE_RAW);
t->s_type = STYPE_RAW_REMOVED;
TAILQ_REMOVE(&service_raw_all, t, s_all_link);
TAILQ_INSERT_TAIL(&service_raw_remove, t, s_all_link);
gtimer_arm(&service_raw_remove_timer, service_remove_raw_timer_cb, NULL, 0);
}

void
service_set_enabled(service_t *t, int enabled, int _auto)
{
Expand Down Expand Up @@ -1393,6 +1417,7 @@ service_init(void)
TAILQ_INIT(&pending_save_queue);
TAILQ_INIT(&service_all);
TAILQ_INIT(&service_raw_all);
TAILQ_INIT(&service_raw_remove);
pthread_mutex_init(&pending_save_mutex, NULL);
pthread_cond_init(&pending_save_cond, NULL);
tvhthread_create(&service_saver_tid, NULL, service_saver, NULL, "service");
Expand All @@ -1401,8 +1426,15 @@ service_init(void)
void
service_done(void)
{
service_t *t;

pthread_cond_signal(&pending_save_cond);
pthread_join(service_saver_tid, NULL);

pthread_mutex_lock(&global_lock);
while ((t = TAILQ_FIRST(&service_raw_remove)) != NULL)
service_destroy(t, 0);
pthread_mutex_unlock(&global_lock);
}

/**
Expand Down
6 changes: 5 additions & 1 deletion src/service.h
Expand Up @@ -29,6 +29,7 @@ extern const idclass_t service_raw_class;

extern struct service_queue service_all;
extern struct service_queue service_raw_all;
extern struct service_queue service_raw_remove;

struct channel;
struct tvh_input;
Expand Down Expand Up @@ -240,7 +241,8 @@ typedef struct service {
*/
enum {
STYPE_STD,
STYPE_RAW
STYPE_RAW,
STYPE_RAW_REMOVED
} s_type;

/**
Expand Down Expand Up @@ -543,6 +545,8 @@ void service_set_enabled ( service_t *t, int enabled, int _auto );

void service_destroy(service_t *t, int delconf);

void service_remove_raw(service_t *);

void service_remove_subscriber(service_t *t, struct th_subscription *s,
int reason);

Expand Down
26 changes: 15 additions & 11 deletions src/subscriptions.c
Expand Up @@ -116,16 +116,14 @@ subscription_link_service(th_subscription_t *s, service_t *t)
/**
* Called from service code
*/
static void
static int
subscription_unlink_service0(th_subscription_t *s, int reason, int stop)
{
streaming_message_t *sm;
service_t *t = s->ths_service;
service_t *t = s->ths_service, *tr = s->ths_raw_service;

/* Ignore - not actually linked */
if (!s->ths_current_instance) return;

tvhtrace("subscription", "%04X: unlinking sub %p from svc %p", shortid(s), s, t);
if (!s->ths_current_instance) goto stop;

pthread_mutex_lock(&t->s_stream_mutex);

Expand All @@ -142,9 +140,19 @@ subscription_unlink_service0(th_subscription_t *s, int reason, int stop)

LIST_REMOVE(s, ths_service_link);
s->ths_service = NULL;
s->ths_raw_service = NULL;

if (stop && (s->ths_flags & SUBSCRIPTION_ONESHOT) != 0)
if (stop && (s->ths_flags & SUBSCRIPTION_ONESHOT) != 0) {
if (tr)
LIST_REMOVE(s, ths_mux_link);
subscription_unsubscribe(s, 0);
service_remove_raw(tr);
}

stop:
if(LIST_FIRST(&t->s_subscriptions) == NULL)
service_stop(t);
return 1;
}

void
Expand Down Expand Up @@ -329,11 +337,7 @@ subscription_reschedule(void)

subscription_unlink_service0(s, SM_CODE_BAD_SOURCE, 0);

if(t && LIST_FIRST(&t->s_subscriptions) == NULL)
service_stop(t);

si = s->ths_current_instance;

assert(si != NULL);
si->si_error = s->ths_testing_error;
time(&si->si_error_time);
Expand Down Expand Up @@ -597,7 +601,7 @@ subscription_unsubscribe(th_subscription_t *s, int quiet)

#if ENABLE_MPEGTS
if (s->ths_raw_service)
service_destroy(s->ths_raw_service, 0);
service_remove_raw(s->ths_raw_service);
#endif

streaming_msg_free(s->ths_start_message);
Expand Down

0 comments on commit bd55042

Please sign in to comment.