Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
subscription/dbus: implement subscription postpone function
  • Loading branch information
perexg committed Aug 11, 2014
1 parent 54164ba commit 4b5da7c
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 43 deletions.
29 changes: 29 additions & 0 deletions src/dbus.c
Expand Up @@ -27,6 +27,7 @@

#include "tvheadend.h"
#include "tvhpoll.h"
#include "subscriptions.h"
#include "dbus.h"


Expand Down Expand Up @@ -202,6 +203,32 @@ dbus_reply_to_ping(DBusMessage *msg, DBusConnection *conn)
dbus_message_unref(reply);
}

/**
* Set the subscription postpone delay
*/
static void
dbus_reply_to_postpone(DBusMessage *msg, DBusConnection *conn)
{
DBusMessageIter args;
DBusMessage *reply;
int64_t param;

if (!dbus_message_iter_init(msg, &args))
return;
if (DBUS_TYPE_INT64 != dbus_message_iter_get_arg_type(&args))
return;
dbus_message_iter_get_basic(&args, &param);

param = subscription_set_postpone(param);

reply = dbus_message_new_method_return(msg);
dbus_message_iter_init_append(reply, &args);
dbus_message_iter_append_basic(&args, DBUS_TYPE_INT64, &param);
dbus_connection_send(conn, reply, NULL);
dbus_connection_flush(conn);
dbus_message_unref(reply);
}

/**
*
*/
Expand Down Expand Up @@ -313,6 +340,8 @@ dbus_server_thread(void *aux)

if (dbus_message_is_method_call(msg, "org.tvheadend", "ping"))
dbus_reply_to_ping(msg, conn);
else if (dbus_message_is_method_call(msg, "org.tvheadend", "postpone"))
dbus_reply_to_postpone(msg, conn);

dbus_message_unref(msg);
}
Expand Down
7 changes: 4 additions & 3 deletions src/service.c
Expand Up @@ -557,7 +557,7 @@ service_build_filter(service_t *t)
*
*/
int
service_start(service_t *t, int instance)
service_start(service_t *t, int instance, int postpone)
{
elementary_stream_t *st;
int r, timeout = 10;
Expand Down Expand Up @@ -598,6 +598,7 @@ service_start(service_t *t, int instance)
if(t->s_grace_period != NULL)
timeout = t->s_grace_period(t);

timeout += postpone;
t->s_grace_delay = timeout;
gtimer_arm(&t->s_receive_timer, service_data_timeout, t, timeout);
return 0;
Expand All @@ -610,7 +611,7 @@ service_start(service_t *t, int instance)
service_instance_t *
service_find_instance
(service_t *s, channel_t *ch, service_instance_list_t *sil,
int *error, int weight)
int *error, int weight, int postpone)
{
channel_service_mapping_t *csm;
service_instance_t *si, *next;
Expand Down Expand Up @@ -687,7 +688,7 @@ service_find_instance

/* Start */
tvhtrace("service", "will start new instance %d", si->si_instance);
if (service_start(si->si_s, si->si_instance)) {
if (service_start(si->si_s, si->si_instance, postpone)) {
tvhtrace("service", "tuning failed");
si->si_error = SM_CODE_TUNING_FAILED;
if (*error < SM_CODE_TUNING_FAILED)
Expand Down
4 changes: 2 additions & 2 deletions src/service.h
Expand Up @@ -449,7 +449,7 @@ typedef struct service {
void service_init(void);
void service_done(void);

int service_start(service_t *t, int instance);
int service_start(service_t *t, int instance, int postpone);
void service_stop(service_t *t);

void service_build_filter(service_t *t);
Expand All @@ -470,7 +470,7 @@ service_instance_t *service_find_instance(struct service *s,
struct channel *ch,
service_instance_list_t *sil,
int *error,
int weight);
int weight, int postpone);

elementary_stream_t *service_stream_find_(service_t *t, int pid);

Expand Down
138 changes: 100 additions & 38 deletions src/subscriptions.c
Expand Up @@ -45,6 +45,7 @@
struct th_subscription_list subscriptions;
struct th_subscription_list subscriptions_remove;
static gtimer_t subscription_reschedule_timer;
static int subscription_postpone;

/**
*
Expand Down Expand Up @@ -87,7 +88,7 @@ subscription_link_service(th_subscription_t *s, service_t *t)
// Link to service output
streaming_target_connect(&t->s_streaming_pad, &s->ths_input);

sm = streaming_msg_create_code(SMT_GRACE, t->s_grace_delay);
sm = streaming_msg_create_code(SMT_GRACE, s->ths_postpone + t->s_grace_delay);
streaming_pad_deliver(&t->s_streaming_pad, sm);

if(s->ths_start_message != NULL && t->s_streaming_status & TSS_PACKETS) {
Expand Down Expand Up @@ -188,6 +189,49 @@ subscription_sort(th_subscription_t *a, th_subscription_t *b)
}


static void
subscription_show_none(th_subscription_t *s)
{
channel_t *ch = s->ths_channel;
tvhlog(LOG_NOTICE, "subscription",
"No transponder available for subscription \"%s\" "
"to channel \"%s\"",
s->ths_title, ch ? channel_get_name(ch) : "none");
}

static void
subscription_show_info(th_subscription_t *s)
{
char buf[512];
channel_t *ch = s->ths_channel;
source_info_t si;
size_t buflen;

s->ths_service->s_setsourceinfo(s->ths_service, &si);
snprintf(buf, sizeof(buf),
"\"%s\" subscribing on \"%s\", weight: %d, adapter: \"%s\", "
"network: \"%s\", mux: \"%s\", provider: \"%s\", "
"service: \"%s\"",
s->ths_title, ch ? channel_get_name(ch) : "none", s->ths_weight,
si.si_adapter ?: "<N/A>",
si.si_network ?: "<N/A>",
si.si_mux ?: "<N/A>",
si.si_provider ?: "<N/A>",
si.si_service ?: "<N/A>");
service_source_info_free(&si);

if (s->ths_hostname) {
buflen = strlen(buf);
snprintf(buf + buflen, sizeof(buf) - buflen,
", hostname=\"%s\", username=\"%s\", client=\"%s\"",
s->ths_hostname ?: "<N/A>",
s->ths_username ?: "<N/A>",
s->ths_client ?: "<N/A>");
}

tvhlog(LOG_INFO, "subscription", "%s", buf);
}

/**
*
*/
Expand All @@ -209,25 +253,33 @@ subscription_reschedule(void)
service_t *t;
service_instance_t *si;
streaming_message_t *sm;
int error;
int error, postpone = INT_MAX;
assert(reenter == 0);
reenter = 1;

lock_assert(&global_lock);

gtimer_arm(&subscription_reschedule_timer,
subscription_reschedule_cb, NULL, 2);

LIST_FOREACH(s, &subscriptions, ths_global_link) {
if (s->ths_mmi) continue;
if (!s->ths_service && !s->ths_channel) continue;

/* Postpone the tuner decision */
/* Leave some time to wakeup tuners through DBus or so */
if (s->ths_postpone_end > dispatch_clock) {
if (postpone > s->ths_postpone_end - dispatch_clock)
postpone = s->ths_postpone_end - dispatch_clock;
streaming_message_t *sm;
sm = streaming_msg_create_code(SMT_GRACE, (s->ths_postpone_end - dispatch_clock) + 5);
streaming_target_deliver(s->ths_output, sm);
continue;
}

t = s->ths_service;
if(t != NULL && s->ths_current_instance != NULL) {
/* Already got a service */

if(s->ths_state != SUBSCRIPTION_BAD_SERVICE)
continue; /* And it not bad, so we're happy */
continue; /* And it not bad, so we're happy */

t->s_streaming_status = 0;
t->s_status = SERVICE_IDLE;
Expand Down Expand Up @@ -255,28 +307,65 @@ subscription_reschedule(void)
tvhtrace("subscription", "find instance for %s weight %d",
s->ths_service->s_nicename, s->ths_weight);
si = service_find_instance(s->ths_service, s->ths_channel,
&s->ths_instances, &error, s->ths_weight);
&s->ths_instances, &error, s->ths_weight,
dispatch_clock > s->ths_postpone_end ?
0 : s->ths_postpone_end - dispatch_clock);
s->ths_current_instance = si;

if(si == NULL) {
/* No service available */

sm = streaming_msg_create_code(SMT_NOSTART, error);
streaming_target_deliver(s->ths_output, sm);
subscription_show_none(s);
continue;
}

subscription_link_service(s, si->si_s);
subscription_show_info(s);
}

while ((s = LIST_FIRST(&subscriptions_remove))) {
LIST_REMOVE(s, ths_remove_link);
subscription_unsubscribe(s);
}


if (!postpone)
postpone = 2;
gtimer_arm(&subscription_reschedule_timer,
subscription_reschedule_cb, NULL, postpone);

reenter = 0;
}

/**
*
*/
int
subscription_set_postpone(int postpone)
{
th_subscription_t *s;
time_t now = time(NULL);

/* some limits that make sense */
if (postpone < 0)
postpone = 0;
if (postpone > 120)
postpone = 120;
pthread_mutex_lock(&global_lock);
if (subscription_postpone != postpone) {
subscription_postpone = postpone;
LIST_FOREACH(s, &subscriptions, ths_global_link) {
s->ths_postpone = postpone;
if (s->ths_postpone_end > now && s->ths_postpone_end - now > postpone)
s->ths_postpone_end = now + postpone;
}
gtimer_arm(&subscription_reschedule_timer,
subscription_reschedule_cb, NULL, 0);
}
pthread_mutex_unlock(&global_lock);
return postpone;
}

/* **************************************************************************
* Streaming handlers
* *************************************************************************/
Expand Down Expand Up @@ -492,6 +581,8 @@ subscription_create
s->ths_total_err = 0;
s->ths_output = st;
s->ths_flags = flags;
s->ths_postpone = subscription_postpone;
s->ths_postpone_end = dispatch_clock + s->ths_postpone;

time(&s->ths_start);

Expand Down Expand Up @@ -530,36 +621,7 @@ subscription_create_from_channel_or_service
if (ch)
LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);

// TODO: do we really need this here?
subscription_reschedule();

if(s->ths_service == NULL) {
tvhlog(LOG_NOTICE, "subscription",
"No transponder available for subscription \"%s\" "
"to channel \"%s\"",
s->ths_title, ch ? channel_get_name(ch) : "none");
} else {
source_info_t si;

s->ths_service->s_setsourceinfo(s->ths_service, &si);

tvhlog(LOG_INFO, "subscription",
"\"%s\" subscribing on \"%s\", weight: %d, adapter: \"%s\", "
"network: \"%s\", mux: \"%s\", provider: \"%s\", "
"service: \"%s\", hostname: \"%s\", username: \"%s\", client: \"%s\"",
s->ths_title, ch ? channel_get_name(ch) : "none", weight,
si.si_adapter ?: "<N/A>",
si.si_network ?: "<N/A>",
si.si_mux ?: "<N/A>",
si.si_provider ?: "<N/A>",
si.si_service ?: "<N/A>",
hostname ?: "<N/A>",
username ?: "<N/A>",
client ?: "<N/A>");

service_source_info_free(&si);
}

return s;
}

Expand Down
8 changes: 8 additions & 0 deletions src/subscriptions.h
Expand Up @@ -87,6 +87,12 @@ typedef struct th_subscription {
service_instance_list_t ths_instances;
struct service_instance *ths_current_instance;

/**
* Postpone
*/
int ths_postpone;
time_t ths_postpone_end;

#if ENABLE_MPEGTS
// Note: its a bit ugly linking MPEG-TS code directly here, but to do
// otherwise would probably require adding lots of additional
Expand All @@ -112,6 +118,8 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight);

void subscription_reschedule(void);

int subscription_set_postpone(int postpone);

th_subscription_t *subscription_create_from_channel(struct channel *ch,
unsigned int weight,
const char *name,
Expand Down

0 comments on commit 4b5da7c

Please sign in to comment.