Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
fixed least loaded query ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Oct 12, 2020
1 parent cb6c9ae commit ccf06a9
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 32 deletions.
110 changes: 79 additions & 31 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,17 +304,20 @@ impl RequestHandler for ConnectHandler {
None => db::janus_backend::least_loaded(room.id(), &conn)?
.map(|backend| {
use sentry::protocol::{value::Value, Event, Level};
let backend_id = backend.id().to_string();

warn!(crate::LOG, "No capable backends to host the reserve; falling back to the least loaded backend: room_id = {}, rtc_id = {}, backend_id = {}", room_id, rtc_id, backend_id);

let mut extra = std::collections::BTreeMap::new();
extra.insert(String::from("room_id"), Value::from(room_id));
extra.insert(String::from("rtc_id"), Value::from(rtc_id));
let backend_id = backend.id().to_string();
extra.insert(String::from("backend_id"), Value::from(backend_id));

if let Some(reserve) = room.reserve() {
extra.insert(String::from("reserve"), Value::from(reserve));
}


sentry::capture_event(Event {
message: Some(String::from("No capable backends to host the reserve; falling back to the least loaded backend")),
level: Level::Warning,
Expand Down Expand Up @@ -1346,24 +1349,31 @@ mod test {
let mut rng = rand::thread_rng();
let db = TestDb::new();
let mut authz = TestAuthz::new();
let writer1 = TestAgent::new("web", "writer1", USR_AUDIENCE);
let writer2 = TestAgent::new("web", "writer2", USR_AUDIENCE);
let new_writer = TestAgent::new("web", "new-writer", USR_AUDIENCE);

let (rtc, backend) = db
.connection_pool()
.get()
.map(|conn| {
let now = Utc::now();

// Insert room with big reserve and another one for load.
// We have two backends with cap=800 and balance_cap=700 each
// We have two rooms with reserves 500 and 600, each at its own backend
// Room with reserve 500 has 1 writer and 2 readers, ie its load is 3
// Room with reserve 600 has 1 writer and 1 readers, ie its load is 2
// We want to balance a room with reserve 400
// Since it doesnt fit anywhere it should go to backend with smallest current load,
// ie to backend 2 (though it has only 100 free reserve, and backend1 has 200 free reserve)

// Setup three rooms with 500, 600 and 400 reserves.
let room1 = factory::Room::new()
.audience(USR_AUDIENCE)
.time((
Bound::Included(now),
Bound::Excluded(now + Duration::hours(1)),
))
.backend(RoomBackend::Janus)
.reserve(100)
.reserve(500)
.insert(&conn);

let room2 = factory::Room::new()
Expand All @@ -1372,55 +1382,93 @@ mod test {
Bound::Included(now),
Bound::Excluded(now + Duration::hours(1)),
))
.reserve(600)
.backend(RoomBackend::Janus)
.insert(&conn);

// Insert rtcs.
let room3 = factory::Room::new()
.audience(USR_AUDIENCE)
.time((
Bound::Included(now),
Bound::Excluded(now + Duration::hours(1)),
))
.reserve(400)
.backend(RoomBackend::Janus)
.insert(&conn);

// Insert rtcs for each room.
let rtc1 = factory::Rtc::new(room1.id()).insert(&conn);
let _rtc2 = factory::Rtc::new(room2.id()).insert(&conn);
let rtc2 = factory::Rtc::new(room2.id()).insert(&conn);
let rtc3 = factory::Rtc::new(room3.id()).insert(&conn);

// Insert backends with low balancer capacity.
let backend_id1 = {
// Insert alpha and beta backends.
let backend1 = {
let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE);
agent.agent_id().to_owned()
let id = agent.agent_id().to_owned();
factory::JanusBackend::new(id, rng.gen(), rng.gen())
.balancer_capacity(700)
.capacity(800)
.insert(&conn)
};

let backend1 =
factory::JanusBackend::new(backend_id1, rng.gen(), rng.gen())
.balancer_capacity(20)
.capacity(200)
.insert(&conn);

let backend_id2 = {
let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE);
agent.agent_id().to_owned()
let backend2 = {
let agent = TestAgent::new("beta", "janus", SVC_AUDIENCE);
let id = agent.agent_id().to_owned();
factory::JanusBackend::new(id, rng.gen(), rng.gen())
.balancer_capacity(700)
.capacity(800)
.insert(&conn)
};

let _backend2 =
factory::JanusBackend::new(backend_id2, rng.gen(), rng.gen())
.balancer_capacity(50)
.capacity(200)
.insert(&conn);
// Insert writer for room 1 @ backend 1
factory::Agent::new()
.agent_id(TestAgent::new("web", "writer1", USR_AUDIENCE).agent_id())
.room_id(room1.id())
.status(AgentStatus::Connected)
.insert(&conn);

// Add some load on the biggest one.
// Insert two readers for room 1 @ backend 1
factory::Agent::new()
.agent_id(writer2.agent_id())
.agent_id(TestAgent::new("web", "reader1-1", USR_AUDIENCE).agent_id())
.room_id(room1.id())
.status(AgentStatus::Connected)
.insert(&conn);
factory::Agent::new()
.agent_id(TestAgent::new("web", "reader1-2", USR_AUDIENCE).agent_id())
.room_id(room1.id())
.status(AgentStatus::Connected)
.insert(&conn);

// Insert writer for room 2 @ backend 2
factory::Agent::new()
.agent_id(TestAgent::new("web", "writer2", USR_AUDIENCE).agent_id())
.room_id(room2.id())
.status(AgentStatus::Ready)
.status(AgentStatus::Connected)
.insert(&conn);

(rtc1, backend1)
// Insert reader for room 2 @ backend 2
factory::Agent::new()
.agent_id(TestAgent::new("web", "reader2", USR_AUDIENCE).agent_id())
.room_id(room2.id())
.status(AgentStatus::Connected)
.insert(&conn);

// We need these in_progress recordings since they bind each room to its respective backend
shared_helpers::insert_recording(&conn, &rtc1, &backend1);
shared_helpers::insert_recording(&conn, &rtc2, &backend2);

(rtc3, backend2)
})
.unwrap();

// Allow user to update the rtc.
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(writer1.account_id(), object, "update");
authz.allow(new_writer.account_id(), object, "update");

// Make an rtc.connect request.
// Despite of none of the backends are capable to host the reserve it should
// Despite none of the backends are capable to host the reserve it should
// select the least loaded one.
let mut context = TestContext::new(db, authz);

Expand All @@ -1429,7 +1477,7 @@ mod test {
intent: ConnectIntent::Write,
};

let messages = handle_request::<ConnectHandler>(&mut context, &writer1, payload)
let messages = handle_request::<ConnectHandler>(&mut context, &new_writer, payload)
.await
.expect("RTC connect failed");

Expand Down
2 changes: 1 addition & 1 deletion src/db/janus_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ const LEAST_LOADED_SQL: &str = r#"
LEFT JOIN room AS r2
ON 1 = 1
WHERE r2.id = $1
ORDER BY COALESCE(jb.balancer_capacity, jb.capacity, 2147483647) - COALESCE(jbl.load, 0)
ORDER BY COALESCE(jb.balancer_capacity, jb.capacity, 2147483647) - COALESCE(jbl.load, 0) DESC
LIMIT 1
"#;

Expand Down

0 comments on commit ccf06a9

Please sign in to comment.