Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Call agent.leave in Janus on subscription deletion (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Dec 12, 2019
1 parent dda0e05 commit 8f7f2c6
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 17 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ svc-error = { version = "0.1", features = ["diesel", "r2d2", "svc-agent", "svc-a

[dev-dependencies]
rand = "0.7"

[patch.crates-io]
svc-agent = { git = "https://github.com/netology-group/svc-agent-rs" }
1 change: 1 addition & 0 deletions src/app/endpoint/rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl State {
Some(room_id),
inreq.payload().rtc_id,
inreq.payload().time,
None,
inreq.payload().offset,
Some(std::cmp::min(
inreq.payload().limit.unwrap_or_else(|| MAX_LIMIT),
Expand Down
49 changes: 46 additions & 3 deletions src/app/endpoint/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use chrono::{DateTime, Utc};
use serde_derive::{Deserialize, Serialize};
use svc_agent::mqtt::{
Connection, IncomingEvent, IncomingEventProperties, OutgoingEvent, ResponseStatus,
Connection, IncomingEvent, IncomingEventProperties, OutgoingEvent, Publishable, ResponseStatus,
ShortTermTimingProperties,
};
use svc_agent::AgentId;
Expand All @@ -10,7 +10,7 @@ use svc_error::Error as SvcError;
use uuid::Uuid;

use crate::app::endpoint;
use crate::db::{agent, room, ConnectionPool};
use crate::db::{agent, janus_backend, janus_rtc_stream, room, ConnectionPool};

///////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -100,11 +100,54 @@ impl State {
let row_count = agent::DeleteQuery::new(agent_id, room_id).execute(&conn)?;

if row_count == 1 {
// Event to room topic.
let payload = RoomEnterLeaveEventData::new(room_id.to_owned(), agent_id.to_owned());
let short_term_timing = ShortTermTimingProperties::until_now(start_timestamp);
let props = evt.properties().to_event("room.leave", short_term_timing);
let to_uri = format!("rooms/{}/events", room_id);
OutgoingEvent::broadcast(payload, props, &to_uri).into()
let outgoing_event = OutgoingEvent::broadcast(payload, props, &to_uri);
let mut messages: Vec<Box<dyn Publishable>> = vec![Box::new(outgoing_event)];

// `agent.leave` requests to Janus instances that host active streams in this room.
let streams = janus_rtc_stream::ListQuery::new()
.room_id(room_id)
.active(true)
.execute(&conn)?;

let mut backend_ids = streams
.iter()
.map(|stream| stream.backend_id())
.collect::<Vec<&AgentId>>();

backend_ids.dedup();

let backends = janus_backend::ListQuery::new()
.ids(&backend_ids[..])
.execute(&conn)?;

for backend in backends {
let result = crate::app::janus::agent_leave_request(
evt.properties().to_owned(),
backend.session_id(),
backend.handle_id(),
agent_id,
backend.id(),
evt.properties().tracking(),
);

match result {
Ok(req) => messages.push(Box::new(req)),
Err(_) => {
return SvcError::builder()
.status(ResponseStatus::UNPROCESSABLE_ENTITY)
.detail("error creating a backend request")
.build()
.into()
}
}
}

messages.into()
} else {
let err = format!(
"the agent is not found for agent_id = '{}', room = '{}'",
Expand Down
81 changes: 75 additions & 6 deletions src/app/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::ops::Bound;
use std::sync::Arc;
use svc_agent::mqtt::{
compat::{into_event, IncomingEnvelope},
IncomingRequestProperties, OutgoingRequest, OutgoingRequestProperties, Publishable,
ResponseStatus, ShortTermTimingProperties, TrackingProperties,
IncomingEventProperties, IncomingRequestProperties, OutgoingRequest, OutgoingRequestProperties,
Publishable, ResponseStatus, ShortTermTimingProperties, TrackingProperties,
};
use svc_agent::{Addressable, AgentId};
use svc_error::Error as SvcError;
Expand Down Expand Up @@ -38,6 +38,7 @@ pub(crate) enum Transaction {
ReadStream(ReadStreamTransaction),
UploadStream(UploadStreamTransaction),
Trickle(TrickleTransaction),
AgentLeave(AgentLeaveTransaction),
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -192,13 +193,15 @@ impl CreateStreamTransaction {
pub(crate) struct CreateStreamRequestBody {
method: &'static str,
id: Uuid,
agent_id: AgentId,
}

impl CreateStreamRequestBody {
pub(crate) fn new(id: Uuid) -> Self {
pub(crate) fn new(id: Uuid, agent_id: AgentId) -> Self {
Self {
method: "stream.create",
id,
agent_id,
}
}
}
Expand Down Expand Up @@ -226,8 +229,9 @@ where
short_term_timing,
);

let agent_id = reqp.to_connection().agent_id().to_owned();
let body = CreateStreamRequestBody::new(rtc_id, agent_id);
let transaction = Transaction::CreateStream(CreateStreamTransaction::new(reqp));
let body = CreateStreamRequestBody::new(rtc_id);

let payload = MessageRequest::new(
&to_base64(&transaction)?,
Expand Down Expand Up @@ -257,13 +261,15 @@ impl ReadStreamTransaction {
pub(crate) struct ReadStreamRequestBody {
method: &'static str,
id: Uuid,
agent_id: AgentId,
}

impl ReadStreamRequestBody {
pub(crate) fn new(id: Uuid) -> Self {
pub(crate) fn new(id: Uuid, agent_id: AgentId) -> Self {
Self {
method: "stream.read",
id,
agent_id,
}
}
}
Expand Down Expand Up @@ -291,8 +297,9 @@ where
short_term_timing,
);

let agent_id = reqp.to_connection().agent_id().to_owned();
let body = ReadStreamRequestBody::new(rtc_id, agent_id);
let transaction = Transaction::ReadStream(ReadStreamTransaction::new(reqp));
let body = ReadStreamRequestBody::new(rtc_id);

let payload = MessageRequest::new(
&to_base64(&transaction)?,
Expand Down Expand Up @@ -406,6 +413,68 @@ where

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct AgentLeaveTransaction {
evp: IncomingEventProperties,
}

impl AgentLeaveTransaction {
pub(crate) fn new(evp: IncomingEventProperties) -> Self {
Self { evp }
}
}

#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct AgentLeaveRequestBody {
method: &'static str,
agent_id: AgentId,
}

impl AgentLeaveRequestBody {
pub(crate) fn new(agent_id: AgentId) -> Self {
Self {
method: "agent.leave",
agent_id,
}
}
}

pub(crate) fn agent_leave_request<A>(
evp: IncomingEventProperties,
session_id: i64,
handle_id: i64,
agent_id: &AgentId,
to: &A,
tracking: &TrackingProperties,
) -> Result<OutgoingRequest<MessageRequest>, Error>
where
A: Addressable,
{
let mut props = OutgoingRequestProperties::new(
"janus_conference_agent.leave",
IGNORE,
IGNORE,
ShortTermTimingProperties::new(Utc::now()),
);

props.set_tracking(tracking.to_owned());

let transaction = Transaction::AgentLeave(AgentLeaveTransaction::new(evp));
let body = AgentLeaveRequestBody::new(agent_id.to_owned());

let payload = MessageRequest::new(
&to_base64(&transaction)?,
session_id,
handle_id,
serde_json::to_value(&body)?,
None,
);

Ok(OutgoingRequest::unicast(payload, props, to))
}

////////////////////////////////////////////////////////////////////////////////

pub(crate) struct State {
db: ConnectionPool,
}
Expand Down
16 changes: 14 additions & 2 deletions src/db/janus_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,28 @@ impl Object {

////////////////////////////////////////////////////////////////////////////////

pub(crate) struct ListQuery {
pub(crate) struct ListQuery<'a> {
ids: Option<&'a [&'a AgentId]>,
offset: Option<i64>,
limit: Option<i64>,
}

impl ListQuery {
impl<'a> ListQuery<'a> {
pub(crate) fn new() -> Self {
Self {
ids: None,
offset: None,
limit: None,
}
}

pub(crate) fn ids(self, ids: &'a [&'a AgentId]) -> Self {
Self {
ids: Some(ids),
..self
}
}

pub(crate) fn offset(self, offset: i64) -> Self {
Self {
offset: Some(offset),
Expand All @@ -63,6 +72,9 @@ impl ListQuery {
use diesel::prelude::*;

let mut q = janus_backend::table.into_boxed();
if let Some(ids) = self.ids {
q = q.filter(janus_backend::id.eq_any(ids))
}
if let Some(offset) = self.offset {
q = q.offset(offset);
}
Expand Down
26 changes: 24 additions & 2 deletions src/db/janus_rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,16 @@ impl FindQuery {

////////////////////////////////////////////////////////////////////////////////

const ACTIVE_SQL: &str = r#"(
lower("janus_rtc_stream"."time") is not null
and upper("janus_rtc_stream"."time") is null
)"#;

pub(crate) struct ListQuery {
room_id: Option<Uuid>,
rtc_id: Option<Uuid>,
time: Option<Time>,
active: Option<bool>,
offset: Option<i64>,
limit: Option<i64>,
}
Expand All @@ -123,6 +129,7 @@ impl ListQuery {
room_id: None,
rtc_id: None,
time: None,
active: None,
offset: None,
limit: None,
}
Expand All @@ -149,6 +156,13 @@ impl ListQuery {
}
}

pub(crate) fn active(self, active: bool) -> Self {
Self {
active: Some(active),
..self
}
}

pub(crate) fn offset(self, offset: i64) -> Self {
Self {
offset: Some(offset),
Expand All @@ -174,6 +188,11 @@ impl ListQuery {
if let Some(time) = self.time {
q = q.filter(sql("time && ").bind::<Tstzrange, _>(time));
}
match self.active {
None => (),
Some(true) => q = q.filter(sql(ACTIVE_SQL)),
Some(false) => q = q.filter(sql(&format!("not {}", ACTIVE_SQL))),
}
if let Some(offset) = self.offset {
q = q.offset(offset);
}
Expand All @@ -200,6 +219,7 @@ impl
Option<Uuid>,
Option<Uuid>,
Option<Time>,
Option<bool>,
Option<i64>,
Option<i64>,
)> for ListQuery
Expand All @@ -209,6 +229,7 @@ impl
Option<Uuid>,
Option<Uuid>,
Option<Time>,
Option<bool>,
Option<i64>,
Option<i64>,
),
Expand All @@ -217,8 +238,9 @@ impl
room_id: value.0,
rtc_id: value.1,
time: value.2,
offset: value.3,
limit: value.4,
active: value.3,
offset: value.4,
limit: value.5,
}
}
}
Expand Down

0 comments on commit 8f7f2c6

Please sign in to comment.