Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add falling back to the least loaded backend (#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Oct 8, 2020
1 parent 7a47280 commit f797bbf
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 33 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ http = "0.1"
lazy_static = "1.4"
openssl = "*"
rand = "0.7"
sentry = "0.18"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
Expand Down
108 changes: 76 additions & 32 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ impl RequestHandler for ConnectHandler {
// There are 3 cases:
// 1. Connecting as writer for the first time. There's no recording in that case.
// Select the most loaded backend that is capable to host the room's reservation.
// If there's no capable backend then select the least loaded and send a warning
// to Sentry. If there are no backends at all then return `no available backends`
// error and also send it to Sentry.
// 2. Connecting as reader with existing recording. Choose the backend of the active
// recording because Janus doesn't support clustering and it must be the same server
// that the writer is connected to.
Expand All @@ -311,9 +314,34 @@ impl RequestHandler for ConnectHandler {
.execute(&conn)?
.ok_or("no backend found for stream")
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?,
None => db::janus_backend::most_loaded(room.id(), &conn)?
.ok_or("no available backends")
.status(ResponseStatus::SERVICE_UNAVAILABLE)?,
None => match db::janus_backend::most_loaded(room.id(), &conn)? {
Some(backend) => backend,
None => db::janus_backend::least_loaded(room.id(), &conn)?
.map(|backend| {
use sentry::protocol::{value::Value, Event, Level};

let mut extra = std::collections::BTreeMap::new();
extra.insert(String::from("room_id"), Value::from(room_id));
extra.insert(String::from("rtc_id"), Value::from(rtc_id));
let backend_id = backend.id().to_string();
extra.insert(String::from("backend_id"), Value::from(backend_id));

if let Some(reserve) = room.reserve() {
extra.insert(String::from("reserve"), Value::from(reserve));
}

sentry::capture_event(Event {
message: Some(String::from("No capable backends to host the reserve; falling back to the least loaded backend")),
level: Level::Warning,
extra,
..Default::default()
});

backend
})
.ok_or("no available backends")
.status(ResponseStatus::SERVICE_UNAVAILABLE)?,
},
};

// Create recording if a writer connects for the first time.
Expand Down Expand Up @@ -1326,28 +1354,29 @@ mod test {
}

#[test]
fn connect_to_rtc_reservation_with_stopped_stream() {
fn connect_to_rtc_too_big_reserve() {
async_std::task::block_on(async {
let mut rng = rand::thread_rng();
let db = TestDb::new();
let mut authz = TestAuthz::new();
let writer = TestAgent::new("web", "writer", USR_AUDIENCE);
let writer1 = TestAgent::new("web", "writer1", USR_AUDIENCE);
let writer2 = TestAgent::new("web", "writer2", USR_AUDIENCE);

let rtc = db
let (rtc, backend) = db
.connection_pool()
.get()
.map(|conn| {
let now = Utc::now();

// Insert rooms with reserves.
// Insert room with big reserve and another one for load.
let room1 = factory::Room::new()
.audience(USR_AUDIENCE)
.time((
Bound::Included(now),
Bound::Excluded(now + Duration::hours(1)),
))
.backend(RoomBackend::Janus)
.reserve(15)
.reserve(100)
.insert(&conn);

let room2 = factory::Room::new()
Expand All @@ -1357,62 +1386,77 @@ mod test {
Bound::Excluded(now + Duration::hours(1)),
))
.backend(RoomBackend::Janus)
.reserve(15)
.insert(&conn);

// Insert rtcs.
let rtc1 = factory::Rtc::new(room1.id()).insert(&conn);
let rtc2 = factory::Rtc::new(room2.id()).insert(&conn);
let _rtc2 = factory::Rtc::new(room2.id()).insert(&conn);

// Insert a backend with capacity less than the sum of reserves.
let backend_id = {
// Insert backends with low balancer capacity.
let backend_id1 = {
let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE);
agent.agent_id().to_owned()
};

let backend = factory::JanusBackend::new(backend_id, rng.gen(), rng.gen())
.capacity(20)
.insert(&conn);
let backend1 =
factory::JanusBackend::new(backend_id1, rng.gen(), rng.gen())
.balancer_capacity(20)
.capacity(200)
.insert(&conn);

// Insert a stopped stream in the first room.
let stream = factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend)
.rtc(&rtc1)
.insert(&conn);
let backend_id2 = {
let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE);
agent.agent_id().to_owned()
};

crate::db::janus_rtc_stream::start(stream.id(), &conn).unwrap();
crate::db::janus_rtc_stream::stop(stream.id(), &conn).unwrap();
let _backend2 =
factory::JanusBackend::new(backend_id2, rng.gen(), rng.gen())
.balancer_capacity(50)
.capacity(200)
.insert(&conn);

factory::Recording::new()
.rtc(&rtc1)
.backend(&backend)
// Add some load on the biggest one.
factory::Agent::new()
.agent_id(writer2.agent_id())
.room_id(room2.id())
.status(AgentStatus::Ready)
.insert(&conn);

rtc2
(rtc1, backend1)
})
.unwrap();

// Allow user to update the rtc.
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(writer.account_id(), object, "update");
authz.allow(writer1.account_id(), object, "update");

// Make an rtc.connect request.
// The reserve of the first room must be taken into account despited of the
// stream has been already stopped.
// Despite of none of the backends are capable to host the reserve it should
// select the least loaded one.
let mut context = TestContext::new(db, authz);

let payload = ConnectRequest {
id: rtc.id(),
intent: ConnectIntent::Write,
};

let err = handle_request::<ConnectHandler>(&mut context, &writer, payload)
let messages = handle_request::<ConnectHandler>(&mut context, &writer1, payload)
.await
.expect_err("Unexpected success on rtc connecting");
.expect("RTC connect failed");

assert_eq!(err.status_code(), ResponseStatus::SERVICE_UNAVAILABLE);
// Assert outgoing request goes to the expected backend.
let (_req, _reqp, topic) = find_request::<JanusAttachRequest>(messages.as_slice());

let expected_topic = format!(
"agents/{}/api/{}/in/{}",
backend.id(),
janus::JANUS_API_VERSION,
context.config().id,
);

assert_eq!(topic, &expected_topic);
});
}

Expand Down
58 changes: 58 additions & 0 deletions src/db/janus_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,64 @@ pub(crate) fn most_loaded(room_id: Uuid, conn: &PgConnection) -> Result<Option<O
.optional()
}

// The same as above but finds the least loaded backend instead without considering the reserve.
const LEAST_LOADED_SQL: &str = r#"
WITH
room_load AS (
SELECT
room_id,
COUNT(id) AS taken
FROM agent
WHERE status = 'connected'
GROUP BY room_id
),
active_room AS (
SELECT *
FROM room
WHERE backend = 'janus'
AND NOW() < UPPER(time)
),
janus_backend_load AS (
SELECT
backend_id,
SUM(taken) AS load
FROM (
SELECT DISTINCT ON(backend_id, room_id)
jrs.backend_id,
rtc.room_id,
COALESCE(rl.taken, 0) AS taken
FROM janus_rtc_stream AS jrs
INNER JOIN rtc
ON rtc.id = jrs.rtc_id
LEFT JOIN active_room AS ar
ON ar.id = rtc.room_id
LEFT JOIN room_load AS rl
ON rl.room_id = rtc.room_id
WHERE LOWER(jrs.time) IS NOT NULL
) AS sub
GROUP BY backend_id
)
SELECT jb.*
FROM janus_backend AS jb
LEFT JOIN janus_backend_load AS jbl
ON jbl.backend_id = jb.id
LEFT JOIN room AS r2
ON 1 = 1
WHERE r2.id = $1
ORDER BY COALESCE(jb.balancer_capacity, jb.capacity, 2147483647) - COALESCE(jbl.load, 0)
LIMIT 1
"#;

pub(crate) fn least_loaded(room_id: Uuid, conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::prelude::*;
use diesel::sql_types::Uuid;

diesel::sql_query(LEAST_LOADED_SQL)
.bind::<Uuid, _>(room_id)
.get_result(conn)
.optional()
}

////////////////////////////////////////////////////////////////////////////////

// Similar to the previous one but returns the number of free slots for the room on the backend
Expand Down
1 change: 0 additions & 1 deletion src/db/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ impl Object {
self.backend
}

#[cfg(test)]
pub(crate) fn reserve(&self) -> Option<i32> {
self.reserve
}
Expand Down
13 changes: 13 additions & 0 deletions src/test_helpers/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ pub(crate) struct JanusBackend {
handle_id: i64,
session_id: i64,
capacity: Option<i32>,
balancer_capacity: Option<i32>,
}

impl JanusBackend {
Expand All @@ -158,6 +159,7 @@ impl JanusBackend {
handle_id,
session_id,
capacity: None,
balancer_capacity: None,
}
}

Expand All @@ -168,13 +170,24 @@ impl JanusBackend {
}
}

pub(crate) fn balancer_capacity(self, balancer_capacity: i32) -> Self {
Self {
balancer_capacity: Some(balancer_capacity),
..self
}
}

pub(crate) fn insert(&self, conn: &PgConnection) -> db::janus_backend::Object {
let mut q = db::janus_backend::UpsertQuery::new(&self.id, self.handle_id, self.session_id);

if let Some(capacity) = self.capacity {
q = q.capacity(capacity);
}

if let Some(balancer_capacity) = self.balancer_capacity {
q = q.balancer_capacity(balancer_capacity);
}

q.execute(conn).expect("Failed to insert janus_backend")
}
}
Expand Down

0 comments on commit f797bbf

Please sign in to comment.