Skip to content

Commit

Permalink
Conver player feature to use pproc_manager.
Browse files Browse the repository at this point in the history
  • Loading branch information
sobomax committed Jan 13, 2023
1 parent b5de107 commit 1a46e7a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 35 deletions.
17 changes: 10 additions & 7 deletions src/rtpp_proc_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -274,16 +274,19 @@ relay_packet(const struct pkt_proc_ctx *pktxp)
* Check that we have some address to which packet is to be
* sent out, drop otherwise.
*/
if (!CALL_SMETHOD(stp_out, issendable) || CALL_SMETHOD(stp_out, isplayer_active)) {
if (!CALL_SMETHOD(stp_out, issendable)) {
return PPROC_ACT_DROP;
}
CALL_SMETHOD(stp_out, send_pkt, packet->sender, packet);
CALL_SMETHOD(stp_in->pcount, reg_reld);
if (pktxp->rsp != NULL) {
pktxp->rsp->npkts_relayed.cnt++;
} else {
struct rtpp_proc_async_cf *proc_cf = pktxp->pproc->arg;
CALL_SMETHOD(proc_cf->cf_save->rtpp_stats, updatebyidx, proc_cf->npkts_relayed_idx, 1);
if ((pktxp->flags & PPROC_FLAG_LGEN) == 0) {
CALL_SMETHOD(stp_in->pcount, reg_reld);
if (pktxp->rsp != NULL) {
pktxp->rsp->npkts_relayed.cnt++;
} else {
struct rtpp_proc_async_cf *proc_cf = pktxp->pproc->arg;
CALL_SMETHOD(proc_cf->cf_save->rtpp_stats, updatebyidx,
proc_cf->npkts_relayed_idx, 1);
}
}
return PPROC_ACT_TAKE;
}
Expand Down
60 changes: 36 additions & 24 deletions src/rtpp_proc_servers.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
#include "rtpp_mallocs.h"
#include "rtpp_stats.h"
#include "rtpp_debug.h"
#include "advanced/packet_processor.h"
#include "advanced/pproc_manager.h"

struct rtpp_proc_servers_priv {
struct rtpp_proc_servers pub;
Expand All @@ -64,49 +66,59 @@ struct foreach_args {
double dtime;
struct sthread_args *sender;
struct rtpp_proc_stat *npkts_played;
struct rtpp_weakref_obj *rtp_streams_wrt;
const struct rtpp_cfg *cfsp;
};

static int
process_rtp_servers_foreach(void *dp, void *ap)
{
struct foreach_args *fap;
const struct foreach_args *fap;
struct rtpp_server *rsrv;
struct rtp_packet *pkt;
int len;
struct rtpp_stream *rsop;
struct rtpp_stream *strmp_out;
struct rtpp_stream *strmp_in;

fap = (struct foreach_args *)ap;
/*
* This method does not need us to bump ref, since we are in the
* locked context of the rtpp_hash_table, which holds its own ref.
*/
rsrv = (struct rtpp_server *)dp;
rsop = CALL_METHOD(fap->rtp_streams_wrt, get_by_idx, rsrv->stuid);
if (rsop == NULL) {
return (RTPP_WR_MATCH_CONT);
}
strmp_out = CALL_METHOD(fap->cfsp->rtp_streams_wrt, get_by_idx, rsrv->stuid);
if (strmp_out == NULL)
goto e0;
strmp_in = CALL_SMETHOD(strmp_out, get_sender, fap->cfsp);
if (strmp_in == NULL)
goto e1;
for (;;) {
pkt = CALL_SMETHOD(rsrv, get, fap->dtime, &len);
if (pkt == NULL) {
if (len == RTPS_EOF) {
CALL_SMETHOD(rsop, finish_playback, rsrv->sruid);
RTPP_OBJ_DECREF(rsop);
CALL_SMETHOD(strmp_out, finish_playback, rsrv->sruid);
RTPP_OBJ_DECREF(strmp_in);
RTPP_OBJ_DECREF(strmp_out);
return (RTPP_WR_MATCH_DEL);
} else if (len != RTPS_LATER) {
/* XXX some error, brag to logs */
}
break;
}
if (CALL_SMETHOD(rsop, issendable) == 0) {
/* We have a packet, but nowhere to send it, drop */
RTPP_OBJ_DECREF(pkt);
continue;
}
CALL_SMETHOD(rsop, send_pkt, fap->sender, pkt);
fap->npkts_played->cnt++;
pkt->sender = fap->sender;
struct pkt_proc_ctx pktx = {
.strmp_in = strmp_in,
.strmp_out = strmp_out,
.pktp = pkt,
.flags = PPROC_FLAG_LGEN,
};
if (CALL_SMETHOD(strmp_in->pproc_manager, handleat, &pktx,
PPROC_ORD_PLAY + 1) & PPROC_ACT_TAKE)
fap->npkts_played->cnt++;
}
RTPP_OBJ_DECREF(rsop);
RTPP_OBJ_DECREF(strmp_in);
e1:
RTPP_OBJ_DECREF(strmp_out);
e0:
return (RTPP_WR_MATCH_CONT);
}

Expand All @@ -115,15 +127,15 @@ static enum rtpp_timed_cb_rvals
run_servers(double dtime, void *arg)
{
struct rtpp_proc_servers_priv *tp = arg;
struct foreach_args fargs;

fargs.dtime = dtime;
fargs.sender = rtpp_anetio_pick_sender(tp->netio);
fargs.npkts_played = &tp->npkts_played;
fargs.rtp_streams_wrt = tp->cfsp->rtp_streams_wrt;
const struct foreach_args fargs = {
.dtime = dtime,
.sender = rtpp_anetio_pick_sender(tp->netio),
.npkts_played = &tp->npkts_played,
.cfsp = tp->cfsp,
};

CALL_METHOD(tp->cfsp->servers_wrt, foreach, process_rtp_servers_foreach,
&fargs);
(void *)&fargs);

rtpp_anetio_pump_q(fargs.sender);
FLUSH_STAT(tp->cfsp->rtpp_stats, tp->npkts_played);
Expand Down
36 changes: 32 additions & 4 deletions src/rtpp_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,13 @@ player_predestroy_cb(struct rtpp_stats *rtpp_stats)
CALL_SMETHOD(rtpp_stats, updatebyname, "nplrs_destroyed", 1);
}

static enum pproc_action
drop_packets(const struct pkt_proc_ctx *pktxp)
{

return (PPROC_ACT_DROP);
}

static int
rtpp_stream_handle_play(struct rtpp_stream *self, const char *codecs,
const char *pname, int playcount, struct rtpp_command *cmd, int ptime)
Expand All @@ -447,13 +454,21 @@ rtpp_stream_handle_play(struct rtpp_stream *self, const char *codecs,
.ptime = ptime};

PUB2PVT(self, pvt);

const struct packet_processor_if drop_on_pa_poi = {
.descr = "drop_packets(player_active)",
.arg = (void *)pvt,
.key = (void *)(pvt + 2),
.enqueue = &drop_packets
};

pthread_mutex_lock(&pvt->lock);
plerror = "reason unknown";
while (*codecs != '\0') {
n = strtol(codecs, &cp, 10);
if (cp == codecs) {
plerror = "invalid codecs";
break;
goto e0;
}
codecs = cp;
if (*codecs != '\0')
Expand All @@ -466,16 +481,21 @@ rtpp_stream_handle_play(struct rtpp_stream *self, const char *codecs,
plerror = "rtpp_server_ctor() failed";
if (sca.result == RTPP_SERV_NOENT)
continue;
break;
goto e0;
}
rsrv->stuid = self->stuid;
ssrc = CALL_SMETHOD(rsrv, get_ssrc);
seq = CALL_SMETHOD(rsrv, get_seq);
_s_rtps(pvt, rsrv->sruid, 0);
int regres = CALL_SMETHOD(self->pproc_manager->reverse, reg, PPROC_ORD_PLAY,
&drop_on_pa_poi);
if (regres < 0) {
plerror = "pproc_manager->reg() method failed";
goto e1;
}
if (CALL_METHOD(pvt->servers_wrt, reg, rsrv->rcnt, rsrv->sruid) != 0) {
RTPP_OBJ_DECREF(rsrv);
plerror = "servers_wrt->reg() method failed";
break;
goto e2;
}
if (pvt->rtps.inact == 0) {
CALL_SMETHOD(rsrv, start, cmd->dtime->mono);
Expand All @@ -490,6 +510,12 @@ rtpp_stream_handle_play(struct rtpp_stream *self, const char *codecs,
playcount, pname, n, ssrc, seq);
return 0;
}
goto e0;
e2:
CALL_SMETHOD(self->pproc_manager->reverse, unreg, drop_on_pa_poi.key);
e1:
RTPP_OBJ_DECREF(rsrv);
e0:
pthread_mutex_unlock(&pvt->lock);
RTPP_LOG(pvt->pub.log, RTPP_LOG_ERR, "can't create player: %s", plerror);
return -1;
Expand All @@ -508,6 +534,7 @@ rtpp_stream_handle_noplay(struct rtpp_stream *self)
ruid = pvt->rtps.uid;
pthread_mutex_unlock(&pvt->lock);
if (ruid != RTPP_UID_NONE) {
CALL_SMETHOD(self->pproc_manager->reverse, unreg, pvt + 2);
if (CALL_METHOD(pvt->servers_wrt, unreg, ruid) != NULL) {
pthread_mutex_lock(&pvt->lock);
if (pvt->rtps.uid == ruid) {
Expand Down Expand Up @@ -545,6 +572,7 @@ rtpp_stream_finish_playback(struct rtpp_stream *self, uint64_t sruid)
PUB2PVT(self, pvt);
pthread_mutex_lock(&pvt->lock);
if (pvt->rtps.uid != RTPP_UID_NONE && pvt->rtps.uid == sruid) {
CALL_SMETHOD(self->pproc_manager->reverse, unreg, pvt + 2);
_s_rtps(pvt, RTPP_UID_NONE, 1);
RTPP_LOG(pvt->pub.log, RTPP_LOG_INFO,
"player at port %d has finished", self->port);
Expand Down

0 comments on commit 1a46e7a

Please sign in to comment.