Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Fix publisher migration to another backend (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Jul 31, 2020
1 parent a1f8c42 commit 843f4f6
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 17 deletions.
70 changes: 70 additions & 0 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl RequestHandler for ConnectHandler {
// Then select the least loaded node: the one with the least active rtc streams count.
let maybe_rtc_stream = db::janus_rtc_stream::FindQuery::new()
.rtc_id(payload.id)
.active(true)
.execute(&conn)?;

match maybe_rtc_stream {
Expand Down Expand Up @@ -786,6 +787,75 @@ mod test {
});
}

#[test]
fn connect_to_rtc_migration_to_another_backend() {
async_std::task::block_on(async {
let db = TestDb::new();
let mut authz = TestAuthz::new();

let (rtc, backend) = db
.connection_pool()
.get()
.map(|conn| {
// Insert rtcs and janus backends.
let rtc1 = shared_helpers::insert_rtc(&conn);
let rtc2 = shared_helpers::insert_rtc(&conn);
let backend1 = shared_helpers::insert_janus_backend(&conn);
let backend2 = shared_helpers::insert_janus_backend(&conn);

// The first backend has a finished stream for the first rtc…
let stream1 = factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend1)
.rtc(&rtc1)
.insert(&conn);

crate::db::janus_rtc_stream::start(stream1.id(), &conn).unwrap();
crate::db::janus_rtc_stream::stop_by_agent_id(stream1.sent_by(), &conn)
.unwrap();

// …and an active stream for the second rtc.
let stream2 = factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend1)
.rtc(&rtc2)
.insert(&conn);

crate::db::janus_rtc_stream::start(stream2.id(), &conn).unwrap();

// A new stream for the first rtc should start on the second backend.
(rtc1, backend2)
})
.unwrap();

// Allow user to read the rtc.
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(agent.account_id(), object, "read");

// Make rtc.connect request.
let context = TestContext::new(db, authz);
let payload = ConnectRequest { id: rtc.id() };

let messages = handle_request::<ConnectHandler>(&context, &agent, payload)
.await
.expect("rtc reading failed");

// Ensure we're balanced to the least loaded backend.
let (req, _reqp, topic) = find_request::<JanusAttachRequest>(messages.as_slice());

let expected_topic = format!(
"agents/{}/api/{}/in/{}",
backend.id(),
janus::JANUS_API_VERSION,
context.config().id,
);

assert_eq!(topic, expected_topic);
assert_eq!(req.session_id, backend.session_id());
});
}

#[test]
fn connect_to_rtc_not_authorized() {
async_std::task::block_on(async {
Expand Down
51 changes: 34 additions & 17 deletions src/db/janus_rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,23 @@ impl Object {

////////////////////////////////////////////////////////////////////////////////

const ACTIVE_SQL: &str = r#"
(
lower("janus_rtc_stream"."time") is not null
and upper("janus_rtc_stream"."time") is null
)
"#;

pub(crate) struct FindQuery {
id: Option<Uuid>,
rtc_id: Option<Uuid>,
active: Option<bool>,
}

impl FindQuery {
pub(crate) fn new() -> Self {
Self {
id: None,
rtc_id: None,
active: None,
}
}

Expand All @@ -115,32 +122,42 @@ impl FindQuery {
}
}

pub(crate) fn active(self, active: bool) -> Self {
Self {
active: Some(active),
..self
}
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::dsl::sql;
use diesel::prelude::*;

let query = match (self.id, self.rtc_id) {
(Some(ref id), None) => janus_rtc_stream::table.find(id.to_owned()).into_boxed(),
(None, Some(ref rtc_id)) => janus_rtc_stream::table
.filter(janus_rtc_stream::rtc_id.eq(rtc_id.to_owned()))
.into_boxed(),
_ => {
let rtc_id = match self.rtc_id {
Some(rtc_id) => rtc_id,
None => {
return Err(Error::QueryBuilderError(
"id either rtc_id is required parameter of the query".into(),
"rtc_id is required parameter of the query".into(),
))
}
};

query.get_result(conn).optional()
let mut q = janus_rtc_stream::table
.filter(janus_rtc_stream::rtc_id.eq(rtc_id))
.into_boxed();

match self.active {
None => (),
Some(true) => q = q.filter(sql(ACTIVE_SQL)),
Some(false) => q = q.filter(sql(&format!("not {}", ACTIVE_SQL))),
}

q.get_result(conn).optional()
}
}

////////////////////////////////////////////////////////////////////////////////

const ACTIVE_SQL: &str = r#"(
lower("janus_rtc_stream"."time") is not null
and upper("janus_rtc_stream"."time") is null
)"#;

pub(crate) struct ListQuery {
room_id: Option<Uuid>,
rtc_id: Option<Uuid>,
Expand Down Expand Up @@ -294,7 +311,7 @@ pub(crate) fn start(id: Uuid, conn: &PgConnection) -> Result<Option<Object>, Err
update janus_rtc_stream \
set time = tstzrange(now(), null, '[)') \
where id = $1 \
returning *\
returning * \
",
)
.bind::<Uuid, _>(id)
Expand All @@ -311,7 +328,7 @@ pub(crate) fn stop(id: Uuid, conn: &PgConnection) -> Result<Option<Object>, Erro
update janus_rtc_stream \
set time = case when time is not null then tstzrange(lower(time), now(), '[)') end \
where id = $1 \
returning *\
returning * \
",
)
.bind::<Uuid, _>(id)
Expand Down

0 comments on commit 843f4f6

Please sign in to comment.