Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add rtc.created_by to room.upload event (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Apr 1, 2021
1 parent eac8b42 commit 6b4afcd
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 38 deletions.
15 changes: 10 additions & 5 deletions src/app/endpoint/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ use async_std::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde_derive::{Deserialize, Serialize};
use svc_agent::mqtt::{
IncomingRequestProperties, IntoPublishableMessage, OutgoingEvent, OutgoingEventProperties,
OutgoingMessage, ShortTermTimingProperties, TrackingProperties,
use svc_agent::{
mqtt::{
IncomingRequestProperties, IntoPublishableMessage, OutgoingEvent, OutgoingEventProperties,
OutgoingMessage, ShortTermTimingProperties, TrackingProperties,
},
AgentId,
};
use svc_authn::Authenticable;
use uuid::Uuid;
Expand Down Expand Up @@ -45,6 +48,7 @@ struct RtcUploadEventData {
started_at: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
uri: Option<String>,
created_by: AgentId,
}

pub(crate) type RoomUploadEvent = OutgoingMessage<RoomUploadEventData>;
Expand Down Expand Up @@ -136,11 +140,11 @@ pub(crate) fn upload_event<C: Context, I>(
tracking: &TrackingProperties,
) -> StdResult<RoomUploadEvent, AppError>
where
I: Iterator<Item = db::recording::Object>,
I: Iterator<Item = (db::recording::Object, db::rtc::Object)>,
{
let mut event_entries = Vec::new();

for recording in recordings {
for (recording, rtc) in recordings {
let uri = match recording.status() {
RecordingStatus::InProgress => {
let err = anyhow!(
Expand All @@ -164,6 +168,7 @@ where
uri,
segments: recording.segments().to_owned(),
started_at: recording.started_at().to_owned(),
created_by: rtc.created_by().to_owned(),
};

event_entries.push(entry);
Expand Down
59 changes: 27 additions & 32 deletions src/backend/janus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ async fn handle_response_impl<C: Context>(
})?;

// if vacuuming was already started by previous request - just do nothing
let maybe_already_running = plugin_data.get("state").and_then(|v| v.as_str()) == Some(ALREADY_RUNNING_STATE);
let maybe_already_running =
plugin_data.get("state").and_then(|v| v.as_str())
== Some(ALREADY_RUNNING_STATE);
if maybe_already_running {
return Ok(Box::new(stream::empty()) as MessageStream);
}
Expand All @@ -362,7 +364,9 @@ async fn handle_response_impl<C: Context>(
.error(AppErrorKind::MessageParsingFailed)
.and_then(|val| {
let unix_ts = serde_json::from_value::<u64>(val.clone())
.map_err(|err| anyhow!("Invalid value for 'started_at': {}", err))
.map_err(|err| {
anyhow!("Invalid value for 'started_at': {}", err)
})
.error(AppErrorKind::MessageParsingFailed)?;

let naive_datetime = NaiveDateTime::from_timestamp(
Expand All @@ -388,7 +392,10 @@ async fn handle_response_impl<C: Context>(
.collect())
})?;

let (room, rtcs, recs): (room::Object, Vec<rtc::Object>, Vec<recording::Object>) = {
let (room, rtcs_with_recs): (
room::Object,
Vec<(rtc::Object, Option<recording::Object>)>,
) = {
let conn = context.get_conn()?;

recording::UpdateQuery::new(rtc_id)
Expand All @@ -409,49 +416,37 @@ async fn handle_response_impl<C: Context>(
endpoint::helpers::RoomTimeRequirement::Any,
)?;

// TODO: move to db module
use diesel::prelude::*;
let rtcs = rtc::Object::belonging_to(&room).load(&conn)?;
let recs = recording::Object::belonging_to(&rtcs).load(&conn)?;
let rtcs_with_recs =
rtc::ListWithRecordingQuery::new(room.id()).execute(&conn)?;

(room, rtcs, recs)
(room, rtcs_with_recs)
};

// Ensure that all rtcs have a recording.
let mut rtc_ids_with_recs = recs
.iter()
.map(|rec| rec.rtc_id());

for rtc in rtcs {
let id = rtc.id();
if !rtc_ids_with_recs.any(|v| v == id) {
let mut logger = context.logger().new(o!(
"room_id" => room.id().to_string(),
"rtc_id" => rtc.id().to_string(),
));

if let Some(scope) = room.tags().get("scope") {
logger = logger.new(o!("scope" => scope.to_string()));
}

info!(
logger,
"postpone 'room.upload' event because still waiting for rtcs being uploaded";
);
let rtcs_total = rtcs_with_recs.len();

return Ok(Box::new(stream::empty()) as MessageStream);
}
let recs_with_rtcs = rtcs_with_recs
.into_iter()
.filter_map(|(rtc, maybe_recording)| {
maybe_recording.map(|recording| (recording, rtc))
})
.collect::<Vec<_>>();

if recs_with_rtcs.len() < rtcs_total {
return Ok(Box::new(stream::empty()) as MessageStream);
}

// Send room.upload event.
let event = endpoint::system::upload_event(
context,
&room,
recs.into_iter(),
recs_with_rtcs.into_iter(),
respp.tracking(),
)?;

let event_box = Box::new(event) as Box<dyn IntoPublishableMessage + Send>;
let event_box =
Box::new(event) as Box<dyn IntoPublishableMessage + Send>;

Ok(Box::new(stream::once(event_box)) as MessageStream)
})
}
Expand Down
28 changes: 27 additions & 1 deletion src/db/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use serde_derive::{Deserialize, Serialize};
use svc_agent::AgentId;
use uuid::Uuid;

use super::recording::Object as Recording;
use super::room::Object as Room;
use crate::schema::rtc;
use crate::schema::{recording, rtc};

////////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -162,6 +163,31 @@ impl<'a> ListQuery<'a> {

////////////////////////////////////////////////////////////////////////////////

#[derive(Default)]
pub(crate) struct ListWithRecordingQuery {
room_id: Uuid,
}

impl ListWithRecordingQuery {
pub(crate) fn new(room_id: Uuid) -> Self {
Self { room_id }
}

pub(crate) fn execute(
&self,
conn: &PgConnection,
) -> Result<Vec<(Object, Option<Recording>)>, Error> {
use diesel::prelude::*;

rtc::table
.left_join(recording::table)
.filter(rtc::room_id.eq(self.room_id))
.get_results(conn)
}
}

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Insertable)]
#[table_name = "rtc"]
pub(crate) struct InsertQuery<'a> {
Expand Down

0 comments on commit 6b4afcd

Please sign in to comment.