From 25b3bd3c7cf03137bb24c13cdf97c30b7b1d8030 Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Tue, 11 Apr 2023 15:39:17 +0200 Subject: [PATCH 1/2] Add support for receiving offers in Streaming plugin --- src/plugins/janus_streaming.c | 295 +++++++++++++++++++++++++++++++++- 1 file changed, 291 insertions(+), 4 deletions(-) diff --git a/src/plugins/janus_streaming.c b/src/plugins/janus_streaming.c index 72db586faf..09150f3a06 100644 --- a/src/plugins/janus_streaming.c +++ b/src/plugins/janus_streaming.c @@ -923,8 +923,8 @@ multistream-test: { #define JANUS_STREAMING_DEFAULT_CURL_CONNECT_TIMEOUT 5L /* Connection timeout for cURL. */ /* Plugin information */ -#define JANUS_STREAMING_VERSION 9 -#define JANUS_STREAMING_VERSION_STRING "0.0.9" +#define JANUS_STREAMING_VERSION 10 +#define JANUS_STREAMING_VERSION_STRING "0.0.10" #define JANUS_STREAMING_DESCRIPTION "This is a streaming plugin for Janus, allowing WebRTC peers to watch/listen to pre-recorded files or media generated by an external source." #define JANUS_STREAMING_NAME "JANUS Streaming plugin" #define JANUS_STREAMING_AUTHOR "Meetecho s.r.l." @@ -1917,6 +1917,7 @@ static char *janus_streaming_parse_sprop(char *sprop, int *len) { #define JANUS_STREAMING_ERROR_CANT_SWITCH 458 #define JANUS_STREAMING_ERROR_CANT_RECORD 459 #define JANUS_STREAMING_ERROR_INVALID_STATE 460 +#define JANUS_STREAMING_ERROR_INVALID_SDP 461 #define JANUS_STREAMING_ERROR_UNKNOWN_ERROR 470 @@ -5746,11 +5747,13 @@ static void *janus_streaming_handler(void *data) { json_t *request = json_object_get(root, "request"); const char *request_text = json_string_value(request); json_t *result = NULL; - const char *sdp_type = NULL; + const char *sdp_type = json_string_value(json_object_get(msg->jsep, "type")); + const char *jsep_sdp = (char *)json_string_value(json_object_get(msg->jsep, "sdp")); char *sdp = NULL; gboolean do_restart = FALSE; /* All these requests can only be handled asynchronously */ - if(!strcasecmp(request_text, "watch")) { + if(!strcasecmp(request_text, "watch") && jsep_sdp == NULL) { + /* New subscriber, plugin will generate an offer */ JANUS_VALIDATE_JSON_OBJECT(root, watch_parameters, error_code, error_cause, TRUE, JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT); @@ -6103,6 +6106,10 @@ static void *janus_streaming_handler(void *data) { JANUS_SDP_OA_CODEC, janus_audiocodec_name(source->codecs.audio_codec), JANUS_SDP_OA_FMTP, source->codecs.fmtp, JANUS_SDP_OA_DIRECTION, JANUS_SDP_SENDONLY, + JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_MID, janus_rtp_extension_id(JANUS_RTP_EXTMAP_MID), + JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_ABS_SEND_TIME, janus_rtp_extension_id(JANUS_RTP_EXTMAP_ABS_SEND_TIME), + JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_PLAYOUT_DELAY, + (session->playoutdelay_ext ? janus_rtp_extension_id(JANUS_RTP_EXTMAP_PLAYOUT_DELAY) : 0), JANUS_SDP_OA_DONE); } else { /* Iterate on all media streams */ @@ -6123,6 +6130,9 @@ static void *janus_streaming_handler(void *data) { JANUS_SDP_OA_FMTP, stream->codecs.fmtp, JANUS_SDP_OA_DIRECTION, JANUS_SDP_SENDONLY, JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_MID, janus_rtp_extension_id(JANUS_RTP_EXTMAP_MID), + JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_ABS_SEND_TIME, janus_rtp_extension_id(JANUS_RTP_EXTMAP_ABS_SEND_TIME), + JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_PLAYOUT_DELAY, + (session->playoutdelay_ext ? janus_rtp_extension_id(JANUS_RTP_EXTMAP_PLAYOUT_DELAY) : 0), JANUS_SDP_OA_DONE); } else if(stream->type == JANUS_STREAMING_MEDIA_VIDEO && video) { /* Add video line */ @@ -6186,6 +6196,283 @@ static void *janus_streaming_handler(void *data) { } janus_mutex_unlock(&session->mutex); janus_mutex_unlock(&mp->mutex); + } else if(!strcasecmp(request_text, "watch") && jsep_sdp != NULL) { + /* New subscriber provided an offer, plugin will answer */ + if(sdp_type == NULL || strcasecmp(sdp_type, "offer")) { + /* This isn't an offer, respond with an error */ + JANUS_LOG(LOG_ERR, "User provided SDP for a watch request must be an offer\n"); + error_code = JANUS_STREAMING_ERROR_INVALID_SDP; + g_snprintf(error_cause, 512, "User provided SDP for a watch request must be an offer"); + goto error; + } + char error_str[512]; + janus_sdp *parsed_sdp = janus_sdp_parse(jsep_sdp, error_str, sizeof(error_str)); + if(parsed_sdp == NULL) { + JANUS_LOG(LOG_ERR, "Error parsing SDP: %s\n", error_str); + error_code = JANUS_STREAMING_ERROR_INVALID_SDP; + g_snprintf(error_cause, 512, "Error parsing SDP: %s", error_str); + goto error; + } + /* When users provide an offer for a "watch", we ignore the media object, as + * we'll just match offered m-lines with available streams in the mountpoint; + * that said, we still validate the JSON request as for a generic "watch" */ + JANUS_VALIDATE_JSON_OBJECT(root, watch_parameters, + error_code, error_cause, TRUE, + JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT); + if(error_code != 0) { + janus_sdp_destroy(parsed_sdp); + goto error; + } + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, id_parameters, + error_code, error_cause, TRUE, + JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, idstr_parameters, + error_code, error_cause, TRUE, + JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) { + janus_sdp_destroy(parsed_sdp); + goto error; + } + json_t *id = json_object_get(root, "id"); + guint64 id_value = 0; + char id_num[30], *id_value_str = NULL; + if(!string_ids) { + id_value = json_integer_value(id); + g_snprintf(id_num, sizeof(id_num), "%"SCNu64, id_value); + id_value_str = id_num; + } else { + id_value_str = (char *)json_string_value(id); + } + /* Find the mountpoint and go on */ + janus_mutex_lock(&mountpoints_mutex); + janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints, + string_ids ? (gpointer)id_value_str : (gpointer)&id_value); + if(mp == NULL) { + janus_mutex_unlock(&mountpoints_mutex); + janus_sdp_destroy(parsed_sdp); + JANUS_LOG(LOG_VERB, "No such mountpoint/stream %s\n", id_value_str); + error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT; + g_snprintf(error_cause, 512, "No such mountpoint/stream %s", id_value_str); + goto error; + } + janus_refcount_increase(&mp->ref); + /* A secret may be required for this action */ + JANUS_CHECK_SECRET(mp->pin, root, "pin", error_code, error_cause, + JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED); + if(error_code != 0) { + janus_refcount_decrease(&mp->ref); + janus_mutex_unlock(&mountpoints_mutex); + janus_sdp_destroy(parsed_sdp); + goto error; + } + janus_mutex_lock(&mp->mutex); + janus_mutex_lock(&session->mutex); + janus_mutex_unlock(&mountpoints_mutex); + if(session->mountpoint) { + /* Already watching something else */ + JANUS_LOG(LOG_ERR, "Already watching mountpoint %s\n", session->mountpoint->id_str); + error_code = JANUS_STREAMING_ERROR_INVALID_STATE; + g_snprintf(error_cause, 512, "Already watching mountpoint %s", session->mountpoint->id_str); + janus_mutex_unlock(&session->mutex); + janus_mutex_unlock(&mp->mutex); + janus_refcount_decrease(&mp->ref); + janus_sdp_destroy(parsed_sdp); + goto error; + } + if(g_list_find(mp->viewers, session) != NULL) { + janus_mutex_unlock(&session->mutex); + janus_mutex_unlock(&mp->mutex); + janus_refcount_decrease(&mp->ref); + JANUS_LOG(LOG_ERR, "Already watching a stream (found %p in %s's viewers)...\n", session, id_value_str); + error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR; + g_snprintf(error_cause, 512, "Already watching a stream"); + janus_sdp_destroy(parsed_sdp); + goto error; + } + g_atomic_int_set(&session->stopping, 0); + session->mountpoint = mp; + /* Start preparing an answer */ + janus_sdp *answer = janus_sdp_generate_answer(parsed_sdp); + /* Iterate through the m-lines in the offer, and see if we can find a match */ + GList *subscribed = NULL; + GList *temp = parsed_sdp->m_lines; + while(temp) { + janus_sdp_mline *m = (janus_sdp_mline *)temp->data; + if(m->direction == JANUS_SDP_INACTIVE || m->direction == JANUS_SDP_SENDONLY) { + JANUS_LOG(LOG_WARN, "Skipping (%s) m-line (unsupported media direction)\n", janus_sdp_mtype_str(m->type)); + temp = temp->next; + continue; + } + if(mp->streaming_source == janus_streaming_source_file) { + /* File based streaming, check if we already subscribed to the stream */ + if(m->type == JANUS_SDP_AUDIO) { + janus_streaming_file_source *source = mp->source; + int pt = janus_sdp_get_codec_pt(parsed_sdp, m->index, janus_audiocodec_name(source->codecs.audio_codec)); + if(pt != -1) { + /* Create a session stream */ + janus_streaming_session_stream *s = g_malloc0(sizeof(janus_streaming_session_stream)); + s->mindex = -1; + s->send = TRUE; + s->pt = pt; + janus_rtp_switching_context_reset(&s->context); + s->min_delay = -1; + s->max_delay = -1; + session->streams = g_list_append(session->streams, s); + if(session->streams_byid == NULL) + session->streams_byid = g_hash_table_new(NULL, NULL); + g_hash_table_insert(session->streams_byid, GINT_TO_POINTER(s->mindex), s); + /* Accept the m-line */ + janus_sdp_generate_answer_mline(parsed_sdp, answer, m, + JANUS_SDP_OA_MLINE, JANUS_SDP_AUDIO, + JANUS_SDP_OA_CODEC, janus_audiocodec_name(source->codecs.audio_codec), + JANUS_SDP_OA_FMTP, source->codecs.fmtp, + JANUS_SDP_OA_DIRECTION, JANUS_SDP_SENDONLY, + JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_MID, + JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_ABS_SEND_TIME, + JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_PLAYOUT_DELAY, + JANUS_SDP_OA_DONE); + /* Done */ + subscribed = g_list_append(subscribed, source); + temp = temp->next; + continue; + } + } + } else { + /* Iterate on all media streams, to see if we can find a match */ + gboolean found = FALSE; + janus_streaming_rtp_source *source = (janus_streaming_rtp_source *)mp->source; + janus_streaming_rtp_source_stream *stream = NULL; + janus_streaming_session_stream *s = NULL; + GList *stemp = source->media; + while(stemp) { + stream = (janus_streaming_rtp_source_stream *)stemp->data; + /* Try matching this stream with this m-line */ + if(g_list_find(subscribed, stream)) { + /* Stream already matched to a different m-line */ + stemp = stemp->next; + continue; + } + if((m->type == JANUS_SDP_AUDIO && stream->type != JANUS_STREAMING_MEDIA_AUDIO) || + (m->type == JANUS_SDP_VIDEO && stream->type != JANUS_STREAMING_MEDIA_VIDEO) || + (m->type == JANUS_SDP_APPLICATION && stream->type != JANUS_STREAMING_MEDIA_DATA)) { + /* Stream is not the same type as the m-line, skip to the next */ + stemp = stemp->next; + continue; + } + int pt = -1; + const char *codec = NULL; + if(stream->type != JANUS_STREAMING_MEDIA_AUDIO) { + codec = (stream->type == JANUS_STREAMING_MEDIA_AUDIO ? + janus_audiocodec_name(stream->codecs.audio_codec) : janus_videocodec_name(stream->codecs.video_codec)); + pt = janus_sdp_get_codec_pt(parsed_sdp, m->index, codec); + if(pt == -1) { + /* This m-line doesn't support this stream's codec, skip to the next stream */ + stemp = stemp->next; + continue; + } + } + /* Create a new session stream and add a reference to the source stream */ + s = g_malloc0(sizeof(janus_streaming_session_stream)); + s->mindex = m->index; + s->send = TRUE; + s->pt = pt; + janus_rtp_switching_context_reset(&s->context); + s->min_delay = -1; + s->max_delay = -1; + if(stream && stream->simulcast) { + /* In case this mountpoint is simulcasting, let's aim high by default */ + janus_rtp_switching_context_reset(&s->context); + janus_rtp_simulcasting_context_reset(&s->sim_context); + s->sim_context.substream_target = 2; + s->sim_context.templayer_target = 2; + janus_vp8_simulcast_context_reset(&s->vp8_context); + } else if(stream && stream->svc) { + /* In case this mountpoint is doing VP9-SVC, let's aim high by default */ + s->spatial_layer = -1; + s->target_spatial_layer = 2; /* FIXME Chrome sends 0, 1 and 2 (if using EnabledByFlag_3SL3TL) */ + s->temporal_layer = -1; + s->target_temporal_layer = 2; /* FIXME Chrome sends 0, 1 and 2 */ + } + s->stream = stream; + janus_refcount_increase(&stream->ref); + session->streams = g_list_append(session->streams, s); + if(session->streams_byid == NULL) + session->streams_byid = g_hash_table_new(NULL, NULL); + g_hash_table_insert(session->streams_byid, GINT_TO_POINTER(stream->mindex), s); + /* If this mountpoint is broadcasting end-to-end encrypted media, + * add the info to the JSEP offer we'll be sending them */ + session->e2ee = source->e2ee; + /* Also check if we have to offer the playout-delay extension */ + session->playoutdelay_ext = source->playoutdelay_ext; + /* Accept the m-line */ + janus_sdp_generate_answer_mline(parsed_sdp, answer, m, + JANUS_SDP_OA_MLINE, m->type, + JANUS_SDP_OA_CODEC, codec, + JANUS_SDP_OA_FMTP, stream->codecs.fmtp, + JANUS_SDP_OA_VIDEO_RTCPFB_DEFAULTS, (m->type == JANUS_SDP_VIDEO), + JANUS_SDP_OA_DIRECTION, JANUS_SDP_SENDONLY, + JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_MID, + JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_ABS_SEND_TIME, + JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_PLAYOUT_DELAY, + JANUS_SDP_OA_DONE); + /* Done */ + subscribed = g_list_append(subscribed, stream); + found = TRUE; + break; + } + if(found) { + temp = temp->next; + continue; + } + } + /* If we got here, the m-line was rejected */ + JANUS_LOG(LOG_WARN, "Skipping %s m-line (no matching mountpoint stream)\n", janus_sdp_mtype_str(m->type)); + temp = temp->next; + } + if(subscribed == NULL) { + /* FIXME Ended up not subscribing to any stream? */ + JANUS_LOG(LOG_WARN, "Not subscribed to any stream (all m-lines rejected)\n"); + } + g_list_free(subscribed); + /* Prepare the response */ + sdp_type = "answer"; + sdp = janus_sdp_write(answer); + janus_sdp_destroy(parsed_sdp); + janus_sdp_destroy(answer); + result = json_object(); + json_object_set_new(result, "status", json_string("starting")); + /* Add the user to the list of watchers and we're done */ + if(g_list_find(mp->viewers, session) == NULL) { + mp->viewers = g_list_append(mp->viewers, session); + if(mp->streaming_source == janus_streaming_source_rtp) { + /* If we're using helper threads, add the viewer to one of those */ + if(mp->helper_threads > 0) { + int viewers = -1; + janus_streaming_helper *helper = NULL; + GList *l = mp->threads; + while(l) { + janus_streaming_helper *ht = (janus_streaming_helper *)l->data; + if(viewers == -1 || (helper == NULL && ht->num_viewers == 0) || ht->num_viewers < viewers) { + viewers = ht->num_viewers; + helper = ht; + } + l = l->next; + } + janus_mutex_lock(&helper->mutex); + helper->viewers = g_list_append(helper->viewers, session); + helper->num_viewers++; + janus_mutex_unlock(&helper->mutex); + JANUS_LOG(LOG_VERB, "Added viewer to helper thread #%d (%d viewers)\n", + helper->id, helper->num_viewers); + } + } + } + janus_refcount_increase(&session->ref); + janus_mutex_unlock(&session->mutex); + janus_mutex_unlock(&mp->mutex); } else if(!strcasecmp(request_text, "start")) { if(session->mountpoint == NULL) { JANUS_LOG(LOG_VERB, "Can't start: no mountpoint set\n"); From df367e918de0d288338941b3c181481984ec6112 Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Tue, 11 Apr 2023 16:12:14 +0200 Subject: [PATCH 2/2] Fixed typo --- src/plugins/janus_streaming.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plugins/janus_streaming.c b/src/plugins/janus_streaming.c index 09150f3a06..83f44d1776 100644 --- a/src/plugins/janus_streaming.c +++ b/src/plugins/janus_streaming.c @@ -6364,7 +6364,7 @@ static void *janus_streaming_handler(void *data) { } int pt = -1; const char *codec = NULL; - if(stream->type != JANUS_STREAMING_MEDIA_AUDIO) { + if(stream->type != JANUS_STREAMING_MEDIA_DATA) { codec = (stream->type == JANUS_STREAMING_MEDIA_AUDIO ? janus_audiocodec_name(stream->codecs.audio_codec) : janus_videocodec_name(stream->codecs.video_codec)); pt = janus_sdp_get_codec_pt(parsed_sdp, m->index, codec);