Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Put connected agents to ready state on backend disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Aug 20, 2020
1 parent 78c1e42 commit 11c7679
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 11 deletions.
6 changes: 6 additions & 0 deletions src/app/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,12 @@ async fn handle_status_event_impl<C: Context>(
Ok(Box::new(stream::once(boxed_event)))
} else {
let conn = context.db().get()?;

agent::BulkStatusUpdateQuery::new(agent::Status::Ready)
.backend_id(evp.as_agent_id())
.status(agent::Status::Connected)
.execute(&conn)?;

janus_backend::DeleteQuery::new(evp.as_agent_id()).execute(&conn)?;
Ok(Box::new(stream::empty()))
}
Expand Down
45 changes: 34 additions & 11 deletions src/db/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use svc_agent::AgentId;
use uuid::Uuid;

use super::room::Object as Room;
use crate::schema::agent;
use crate::schema::{agent, janus_rtc_stream, rtc};

////////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -193,16 +193,18 @@ impl<'a> UpdateQuery<'a> {
///////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub(crate) struct BulkStatusUpdateQuery {
pub(crate) struct BulkStatusUpdateQuery<'a> {
room_id: Option<Uuid>,
backend_id: Option<&'a AgentId>,
status: Option<Status>,
new_status: Status,
}

impl BulkStatusUpdateQuery {
impl<'a> BulkStatusUpdateQuery<'a> {
pub(crate) fn new(new_status: Status) -> Self {
Self {
room_id: None,
backend_id: None,
status: None,
new_status,
}
Expand All @@ -215,6 +217,13 @@ impl BulkStatusUpdateQuery {
}
}

pub(crate) fn backend_id(self, backend_id: &'a AgentId) -> Self {
Self {
backend_id: Some(backend_id),
..self
}
}

pub(crate) fn status(self, status: Status) -> Self {
Self {
status: Some(status),
Expand All @@ -225,17 +234,31 @@ impl BulkStatusUpdateQuery {
pub(crate) fn execute(&self, conn: &PgConnection) -> Result<usize, Error> {
use diesel::prelude::*;

let mut query = diesel::update(agent::table).into_boxed();
conn.transaction::<_, Error, _>(|| {
let mut query = diesel::update(agent::table).into_boxed();

if let Some(room_id) = self.room_id {
query = query.filter(agent::room_id.eq(room_id));
}
if let Some(room_id) = self.room_id {
query = query.filter(agent::room_id.eq(room_id));
}

if let Some(status) = self.status {
query = query.filter(agent::status.eq(status));
}
if let Some(backend_id) = self.backend_id {
// Diesel doesn't allow JOINs with UPDATE so find backend ids with a separate query.
let room_ids: Vec<Uuid> = rtc::table
.inner_join(janus_rtc_stream::table)
.filter(janus_rtc_stream::backend_id.eq(backend_id))
.select(rtc::room_id)
.distinct()
.get_results(conn)?;

query = query.filter(agent::room_id.eq_any(room_ids))
}

if let Some(status) = self.status {
query = query.filter(agent::status.eq(status));
}

query.set(agent::status.eq(self.new_status)).execute(conn)
query.set(agent::status.eq(self.new_status)).execute(conn)
})
}
}

Expand Down

0 comments on commit 11c7679

Please sign in to comment.