Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Refactor backend binding to room (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Nov 30, 2020
1 parent a56ad3d commit 1c27500
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 386 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE recording ADD COLUMN backend_id agent_id;

UPDATE recording AS rec
SET backend_id = r.backend_id
FROM rtc,
room AS r
WHERE rtc.id = rec.rtc_id
AND r.id = rtc.room_id
AND r.backend = 'janus';

ALTER TABLE room DROP COLUMN backend_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
ALTER TABLE room ADD COLUMN backend_id agent_id;

ALTER TABLE room
ADD CONSTRAINT room_backend_id_check
CHECK (backend_id IS NULL OR backend = 'janus');

UPDATE room AS r
SET backend_id = rec.backend_id
FROM rtc,
recording AS rec
WHERE rtc.room_id = r.id
AND rec.rtc_id = rtc.id
AND r.backend = 'janus';

ALTER TABLE recording DROP COLUMN backend_id;
345 changes: 127 additions & 218 deletions src/app/endpoint/rtc.rs

Large diffs are not rendered by default.

31 changes: 15 additions & 16 deletions src/app/endpoint/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,11 @@ fn record_name(recording: &Recording) -> String {
#[cfg(test)]
mod test {
mod vacuum {
use chrono::{Duration, Utc};
use diesel::prelude::*;
use serde_json::Value as JsonValue;
use svc_agent::mqtt::ResponseStatus;

use crate::backend::janus::JANUS_API_VERSION;
use crate::db;
use crate::test_helpers::prelude::*;
use crate::test_helpers::{find_event_by_predicate, find_request_by_predicate};

Expand Down Expand Up @@ -243,28 +241,29 @@ mod test {
.connection_pool()
.get()
.map(|conn| {
// Insert an rtc and janus backend.
// Insert janus backend and rooms.
let backend = shared_helpers::insert_janus_backend(&conn);

let room1 =
shared_helpers::insert_closed_room_with_backend(&conn, &backend.id());

let room2 =
shared_helpers::insert_closed_room_with_backend(&conn, &backend.id());

// Insert rtcs.
let rtcs = vec![
shared_helpers::insert_rtc(&conn),
shared_helpers::insert_rtc(&conn),
shared_helpers::insert_rtc_with_room(&conn, &room1),
shared_helpers::insert_rtc_with_room(&conn, &room2),
];

let _other_rtc = shared_helpers::insert_rtc(&conn);
let backend = shared_helpers::insert_janus_backend(&conn);

// Insert active agents and close rooms.
let start = Utc::now() - Duration::hours(2);
let finish = start + Duration::hours(1);
let time = (Bound::Included(start), Bound::Excluded(finish));
// Insert active agents.
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);

for rtc in rtcs.iter() {
shared_helpers::insert_agent(&conn, agent.agent_id(), rtc.room_id());
shared_helpers::insert_recording(&conn, rtc, &backend);

db::room::UpdateQuery::new(rtc.room_id().to_owned())
.time(Some(time))
.execute(&conn)
.unwrap();
shared_helpers::insert_recording(&conn, rtc);
}

(rtcs, backend)
Expand Down
82 changes: 31 additions & 51 deletions src/db/janus_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,16 @@ impl<'a> ListQuery<'a> {

////////////////////////////////////////////////////////////////////////////////

pub(crate) struct FindQuery {
id: Option<AgentId>,
pub(crate) struct FindQuery<'a> {
id: Option<&'a AgentId>,
}

impl FindQuery {
impl<'a> FindQuery<'a> {
pub(crate) fn new() -> Self {
Self { id: None }
}

pub(crate) fn id(self, id: AgentId) -> Self {
pub(crate) fn id(self, id: &'a AgentId) -> Self {
Self { id: Some(id) }
}

Expand Down Expand Up @@ -208,6 +208,7 @@ const MOST_LOADED_SQL: &str = r#"
SELECT *
FROM room
WHERE backend = 'janus'
AND backend_id IS NOT NULL
AND UPPER(time) BETWEEN NOW() AND NOW() + INTERVAL '1 day'
),
janus_backend_load AS (
Expand All @@ -216,18 +217,13 @@ const MOST_LOADED_SQL: &str = r#"
SUM(GREATEST(taken, reserve)) AS load
FROM (
SELECT DISTINCT ON(backend_id, room_id)
rec.backend_id,
rtc.room_id,
ar.backend_id,
ar.id AS room_id,
COALESCE(rl.taken, 0) AS taken,
COALESCE(ar.reserve, 0) AS reserve
FROM recording AS rec
INNER JOIN rtc
ON rtc.id = rec.rtc_id
LEFT JOIN active_room AS ar
ON ar.id = rtc.room_id
FROM active_room AS ar
LEFT JOIN room_load AS rl
ON rl.room_id = rtc.room_id
WHERE rec.status = 'in_progress'
ON rl.room_id = ar.id
) AS sub
GROUP BY backend_id
)
Expand Down Expand Up @@ -268,6 +264,7 @@ const LEAST_LOADED_SQL: &str = r#"
SELECT *
FROM room
WHERE backend = 'janus'
AND backend_id IS NOT NULL
AND UPPER(time) BETWEEN NOW() AND NOW() + INTERVAL '1 day'
),
janus_backend_load AS (
Expand All @@ -276,17 +273,12 @@ const LEAST_LOADED_SQL: &str = r#"
SUM(taken) AS load
FROM (
SELECT DISTINCT ON(backend_id, room_id)
rec.backend_id,
rtc.room_id,
ar.backend_id,
ar.id AS room_id,
COALESCE(rl.taken, 0) AS taken
FROM recording AS rec
INNER JOIN rtc
ON rtc.id = rec.rtc_id
LEFT JOIN active_room AS ar
ON ar.id = rtc.room_id
FROM active_room AS ar
LEFT JOIN room_load AS rl
ON rl.room_id = rtc.room_id
WHERE rec.status = 'in_progress'
ON rl.room_id = ar.id
) AS sub
GROUP BY backend_id
)
Expand Down Expand Up @@ -332,6 +324,7 @@ const FREE_CAPACITY_SQL: &str = r#"
SELECT *
FROM room
WHERE backend = 'janus'
AND backend_id IS NOT NULL
AND UPPER(time) BETWEEN NOW() AND NOW() + INTERVAL '1 day'
),
janus_backend_load AS (
Expand All @@ -342,18 +335,13 @@ const FREE_CAPACITY_SQL: &str = r#"
SUM(GREATEST(taken, reserve)) AS load
FROM (
SELECT DISTINCT ON(backend_id, room_id)
rec.backend_id,
rtc.room_id,
ar.backend_id,
ar.id AS room_id,
COALESCE(rl.taken, 0) AS taken,
COALESCE(ar.reserve, 0) AS reserve
FROM recording AS rec
INNER JOIN rtc
ON rtc.id = rec.rtc_id
LEFT JOIN active_room AS ar
ON ar.id = rtc.room_id
FROM active_room AS ar
LEFT JOIN room_load AS rl
ON rl.room_id = rtc.room_id
WHERE rec.status = 'in_progress'
ON rl.room_id = ar.id
) AS sub
GROUP BY backend_id
)
Expand All @@ -379,11 +367,8 @@ const FREE_CAPACITY_SQL: &str = r#"
ON ar.id = rtc.room_id
LEFT JOIN room_load as rl
ON rl.room_id = rtc.room_id
LEFT JOIN recording AS rec
ON rec.rtc_id = rtc.id
AND rec.status = 'in_progress'
LEFT JOIN janus_backend AS jb
ON jb.id = rec.backend_id
ON jb.id = ar.backend_id
LEFT JOIN janus_backend_load AS jbl
ON jbl.backend_id = jb.id
WHERE rtc.id = $1
Expand Down Expand Up @@ -458,6 +443,7 @@ WITH
SELECT *
FROM room
WHERE backend = 'janus'
AND backend_id IS NOT NULL
AND UPPER(time) BETWEEN NOW() AND NOW() + INTERVAL '1 day'
),
janus_backend_load AS (
Expand All @@ -467,18 +453,13 @@ WITH
SUM(taken) AS taken
FROM (
SELECT DISTINCT ON(backend_id, room_id)
rec.backend_id,
rtc.room_id,
ar.backend_id,
ar.id AS room_id,
COALESCE(rl.taken, 0) AS taken,
COALESCE(ar.reserve, 0) AS reserve
FROM recording AS rec
INNER JOIN rtc
ON rtc.id = rec.rtc_id
LEFT JOIN active_room AS ar
ON ar.id = rtc.room_id
FROM active_room AS ar
LEFT JOIN room_load AS rl
ON rl.room_id = rtc.room_id
WHERE rec.status = 'in_progress'
ON rl.room_id = ar.id
) AS sub
GROUP BY backend_id
)
Expand Down Expand Up @@ -522,6 +503,7 @@ mod tests {
Bound::Excluded(now + Duration::hours(1)),
))
.backend(RoomBackend::Janus)
.backend_id(backend1.id())
.reserve(200)
.insert(&conn);

Expand All @@ -533,6 +515,7 @@ mod tests {
))
.reserve(300)
.backend(RoomBackend::Janus)
.backend_id(backend1.id())
.insert(&conn);

let room3 = factory::Room::new()
Expand All @@ -543,15 +526,12 @@ mod tests {
))
.reserve(400)
.backend(RoomBackend::Janus)
.backend_id(backend2.id())
.insert(&conn);

let rtc1 = factory::Rtc::new(room1.id()).insert(&conn);
let rtc2 = factory::Rtc::new(room2.id()).insert(&conn);
let rtc3 = factory::Rtc::new(room3.id()).insert(&conn);

shared_helpers::insert_recording(&conn, &rtc1, &backend1);
shared_helpers::insert_recording(&conn, &rtc2, &backend1);
shared_helpers::insert_recording(&conn, &rtc3, &backend2);
shared_helpers::insert_rtc_with_room(&conn, &room1);
shared_helpers::insert_rtc_with_room(&conn, &room2);
shared_helpers::insert_rtc_with_room(&conn, &room3);

let loads = super::reserve_load_for_each_backend(&conn).expect("Db query failed");
assert_eq!(loads.len(), 3);
Expand Down
41 changes: 4 additions & 37 deletions src/db/recording.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ use std::ops::Bound;
use chrono::{DateTime, Utc};
use diesel::{pg::PgConnection, result::Error};
use serde_derive::{Deserialize, Serialize};
use svc_agent::AgentId;
use uuid::Uuid;

use super::janus_backend::Object as JanusBackend;
use super::rtc::Object as Rtc;
use crate::schema::recording;

Expand All @@ -18,15 +16,13 @@ pub(crate) type AllColumns = (
recording::started_at,
recording::segments,
recording::status,
recording::backend_id,
);

pub(crate) const ALL_COLUMNS: AllColumns = (
recording::rtc_id,
recording::started_at,
recording::segments,
recording::status,
recording::backend_id,
);

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -55,7 +51,6 @@ impl fmt::Display for Status {

#[derive(Debug, Serialize, Identifiable, Associations, Queryable)]
#[belongs_to(Rtc, foreign_key = "rtc_id")]
#[belongs_to(JanusBackend, foreign_key = "backend_id")]
#[primary_key(rtc_id)]
#[table_name = "recording"]
pub(crate) struct Object {
Expand All @@ -64,7 +59,6 @@ pub(crate) struct Object {
started_at: Option<DateTime<Utc>>,
segments: Option<Vec<Segment>>,
status: Status,
backend_id: AgentId,
}

impl Object {
Expand All @@ -83,46 +77,19 @@ impl Object {
pub(crate) fn status(&self) -> &Status {
&self.status
}

pub(crate) fn backend_id(&self) -> &AgentId {
&self.backend_id
}
}

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub(crate) struct FindQuery {
rtc_id: Uuid,
}

impl FindQuery {
pub(crate) fn new(rtc_id: Uuid) -> Self {
Self { rtc_id }
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::prelude::*;

recording::table
.find(self.rtc_id)
.get_result(conn)
.optional()
}
}

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Insertable)]
#[table_name = "recording"]
pub(crate) struct InsertQuery<'a> {
pub(crate) struct InsertQuery {
rtc_id: Uuid,
backend_id: &'a AgentId,
}

impl<'a> InsertQuery<'a> {
pub(crate) fn new(rtc_id: Uuid, backend_id: &'a AgentId) -> Self {
Self { rtc_id, backend_id }
impl InsertQuery {
pub(crate) fn new(rtc_id: Uuid) -> Self {
Self { rtc_id }
}

pub(crate) fn execute(self, conn: &PgConnection) -> Result<Object, Error> {
Expand Down
Loading

0 comments on commit 1c27500

Please sign in to comment.