Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Consider recording backend when connecting to rtc (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Sep 21, 2020
1 parent d1fe7a4 commit 83dcbb5
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 77 deletions.
90 changes: 69 additions & 21 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,20 +302,18 @@ impl RequestHandler for ConnectHandler {
let conn = context.db().get()?;

// There are 3 cases:
// 1. Connecting as writer with no previous stream. Select the least loaded backend
// that is capable to host the room's reservation.
// 2. Connecting as reader with existing stream. Choose the backend of the active
// stream because Janus doesn't support clustering and it must be the same server
// that the stream's writer is connected to.
// 3. Reconnecting as writer with previous stream. Select the backend of the previous
// stream to avoid partitioning the record across multiple servers.
let maybe_rtc_stream = db::janus_rtc_stream::FindQuery::new()
.rtc_id(payload.id)
.execute(&conn)?;

let backend = match maybe_rtc_stream {
Some(ref stream) => db::janus_backend::FindQuery::new()
.id(stream.backend_id().to_owned())
// 1. Connecting as writer for the first time. There's no recording in that case.
// Select the least loaded backend that is capable to host the room's reservation.
// 2. Connecting as reader with existing recording. Choose the backend of the active
// recording because Janus doesn't support clustering and it must be the same server
// that the writer is connected to.
// 3. Reconnecting as writer with previous recording. Select the recording's backend id
// to avoid partitioning of the record across multiple servers.
let maybe_recording = db::recording::FindQuery::new(payload.id).execute(&conn)?;

let backend = match maybe_recording {
Some(ref recording) => db::janus_backend::FindQuery::new()
.id(recording.backend_id().to_owned())
.execute(&conn)?
.ok_or("no backend found for stream")
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?,
Expand All @@ -325,9 +323,8 @@ impl RequestHandler for ConnectHandler {
};

// Create recording if a writer connects for the first time.
if payload.intent == ConnectIntent::Write && maybe_rtc_stream.is_none() {
db::recording::InsertQuery::new(payload.id, backend.id())
.execute(&conn)?;
if payload.intent == ConnectIntent::Write && maybe_recording.is_none() {
db::recording::InsertQuery::new(payload.id, backend.id()).execute(&conn)?;
}

// Check that the backend's capacity is not exceeded for readers.
Expand Down Expand Up @@ -721,9 +718,14 @@ mod test {
let backend1 = shared_helpers::insert_janus_backend(&conn);
let backend2 = shared_helpers::insert_janus_backend(&conn);

// The first backend has 1 active stream with 1 agent.
// The first backend has a recording and an active agent.
let rtc1 = shared_helpers::insert_rtc(&conn);

factory::Recording::new()
.rtc(&rtc1)
.backend(&backend1)
.insert(&conn);

let stream1 = factory::JanusRtcStream::new(USR_AUDIENCE)
.rtc(&rtc1)
.backend(&backend1)
Expand All @@ -739,6 +741,11 @@ mod test {
// by the balancer.
let rtc2 = shared_helpers::insert_rtc(&conn);

factory::Recording::new()
.rtc(&rtc2)
.backend(&backend2)
.insert(&conn);

let _stream2 = factory::JanusRtcStream::new(USR_AUDIENCE)
.rtc(&rtc2)
.backend(&backend2)
Expand Down Expand Up @@ -834,6 +841,12 @@ mod test {
.insert(&conn);

crate::db::janus_rtc_stream::start(stream.id(), &conn).unwrap();

factory::Recording::new()
.rtc(&rtc)
.backend(&backend2)
.insert(&conn);

(rtc, backend2)
})
.unwrap();
Expand Down Expand Up @@ -920,6 +933,11 @@ mod test {

crate::db::janus_rtc_stream::start(stream1.id(), &conn).unwrap();

factory::Recording::new()
.rtc(&rtc1)
.backend(&backend1)
.insert(&conn);

let agent = TestAgent::new("web", "user456", SVC_AUDIENCE);
shared_helpers::insert_agent(&conn, agent.agent_id(), rtc1.room_id());

Expand Down Expand Up @@ -1028,13 +1046,23 @@ mod test {

crate::db::janus_rtc_stream::start(stream1.id(), &conn).unwrap();

factory::Recording::new()
.rtc(&rtc1)
.backend(&backend)
.insert(&conn);

let stream2 = factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend)
.rtc(&rtc2)
.insert(&conn);

crate::db::janus_rtc_stream::start(stream2.id(), &conn).unwrap();

factory::Recording::new()
.rtc(&rtc2)
.backend(&backend)
.insert(&conn);

// Insert active agents.
shared_helpers::insert_agent(&conn, writer1.agent_id(), room1.id());
shared_helpers::insert_agent(&conn, writer2.agent_id(), room2.id());
Expand Down Expand Up @@ -1114,12 +1142,17 @@ mod test {
.capacity(2)
.insert(&conn);

// Insert active stream.
// Insert stream and recording.
factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend)
.rtc(&rtc)
.insert(&conn);

factory::Recording::new()
.rtc(&rtc)
.backend(&backend)
.insert(&conn);

// Insert active agents.
shared_helpers::insert_agent(&conn, writer.agent_id(), rtc.room_id());

Expand Down Expand Up @@ -1180,14 +1213,19 @@ mod test {
.capacity(2)
.insert(&conn);

// Insert active stream.
// Insert active stream and recording.
let stream = factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend)
.rtc(&rtc)
.insert(&conn);

crate::db::janus_rtc_stream::start(stream.id(), &conn).unwrap();

factory::Recording::new()
.rtc(&rtc)
.backend(&backend)
.insert(&conn);

// Insert active agents.
shared_helpers::insert_agent(&conn, writer.agent_id(), rtc.room_id());
shared_helpers::insert_agent(&conn, reader1.agent_id(), rtc.room_id());
Expand Down Expand Up @@ -1251,12 +1289,17 @@ mod test {
.capacity(1)
.insert(&conn);

// Insert active stream.
// Insert stream and recording.
factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend)
.rtc(&rtc)
.insert(&conn);

factory::Recording::new()
.rtc(&rtc)
.backend(&backend)
.insert(&conn);

// Insert active agents.
shared_helpers::insert_agent(&conn, reader.agent_id(), rtc.room_id());

Expand Down Expand Up @@ -1348,6 +1391,11 @@ mod test {
crate::db::janus_rtc_stream::start(stream.id(), &conn).unwrap();
crate::db::janus_rtc_stream::stop(stream.id(), &conn).unwrap();

factory::Recording::new()
.rtc(&rtc1)
.backend(&backend)
.insert(&conn);

rtc2
})
.unwrap();
Expand Down
14 changes: 7 additions & 7 deletions src/app/endpoint/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ mod tests {
use std::ops::Bound;

use crate::db::agent::{ListQuery as AgentListQuery, Status as AgentStatus};
use crate::db::janus_rtc_stream::Object as JanusRtcStream;
use crate::test_helpers::prelude::*;

use super::*;
Expand Down Expand Up @@ -413,6 +414,8 @@ mod tests {
#[test]
fn delete_subscription_for_stream_writer() {
async_std::task::block_on(async {
use diesel::prelude::*;

let db = TestDb::new();
let writer = TestAgent::new("web", "writer", USR_AUDIENCE);
let reader = TestAgent::new("web", "reader", USR_AUDIENCE);
Expand Down Expand Up @@ -466,13 +469,10 @@ mod tests {
.get()
.expect("Failed to get DB connection");

let db_stream = crate::db::janus_rtc_stream::FindQuery::new()
.id(stream.id())
.execute(&conn)
.expect("Failed to get janus rtc stream")
.expect("Janus rtc stream not found");

println!("{:?}", db_stream.time());
let db_stream: JanusRtcStream = crate::schema::janus_rtc_stream::table
.find(stream.id())
.get_result(&conn)
.expect("Failed to get janus rtc stream");

assert!(matches!(
db_stream.time(),
Expand Down
49 changes: 0 additions & 49 deletions src/db/janus_rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,55 +93,6 @@ impl Object {

////////////////////////////////////////////////////////////////////////////////

pub(crate) struct FindQuery {
id: Option<Uuid>,
rtc_id: Option<Uuid>,
}

impl FindQuery {
pub(crate) fn new() -> Self {
Self {
id: None,
rtc_id: None,
}
}

#[cfg(test)]
pub(crate) fn id(self, id: Uuid) -> Self {
Self {
id: Some(id),
..self
}
}

pub(crate) fn rtc_id(self, rtc_id: Uuid) -> Self {
Self {
rtc_id: Some(rtc_id),
..self
}
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::prelude::*;

let query = match (self.id, self.rtc_id) {
(Some(ref id), None) => janus_rtc_stream::table.find(id.to_owned()).into_boxed(),
(None, Some(ref rtc_id)) => janus_rtc_stream::table
.filter(janus_rtc_stream::rtc_id.eq(rtc_id.to_owned()))
.into_boxed(),
_ => {
return Err(Error::QueryBuilderError(
"id either rtc_id is required parameter of the query".into(),
))
}
};

query.get_result(conn).optional()
}
}

////////////////////////////////////////////////////////////////////////////////

const ACTIVE_SQL: &str = r#"(
lower("janus_rtc_stream"."time") is not null
and upper("janus_rtc_stream"."time") is null
Expand Down
26 changes: 26 additions & 0 deletions src/db/recording.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,32 @@ impl Object {
pub(crate) fn status(&self) -> &Status {
&self.status
}

pub(crate) fn backend_id(&self) -> &AgentId {
&self.backend_id
}
}

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub(crate) struct FindQuery {
rtc_id: Uuid,
}

impl FindQuery {
pub(crate) fn new(rtc_id: Uuid) -> Self {
Self { rtc_id }
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::prelude::*;

recording::table
.find(self.rtc_id)
.get_result(conn)
.optional()
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit 83dcbb5

Please sign in to comment.