Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
subscription: add ONESHOT subscription type for mux subs
  • Loading branch information
perexg committed Mar 11, 2015
1 parent 20d5219 commit 38b541e
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 59 deletions.
4 changes: 2 additions & 2 deletions src/dvr/dvr_rec.c
Expand Up @@ -87,7 +87,7 @@ dvr_rec_subscribe(dvr_entry_t *de)

de->de_s = subscription_create_from_channel(prch, NULL, weight,
buf, prch->prch_flags,
NULL, NULL, NULL);
NULL, NULL, NULL, NULL);
if (de->de_s == NULL) {
tvherror("dvr", "unable to create new channel subcription for '%s'",
channel_get_name(de->de_channel));
Expand Down Expand Up @@ -117,7 +117,7 @@ dvr_rec_unsubscribe(dvr_entry_t *de, int stopcode)

pthread_join(de->de_thread, NULL);

subscription_unsubscribe(de->de_s);
subscription_unsubscribe(de->de_s, 0);
de->de_s = NULL;

de->de_chain = NULL;
Expand Down
2 changes: 1 addition & 1 deletion src/epggrab/otamux.c
Expand Up @@ -547,7 +547,7 @@ epggrab_ota_kick_cb ( void *p )
/* Subscribe to the mux */
om->om_requeue = 1;
if ((r = mpegts_mux_subscribe(mm, NULL, "epggrab", SUBSCRIPTION_PRIO_EPG,
SUBSCRIPTION_EPG))) {
SUBSCRIPTION_EPG | SUBSCRIPTION_ONESHOT))) {
TAILQ_INSERT_TAIL(&epggrab_ota_pending, om, om_q_link);
om->om_q_type = EPGGRAB_OTA_MUX_PENDING;
if (r == SM_CODE_NO_FREE_ADAPTER)
Expand Down
5 changes: 3 additions & 2 deletions src/htsp_server.c
Expand Up @@ -330,7 +330,7 @@ htsp_subscription_destroy(htsp_connection_t *htsp, htsp_subscription_t *hs)
LIST_REMOVE(hs, hs_link);
LIST_INSERT_HEAD(&htsp->htsp_dead_subscriptions, hs, hs_link);

subscription_unsubscribe(hs->hs_s);
subscription_unsubscribe(hs->hs_s, 0);

if(hs->hs_prch.prch_st != NULL)
profile_chain_close(&hs->hs_prch);
Expand Down Expand Up @@ -2028,7 +2028,8 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
SUBSCRIPTION_STREAMING,
htsp->htsp_peername,
htsp->htsp_username,
htsp->htsp_clientname);
htsp->htsp_clientname,
NULL);
return NULL;
}

Expand Down
7 changes: 4 additions & 3 deletions src/input/mpegts/mpegts_mux.c
Expand Up @@ -1130,13 +1130,14 @@ mpegts_mux_subscribe
{
profile_chain_t prch;
th_subscription_t *s;
int err = 0;
memset(&prch, 0, sizeof(prch));
prch.prch_id = mm;
s = subscription_create_from_mux(&prch, (tvh_input_t *)mi,
weight, name,
SUBSCRIPTION_NONE | flags,
NULL, NULL, NULL);
return s ? 0 : -EIO;
NULL, NULL, NULL, &err);
return s ? 0 : (err ? err : SM_CODE_UNDEFINED_ERROR);
}

void
Expand All @@ -1153,7 +1154,7 @@ mpegts_mux_unsubscribe_by_name
n = LIST_NEXT(s, ths_global_link);
t = s->ths_service;
if (t && t->s_type == STYPE_RAW && !strcmp(s->ths_title, name))
subscription_unsubscribe(s);
subscription_unsubscribe(s, 0);
s = n;
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/input/mpegts/mpegts_mux_sched.c
Expand Up @@ -39,7 +39,7 @@ mpegts_mux_sched_set_timer ( mpegts_mux_sched_t *mms )
/* Upate timer */
if (!mms->mms_enabled) {
if (mms->mms_sub)
subscription_unsubscribe(mms->mms_sub);
subscription_unsubscribe(mms->mms_sub, 0);
mms->mms_sub = NULL;
mms->mms_active = 0;
gtimer_disarm(&mms->mms_timer);
Expand Down Expand Up @@ -214,7 +214,7 @@ mpegts_mux_sched_timer ( void *p )
= subscription_create_from_mux(mms->mms_prch, NULL, mms->mms_weight,
mms->mms_creator ?: "",
SUBSCRIPTION_NONE,
NULL, NULL, NULL);
NULL, NULL, NULL, NULL);

/* Failed (try-again soon) */
if (!mms->mms_sub) {
Expand All @@ -232,7 +232,7 @@ mpegts_mux_sched_timer ( void *p )
/* Cancel sub */
} else {
if (mms->mms_sub) {
subscription_unsubscribe(mms->mms_sub);
subscription_unsubscribe(mms->mms_sub, 0);
mms->mms_sub = NULL;
}
mms->mms_active = 0;
Expand Down Expand Up @@ -310,7 +310,7 @@ mpegts_mux_sched_delete ( mpegts_mux_sched_t *mms, int delconf )
if (delconf)
hts_settings_remove("muxsched/%s", idnode_uuid_as_str(&mms->mms_id));
if (mms->mms_sub)
subscription_unsubscribe(mms->mms_sub);
subscription_unsubscribe(mms->mms_sub, 0);
gtimer_disarm(&mms->mms_timer);
idnode_unlink(&mms->mms_id);
free(mms->mms_cronstr);
Expand Down
3 changes: 2 additions & 1 deletion src/input/mpegts/mpegts_network_scan.c
Expand Up @@ -53,7 +53,8 @@ mpegts_network_scan_timer_cb ( void *p )
if (mm->mm_active) continue;

/* Attempt to tune */
r = mpegts_mux_subscribe(mm, NULL, "scan", mm->mm_scan_weight, mm->mm_scan_flags);
r = mpegts_mux_subscribe(mm, NULL, "scan", mm->mm_scan_weight,
mm->mm_scan_flags | SUBSCRIPTION_ONESHOT);

/* Started */
if (!r) {
Expand Down
5 changes: 3 additions & 2 deletions src/satip/rtsp.c
Expand Up @@ -226,7 +226,7 @@ static void
rtsp_clean(session_t *rs)
{
if (rs->subs) {
subscription_unsubscribe(rs->subs);
subscription_unsubscribe(rs->subs, 0);
rs->subs = NULL;
}
if (rs->prch.prch_pro)
Expand Down Expand Up @@ -293,7 +293,8 @@ rtsp_start
rs->prch.prch_flags |
SUBSCRIPTION_STREAMING,
addrbuf, hc->hc_username,
http_arg_get(&hc->hc_args, "User-Agent"));
http_arg_get(&hc->hc_args, "User-Agent"),
NULL);
if (!rs->subs)
goto endrtp;
if (rs->run) {
Expand Down
5 changes: 3 additions & 2 deletions src/service_mapper.c
Expand Up @@ -377,7 +377,8 @@ service_mapper_thread ( void *aux )
prch.prch_id = s;
sub = subscription_create_from_service(&prch, NULL, SUBSCRIPTION_PRIO_MAPPER,
"service_mapper",
0, NULL, NULL, "service_mapper");
0, NULL, NULL, "service_mapper",
NULL);

/* Failed */
if (!sub) {
Expand Down Expand Up @@ -433,7 +434,7 @@ service_mapper_thread ( void *aux )
pthread_mutex_unlock(&sq->sq_mutex);

pthread_mutex_lock(&global_lock);
subscription_unsubscribe(sub);
subscription_unsubscribe(sub, 0);

if(err) {
tvhinfo("service_mapper", "%s: failed [err %s]", s->s_nicename, err);
Expand Down
84 changes: 57 additions & 27 deletions src/subscriptions.c
Expand Up @@ -219,6 +219,29 @@ subscription_reschedule_cb(void *aux)
subscription_reschedule();
}

/**
*
*/
static service_instance_t *
subscription_start_instance
(th_subscription_t *s, int *error)
{
service_instance_t *si;

if (s->ths_channel)
tvhtrace("subscription", "%04X: find service for %s weight %d",
shortid(s), channel_get_name(s->ths_channel), s->ths_weight);
else
tvhtrace("subscription", "%04X: find instance for %s weight %d",
shortid(s), s->ths_service->s_nicename, s->ths_weight);
si = service_find_instance(s->ths_service, s->ths_channel,
s->ths_source,
&s->ths_instances, error, s->ths_weight,
s->ths_flags, s->ths_timeout,
dispatch_clock > s->ths_postpone_end ?
0 : s->ths_postpone_end - dispatch_clock);
return s->ths_current_instance = si;
}

/**
*
Expand Down Expand Up @@ -278,18 +301,7 @@ subscription_reschedule(void)
}

error = s->ths_testing_error;
if (s->ths_channel)
tvhtrace("subscription", "%04X: find service for %s weight %d",
shortid(s), channel_get_name(s->ths_channel), s->ths_weight);
else
tvhtrace("subscription", "%04X: find instance for %s weight %d",
shortid(s), s->ths_service->s_nicename, s->ths_weight);
si = service_find_instance(s->ths_service, s->ths_channel,
s->ths_source,
&s->ths_instances, &error, s->ths_weight,
s->ths_flags, s->ths_timeout,
dispatch_clock > s->ths_postpone_end ?
0 : s->ths_postpone_end - dispatch_clock);
si = subscription_start_instance(s, &error);
s->ths_current_instance = si;

if(si == NULL) {
Expand Down Expand Up @@ -325,7 +337,7 @@ subscription_reschedule(void)

while ((s = LIST_FIRST(&subscriptions_remove))) {
LIST_REMOVE(s, ths_remove_link);
subscription_unsubscribe(s);
subscription_unsubscribe(s, 0);
}

if (postpone <= 0 || postpone == INT_MAX)
Expand Down Expand Up @@ -487,7 +499,7 @@ subscription_input(void *opauqe, streaming_message_t *sm)
* Delete
*/
void
subscription_unsubscribe(th_subscription_t *s)
subscription_unsubscribe(th_subscription_t *s, int quiet)
{
service_t *t = s->ths_service;
char buf[512];
Expand Down Expand Up @@ -517,7 +529,7 @@ subscription_unsubscribe(th_subscription_t *s)
s->ths_username ?: "<N/A>",
s->ths_client ?: "<N/A>");
}
tvhlog(LOG_INFO, "subscription", "%04X: %s", shortid(s), buf);
tvhlog(quiet ? LOG_TRACE : LOG_INFO, "subscription", "%04X: %s", shortid(s), buf);

if (t) {
service_remove_subscriber(t, s, SM_CODE_OK);
Expand Down Expand Up @@ -621,6 +633,7 @@ subscription_create_from_channel_or_service(profile_chain_t *prch,
const char *hostname,
const char *username,
const char *client,
int *error,
service_t *service)
{
th_subscription_t *s;
Expand All @@ -629,6 +642,9 @@ subscription_create_from_channel_or_service(profile_chain_t *prch,
assert(prch);
assert(prch->prch_id);

if (error)
*error = 0;

if (!service)
ch = prch->prch_id;

Expand All @@ -640,7 +656,7 @@ subscription_create_from_channel_or_service(profile_chain_t *prch,
tvhtrace("subscription", "%04X: creating subscription for %s weight %d using profile %s",
shortid(s), channel_get_name(ch), weight, pro_name);
else
tvhtrace("subscription", "%04X: creating subscription for service %s weight %d sing profile %s",
tvhtrace("subscription", "%04X: creating subscription for service %s weight %d using profile %s",
shortid(s), service->s_nicename, weight, pro_name);
#endif
s->ths_channel = ch;
Expand All @@ -656,7 +672,14 @@ subscription_create_from_channel_or_service(profile_chain_t *prch,
}
#endif

subscription_reschedule();
if (flags & SUBSCRIPTION_ONESHOT) {
if (subscription_start_instance(s, error) == NULL) {
subscription_unsubscribe(s, 1);
return NULL;
}
} else {
subscription_reschedule();
}
return s;
}

Expand All @@ -665,12 +688,16 @@ subscription_create_from_channel(profile_chain_t *prch,
tvh_input_t *ti,
unsigned int weight,
const char *name,
int flags, const char *hostname,
const char *username, const char *client)
int flags,
const char *hostname,
const char *username,
const char *client,
int *error)
{
assert(prch->prch_st);
return subscription_create_from_channel_or_service
(prch, ti, weight, name, flags, hostname, username, client, NULL);
(prch, ti, weight, name, flags, hostname, username, client,
error, NULL);
}

/**
Expand All @@ -682,13 +709,15 @@ subscription_create_from_service(profile_chain_t *prch,
unsigned int weight,
const char *name,
int flags,
const char *hostname, const char *username,
const char *client)
const char *hostname,
const char *username,
const char *client,
int *error)
{
assert(prch->prch_st);
return subscription_create_from_channel_or_service
(prch, ti, weight, name, flags, hostname, username, client,
prch->prch_id);
error, prch->prch_id);
}

/**
Expand All @@ -704,7 +733,8 @@ subscription_create_from_mux(profile_chain_t *prch,
int flags,
const char *hostname,
const char *username,
const char *client)
const char *client,
int *error)
{
mpegts_mux_t *mm = prch->prch_id;
mpegts_service_t *s = mpegts_service_create_raw(mm);
Expand All @@ -714,7 +744,7 @@ subscription_create_from_mux(profile_chain_t *prch,

return subscription_create_from_channel_or_service
(prch, ti, weight, name, flags, hostname, username, client,
(service_t *)s);
error, (service_t *)s);
}
#endif

Expand Down Expand Up @@ -833,7 +863,7 @@ subscription_done(void)

pthread_mutex_lock(&global_lock);
LIST_FOREACH(s, &subscriptions, ths_global_link)
subscription_unsubscribe(s);
subscription_unsubscribe(s, 0);
/* clear remaining subscriptions */
subscription_reschedule();
pthread_mutex_unlock(&global_lock);
Expand Down Expand Up @@ -972,7 +1002,7 @@ subscription_dummy_join(const char *id, int first)
st = calloc(1, sizeof(*st));
streaming_target_init(st, dummy_callback, NULL, 0);
prch->prch_st = st;
s = subscription_create_from_service(prch, NULL, 1, "dummy", 0, NULL, NULL, "dummy");
s = subscription_create_from_service(prch, NULL, 1, "dummy", 0, NULL, NULL, "dummy", NULL);

tvhlog(LOG_NOTICE, "subscription",
"%04X: Dummy join %s ok", shortid(s), id);
Expand Down

0 comments on commit 38b541e

Please sign in to comment.