Permalink
Browse files

Streaming queue size protection

  • Loading branch information...
1 parent 3236c4a commit 0254545fbe84d4f88a05306ecbeb7235f22b96f1 @mikrohard committed Oct 23, 2012
Showing with 40 additions and 1 deletion.
  1. +38 −1 src/streaming.c
  2. +2 −0 src/streaming.h
View
@@ -52,7 +52,14 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
streaming_queue_t *sq = opauqe;
pthread_mutex_lock(&sq->sq_mutex);
- TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
+
+ /* queue size protection */
+ int queue_size = streaming_queue_size(&sq->sq_queue);
+ if (queue_size > 1500000)
+ streaming_msg_free(sm);
+ else
+ TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
+
pthread_cond_signal(&sq->sq_cond);
pthread_mutex_unlock(&sq->sq_mutex);
}
@@ -331,6 +338,36 @@ streaming_queue_clear(struct streaming_message_queue *q)
}
+/**
+ *
+ */
+int streaming_queue_size(struct streaming_message_queue *q)
+{
+ streaming_message_t *sm;
+ int size = 0;
+
+ TAILQ_FOREACH(sm, q, sm_link) {
+ if (sm->sm_type == SMT_PACKET)
+ {
+ th_pkt_t *pkt = sm->sm_data;
+ if (pkt && pkt->pkt_payload)
+ {
+ size += pkt->pkt_payload->pb_size;
+ }
+ }
+ else if (sm->sm_type == SMT_MPEGTS)
+ {
+ pktbuf_t *pkt_payload = sm->sm_data;
+ if (pkt_payload)
+ {
+ size += pkt_payload->pb_size;
+ }
+ }
+ }
+ return size;
+}
+
+
/**
*
*/
View
@@ -73,6 +73,8 @@ void streaming_queue_init(streaming_queue_t *sq, int reject_filter);
void streaming_queue_clear(struct streaming_message_queue *q);
+int streaming_queue_size(struct streaming_message_queue *q);
+
void streaming_queue_deinit(streaming_queue_t *sq);
void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st);

0 comments on commit 0254545

Please sign in to comment.