Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Remove active streams on publisher disconnection (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Dec 19, 2019
1 parent b3438ee commit 865b814
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 15 deletions.
1 change: 0 additions & 1 deletion src/app/endpoint/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ mod test {
use failure::format_err;
use serde_json::{json, Value as JsonValue};
use svc_agent::{AccountId, AgentId, Destination};
use svc_authn::Authenticable;

use crate::test_helpers::{
agent::TestAgent,
Expand Down
51 changes: 37 additions & 14 deletions src/app/endpoint/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ impl State {
}
}

// Stop Janus active rtc streams of this agent.
janus_rtc_stream::stop_by_agent_id(agent_id, &conn)?;

messages.into()
} else {
let err = format!(
Expand Down Expand Up @@ -207,7 +210,8 @@ mod test {
use svc_agent::Destination;

use crate::db::agent::Object as Agent;
use crate::schema::agent::dsl::*;
use crate::schema::agent as agent_schema;
use crate::schema::janus_rtc_stream as janus_rtc_stream_schema;
use crate::test_helpers::{
agent::TestAgent, db::TestDb, extract_payload, factory, factory::insert_room,
};
Expand Down Expand Up @@ -279,13 +283,13 @@ mod test {
// Assert agent presence in the DB.
let conn = db.connection_pool().get().unwrap();

let db_agent: Agent = agent
.filter(agent_id.eq(user_agent.agent_id()))
let agent: Agent = agent_schema::table
.filter(agent_schema::agent_id.eq(user_agent.agent_id()))
.get_result(&conn)
.unwrap();

assert_eq!(db_agent.room_id(), room.id());
assert_eq!(*db_agent.status(), crate::db::agent::Status::Ready);
assert_eq!(agent.room_id(), room.id());
assert_eq!(*agent.status(), crate::db::agent::Status::Ready);
});
}

Expand Down Expand Up @@ -384,17 +388,26 @@ mod test {
let db = TestDb::new();

// Insert agent.
let db_agent = db
let (agent, stream) = db
.connection_pool()
.get()
.map_err(|err| format_err!("Failed to get DB connection: {}", err))
.and_then(|conn| factory::Agent::new().audience(AUDIENCE).insert(&conn))
.expect("Failed to insert agent");
.and_then(|conn| {
let stream = factory::insert_janus_rtc_stream(&conn, AUDIENCE);

let agent = factory::Agent::new()
.agent_id(stream.sent_by())
.audience(AUDIENCE)
.insert(&conn)?;

Ok((agent, stream))
})
.expect("Failed to insert test data");

// Send subscription.delete event.
let payload = json!({
"object": vec!["rooms", &db_agent.room_id().to_string(), "events"],
"subject": db_agent.agent_id().to_string(),
"object": vec!["rooms", &agent.room_id().to_string(), "events"],
"subject": agent.agent_id().to_string(),
});

let broker_agent = TestAgent::new("alpha", "mqtt-gateway", AUDIENCE);
Expand All @@ -412,19 +425,29 @@ mod test {

match message.destination() {
Destination::Broadcast(destination) => {
assert_eq!(destination, &format!("rooms/{}/events", db_agent.room_id()))
assert_eq!(destination, &format!("rooms/{}/events", agent.room_id()))
}
_ => panic!("Expected broadcast destination"),
}

let payload: RoomEnterLeaveEventData = extract_payload(message).unwrap();
assert_eq!(payload.id, db_agent.room_id());
assert_eq!(payload.agent_id, *db_agent.agent_id());
assert_eq!(payload.id, agent.room_id());
assert_eq!(payload.agent_id, *agent.agent_id());

// Assert agent absence in the DB.
let conn = db.connection_pool().get().unwrap();
let query = agent.filter(agent_id.eq(db_agent.agent_id()));
let query = agent_schema::table.filter(agent_schema::agent_id.eq(agent.agent_id()));
assert_eq!(query.execute(&conn).unwrap(), 0);

// Assert active Janus RTC stream closed.
assert!(janus_rtc_stream_schema::table
.find(stream.id())
// TODO: https://burning-heart.atlassian.net/browse/ULMS-969
.select(diesel::dsl::sql(
"time is not null and (time = 'empty'::tstzrange or upper(time) is not null)"
))
.get_result::<bool>(&conn)
.expect("Failed to fetch closing time"));
});
}

Expand Down
15 changes: 15 additions & 0 deletions src/db/janus_rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,18 @@ pub(crate) fn stop(id: Uuid, conn: &PgConnection) -> Result<Option<Object>, Erro
.get_result(conn)
.optional()
}

pub(crate) fn stop_by_agent_id(agent_id: &AgentId, conn: &PgConnection) -> Result<usize, Error> {
use diesel::prelude::*;
use svc_agent::sql::Agent_id;

diesel::sql_query(
"\
update janus_rtc_stream \
set time = case when time is not null then tstzrange(lower(time), now(), '[)') end \
where sent_by = $1 \
",
)
.bind::<Agent_id, _>(agent_id)
.execute(conn)
}

0 comments on commit 865b814

Please sign in to comment.