Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add janus grouping (#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Jun 8, 2021
1 parent 2745ea5 commit a9bce0a
Show file tree
Hide file tree
Showing 13 changed files with 181 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE janus_backend DROP COLUMN "group";
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE janus_backend ADD COLUMN "group" TEXT;
91 changes: 89 additions & 2 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ impl RequestHandler for ConnectHandler {
// Choose backend to connect.
let backend = {
let conn = context.get_conn()?;
let group = context.config().janus_group.as_deref();

// There are 3 cases:
// 1. Connecting as writer for the first time. There's no `backend_id` in that case.
Expand All @@ -371,9 +372,9 @@ impl RequestHandler for ConnectHandler {
.execute(&conn)?
.ok_or_else(|| anyhow!("No backend found for stream"))
.error(AppErrorKind::BackendNotFound)?,
None => match db::janus_backend::most_loaded(room.id(), &conn)? {
None => match db::janus_backend::most_loaded(room.id(), group, &conn)? {
Some(backend) => backend,
None => db::janus_backend::least_loaded(room.id(), &conn)?
None => db::janus_backend::least_loaded(room.id(), group, &conn)?
.map(|backend| {
use sentry::protocol::{value::Value, Event, Level};
let backend_id = backend.id().to_string();
Expand Down Expand Up @@ -2048,6 +2049,92 @@ mod test {
});
}

#[test]
fn connect_to_rtc_with_backend_grouping() {
async_std::task::block_on(async {
let mut rng = rand::thread_rng();
let db = TestDb::new();
let mut authz = TestAuthz::new();

let (rtc, backend) = {
let conn = db
.connection_pool()
.get()
.expect("Failed to get DB connection");

// Insert two backends in different groups.
let backend1_agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE);

let backend1 = factory::JanusBackend::new(
backend1_agent.agent_id().to_owned(),
rng.gen(),
rng.gen(),
)
.group("wrong")
.insert(&conn);

let backend2_agent = TestAgent::new("beta", "janus", SVC_AUDIENCE);

let backend2 = factory::JanusBackend::new(
backend2_agent.agent_id().to_owned(),
rng.gen(),
rng.gen(),
)
.group("right")
.insert(&conn);

// Add some load to the first backend.
let room1 = shared_helpers::insert_room_with_backend_id(&conn, backend1.id());
let rtc1 = shared_helpers::insert_rtc_with_room(&conn, &room1);
let someone = TestAgent::new("web", "user456", USR_AUDIENCE);

shared_helpers::insert_connected_agent(
&conn,
someone.agent_id(),
rtc1.room_id(),
rtc1.id(),
);

// Insert an RTC to connect to
let rtc2 = shared_helpers::insert_rtc(&conn);
(rtc2, backend2)
};

// Allow agent to read the RTC.
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(agent.account_id(), object, "read");

// Configure the app to the `right` janus group.
let mut context = TestContext::new(db, authz);
context.config_mut().janus_group = Some(String::from("right"));

// Make rtc.connect request.
let payload = ConnectRequest {
id: rtc.id(),
intent: ConnectIntent::Read,
};

let messages = handle_request::<ConnectHandler>(&mut context, &agent, payload)
.await
.expect("RTC connect failed");

// Assert outgoing request goes to the expected backend.
let (_req, _reqp, topic) = find_request::<JanusAttachRequest>(messages.as_slice());

let expected_topic = format!(
"agents/{}/api/{}/in/{}",
backend.id(),
janus::JANUS_API_VERSION,
context.config().id,
);

assert_eq!(topic, &expected_topic);
});
}

#[test]
fn connect_to_rtc_not_authorized() {
async_std::task::block_on(async {
Expand Down
5 changes: 5 additions & 0 deletions src/backend/janus/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub(crate) struct StatusEvent {
online: bool,
capacity: Option<i32>,
balancer_capacity: Option<i32>,
group: Option<String>,
}

impl StatusEvent {
Expand All @@ -106,4 +107,8 @@ impl StatusEvent {
pub(crate) fn balancer_capacity(&self) -> Option<i32> {
self.balancer_capacity
}

pub(crate) fn group(&self) -> Option<&str> {
self.group.as_deref()
}
}
5 changes: 5 additions & 0 deletions src/backend/janus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ async fn handle_response_impl<C: Context>(
inresp.data().id(),
tn.capacity(),
tn.balancer_capacity(),
tn.group(),
context.start_timestamp(),
)
.error(AppErrorKind::MessageBuildingFailed)?;
Expand All @@ -109,6 +110,10 @@ async fn handle_response_impl<C: Context>(
q = q.balancer_capacity(balancer_capacity);
}

if let Some(group) = tn.group() {
q = q.group(group);
}

q.execute(&conn)?;
Ok(Box::new(stream::empty()))
}
Expand Down
16 changes: 16 additions & 0 deletions src/backend/janus/transactions/create_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub(crate) struct TransactionData {
session_id: i64,
capacity: Option<i32>,
balancer_capacity: Option<i32>,
group: Option<String>,
}

impl TransactionData {
Expand All @@ -32,6 +33,7 @@ impl TransactionData {
session_id,
capacity: None,
balancer_capacity: None,
group: None,
}
}

Expand All @@ -56,6 +58,15 @@ impl TransactionData {
self.balancer_capacity = Some(balancer_capacity);
self
}

pub(crate) fn group(&self) -> Option<&str> {
self.group.as_ref().map(AsRef::as_ref)
}

pub(crate) fn set_group(&mut self, group: &str) -> &mut Self {
self.group = Some(group.to_owned());
self
}
}

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -67,6 +78,7 @@ impl Client {
session_id: i64,
capacity: Option<i32>,
balancer_capacity: Option<i32>,
group: Option<&str>,
start_timestamp: DateTime<Utc>,
) -> Result<OutgoingMessage<CreateHandleRequest>> {
let to = respp.as_agent_id();
Expand All @@ -80,6 +92,10 @@ impl Client {
tn_data.set_balancer_capacity(balancer_capacity);
}

if let Some(group) = group {
tn_data.set_group(group);
}

let transaction = Transaction::CreateHandle(tn_data);

let payload = CreateHandleRequest::new(
Expand Down
21 changes: 16 additions & 5 deletions src/backend/janus/transactions/create_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@ const METHOD: &str = "janus_session.create";

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Default, Deserialize, Serialize)]
pub(crate) struct TransactionData {
capacity: Option<i32>,
balancer_capacity: Option<i32>,
group: Option<String>,
}

impl TransactionData {
pub(crate) fn new() -> Self {
Self {
capacity: None,
balancer_capacity: None,
}
Default::default()
}

pub(crate) fn capacity(&self) -> Option<i32> {
Expand All @@ -51,6 +49,15 @@ impl TransactionData {
self.balancer_capacity = Some(balancer_capacity);
self
}

pub(crate) fn group(&self) -> Option<&str> {
self.group.as_ref().map(AsRef::as_ref)
}

pub(crate) fn set_group(&mut self, group: &str) -> &mut Self {
self.group = Some(group.to_owned());
self
}
}

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -73,6 +80,10 @@ impl Client {
tn_data.set_balancer_capacity(balancer_capacity);
}

if let Some(group) = payload.group() {
tn_data.set_group(group);
}

let transaction = Transaction::CreateSession(tn_data);
let payload = CreateSessionRequest::new(&to_base64(&transaction)?);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl Client {
) -> Result<OutgoingMessage<MessageRequest>> {
let to = backend.id();
let mut short_term_timing = ShortTermTimingProperties::until_now(start_timestamp);

if let Some(authz_time) = maybe_authz_time {
short_term_timing.set_authorization_time(authz_time);
}
Expand Down
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub(crate) struct Config {
pub(crate) kruonis: KruonisConfig,
pub(crate) metrics: Option<MetricsConfig>,
pub(crate) max_room_duration: Option<i64>,
pub(crate) janus_group: Option<String>,
}

#[derive(Clone, Debug, Deserialize)]
Expand Down
32 changes: 28 additions & 4 deletions src/db/janus_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub(crate) type AllColumns = (
janus_backend::capacity,
janus_backend::balancer_capacity,
janus_backend::api_version,
janus_backend::group,
);

pub(crate) const ALL_COLUMNS: AllColumns = (
Expand All @@ -25,6 +26,7 @@ pub(crate) const ALL_COLUMNS: AllColumns = (
janus_backend::capacity,
janus_backend::balancer_capacity,
janus_backend::api_version,
janus_backend::group,
);

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -39,6 +41,7 @@ pub(crate) struct Object {
capacity: Option<i32>,
balancer_capacity: Option<i32>,
api_version: String,
group: Option<String>,
}

impl Object {
Expand Down Expand Up @@ -134,6 +137,7 @@ pub(crate) struct UpsertQuery<'a> {
capacity: Option<i32>,
balancer_capacity: Option<i32>,
api_version: String,
group: Option<&'a str>,
}

impl<'a> UpsertQuery<'a> {
Expand All @@ -145,6 +149,7 @@ impl<'a> UpsertQuery<'a> {
capacity: None,
balancer_capacity: None,
api_version: JANUS_API_VERSION.to_string(),
group: None,
}
}

Expand All @@ -162,6 +167,13 @@ impl<'a> UpsertQuery<'a> {
}
}

pub(crate) fn group(self, group: &'a str) -> Self {
Self {
group: Some(group),
..self
}
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<Object, Error> {
use crate::schema::janus_backend::dsl::janus_backend;
use diesel::RunQueryDsl;
Expand Down Expand Up @@ -249,17 +261,23 @@ const MOST_LOADED_SQL: &str = r#"
WHERE r2.id = $1
AND COALESCE(jb.balancer_capacity, jb.capacity, 2147483647) - COALESCE(jbl.load, 0) >= COALESCE(r2.reserve, 1)
AND jb.api_version = $2
AND ($3 IS NULL OR jb."group" = $3)
ORDER BY COALESCE(jbl.load, 0) DESC, RANDOM()
LIMIT 1
"#;

pub(crate) fn most_loaded(room_id: Uuid, conn: &PgConnection) -> Result<Option<Object>, Error> {
pub(crate) fn most_loaded(
room_id: Uuid,
group: Option<&str>,
conn: &PgConnection,
) -> Result<Option<Object>, Error> {
use diesel::prelude::*;
use diesel::sql_types::{Text, Uuid};
use diesel::sql_types::{Nullable, Text, Uuid};

diesel::sql_query(MOST_LOADED_SQL)
.bind::<Uuid, _>(room_id)
.bind::<Text, _>(JANUS_API_VERSION)
.bind::<Nullable<Text>, _>(group)
.get_result(conn)
.optional()
}
Expand Down Expand Up @@ -310,19 +328,25 @@ const LEAST_LOADED_SQL: &str = r#"
ON 1 = 1
WHERE r2.id = $1
AND jb.api_version = $2
AND ($3 IS NULL OR jb."group" = $3)
ORDER BY
COALESCE(jb.balancer_capacity, jb.capacity, 2147483647) - COALESCE(jbl.load, 0) DESC,
RANDOM()
LIMIT 1
"#;

pub(crate) fn least_loaded(room_id: Uuid, conn: &PgConnection) -> Result<Option<Object>, Error> {
pub(crate) fn least_loaded(
room_id: Uuid,
group: Option<&str>,
conn: &PgConnection,
) -> Result<Option<Object>, Error> {
use diesel::prelude::*;
use diesel::sql_types::{Text, Uuid};
use diesel::sql_types::{Nullable, Text, Uuid};

diesel::sql_query(LEAST_LOADED_SQL)
.bind::<Uuid, _>(room_id)
.bind::<Text, _>(JANUS_API_VERSION)
.bind::<Nullable<Text>, _>(group)
.get_result(conn)
.optional()
}
Expand Down
1 change: 1 addition & 0 deletions src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ table! {
capacity -> Nullable<Int4>,
balancer_capacity -> Nullable<Int4>,
api_version -> Text,
group -> Nullable<Text>,
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/test_helpers/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ impl TestContext {
start_timestamp: Utc::now(),
}
}

pub(crate) fn config_mut(&mut self) -> &mut Config {
&mut self.config
}
}

impl GlobalContext for TestContext {
Expand Down
Loading

0 comments on commit a9bce0a

Please sign in to comment.