Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
streaming: optimize queue size handling as suggested in comment
  • Loading branch information
perexg committed Jul 7, 2015
1 parent ecbb176 commit 36573a4
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 59 deletions.
3 changes: 1 addition & 2 deletions src/dvr/dvr_rec.c
Expand Up @@ -913,8 +913,7 @@ dvr_thread(void *aux)
atomic_add(&ts->ths_bytes_out, pktbuf_len(pb));
}

TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);

streaming_queue_remove(sq, sm);
pthread_mutex_unlock(&sq->sq_mutex);

switch(sm->sm_type) {
Expand Down
2 changes: 1 addition & 1 deletion src/satip/rtp.c
Expand Up @@ -183,7 +183,7 @@ satip_rtp_thread(void *aux)
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
continue;
}
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
streaming_queue_remove(sq, sm);
pthread_mutex_unlock(&sq->sq_mutex);

switch (sm->sm_type) {
Expand Down
2 changes: 1 addition & 1 deletion src/service_mapper.c
Expand Up @@ -355,7 +355,7 @@ service_mapper_thread ( void *aux )
if (!tvheadend_running)
break;

TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
streaming_queue_remove(sq, sm);
pthread_mutex_unlock(&sq->sq_mutex);

if(sm->sm_type == SMT_PACKET) {
Expand Down
95 changes: 45 additions & 50 deletions src/streaming.c
Expand Up @@ -46,6 +46,23 @@ streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque,
st->st_reject_filter = reject_filter;
}

/**
*
*/
static size_t
streaming_message_data_size(streaming_message_t *sm)
{
if (sm->sm_type == SMT_PACKET) {
th_pkt_t *pkt = sm->sm_data;
if (pkt && pkt->pkt_payload)
return pkt->pkt_payload->pb_size;
} else if (sm->sm_type == SMT_MPEGTS) {
pktbuf_t *pkt_payload = sm->sm_data;
if (pkt_payload)
return pkt_payload->pb_size;
}
return 0;
}

/**
*
Expand All @@ -58,18 +75,26 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
pthread_mutex_lock(&sq->sq_mutex);

/* queue size protection */
// TODO: would be better to update size as we go, but this would
// require updates elsewhere to ensure all removals from the queue
// are covered (new function)
if (sq->sq_maxsize && streaming_queue_size(&sq->sq_queue) >= sq->sq_maxsize)
if (sq->sq_maxsize && sq->sq_maxsize < sq->sq_size) {
streaming_msg_free(sm);
else
} else {
TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
sq->sq_size += streaming_message_data_size(sm);
}

pthread_cond_signal(&sq->sq_cond);
pthread_mutex_unlock(&sq->sq_mutex);
}

/**
*
*/
void
streaming_queue_remove(streaming_queue_t *sq, streaming_message_t *sm)
{
sq->sq_size -= streaming_message_data_size(sm);
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
}

/**
*
Expand All @@ -84,6 +109,7 @@ streaming_queue_init(streaming_queue_t *sq, int reject_filter, size_t maxsize)
TAILQ_INIT(&sq->sq_queue);

sq->sq_maxsize = maxsize;
sq->sq_size = 0;
}

/**
Expand All @@ -92,11 +118,25 @@ streaming_queue_init(streaming_queue_t *sq, int reject_filter, size_t maxsize)
void
streaming_queue_deinit(streaming_queue_t *sq)
{
sq->sq_size = 0;
streaming_queue_clear(&sq->sq_queue);
pthread_mutex_destroy(&sq->sq_mutex);
pthread_cond_destroy(&sq->sq_cond);
}

/**
*
*/
void
streaming_queue_clear(struct streaming_message_queue *q)
{
streaming_message_t *sm;

while((sm = TAILQ_FIRST(q)) != NULL) {
TAILQ_REMOVE(q, sm, sm_link);
streaming_msg_free(sm);
}
}

/**
*
Expand Down Expand Up @@ -348,51 +388,6 @@ streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm)
streaming_msg_free(sm);
}

/**
*
*/
void
streaming_queue_clear(struct streaming_message_queue *q)
{
streaming_message_t *sm;

while((sm = TAILQ_FIRST(q)) != NULL) {
TAILQ_REMOVE(q, sm, sm_link);
streaming_msg_free(sm);
}
}


/**
*
*/
size_t streaming_queue_size(struct streaming_message_queue *q)
{
streaming_message_t *sm;
int size = 0;

TAILQ_FOREACH(sm, q, sm_link) {
if (sm->sm_type == SMT_PACKET)
{
th_pkt_t *pkt = sm->sm_data;
if (pkt && pkt->pkt_payload)
{
size += pkt->pkt_payload->pb_size;
}
}
else if (sm->sm_type == SMT_MPEGTS)
{
pktbuf_t *pkt_payload = sm->sm_data;
if (pkt_payload)
{
size += pkt_payload->pb_size;
}
}
}
return size;
}


/**
*
*/
Expand Down
4 changes: 2 additions & 2 deletions src/streaming.h
Expand Up @@ -78,10 +78,10 @@ void streaming_queue_init

void streaming_queue_clear(struct streaming_message_queue *q);

size_t streaming_queue_size(struct streaming_message_queue *q);

void streaming_queue_deinit(streaming_queue_t *sq);

void streaming_queue_remove(streaming_queue_t *sq, streaming_message_t *sm);

void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st);

void streaming_target_disconnect(streaming_pad_t *sp, streaming_target_t *st);
Expand Down
4 changes: 2 additions & 2 deletions src/timeshift/timeshift_writer.c
Expand Up @@ -352,7 +352,7 @@ void *timeshift_writer ( void *aux )
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
continue;
}
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
streaming_queue_remove(sq, sm);
pthread_mutex_unlock(&sq->sq_mutex);

_process_msg(ts, sm, &run);
Expand All @@ -376,7 +376,7 @@ void timeshift_writer_flush ( timeshift_t *ts )

pthread_mutex_lock(&sq->sq_mutex);
while ((sm = TAILQ_FIRST(&sq->sq_queue))) {
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
streaming_queue_remove(sq, sm);
_process_msg(ts, sm, NULL);
}
pthread_mutex_unlock(&sq->sq_mutex);
Expand Down
1 change: 1 addition & 0 deletions src/tvheadend.h
Expand Up @@ -522,6 +522,7 @@ typedef struct streaming_queue {
pthread_cond_t sq_cond; /* Condvar for signalling new packets */

size_t sq_maxsize; /* Max queue size (bytes) */
size_t sq_size; /* Actual queue size (bytes) - only data */

struct streaming_message_queue sq_queue;

Expand Down
2 changes: 1 addition & 1 deletion src/webui/webui.c
Expand Up @@ -305,7 +305,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch,
continue;
}

TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
streaming_queue_remove(sq, sm);
pthread_mutex_unlock(&sq->sq_mutex);

switch(sm->sm_type) {
Expand Down

1 comment on commit 36573a4

@jdierkse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi,
I'm seeing some sporadic crashes, and I think they might be related to this changeset.
GDB gives me the following callstack (using version cb5f6a1):

#0  0x00007f11430b3227 in raise () from /lib64/libc.so.6
#1  0x00007f11430b4778 in abort () from /lib64/libc.so.6
#2  0x00007f11430f2e7b in ?? () from /lib64/libc.so.6
#3  0x00007f11430f89d6 in ?? () from /lib64/libc.so.6
#4  0x00007f11430f9753 in ?? () from /lib64/libc.so.6
#5  0x000000000042a611 in pktbuf_ref_dec (pb=0x7f10e80225d0) at src/packet.c:168
#6  pktbuf_ref_dec (pb=0x7f10e80225d0) at src/packet.c:164
#7  0x000000000042a639 in pkt_destroy (pkt=0x7f10e8002220) at src/packet.c:35
#8  pkt_ref_dec (pkt=0x7f10e8002220) at src/packet.c:67
#9  0x000000000048403c in mk_mux_write_pkt (mkm=0x1d2e6c0, pkt=0x7f10d414dae0) at src/muxer/tvh/mkmux.c:1240
#10 0x0000000000481b74 in tvh_muxer_write_pkt (m=0x261df00, smt=<optimized out>, data=<optimized out>) at src/muxer/muxer_tvh.c:162
#11 0x0000000000472b47 in muxer_write_pkt (data=<optimized out>, smt=<optimized out>, m=<optimized out>)
    at /root/spksrc.new/spk/tvheadend-testing/work-cedarview-5.1/tvheadend-testing-gitcb5f6a1a4b/src/muxer.h:137
#12 dvr_thread (aux=<optimized out>) at src/dvr/dvr_rec.c:951
#13 0x00000000004158be in thread_wrapper (p=0x3347d30) at src/wrappers.c:149
#14 0x00007f1143937def in ?? () from /lib64/libpthread.so.0
#15 0x00007f114316678d in clone () from /lib64/libc.so.6

I'm not sure what the cause of this crash is, does this callstack mean anything to you @perexg?

Please sign in to comment.