Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
filter rooms in finished_without_recordings by backend
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Aug 19, 2020
1 parent 6d3b0b7 commit e6a577a
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 35 deletions.
54 changes: 23 additions & 31 deletions src/app/endpoint/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,40 +70,32 @@ impl RequestHandler for VacuumHandler {
.authorize(audience, reqp, vec!["system"], "update")
.await?;

// TODO: Update 'finished_without_recordings' in order to return (backend,room,rtc)
let backends = {
let mut requests = Vec::new();

let rooms = {
let conn = context.db().get()?;
db::janus_backend::ListQuery::new().execute(&conn)?
db::room::finished_without_recordings(&conn)?
};

let mut requests = Vec::new();
for backend in backends {
// Retrieve all the finished rooms without recordings.
let rooms = {
let conn = context.db().get()?;
db::room::finished_without_recordings(&conn)?
};

for (room, rtc) in rooms.into_iter() {
// TODO: Send the error as an event to "app/${APP}/audiences/${AUD}" topic
let backreq = janus::upload_stream_request(
reqp,
backend.session_id(),
backend.handle_id(),
janus::UploadStreamRequestBody::new(
rtc.id(),
&bucket_name(&room),
&record_name(&rtc),
),
backend.id(),
context.agent_id(),
start_timestamp,
)
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

requests.push(Box::new(backreq) as Box<dyn IntoPublishableMessage + Send>);
}
for (room, rtc, backend) in rooms.into_iter() {
// TODO: Send the error as an event to "app/${APP}/audiences/${AUD}" topic
let backreq = janus::upload_stream_request(
reqp,
backend.session_id(),
backend.handle_id(),
janus::UploadStreamRequestBody::new(
rtc.id(),
&bucket_name(&room),
&record_name(&rtc),
),
backend.id(),
context.agent_id(),
start_timestamp,
)
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

requests.push(Box::new(backreq) as Box<dyn IntoPublishableMessage + Send>);
}

Ok(Box::new(stream::from_iter(requests)))
Expand Down
15 changes: 15 additions & 0 deletions src/db/janus_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,21 @@ use uuid::Uuid;
use crate::db::agent::Status as AgentStatus;
use crate::schema::{agent, janus_backend, janus_rtc_stream, rtc};

type AllColumns = (
janus_backend::id,
janus_backend::handle_id,
janus_backend::session_id,
janus_backend::created_at,
janus_backend::capacity,
);
pub const ALL_COLUMNS: AllColumns = (
janus_backend::id,
janus_backend::handle_id,
janus_backend::session_id,
janus_backend::created_at,
janus_backend::capacity,
);

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Identifiable, Queryable, QueryableByName, Associations)]
Expand Down
62 changes: 59 additions & 3 deletions src/db/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,33 @@ impl FindQuery {
// room3 | rtc4 | null room3 | null | null
pub(crate) fn finished_without_recordings(
conn: &PgConnection,
) -> Result<Vec<(self::Object, super::rtc::Object)>, Error> {
) -> Result<
Vec<(
self::Object,
super::rtc::Object,
super::janus_backend::Object,
)>,
Error,
> {
use crate::schema;
use diesel::{dsl::sql, prelude::*};

schema::room::table
.inner_join(schema::rtc::table.left_join(schema::recording::table))
.inner_join(
schema::rtc::table
.left_join(schema::recording::table)
.left_join(
schema::janus_rtc_stream::table.inner_join(schema::janus_backend::table),
),
)
.filter(room::backend.ne(RoomBackend::None))
.filter(schema::recording::rtc_id.is_null())
.filter(sql("upper(\"room\".\"time\") < now()"))
.select((self::ALL_COLUMNS, super::rtc::ALL_COLUMNS))
.select((
self::ALL_COLUMNS,
super::rtc::ALL_COLUMNS,
super::janus_backend::ALL_COLUMNS,
))
.load(conn)
}

Expand Down Expand Up @@ -305,3 +322,42 @@ impl UpdateQuery {
diesel::update(self).set(self).get_result(conn)
}
}

#[cfg(test)]
mod tests {
mod finished_without_recordings {
use super::super::*;
use crate::test_helpers::prelude::*;

#[test]
fn selects_appropriate_backend() {
let db = TestDb::new();
let pool = db.connection_pool();
let conn = pool.get().expect("Failed to get db connection");

let room1 = shared_helpers::insert_closed_room(&conn);
let room2 = shared_helpers::insert_closed_room(&conn);
let backend1 = shared_helpers::insert_janus_backend(&conn);
let backend2 = shared_helpers::insert_janus_backend(&conn);
let rtc1 = shared_helpers::insert_rtc_with_room(&conn, &room1);
let rtc2 = shared_helpers::insert_rtc_with_room(&conn, &room2);
shared_helpers::insert_janus_rtc_stream(&conn, &backend1, &rtc1);
shared_helpers::insert_janus_rtc_stream(&conn, &backend2, &rtc2);
let rooms = finished_without_recordings(&conn)
.expect("finished_without_recordings call failed");
assert_eq!(rooms.len(), 2);
// order of rooms is not specified so we check that its [(room1, _, backend1), (room2, _, backend2)] in any order
if rooms[0].0.id() == room1.id() {
assert_eq!(rooms[0].0.id(), room1.id());
assert_eq!(rooms[0].2.id(), backend1.id());
assert_eq!(rooms[1].0.id(), room2.id());
assert_eq!(rooms[1].2.id(), backend2.id());
} else {
assert_eq!(rooms[1].0.id(), room1.id());
assert_eq!(rooms[1].2.id(), backend1.id());
assert_eq!(rooms[0].0.id(), room2.id());
assert_eq!(rooms[0].2.id(), backend2.id());
}
}
}
}
25 changes: 24 additions & 1 deletion src/test_helpers/shared_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use uuid::Uuid;

use crate::db::agent::{Object as Agent, Status as AgentStatus};
use crate::db::janus_backend::Object as JanusBackend;
use crate::db::janus_rtc_stream::Object as JanusRtcStream;
use crate::db::room::{Object as Room, RoomBackend};
use crate::db::rtc::Object as Rtc;

Expand Down Expand Up @@ -48,11 +49,33 @@ pub(crate) fn insert_agent(conn: &PgConnection, agent_id: &AgentId, room_id: Uui

pub(crate) fn insert_janus_backend(conn: &PgConnection) -> JanusBackend {
let mut rng = rand::thread_rng();
let agent = TestAgent::new("alpha", "janus-gateway", SVC_AUDIENCE);

let label_suffix: String = rng
.sample_iter(&rand::distributions::Alphanumeric)
.take(5)
.collect();
let label = format!("janus-gateway-{}", label_suffix);

let agent = TestAgent::new("alpha", &label, SVC_AUDIENCE);
factory::JanusBackend::new(agent.agent_id().to_owned(), rng.gen(), rng.gen()).insert(conn)
}

pub(crate) fn insert_rtc(conn: &PgConnection) -> Rtc {
let room = insert_room(conn);
factory::Rtc::new(room.id()).insert(conn)
}

pub(crate) fn insert_rtc_with_room(conn: &PgConnection, room: &Room) -> Rtc {
factory::Rtc::new(room.id()).insert(conn)
}

pub(crate) fn insert_janus_rtc_stream(
conn: &PgConnection,
backend: &JanusBackend,
rtc: &Rtc,
) -> JanusRtcStream {
factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(backend)
.rtc(rtc)
.insert(conn)
}

0 comments on commit e6a577a

Please sign in to comment.