From 80086ea8a6fc133dba7e07f34b3da66a61b022b6 Mon Sep 17 00:00:00 2001 From: Alexander Malaev Date: Tue, 12 Dec 2023 18:19:26 +0300 Subject: [PATCH 1/2] Add insert of concealed packets --- src/plugins/janus_audiobridge.c | 47 +++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/src/plugins/janus_audiobridge.c b/src/plugins/janus_audiobridge.c index 45e9fdfe90..ae22568114 100644 --- a/src/plugins/janus_audiobridge.c +++ b/src/plugins/janus_audiobridge.c @@ -1699,6 +1699,7 @@ typedef struct janus_audiobridge_participant { uint16_t expected_seq; /* Expected sequence number */ uint16_t probation; /* Used to determine new ssrc validity */ uint32_t last_timestamp; /* Last in seq timestamp */ + uint16_t last_seq; /* Last sequence number */ gboolean reset; /* Whether or not the Opus context must be reset, without re-joining the room */ GThread *thread; /* Encoding thread for this participant */ gboolean mjr_active; /* Whether this participant has to be recorded to an mjr file or not */ @@ -2135,7 +2136,7 @@ static int janus_audiobridge_resample(int16_t *input, int input_num, int input_r #define JITTER_BUFFER_MIN_PACKETS 2 #define JITTER_BUFFER_MAX_PACKETS 40 #define JITTER_BUFFER_CHECK_USECS 1*G_USEC_PER_SEC -#define QUEUE_IN_MAX_PACKETS 4 +#define QUEUE_IN_MAX_PACKETS 20 /* Error codes */ @@ -6504,6 +6505,7 @@ static void *janus_audiobridge_handler(void *data) { participant->expected_seq = 0; participant->probation = 0; participant->last_timestamp = 0; + participant->last_seq = 0; janus_mutex_init(&participant->qmutex); participant->arc = NULL; janus_audiobridge_plainrtp_media_cleanup(&participant->plainrtp_media); @@ -8613,6 +8615,7 @@ static void *janus_audiobridge_participant_thread(void *data) { gint64 now = janus_get_monotonic_time(), before = now; gboolean first = TRUE, use_fec = FALSE; int ret = 0; + int lost_packets = 0; /* Start working: check both the incoming queue (to decode and queue) and the outgoing one (to encode and send) */ while(!g_atomic_int_get(&stopping) && g_atomic_int_get(&session->destroyed) == 0) { @@ -8659,8 +8662,27 @@ static void *janus_audiobridge_participant_thread(void *data) { /* If this is Opus, check if there's a packet gap we should fix with FEC */ use_fec = FALSE; if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && participant->fec) { - if(ntohs(rtp->seq_number) == (participant->expected_seq + 1)) { - /* Lost a packet here? Use FEC to recover */ + if(ntohs(rtp->seq_number) != (participant->expected_seq)) { + lost_packets = ntohs(rtp->seq_number) - participant->expected_seq; + JANUS_LOG(LOG_ERR, "[%d] got rtp seq, expected_seq: [%d], diff: [%d]\n", ntohs(rtp->seq_number), participant->expected_seq, lost_packets); + + janus_mutex_lock(&participant->qmutex); + + for(int i=1; i < lost_packets; i++) { + pkt = g_malloc(sizeof(janus_audiobridge_rtp_relay_packet)); + pkt->data = g_malloc0(OPUS_SAMPLES * (participant->stereo ? 2 : 1) * sizeof(opus_int16)); + pkt->ssrc = 0; + pkt->timestamp = participant->last_timestamp + 960 * i; + pkt->seq_number = participant->last_seq + 1 * i; + /* This is a redundant packet, so we can't parse any extension info */ + pkt->silence = FALSE; + pkt->length = opus_decode(participant->decoder, NULL, 0, (opus_int16 *)pkt->data, OPUS_SAMPLES, 0); + JANUS_LOG(LOG_ERR, "[%d] packet concealed, length: [%d], timestamp: [%d]\n", pkt->seq_number, pkt->length, pkt->timestamp); + participant->inbuf = g_list_append(participant->inbuf, pkt); + } + + janus_mutex_unlock(&participant->qmutex); + use_fec = TRUE; } } @@ -8668,16 +8690,18 @@ static void *janus_audiobridge_participant_thread(void *data) { if(use_fec) { /* There was a gap, try to get decode from redundant info first */ pkt = g_malloc(sizeof(janus_audiobridge_rtp_relay_packet)); - pkt->data = g_malloc0(BUFFER_SAMPLES*sizeof(opus_int16)); + pkt->data = g_malloc0(OPUS_SAMPLES * (participant->stereo ? 2 : 1) * sizeof(opus_int16)); pkt->ssrc = 0; - pkt->timestamp = participant->last_timestamp + 960; /* FIXME */ - pkt->seq_number = participant->expected_seq; /* FIXME */ + pkt->timestamp = ntohl(rtp->timestamp) - 960; /* FIXME */ + pkt->seq_number = ntohs(rtp->seq_number) - 1; /* FIXME */ /* This is a redundant packet, so we can't parse any extension info */ pkt->silence = FALSE; - /* Decode the lost packet using fec=1 */ - int32_t output_samples; - opus_decoder_ctl(participant->decoder, OPUS_GET_LAST_PACKET_DURATION(&output_samples)); - pkt->length = opus_decode(participant->decoder, payload, plen, (opus_int16 *)pkt->data, output_samples, 1); + + pkt->length = opus_decode(participant->decoder, payload, plen, (opus_int16 *)pkt->data, OPUS_SAMPLES, 1); + + JANUS_LOG(LOG_ERR, "[%d] packet fec decoded [%d] pkt->length, timestamp: [%d]\n", + pkt->seq_number, pkt->length, pkt->timestamp); + /* Queue the decoded redundant packet for the mixer */ janus_mutex_lock(&participant->qmutex); participant->inbuf = g_list_append(participant->inbuf, pkt); @@ -8720,6 +8744,7 @@ static void *janus_audiobridge_participant_thread(void *data) { /* Get rid of the buffered packet */ janus_audiobridge_buffer_packet_destroy(bpkt); /* Update the details */ + participant->last_seq = pkt->seq_number; participant->last_timestamp = pkt->timestamp; participant->expected_seq = pkt->seq_number + 1; g_atomic_int_set(&participant->decoding, 0); @@ -8738,7 +8763,7 @@ static void *janus_audiobridge_participant_thread(void *data) { locked = TRUE; /* Do not let queue-in grow too much */ guint count = g_list_length(participant->inbuf); - if(count > QUEUE_IN_MAX_PACKETS) { + if((int) count > (QUEUE_IN_MAX_PACKETS + lost_packets)) { JANUS_LOG(LOG_WARN, "Participant queue-in contains too many packets, clearing now (count=%u)\n", count); janus_audiobridge_participant_clear_inbuf(participant); } From 2b003ab6c7c82adae3417031c5b7d9dc3d808fa0 Mon Sep 17 00:00:00 2001 From: Alexander Malaev Date: Thu, 14 Dec 2023 02:13:11 +0300 Subject: [PATCH 2/2] Set output_samples to last decoded packet duration --- src/plugins/janus_audiobridge.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/plugins/janus_audiobridge.c b/src/plugins/janus_audiobridge.c index ae22568114..9fa45a6e86 100644 --- a/src/plugins/janus_audiobridge.c +++ b/src/plugins/janus_audiobridge.c @@ -8658,6 +8658,7 @@ static void *janus_audiobridge_participant_thread(void *data) { janus_audiobridge_buffer_packet_destroy(bpkt); break; } + rtp = (janus_rtp_header *)bpkt->buffer; /* If this is Opus, check if there's a packet gap we should fix with FEC */ use_fec = FALSE; @@ -8666,7 +8667,8 @@ static void *janus_audiobridge_participant_thread(void *data) { lost_packets = ntohs(rtp->seq_number) - participant->expected_seq; JANUS_LOG(LOG_ERR, "[%d] got rtp seq, expected_seq: [%d], diff: [%d]\n", ntohs(rtp->seq_number), participant->expected_seq, lost_packets); - janus_mutex_lock(&participant->qmutex); + int32_t output_samples; + opus_decoder_ctl(participant->decoder, OPUS_GET_LAST_PACKET_DURATION(&output_samples)); for(int i=1; i < lost_packets; i++) { pkt = g_malloc(sizeof(janus_audiobridge_rtp_relay_packet)); @@ -8676,13 +8678,13 @@ static void *janus_audiobridge_participant_thread(void *data) { pkt->seq_number = participant->last_seq + 1 * i; /* This is a redundant packet, so we can't parse any extension info */ pkt->silence = FALSE; - pkt->length = opus_decode(participant->decoder, NULL, 0, (opus_int16 *)pkt->data, OPUS_SAMPLES, 0); - JANUS_LOG(LOG_ERR, "[%d] packet concealed, length: [%d], timestamp: [%d]\n", pkt->seq_number, pkt->length, pkt->timestamp); + pkt->length = opus_decode(participant->decoder, NULL, 0, (opus_int16 *)pkt->data, output_samples, 0); + JANUS_LOG(LOG_ERR, "[%d] packet concealed, length: [%d], timestamp: [%d] stereo: [%d]\n", pkt->seq_number, pkt->length, pkt->timestamp, participant->stereo ? 2 : 1); + janus_mutex_lock(&participant->qmutex); participant->inbuf = g_list_append(participant->inbuf, pkt); + janus_mutex_unlock(&participant->qmutex); } - janus_mutex_unlock(&participant->qmutex); - use_fec = TRUE; } } @@ -8697,7 +8699,10 @@ static void *janus_audiobridge_participant_thread(void *data) { /* This is a redundant packet, so we can't parse any extension info */ pkt->silence = FALSE; - pkt->length = opus_decode(participant->decoder, payload, plen, (opus_int16 *)pkt->data, OPUS_SAMPLES, 1); + int32_t output_samples; + opus_decoder_ctl(participant->decoder, OPUS_GET_LAST_PACKET_DURATION(&output_samples)); + + pkt->length = opus_decode(participant->decoder, payload, plen, (opus_int16 *)pkt->data, output_samples, 1); JANUS_LOG(LOG_ERR, "[%d] packet fec decoded [%d] pkt->length, timestamp: [%d]\n", pkt->seq_number, pkt->length, pkt->timestamp);