Skip to content

Commit

Permalink
Add support for receiving offers in Streaming plugin (see meetecho#3199)
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed Apr 26, 2023
1 parent 9f89269 commit a182323
Showing 1 changed file with 190 additions and 4 deletions.
194 changes: 190 additions & 4 deletions plugins/janus_streaming.c
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ rtsp_conn_timeout = connection timeout for cURL (CURLOPT_CONNECTTIMEOUT) call ga
#include "../rtcp.h"
#include "../record.h"
#include "../utils.h"
#include "../sdp-utils.h"
#include "../ip-utils.h"

/* Default settings */
Expand All @@ -749,8 +750,8 @@ rtsp_conn_timeout = connection timeout for cURL (CURLOPT_CONNECTTIMEOUT) call ga
#define JANUS_STREAMING_DEFAULT_CURL_CONNECT_TIMEOUT 5L /* Connection timeout for cURL. */

/* Plugin information */
#define JANUS_STREAMING_VERSION 8
#define JANUS_STREAMING_VERSION_STRING "0.0.8"
#define JANUS_STREAMING_VERSION 9
#define JANUS_STREAMING_VERSION_STRING "0.0.9"
#define JANUS_STREAMING_DESCRIPTION "This is a streaming plugin for Janus, allowing WebRTC peers to watch/listen to pre-recorded files or media generated by an external source."
#define JANUS_STREAMING_NAME "JANUS Streaming plugin"
#define JANUS_STREAMING_AUTHOR "Meetecho s.r.l."
Expand Down Expand Up @@ -1652,6 +1653,7 @@ static void janus_streaming_parse_sprop(janus_streaming_rtp_source *source, char
#define JANUS_STREAMING_ERROR_CANT_SWITCH 458
#define JANUS_STREAMING_ERROR_CANT_RECORD 459
#define JANUS_STREAMING_ERROR_INVALID_STATE 460
#define JANUS_STREAMING_ERROR_INVALID_SDP 461
#define JANUS_STREAMING_ERROR_UNKNOWN_ERROR 470


Expand Down Expand Up @@ -4745,11 +4747,13 @@ static void *janus_streaming_handler(void *data) {
json_t *request = json_object_get(root, "request");
const char *request_text = json_string_value(request);
json_t *result = NULL;
const char *sdp_type = NULL;
const char *sdp_type = json_string_value(json_object_get(msg->jsep, "type"));
const char *jsep_sdp = (char *)json_string_value(json_object_get(msg->jsep, "sdp"));
char *sdp = NULL;
gboolean do_restart = FALSE;
/* All these requests can only be handled asynchronously */
if(!strcasecmp(request_text, "watch")) {
if(!strcasecmp(request_text, "watch") && jsep_sdp == NULL) {
/* New subscriber, plugin will generate an offer */
JANUS_VALIDATE_JSON_OBJECT(root, watch_parameters,
error_code, error_cause, TRUE,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
Expand Down Expand Up @@ -5122,6 +5126,188 @@ static void *janus_streaming_handler(void *data) {
}
janus_mutex_unlock(&session->mutex);
janus_mutex_unlock(&mp->mutex);
} else if(!strcasecmp(request_text, "watch") && jsep_sdp != NULL) {
/* New subscriber provided an offer, plugin will answer */
if(sdp_type == NULL || strcasecmp(sdp_type, "offer")) {
/* This isn't an offer, respond with an error */
JANUS_LOG(LOG_ERR, "User provided SDP for a watch request must be an offer\n");
error_code = JANUS_STREAMING_ERROR_INVALID_SDP;
g_snprintf(error_cause, 512, "User provided SDP for a watch request must be an offer");
goto error;
}
char error_str[512];
janus_sdp *parsed_sdp = janus_sdp_parse(jsep_sdp, error_str, sizeof(error_str));
if(parsed_sdp == NULL) {
JANUS_LOG(LOG_ERR, "Error parsing SDP: %s\n", error_str);
error_code = JANUS_STREAMING_ERROR_INVALID_SDP;
g_snprintf(error_cause, 512, "Error parsing SDP: %s", error_str);
goto error;
}
/* When users provide an offer for a "watch", we ignore the media object, as
* we'll just match offered m-lines with available streams in the mountpoint;
* that said, we still validate the JSON request as for a generic "watch" */
JANUS_VALIDATE_JSON_OBJECT(root, watch_parameters,
error_code, error_cause, TRUE,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
if(error_code != 0) {
janus_sdp_destroy(parsed_sdp);
goto error;
}
if(!string_ids) {
JANUS_VALIDATE_JSON_OBJECT(root, id_parameters,
error_code, error_cause, TRUE,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
} else {
JANUS_VALIDATE_JSON_OBJECT(root, idstr_parameters,
error_code, error_cause, TRUE,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT);
}
if(error_code != 0) {
janus_sdp_destroy(parsed_sdp);
goto error;
}
json_t *id = json_object_get(root, "id");
guint64 id_value = 0;
char id_num[30], *id_value_str = NULL;
if(!string_ids) {
id_value = json_integer_value(id);
g_snprintf(id_num, sizeof(id_num), "%"SCNu64, id_value);
id_value_str = id_num;
} else {
id_value_str = (char *)json_string_value(id);
}
/* Find the mountpoint and go on */
janus_mutex_lock(&mountpoints_mutex);
janus_streaming_mountpoint *mp = g_hash_table_lookup(mountpoints,
string_ids ? (gpointer)id_value_str : (gpointer)&id_value);
if(mp == NULL) {
janus_mutex_unlock(&mountpoints_mutex);
janus_sdp_destroy(parsed_sdp);
JANUS_LOG(LOG_VERB, "No such mountpoint/stream %s\n", id_value_str);
error_code = JANUS_STREAMING_ERROR_NO_SUCH_MOUNTPOINT;
g_snprintf(error_cause, 512, "No such mountpoint/stream %s", id_value_str);
goto error;
}
janus_refcount_increase(&mp->ref);
/* A secret may be required for this action */
JANUS_CHECK_SECRET(mp->pin, root, "pin", error_code, error_cause,
JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT, JANUS_STREAMING_ERROR_UNAUTHORIZED);
if(error_code != 0) {
janus_refcount_decrease(&mp->ref);
janus_mutex_unlock(&mountpoints_mutex);
janus_sdp_destroy(parsed_sdp);
goto error;
}
janus_mutex_lock(&mp->mutex);
janus_mutex_lock(&session->mutex);
janus_mutex_unlock(&mountpoints_mutex);
if(session->mountpoint) {
/* Already watching something else */
JANUS_LOG(LOG_ERR, "Already watching mountpoint %s\n", session->mountpoint->id_str);
error_code = JANUS_STREAMING_ERROR_INVALID_STATE;
g_snprintf(error_cause, 512, "Already watching mountpoint %s", session->mountpoint->id_str);
janus_mutex_unlock(&session->mutex);
janus_mutex_unlock(&mp->mutex);
janus_refcount_decrease(&mp->ref);
janus_sdp_destroy(parsed_sdp);
goto error;
}
if(g_list_find(mp->viewers, session) != NULL) {
janus_mutex_unlock(&session->mutex);
janus_mutex_unlock(&mp->mutex);
janus_refcount_decrease(&mp->ref);
JANUS_LOG(LOG_ERR, "Already watching a stream (found %p in %s's viewers)...\n", session, id_value_str);
error_code = JANUS_STREAMING_ERROR_UNKNOWN_ERROR;
g_snprintf(error_cause, 512, "Already watching a stream");
janus_sdp_destroy(parsed_sdp);
goto error;
}
g_atomic_int_set(&session->stopping, 0);
session->mountpoint = mp;
session->audio = TRUE; /* True by default */
if(!mp->audio)
session->audio = FALSE; /* ... unless the mountpoint isn't sending any audio */
session->video = TRUE; /* True by default */
if(!mp->video)
session->video = FALSE; /* ... unless the mountpoint isn't sending any video */
session->data = TRUE; /* True by default */
if(!mp->data)
session->data = FALSE; /* ... unless the mountpoint isn't sending any data */
/* In case this mountpoint is simulcasting, let's aim high by default */
janus_rtp_switching_context_reset(&session->context);
janus_rtp_simulcasting_context_reset(&session->sim_context);
session->sim_context.substream_target = 2;
session->sim_context.templayer_target = 2;
janus_vp8_simulcast_context_reset(&session->vp8_context);
/* Start preparing an answer */
char *audio_codec = NULL, *video_codec = NULL;
if(session->audio) {
audio_codec = g_strdup(mp->codecs.audio_rtpmap);
char *slash = strstr(audio_codec, "/");
if(slash)
*slash = '\0';
session->audio_pt = janus_sdp_get_codec_pt(parsed_sdp, audio_codec);
if(session->audio_pt < 0)
session->audio = FALSE;
}
if(session->video) {
video_codec = g_strdup(mp->codecs.video_rtpmap);
char *slash = strstr(video_codec, "/");
if(slash)
*slash = '\0';
session->video_pt = janus_sdp_get_codec_pt(parsed_sdp, video_codec);
if(session->video_pt < 0)
session->video = FALSE;
}
janus_sdp *answer = janus_sdp_generate_answer(parsed_sdp,
JANUS_SDP_OA_AUDIO_CODEC, session->audio ? audio_codec : NULL,
JANUS_SDP_OA_AUDIO_FMTP, mp->codecs.audio_fmtp,
JANUS_SDP_OA_AUDIO_DIRECTION, JANUS_SDP_SENDONLY,
JANUS_SDP_OA_VIDEO_CODEC, session->video ? video_codec : NULL,
JANUS_SDP_OA_VIDEO_FMTP, mp->codecs.video_fmtp,
JANUS_SDP_OA_VIDEO_DIRECTION, JANUS_SDP_SENDONLY,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_MID,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_ABS_SEND_TIME,
JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_PLAYOUT_DELAY,
JANUS_SDP_OA_DONE);
g_free(audio_codec);
g_free(video_codec);
/* Prepare the response */
sdp_type = "answer";
sdp = janus_sdp_write(answer);
janus_sdp_destroy(parsed_sdp);
janus_sdp_destroy(answer);
result = json_object();
json_object_set_new(result, "status", json_string("starting"));
/* Add the user to the list of watchers and we're done */
if(g_list_find(mp->viewers, session) == NULL) {
mp->viewers = g_list_append(mp->viewers, session);
if(mp->streaming_source == janus_streaming_source_rtp) {
/* If we're using helper threads, add the viewer to one of those */
if(mp->helper_threads > 0) {
int viewers = -1;
janus_streaming_helper *helper = NULL;
GList *l = mp->threads;
while(l) {
janus_streaming_helper *ht = (janus_streaming_helper *)l->data;
if(viewers == -1 || (helper == NULL && ht->num_viewers == 0) || ht->num_viewers < viewers) {
viewers = ht->num_viewers;
helper = ht;
}
l = l->next;
}
janus_mutex_lock(&helper->mutex);
helper->viewers = g_list_append(helper->viewers, session);
helper->num_viewers++;
janus_mutex_unlock(&helper->mutex);
JANUS_LOG(LOG_VERB, "Added viewer to helper thread #%d (%d viewers)\n",
helper->id, helper->num_viewers);
}
}
}
janus_refcount_increase(&session->ref);
janus_mutex_unlock(&session->mutex);
janus_mutex_unlock(&mp->mutex);
} else if(!strcasecmp(request_text, "start")) {
if(session->mountpoint == NULL) {
JANUS_LOG(LOG_VERB, "Can't start: no mountpoint set\n");
Expand Down

0 comments on commit a182323

Please sign in to comment.