Skip to content

Commit

Permalink
Add support for receiving offers in Streaming plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed Apr 11, 2023
1 parent cce3f23 commit 7aabc85
Showing 1 changed file with 291 additions and 4 deletions.
295 changes: 291 additions & 4 deletions src/plugins/janus_streaming.c
Original file line number Diff line number Diff line change
Expand Up @@ -923,8 +923,8 @@ multistream-test: {
#define JANUS_STREAMING_DEFAULT_CURL_CONNECT_TIMEOUT 5L /* Connection timeout for cURL. */

/* Plugin information */
#define JANUS_STREAMING_VERSION 9
#define JANUS_STREAMING_VERSION_STRING "0.0.9"
#define JANUS_STREAMING_VERSION 10
#define JANUS_STREAMING_VERSION_STRING "0.0.10"
#define JANUS_STREAMING_DESCRIPTION "This is a streaming plugin for Janus, allowing WebRTC peers to watch/listen to pre-recorded files or media generated by an external source."
#define JANUS_STREAMING_NAME "JANUS Streaming plugin"
#define JANUS_STREAMING_AUTHOR "Meetecho s.r.l."
Expand Down Expand Up @@ -1917,6 +1917,7 @@ static char *janus_streaming_parse_sprop(char *sprop, int *len) {
#define JANUS_STREAMING_ERROR_CANT_SWITCH 458
#define JANUS_STREAMING_ERROR_CANT_RECORD 459
#define JANUS_STREAMING_ERROR_INVALID_STATE 460
#define JANUS_STREAMING_ERROR_INVALID_SDP 461
#define JANUS_STREAMING_ERROR_UNKNOWN_ERROR 470


Expand Down Expand Up @@ -5746,11 +5747,13 @@ static void *janus_streaming_handler(void *data) {
json_t *request = json_object_get(root, "request");
const char *request_text = json_string_value(request);
json_t *result = NULL;
const char *sdp_type = NULL;
const char *sdp_type = json_string_value(json_object_get(msg->jsep, "type"));
const char *jsep_sdp = (char *)json_string_value(json_object_get(msg->jsep, "sdp"));
char *sdp = NULL;
gboolean do_restart = FALSE;
/* All these requests can only be handled asynchronously */
if(!strcasecmp(request_text, "watch")) {
if(!strcasecmp(request_text, "watch") && jsep_sdp == NULL) {
/* New subscriber, plugin will generate an offer */
JANUS_VALIDATE_JSON_OBJECT(root, watch_parameters,
error_code, error_cause, TRUE,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
Expand Down Expand Up @@ -6103,6 +6106,10 @@ static void *janus_streaming_handler(void *data) {
JANUS_SDP_OA_CODEC, janus_audiocodec_name(source->codecs.audio_codec),
JANUS_SDP_OA_FMTP, source->codecs.fmtp,
JANUS_SDP_OA_DIRECTION, JANUS_SDP_SENDONLY,
JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_MID, janus_rtp_extension_id(JANUS_RTP_EXTMAP_MID),
JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_ABS_SEND_TIME, janus_rtp_extension_id(JANUS_RTP_EXTMAP_ABS_SEND_TIME),
JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_PLAYOUT_DELAY,
(session->playoutdelay_ext ? janus_rtp_extension_id(JANUS_RTP_EXTMAP_PLAYOUT_DELAY) : 0),
JANUS_SDP_OA_DONE);
} else {
/* Iterate on all media streams */
Expand All @@ -6123,6 +6130,9 @@ static void *janus_streaming_handler(void *data) {
JANUS_SDP_OA_FMTP, stream->codecs.fmtp,
JANUS_SDP_OA_DIRECTION, JANUS_SDP_SENDONLY,
JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_MID, janus_rtp_extension_id(JANUS_RTP_EXTMAP_MID),
JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_ABS_SEND_TIME, janus_rtp_extension_id(JANUS_RTP_EXTMAP_ABS_SEND_TIME),
JANUS_SDP_OA_EXTENSION, JANUS_RTP_EXTMAP_PLAYOUT_DELAY,
(session->playoutdelay_ext ? janus_rtp_extension_id(JANUS_RTP_EXTMAP_PLAYOUT_DELAY) : 0),
JANUS_SDP_OA_DONE);
} else if(stream->type == JANUS_STREAMING_MEDIA_VIDEO && video) {
/* Add video line */
Expand Down Expand Up @@ -6186,6 +6196,283 @@ static void *janus_streaming_handler(void *data) {
}
janus_mutex_unlock(&session->mutex);
janus_mutex_unlock(&mp->mutex);
} else if(!strcasecmp(request_text, "watch") && jsep_sdp != NULL) {
/* New subscriber provided an offer, plugin will answer */
if(sdp_type == NULL || strcasecmp(sdp_type, "offer")) {
/* This isn't an offer, respond with an error */
JANUS_LOG(LOG_ERR, "User provided SDP for a watch request must be an offer\n");
error_code = JANUS_STREAMING_ERROR_INVALID_SDP;
g_snprintf(error_cause, 512, "User provided SDP for a watch request must be an offer");
goto error;
}
char error_str[512];
janus_sdp *parsed_sdp = janus_sdp_parse(jsep_sdp, error_str, sizeof(error_str));
if(parsed_sdp == NULL) {
JANUS_LOG(LOG_ERR, "Error parsing SDP: %s\n", error_str);
error_code = JANUS_STREAMING_ERROR_INVALID_SDP;
g_snprintf(error_cause, 512, "Error parsing SDP: %s", error_str);
goto error;
}
/* When users provide an offer for a "watch", we ignore the media object, as
* we'll just match offered m-lines with available streams in the mountpoint;
* that said, we still validate the JSON request as for a generic "watch" */
JANUS_VALIDATE_JSON_OBJECT(root, watch_parameters,
error_code, error_cause, TRUE,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
if(error_code != 0) {
janus_sdp_destroy(parsed_sdp);
goto error;
}
if(!string_ids) {
JANUS_VALIDATE_JSON_OBJECT(root, id_parameters,
error_code, error_cause, TRUE,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
} else {
JANUS_VALIDATE_JSON_OBJECT(root, idstr_parameters,
error_code, error_cause, TRUE,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
}
if(error_code != 0) {
janus_sdp_destroy(parsed_sdp);
goto error;
}
json_t *id = json_object_get(root, "id");
guint64 id_value = 0;
char id_num[30], *id_value_str = NULL;
if(!string_ids) {
id_value = json_integer_value(id);
g_snprintf(id_num, sizeof(id_num), "%"SCNu64, id_value);
id_value_str = id_num;
} else {
id_value_str = (char *)json_string_value(id);
}
/* Find the mountpoint and go on */
janus_mutex_lock(&mountpoints_mutex);
janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints,
string_ids ? (gpointer)id_value_str : (gpointer)&id_value);
if(mp == NULL) {
janus_mutex_unlock(&mountpoints_mutex);
janus_sdp_destroy(parsed_sdp);
JANUS_LOG(LOG_VERB, "No such mountpoint/stream %s\n", id_value_str);
error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
g_snprintf(error_cause, 512, "No such mountpoint/stream %s", id_value_str);
goto error;
}
janus_refcount_increase(&mp->ref);
/* A secret may be required for this action */
JANUS_CHECK_SECRET(mp->pin, root, "pin", error_code, error_cause,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED);
if(error_code != 0) {
janus_refcount_decrease(&mp->ref);
janus_mutex_unlock(&mountpoints_mutex);
janus_sdp_destroy(parsed_sdp);
goto error;
}
janus_mutex_lock(&mp->mutex);
janus_mutex_lock(&session->mutex);
janus_mutex_unlock(&mountpoints_mutex);
if(session->mountpoint) {
/* Already watching something else */
JANUS_LOG(LOG_ERR, "Already watching mountpoint %s\n", session->mountpoint->id_str);
error_code = JANUS_STREAMING_ERROR_INVALID_STATE;
g_snprintf(error_cause, 512, "Already watching mountpoint %s", session->mountpoint->id_str);
janus_mutex_unlock(&session->mutex);
janus_mutex_unlock(&mp->mutex);
janus_refcount_decrease(&mp->ref);
janus_sdp_destroy(parsed_sdp);
goto error;
}
if(g_list_find(mp->viewers, session) != NULL) {
janus_mutex_unlock(&session->mutex);
janus_mutex_unlock(&mp->mutex);
janus_refcount_decrease(&mp->ref);
JANUS_LOG(LOG_ERR, "Already watching a stream (found %p in %s's viewers)...\n", session, id_value_str);
error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
g_snprintf(error_cause, 512, "Already watching a stream");
janus_sdp_destroy(parsed_sdp);
goto error;
}
g_atomic_int_set(&session->stopping, 0);
session->mountpoint = mp;
/* Start preparing an answer */
janus_sdp *answer = janus_sdp_generate_answer(parsed_sdp);
/* Iterate through the m-lines in the offer, and see if we can find a match */
GList *subscribed = NULL;
GList *temp = parsed_sdp->m_lines;
while(temp) {
janus_sdp_mline *m = (janus_sdp_mline *)temp->data;
if(m->direction == JANUS_SDP_INACTIVE || m->direction == JANUS_SDP_SENDONLY) {
JANUS_LOG(LOG_WARN, "Skipping (%s) m-line (unsupported media direction)\n", janus_sdp_mtype_str(m->type));
temp = temp->next;
continue;
}
if(mp->streaming_source == janus_streaming_source_file) {
/* File based streaming, check if we already subscribed to the stream */
if(m->type == JANUS_SDP_AUDIO) {
janus_streaming_file_source *source = mp->source;
int pt = janus_sdp_get_codec_pt(parsed_sdp, m->index, janus_audiocodec_name(source->codecs.audio_codec));
if(pt != -1) {
/* Create a session stream */
janus_streaming_session_stream *s = g_malloc0(sizeof(janus_streaming_session_stream));
s->mindex = -1;
s->send = TRUE;
s->pt = pt;
janus_rtp_switching_context_reset(&s->context);
s->min_delay = -1;
s->max_delay = -1;
session->streams = g_list_append(session->streams, s);
if(session->streams_byid == NULL)
session->streams_byid = g_hash_table_new(NULL, NULL);
g_hash_table_insert(session->streams_byid, GINT_TO_POINTER(s->mindex), s);
/* Accept the m-line */
janus_sdp_generate_answer_mline(parsed_sdp, answer, m,
JANUS_SDP_OA_MLINE, JANUS_SDP_AUDIO,
JANUS_SDP_OA_CODEC, janus_audiocodec_name(source->codecs.audio_codec),
JANUS_SDP_OA_FMTP, source->codecs.fmtp,
JANUS_SDP_OA_DIRECTION, JANUS_SDP_SENDONLY,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_MID,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_ABS_SEND_TIME,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_PLAYOUT_DELAY,
JANUS_SDP_OA_DONE);
/* Done */
subscribed = g_list_append(subscribed, source);
temp = temp->next;
continue;
}
}
} else {
/* Iterate on all media streams, to see if we can find a match */
gboolean found = FALSE;
janus_streaming_rtp_source *source = (janus_streaming_rtp_source *)mp->source;
janus_streaming_rtp_source_stream *stream = NULL;
janus_streaming_session_stream *s = NULL;
GList *stemp = source->media;
while(stemp) {
stream = (janus_streaming_rtp_source_stream *)stemp->data;
/* Try matching this stream with this m-line */
if(g_list_find(subscribed, stream)) {
/* Stream already matched to a different m-line */
stemp = stemp->next;
continue;
}
if((m->type == JANUS_SDP_AUDIO && stream->type != JANUS_STREAMING_MEDIA_AUDIO) ||
(m->type == JANUS_SDP_VIDEO && stream->type != JANUS_STREAMING_MEDIA_VIDEO) ||
(m->type == JANUS_SDP_APPLICATION && stream->type != JANUS_STREAMING_MEDIA_DATA)) {
/* Stream is not the same type as the m-line, skip to the next */
stemp = stemp->next;
continue;
}
int pt = -1;
const char *codec = NULL;
if(stream->type != JANUS_STREAMING_MEDIA_AUDIO) {
codec = (stream->type == JANUS_STREAMING_MEDIA_AUDIO ?
janus_audiocodec_name(stream->codecs.audio_codec) : janus_videocodec_name(stream->codecs.video_codec));
pt = janus_sdp_get_codec_pt(parsed_sdp, m->index, codec);
if(pt == -1) {
/* This m-line doesn't support this stream's codec, skip to the next stream */
stemp = stemp->next;
continue;
}
}
/* Create a new session stream and add a reference to the source stream */
s = g_malloc0(sizeof(janus_streaming_session_stream));
s->mindex = m->index;
s->send = TRUE;
s->pt = pt;
janus_rtp_switching_context_reset(&s->context);
s->min_delay = -1;
s->max_delay = -1;
if(stream && stream->simulcast) {
/* In case this mountpoint is simulcasting, let's aim high by default */
janus_rtp_switching_context_reset(&s->context);
janus_rtp_simulcasting_context_reset(&s->sim_context);
s->sim_context.substream_target = 2;
s->sim_context.templayer_target = 2;
janus_vp8_simulcast_context_reset(&s->vp8_context);
} else if(stream && stream->svc) {
/* In case this mountpoint is doing VP9-SVC, let's aim high by default */
s->spatial_layer = -1;
s->target_spatial_layer = 2; /* FIXME Chrome sends 0, 1 and 2 (if using EnabledByFlag_3SL3TL) */
s->temporal_layer = -1;
s->target_temporal_layer = 2; /* FIXME Chrome sends 0, 1 and 2 */
}
s->stream = stream;
janus_refcount_increase(&stream->ref);
session->streams = g_list_append(session->streams, s);
if(session->streams_byid == NULL)
session->streams_byid = g_hash_table_new(NULL, NULL);
g_hash_table_insert(session->streams_byid, GINT_TO_POINTER(stream->mindex), s);
/* If this mountpoint is broadcasting end-to-end encrypted media,
* add the info to the JSEP offer we'll be sending them */
session->e2ee = source->e2ee;
/* Also check if we have to offer the playout-delay extension */
session->playoutdelay_ext = source->playoutdelay_ext;
/* Accept the m-line */
janus_sdp_generate_answer_mline(parsed_sdp, answer, m,
JANUS_SDP_OA_MLINE, m->type,
JANUS_SDP_OA_CODEC, codec,
JANUS_SDP_OA_FMTP, stream->codecs.fmtp,
JANUS_SDP_OA_VIDEO_RTCPFB_DEFAULTS, (m->type == JANUS_SDP_VIDEO),
JANUS_SDP_OA_DIRECTION, JANUS_SDP_SENDONLY,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_MID,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_ABS_SEND_TIME,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_PLAYOUT_DELAY,
JANUS_SDP_OA_DONE);
/* Done */
subscribed = g_list_append(subscribed, stream);
found = TRUE;
break;
}
if(found) {
temp = temp->next;
continue;
}
}
/* If we got here, the m-line was rejected */
JANUS_LOG(LOG_WARN, "Skipping %s m-line (no matching mountpoint stream)\n", janus_sdp_mtype_str(m->type));
temp = temp->next;
}
if(subscribed == NULL) {
/* FIXME Ended up not subscribing to any stream? */
JANUS_LOG(LOG_WARN, "Not subscribed to any stream (all m-lines rejected)\n");
}
g_list_free(subscribed);
/* Prepare the response */
sdp_type = "answer";
sdp = janus_sdp_write(answer);
janus_sdp_destroy(parsed_sdp);
janus_sdp_destroy(answer);
result = json_object();
json_object_set_new(result, "status", json_string("starting"));
/* Add the user to the list of watchers and we're done */
if(g_list_find(mp->viewers, session) == NULL) {
mp->viewers = g_list_append(mp->viewers, session);
if(mp->streaming_source == janus_streaming_source_rtp) {
/* If we're using helper threads, add the viewer to one of those */
if(mp->helper_threads > 0) {
int viewers = -1;
janus_streaming_helper *helper = NULL;
GList *l = mp->threads;
while(l) {
janus_streaming_helper *ht = (janus_streaming_helper *)l->data;
if(viewers == -1 || (helper == NULL && ht->num_viewers == 0) || ht->num_viewers < viewers) {
viewers = ht->num_viewers;
helper = ht;
}
l = l->next;
}
janus_mutex_lock(&helper->mutex);
helper->viewers = g_list_append(helper->viewers, session);
helper->num_viewers++;
janus_mutex_unlock(&helper->mutex);
JANUS_LOG(LOG_VERB, "Added viewer to helper thread #%d (%d viewers)\n",
helper->id, helper->num_viewers);
}
}
}
janus_refcount_increase(&session->ref);
janus_mutex_unlock(&session->mutex);
janus_mutex_unlock(&mp->mutex);
} else if(!strcasecmp(request_text, "start")) {
if(session->mountpoint == NULL) {
JANUS_LOG(LOG_VERB, "Can't start: no mountpoint set\n");
Expand Down

0 comments on commit 7aabc85

Please sign in to comment.