Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for receiving offers in Streaming plugin #3199

Merged
merged 2 commits into from
Apr 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
295 changes: 291 additions & 4 deletions src/plugins/janus_streaming.c
Original file line number Diff line number Diff line change
Expand Up @@ -923,8 +923,8 @@ multistream-test: {
#define JANUS_STREAMING_DEFAULT_CURL_CONNECT_TIMEOUT 5L /* Connection timeout for cURL. */

/* Plugin information */
#define JANUS_STREAMING_VERSION 9
#define JANUS_STREAMING_VERSION_STRING "0.0.9"
#define JANUS_STREAMING_VERSION 10
#define JANUS_STREAMING_VERSION_STRING "0.0.10"
#define JANUS_STREAMING_DESCRIPTION "This is a streaming plugin for Janus, allowing WebRTC peers to watch/listen to pre-recorded files or media generated by an external source."
#define JANUS_STREAMING_NAME "JANUS Streaming plugin"
#define JANUS_STREAMING_AUTHOR "Meetecho s.r.l."
Expand Down Expand Up @@ -1917,6 +1917,7 @@ static char *janus_streaming_parse_sprop(char *sprop, int *len) {
#define JANUS_STREAMING_ERROR_CANT_SWITCH 458
#define JANUS_STREAMING_ERROR_CANT_RECORD 459
#define JANUS_STREAMING_ERROR_INVALID_STATE 460
#define JANUS_STREAMING_ERROR_INVALID_SDP 461
#define JANUS_STREAMING_ERROR_UNKNOWN_ERROR 470


Expand Down Expand Up @@ -5746,11 +5747,13 @@ static void *janus_streaming_handler(void *data) {
json_t *request = json_object_get(root, "request");
const char *request_text = json_string_value(request);
json_t *result = NULL;
const char *sdp_type = NULL;
const char *sdp_type = json_string_value(json_object_get(msg->jsep, "type"));
const char *jsep_sdp = (char *)json_string_value(json_object_get(msg->jsep, "sdp"));
char *sdp = NULL;
gboolean do_restart = FALSE;
/* All these requests can only be handled asynchronously */
if(!strcasecmp(request_text, "watch")) {
if(!strcasecmp(request_text, "watch") && jsep_sdp == NULL) {
/* New subscriber, plugin will generate an offer */
JANUS_VALIDATE_JSON_OBJECT(root, watch_parameters,
error_code, error_cause, TRUE,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
Expand Down Expand Up @@ -6103,6 +6106,10 @@ static void *janus_streaming_handler(void *data) {
JANUS_SDP_OA_CODEC, janus_audiocodec_name(source->codecs.audio_codec),
JANUS_SDP_OA_FMTP, source->codecs.fmtp,
JANUS_SDP_OA_DIRECTION, JANUS_SDP_SENDONLY,
JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_MID, janus_rtp_extension_id(JANUS_RTP_EXTMAP_MID),
JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_ABS_SEND_TIME, janus_rtp_extension_id(JANUS_RTP_EXTMAP_ABS_SEND_TIME),
JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_PLAYOUT_DELAY,
(session->playoutdelay_ext ? janus_rtp_extension_id(JANUS_RTP_EXTMAP_PLAYOUT_DELAY) : 0),
JANUS_SDP_OA_DONE);
} else {
/* Iterate on all media streams */
Expand All @@ -6123,6 +6130,9 @@ static void *janus_streaming_handler(void *data) {
JANUS_SDP_OA_FMTP, stream->codecs.fmtp,
JANUS_SDP_OA_DIRECTION, JANUS_SDP_SENDONLY,
JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_MID, janus_rtp_extension_id(JANUS_RTP_EXTMAP_MID),
JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_ABS_SEND_TIME, janus_rtp_extension_id(JANUS_RTP_EXTMAP_ABS_SEND_TIME),
JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_PLAYOUT_DELAY,
(session->playoutdelay_ext ? janus_rtp_extension_id(JANUS_RTP_EXTMAP_PLAYOUT_DELAY) : 0),
JANUS_SDP_OA_DONE);
} else if(stream->type == JANUS_STREAMING_MEDIA_VIDEO && video) {
/* Add video line */
Expand Down Expand Up @@ -6186,6 +6196,283 @@ static void *janus_streaming_handler(void *data) {
}
janus_mutex_unlock(&session->mutex);
janus_mutex_unlock(&mp->mutex);
} else if(!strcasecmp(request_text, "watch") && jsep_sdp != NULL) {
/* New subscriber provided an offer, plugin will answer */
if(sdp_type == NULL || strcasecmp(sdp_type, "offer")) {
/* This isn't an offer, respond with an error */
JANUS_LOG(LOG_ERR, "User provided SDP for a watch request must be an offer\n");
error_code = JANUS_STREAMING_ERROR_INVALID_SDP;
g_snprintf(error_cause, 512, "User provided SDP for a watch request must be an offer");
goto error;
}
char error_str[512];
janus_sdp *parsed_sdp = janus_sdp_parse(jsep_sdp, error_str, sizeof(error_str));
if(parsed_sdp == NULL) {
JANUS_LOG(LOG_ERR, "Error parsing SDP: %s\n", error_str);
error_code = JANUS_STREAMING_ERROR_INVALID_SDP;
g_snprintf(error_cause, 512, "Error parsing SDP: %s", error_str);
goto error;
}
/* When users provide an offer for a "watch", we ignore the media object, as
* we'll just match offered m-lines with available streams in the mountpoint;
* that said, we still validate the JSON request as for a generic "watch" */
JANUS_VALIDATE_JSON_OBJECT(root, watch_parameters,
error_code, error_cause, TRUE,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
if(error_code != 0) {
janus_sdp_destroy(parsed_sdp);
goto error;
}
if(!string_ids) {
JANUS_VALIDATE_JSON_OBJECT(root, id_parameters,
error_code, error_cause, TRUE,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
} else {
JANUS_VALIDATE_JSON_OBJECT(root, idstr_parameters,
error_code, error_cause, TRUE,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
}
if(error_code != 0) {
janus_sdp_destroy(parsed_sdp);
goto error;
}
json_t *id = json_object_get(root, "id");
guint64 id_value = 0;
char id_num[30], *id_value_str = NULL;
if(!string_ids) {
id_value = json_integer_value(id);
g_snprintf(id_num, sizeof(id_num), "%"SCNu64, id_value);
id_value_str = id_num;
} else {
id_value_str = (char *)json_string_value(id);
}
/* Find the mountpoint and go on */
janus_mutex_lock(&mountpoints_mutex);
janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints,
string_ids ? (gpointer)id_value_str : (gpointer)&id_value);
if(mp == NULL) {
janus_mutex_unlock(&mountpoints_mutex);
janus_sdp_destroy(parsed_sdp);
JANUS_LOG(LOG_VERB, "No such mountpoint/stream %s\n", id_value_str);
error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
g_snprintf(error_cause, 512, "No such mountpoint/stream %s", id_value_str);
goto error;
}
janus_refcount_increase(&mp->ref);
/* A secret may be required for this action */
JANUS_CHECK_SECRET(mp->pin, root, "pin", error_code, error_cause,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED);
if(error_code != 0) {
janus_refcount_decrease(&mp->ref);
janus_mutex_unlock(&mountpoints_mutex);
janus_sdp_destroy(parsed_sdp);
goto error;
}
janus_mutex_lock(&mp->mutex);
janus_mutex_lock(&session->mutex);
janus_mutex_unlock(&mountpoints_mutex);
if(session->mountpoint) {
/* Already watching something else */
JANUS_LOG(LOG_ERR, "Already watching mountpoint %s\n", session->mountpoint->id_str);
error_code = JANUS_STREAMING_ERROR_INVALID_STATE;
g_snprintf(error_cause, 512, "Already watching mountpoint %s", session->mountpoint->id_str);
janus_mutex_unlock(&session->mutex);
janus_mutex_unlock(&mp->mutex);
janus_refcount_decrease(&mp->ref);
janus_sdp_destroy(parsed_sdp);
goto error;
}
if(g_list_find(mp->viewers, session) != NULL) {
janus_mutex_unlock(&session->mutex);
janus_mutex_unlock(&mp->mutex);
janus_refcount_decrease(&mp->ref);
JANUS_LOG(LOG_ERR, "Already watching a stream (found %p in %s's viewers)...\n", session, id_value_str);
error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
g_snprintf(error_cause, 512, "Already watching a stream");
janus_sdp_destroy(parsed_sdp);
goto error;
}
g_atomic_int_set(&session->stopping, 0);
session->mountpoint = mp;
/* Start preparing an answer */
janus_sdp *answer = janus_sdp_generate_answer(parsed_sdp);
/* Iterate through the m-lines in the offer, and see if we can find a match */
GList *subscribed = NULL;
GList *temp = parsed_sdp->m_lines;
while(temp) {
janus_sdp_mline *m = (janus_sdp_mline *)temp->data;
if(m->direction == JANUS_SDP_INACTIVE || m->direction == JANUS_SDP_SENDONLY) {
JANUS_LOG(LOG_WARN, "Skipping (%s) m-line (unsupported media direction)\n", janus_sdp_mtype_str(m->type));
temp = temp->next;
continue;
}
if(mp->streaming_source == janus_streaming_source_file) {
/* File based streaming, check if we already subscribed to the stream */
if(m->type == JANUS_SDP_AUDIO) {
janus_streaming_file_source *source = mp->source;
int pt = janus_sdp_get_codec_pt(parsed_sdp, m->index, janus_audiocodec_name(source->codecs.audio_codec));
if(pt != -1) {
/* Create a session stream */
janus_streaming_session_stream *s = g_malloc0(sizeof(janus_streaming_session_stream));
s->mindex = -1;
s->send = TRUE;
s->pt = pt;
janus_rtp_switching_context_reset(&s->context);
s->min_delay = -1;
s->max_delay = -1;
session->streams = g_list_append(session->streams, s);
if(session->streams_byid == NULL)
session->streams_byid = g_hash_table_new(NULL, NULL);
g_hash_table_insert(session->streams_byid, GINT_TO_POINTER(s->mindex), s);
/* Accept the m-line */
janus_sdp_generate_answer_mline(parsed_sdp, answer, m,
JANUS_SDP_OA_MLINE, JANUS_SDP_AUDIO,
JANUS_SDP_OA_CODEC, janus_audiocodec_name(source->codecs.audio_codec),
JANUS_SDP_OA_FMTP, source->codecs.fmtp,
JANUS_SDP_OA_DIRECTION, JANUS_SDP_SENDONLY,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_MID,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_ABS_SEND_TIME,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_PLAYOUT_DELAY,
JANUS_SDP_OA_DONE);
/* Done */
subscribed = g_list_append(subscribed, source);
temp = temp->next;
continue;
}
}
} else {
/* Iterate on all media streams, to see if we can find a match */
gboolean found = FALSE;
janus_streaming_rtp_source *source = (janus_streaming_rtp_source *)mp->source;
janus_streaming_rtp_source_stream *stream = NULL;
janus_streaming_session_stream *s = NULL;
GList *stemp = source->media;
while(stemp) {
stream = (janus_streaming_rtp_source_stream *)stemp->data;
/* Try matching this stream with this m-line */
if(g_list_find(subscribed, stream)) {
/* Stream already matched to a different m-line */
stemp = stemp->next;
continue;
}
if((m->type == JANUS_SDP_AUDIO && stream->type != JANUS_STREAMING_MEDIA_AUDIO) ||
(m->type == JANUS_SDP_VIDEO && stream->type != JANUS_STREAMING_MEDIA_VIDEO) ||
(m->type == JANUS_SDP_APPLICATION && stream->type != JANUS_STREAMING_MEDIA_DATA)) {
/* Stream is not the same type as the m-line, skip to the next */
stemp = stemp->next;
continue;
}
int pt = -1;
const char *codec = NULL;
if(stream->type != JANUS_STREAMING_MEDIA_DATA) {
codec = (stream->type == JANUS_STREAMING_MEDIA_AUDIO ?
janus_audiocodec_name(stream->codecs.audio_codec) : janus_videocodec_name(stream->codecs.video_codec));
pt = janus_sdp_get_codec_pt(parsed_sdp, m->index, codec);
if(pt == -1) {
/* This m-line doesn't support this stream's codec, skip to the next stream */
stemp = stemp->next;
continue;
}
}
/* Create a new session stream and add a reference to the source stream */
s = g_malloc0(sizeof(janus_streaming_session_stream));
s->mindex = m->index;
s->send = TRUE;
s->pt = pt;
janus_rtp_switching_context_reset(&s->context);
s->min_delay = -1;
s->max_delay = -1;
if(stream && stream->simulcast) {
/* In case this mountpoint is simulcasting, let's aim high by default */
janus_rtp_switching_context_reset(&s->context);
janus_rtp_simulcasting_context_reset(&s->sim_context);
s->sim_context.substream_target = 2;
s->sim_context.templayer_target = 2;
janus_vp8_simulcast_context_reset(&s->vp8_context);
} else if(stream && stream->svc) {
/* In case this mountpoint is doing VP9-SVC, let's aim high by default */
s->spatial_layer = -1;
s->target_spatial_layer = 2; /* FIXME Chrome sends 0, 1 and 2 (if using EnabledByFlag_3SL3TL) */
s->temporal_layer = -1;
s->target_temporal_layer = 2; /* FIXME Chrome sends 0, 1 and 2 */
}
s->stream = stream;
janus_refcount_increase(&stream->ref);
session->streams = g_list_append(session->streams, s);
if(session->streams_byid == NULL)
session->streams_byid = g_hash_table_new(NULL, NULL);
g_hash_table_insert(session->streams_byid, GINT_TO_POINTER(stream->mindex), s);
/* If this mountpoint is broadcasting end-to-end encrypted media,
* add the info to the JSEP offer we'll be sending them */
session->e2ee = source->e2ee;
/* Also check if we have to offer the playout-delay extension */
session->playoutdelay_ext = source->playoutdelay_ext;
/* Accept the m-line */
janus_sdp_generate_answer_mline(parsed_sdp, answer, m,
JANUS_SDP_OA_MLINE, m->type,
JANUS_SDP_OA_CODEC, codec,
JANUS_SDP_OA_FMTP, stream->codecs.fmtp,
JANUS_SDP_OA_VIDEO_RTCPFB_DEFAULTS, (m->type == JANUS_SDP_VIDEO),
JANUS_SDP_OA_DIRECTION, JANUS_SDP_SENDONLY,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_MID,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_ABS_SEND_TIME,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_PLAYOUT_DELAY,
JANUS_SDP_OA_DONE);
/* Done */
subscribed = g_list_append(subscribed, stream);
found = TRUE;
break;
}
if(found) {
temp = temp->next;
continue;
}
}
/* If we got here, the m-line was rejected */
JANUS_LOG(LOG_WARN, "Skipping %s m-line (no matching mountpoint stream)\n", janus_sdp_mtype_str(m->type));
temp = temp->next;
}
if(subscribed == NULL) {
/* FIXME Ended up not subscribing to any stream? */
JANUS_LOG(LOG_WARN, "Not subscribed to any stream (all m-lines rejected)\n");
}
g_list_free(subscribed);
/* Prepare the response */
sdp_type = "answer";
sdp = janus_sdp_write(answer);
janus_sdp_destroy(parsed_sdp);
janus_sdp_destroy(answer);
result = json_object();
json_object_set_new(result, "status", json_string("starting"));
/* Add the user to the list of watchers and we're done */
if(g_list_find(mp->viewers, session) == NULL) {
mp->viewers = g_list_append(mp->viewers, session);
if(mp->streaming_source == janus_streaming_source_rtp) {
/* If we're using helper threads, add the viewer to one of those */
if(mp->helper_threads > 0) {
int viewers = -1;
janus_streaming_helper *helper = NULL;
GList *l = mp->threads;
while(l) {
janus_streaming_helper *ht = (janus_streaming_helper *)l->data;
if(viewers == -1 || (helper == NULL && ht->num_viewers == 0) || ht->num_viewers < viewers) {
viewers = ht->num_viewers;
helper = ht;
}
l = l->next;
}
janus_mutex_lock(&helper->mutex);
helper->viewers = g_list_append(helper->viewers, session);
helper->num_viewers++;
janus_mutex_unlock(&helper->mutex);
JANUS_LOG(LOG_VERB, "Added viewer to helper thread #%d (%d viewers)\n",
helper->id, helper->num_viewers);
}
}
}
janus_refcount_increase(&session->ref);
janus_mutex_unlock(&session->mutex);
janus_mutex_unlock(&mp->mutex);
} else if(!strcasecmp(request_text, "start")) {
if(session->mountpoint == NULL) {
JANUS_LOG(LOG_VERB, "Can't start: no mountpoint set\n");
Expand Down