Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
remove pkt_merge_header()
- rename pkt_header to pkt_meta
- unify logic for the metadata processing
- add meta field to components to the htsp stream start message
  • Loading branch information
perexg committed Oct 19, 2014
1 parent 4a6d5ca commit d504bcf
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 109 deletions.
55 changes: 46 additions & 9 deletions src/htsp_server.c
Expand Up @@ -68,7 +68,7 @@

static void *htsp_server, *htsp_server_2;

#define HTSP_PROTO_VERSION 16
#define HTSP_PROTO_VERSION 17

#define HTSP_ASYNC_OFF 0x00
#define HTSP_ASYNC_ON 0x01
Expand Down Expand Up @@ -211,6 +211,8 @@ typedef struct htsp_subscription {

int hs_first;

int hs_merge_meta_compomentidx;

} htsp_subscription_t;


Expand Down Expand Up @@ -2912,6 +2914,21 @@ const static char frametypearray[PKT_NTYPES] = {
[PKT_B_FRAME] = 'B',
};

static th_pkt_t *merge_pkt(th_pkt_t *pkt, size_t payloadlen)
{
th_pkt_t *n = pkt_alloc(NULL, 0, 0, 0);
*n = *pkt;
n->pkt_refcount = 1;
n->pkt_meta = NULL;
n->pkt_payload = pktbuf_alloc(NULL, payloadlen);
memcpy(pktbuf_ptr(n->pkt_payload),
pktbuf_ptr(pkt->pkt_meta), pktbuf_len(pkt->pkt_meta));
memcpy(pktbuf_ptr(n->pkt_payload) + pktbuf_len(pkt->pkt_meta),
pktbuf_ptr(pkt->pkt_payload), pktbuf_len(pkt->pkt_payload));
pkt_ref_dec(pkt);
return n;
}

/**
* Build a htsmsg from a th_pkt and enqueue it on our HTSP service
*/
Expand All @@ -2923,6 +2940,7 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
htsp_connection_t *htsp = hs->hs_htsp;
int64_t ts;
int qlen = hs->hs_q.hmq_payload;
size_t payloadlen;

if(!htsp_is_stream_enabled(hs, pkt->pkt_componentindex)) {
pkt_ref_dec(pkt);
Expand Down Expand Up @@ -2962,16 +2980,22 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
uint32_t dur = hs->hs_90khz ? pkt->pkt_duration : ts_rescale(pkt->pkt_duration, 1000000);
htsmsg_add_u32(m, "duration", dur);

pkt = pkt_merge_header(pkt);

if (pkt->pkt_meta &&
hs->hs_merge_meta_compomentidx == pkt->pkt_componentindex) {
payloadlen = pktbuf_len(pkt->pkt_meta) + pktbuf_len(pkt->pkt_payload);
pkt = merge_pkt(pkt, payloadlen);
/* do it only once */
hs->hs_merge_meta_compomentidx = -1;
} else {
payloadlen = pktbuf_len(pkt->pkt_payload);
}
/**
* Since we will serialize directly we use 'binptr' which is a binary
* object that just points to data, thus avoiding a copy.
*/
htsmsg_add_binptr(m, "payload", pktbuf_ptr(pkt->pkt_payload),
pktbuf_len(pkt->pkt_payload));
htsp_send(htsp, m, pkt->pkt_payload, &hs->hs_q, pktbuf_len(pkt->pkt_payload));
atomic_add(&hs->hs_s->ths_bytes_out, pktbuf_len(pkt->pkt_payload));
htsmsg_add_binptr(m, "payload", pktbuf_ptr(pkt->pkt_payload), payloadlen);
htsp_send(htsp, m, pkt->pkt_payload, &hs->hs_q, payloadlen);
atomic_add(&hs->hs_s->ths_bytes_out, payloadlen);

if(hs->hs_last_report != dispatch_clock) {

Expand Down Expand Up @@ -3041,9 +3065,11 @@ htsp_subscription_start(htsp_subscription_t *hs, const streaming_start_t *ss)

tvhdebug("htsp", "%s - subscription start", hs->hs_htsp->htsp_logname);

hs->hs_merge_meta_compomentidx = -1;

for(i = 0; i < ss->ss_num_components; i++) {
const streaming_start_component_t *ssc = &ss->ss_components[i];
if (ssc->ssc_type == SCT_MPEG2VIDEO || ssc->ssc_type == SCT_H264) {
if (SCT_ISVIDEO(ssc->ssc_type)) {
if (ssc->ssc_width == 0 || ssc->ssc_height == 0) {
hs->hs_wait_for_video = 1;
return;
Expand Down Expand Up @@ -3073,7 +3099,7 @@ htsp_subscription_start(htsp_subscription_t *hs, const streaming_start_t *ss)
htsmsg_add_u32(c, "ancillary_id", ssc->ssc_ancillary_id);
}

if(ssc->ssc_type == SCT_MPEG2VIDEO || ssc->ssc_type == SCT_H264) {
if(SCT_ISVIDEO(ssc->ssc_type)) {
if(ssc->ssc_width)
htsmsg_add_u32(c, "width", ssc->ssc_width);
if(ssc->ssc_height)
Expand All @@ -3096,7 +3122,18 @@ htsp_subscription_start(htsp_subscription_t *hs, const streaming_start_t *ss)
htsmsg_add_u32(c, "rate", ssc->ssc_sri);
}

if (ssc->ssc_gh)
htsmsg_add_binptr(m, "meta", pktbuf_ptr(ssc->ssc_gh),
pktbuf_len(ssc->ssc_gh));

htsmsg_add_msg(streams, NULL, c);

if (ssc->ssc_type == SCT_H264 && hs->hs_htsp->htsp_version < 17) {
if (hs->hs_merge_meta_compomentidx < 0)
hs->hs_merge_meta_compomentidx = ssc->ssc_index;
else
tvherror("htsp", "multiple H264 video streams?");
}
}

htsmsg_add_msg(m, "streams", streams);
Expand Down
3 changes: 0 additions & 3 deletions src/muxer/muxer_libav.c
Expand Up @@ -387,9 +387,6 @@ lav_muxer_write_pkt(muxer_t *m, streaming_message_type_t smt, void *data)

av_init_packet(&packet);

if(st->codec->codec_id == AV_CODEC_ID_MPEG2VIDEO)
pkt = pkt_merge_header(pkt);

if(lm->lm_h264_filter && st->codec->codec_id == AV_CODEC_ID_H264) {
if(av_bitstream_filter_filter(lm->lm_h264_filter,
st->codec,
Expand Down
5 changes: 0 additions & 5 deletions src/muxer/tvh/mkmux.c
Expand Up @@ -47,7 +47,6 @@ TAILQ_HEAD(mk_chapter_queue, mk_chapter);
typedef struct mk_track {
int index;
int enabled;
int merge;
int type;
int tracknum;
int disabled;
Expand Down Expand Up @@ -266,7 +265,6 @@ mk_build_tracks(mk_mux_t *mkm, const streaming_start_t *ss)
case SCT_MPEG2VIDEO:
tracktype = 1;
codec_id = "V_MPEG2";
mkm->tracks[i].merge = 1;
break;

case SCT_H264:
Expand Down Expand Up @@ -1183,9 +1181,6 @@ mk_mux_write_pkt(mk_mux_t *mkm, th_pkt_t *pkt)
if(mark)
mk_mux_insert_chapter(mkm);

if(t->merge)
pkt = pkt_merge_header(pkt);

mk_write_frame_i(mkm, t, pkt);

pkt_ref_dec(pkt);
Expand Down
70 changes: 19 additions & 51 deletions src/packet.c
Expand Up @@ -32,11 +32,9 @@
static void
pkt_destroy(th_pkt_t *pkt)
{
if(pkt->pkt_payload != NULL)
pktbuf_ref_dec(pkt->pkt_payload);
pktbuf_ref_dec(pkt->pkt_payload);
pktbuf_ref_dec(pkt->pkt_meta);

if(pkt->pkt_header != NULL)
pktbuf_ref_dec(pkt->pkt_header);
free(pkt);
}

Expand Down Expand Up @@ -129,41 +127,6 @@ pktref_remove(struct th_pktref_queue *q, th_pktref_t *pr)
}


/**
*
*/
th_pkt_t *
pkt_merge_header(th_pkt_t *pkt)
{
th_pkt_t *n;
size_t s;

if(pkt->pkt_header == NULL)
return pkt;

n = malloc(sizeof(th_pkt_t));
*n = *pkt;

n->pkt_refcount = 1;
n->pkt_header = NULL;

s = pktbuf_len(pkt->pkt_payload) + pktbuf_len(pkt->pkt_header);
n->pkt_payload = pktbuf_alloc(NULL, s);

memcpy(pktbuf_ptr(n->pkt_payload),
pktbuf_ptr(pkt->pkt_header),
pktbuf_len(pkt->pkt_header));

memcpy(pktbuf_ptr(n->pkt_payload) + pktbuf_len(pkt->pkt_header),
pktbuf_ptr(pkt->pkt_payload),
pktbuf_len(pkt->pkt_payload));

pkt_ref_dec(pkt);
return n;
}



/**
*
*/
Expand All @@ -175,11 +138,8 @@ pkt_copy_shallow(th_pkt_t *pkt)

n->pkt_refcount = 1;

if(n->pkt_header)
pktbuf_ref_inc(n->pkt_header);

if(n->pkt_payload)
pktbuf_ref_inc(n->pkt_payload);
pktbuf_ref_inc(n->pkt_meta);
pktbuf_ref_inc(n->pkt_payload);

return n;
}
Expand All @@ -196,21 +156,29 @@ pktref_create(th_pkt_t *pkt)
return pr;
}

/*
*
*/


void
void
pktbuf_ref_dec(pktbuf_t *pb)
{
if((atomic_add(&pb->pb_refcount, -1)) == 1) {
free(pb->pb_data);
free(pb);
if (pb) {
if((atomic_add(&pb->pb_refcount, -1)) == 1) {
free(pb->pb_data);
free(pb);
}
}
}

void
pktbuf_t *
pktbuf_ref_inc(pktbuf_t *pb)
{
atomic_add(&pb->pb_refcount, 1);
if (pb) {
atomic_add(&pb->pb_refcount, 1);
return pb;
}
return NULL;
}

pktbuf_t *
Expand Down
21 changes: 12 additions & 9 deletions src/packet.h
@@ -1,5 +1,5 @@
/*
* Packet nanagement
* Packet management
* Copyright (C) 2008 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
Expand All @@ -19,15 +19,16 @@
#ifndef PACKET_H_
#define PACKET_H_

/**
* Packet buffer
*/

typedef struct pktbuf {
int pb_refcount;
uint8_t *pb_data;
size_t pb_size;
} pktbuf_t;



/**
* Packets
*/
Expand Down Expand Up @@ -62,8 +63,8 @@ typedef struct th_pkt {
uint16_t pkt_aspect_num;
uint16_t pkt_aspect_den;

pktbuf_t *pkt_meta;
pktbuf_t *pkt_payload;
pktbuf_t *pkt_header;

} th_pkt_t;

Expand Down Expand Up @@ -96,21 +97,23 @@ void pktref_remove(struct th_pktref_queue *q, th_pktref_t *pr);

th_pkt_t *pkt_alloc(const void *data, size_t datalen, int64_t pts, int64_t dts);

th_pkt_t *pkt_merge_header(th_pkt_t *pkt);

th_pkt_t *pkt_copy_shallow(th_pkt_t *pkt);

th_pktref_t *pktref_create(th_pkt_t *pkt);

/*
*
*/

void pktbuf_ref_dec(pktbuf_t *pb);

void pktbuf_ref_inc(pktbuf_t *pb);
pktbuf_t *pktbuf_ref_inc(pktbuf_t *pb);

pktbuf_t *pktbuf_alloc(const void *data, size_t size);

pktbuf_t *pktbuf_make(void *data, size_t size);

#define pktbuf_len(pb) ((pb)->pb_size)
#define pktbuf_ptr(pb) ((pb)->pb_data)
static inline size_t pktbuf_len(pktbuf_t *pb) { return pb->pb_size; }
static inline uint8_t *pktbuf_ptr(pktbuf_t *pb) { return pb->pb_data; }

#endif /* PACKET_H_ */
16 changes: 8 additions & 8 deletions src/parsers/parser_avc.c
Expand Up @@ -213,24 +213,24 @@ avc_convert_pkt(th_pkt_t *src)
th_pkt_t *pkt = malloc(sizeof(th_pkt_t));
*pkt = *src;
pkt->pkt_refcount = 1;
pkt->pkt_header = NULL;
pkt->pkt_meta = NULL;
pkt->pkt_payload = NULL;

if (src->pkt_header) {
if (src->pkt_meta) {
sbuf_t headers;
sbuf_init(&headers);

isom_write_avcc(&headers, pktbuf_ptr(src->pkt_header),
pktbuf_len(src->pkt_header));
pkt->pkt_header = pktbuf_make(headers.sb_data, headers.sb_ptr);
isom_write_avcc(&headers, pktbuf_ptr(src->pkt_meta),
pktbuf_len(src->pkt_meta));
pkt->pkt_meta = pktbuf_make(headers.sb_data, headers.sb_ptr);
}

sbuf_t payload;
sbuf_init(&payload);

if(src->pkt_header)
avc_parse_nal_units(&payload, pktbuf_ptr(src->pkt_header),
pktbuf_len(src->pkt_header));
if(src->pkt_meta)
avc_parse_nal_units(&payload, pktbuf_ptr(src->pkt_meta),
pktbuf_len(src->pkt_meta));

avc_parse_nal_units(&payload, pktbuf_ptr(src->pkt_payload),
pktbuf_len(src->pkt_payload));
Expand Down

0 comments on commit d504bcf

Please sign in to comment.