Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
streaming profile: allow configuratible timeout and restart, fixes #2368
Note: If restart is enabled, tvh will keep trying to subscribe any service
from the channel's list until unsubscribed.
  • Loading branch information
perexg committed Oct 14, 2014
1 parent cc27626 commit 6b9d0d1
Show file tree
Hide file tree
Showing 15 changed files with 183 additions and 111 deletions.
21 changes: 20 additions & 1 deletion src/config.c
Expand Up @@ -1081,6 +1081,24 @@ config_migrate_v14 ( void )
}
}

static void
config_migrate_v15 ( void )
{
htsmsg_t *c, *e;
htsmsg_field_t *f;
int i;

if ((c = hts_settings_load("profile")) != NULL) {
HTSMSG_FOREACH(f, c) {
if (!(e = htsmsg_field_get_map(f))) continue;
if (htsmsg_get_s32(e, "timeout", &i)) {
htsmsg_set_s32(e, "timeout", 5);
hts_settings_save(e, "profile/%s", f->hmf_name);
}
}
}
}

/*
* Perform backup
*/
Expand Down Expand Up @@ -1183,7 +1201,8 @@ static const config_migrate_t config_migrate_table[] = {
config_migrate_v11,
config_migrate_v12,
config_migrate_v13,
config_migrate_v14
config_migrate_v14,
config_migrate_v15
};

/*
Expand Down
2 changes: 1 addition & 1 deletion src/dvr/dvr_rec.c
Expand Up @@ -84,7 +84,7 @@ dvr_rec_subscribe(dvr_entry_t *de)
return;
}

de->de_s = subscription_create_from_channel(de->de_channel, weight,
de->de_s = subscription_create_from_channel(de->de_channel, pro, weight,
buf, prch->prch_st,
prch->prch_flags,
NULL, NULL, NULL);
Expand Down
28 changes: 4 additions & 24 deletions src/htsmsg.c
Expand Up @@ -196,45 +196,25 @@ htsmsg_add_bool(htsmsg_t *msg, const char *name, int b)
*
*/
void
htsmsg_add_u32(htsmsg_t *msg, const char *name, uint32_t u32)
htsmsg_add_s64(htsmsg_t *msg, const char *name, int64_t s64)
{
htsmsg_field_t *f = htsmsg_field_add(msg, name, HMF_S64, HMF_NAME_ALLOCED);
f->hmf_s64 = u32;
f->hmf_s64 = s64;
}

/*
*
*/
int
htsmsg_set_u32(htsmsg_t *msg, const char *name, uint32_t u32)
htsmsg_set_s64(htsmsg_t *msg, const char *name, int64_t s64)
{
htsmsg_field_t *f = htsmsg_field_find(msg, name);
if (!f)
f = htsmsg_field_add(msg, name, HMF_S64, HMF_NAME_ALLOCED);
if (f->hmf_type != HMF_S64)
return 1;
f->hmf_s64 = u32;
return 0;
}

/*
*
*/
void
htsmsg_add_s64(htsmsg_t *msg, const char *name, int64_t s64)
{
htsmsg_field_t *f = htsmsg_field_add(msg, name, HMF_S64, HMF_NAME_ALLOCED);
f->hmf_s64 = s64;
}

/*
*
*/
void
htsmsg_add_s32(htsmsg_t *msg, const char *name, int32_t s32)
{
htsmsg_field_t *f = htsmsg_field_add(msg, name, HMF_S64, HMF_NAME_ALLOCED);
f->hmf_s64 = s32;
return 0;
}


Expand Down
28 changes: 23 additions & 5 deletions src/htsmsg.h
Expand Up @@ -110,25 +110,43 @@ void htsmsg_destroy(htsmsg_t *msg);

void htsmsg_add_bool(htsmsg_t *msg, const char *name, int b);

/**
* Add an integer field where source is signed 64 bit.
*/
void htsmsg_add_s64(htsmsg_t *msg, const char *name, int64_t s64);

/**
* Add/update an integer field where source is signed 64 bit.
*/
int htsmsg_set_s64(htsmsg_t *msg, const char *name, int64_t s64);

/**
* Add an integer field where source is unsigned 32 bit.
*/
void htsmsg_add_u32(htsmsg_t *msg, const char *name, uint32_t u32);
static inline void
htsmsg_add_u32(htsmsg_t *msg, const char *name, uint32_t u32)
{ htsmsg_add_s64(msg, name, u32); }

/**
* Add/update an integer field
*/
int htsmsg_set_u32(htsmsg_t *msg, const char *name, uint32_t u32);
static inline int
htsmsg_set_u32(htsmsg_t *msg, const char *name, uint32_t u32)
{ return htsmsg_set_s64(msg, name, u32); }

/**
* Add an integer field where source is signed 32 bit.
*/
void htsmsg_add_s32(htsmsg_t *msg, const char *name, int32_t s32);
static inline void
htsmsg_add_s32(htsmsg_t *msg, const char *name, int32_t s32)
{ htsmsg_add_s64(msg, name, s32); }

/**
* Add an integer field where source is signed 64 bit.
* Add/update an integer field
*/
void htsmsg_add_s64(htsmsg_t *msg, const char *name, int64_t s64);
static inline int
htsmsg_set_s32(htsmsg_t *msg, const char *name, int32_t s32)
{ return htsmsg_set_s64(msg, name, s32); }

/**
* Add a string field.
Expand Down
4 changes: 3 additions & 1 deletion src/htsp_server.c
Expand Up @@ -1792,13 +1792,15 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
st = hs->hs_work;
normts = 1;
}
#else
profile_t *pro = NULL;
#endif

if(normts)
st = hs->hs_tsfix = tsfix_create(st);

tvhdebug("htsp", "%s - subscribe to %s\n", htsp->htsp_logname, ch->ch_name ?: "");
hs->hs_s = subscription_create_from_channel(ch, weight,
hs->hs_s = subscription_create_from_channel(ch, pro, weight,
htsp->htsp_logname,
st,
SUBSCRIPTION_STREAMING,
Expand Down
2 changes: 1 addition & 1 deletion src/input/mpegts/mpegts_mux.c
Expand Up @@ -1048,7 +1048,7 @@ mpegts_mux_subscribe
{
int err = 0;
th_subscription_t *s;
s = subscription_create_from_mux(mm, weight, name, NULL,
s = subscription_create_from_mux(mm, NULL, weight, name, NULL,
SUBSCRIPTION_NONE,
NULL, NULL, NULL, &err);
return s ? 0 : err;
Expand Down
2 changes: 1 addition & 1 deletion src/input/mpegts/mpegts_mux_sched.c
Expand Up @@ -205,7 +205,7 @@ mpegts_mux_sched_timer ( void *p )
assert(mms->mms_sub == NULL);

mms->mms_sub
= subscription_create_from_mux(mm, mms->mms_weight,
= subscription_create_from_mux(mm, NULL, mms->mms_weight,
mms->mms_creator ?: "",
&mms->mms_input,
SUBSCRIPTION_NONE,
Expand Down
14 changes: 14 additions & 0 deletions src/profile.c
Expand Up @@ -259,6 +259,20 @@ const idclass_t profile_class =
.name = "Comment",
.off = offsetof(profile_t, pro_comment),
},
{
.type = PT_INT,
.id = "timeout",
.name = "Timeout (sec)",
.off = offsetof(profile_t, pro_timeout),
.def.i = 5,
},
{
.type = PT_BOOL,
.id = "restart",
.name = "Restart On Error",
.off = offsetof(profile_t, pro_restart),
.def.i = 0,
},
{ }
}
};
Expand Down
2 changes: 2 additions & 0 deletions src/profile.h
Expand Up @@ -69,6 +69,8 @@ typedef struct profile {
int pro_shield;
char *pro_name;
char *pro_comment;
int pro_timeout;
int pro_restart;

void (*pro_free)(struct profile *pro);
void (*pro_conf_changed)(struct profile *pro);
Expand Down
23 changes: 12 additions & 11 deletions src/service.c
Expand Up @@ -592,10 +592,10 @@ service_build_filter(service_t *t)
*
*/
int
service_start(service_t *t, int instance, int postpone)
service_start(service_t *t, int instance, int timeout, int postpone)
{
elementary_stream_t *st;
int r, timeout = 10;
int r, stimeout = 10;

lock_assert(&global_lock);

Expand Down Expand Up @@ -631,11 +631,13 @@ service_start(service_t *t, int instance, int postpone)
pthread_mutex_unlock(&t->s_stream_mutex);

if(t->s_grace_period != NULL)
timeout = t->s_grace_period(t);
stimeout = t->s_grace_period(t);

timeout += postpone;
t->s_grace_delay = timeout;
gtimer_arm(&t->s_receive_timer, service_data_timeout, t, timeout);
stimeout += postpone;
t->s_timeout = timeout;
t->s_grace_delay = stimeout;
if (stimeout > 0)
gtimer_arm(&t->s_receive_timer, service_data_timeout, t, stimeout);
return 0;
}

Expand All @@ -646,7 +648,7 @@ service_start(service_t *t, int instance, int postpone)
service_instance_t *
service_find_instance
(service_t *s, channel_t *ch, service_instance_list_t *sil,
int *error, int weight, int flags, int postpone)
int *error, int weight, int flags, int timeout, int postpone)
{
channel_service_mapping_t *csm;
service_instance_t *si, *next;
Expand Down Expand Up @@ -723,7 +725,7 @@ service_find_instance

/* Start */
tvhtrace("service", "will start new instance %d", si->si_instance);
if (service_start(si->si_s, si->si_instance, postpone)) {
if (service_start(si->si_s, si->si_instance, timeout, postpone)) {
tvhtrace("service", "tuning failed");
si->si_error = SM_CODE_TUNING_FAILED;
if (*error < SM_CODE_TUNING_FAILED)
Expand Down Expand Up @@ -1020,7 +1022,8 @@ service_data_timeout(void *aux)

pthread_mutex_unlock(&t->s_stream_mutex);

gtimer_arm(&t->s_receive_timer, service_data_timeout, t, 5);
if (t->s_timeout > 0)
gtimer_arm(&t->s_receive_timer, service_data_timeout, t, t->s_timeout);
}

/**
Expand Down Expand Up @@ -1485,7 +1488,6 @@ service_instance_add(service_instance_list_t *sil,
return si;
}


/**
*
*/
Expand All @@ -1498,7 +1500,6 @@ service_instance_destroy
free(si);
}


/**
*
*/
Expand Down
6 changes: 4 additions & 2 deletions src/service.h
Expand Up @@ -341,6 +341,7 @@ typedef struct service {
/**
* Stream start time
*/
int s_timeout;
int s_grace_delay;
time_t s_start_time;

Expand Down Expand Up @@ -448,7 +449,7 @@ typedef struct service {
void service_init(void);
void service_done(void);

int service_start(service_t *t, int instance, int postpone);
int service_start(service_t *t, int instance, int timeout, int postpone);
void service_stop(service_t *t);

void service_build_filter(service_t *t);
Expand All @@ -470,7 +471,8 @@ service_instance_t *service_find_instance(struct service *s,
struct channel *ch,
service_instance_list_t *sil,
int *error, int weight,
int flags, int postpone);
int flags, int timeout,
int postpone);

elementary_stream_t *service_stream_find_(service_t *t, int pid);

Expand Down
3 changes: 2 additions & 1 deletion src/service_mapper.c
Expand Up @@ -30,6 +30,7 @@
#include "service_mapper.h"
#include "streaming.h"
#include "service.h"
#include "profile.h"
#include "api.h"

static service_mapper_status_t service_mapper_stat;
Expand Down Expand Up @@ -361,7 +362,7 @@ service_mapper_thread ( void *aux )

/* Subscribe */
tvhinfo("service_mapper", "checking %s", s->s_nicename);
sub = subscription_create_from_service(s, SUBSCRIPTION_PRIO_MAPPER,
sub = subscription_create_from_service(s, NULL, SUBSCRIPTION_PRIO_MAPPER,
"service_mapper", &sq.sq_st,
0, NULL, NULL, "service_mapper");

Expand Down

0 comments on commit 6b9d0d1

Please sign in to comment.