Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
streaming target: add streaming_ops_t/st_info to show the message chain
  • Loading branch information
perexg committed Aug 24, 2016
1 parent db143e1 commit 54715ff
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 29 deletions.
18 changes: 17 additions & 1 deletion src/htsp_server.c
Expand Up @@ -94,9 +94,15 @@ static struct htsp_connection_list htsp_async_connections;
static struct htsp_connection_list htsp_connections;

static void htsp_streaming_input(void *opaque, streaming_message_t *sm);
static htsmsg_t *htsp_streaming_input_info(void *opaque, htsmsg_t *list);
const char * _htsp_get_subscription_status(int smcode);
static void htsp_epg_send_waiting(struct htsp_connection *, int64_t mintime);

static streaming_ops_t htsp_streaming_input_ops = {
.st_cb = htsp_streaming_input,
.st_info = htsp_streaming_input_info
};

/**
*
*/
Expand Down Expand Up @@ -2378,7 +2384,7 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
htsp_init_queue(&hs->hs_q, 0);

hs->hs_sid = sid;
streaming_target_init(&hs->hs_input, htsp_streaming_input, hs, 0);
streaming_target_init(&hs->hs_input, &htsp_streaming_input_ops, hs, 0);

#if ENABLE_TIMESHIFT
if (timeshiftPeriod != 0) {
Expand Down Expand Up @@ -4278,3 +4284,13 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
}
streaming_msg_free(sm);
}

static htsmsg_t *
htsp_streaming_input_info(void *opaque, htsmsg_t *list)
{
char buf[512];
htsp_subscription_t *hs = opaque;
snprintf(buf, sizeof(buf), "htsp input: %s", hs->hs_htsp->htsp_logname);
htsmsg_add_str(list, NULL, buf);
return list;
}
15 changes: 13 additions & 2 deletions src/input/mpegts/mpegts_mux_sched.c
Expand Up @@ -25,7 +25,6 @@
#include "profile.h"

static void mpegts_mux_sched_timer ( void *p );
static void mpegts_mux_sched_input ( void *p, streaming_message_t *sm );

mpegts_mux_sched_list_t mpegts_mux_sched_all;

Expand Down Expand Up @@ -190,6 +189,18 @@ mpegts_mux_sched_input ( void *p, streaming_message_t *sm )
streaming_msg_free(sm);
}

static htsmsg_t *
mpegts_mux_sched_input_info ( void *p, htsmsg_t *list )
{
htsmsg_add_str(list, NULL, "mux sched input");
return list;
}

static streaming_ops_t mpegts_mux_sched_input_ops = {
.st_cb = mpegts_mux_sched_input,
.st_info = mpegts_mux_sched_input_info
};

/******************************************************************************
* Timer
*****************************************************************************/
Expand Down Expand Up @@ -293,7 +304,7 @@ mpegts_mux_sched_create ( const char *uuid, htsmsg_t *conf )
LIST_INSERT_HEAD(&mpegts_mux_sched_all, mms, mms_link);

/* Initialise */
streaming_target_init(&mms->mms_input, mpegts_mux_sched_input, mms, 0);
streaming_target_init(&mms->mms_input, &mpegts_mux_sched_input_ops, mms, 0);

/* Load conf */
if (conf)
Expand Down
16 changes: 15 additions & 1 deletion src/plumbing/globalheaders.c
Expand Up @@ -413,6 +413,20 @@ globalheaders_input(void *opaque, streaming_message_t *sm)
gh_hold(gh, sm);
}

static htsmsg_t *
globalheaders_input_info(void *opaque, htsmsg_t *list)
{
globalheaders_t *gh = opaque;
streaming_target_t *st = gh->gh_output;
htsmsg_add_str(list, NULL, "globalheaders input");
return st->st_ops.st_info(st->st_opaque, list);
}

static streaming_ops_t globalheaders_input_ops = {
.st_cb = globalheaders_input,
.st_info = globalheaders_input_info
};


/**
*
Expand All @@ -425,7 +439,7 @@ globalheaders_create(streaming_target_t *output)
TAILQ_INIT(&gh->gh_holdq);

gh->gh_output = output;
streaming_target_init(&gh->gh_input, globalheaders_input, gh, 0);
streaming_target_init(&gh->gh_input, &globalheaders_input_ops, gh, 0);
return &gh->gh_input;
}

Expand Down
21 changes: 17 additions & 4 deletions src/plumbing/transcoding.c
Expand Up @@ -2022,11 +2022,9 @@ transcoder_stop(transcoder_t *t)
static void
transcoder_input(void *opaque, streaming_message_t *sm)
{
transcoder_t *t;
transcoder_t *t = opaque;
streaming_start_t *ss;

t = opaque;

switch (sm->sm_type) {
case SMT_PACKET:
transcoder_packet(t, sm->sm_data);
Expand Down Expand Up @@ -2062,6 +2060,21 @@ transcoder_input(void *opaque, streaming_message_t *sm)
}
}

static htsmsg_t *
transcoder_input_info(void *opaque, htsmsg_t *list)
{
transcoder_t *t = opaque;
streaming_target_t *st = t->t_output;
htsmsg_add_str(list, NULL, "transcoder input");
return st->st_ops.st_info(st->st_opaque, list);;
}

static streaming_ops_t transcoder_input_ops = {
.st_cb = transcoder_input,
.st_info = transcoder_input_info
};



/**
*
Expand All @@ -2076,7 +2089,7 @@ transcoder_create(streaming_target_t *output)
if (!t->t_id) t->t_id = ++transcoder_id;
t->t_output = output;

streaming_target_init(&t->t_input, transcoder_input, t, 0);
streaming_target_init(&t->t_input, &transcoder_input_ops, t, 0);

return &t->t_input;
}
Expand Down
16 changes: 15 additions & 1 deletion src/plumbing/tsfix.c
Expand Up @@ -593,6 +593,20 @@ tsfix_input(void *opaque, streaming_message_t *sm)
streaming_target_deliver2(tf->tf_output, sm);
}

static htsmsg_t *
tsfix_input_info(void *opaque, htsmsg_t *list)
{
tsfix_t *tf = opaque;
streaming_target_t *st = tf->tf_output;
htsmsg_add_str(list, NULL, "tsfix input");
return st->st_ops.st_info(st->st_opaque, list);
}

static streaming_ops_t tsfix_input_ops = {
.st_cb = tsfix_input,
.st_info = tsfix_input_info
};


/**
*
Expand All @@ -607,7 +621,7 @@ tsfix_create(streaming_target_t *output)
tf->tf_output = output;
tf->tf_start_time = mclk();

streaming_target_init(&tf->tf_input, tsfix_input, tf, 0);
streaming_target_init(&tf->tf_input, &tsfix_input_ops, tf, 0);
return &tf->tf_input;
}

Expand Down
34 changes: 31 additions & 3 deletions src/profile.c
Expand Up @@ -658,6 +658,22 @@ profile_input(void *opaque, streaming_message_t *sm)
profile_deliver(prch, sm);
}

static htsmsg_t *
profile_input_info(void *opaque, htsmsg_t *list)
{
profile_chain_t *prch = opaque;
streaming_target_t *st = prch->prch_share;
htsmsg_add_str(list, NULL, "profile input");
st->st_ops.st_info(st->st_opaque, list);
st = prch->prch_post_share;
return st->st_ops.st_info(st->st_opaque, list);
}

static streaming_ops_t profile_input_ops = {
.st_cb = profile_input,
.st_info = profile_input_info
};

/*
*
*/
Expand Down Expand Up @@ -734,6 +750,18 @@ profile_sharer_input(void *opaque, streaming_message_t *sm)
streaming_msg_free(sm);
}

static htsmsg_t *
profile_sharer_input_info(void *opaque, htsmsg_t *list)
{
htsmsg_add_str(list, NULL, "profile sharer input");
return list;
}

static streaming_ops_t profile_sharer_input_ops = {
.st_cb = profile_sharer_input,
.st_info = profile_sharer_input_info
};

/*
*
*/
Expand All @@ -755,7 +783,7 @@ profile_sharer_find(profile_chain_t *prch)
}
if (!prsh) {
prsh = calloc(1, sizeof(*prsh));
streaming_target_init(&prsh->prsh_input, profile_sharer_input, prsh, 0);
streaming_target_init(&prsh->prsh_input, &profile_sharer_input_ops, prsh, 0);
LIST_INIT(&prsh->prsh_chains);
}
return prsh;
Expand Down Expand Up @@ -1008,7 +1036,7 @@ profile_htsp_work(profile_chain_t *prch,

prch->prch_share = prsh->prsh_tsfix;
prch->prch_flags = SUBSCRIPTION_PACKET;
streaming_target_init(&prch->prch_input, profile_input, prch, 0);
streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0);
prch->prch_st = &prch->prch_input;
return 0;

Expand Down Expand Up @@ -1923,7 +1951,7 @@ profile_transcode_work(profile_chain_t *prch,
prsh->prsh_tsfix = tsfix_create(dst);
}
prch->prch_share = prsh->prsh_tsfix;
streaming_target_init(&prch->prch_input, profile_input, prch, 0);
streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0);
prch->prch_st = &prch->prch_input;
return 0;
fail:
Expand Down
26 changes: 22 additions & 4 deletions src/streaming.c
Expand Up @@ -38,10 +38,10 @@ streaming_pad_init(streaming_pad_t *sp)
*
*/
void
streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque,
int reject_filter)
streaming_target_init(streaming_target_t *st, streaming_ops_t *ops,
void *opaque, int reject_filter)
{
st->st_cb = cb;
st->st_ops = *ops;
st->st_opaque = opaque;
st->st_reject_filter = reject_filter;
}
Expand Down Expand Up @@ -86,6 +86,19 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
pthread_mutex_unlock(&sq->sq_mutex);
}

/**
*
*/
static htsmsg_t *
streaming_queue_info(void *opaque, htsmsg_t *list)
{
streaming_queue_t *sq = opaque;
char buf[256];
snprintf(buf, sizeof(buf), "streaming queue %p size %zd", sq, sq->sq_size);
htsmsg_add_str(list, NULL, buf);
return list;
}

/**
*
*/
Expand All @@ -102,7 +115,12 @@ streaming_queue_remove(streaming_queue_t *sq, streaming_message_t *sm)
void
streaming_queue_init(streaming_queue_t *sq, int reject_filter, size_t maxsize)
{
streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq, reject_filter);
static streaming_ops_t ops = {
.st_cb = streaming_queue_deliver,
.st_info = streaming_queue_info
};

streaming_target_init(&sq->sq_st, &ops, sq, reject_filter);

pthread_mutex_init(&sq->sq_mutex, NULL);
tvh_cond_init(&sq->sq_cond);
Expand Down
4 changes: 2 additions & 2 deletions src/streaming.h
Expand Up @@ -72,7 +72,7 @@ typedef struct streaming_start {
void streaming_pad_init(streaming_pad_t *sp);

void streaming_target_init(streaming_target_t *st,
st_callback_t *cb, void *opaque,
streaming_ops_t *ops, void *opaque,
int reject_filter);

void streaming_queue_init
Expand Down Expand Up @@ -106,7 +106,7 @@ streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt);

static inline void
streaming_target_deliver(streaming_target_t *st, streaming_message_t *sm)
{ st->st_cb(st->st_opaque, sm); }
{ st->st_ops.st_cb(st->st_opaque, sm); }

void streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm);

Expand Down

0 comments on commit 54715ff

Please sign in to comment.