Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Put readers to ready state on stream stop
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Aug 20, 2020
1 parent 8797737 commit 1c8854d
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 52 deletions.
101 changes: 98 additions & 3 deletions src/app/endpoint/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,21 @@ impl EventHandler for DeleteHandler {
.active(true)
.execute(&conn)?;

for stream in streams.iter() {
// If the agent is a publisher.
if stream.sent_by() == &payload.subject {
// Stop the stream.
db::janus_rtc_stream::stop(stream.id(), &conn)?;

// Put stream readers into `ready` status since the stream has gone.
db::agent::BulkStatusUpdateQuery::new(db::agent::Status::Ready)
.room_id(room_id)
.status(db::agent::Status::Connected)
.execute(&conn)?;
}
}

// Send agent.leave requests to those backends where the agent is connected to.
let mut backend_ids = streams
.iter()
.map(|stream| stream.backend_id())
Expand Down Expand Up @@ -189,9 +204,6 @@ impl EventHandler for DeleteHandler {
}
}

// Stop Janus active rtc streams of this agent.
db::janus_rtc_stream::stop_by_agent_id(&payload.subject, &conn)?;

Ok(Box::new(stream::from_iter(messages)))
} else {
let err = format!(
Expand All @@ -208,6 +220,8 @@ impl EventHandler for DeleteHandler {

#[cfg(test)]
mod tests {
use std::ops::Bound;

use crate::db::agent::{ListQuery as AgentListQuery, Status as AgentStatus};
use crate::test_helpers::prelude::*;

Expand Down Expand Up @@ -396,6 +410,87 @@ mod tests {
});
}

#[test]
fn delete_subscription_for_stream_writer() {
async_std::task::block_on(async {
let db = TestDb::new();
let writer = TestAgent::new("web", "writer", USR_AUDIENCE);
let reader = TestAgent::new("web", "reader", USR_AUDIENCE);

let (rtc, stream) = {
let conn = db
.connection_pool()
.get()
.expect("Failed to get DB connection");

// Create room with rtc, backend and a started active stream.
let rtc = shared_helpers::insert_rtc(&conn);
let backend = shared_helpers::insert_janus_backend(&conn);

let stream = factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend)
.rtc(&rtc)
.sent_by(writer.agent_id())
.insert(&conn);

let started_stream = crate::db::janus_rtc_stream::start(stream.id(), &conn)
.expect("Failed to start janus rtc stream")
.expect("Janus rtc stream couldn't start");

// Put agents online.
shared_helpers::insert_agent(&conn, writer.agent_id(), rtc.room_id());
shared_helpers::insert_agent(&conn, reader.agent_id(), rtc.room_id());

(rtc, started_stream)
};

// Send subscription.delete event for the writer.
let context = TestContext::new(db.clone(), TestAuthz::new());
let room_id = rtc.room_id().to_string();

let payload = SubscriptionEvent {
subject: writer.agent_id().to_owned(),
object: vec!["rooms".to_string(), room_id, "events".to_string()],
};

let broker_account_label = context.config().broker_id.label();
let broker = TestAgent::new("alpha", broker_account_label, SVC_AUDIENCE);

handle_event::<DeleteHandler>(&context, &broker, payload)
.await
.expect("Subscription deletion failed");

// Assert the stream is stopped.
let conn = db
.connection_pool()
.get()
.expect("Failed to get DB connection");

let db_stream = crate::db::janus_rtc_stream::FindQuery::new()
.id(stream.id())
.execute(&conn)
.expect("Failed to get janus rtc stream")
.expect("Janus rtc stream not found");

println!("{:?}", db_stream.time());

assert!(matches!(
db_stream.time(),
Some((Bound::Included(_), Bound::Excluded(_)))
));

// Assert the reader is in `ready` status.
let db_agents = AgentListQuery::new()
.agent_id(reader.agent_id())
.room_id(rtc.room_id())
.execute(&conn)
.expect("Failed to execute agent list query");

let db_agent = db_agents.first().expect("Reader agent not found");
assert_eq!(db_agent.status(), AgentStatus::Ready);
});
}

#[test]
fn delete_subscription_missing_agent() {
async_std::task::block_on(async {
Expand Down
18 changes: 13 additions & 5 deletions src/app/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::backend::janus::{
CreateHandleRequest, CreateSessionRequest, ErrorResponse, IncomingEvent, IncomingResponse,
MessageRequest, OpaqueId, StatusEvent, TrickleRequest, JANUS_API_VERSION,
};
use crate::db::{janus_backend, janus_rtc_stream, recording, room, rtc};
use crate::db::{agent, janus_backend, janus_rtc_stream, recording, room, rtc};
use crate::util::{from_base64, generate_correlation_data, to_base64};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1159,8 +1159,8 @@ fn handle_hangup_detach<C: Context, E: OpaqueId>(

let conn = context.db().get()?;

// If the event relates to a publisher's handle,
// we will find the corresponding stream and send event w/ updated stream object
// If the event relates to the publisher's handle,
// we will find the corresponding stream and send an event w/ updated stream object
// to the room's topic.
if let Some(rtc_stream) = janus_rtc_stream::stop(rtc_stream_id, &conn)? {
let rtc_id = rtc_stream.rtc_id();
Expand All @@ -1172,9 +1172,17 @@ fn handle_hangup_detach<C: Context, E: OpaqueId>(
.ok_or_else(|| format!("a room for rtc = '{}' is not found", &rtc_id))
.status(ResponseStatus::NOT_FOUND)?;

// Publish the update event if only stream object has been changed
// (if there weren't any actual media stream, the object won't contain its start time)
// Publish the update event only if the stream object has been changed.
// If there's no actual media stream, the object wouldn't contain its start time.
if rtc_stream.time().is_some() {
// Put connected `agents` back into `ready` status since the stream has gone and
// they haven't been connected anymore.
agent::BulkStatusUpdateQuery::new(agent::Status::Ready)
.room_id(room.id())
.status(agent::Status::Connected)
.execute(&conn)?;

// Send rtc_stream.update event.
let event = endpoint::rtc_stream::update_event(
room.id(),
rtc_stream,
Expand Down
49 changes: 49 additions & 0 deletions src/db/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,55 @@ impl<'a> UpdateQuery<'a> {

///////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub(crate) struct BulkStatusUpdateQuery {
room_id: Option<Uuid>,
status: Option<Status>,
new_status: Status,
}

impl BulkStatusUpdateQuery {
pub(crate) fn new(new_status: Status) -> Self {
Self {
room_id: None,
status: None,
new_status,
}
}

pub(crate) fn room_id(self, room_id: Uuid) -> Self {
Self {
room_id: Some(room_id),
..self
}
}

pub(crate) fn status(self, status: Status) -> Self {
Self {
status: Some(status),
..self
}
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<usize, Error> {
use diesel::prelude::*;

let mut query = diesel::update(agent::table).into_boxed();

if let Some(room_id) = self.room_id {
query = query.filter(agent::room_id.eq(room_id));
}

if let Some(status) = self.status {
query = query.filter(agent::status.eq(status));
}

query.set(agent::status.eq(self.new_status)).execute(conn)
}
}

///////////////////////////////////////////////////////////////////////////////

pub(crate) struct DeleteQuery<'a> {
agent_id: Option<&'a AgentId>,
room_id: Option<Uuid>,
Expand Down
78 changes: 36 additions & 42 deletions src/db/janus_rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ pub(crate) struct Object {
}

impl Object {
#[cfg(test)]
pub(crate) fn id(&self) -> Uuid {
self.id
}
Expand All @@ -78,7 +77,6 @@ impl Object {
self.label.as_ref()
}

#[cfg(test)]
pub(crate) fn sent_by(&self) -> &AgentId {
&self.sent_by
}
Expand Down Expand Up @@ -108,6 +106,14 @@ impl FindQuery {
}
}

#[cfg(test)]
pub(crate) fn id(self, id: Uuid) -> Self {
Self {
id: Some(id),
..self
}
}

pub(crate) fn rtc_id(self, rtc_id: Uuid) -> Self {
Self {
rtc_id: Some(rtc_id),
Expand Down Expand Up @@ -285,51 +291,39 @@ impl<'a> InsertQuery<'a> {

////////////////////////////////////////////////////////////////////////////////

const START_TIME_SQL: &str = "(TSTZRANGE(NOW(), NULL, '[)'))";

pub(crate) fn start(id: Uuid, conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::dsl::sql;
use diesel::prelude::*;
use diesel::sql_types::Uuid;

diesel::sql_query(
"\
update janus_rtc_stream \
set time = tstzrange(now(), null, '[)') \
where id = $1 \
returning *\
",
)
.bind::<Uuid, _>(id)
.get_result(conn)
.optional()

diesel::update(janus_rtc_stream::table.filter(janus_rtc_stream::id.eq(id)))
.set(janus_rtc_stream::time.eq(sql(START_TIME_SQL)))
.get_result(conn)
.optional()
}

pub(crate) fn stop(id: Uuid, conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::prelude::*;
use diesel::sql_types::Uuid;

diesel::sql_query(
"\
update janus_rtc_stream \
set time = case when time is not null then tstzrange(lower(time), now(), '[)') end \
where id = $1 \
returning *\
",
// Close the stream with current timestamp.
// Fall back to start + 1 ms when closing instantly after starting because lower and upper
// values of a range can't be equal in Postgres.
const STOP_TIME_SQL: &str = r#"
(
CASE WHEN "time" IS NOT NULL THEN
TSTZRANGE(
LOWER("time"),
GREATEST(NOW(), LOWER("time") + '1 millisecond'::INTERVAL),
'[)'
)
END
)
.bind::<Uuid, _>(id)
.get_result(conn)
.optional()
}
"#;

pub(crate) fn stop_by_agent_id(agent_id: &AgentId, conn: &PgConnection) -> Result<usize, Error> {
pub(crate) fn stop(id: Uuid, conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::dsl::sql;
use diesel::prelude::*;
use svc_agent::sql::Agent_id;

diesel::sql_query(
"\
update janus_rtc_stream \
set time = case when time is not null then tstzrange(lower(time), now(), '[)') end \
where sent_by = $1 \
",
)
.bind::<Agent_id, _>(agent_id)
.execute(conn)

diesel::update(janus_rtc_stream::table.filter(janus_rtc_stream::id.eq(id)))
.set(janus_rtc_stream::time.eq(sql(STOP_TIME_SQL)))
.get_result(conn)
.optional()
}
21 changes: 19 additions & 2 deletions src/test_helpers/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ pub(crate) struct JanusRtcStream<'a> {
audience: &'a str,
backend: Option<&'a db::janus_backend::Object>,
rtc: Option<&'a db::rtc::Object>,
sent_by: Option<&'a AgentId>,
}

impl<'a> JanusRtcStream<'a> {
Expand All @@ -193,6 +194,7 @@ impl<'a> JanusRtcStream<'a> {
audience,
backend: None,
rtc: None,
sent_by: None,
}
}

Expand All @@ -210,6 +212,13 @@ impl<'a> JanusRtcStream<'a> {
}
}

pub(crate) fn sent_by(self, sent_by: &'a AgentId) -> Self {
Self {
sent_by: Some(sent_by),
..self
}
}

pub(crate) fn insert(&self, conn: &PgConnection) -> db::janus_rtc_stream::Object {
let default_backend;

Expand All @@ -231,15 +240,23 @@ impl<'a> JanusRtcStream<'a> {
}
};

let agent = TestAgent::new("web", "user123", self.audience);
let default_agent;

let sent_by = match self.sent_by {
Some(value) => value,
None => {
default_agent = TestAgent::new("web", "user123", self.audience);
default_agent.agent_id()
}
};

db::janus_rtc_stream::InsertQuery::new(
Uuid::new_v4(),
backend.handle_id(),
rtc.id(),
backend.id(),
"alpha",
agent.agent_id(),
sent_by,
)
.execute(conn)
.expect("Failed to insert janus_rtc_stream")
Expand Down

0 comments on commit 1c8854d

Please sign in to comment.