Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Send backend and bucket from config to stream.upload (#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Oct 19, 2020
1 parent cb0d28f commit 8cc41bf
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 20 deletions.
4 changes: 4 additions & 0 deletions App.toml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,9 @@ default_timeout = 5
stream_upload_timeout = 600
transaction_watchdog_check_period = 1

[upload."example.net"]
backend = "EXAMPLE"
bucket = "origin.webinars.example.net"

[metrics.http]
bind_address = "0.0.0.0:8087"
1 change: 1 addition & 0 deletions docs/src/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The following types are a part of the service's API and are guaranteed to mainta
- `backend_request_timed_out` – The backend request didn't finished in a reasonable time.
- `backend_not_found` – The backend that hosted the RTC went offline.
- `capacity_exceeded` – There's no free capacity left on the backend to connect to.
- `config_key_missing` – The service couldn't perform an operation due to misconfiguration.
- `database_connection_acquisition_failed` – The service couldn't obtain a DB connection from the pool.
- `database_query_failed` – The database returned an error while executing a query.
- `invalid_jsep_format` – Failed to determine whether the SDP is recvonly.
Expand Down
50 changes: 36 additions & 14 deletions src/app/endpoint/system.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::ops::Bound;
use std::result::Result as StdResult;

use async_std::stream;
use async_trait::async_trait;
Expand All @@ -13,9 +14,12 @@ use uuid::Uuid;

use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::app::error::Error as AppError;
use crate::backend::janus::requests::UploadStreamRequestBody;
use crate::config::UploadConfig;
use crate::db;
use crate::db::recording::Status as RecordingStatus;
use crate::db::recording::{Object as Recording, Status as RecordingStatus};
use crate::db::room::Object as Room;

////////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -84,6 +88,8 @@ impl RequestHandler for VacuumHandler {
.room_id(room.id())
.execute(&conn)?;

let config = upload_config(context, &room)?;

// TODO: Send the error as an event to "app/${APP}/audiences/${AUD}" topic
let backreq = context
.janus_client()
Expand All @@ -93,7 +99,8 @@ impl RequestHandler for VacuumHandler {
backend.handle_id(),
UploadStreamRequestBody::new(
recording.rtc_id(),
&bucket_name(&room),
&config.backend,
&config.bucket,
&record_name(&recording),
),
backend.id(),
Expand Down Expand Up @@ -122,26 +129,31 @@ impl RequestHandler for VacuumHandler {

////////////////////////////////////////////////////////////////////////////////

pub(crate) fn upload_event<I>(
pub(crate) fn upload_event<C: Context, I>(
context: &C,
room: &db::room::Object,
recordings: I,
start_timestamp: DateTime<Utc>,
tracking: &TrackingProperties,
) -> anyhow::Result<RoomUploadEvent>
) -> StdResult<RoomUploadEvent, AppError>
where
I: Iterator<Item = db::recording::Object>,
{
let mut event_entries = Vec::new();

for recording in recordings {
let uri = match recording.status() {
RecordingStatus::InProgress => bail!(
"Unexpected recording in in_progress status, rtc_id = '{}'",
recording.rtc_id()
),
RecordingStatus::InProgress => {
let err = anyhow!(
"Unexpected recording in in_progress status, rtc_id = '{}'",
recording.rtc_id(),
);

return Err(err).error(AppErrorKind::MessageBuildingFailed)?;
}
RecordingStatus::Missing => None,
RecordingStatus::Ready => Some(format!(
"s3://{}/{}",
bucket_name(&room),
&upload_config(context, &room)?.bucket,
record_name(&recording)
)),
};
Expand All @@ -158,7 +170,7 @@ where
}

let uri = format!("audiences/{}/events", room.audience());
let timing = ShortTermTimingProperties::until_now(start_timestamp);
let timing = ShortTermTimingProperties::until_now(context.start_timestamp());
let mut props = OutgoingEventProperties::new("room.upload", timing);
props.set_tracking(tracking.to_owned());

Expand All @@ -170,11 +182,19 @@ where
Ok(OutgoingEvent::broadcast(event, props, &uri))
}

fn bucket_name(room: &db::room::Object) -> String {
format!("origin.webinar.{}", room.audience())
fn upload_config<'a, C: Context>(
context: &'a C,
room: &Room,
) -> StdResult<&'a UploadConfig, AppError> {
context
.config()
.upload
.get(room.audience())
.ok_or_else(|| anyhow!("Missing upload configuration for the room's audience"))
.error(AppErrorKind::ConfigKeyMissing)
}

fn record_name(recording: &db::recording::Object) -> String {
fn record_name(recording: &Recording) -> String {
format!("{}.source.webm", recording.rtc_id())
}

Expand Down Expand Up @@ -207,6 +227,7 @@ mod test {
struct VacuumJanusRequestBody {
method: String,
id: Uuid,
backend: String,
bucket: String,
object: String,
}
Expand Down Expand Up @@ -293,6 +314,7 @@ mod test {
body: VacuumJanusRequestBody {
method: "stream.upload".to_string(),
id: rtc.id(),
backend: String::from("EXAMPLE"),
bucket: format!("origin.webinar.{}", USR_AUDIENCE).to_string(),
object: format!("{}.source.webm", rtc.id()).to_string(),
}
Expand Down
6 changes: 6 additions & 0 deletions src/app/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) enum ErrorKind {
BackendRequestTimedOut,
BackendNotFound,
CapacityExceeded,
ConfigKeyMissing,
DbConnAcquisitionFailed,
DbQueryFailed,
InvalidJsepFormat,
Expand Down Expand Up @@ -69,6 +70,11 @@ impl Into<(ResponseStatus, &'static str, &'static str)> for ErrorKind {
"backend_not_found",
"Backend not found",
),
Self::ConfigKeyMissing => (
ResponseStatus::UNPROCESSABLE_ENTITY,
"config_key_missing",
"Config key missing",
),
Self::CapacityExceeded => (
ResponseStatus::SERVICE_UNAVAILABLE,
"capacity_exceeded",
Expand Down
6 changes: 2 additions & 4 deletions src/backend/janus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,11 @@ async fn handle_response_impl<C: Context>(

// Send room.upload event.
let event = endpoint::system::upload_event(
context,
&room,
recs.into_iter(),
context.start_timestamp(),
respp.tracking(),
)
.map_err(|err| err.context("Error creating a system event"))
.error(AppErrorKind::MessageBuildingFailed)?;
)?;

let event_box = Box::new(event) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(event_box)) as MessageStream)
Expand Down
4 changes: 3 additions & 1 deletion src/backend/janus/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,17 @@ impl ReadStreamRequestBody {
pub(crate) struct UploadStreamRequestBody {
method: &'static str,
id: Uuid,
backend: String,
bucket: String,
object: String,
}

impl UploadStreamRequestBody {
pub(crate) fn new(id: Uuid, bucket: &str, object: &str) -> Self {
pub(crate) fn new(id: Uuid, backend: &str, bucket: &str, object: &str) -> Self {
Self {
method: STREAM_UPLOAD_METHOD,
id,
backend: backend.to_owned(),
bucket: bucket.to_owned(),
object: object.to_owned(),
}
Expand Down
11 changes: 11 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use serde_derive::Deserialize;
use svc_agent::{mqtt::AgentConfig, AccountId};
use svc_authn::jose::Algorithm;
Expand All @@ -14,6 +16,7 @@ pub(crate) struct Config {
pub(crate) mqtt: AgentConfig,
pub(crate) sentry: Option<SentryConfig>,
pub(crate) backend: BackendConfig,
pub(crate) upload: UploadConfigMap,
#[serde(default)]
pub(crate) telemetry: TelemetryConfig,
#[serde(default)]
Expand Down Expand Up @@ -44,6 +47,14 @@ pub(crate) struct BackendConfig {
pub(crate) transaction_watchdog_check_period: u64,
}

pub(crate) type UploadConfigMap = HashMap<String, UploadConfig>;

#[derive(Clone, Debug, Deserialize)]
pub(crate) struct UploadConfig {
pub(crate) backend: String,
pub(crate) bucket: String,
}

#[derive(Clone, Debug, Deserialize, Default)]
pub(crate) struct TelemetryConfig {
pub(crate) id: Option<AccountId>,
Expand Down
8 changes: 7 additions & 1 deletion src/test_helpers/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::db::ConnectionPool as Db;

use super::authz::TestAuthz;
use super::db::TestDb;
use super::SVC_AUDIENCE;
use super::{SVC_AUDIENCE, USR_AUDIENCE};

///////////////////////////////////////////////////////////////////////////////

Expand All @@ -42,6 +42,12 @@ fn build_config() -> Config {
"default_timeout": 5,
"stream_upload_timeout": 600,
"transaction_watchdog_check_period": 1,
},
"upload": {
USR_AUDIENCE: {
"backend": "EXAMPLE",
"bucket": format!("origin.webinar.{}", USR_AUDIENCE),
}
}
});

Expand Down

0 comments on commit 8cc41bf

Please sign in to comment.