Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
subscription: another try to protect removed services
  • Loading branch information
perexg committed Nov 8, 2015
1 parent a3f4048 commit 5fe5abb
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/dvr/dvr_rec.c
Expand Up @@ -171,7 +171,7 @@ dvr_rec_unsubscribe(dvr_entry_t *de)

pthread_join(de->de_thread, NULL);

subscription_unsubscribe(de->de_s, 0);
subscription_unsubscribe(de->de_s, UNSUBSCRIBE_FINAL);
de->de_s = NULL;

de->de_chain = NULL;
Expand Down
2 changes: 1 addition & 1 deletion src/htsp_server.c
Expand Up @@ -342,7 +342,7 @@ htsp_subscription_destroy(htsp_connection_t *htsp, htsp_subscription_t *hs)
LIST_REMOVE(hs, hs_link);
LIST_INSERT_HEAD(&htsp->htsp_dead_subscriptions, hs, hs_link);

subscription_unsubscribe(hs->hs_s, 0);
subscription_unsubscribe(hs->hs_s, UNSUBSCRIBE_FINAL);

if(hs->hs_prch.prch_st != NULL)
profile_chain_close(&hs->hs_prch);
Expand Down
4 changes: 2 additions & 2 deletions src/input/mpegts/mpegts_mux.c
Expand Up @@ -210,7 +210,7 @@ mpegts_mux_unsubscribe_linked
ths_next = LIST_NEXT(ths, ths_global_link);
if (ths->ths_source == (tvh_input_t *)mi && !strcmp(ths->ths_title, "keep") &&
ths->ths_service != t)
subscription_unsubscribe(ths, 0);
subscription_unsubscribe(ths, UNSUBSCRIBE_FINAL);
}
}
}
Expand Down Expand Up @@ -1256,7 +1256,7 @@ mpegts_mux_unsubscribe_by_name
n = LIST_NEXT(s, ths_mux_link);
t = s->ths_service;
if (t && t->s_type == STYPE_RAW && !strcmp(s->ths_title, name))
subscription_unsubscribe(s, 0);
subscription_unsubscribe(s, UNSUBSCRIBE_FINAL);
s = n;
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/input/mpegts/mpegts_mux_sched.c
Expand Up @@ -39,7 +39,7 @@ mpegts_mux_sched_set_timer ( mpegts_mux_sched_t *mms )
/* Upate timer */
if (!mms->mms_enabled) {
if (mms->mms_sub)
subscription_unsubscribe(mms->mms_sub, 0);
subscription_unsubscribe(mms->mms_sub, UNSUBSCRIBE_FINAL);
mms->mms_sub = NULL;
mms->mms_active = 0;
gtimer_disarm(&mms->mms_timer);
Expand Down Expand Up @@ -232,7 +232,7 @@ mpegts_mux_sched_timer ( void *p )
/* Cancel sub */
} else {
if (mms->mms_sub) {
subscription_unsubscribe(mms->mms_sub, 0);
subscription_unsubscribe(mms->mms_sub, UNSUBSCRIBE_FINAL);
mms->mms_sub = NULL;
}
mms->mms_active = 0;
Expand Down Expand Up @@ -310,7 +310,7 @@ mpegts_mux_sched_delete ( mpegts_mux_sched_t *mms, int delconf )
if (delconf)
hts_settings_remove("muxsched/%s", idnode_uuid_as_sstr(&mms->mms_id));
if (mms->mms_sub)
subscription_unsubscribe(mms->mms_sub, 0);
subscription_unsubscribe(mms->mms_sub, UNSUBSCRIBE_FINAL);
gtimer_disarm(&mms->mms_timer);
idnode_unlink(&mms->mms_id);
free(mms->mms_cronstr);
Expand Down
4 changes: 2 additions & 2 deletions src/satip/rtsp.c
Expand Up @@ -335,7 +335,7 @@ rtsp_slave_remove
rs->frontend, rs->session, rs->stream, slave->s_nicename);
master->s_unlink(master, slave);
if (sub->ths)
subscription_unsubscribe(sub->ths, 0);
subscription_unsubscribe(sub->ths, UNSUBSCRIBE_FINAL);
if (sub->prch.prch_id)
profile_chain_close(&sub->prch);
LIST_REMOVE(sub, link);
Expand All @@ -358,7 +358,7 @@ rtsp_clean(session_t *rs)
while ((sub = LIST_FIRST(&rs->slaves)) != NULL)
rtsp_slave_remove(rs, (mpegts_service_t *)rs->subs->ths_raw_service,
sub->service);
subscription_unsubscribe(rs->subs, 0);
subscription_unsubscribe(rs->subs, UNSUBSCRIBE_FINAL);
rs->subs = NULL;
}
if (rs->prch.prch_id)
Expand Down
2 changes: 1 addition & 1 deletion src/service_mapper.c
Expand Up @@ -382,7 +382,7 @@ service_mapper_thread ( void *aux )
pthread_mutex_unlock(&sq->sq_mutex);

pthread_mutex_lock(&global_lock);
subscription_unsubscribe(sub, 0);
subscription_unsubscribe(sub, UNSUBSCRIBE_FINAL);

if(err) {
tvhinfo("service_mapper", "%s: failed [err %s]", s->s_nicename, err);
Expand Down
50 changes: 33 additions & 17 deletions src/subscriptions.c
Expand Up @@ -145,7 +145,7 @@ subscription_unlink_service0(th_subscription_t *s, int reason, int stop)

LIST_REMOVE(s, ths_service_link);

if (stop && s-(s->ths_flags & SUBSCRIPTION_ONESHOT) != 0)
if (stop && (s->ths_flags & SUBSCRIPTION_ONESHOT) != 0)
gtimer_arm(&s->ths_remove_timer, subscription_unsubscribe_cb, s, 0);

stop:
Expand Down Expand Up @@ -554,11 +554,28 @@ subscription_input(void *opauqe, streaming_message_t *sm)
static void
subscription_unsubscribe_cb(void *aux)
{
subscription_unsubscribe((th_subscription_t *)aux, 0);
subscription_unsubscribe((th_subscription_t *)aux, UNSUBSCRIBE_FINAL);
}

static void
subscription_destroy(th_subscription_t *s)
{
streaming_msg_free(s->ths_start_message);

if(s->ths_output->st_cb == subscription_input_null)
free(s->ths_output);

free(s->ths_title);
free(s->ths_hostname);
free(s->ths_username);
free(s->ths_client);
free(s->ths_dvrfile);
free(s);

}

void
subscription_unsubscribe(th_subscription_t *s, int quiet)
subscription_unsubscribe(th_subscription_t *s, int flags)
{
service_t *t;
char buf[512];
Expand All @@ -573,7 +590,13 @@ subscription_unsubscribe(th_subscription_t *s, int quiet)
t = s->ths_service;
raw = s->ths_raw_service;

assert(s->ths_state != SUBSCRIPTION_ZOMBIE);
if (s->ths_state == SUBSCRIPTION_ZOMBIE) {
if ((flags & UNSUBSCRIBE_FINAL) != 0) {
subscription_destroy(s);
return;
}
abort();
}
s->ths_state = SUBSCRIPTION_ZOMBIE;

service_instance_list_clear(&s->ths_instances);
Expand Down Expand Up @@ -602,24 +625,17 @@ subscription_unsubscribe(th_subscription_t *s, int quiet)
tvh_strlcatf(buf, sizeof(buf), l, ", username=\"%s\"", s->ths_username);
if (s->ths_client)
tvh_strlcatf(buf, sizeof(buf), l, ", client=\"%s\"", s->ths_client);
tvhlog(quiet ? LOG_TRACE : LOG_INFO, "subscription", "%04X: %s", shortid(s), buf);
tvhlog((flags & UNSUBSCRIBE_QUIET) != 0 ? LOG_TRACE : LOG_INFO,
"subscription", "%04X: %s", shortid(s), buf);

if (t)
service_remove_subscriber(t, s, SM_CODE_OK);

gtimer_disarm(&s->ths_remove_timer);

streaming_msg_free(s->ths_start_message);

if(s->ths_output->st_cb == subscription_input_null)
free(s->ths_output);

free(s->ths_title);
free(s->ths_hostname);
free(s->ths_username);
free(s->ths_client);
free(s->ths_dvrfile);
free(s);
if ((flags & UNSUBSCRIBE_FINAL) != 0 ||
(s->ths_flags & SUBSCRIPTION_ONESHOT) != 0)
subscription_destroy(s);

gtimer_arm(&subscription_reschedule_timer,
subscription_reschedule_cb, NULL, 0);
Expand Down Expand Up @@ -763,7 +779,7 @@ subscription_create_from_channel_or_service(profile_chain_t *prch,

if (flags & SUBSCRIPTION_ONESHOT) {
if ((si = subscription_start_instance(s, error)) == NULL) {
subscription_unsubscribe(s, 1);
subscription_unsubscribe(s, UNSUBSCRIBE_QUIET | UNSUBSCRIBE_FINAL);
return NULL;
}
subscription_link_service(s, si->si_s);
Expand Down
6 changes: 5 additions & 1 deletion src/subscriptions.h
Expand Up @@ -53,6 +53,10 @@ extern struct th_subscription_list subscriptions;
#define SUBSCRIPTION_PRIO_MAPPER 7 ///< Channel mapper
#define SUBSCRIPTION_PRIO_MIN 10 ///< User defined / Normal levels

/* Unsubscribe flags */
#define UNSUBSCRIBE_QUIET 0x01
#define UNSUBSCRIBE_FINAL 0x02

typedef struct th_subscription {

int ths_id;
Expand Down Expand Up @@ -144,7 +148,7 @@ void subscription_init(void);

void subscription_done(void);

void subscription_unsubscribe(th_subscription_t *s, int quiet);
void subscription_unsubscribe(th_subscription_t *s, int flags);

void subscription_set_weight(th_subscription_t *s, unsigned int weight);

Expand Down
8 changes: 4 additions & 4 deletions src/webui/webui.c
Expand Up @@ -1087,7 +1087,7 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight)
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &prch, name, s);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s, 0);
subscription_unsubscribe(s, UNSUBSCRIBE_FINAL);
res = 0;
}
}
Expand Down Expand Up @@ -1167,7 +1167,7 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight)
http_stream_run(hc, &prch, name, s);
pthread_mutex_lock(&global_lock);
}
subscription_unsubscribe(s, 0);
subscription_unsubscribe(s, UNSUBSCRIBE_FINAL);
res = 0;
}
}
Expand Down Expand Up @@ -1226,7 +1226,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight)
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &prch, name, s);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s, 0);
subscription_unsubscribe(s, UNSUBSCRIBE_FINAL);
res = 0;
}
}
Expand Down Expand Up @@ -1629,7 +1629,7 @@ page_dvrfile(http_connection_t *hc, const char *remain, void *opaque)

pthread_mutex_lock(&global_lock);
if (sub)
subscription_unsubscribe(sub, 0);
subscription_unsubscribe(sub, UNSUBSCRIBE_FINAL);
http_stream_postop(tcp_id);
pthread_mutex_unlock(&global_lock);
return ret;
Expand Down

0 comments on commit 5fe5abb

Please sign in to comment.