Relate recordings to backends w/o mediation of streams (#150)
feymartynov committed Sep 17, 2020
1 parent b24105a commit dba75e2
Showing 13 changed files with 208 additions and 97 deletions.
@@ -0,0 +1,6 @@ (new file — likely the migration's down.sql; file name not shown in this view)
DROP INDEX recording_rtc_id_idx;

ALTER TABLE janus_rtc_stream
DROP CONSTRAINT janus_rtc_stream_backend_id_fkey;

ALTER TABLE recording DROP COLUMN backend_id;
@@ -0,0 +1,16 @@ (new file — likely the migration's up.sql; file name not shown in this view)
ALTER TABLE recording ADD COLUMN backend_id agent_id NULL;
UPDATE recording SET backend_id = '("(janus,svc.netology-group.services)",unknown)'::agent_id;
ALTER TABLE recording ALTER COLUMN backend_id SET NOT NULL;

UPDATE recording AS rec
SET backend_id = jrs.backend_id
FROM janus_rtc_stream AS jrs
WHERE jrs.rtc_id = rec.rtc_id;

ALTER TABLE janus_rtc_stream
ADD CONSTRAINT janus_rtc_stream_backend_id_fkey
FOREIGN KEY (backend_id)
REFERENCES janus_backend(id)
ON DELETE CASCADE;

CREATE UNIQUE INDEX recording_rtc_id_idx ON recording (rtc_id);
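
Note on the backfill above: the column is first added as nullable, every existing row is given a hardcoded fallback agent id so the NOT NULL constraint can be enforced, and rows whose rtc has a janus_rtc_stream are then overwritten with that stream's actual backend id. Also note that the only foreign key added here is on janus_rtc_stream.backend_id; recording.backend_id itself carries no FK constraint in this migration.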
29 changes: 29 additions & 0 deletions src/app/endpoint/metric.rs
@@ -199,3 +199,32 @@ fn append_janus_stats(

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;

#[derive(Deserialize)]
struct DynamicMetric {
metric: String,
value: u64,
timestamp: DateTime<Utc>,
}

#[test]
fn serialize_dynamic_metric() {
let now = Utc::now();

let json = serde_json::json!(Metric::Dynamic {
key: String::from("example"),
value: MetricValue::new(123, now),
});

let parsed: DynamicMetric =
serde_json::from_str(&json.to_string()).expect("Failed to parse json");

assert_eq!(&parsed.metric, "apps.conference.example_total");
assert_eq!(parsed.value, 123);
assert_eq!(parsed.timestamp, now);
}
}
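
The test pins the wire format rather than internal naming: a Dynamic metric constructed with key "example" is expected to serialize under the full metric name "apps.conference.example_total", so the "apps.conference." prefix and "_total" suffix are part of the public contract.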
6 changes: 6 additions & 0 deletions src/app/endpoint/rtc.rs
@@ -324,6 +324,12 @@ impl RequestHandler for ConnectHandler
.status(ResponseStatus::SERVICE_UNAVAILABLE)?,
};

// Create recording if a writer connects for the first time.
if payload.intent == ConnectIntent::Write && maybe_rtc_stream.is_none() {
db::recording::InsertQuery::new(payload.id, backend.id())
.execute(&conn)?;
}

// Check that the backend's capacity is not exceeded for readers.
if payload.intent == ConnectIntent::Read
&& db::janus_backend::free_capacity(payload.id, &conn)? == 0
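Together with the system.rs hunk below, this moves recording creation from vacuum time to connect time: previously a recording row was inserted for each rtc during room vacuuming, whereas now it is inserted when the first writer connects, which is exactly the moment the chosen backend is known and can be stored on the row.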
30 changes: 15 additions & 15 deletions src/app/endpoint/system.rs
@@ -74,24 +74,22 @@ impl RequestHandler for VacuumHandler

let mut requests = Vec::new();
let conn = context.db().get()?;
let rooms = db::room::finished_without_recordings(&conn)?;
let rooms = db::room::finished_with_in_progress_recordings(&conn)?;

for (room, rtc, backend) in rooms.into_iter() {
for (room, recording, backend) in rooms.into_iter() {
db::agent::DeleteQuery::new()
.room_id(room.id())
.execute(&conn)?;

db::recording::InsertQuery::new(rtc.id()).execute(&conn)?;

// TODO: Send the error as an event to "app/${APP}/audiences/${AUD}" topic
let backreq = janus::upload_stream_request(
reqp,
backend.session_id(),
backend.handle_id(),
janus::UploadStreamRequestBody::new(
rtc.id(),
recording.rtc_id(),
&bucket_name(&room),
&record_name(&rtc),
&record_name(&recording),
),
backend.id(),
context.agent_id(),
@@ -111,28 +109,30 @@ impl RequestHandler for VacuumHandler

pub(crate) fn upload_event<I>(
room: &db::room::Object,
rtcs_and_recordings: I,
recordings: I,
start_timestamp: DateTime<Utc>,
tracking: &TrackingProperties,
) -> anyhow::Result<RoomUploadEvent>
where
I: Iterator<Item = (db::rtc::Object, db::recording::Object)>,
I: Iterator<Item = db::recording::Object>,
{
let mut event_entries = Vec::new();
for (rtc, recording) in rtcs_and_recordings {
for recording in recordings {
let uri = match recording.status() {
RecordingStatus::InProgress => bail!(
"Unexpected recording in in_progress status, rtc_id = '{}'",
recording.rtc_id()
),
RecordingStatus::Missing => None,
RecordingStatus::Ready => {
Some(format!("s3://{}/{}", bucket_name(&room), record_name(&rtc)))
}
RecordingStatus::Ready => Some(format!(
"s3://{}/{}",
bucket_name(&room),
record_name(&recording)
)),
};

let entry = RtcUploadEventData {
id: rtc.id(),
id: recording.rtc_id(),
status: recording.status().to_owned(),
uri,
segments: recording.segments().to_owned(),
@@ -159,8 +159,8 @@ fn bucket_name(room: &db::room::Object) -> String {
format!("origin.webinar.{}", room.audience())
}

fn record_name(rtc: &db::rtc::Object) -> String {
format!("{}.source.webm", rtc.id())
fn record_name(recording: &db::recording::Object) -> String {
format!("{}.source.webm", recording.rtc_id())
}

///////////////////////////////////////////////////////////////////////////////
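Although record_name now takes a recording rather than an rtc, the resulting S3 object key is unchanged: recording's primary key is rtc_id (see src/db/recording.rs below), so recording.rtc_id() yields the same UUID that rtc.id() did.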
58 changes: 23 additions & 35 deletions src/app/janus.rs
@@ -946,7 +946,7 @@ async fn handle_response_impl<C: Context>(
.collect())
})?;

let (room, rtcs, recs) = {
let (room, rtcs, recs): (room::Object, Vec<rtc::Object>, Vec<recording::Object>) = {
let conn = context.db().get()?;

recording::UpdateQuery::new(rtc_id)
@@ -980,52 +980,40 @@ async fn handle_response_impl<C: Context>(
// TODO: move to db module
use diesel::prelude::*;
let rtcs = rtc::Object::belonging_to(&room).load(&conn)?;
let recs = recording::Object::belonging_to(&rtcs).load(&conn)?.grouped_by(&rtcs);
let recs = recording::Object::belonging_to(&rtcs).load(&conn)?;

(room, rtcs, recs)
};

let maybe_rtcs_and_recordings: Option<Vec<(rtc::Object, recording::Object)>> = rtcs
.into_iter()
.zip(recs)
.map(|(rtc, rtc_recs)| {
if rtc_recs.len() > 1 {
warn!(
"there must be at most 1 recording for an rtc, got {} for the room = '{}', rtc = '{}'; using the first one, ignoring the rest",
rtc_recs.len(),
room.id(),
rtc.id(),
);
}

rtc_recs.into_iter().next().map(|rec| (rtc, rec))
})
.collect();

match maybe_rtcs_and_recordings {
Some(rtcs_and_recordings) => {
let event = endpoint::system::upload_event(
&room,
rtcs_and_recordings.into_iter(),
start_timestamp,
respp.tracking(),
)
.map_err(|e| format!("error creating a system event, {}", e))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;
// Ensure that all rtcs have a recording.
let rtc_ids_with_recs = recs
.iter()
.map(|rec| rec.rtc_id())
.collect::<Vec<Uuid>>();

let event_box = Box::new(event) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(event_box)) as MessageStream)
}
None => {
// Waiting for all the room's rtc being uploaded
for rtc in rtcs {
if !rtc_ids_with_recs.contains(&rtc.id()) {
info!(
"postpone 'room.upload' event because still waiting for rtcs being uploaded for the room = '{}'",
room.id(),
);

Ok(Box::new(stream::empty()) as MessageStream)
return Ok(Box::new(stream::empty()) as MessageStream);
}
}

// Send room.upload event.
let event = endpoint::system::upload_event(
&room,
recs.into_iter(),
start_timestamp,
respp.tracking(),
)
.map_err(|e| format!("error creating a system event, {}", e))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

let event_box = Box::new(event) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(event_box)) as MessageStream)
})
}
// An unsupported incoming Event message has been received
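A side note on the new completeness check: the committed code collects the recordings' rtc ids into a Vec and calls contains() inside a loop, which is quadratic in the number of rtcs. For the handful of rtcs in a typical room that is fine; for illustration only, the same check with a HashSet looks like this (a hypothetical standalone helper, not part of this commit; it assumes rtc.id() and rec.rtc_id() both return Uuid, as they do in this diff):

use std::collections::HashSet;
use uuid::Uuid;

// Hypothetical helper mirroring the check in handle_response_impl:
// returns true when every rtc id is covered by at least one recording.
fn all_rtcs_recorded(rtc_ids: &[Uuid], recording_rtc_ids: &[Uuid]) -> bool {
    // Build a set of recorded ids once, then test membership in O(1) each.
    let recorded: HashSet<&Uuid> = recording_rtc_ids.iter().collect();
    rtc_ids.iter().all(|id| recorded.contains(id))
}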
5 changes: 3 additions & 2 deletions src/db/janus_backend.rs
@@ -6,14 +6,15 @@ use uuid::Uuid;

use crate::schema::janus_backend;

type AllColumns = (
pub(crate) type AllColumns = (
janus_backend::id,
janus_backend::handle_id,
janus_backend::session_id,
janus_backend::created_at,
janus_backend::capacity,
);
pub const ALL_COLUMNS: AllColumns = (

pub(crate) const ALL_COLUMNS: AllColumns = (
janus_backend::id,
janus_backend::handle_id,
janus_backend::session_id,
31 changes: 27 additions & 4 deletions src/db/recording.rs
@@ -4,13 +4,33 @@ use std::ops::Bound;
use chrono::{DateTime, Utc};
use diesel::{pg::PgConnection, result::Error};
use serde_derive::{Deserialize, Serialize};
use svc_agent::AgentId;
use uuid::Uuid;

use super::janus_backend::Object as JanusBackend;
use super::rtc::Object as Rtc;
use crate::schema::recording;

////////////////////////////////////////////////////////////////////////////////

pub(crate) type AllColumns = (
recording::rtc_id,
recording::started_at,
recording::segments,
recording::status,
recording::backend_id,
);

pub(crate) const ALL_COLUMNS: AllColumns = (
recording::rtc_id,
recording::started_at,
recording::segments,
recording::status,
recording::backend_id,
);

////////////////////////////////////////////////////////////////////////////////

pub(crate) type Segment = (Bound<i64>, Bound<i64>);

#[derive(Clone, Copy, Debug, DbEnum, Deserialize, Serialize, PartialEq)]
@@ -35,6 +55,7 @@ impl fmt::Display for Status {

#[derive(Debug, Serialize, Identifiable, Associations, Queryable)]
#[belongs_to(Rtc, foreign_key = "rtc_id")]
#[belongs_to(JanusBackend, foreign_key = "backend_id")]
#[primary_key(rtc_id)]
#[table_name = "recording"]
pub(crate) struct Object {
@@ -43,6 +64,7 @@ pub(crate) struct Object {
started_at: Option<DateTime<Utc>>,
segments: Option<Vec<Segment>>,
status: Status,
backend_id: AgentId,
}

impl Object {
@@ -67,13 +89,14 @@ impl Object {

#[derive(Debug, Insertable)]
#[table_name = "recording"]
pub(crate) struct InsertQuery {
pub(crate) struct InsertQuery<'a> {
rtc_id: Uuid,
backend_id: &'a AgentId,
}

impl InsertQuery {
pub(crate) fn new(rtc_id: Uuid) -> Self {
Self { rtc_id }
impl<'a> InsertQuery<'a> {
pub(crate) fn new(rtc_id: Uuid, backend_id: &'a AgentId) -> Self {
Self { rtc_id, backend_id }
}

pub(crate) fn execute(self, conn: &PgConnection) -> Result<Object, Error> {
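The new lifetime parameter exists because InsertQuery now borrows the backend's AgentId rather than owning it; callers pass a reference such as backend.id(), as in the ConnectHandler hunk above, which avoids cloning the agent id on every insert.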