Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Consider reserve in stream pauses (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov authored Aug 26, 2020
1 parent b8bfb77 commit 3e262f0
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 34 deletions.
88 changes: 87 additions & 1 deletion src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,7 @@ mod test {
})
.unwrap();

// Allow user to read the rtc.
// Allow user to update the rtc.
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
Expand All @@ -1284,6 +1284,92 @@ mod test {
});
}

#[test]
fn connect_to_rtc_reservation_with_stopped_stream() {
async_std::task::block_on(async {
let mut rng = rand::thread_rng();
let db = TestDb::new();
let mut authz = TestAuthz::new();
let writer = TestAgent::new("web", "writer", USR_AUDIENCE);

let rtc = db
.connection_pool()
.get()
.map(|conn| {
let now = Utc::now();

// Insert rooms with reserves.
let room1 = factory::Room::new()
.audience(USR_AUDIENCE)
.time((
Bound::Included(now),
Bound::Excluded(now + Duration::hours(1)),
))
.backend(RoomBackend::Janus)
.reserve(15)
.insert(&conn);

let room2 = factory::Room::new()
.audience(USR_AUDIENCE)
.time((
Bound::Included(now),
Bound::Excluded(now + Duration::hours(1)),
))
.backend(RoomBackend::Janus)
.reserve(15)
.insert(&conn);

// Insert rtcs.
let rtc1 = factory::Rtc::new(room1.id()).insert(&conn);
let rtc2 = factory::Rtc::new(room2.id()).insert(&conn);

// Insert a backend with capacity less than the sum of reserves.
let backend_id = {
let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE);
agent.agent_id().to_owned()
};

let backend = factory::JanusBackend::new(backend_id, rng.gen(), rng.gen())
.capacity(20)
.insert(&conn);

// Insert a stopped stream in the first room.
let stream = factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend)
.rtc(&rtc1)
.insert(&conn);

crate::db::janus_rtc_stream::start(stream.id(), &conn).unwrap();
crate::db::janus_rtc_stream::stop(stream.id(), &conn).unwrap();

rtc2
})
.unwrap();

// Allow user to update the rtc.
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(writer.account_id(), object, "update");

// Make an rtc.connect request.
// The reserve of the first room must be taken into account despited of the
// stream has been already stopped.
let context = TestContext::new(db, authz);

let payload = ConnectRequest {
id: rtc.id(),
intent: ConnectIntent::Write,
};

let err = handle_request::<ConnectHandler>(&context, &writer, payload)
.await
.expect_err("Unexpected success on rtc connecting");

assert_eq!(err.status_code(), ResponseStatus::SERVICE_UNAVAILABLE);
});
}

#[test]
fn connect_to_rtc_not_authorized() {
async_std::task::block_on(async {
Expand Down
94 changes: 61 additions & 33 deletions src/db/janus_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,33 +177,46 @@ impl<'a> DeleteQuery<'a> {
////////////////////////////////////////////////////////////////////////////////

// Returns the least loaded backend capable to host the room with its reserve considering:
// - room opening period;
// - actual number of online agents;
// - optional backend capacity;
// - optional reserved capacity.
// - optional room reserve.
const LEAST_LOADED_SQL: &str = r#"
WITH
room_load AS (
SELECT
r.id AS room_id,
COALESCE(r.reserve, 0) AS reserve,
COUNT(a.id) AS taken
FROM room AS r
LEFT JOIN agent AS a
ON a.room_id = r.id
WHERE a.status = 'connected'
GROUP BY r.id
room_id,
COUNT(id) AS taken
FROM agent
WHERE status = 'connected'
GROUP BY room_id
),
active_room AS (
SELECT *
FROM room
WHERE backend = 'janus'
AND NOW() < UPPER(time)
),
janus_backend_load AS (
SELECT
jrs.backend_id,
SUM(GREATEST(rl.taken, rl.reserve)) AS load
FROM room_load as rl
LEFT JOIN rtc
ON rtc.room_id = rl.room_id
LEFT JOIN janus_rtc_stream AS jrs
ON jrs.rtc_id = rtc.id
WHERE LOWER(jrs.time) IS NOT NULL AND UPPER(jrs.time) IS NULL
GROUP BY jrs.backend_id
backend_id,
SUM(GREATEST(taken, reserve)) AS load
FROM (
SELECT DISTINCT ON(backend_id, room_id)
jrs.backend_id,
rtc.room_id,
COALESCE(rl.taken, 0) AS taken,
COALESCE(ar.reserve, 0) AS reserve
FROM janus_rtc_stream AS jrs
INNER JOIN rtc
ON rtc.id = jrs.rtc_id
LEFT JOIN active_room AS ar
ON ar.id = rtc.room_id
LEFT JOIN room_load AS rl
ON rl.room_id = rtc.room_id
WHERE LOWER(jrs.time) IS NOT NULL
) AS sub
GROUP BY backend_id
)
SELECT jb.*
FROM janus_backend AS jb
Expand Down Expand Up @@ -235,26 +248,41 @@ const FREE_CAPACITY_SQL: &str = r#"
WITH
room_load AS (
SELECT
r.id AS room_id,
COALESCE(r.reserve, 0) AS reserve,
COUNT(a.id) AS taken
FROM room AS r
LEFT JOIN agent AS a
ON a.room_id = r.id
r.id AS room_id,
r.reserve,
COUNT(a.id) AS taken
FROM agent AS a
INNER JOIN room AS r
ON r.id = a.room_id
WHERE a.status = 'connected'
GROUP BY r.id
),
active_room AS (
SELECT *
FROM room
WHERE backend = 'janus'
AND NOW() < UPPER(time)
),
janus_backend_load AS (
SELECT
jrs.backend_id,
SUM(GREATEST(rl.taken, rl.reserve)) AS load
FROM room_load as rl
LEFT JOIN rtc
ON rtc.room_id = rl.room_id
LEFT JOIN janus_rtc_stream AS jrs
ON jrs.rtc_id = rtc.id
WHERE LOWER(jrs.time) IS NOT NULL AND UPPER(jrs.time) IS NULL
GROUP BY jrs.backend_id
backend_id,
SUM(GREATEST(taken, reserve)) AS load
FROM (
SELECT DISTINCT ON(backend_id, room_id)
jrs.backend_id,
rtc.room_id,
COALESCE(rl.taken, 0) AS taken,
COALESCE(ar.reserve, 0) AS reserve
FROM janus_rtc_stream AS jrs
INNER JOIN rtc
ON rtc.id = jrs.rtc_id
LEFT JOIN active_room AS ar
ON ar.id = rtc.room_id
LEFT JOIN room_load AS rl
ON rl.room_id = rtc.room_id
WHERE LOWER(jrs.time) IS NOT NULL
) AS sub
GROUP BY backend_id
)
SELECT
GREATEST(
Expand Down

0 comments on commit 3e262f0

Please sign in to comment.