Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
factor out subscription byte count updates to separate functions. Thi…
…s has the added benefit of ensuring that all updates are done atomically (previously only a minority of them were).
  • Loading branch information
Sam Stenvall authored and perexg committed Sep 2, 2015
1 parent 2218885 commit e1695bf
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/dvr/dvr_rec.c
Expand Up @@ -915,7 +915,7 @@ dvr_thread(void *aux)
}
}
if (pb)
atomic_add(&ts->ths_bytes_out, pktbuf_len(pb));
subscription_add_bytes_out(ts, pktbuf_len(pb));
}

streaming_queue_remove(sq, sm);
Expand Down
4 changes: 2 additions & 2 deletions src/epggrab/module/eit.c
Expand Up @@ -597,8 +597,8 @@ _eit_callback
/* Statistics */
ths = mpegts_mux_find_subscription_by_name(mm, "epggrab");
if (ths) {
ths->ths_bytes_in += len;
ths->ths_bytes_out += len;
subscription_add_bytes_in(ths, len);
subscription_add_bytes_out(ths, len);
}

/* Validate */
Expand Down
8 changes: 4 additions & 4 deletions src/epggrab/module/opentv.c
Expand Up @@ -563,8 +563,8 @@ opentv_table_callback
/* Statistics */
ths = mpegts_mux_find_subscription_by_name(mt->mt_mux, "epggrab");
if (ths) {
ths->ths_bytes_in += len;
ths->ths_bytes_out += len;
subscription_add_bytes_in(ths, len);
subscription_add_bytes_out(ths, len);
}


Expand Down Expand Up @@ -642,8 +642,8 @@ opentv_bat_callback
/* Statistics */
ths = mpegts_mux_find_subscription_by_name(mt->mt_mux, "epggrab");
if (ths) {
ths->ths_bytes_in += len;
ths->ths_bytes_out += len;
subscription_add_bytes_in(ths, len);
subscription_add_bytes_out(ths, len);
}

r = dvb_bat_callback(mt, buf, len, tableid);
Expand Down
2 changes: 1 addition & 1 deletion src/htsp_server.c
Expand Up @@ -3415,7 +3415,7 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
payloadlen = pktbuf_len(pkt->pkt_payload);
htsmsg_add_binptr(m, "payload", pktbuf_ptr(pkt->pkt_payload), payloadlen);
htsp_send_subscription(htsp, m, pkt->pkt_payload, hs, payloadlen);
atomic_add(&hs->hs_s->ths_bytes_out, payloadlen);
subscription_add_bytes_out(hs->hs_s, payloadlen);

if(hs->hs_last_report != dispatch_clock) {

Expand Down
2 changes: 1 addition & 1 deletion src/satip/rtp.c
Expand Up @@ -189,7 +189,7 @@ satip_rtp_thread(void *aux)
switch (sm->sm_type) {
case SMT_MPEGTS:
pb = sm->sm_data;
atomic_add(&subs->ths_bytes_out, pktbuf_len(pb));
subscription_add_bytes_out(subs, pktbuf_len(pb));
pthread_mutex_lock(&rtp->lock);
r = satip_rtp_loop(rtp, pktbuf_ptr(pb), pktbuf_len(pb));
pthread_mutex_unlock(&rtp->lock);
Expand Down
20 changes: 18 additions & 2 deletions src/subscriptions.c
Expand Up @@ -457,11 +457,11 @@ subscription_input_direct(void *opauqe, streaming_message_t *sm)
th_pkt_t *pkt = sm->sm_data;
s->ths_total_err += pkt->pkt_err;
if (pkt->pkt_payload)
s->ths_bytes_in += pkt->pkt_payload->pb_size;
subscription_add_bytes_in(s, pkt->pkt_payload->pb_size);
} else if(sm->sm_type == SMT_MPEGTS) {
pktbuf_t *pb = sm->sm_data;
s->ths_total_err += pb->pb_err;
s->ths_bytes_in += pb->pb_size;
subscription_add_bytes_in(s, pb->pb_size);
}

/* Pass to output */
Expand Down Expand Up @@ -951,6 +951,22 @@ subscription_done(void)
* Subscription control
* *************************************************************************/

/**
* Update incoming byte count
*/
void subscription_add_bytes_in(th_subscription_t *s, size_t in)
{
atomic_add(&s->ths_bytes_in, in);
}

/**
* Update outgoing byte count
*/
void subscription_add_bytes_out(th_subscription_t *s, size_t out)
{
atomic_add(&s->ths_bytes_out, out);
}

/**
* Change weight
*/
Expand Down
3 changes: 3 additions & 0 deletions src/subscriptions.h
Expand Up @@ -201,6 +201,9 @@ void subscription_unlink_service(th_subscription_t *s, int reason);

void subscription_dummy_join(const char *id, int first);

void subscription_add_bytes_in(th_subscription_t *s, size_t in);

void subscription_add_bytes_out(th_subscription_t *s, size_t out);

static inline int subscriptions_active(void)
{ return LIST_FIRST(&subscriptions) != NULL; }
Expand Down
6 changes: 3 additions & 3 deletions src/webui/webui.c
Expand Up @@ -349,7 +349,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch,
pb = ((th_pkt_t*)sm->sm_data)->pkt_payload;
else
pb = sm->sm_data;
atomic_add(&s->ths_bytes_out, pktbuf_len(pb));
subscription_add_bytes_out(s, pktbuf_len(pb));
muxer_write_pkt(mux, sm->sm_type, sm->sm_data);
sm->sm_data = NULL;
}
Expand Down Expand Up @@ -1500,8 +1500,8 @@ page_dvrfile(http_connection_t *hc, const char *remain, void *opaque)
}
content_len -= r;
if (sub) {
sub->ths_bytes_in += r;
sub->ths_bytes_out += r;
subscription_add_bytes_in(sub, r);
subscription_add_bytes_out(sub, r);
}
}
}
Expand Down

0 comments on commit e1695bf

Please sign in to comment.