Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
allow connections until capacity reached in free_capacity_sql
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Oct 12, 2020
1 parent ccf06a9 commit 6795504
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 5 deletions.
159 changes: 159 additions & 0 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1495,6 +1495,165 @@ mod test {
});
}

#[test]
fn connect_to_rtc_reserve_overflow() {
async_std::task::block_on(async {
let mut rng = rand::thread_rng();
let db = TestDb::new();
let mut authz = TestAuthz::new();
let new_reader = TestAgent::new("web", "new-reader", USR_AUDIENCE);

let (rtcs, backend) = db
.connection_pool()
.get()
.map(|conn| {
let now = Utc::now();

// Lets say we have a single backend with cap=800
// Somehow reserves of all rooms that were allocated to it overflow its capacity
// We should allow users to connect to rooms with reserves if reserve and cap allows them
// But not allow to connect to room with no reserve

// Setup three rooms with 500, 600 and none.
let room1 = factory::Room::new()
.audience(USR_AUDIENCE)
.time((
Bound::Included(now),
Bound::Excluded(now + Duration::hours(1)),
))
.backend(RoomBackend::Janus)
.reserve(500)
.insert(&conn);

let room2 = factory::Room::new()
.audience(USR_AUDIENCE)
.time((
Bound::Included(now),
Bound::Excluded(now + Duration::hours(1)),
))
.reserve(600)
.backend(RoomBackend::Janus)
.insert(&conn);

let room3 = factory::Room::new()
.audience(USR_AUDIENCE)
.time((
Bound::Included(now),
Bound::Excluded(now + Duration::hours(1)),
))
.backend(RoomBackend::Janus)
.insert(&conn);

// Insert rtcs for each room.
let rtc1 = factory::Rtc::new(room1.id()).insert(&conn);
let rtc2 = factory::Rtc::new(room2.id()).insert(&conn);
let rtc3 = factory::Rtc::new(room3.id()).insert(&conn);

// Insert alpha backend.
let backend1 = {
let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE);
let id = agent.agent_id().to_owned();
factory::JanusBackend::new(id, rng.gen(), rng.gen())
.balancer_capacity(700)
.capacity(800)
.insert(&conn)
};

// Insert writer for room 1
factory::Agent::new()
.agent_id(TestAgent::new("web", "writer1", USR_AUDIENCE).agent_id())
.room_id(room1.id())
.status(AgentStatus::Connected)
.insert(&conn);

// Insert 450 readers for room 1
for i in 0..450 {
factory::Agent::new()
.agent_id(
TestAgent::new("web", &format!("reader1-{}", i), USR_AUDIENCE)
.agent_id(),
)
.room_id(room1.id())
.status(AgentStatus::Connected)
.insert(&conn);
}

// Insert writer for room 3
factory::Agent::new()
.agent_id(TestAgent::new("web", "writer3", USR_AUDIENCE).agent_id())
.room_id(room2.id())
.status(AgentStatus::Connected)
.insert(&conn);

// Insert reader for room 3
factory::Agent::new()
.agent_id(TestAgent::new("web", "reader3", USR_AUDIENCE).agent_id())
.room_id(room3.id())
.status(AgentStatus::Connected)
.insert(&conn);

// We need these in_progress recordings since they bind each room to its respective backend
shared_helpers::insert_recording(&conn, &rtc1, &backend1);
shared_helpers::insert_recording(&conn, &rtc2, &backend1);
shared_helpers::insert_recording(&conn, &rtc3, &backend1);

([rtc1, rtc2, rtc3], backend1)
})
.unwrap();

// Allow user to read the rtcs.
for rtc in rtcs.iter() {
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(new_reader.account_id(), object, "read");
}

let mut context = TestContext::new(db, authz);

// First two rooms have reserves AND there is free capacity so we can connect to them
for rtc in rtcs.iter().take(2) {
let payload = ConnectRequest {
id: rtc.id(),
intent: ConnectIntent::Read,
};

// Make an rtc.connect request.
let messages =
handle_request::<ConnectHandler>(&mut context, &new_reader, payload)
.await
.expect("RTC connect failed");

// Assert outgoing request goes to the expected backend.
let (_req, _reqp, topic) =
find_request::<JanusAttachRequest>(messages.as_slice());

let expected_topic = format!(
"agents/{}/api/{}/in/{}",
backend.id(),
janus::JANUS_API_VERSION,
context.config().id,
);

assert_eq!(topic, &expected_topic);
}

let payload = ConnectRequest {
id: rtcs[2].id(),
intent: ConnectIntent::Read,
};

// Last room has NO reserve AND there is free capacity BUT it was exhausted by first two rooms
// So we cant connect to this room
let err = handle_request::<ConnectHandler>(&mut context, &new_reader, payload)
.await
.expect_err("Connected to RTC while expected capacity exceeded error");

assert_eq!(err.status_code(), ResponseStatus::SERVICE_UNAVAILABLE);
assert_eq!(err.kind(), "capacity_exceeded");
});
}

#[test]
fn connect_to_rtc_not_authorized() {
async_std::task::block_on(async {
Expand Down
24 changes: 19 additions & 5 deletions src/db/janus_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ const FREE_CAPACITY_SQL: &str = r#"
janus_backend_load AS (
SELECT
backend_id,
SUM(taken) AS total_taken,
SUM(reserve) AS total_reserve,
SUM(GREATEST(taken, reserve)) AS load
FROM (
SELECT DISTINCT ON(backend_id, room_id)
Expand All @@ -356,13 +358,25 @@ const FREE_CAPACITY_SQL: &str = r#"
GROUP BY backend_id
)
SELECT
GREATEST(
COALESCE(jb.capacity, 2147483647)
- COALESCE(jbl.load, 0)
+ GREATEST(COALESCE(rl.reserve, 0) - COALESCE(rl.taken, 0), 0),
0
(
CASE
WHEN COALESCE(jb.capacity, 2147483647) <= COALESCE(jbl.total_taken, 0) THEN 0
ELSE (
CASE
WHEN COALESCE(ar.reserve, 0) > COALESCE(rl.taken, 0)
THEN LEAST(
COALESCE(ar.reserve, 0) - COALESCE(rl.taken, 0),
COALESCE(jb.capacity, 2147483647) - COALESCE(jbl.total_taken, 0)
)
ELSE
GREATEST(COALESCE(jb.capacity, 2147483647) - COALESCE(jbl.load, 0), 0)
END
)
END
)::INT AS free_capacity
FROM rtc
LEFT JOIN active_room AS ar
ON ar.id = rtc.room_id
LEFT JOIN room_load as rl
ON rl.room_id = rtc.room_id
LEFT JOIN recording AS rec
Expand Down

0 comments on commit 6795504

Please sign in to comment.