Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add in_progress status for recordings and prevent stream cascade dele…
Browse files Browse the repository at this point in the history
…tion (#142)

* Add in_progress status for recordings

* Prevent stream cascade deletion
  • Loading branch information
feymartynov committed Aug 28, 2020
1 parent 445ae42 commit 084d03f
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
ALTER TABLE recording DROP CONSTRAINT recording_check;

DELETE FROM recording WHERE status = 'in_progress';
ALTER TYPE recording_status RENAME TO recording_status_old;
CREATE TYPE recording_status AS ENUM ('ready', 'missing');
ALTER TABLE recording ALTER COLUMN status DROP DEFAULT;
ALTER TABLE recording ALTER COLUMN status TYPE recording_status USING status::text::recording_status;
ALTER TABLE recording ALTER COLUMN status SET DEFAULT 'ready';
DROP TYPE recording_status_old;

ALTER TABLE recording ADD CONSTRAINT recording_check CHECK (
(
status = 'ready'
AND started_at IS NOT NULL
AND segments IS NOT NULL
) OR (
status = 'missing'
AND started_at IS NULL
AND segments IS NULL
)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
ALTER TABLE recording DROP CONSTRAINT recording_check;

ALTER TYPE recording_status RENAME TO recording_status_old;
CREATE TYPE recording_status AS ENUM ('in_progress', 'ready', 'missing');
ALTER TABLE recording ALTER COLUMN status DROP DEFAULT;
ALTER TABLE recording ALTER COLUMN status TYPE recording_status USING status::text::recording_status;
ALTER TABLE recording ALTER COLUMN status SET DEFAULT 'in_progress';
DROP TYPE recording_status_old;

ALTER TABLE recording ADD CONSTRAINT recording_check CHECK (
(
status = 'ready'
AND started_at IS NOT NULL
AND segments IS NOT NULL
) OR (
status IN ('in_progress', 'missing')
AND started_at IS NULL
AND segments IS NULL
)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE janus_rtc_stream
ADD CONSTRAINT janus_rtc_stream_backend_id_fkey
FOREIGN KEY (backend_id)
REFERENCES janus_backend(id)
ON DELETE CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE janus_rtc_stream
DROP CONSTRAINT janus_rtc_stream_backend_id_fkey;
22 changes: 19 additions & 3 deletions src/app/endpoint/system.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::ops::Bound;

use anyhow::bail;
use async_std::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
Expand All @@ -15,6 +16,7 @@ use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::app::janus;
use crate::db;
use crate::db::recording::Status as RecordingStatus;

////////////////////////////////////////////////////////////////////////////////

Expand All @@ -27,7 +29,7 @@ pub(crate) struct RoomUploadEventData {
#[derive(Debug, Serialize)]
struct RtcUploadEventData {
id: Uuid,
status: db::recording::Status,
status: RecordingStatus,
#[serde(
serialize_with = "crate::serde::milliseconds_bound_tuples_option",
skip_serializing_if = "Option::is_none"
Expand Down Expand Up @@ -79,6 +81,8 @@ impl RequestHandler for VacuumHandler {
.room_id(room.id())
.execute(&conn)?;

db::recording::InsertQuery::new(rtc.id()).execute(&conn)?;

// TODO: Send the error as an event to "app/${APP}/audiences/${AUD}" topic
let backreq = janus::upload_stream_request(
reqp,
Expand Down Expand Up @@ -117,8 +121,12 @@ where
let mut event_entries = Vec::new();
for (rtc, recording) in rtcs_and_recordings {
let uri = match recording.status() {
db::recording::Status::Missing => None,
db::recording::Status::Ready => {
RecordingStatus::InProgress => bail!(
"Unexpected recording in in_progress status, rtc_id = '{}'",
recording.rtc_id()
),
RecordingStatus::Missing => None,
RecordingStatus::Ready => {
Some(format!("s3://{}/{}", bucket_name(&room), record_name(&rtc)))
}
};
Expand Down Expand Up @@ -275,6 +283,14 @@ mod test {
.filter(crate::schema::agent::room_id.eq(rtc.room_id()));

assert_eq!(query.execute(&conn).unwrap(), 0);

// Assert recording in `in_progress` status.
let recording = crate::schema::recording::table
.filter(crate::schema::recording::rtc_id.eq(rtc.id()))
.get_result::<crate::db::recording::Object>(&conn)
.expect("Failed to get recording from the DB");

assert_eq!(recording.status(), &RecordingStatus::InProgress);
}
});
}
Expand Down
6 changes: 4 additions & 2 deletions src/app/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,8 @@ async fn handle_response_impl<C: Context>(
val if val == "404" => {
let conn = context.db().get()?;

recording::InsertQuery::new(tn.rtc_id, recording::Status::Missing)
recording::UpdateQuery::new(tn.rtc_id)
.status(recording::Status::Missing)
.execute(&conn)?;

let err = format!(
Expand Down Expand Up @@ -948,7 +949,8 @@ async fn handle_response_impl<C: Context>(
let (room, rtcs, recs) = {
let conn = context.db().get()?;

recording::InsertQuery::new(rtc_id, recording::Status::Ready)
recording::UpdateQuery::new(rtc_id)
.status(recording::Status::Ready)
.started_at(started_at)
.segments(segments)
.execute(&conn)?;
Expand Down
50 changes: 42 additions & 8 deletions src/db/recording.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub(crate) type Segment = (Bound<i64>, Bound<i64>);
#[PgType = "recording_status"]
#[DieselType = "Recording_status"]
pub(crate) enum Status {
#[serde(rename = "in_progress")]
InProgress,
Ready,
Missing,
}
Expand All @@ -44,6 +46,10 @@ pub(crate) struct Object {
}

impl Object {
pub(crate) fn rtc_id(&self) -> Uuid {
self.rtc_id
}

pub(crate) fn started_at(&self) -> &Option<DateTime<Utc>> {
&self.started_at
}
Expand All @@ -63,18 +69,47 @@ impl Object {
#[table_name = "recording"]
pub(crate) struct InsertQuery {
rtc_id: Uuid,
}

impl InsertQuery {
pub(crate) fn new(rtc_id: Uuid) -> Self {
Self { rtc_id }
}

pub(crate) fn execute(self, conn: &PgConnection) -> Result<Object, Error> {
use crate::schema::recording::dsl::recording;
use diesel::RunQueryDsl;

diesel::insert_into(recording).values(self).get_result(conn)
}
}

///////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Identifiable, AsChangeset)]
#[table_name = "recording"]
#[primary_key(rtc_id)]
pub(crate) struct UpdateQuery {
rtc_id: Uuid,
status: Option<Status>,
started_at: Option<DateTime<Utc>>,
segments: Option<Vec<Segment>>,
status: Status,
}

impl InsertQuery {
pub(crate) fn new(rtc_id: Uuid, status: Status) -> Self {
impl UpdateQuery {
pub(crate) fn new(rtc_id: Uuid) -> Self {
Self {
rtc_id,
status: None,
started_at: None,
segments: None,
status,
}
}

pub(crate) fn status(self, status: Status) -> Self {
Self {
status: Some(status),
..self
}
}

Expand All @@ -92,10 +127,9 @@ impl InsertQuery {
}
}

pub(crate) fn execute(self, conn: &PgConnection) -> Result<Object, Error> {
use crate::schema::recording::dsl::recording;
use diesel::RunQueryDsl;
pub(crate) fn execute(&self, conn: &PgConnection) -> Result<Object, Error> {
use diesel::prelude::*;

diesel::insert_into(recording).values(self).get_result(conn)
diesel::update(self).set(self).get_result(conn)
}
}
5 changes: 4 additions & 1 deletion src/db/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ pub(crate) fn finished_without_recordings(
schema::rtc::table
.left_join(schema::recording::table)
.left_join(
schema::janus_rtc_stream::table.inner_join(schema::janus_backend::table),
schema::janus_rtc_stream::table
.inner_join(schema::janus_backend::table.on(
schema::janus_backend::id.eq(schema::janus_rtc_stream::backend_id),
)),
),
)
.filter(room::backend.ne(RoomBackend::None))
Expand Down
1 change: 0 additions & 1 deletion src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ table! {

joinable!(agent -> room (room_id));
joinable!(agent_stream -> agent (sent_by));
joinable!(janus_rtc_stream -> janus_backend (backend_id));
joinable!(janus_rtc_stream -> rtc (rtc_id));
joinable!(recording -> rtc (rtc_id));
joinable!(rtc -> room (room_id));
Expand Down

0 comments on commit 084d03f

Please sign in to comment.