Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Send rtc_stream.update when the backend goes offline (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Sep 23, 2020
1 parent 772018a commit 0326e1d
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 14 deletions.
35 changes: 29 additions & 6 deletions src/app/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::backend::janus::{
MessageRequest, OpaqueId, StatusEvent, TrickleRequest, JANUS_API_VERSION,
};
use crate::db::{agent, janus_backend, janus_rtc_stream, recording, room, rtc};
use crate::diesel::Connection;
use crate::util::{from_base64, generate_correlation_data, to_base64};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1228,12 +1229,34 @@ async fn handle_status_event_impl<C: Context>(
} else {
let conn = context.db().get()?;

agent::BulkStatusUpdateQuery::new(agent::Status::Ready)
.backend_id(evp.as_agent_id())
.status(agent::Status::Connected)
.execute(&conn)?;
let streams_with_rtc = conn.transaction::<_, SvcError, _>(|| {
let streams_with_rtc = janus_rtc_stream::ListWithRtcQuery::new()
.active(true)
.backend_id(evp.as_agent_id())
.execute(&conn)?;

agent::BulkStatusUpdateQuery::new(agent::Status::Ready)
.backend_id(evp.as_agent_id())
.status(agent::Status::Connected)
.execute(&conn)?;

janus_backend::DeleteQuery::new(evp.as_agent_id()).execute(&conn)?;
Ok(streams_with_rtc)
})?;

let mut events = Vec::with_capacity(streams_with_rtc.len());

for (stream, rtc) in streams_with_rtc {
let event = endpoint::rtc_stream::update_event(
rtc.room_id(),
stream,
start_timestamp,
evp.tracking(),
)?;

events.push(Box::new(event) as Box<dyn IntoPublishableMessage + Send>);
}

janus_backend::DeleteQuery::new(evp.as_agent_id()).execute(&conn)?;
Ok(Box::new(stream::empty()))
Ok(Box::new(stream::from_iter(events)))
}
}
56 changes: 48 additions & 8 deletions src/db/janus_rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::ops::Bound;
use svc_agent::AgentId;
use uuid::Uuid;

use crate::db::rtc::Object as Rtc;
use crate::schema::{janus_rtc_stream, rtc};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -98,6 +99,7 @@ const ACTIVE_SQL: &str = r#"(
and upper("janus_rtc_stream"."time") is null
)"#;

#[derive(Debug, Default)]
pub(crate) struct ListQuery {
room_id: Option<Uuid>,
rtc_id: Option<Uuid>,
Expand All @@ -109,14 +111,7 @@ pub(crate) struct ListQuery {

impl ListQuery {
pub(crate) fn new() -> Self {
Self {
room_id: None,
rtc_id: None,
time: None,
active: None,
offset: None,
limit: None,
}
Self::default()
}

pub(crate) fn room_id(self, room_id: Uuid) -> Self {
Expand Down Expand Up @@ -200,6 +195,51 @@ impl ListQuery {

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Default)]
pub(crate) struct ListWithRtcQuery<'a> {
active: Option<bool>,
backend_id: Option<&'a AgentId>,
}

impl<'a> ListWithRtcQuery<'a> {
pub(crate) fn new() -> Self {
Self::default()
}

pub(crate) fn active(self, active: bool) -> Self {
Self {
active: Some(active),
..self
}
}

pub(crate) fn backend_id(self, backend_id: &'a AgentId) -> Self {
Self {
backend_id: Some(backend_id),
..self
}
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<Vec<(Object, Rtc)>, Error> {
use diesel::dsl::sql;
use diesel::prelude::*;

let mut q = janus_rtc_stream::table.inner_join(rtc::table).into_boxed();

match self.active {
None => (),
Some(true) => q = q.filter(sql(ACTIVE_SQL)),
Some(false) => q = q.filter(sql(&format!("not {}", ACTIVE_SQL))),
}

q.order_by(janus_rtc_stream::id)
.select((self::ALL_COLUMNS, super::rtc::ALL_COLUMNS))
.load(conn)
}
}

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Insertable, AsChangeset)]
#[table_name = "janus_rtc_stream"]
pub(crate) struct InsertQuery<'a> {
Expand Down
5 changes: 5 additions & 0 deletions src/db/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ use crate::schema::rtc;

////////////////////////////////////////////////////////////////////////////////

pub(crate) type AllColumns = (rtc::id, rtc::room_id, rtc::created_at);
pub(crate) const ALL_COLUMNS: AllColumns = (rtc::id, rtc::room_id, rtc::created_at);

////////////////////////////////////////////////////////////////////////////////

#[derive(
Clone, Debug, Serialize, Deserialize, Identifiable, Queryable, QueryableByName, Associations,
)]
Expand Down

0 comments on commit 0326e1d

Please sign in to comment.