Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Revert publisher migration (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Aug 10, 2020
1 parent 16d8537 commit 8c3e29b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 108 deletions.
82 changes: 8 additions & 74 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,16 @@ impl RequestHandler for ConnectHandler {
let backend = {
let conn = context.db().get()?;

// If there is an active stream choose its backend since Janus doesn't support
// clustering so all agents within one rtc must be sent to the same node. If there's no
// active stream it means we're connecting as publisher and going to create it.
// Then select the least loaded node: the one with the least active rtc streams count.
// There are 3 cases:
// 1. Connecting as publisher with no previous stream. Select the least loaded backend
// that is capable to host the room's reservation.
// 2. Connecting as subscriber with existing stream. Choose the backend of the active
// stream because Janus doesn't support clustering and it must be the same server
// that the stream's publisher is connected to.
// 3. Reconnecting as publisher with previous stream. Select the backend of the previous
// stream to avoid partitioning the record across multiple servers.
let maybe_rtc_stream = db::janus_rtc_stream::FindQuery::new()
.rtc_id(payload.id)
.active(true)
.execute(&conn)?;

match maybe_rtc_stream {
Expand Down Expand Up @@ -793,75 +796,6 @@ mod test {
});
}

#[test]
fn connect_to_rtc_migration_to_another_backend() {
async_std::task::block_on(async {
let db = TestDb::new();
let mut authz = TestAuthz::new();

let (rtc, backend) = db
.connection_pool()
.get()
.map(|conn| {
// Insert rtcs and janus backends.
let rtc1 = shared_helpers::insert_rtc(&conn);
let rtc2 = shared_helpers::insert_rtc(&conn);
let backend1 = shared_helpers::insert_janus_backend(&conn);
let backend2 = shared_helpers::insert_janus_backend(&conn);

// The first backend has a finished stream for the first rtc…
let stream1 = factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend1)
.rtc(&rtc1)
.insert(&conn);

crate::db::janus_rtc_stream::start(stream1.id(), &conn).unwrap();
crate::db::janus_rtc_stream::stop_by_agent_id(stream1.sent_by(), &conn)
.unwrap();

// …and an active stream for the second rtc.
let stream2 = factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend1)
.rtc(&rtc2)
.insert(&conn);

crate::db::janus_rtc_stream::start(stream2.id(), &conn).unwrap();

// A new stream for the first rtc should start on the second backend.
(rtc1, backend2)
})
.unwrap();

// Allow user to read the rtc.
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(agent.account_id(), object, "read");

// Make rtc.connect request.
let context = TestContext::new(db, authz);
let payload = ConnectRequest { id: rtc.id() };

let messages = handle_request::<ConnectHandler>(&context, &agent, payload)
.await
.expect("RTC connect failed");

// Ensure we're balanced to the least loaded backend.
let (req, _reqp, topic) = find_request::<JanusAttachRequest>(messages.as_slice());

let expected_topic = format!(
"agents/{}/api/{}/in/{}",
backend.id(),
janus::JANUS_API_VERSION,
context.config().id,
);

assert_eq!(topic, expected_topic);
assert_eq!(req.session_id, backend.session_id());
});
}

#[test]
fn connect_to_rtc_with_reservation() {
async_std::task::block_on(async {
Expand Down
51 changes: 17 additions & 34 deletions src/db/janus_rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,16 @@ impl Object {

////////////////////////////////////////////////////////////////////////////////

const ACTIVE_SQL: &str = r#"
(
lower("janus_rtc_stream"."time") is not null
and upper("janus_rtc_stream"."time") is null
)
"#;

pub(crate) struct FindQuery {
id: Option<Uuid>,
rtc_id: Option<Uuid>,
active: Option<bool>,
}

impl FindQuery {
pub(crate) fn new() -> Self {
Self {
id: None,
rtc_id: None,
active: None,
}
}

Expand All @@ -122,42 +115,32 @@ impl FindQuery {
}
}

pub(crate) fn active(self, active: bool) -> Self {
Self {
active: Some(active),
..self
}
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::dsl::sql;
use diesel::prelude::*;

let rtc_id = match self.rtc_id {
Some(rtc_id) => rtc_id,
None => {
let query = match (self.id, self.rtc_id) {
(Some(ref id), None) => janus_rtc_stream::table.find(id.to_owned()).into_boxed(),
(None, Some(ref rtc_id)) => janus_rtc_stream::table
.filter(janus_rtc_stream::rtc_id.eq(rtc_id.to_owned()))
.into_boxed(),
_ => {
return Err(Error::QueryBuilderError(
"rtc_id is required parameter of the query".into(),
"id either rtc_id is required parameter of the query".into(),
))
}
};

let mut q = janus_rtc_stream::table
.filter(janus_rtc_stream::rtc_id.eq(rtc_id))
.into_boxed();

match self.active {
None => (),
Some(true) => q = q.filter(sql(ACTIVE_SQL)),
Some(false) => q = q.filter(sql(&format!("not {}", ACTIVE_SQL))),
}

q.get_result(conn).optional()
query.get_result(conn).optional()
}
}

////////////////////////////////////////////////////////////////////////////////

const ACTIVE_SQL: &str = r#"(
lower("janus_rtc_stream"."time") is not null
and upper("janus_rtc_stream"."time") is null
)"#;

pub(crate) struct ListQuery {
room_id: Option<Uuid>,
rtc_id: Option<Uuid>,
Expand Down Expand Up @@ -311,7 +294,7 @@ pub(crate) fn start(id: Uuid, conn: &PgConnection) -> Result<Option<Object>, Err
update janus_rtc_stream \
set time = tstzrange(now(), null, '[)') \
where id = $1 \
returning * \
returning *\
",
)
.bind::<Uuid, _>(id)
Expand All @@ -328,7 +311,7 @@ pub(crate) fn stop(id: Uuid, conn: &PgConnection) -> Result<Option<Object>, Erro
update janus_rtc_stream \
set time = case when time is not null then tstzrange(lower(time), now(), '[)') end \
where id = $1 \
returning * \
returning *\
",
)
.bind::<Uuid, _>(id)
Expand Down

0 comments on commit 8c3e29b

Please sign in to comment.