Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Merge pull request #159 from netology-group/feature/ULMS-817
Browse files Browse the repository at this point in the history
Add janus requests timeout
  • Loading branch information
feymartynov committed Sep 30, 2020
2 parents 2b601a9 + c69b6e6 commit 183eb7a
Show file tree
Hide file tree
Showing 25 changed files with 1,485 additions and 1,153 deletions.
7 changes: 6 additions & 1 deletion App.toml.sample
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
id = "conference.svc.example.org"
agent_label = "alpha"
broker_id = "mqtt-gateway.svc.example.org"
backend_id = "janus-gateway.svc.example.org"

[id_token]
algorithm = "ES256"
Expand All @@ -20,3 +19,9 @@ trusted = ["cron.svc.example.org"]
[mqtt]
uri = "mqtt://192.168.99.100:1883"
clean_session = false

[backend]
id = "janus-gateway.svc.example.org"
default_timeout = 5
stream_upload_timeout = 600
transaction_watchdog_check_period = 1
16 changes: 15 additions & 1 deletion src/app/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use svc_authz::cache::ConnectionPool as RedisConnectionPool;
use svc_authz::ClientMap as Authz;

use crate::app::metrics::{DbPoolStatsCollector, DynamicStatsCollector};
use crate::backend::janus::Client as JanusClient;
use crate::config::Config;
use crate::db::ConnectionPool as Db;

Expand All @@ -16,6 +17,7 @@ pub(crate) struct AppContext {
authz: Authz,
db: Db,
agent_id: AgentId,
janus_client: Arc<JanusClient>,
janus_topics: JanusTopics,
queue_counter: Option<QueueCounterHandle>,
redis_pool: Option<RedisConnectionPool>,
Expand All @@ -24,14 +26,21 @@ pub(crate) struct AppContext {
}

impl AppContext {
pub(crate) fn new(config: Config, authz: Authz, db: Db, janus_topics: JanusTopics) -> Self {
pub(crate) fn new(
config: Config,
authz: Authz,
db: Db,
janus_client: JanusClient,
janus_topics: JanusTopics,
) -> Self {
let agent_id = AgentId::new(&config.agent_label, config.id.to_owned());

Self {
config: Arc::new(config),
authz,
db,
agent_id,
janus_client: Arc::new(janus_client),
janus_topics,
queue_counter: None,
redis_pool: None,
Expand Down Expand Up @@ -67,6 +76,7 @@ pub(crate) trait Context: Sync {
fn config(&self) -> &Config;
fn db(&self) -> &Db;
fn agent_id(&self) -> &AgentId;
fn janus_client(&self) -> Arc<JanusClient>;
fn janus_topics(&self) -> &JanusTopics;
fn queue_counter(&self) -> &Option<QueueCounterHandle>;
fn redis_pool(&self) -> &Option<RedisConnectionPool>;
Expand All @@ -91,6 +101,10 @@ impl Context for AppContext {
&self.agent_id
}

fn janus_client(&self) -> Arc<JanusClient> {
self.janus_client.clone()
}

fn janus_topics(&self) -> &JanusTopics {
&self.janus_topics
}
Expand Down
2 changes: 1 addition & 1 deletion src/app/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use svc_agent::mqtt::{
use svc_error::Error as SvcError;

use crate::app::context::Context;
use crate::app::janus;
pub(self) use crate::app::message_handler::MessageStream;
use crate::app::message_handler::{
EventEnvelopeHandler, RequestEnvelopeHandler, ResponseEnvelopeHandler,
};
use crate::backend::janus;

///////////////////////////////////////////////////////////////////////////////

Expand Down
4 changes: 1 addition & 3 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use uuid::Uuid;
use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::app::handle_id::HandleId;
use crate::app::janus;
use crate::db;

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -344,13 +343,12 @@ impl RequestHandler for ConnectHandler {
};

// Send janus handle creation request.
let janus_request_result = janus::create_rtc_handle_request(
let janus_request_result = context.janus_client().create_rtc_handle_request(
reqp.clone(),
Uuid::new_v4(),
payload.id,
backend.session_id(),
backend.id(),
context.agent_id(),
start_timestamp,
authz_time,
);
Expand Down
82 changes: 42 additions & 40 deletions src/app/endpoint/rtc_signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use svc_error::Error as SvcError;
use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::app::handle_id::HandleId;
use crate::app::janus;
use crate::db;

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -74,20 +73,21 @@ impl RequestHandler for CreateHandler {
// Authorization
let authz_time = authorize(context, &payload, reqp, "read").await?;

janus::read_stream_request(
reqp.clone(),
payload.handle_id.janus_session_id(),
payload.handle_id.janus_handle_id(),
payload.handle_id.rtc_id(),
payload.jsep.clone(),
payload.handle_id.backend_id(),
context.agent_id(),
start_timestamp,
authz_time,
)
.map(|req| Box::new(req) as Box<dyn IntoPublishableMessage + Send>)
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?
context
.janus_client()
.read_stream_request(
reqp.clone(),
payload.handle_id.janus_session_id(),
payload.handle_id.janus_handle_id(),
payload.handle_id.rtc_id(),
payload.jsep.clone(),
payload.handle_id.backend_id(),
start_timestamp,
authz_time,
)
.map(|req| Box::new(req) as Box<dyn IntoPublishableMessage + Send>)
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?
} else {
// Authorization
let authz_time = authorize(context, &payload, reqp, "update").await?;
Expand All @@ -113,42 +113,44 @@ impl RequestHandler for CreateHandler {
.execute(&conn)?;
}

janus::create_stream_request(
context
.janus_client()
.create_stream_request(
reqp.clone(),
payload.handle_id.janus_session_id(),
payload.handle_id.janus_handle_id(),
payload.handle_id.rtc_id(),
payload.jsep.clone(),
payload.handle_id.backend_id(),
start_timestamp,
authz_time,
)
.map(|req| Box::new(req) as Box<dyn IntoPublishableMessage + Send>)
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?
}
}
SdpType::Answer => {
Err("sdp_type = 'answer' is not allowed").status(ResponseStatus::BAD_REQUEST)?
}
SdpType::IceCandidate => {
// Authorization
let authz_time = authorize(context, &payload, reqp, "read").await?;

context
.janus_client()
.trickle_request(
reqp.clone(),
payload.handle_id.janus_session_id(),
payload.handle_id.janus_handle_id(),
payload.handle_id.rtc_id(),
payload.jsep.clone(),
payload.handle_id.backend_id(),
context.agent_id(),
start_timestamp,
authz_time,
)
.map(|req| Box::new(req) as Box<dyn IntoPublishableMessage + Send>)
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?
}
}
SdpType::Answer => {
Err("sdp_type = 'answer' is not allowed").status(ResponseStatus::BAD_REQUEST)?
}
SdpType::IceCandidate => {
// Authorization
let authz_time = authorize(context, &payload, reqp, "read").await?;

crate::app::janus::trickle_request(
reqp.clone(),
payload.handle_id.janus_session_id(),
payload.handle_id.janus_handle_id(),
payload.jsep.clone(),
payload.handle_id.backend_id(),
context.agent_id(),
start_timestamp,
authz_time,
)
.map(|req| Box::new(req) as Box<dyn IntoPublishableMessage + Send>)
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?
}
};

Expand Down
4 changes: 1 addition & 3 deletions src/app/endpoint/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use uuid::Uuid;

use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::app::janus;
use crate::db;

///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -185,13 +184,12 @@ impl EventHandler for DeleteHandler {
.execute(&conn)?;

for backend in backends {
let result = janus::agent_leave_request(
let result = context.janus_client().agent_leave_request(
evp.to_owned(),
backend.session_id(),
backend.handle_id(),
&payload.subject,
backend.id(),
context.agent_id(),
evp.tracking(),
);

Expand Down
33 changes: 17 additions & 16 deletions src/app/endpoint/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use uuid::Uuid;

use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::app::janus;
use crate::backend::janus::requests::UploadStreamRequestBody;
use crate::db;
use crate::db::recording::Status as RecordingStatus;

Expand Down Expand Up @@ -82,21 +82,22 @@ impl RequestHandler for VacuumHandler {
.execute(&conn)?;

// TODO: Send the error as an event to "app/${APP}/audiences/${AUD}" topic
let backreq = janus::upload_stream_request(
reqp,
backend.session_id(),
backend.handle_id(),
janus::UploadStreamRequestBody::new(
recording.rtc_id(),
&bucket_name(&room),
&record_name(&recording),
),
backend.id(),
context.agent_id(),
start_timestamp,
)
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;
let backreq = context
.janus_client()
.upload_stream_request(
reqp,
backend.session_id(),
backend.handle_id(),
UploadStreamRequestBody::new(
recording.rtc_id(),
&bucket_name(&room),
&record_name(&recording),
),
backend.id(),
start_timestamp,
)
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

requests.push(Box::new(backreq) as Box<dyn IntoPublishableMessage + Send>);
}
Expand Down
20 changes: 13 additions & 7 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use svc_error::{extension::sentry, Error as SvcError};

use crate::app::context::Context;
use crate::app::metrics::DbPoolStatsCollector;
use crate::backend::janus::Client as JanusClient;
use crate::config::{self, Config, KruonisConfig};
use crate::db::ConnectionPool;
use context::{AppContext, JanusTopics};
Expand Down Expand Up @@ -87,9 +88,15 @@ pub(crate) async fn run(
let janus_topics = subscribe(&mut agent, &agent_id, &config)?;

// Context
let context = AppContext::new(config.clone(), authz, db.clone(), janus_topics)
.add_queue_counter(agent.get_queue_counter())
.db_pool_stats(db_pool_stats);
let context = AppContext::new(
config.clone(),
authz,
db.clone(),
JanusClient::start(&config.backend, agent_id)?,
janus_topics,
)
.add_queue_counter(agent.get_queue_counter())
.db_pool_stats(db_pool_stats);

let context = match redis_pool {
Some(pool) => context.add_redis_pool(pool),
Expand Down Expand Up @@ -156,7 +163,7 @@ fn subscribe(agent: &mut Agent, agent_id: &AgentId, config: &Config) -> Result<J
.context("Error subscribing to unicast requests")?;

// Janus status events
let subscription = Subscription::broadcast_events(&config.backend_id, API_VERSION, "status");
let subscription = Subscription::broadcast_events(&config.backend.id, API_VERSION, "status");

agent
.subscribe(&subscription, QoS::AtLeastOnce, Some(&group))
Expand All @@ -167,7 +174,7 @@ fn subscribe(agent: &mut Agent, agent_id: &AgentId, config: &Config) -> Result<J
.context("Error building janus events subscription topic")?;

// Janus events
let subscription = Subscription::broadcast_events(&config.backend_id, API_VERSION, "events");
let subscription = Subscription::broadcast_events(&config.backend.id, API_VERSION, "events");

agent
.subscribe(&subscription, QoS::AtLeastOnce, Some(&group))
Expand All @@ -178,7 +185,7 @@ fn subscribe(agent: &mut Agent, agent_id: &AgentId, config: &Config) -> Result<J
.context("Error building janus events subscription topic")?;

// Janus responses
let subscription = Subscription::unicast_responses_from(&config.backend_id);
let subscription = Subscription::unicast_responses_from(&config.backend.id);

agent
.subscribe(&subscription, QoS::AtLeastOnce, Some(&group))
Expand Down Expand Up @@ -237,6 +244,5 @@ fn resubscribe(agent: &mut Agent, agent_id: &AgentId, config: &Config) {
pub(crate) mod context;
pub(crate) mod endpoint;
pub(crate) mod handle_id;
mod janus;
pub(crate) mod message_handler;
pub(crate) mod metrics;
Loading

0 comments on commit 183eb7a

Please sign in to comment.