Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add simple Janus balancing (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Dec 20, 2019
1 parent 865b814 commit 8d81a72
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 60 deletions.
8 changes: 7 additions & 1 deletion docs/src/api.rtc.connect.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
Connect to the real-time connection to send signal messages and media.
The method isn't available for `none` backend.

*NOTE: If Janus Gateway is used as a backend, a handle to the conference plugin for the patricular agent will be created.*
Creates a Janus handle for the particular agent.

If there's already a stream present for the RTC then the returned handle would be bound to the
Janus instance that hosts this stream.

If there's no stream yet then the handle is being balanced to the instance with the least number
of active RTC streams.

**Request**

Expand Down
121 changes: 104 additions & 17 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use uuid::Uuid;

use crate::app::endpoint::shared;
use crate::app::{endpoint, API_VERSION};
use crate::db::{janus_backend, room, rtc, ConnectionPool};
use crate::db::{janus_backend, janus_rtc_stream, room, rtc, ConnectionPool};

////////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -168,18 +168,34 @@ impl State {
.map_err(|err| SvcError::from(err))?
};

// TODO: implement resource management
// Picking up first available backend
let backends = {
let backend = {
let conn = self.db.get()?;
janus_backend::ListQuery::new().limit(1).execute(&conn)?

// If there is an active stream choose its backend since Janus doesn't support
// clustering so all agents within one rtc must be sent to the same node. If there's no
// active stream it means we're connecting as publisher and going to create it.
// Then select the least loaded node: the one with the least active rtc streams count.
match janus_rtc_stream::FindQuery::new()
.rtc_id(id)
.execute(&conn)?
{
Some(ref stream) => janus_backend::FindQuery::new()
.id(stream.backend_id().to_owned())
.execute(&conn)?
.ok_or_else(|| {
SvcError::builder()
.status(ResponseStatus::UNPROCESSABLE_ENTITY)
.detail("no backend found for stream")
.build()
})?,
None => janus_backend::least_loaded(&conn)?.ok_or_else(|| {
SvcError::builder()
.status(ResponseStatus::UNPROCESSABLE_ENTITY)
.detail("no available backends")
.build()
})?,
}
};
let backend = backends.first().ok_or_else(|| {
SvcError::builder()
.status(ResponseStatus::UNPROCESSABLE_ENTITY)
.detail("no available backends")
.build()
})?;

// Building a Create Janus Gateway Handle request
crate::app::janus::create_rtc_handle_request(
Expand Down Expand Up @@ -321,12 +337,13 @@ mod test {
use serde_json::{json, Value as JsonValue};
use svc_agent::{AccountId, AgentId, Destination};

use crate::db::janus_rtc_stream;
use crate::test_helpers::{
agent::TestAgent,
authz::{no_authz, TestAuthz},
db::TestDb,
extract_payload,
factory::{insert_janus_backend, insert_room, insert_rtc},
factory::{insert_janus_backend, insert_room, insert_rtc, JanusRtcStream},
};
use crate::util::from_base64;

Expand Down Expand Up @@ -393,7 +410,7 @@ mod test {
let payload: RtcResponse = extract_payload(message).unwrap();
assert_eq!(payload.room_id, room.id());

// Assert room presence in the DB.
// Assert rtc presence in the DB.
let conn = db.connection_pool().get().unwrap();
let query = crate::schema::rtc::table.find(resp.id);
assert_eq!(query.execute(&conn).unwrap(), 1);
Expand Down Expand Up @@ -652,10 +669,27 @@ mod test {
.connection_pool()
.get()
.map(|conn| {
(
insert_rtc(&conn, AUDIENCE),
insert_janus_backend(&conn, AUDIENCE),
)
// Insert janus backends.
let backend1 = insert_janus_backend(&conn, AUDIENCE);
let backend2 = insert_janus_backend(&conn, AUDIENCE);

// The first backend has 1 active stream.
let stream1 = JanusRtcStream::new(AUDIENCE)
.backend(&backend1)
.insert(&conn)
.unwrap();

janus_rtc_stream::start(*stream1.id(), &conn).unwrap();

// The second backend has 1 stream that is not started
// so it's free and should be selected by the balancer.
let _stream2 = JanusRtcStream::new(AUDIENCE)
.backend(&backend2)
.insert(&conn)
.unwrap();

let rtc = insert_rtc(&conn, AUDIENCE);
(rtc, backend2)
})
.unwrap();

Expand Down Expand Up @@ -702,6 +736,59 @@ mod test {
});
}

#[test]
fn connect_to_rtc_with_existing_stream() {
futures::executor::block_on(async {
let db = TestDb::new();
let mut authz = TestAuthz::new(AUDIENCE);

// Insert an rtc and janus backend.
let (rtc, backend) = db
.connection_pool()
.get()
.map(|conn| {
let rtc = insert_rtc(&conn, AUDIENCE);

// Insert janus backends.
let _backend1 = insert_janus_backend(&conn, AUDIENCE);
let backend2 = insert_janus_backend(&conn, AUDIENCE);

// The second backend has an active stream already.
let stream = JanusRtcStream::new(AUDIENCE)
.backend(&backend2)
.rtc(&rtc)
.insert(&conn)
.unwrap();

janus_rtc_stream::start(*stream.id(), &conn).unwrap();
(rtc, backend2)
})
.unwrap();

// Allow user to read the rtc.
let agent = TestAgent::new("web", "user123", AUDIENCE);
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(agent.account_id(), object, "read");

// Make rtc.connect request.
let state = State::new(authz.into(), db.connection_pool().clone());
let payload = json!({"id": rtc.id()});
let request: ConnectRequest = agent.build_request("rtc.connect", &payload).unwrap();
let mut result = state
.connect(request, Utc::now())
.await
.into_result()
.unwrap();
let message = result.remove(0);

// Ensure we're balanced to the backend with the stream.
let resp: RtcConnectResponse = extract_payload(message).unwrap();
assert_eq!(resp.session_id, backend.session_id());
});
}

#[test]
fn connect_to_rtc_missing() {
futures::executor::block_on(async {
Expand Down
12 changes: 8 additions & 4 deletions src/app/endpoint/rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ mod test {
authz::{no_authz, TestAuthz},
db::TestDb,
extract_payload,
factory::{insert_janus_rtc_stream, insert_rtc},
factory::{insert_rtc, JanusRtcStream},
};

use super::*;
Expand Down Expand Up @@ -172,9 +172,13 @@ mod test {
.connection_pool()
.get()
.map(|conn| {
// Insert a janus rtc stream.
let rtc_stream = insert_janus_rtc_stream(&conn, AUDIENCE);
let _other_rtc_stream = insert_janus_rtc_stream(&conn, AUDIENCE);
// Insert janus rtc streams.
let rtc_stream = JanusRtcStream::new(AUDIENCE).insert(&conn).unwrap();
let start_result = janus_rtc_stream::start(*rtc_stream.id(), &conn).unwrap();
let rtc_stream = start_result.unwrap();

let other_rtc_stream = JanusRtcStream::new(AUDIENCE).insert(&conn).unwrap();
janus_rtc_stream::start(*other_rtc_stream.id(), &conn).unwrap();

// Find rtc.
let rtc: crate::db::rtc::Object = crate::schema::rtc::table
Expand Down
6 changes: 5 additions & 1 deletion src/app/endpoint/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,11 @@ mod test {
.get()
.map_err(|err| format_err!("Failed to get DB connection: {}", err))
.and_then(|conn| {
let stream = factory::insert_janus_rtc_stream(&conn, AUDIENCE);
let stream = factory::JanusRtcStream::new(AUDIENCE).insert(&conn)?;

let stream = janus_rtc_stream::start(*stream.id(), &conn)
.expect("Failed to start stream")
.expect("No stream returned");

let agent = factory::Agent::new()
.agent_id(stream.sent_by())
Expand Down
54 changes: 54 additions & 0 deletions src/db/janus_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,36 @@ impl<'a> ListQuery<'a> {

////////////////////////////////////////////////////////////////////////////////

pub(crate) struct FindQuery {
id: Option<AgentId>,
}

impl FindQuery {
pub(crate) fn new() -> Self {
Self { id: None }
}

pub(crate) fn id(self, id: AgentId) -> Self {
Self {
id: Some(id),
..self
}
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::prelude::*;

match self.id {
Some(ref id) => janus_backend::table.find(id).get_result(conn).optional(),
None => Err(Error::QueryBuilderError(
"id parameter is required parameter of the query".into(),
)),
}
}
}

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Insertable, AsChangeset)]
#[table_name = "janus_backend"]
pub(crate) struct UpdateQuery<'a> {
Expand Down Expand Up @@ -134,3 +164,27 @@ impl<'a> DeleteQuery<'a> {
diesel::delete(janus_backend::table.filter(janus_backend::id.eq(self.id))).execute(conn)
}
}

////////////////////////////////////////////////////////////////////////////////

const LEAST_LOADED_SQL: &str = r#"
select jb.*
from janus_backend as jb
left join (
select *
from janus_rtc_stream
where lower(time) is not null
and upper(time) is null
) as jrs
on jrs.backend_id = jb.id
group by jb.id
order by count(jrs.id)
"#;

pub(crate) fn least_loaded(conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::prelude::*;

diesel::sql_query(LEAST_LOADED_SQL)
.get_result(conn)
.optional()
}
65 changes: 53 additions & 12 deletions src/db/janus_rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,34 +86,75 @@ impl Object {

////////////////////////////////////////////////////////////////////////////////

const ACTIVE_SQL: &str = r#"(
lower("janus_rtc_stream"."time") is not null
and upper("janus_rtc_stream"."time") is null
)"#;

pub(crate) struct FindQuery {
id: Option<Uuid>,
rtc_id: Option<Uuid>,
active: Option<bool>,
}

impl FindQuery {
pub(crate) fn new() -> Self {
Self { id: None }
Self {
id: None,
rtc_id: None,
active: None,
}
}

pub(crate) fn id(self, id: Uuid) -> Self {
Self {
id: Some(id),
..self
}
}

pub(crate) fn rtc_id(self, rtc_id: Uuid) -> Self {
Self {
rtc_id: Some(rtc_id),
..self
}
}

pub(crate) fn active(self, active: bool) -> Self {
Self {
active: Some(active),
..self
}
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::dsl::sql;
use diesel::prelude::*;

match self.id {
Some(ref id) => janus_rtc_stream::table.find(id).get_result(conn).optional(),
_ => Err(Error::QueryBuilderError(
"id is required parameter of the query".into(),
)),
}
let mut query = match (self.id, self.rtc_id) {
(Some(ref id), None) => janus_rtc_stream::table.find(id.to_owned()).into_boxed(),
(None, Some(ref rtc_id)) => janus_rtc_stream::table
.filter(janus_rtc_stream::rtc_id.eq(rtc_id.to_owned()))
.into_boxed(),
_ => {
return Err(Error::QueryBuilderError(
"id either rtc_id is required parameter of the query".into(),
))
}
};

let query = match self.active {
Some(true) => query.filter(sql(ACTIVE_SQL)),
Some(false) => query.filter(sql(&format!("not {}", ACTIVE_SQL))),
None => query,
};

query.get_result(conn).optional()
}
}

////////////////////////////////////////////////////////////////////////////////

const ACTIVE_SQL: &str = r#"(
lower("janus_rtc_stream"."time") is not null
and upper("janus_rtc_stream"."time") is null
)"#;

pub(crate) struct ListQuery {
room_id: Option<Uuid>,
rtc_id: Option<Uuid>,
Expand Down
Loading

0 comments on commit 8d81a72

Please sign in to comment.