diff --git a/src/rtpp_proc_async.c b/src/rtpp_proc_async.c index 547428c08..79b2fd85a 100644 --- a/src/rtpp_proc_async.c +++ b/src/rtpp_proc_async.c @@ -274,16 +274,19 @@ relay_packet(const struct pkt_proc_ctx *pktxp) * Check that we have some address to which packet is to be * sent out, drop otherwise. */ - if (!CALL_SMETHOD(stp_out, issendable) || CALL_SMETHOD(stp_out, isplayer_active)) { + if (!CALL_SMETHOD(stp_out, issendable)) { return PPROC_ACT_DROP; } CALL_SMETHOD(stp_out, send_pkt, packet->sender, packet); - CALL_SMETHOD(stp_in->pcount, reg_reld); - if (pktxp->rsp != NULL) { - pktxp->rsp->npkts_relayed.cnt++; - } else { - struct rtpp_proc_async_cf *proc_cf = pktxp->pproc->arg; - CALL_SMETHOD(proc_cf->cf_save->rtpp_stats, updatebyidx, proc_cf->npkts_relayed_idx, 1); + if ((pktxp->flags & PPROC_FLAG_LGEN) == 0) { + CALL_SMETHOD(stp_in->pcount, reg_reld); + if (pktxp->rsp != NULL) { + pktxp->rsp->npkts_relayed.cnt++; + } else { + struct rtpp_proc_async_cf *proc_cf = pktxp->pproc->arg; + CALL_SMETHOD(proc_cf->cf_save->rtpp_stats, updatebyidx, + proc_cf->npkts_relayed_idx, 1); + } } return PPROC_ACT_TAKE; } diff --git a/src/rtpp_proc_servers.c b/src/rtpp_proc_servers.c index 277bec9ee..54f6b3fa7 100644 --- a/src/rtpp_proc_servers.c +++ b/src/rtpp_proc_servers.c @@ -51,6 +51,8 @@ #include "rtpp_mallocs.h" #include "rtpp_stats.h" #include "rtpp_debug.h" +#include "advanced/packet_processor.h" +#include "advanced/pproc_manager.h" struct rtpp_proc_servers_priv { struct rtpp_proc_servers pub; @@ -64,17 +66,18 @@ struct foreach_args { double dtime; struct sthread_args *sender; struct rtpp_proc_stat *npkts_played; - struct rtpp_weakref_obj *rtp_streams_wrt; + const struct rtpp_cfg *cfsp; }; static int process_rtp_servers_foreach(void *dp, void *ap) { - struct foreach_args *fap; + const struct foreach_args *fap; struct rtpp_server *rsrv; struct rtp_packet *pkt; int len; - struct rtpp_stream *rsop; + struct rtpp_stream *strmp_out; + struct rtpp_stream *strmp_in; fap = (struct foreach_args *)ap; /* @@ -82,31 +85,40 @@ process_rtp_servers_foreach(void *dp, void *ap) * locked context of the rtpp_hash_table, which holds its own ref. */ rsrv = (struct rtpp_server *)dp; - rsop = CALL_METHOD(fap->rtp_streams_wrt, get_by_idx, rsrv->stuid); - if (rsop == NULL) { - return (RTPP_WR_MATCH_CONT); - } + strmp_out = CALL_METHOD(fap->cfsp->rtp_streams_wrt, get_by_idx, rsrv->stuid); + if (strmp_out == NULL) + goto e0; + strmp_in = CALL_SMETHOD(strmp_out, get_sender, fap->cfsp); + if (strmp_in == NULL) + goto e1; for (;;) { pkt = CALL_SMETHOD(rsrv, get, fap->dtime, &len); if (pkt == NULL) { if (len == RTPS_EOF) { - CALL_SMETHOD(rsop, finish_playback, rsrv->sruid); - RTPP_OBJ_DECREF(rsop); + CALL_SMETHOD(strmp_out, finish_playback, rsrv->sruid); + RTPP_OBJ_DECREF(strmp_in); + RTPP_OBJ_DECREF(strmp_out); return (RTPP_WR_MATCH_DEL); } else if (len != RTPS_LATER) { /* XXX some error, brag to logs */ } break; } - if (CALL_SMETHOD(rsop, issendable) == 0) { - /* We have a packet, but nowhere to send it, drop */ - RTPP_OBJ_DECREF(pkt); - continue; - } - CALL_SMETHOD(rsop, send_pkt, fap->sender, pkt); - fap->npkts_played->cnt++; + pkt->sender = fap->sender; + struct pkt_proc_ctx pktx = { + .strmp_in = strmp_in, + .strmp_out = strmp_out, + .pktp = pkt, + .flags = PPROC_FLAG_LGEN, + }; + if (CALL_SMETHOD(strmp_in->pproc_manager, handleat, &pktx, + PPROC_ORD_PLAY + 1) & PPROC_ACT_TAKE) + fap->npkts_played->cnt++; } - RTPP_OBJ_DECREF(rsop); + RTPP_OBJ_DECREF(strmp_in); +e1: + RTPP_OBJ_DECREF(strmp_out); +e0: return (RTPP_WR_MATCH_CONT); } @@ -115,15 +127,15 @@ static enum rtpp_timed_cb_rvals run_servers(double dtime, void *arg) { struct rtpp_proc_servers_priv *tp = arg; - struct foreach_args fargs; - - fargs.dtime = dtime; - fargs.sender = rtpp_anetio_pick_sender(tp->netio); - fargs.npkts_played = &tp->npkts_played; - fargs.rtp_streams_wrt = tp->cfsp->rtp_streams_wrt; + const struct foreach_args fargs = { + .dtime = dtime, + .sender = rtpp_anetio_pick_sender(tp->netio), + .npkts_played = &tp->npkts_played, + .cfsp = tp->cfsp, + }; CALL_METHOD(tp->cfsp->servers_wrt, foreach, process_rtp_servers_foreach, - &fargs); + (void *)&fargs); rtpp_anetio_pump_q(fargs.sender); FLUSH_STAT(tp->cfsp->rtpp_stats, tp->npkts_played); diff --git a/src/rtpp_stream.c b/src/rtpp_stream.c index f1f5f4349..ecbaffaf1 100644 --- a/src/rtpp_stream.c +++ b/src/rtpp_stream.c @@ -432,6 +432,13 @@ player_predestroy_cb(struct rtpp_stats *rtpp_stats) CALL_SMETHOD(rtpp_stats, updatebyname, "nplrs_destroyed", 1); } +static enum pproc_action +drop_packets(const struct pkt_proc_ctx *pktxp) +{ + + return (PPROC_ACT_DROP); +} + static int rtpp_stream_handle_play(struct rtpp_stream *self, const char *codecs, const char *pname, int playcount, struct rtpp_command *cmd, int ptime) @@ -447,13 +454,21 @@ rtpp_stream_handle_play(struct rtpp_stream *self, const char *codecs, .ptime = ptime}; PUB2PVT(self, pvt); + + const struct packet_processor_if drop_on_pa_poi = { + .descr = "drop_packets(player_active)", + .arg = (void *)pvt, + .key = (void *)(pvt + 2), + .enqueue = &drop_packets + }; + pthread_mutex_lock(&pvt->lock); plerror = "reason unknown"; while (*codecs != '\0') { n = strtol(codecs, &cp, 10); if (cp == codecs) { plerror = "invalid codecs"; - break; + goto e0; } codecs = cp; if (*codecs != '\0') @@ -466,16 +481,21 @@ rtpp_stream_handle_play(struct rtpp_stream *self, const char *codecs, plerror = "rtpp_server_ctor() failed"; if (sca.result == RTPP_SERV_NOENT) continue; - break; + goto e0; } rsrv->stuid = self->stuid; ssrc = CALL_SMETHOD(rsrv, get_ssrc); seq = CALL_SMETHOD(rsrv, get_seq); _s_rtps(pvt, rsrv->sruid, 0); + int regres = CALL_SMETHOD(self->pproc_manager->reverse, reg, PPROC_ORD_PLAY, + &drop_on_pa_poi); + if (regres < 0) { + plerror = "pproc_manager->reg() method failed"; + goto e1; + } if (CALL_METHOD(pvt->servers_wrt, reg, rsrv->rcnt, rsrv->sruid) != 0) { - RTPP_OBJ_DECREF(rsrv); plerror = "servers_wrt->reg() method failed"; - break; + goto e2; } if (pvt->rtps.inact == 0) { CALL_SMETHOD(rsrv, start, cmd->dtime->mono); @@ -490,6 +510,12 @@ rtpp_stream_handle_play(struct rtpp_stream *self, const char *codecs, playcount, pname, n, ssrc, seq); return 0; } + goto e0; +e2: + CALL_SMETHOD(self->pproc_manager->reverse, unreg, drop_on_pa_poi.key); +e1: + RTPP_OBJ_DECREF(rsrv); +e0: pthread_mutex_unlock(&pvt->lock); RTPP_LOG(pvt->pub.log, RTPP_LOG_ERR, "can't create player: %s", plerror); return -1; @@ -508,6 +534,7 @@ rtpp_stream_handle_noplay(struct rtpp_stream *self) ruid = pvt->rtps.uid; pthread_mutex_unlock(&pvt->lock); if (ruid != RTPP_UID_NONE) { + CALL_SMETHOD(self->pproc_manager->reverse, unreg, pvt + 2); if (CALL_METHOD(pvt->servers_wrt, unreg, ruid) != NULL) { pthread_mutex_lock(&pvt->lock); if (pvt->rtps.uid == ruid) { @@ -545,6 +572,7 @@ rtpp_stream_finish_playback(struct rtpp_stream *self, uint64_t sruid) PUB2PVT(self, pvt); pthread_mutex_lock(&pvt->lock); if (pvt->rtps.uid != RTPP_UID_NONE && pvt->rtps.uid == sruid) { + CALL_SMETHOD(self->pproc_manager->reverse, unreg, pvt + 2); _s_rtps(pvt, RTPP_UID_NONE, 1); RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO, "player at port %d has finished", self->port);