Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
profile/subscription: use profile chain everywhere, add profile refco…
…unting
  • Loading branch information
perexg committed Oct 22, 2014
1 parent 97e7f1a commit c260931
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 109 deletions.
8 changes: 4 additions & 4 deletions src/dvr/dvr_rec.c
Expand Up @@ -78,15 +78,15 @@ dvr_rec_subscribe(dvr_entry_t *de)

pro = de->de_config->dvr_profile;
prch = malloc(sizeof(*prch));
if (profile_chain_open(pro, prch, de->de_channel, &de->de_config->dvr_muxcnf, 0, 0)) {
profile_chain_init(prch, pro, de->de_channel);
if (profile_chain_open(prch, &de->de_config->dvr_muxcnf, 0, 0)) {
tvherror("dvr", "unable to create new channel streaming chain for '%s'",
channel_get_name(de->de_channel));
return;
}

de->de_s = subscription_create_from_channel(de->de_channel, pro, weight,
buf, prch->prch_st,
prch->prch_flags,
de->de_s = subscription_create_from_channel(prch, weight,
buf, prch->prch_flags,
NULL, NULL, NULL);
if (de->de_s == NULL) {
tvherror("dvr", "unable to create new channel subcription for '%s'",
Expand Down
6 changes: 3 additions & 3 deletions src/htsp_server.c
Expand Up @@ -1762,7 +1762,8 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
#endif

pro = profile_find_by_list(htsp->htsp_granted_access->aa_profiles, profile_id, "htsp");
if (!profile_work(pro, &hs->hs_prch, ch, &hs->hs_input, timeshiftPeriod, pflags)) {
profile_chain_init(&hs->hs_prch, pro, ch);
if (!profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, pflags)) {
tvhlog(LOG_ERR, "htsp", "unable to create profile chain '%s'", pro->pro_name);
free(hs);
return htsp_error("Stream setup error");
Expand Down Expand Up @@ -1795,9 +1796,8 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link);

tvhdebug("htsp", "%s - subscribe to %s\n", htsp->htsp_logname, ch->ch_name ?: "");
hs->hs_s = subscription_create_from_channel(ch, pro, weight,
hs->hs_s = subscription_create_from_channel(&hs->hs_prch, weight,
htsp->htsp_logname,
hs->hs_prch.prch_st,
SUBSCRIPTION_STREAMING,
htsp->htsp_peername,
htsp->htsp_username,
Expand Down
6 changes: 5 additions & 1 deletion src/input/mpegts/mpegts_mux.c
Expand Up @@ -23,6 +23,7 @@
#include "subscriptions.h"
#include "channels.h"
#include "access.h"
#include "profile.h"
#include "dvb_charset.h"

#include <assert.h>
Expand Down Expand Up @@ -1090,8 +1091,11 @@ mpegts_mux_subscribe
( mpegts_mux_t *mm, const char *name, int weight )
{
int err = 0;
profile_chain_t prch;
th_subscription_t *s;
s = subscription_create_from_mux(mm, NULL, weight, name, NULL,
memset(&prch, 0, sizeof(prch));
prch.prch_id = mm;
s = subscription_create_from_mux(&prch, weight, name,
SUBSCRIPTION_NONE,
NULL, NULL, NULL, &err);
return s ? 0 : err;
Expand Down
10 changes: 8 additions & 2 deletions src/input/mpegts/mpegts_mux_sched.c
Expand Up @@ -22,6 +22,7 @@
#include "input/mpegts/mpegts_mux_sched.h"
#include "streaming.h"
#include "settings.h"
#include "profile.h"

static void mpegts_mux_sched_timer ( void *p );
static void mpegts_mux_sched_input ( void *p, streaming_message_t *sm );
Expand Down Expand Up @@ -204,10 +205,14 @@ mpegts_mux_sched_timer ( void *p )
if (!mms->mms_active) {
assert(mms->mms_sub == NULL);

if (!mms->mms_prch)
mms->mms_prch = calloc(1, sizeof(mms->mms_prch));
mms->mms_prch->prch_id = mm;
mms->mms_prch->prch_st = &mms->mms_input;

mms->mms_sub
= subscription_create_from_mux(mm, NULL, mms->mms_weight,
= subscription_create_from_mux(mms->mms_prch, mms->mms_weight,
mms->mms_creator ?: "",
&mms->mms_input,
SUBSCRIPTION_NONE,
NULL, NULL, NULL, NULL);

Expand Down Expand Up @@ -311,6 +316,7 @@ mpegts_mux_sched_delete ( mpegts_mux_sched_t *mms, int delconf )
free(mms->mms_cronstr);
free(mms->mms_mux);
free(mms->mms_creator);
free(mms->mms_prch);
free(mms);
}

Expand Down
7 changes: 5 additions & 2 deletions src/input/mpegts/mpegts_mux_sched.h
Expand Up @@ -25,6 +25,8 @@
#include "idnode.h"
#include "subscriptions.h"

struct profile_chain;

typedef LIST_HEAD(,mpegts_mux_sched) mpegts_mux_sched_list_t;

extern mpegts_mux_sched_list_t mpegts_mux_sched_all;
Expand Down Expand Up @@ -57,8 +59,9 @@ typedef struct mpegts_mux_sched
/*
* Subscription
*/
th_subscription_t *mms_sub; ///< Subscription handler
streaming_target_t mms_input; ///< Streaming input
struct profile_chain *mms_prch; ///< Dummy profile chain
th_subscription_t *mms_sub; ///< Subscription handler
streaming_target_t mms_input; ///< Streaming input

} mpegts_mux_sched_t;

Expand Down
96 changes: 68 additions & 28 deletions src/profile.c
Expand Up @@ -99,6 +99,7 @@ profile_create
if (!htsmsg_get_bool(conf, "shield", &b))
pro->pro_shield = !!b;
}
pro->pro_refcount = 1;
TAILQ_INSERT_TAIL(&profiles, pro, pro_link);
if (save)
profile_class_save((idnode_t *)pro);
Expand All @@ -107,6 +108,16 @@ profile_create
return pro;
}

void
profile_release_(profile_t *pro)
{
if (pro->pro_free)
pro->pro_free(pro);
free(pro->pro_name);
free(pro->pro_comment);
free(pro);
}

static void
profile_delete(profile_t *pro, int delconf)
{
Expand All @@ -119,11 +130,7 @@ profile_delete(profile_t *pro, int delconf)
idnode_unlink(&pro->pro_id);
dvr_config_destroy_by_profile(pro, delconf);
access_destroy_by_profile(pro, delconf);
if (pro->pro_free)
pro->pro_free(pro);
free(pro->pro_name);
free(pro->pro_comment);
free(pro);
profile_release(pro);
}

static void
Expand Down Expand Up @@ -427,17 +434,58 @@ profile_get_htsp_list(htsmsg_t *array, htsmsg_t *filter)
}
}

/*
*
*/
void
profile_chain_init(profile_chain_t *prch, profile_t *pro, void *id)
{
memset(prch, 0, sizeof(*prch));
if (pro)
profile_grab(pro);
prch->prch_pro = pro;
prch->prch_id = id;
streaming_queue_init(&prch->prch_sq, 0, 0);
}

/*
*
*/
int
profile_chain_raw_open(profile_chain_t *prch, size_t qsize)
profile_chain_work(profile_chain_t *prch, struct streaming_target *dst,
uint32_t timeshift_period, int flags)
{
profile_t *pro = prch->prch_pro;
if (pro && pro->pro_work)
return pro->pro_work(prch, dst, timeshift_period, flags);
return -1;
}

/*
*
*/
int
profile_chain_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize)
{
profile_t *pro = prch->prch_pro;
if (pro && pro->pro_open)
return pro->pro_open(prch, m_cfg, flags, qsize);
return -1;
}

/*
*
*/
int
profile_chain_raw_open(profile_chain_t *prch, void *id, size_t qsize)
{
muxer_config_t c;

memset(&c, 0, sizeof(c));
c.m_type = MC_RAW;
memset(prch, 0, sizeof(*prch));
prch->prch_id = id;
prch->prch_flags = SUBSCRIPTION_RAW_MPEGTS;
streaming_queue_init(&prch->prch_sq, SMT_PACKET, qsize);
prch->prch_st = &prch->prch_sq.sq_st;
Expand Down Expand Up @@ -467,6 +515,8 @@ profile_chain_close(profile_chain_t *prch)
muxer_destroy(prch->prch_muxer);
streaming_queue_deinit(&prch->prch_sq);
prch->prch_st = NULL;
if (prch->prch_pro)
profile_release(prch->prch_pro);
}

/*
Expand All @@ -484,13 +534,10 @@ const idclass_t profile_htsp_class =
};

static int
profile_htsp_work(profile_t *_pro, profile_chain_t *prch,
void *id, streaming_target_t *dst,
profile_htsp_work(profile_chain_t *prch,
streaming_target_t *dst,
uint32_t timeshift_period, int flags)
{
if (!(flags & PRCH_FLAG_SKIPZEROING))
memset(prch, 0, sizeof(*prch));

if (flags & PRCH_FLAG_TSFIX)
dst = prch->prch_tsfix = tsfix_create(prch->prch_transcoder);

Expand Down Expand Up @@ -548,10 +595,10 @@ const idclass_t profile_mpegts_pass_class =
};

static int
profile_mpegts_pass_open(profile_t *_pro, profile_chain_t *prch, void *id,
profile_mpegts_pass_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize)
{
profile_mpegts_t *pro = (profile_mpegts_t *)_pro;
profile_mpegts_t *pro = (profile_mpegts_t *)prch->prch_pro;
muxer_config_t c;

if (m_cfg)
Expand All @@ -563,7 +610,6 @@ profile_mpegts_pass_open(profile_t *_pro, profile_chain_t *prch, void *id,
c.m_rewrite_pat = pro->pro_rewrite_pat;
c.m_rewrite_pmt = pro->pro_rewrite_pmt;

memset(prch, 0, sizeof(*prch));
prch->prch_flags = SUBSCRIPTION_RAW_MPEGTS;
streaming_queue_init(&prch->prch_sq, SMT_PACKET, qsize);
prch->prch_muxer = muxer_create(&c);
Expand Down Expand Up @@ -612,10 +658,10 @@ const idclass_t profile_matroska_class =
};

static int
profile_matroska_open(profile_t *_pro, profile_chain_t *prch, void *id,
profile_matroska_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize)
{
profile_matroska_t *pro = (profile_matroska_t *)_pro;
profile_matroska_t *pro = (profile_matroska_t *)prch->prch_pro;
muxer_config_t c;

if (m_cfg)
Expand All @@ -627,7 +673,6 @@ profile_matroska_open(profile_t *_pro, profile_chain_t *prch, void *id,
if (pro->pro_webm)
c.m_type = MC_WEBM;

memset(prch, 0, sizeof(*prch));
streaming_queue_init(&prch->prch_sq, 0, qsize);
prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);
prch->prch_tsfix = tsfix_create(prch->prch_gh);
Expand Down Expand Up @@ -883,11 +928,11 @@ const idclass_t profile_transcode_class =
};

static int
profile_transcode_work(profile_t *_pro, profile_chain_t *prch,
void *id, streaming_target_t *dst,
profile_transcode_work(profile_chain_t *prch,
streaming_target_t *dst,
uint32_t timeshift_period, int flags)
{
profile_transcode_t *pro = (profile_transcode_t *)_pro;
profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro;
transcoder_props_t props;

memset(&props, 0, sizeof(props));
Expand All @@ -899,9 +944,6 @@ profile_transcode_work(profile_t *_pro, profile_chain_t *prch,
props.tp_bandwidth = pro->pro_bandwidth >= 64 ? pro->pro_bandwidth : 64;
strncpy(props.tp_language, pro->pro_language ?: "", 3);

if (!(flags & PRCH_FLAG_SKIPZEROING))
memset(prch, 0, sizeof(*prch));

#if ENABLE_TIMESHIFT
if (timeshift_period > 0)
dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period);
Expand Down Expand Up @@ -935,10 +977,10 @@ profile_transcode_mc_valid(int mc)
}

static int
profile_transcode_open(profile_t *_pro, profile_chain_t *prch, void *id,
profile_transcode_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize)
{
profile_transcode_t *pro = (profile_transcode_t *)_pro;
profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro;
muxer_config_t c;
int r;

Expand All @@ -952,12 +994,10 @@ profile_transcode_open(profile_t *_pro, profile_chain_t *prch, void *id,
c.m_type = MC_MATROSKA;
}

memset(prch, 0, sizeof(*prch));

streaming_queue_init(&prch->prch_sq, 0, qsize);
prch->prch_gh = globalheaders_create(&prch->prch_sq.sq_st);

r = profile_transcode_work(_pro, prch, prch->prch_gh, id, 0,
r = profile_transcode_work(prch, prch->prch_gh, 0,
PRCH_FLAG_SKIPZEROING | PRCH_FLAG_TSFIX);
if (r)
return r;
Expand Down
38 changes: 24 additions & 14 deletions src/profile.h
Expand Up @@ -51,6 +51,8 @@ extern profile_builders_queue profile_builders;
#define PRCH_FLAG_TSFIX (1<<1)

typedef struct profile_chain {
struct profile *prch_pro;
void *prch_id;
int prch_flags;
struct streaming_queue prch_sq;
struct streaming_target *prch_st;
Expand All @@ -69,6 +71,8 @@ typedef struct profile {
idnode_t pro_id;
TAILQ_ENTRY(profile) pro_link;

int pro_refcount;

LIST_HEAD(,dvr_config) pro_dvr_configs;
LIST_HEAD(,access_entry) pro_accesses;

Expand All @@ -83,10 +87,9 @@ typedef struct profile {
void (*pro_conf_changed)(struct profile *pro);
muxer_container_type_t (*pro_get_mc)(struct profile *pro);

int (*pro_work)(struct profile *pro, profile_chain_t *prch,
void *id, struct streaming_target *dst,
int (*pro_work)(profile_chain_t *prch, struct streaming_target *dst,
uint32_t timeshift_period, int flags);
int (*pro_open)(struct profile *pro, profile_chain_t *prch, void *id,
int (*pro_open)(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize);
} profile_t;

Expand All @@ -95,17 +98,24 @@ void profile_register(const idclass_t *clazz, profile_builder_t builder);
profile_t *profile_create
(const char *uuid, htsmsg_t *conf, int save);

static inline int
profile_work(profile_t *pro, profile_chain_t *prch,
void *id, struct streaming_target *dst,
uint32_t timeshift_period, int flags)
{ return pro && pro->pro_work ? pro->pro_work(pro, prch, id, dst, timeshift_period, flags) : -1; }

static inline int
profile_chain_open(profile_t *pro, profile_chain_t *prch, void *id,
muxer_config_t *m_cfg, int flags, size_t qsize)
{ return pro && pro->pro_open ? pro->pro_open(pro, prch, id, m_cfg, flags, qsize) : -1; }
int profile_chain_raw_open(profile_chain_t *prch, size_t qsize);
static inline void profile_grab( profile_t *pro )
{ pro->pro_refcount++; }

void profile_release_( profile_t *pro );
static inline void profile_release( profile_t *pro )
{
assert(pro->pro_refcount > 0);
if (--pro->pro_refcount == 0) profile_release_(pro);
}

int
profile_chain_work(profile_chain_t *prch, struct streaming_target *dst,
uint32_t timeshift_period, int flags);
int
profile_chain_open(profile_chain_t *prch,
muxer_config_t *m_cfg, int flags, size_t qsize);
void profile_chain_init(profile_chain_t *prch, profile_t *pro, void *id);
int profile_chain_raw_open(profile_chain_t *prch, void *id, size_t qsize);
void profile_chain_close(profile_chain_t *prch);

static inline profile_t *profile_find_by_uuid(const char *uuid)
Expand Down

0 comments on commit c260931

Please sign in to comment.