Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Merge pull request #97 from netology-group/feature/ULMS-462
Browse files Browse the repository at this point in the history
Send rtc_stream.update on publisher detach
  • Loading branch information
feymartynov committed Apr 23, 2020
2 parents a6845e4 + 2f312c3 commit ac2dd1a
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 24 deletions.
17 changes: 13 additions & 4 deletions src/app/endpoint/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use svc_agent::mqtt::{
IncomingEvent, IncomingEventProperties, IntoPublishableDump, OutgoingEvent, ResponseStatus,
ShortTermTimingProperties,
};
use svc_agent::AgentId;
use svc_agent::{AccountId, AgentId};
use svc_authn::Authenticable;
use svc_error::Error as SvcError;
use uuid::Uuid;
Expand Down Expand Up @@ -40,13 +40,15 @@ impl RoomEnterLeaveEventData {
pub(crate) struct State {
broker_account_id: svc_agent::AccountId,
db: ConnectionPool,
me: AgentId,
}

impl State {
pub(crate) fn new(broker_account_id: svc_agent::AccountId, db: ConnectionPool) -> Self {
pub(crate) fn new(broker_account_id: AccountId, db: ConnectionPool, me: AgentId) -> Self {
Self {
broker_account_id,
db,
me,
}
}
}
Expand Down Expand Up @@ -132,6 +134,7 @@ impl State {
backend.handle_id(),
agent_id,
backend.id(),
&self.me,
evt.properties().tracking(),
);

Expand Down Expand Up @@ -219,8 +222,14 @@ mod test {
use super::*;

fn build_state(db: &TestDb) -> State {
let account_id = svc_agent::AccountId::new("mqtt-gateway", AUDIENCE);
State::new(account_id, db.connection_pool().clone())
let broker_account_id = svc_agent::AccountId::new("mqtt-gateway", AUDIENCE);
let me = TestAgent::new("alpha", "conference", AUDIENCE);

State::new(
broker_account_id,
db.connection_pool().clone(),
me.agent_id().to_owned(),
)
}

#[test]
Expand Down
29 changes: 16 additions & 13 deletions src/app/janus.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::str::FromStr;

use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use failure::{err_msg, format_err, Error};
use log::{info, warn};
Expand Down Expand Up @@ -501,22 +503,22 @@ impl AgentLeaveRequestBody {
}
}

pub(crate) fn agent_leave_request<M>(
pub(crate) fn agent_leave_request<T, M>(
evp: IncomingEventProperties,
session_id: i64,
handle_id: i64,
agent_id: &AgentId,
to: &T,
me: &M,
tracking: &TrackingProperties,
) -> Result<OutgoingRequest<MessageRequest>, Error>
where
T: Addressable,
M: Addressable,
{
let to = evp.as_agent_id().to_owned();

let mut props = OutgoingRequestProperties::new(
"janus_conference_agent.leave",
&response_topic(&to, me)?,
&response_topic(to, me)?,
&generate_correlation_data(),
ShortTermTimingProperties::new(Utc::now()),
);
Expand All @@ -537,7 +539,7 @@ where
Ok(OutgoingRequest::unicast(
payload,
props,
&to,
to,
JANUS_API_VERSION,
))
}
Expand Down Expand Up @@ -982,6 +984,7 @@ where
)),
}
}

pub(crate) async fn handle_event(
payload: Arc<Vec<u8>>,
janus: Arc<State>,
Expand All @@ -992,15 +995,15 @@ pub(crate) async fn handle_event(

match message.payload() {
IncomingEvent::WebRtcUp(ref inev) => {
use std::str::FromStr;
let rtc_stream_id = Uuid::from_str(inev.opaque_id())?;

let conn = janus.db.get()?;

// If the event relates to a publisher's handle,
// we will find the corresponding stream and send event w/ updated stream object
// to the room's topic.
if let Some(rtc_stream) = janus_rtc_stream::start(rtc_stream_id, &conn)? {
let rtc_id = rtc_stream.rtc_id();

let room = room::FindQuery::new()
.time(room::now())
.rtc_id(rtc_id)
Expand All @@ -1019,16 +1022,16 @@ pub(crate) async fn handle_event(
Ok(vec![])
}
}
IncomingEvent::HangUp(ref inev) => {
use std::str::FromStr;
IncomingEvent::Detached(ref inev) => {
let rtc_stream_id = Uuid::from_str(inev.opaque_id())?;

let conn = janus.db.get()?;

// If the event relates to a publisher's handle,
// we will find the corresponding stream and send event w/ updated stream object
// to the room's topic.
if let Some(rtc_stream) = janus_rtc_stream::stop(rtc_stream_id, &conn)? {
let rtc_id = rtc_stream.rtc_id();

let room = room::FindQuery::new()
.time(room::now())
.rtc_id(rtc_id)
Expand All @@ -1051,10 +1054,10 @@ pub(crate) async fn handle_event(

Ok(vec![])
}
IncomingEvent::Media(_)
IncomingEvent::HangUp(_)
| IncomingEvent::Media(_)
| IncomingEvent::Timeout(_)
| IncomingEvent::SlowLink(_)
| IncomingEvent::Detached(_) => {
| IncomingEvent::SlowLink(_) => {
// Ignore these kinds of events.
Ok(vec![])
}
Expand Down
6 changes: 5 additions & 1 deletion src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ pub(crate) async fn run(db: &ConnectionPool, authz_cache: Option<AuthzCache>) ->
rtc_signal: endpoint::rtc_signal::State::new(authz.clone(), db.clone(), agent_id.clone()),
rtc_stream: endpoint::rtc_stream::State::new(authz.clone(), db.clone()),
message: endpoint::message::State::new(agent_id.clone(), db.clone()),
subscription: endpoint::subscription::State::new(config.broker_id, db.clone()),
subscription: endpoint::subscription::State::new(
config.broker_id,
db.clone(),
agent_id.clone(),
),
system: endpoint::system::State::new(agent_id.clone(), authz.clone(), db.clone()),
});

Expand Down
12 changes: 6 additions & 6 deletions src/backend/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,6 @@ pub(crate) struct HangUpEvent {
reason: String,
}

impl HangUpEvent {
pub(crate) fn opaque_id(&self) -> &str {
&self.opaque_id
}
}

////////////////////////////////////////////////////////////////////////////////

// Audio or video bytes being received by plugin handle
Expand Down Expand Up @@ -396,6 +390,12 @@ pub(crate) struct DetachedEvent {
opaque_id: String,
}

impl DetachedEvent {
pub(crate) fn opaque_id(&self) -> &str {
&self.opaque_id
}
}

////////////////////////////////////////////////////////////////////////////////

// Janus Gateway actual status
Expand Down

0 comments on commit ac2dd1a

Please sign in to comment.