Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
set unbounded rooms as 6 hours long
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Dec 7, 2020
1 parent 3a58024 commit 1017e3f
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 3 deletions.
7 changes: 7 additions & 0 deletions docs/src/api/room.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ reserve | int | _optional_ | The number of slots for agents reserved o
tags | json | {} | Arbitrary tags object associated with the room.


Room can be unbounded, ie its closing timestamp is null.
To avoid rooms being stuck in unvacuumed state, room closure time will be set as 6 hours from first rtc creation.
This 6 hours duration serves as timeout for vacuums.

When the room closure time becomes bounded (either by creating rtc or it was bounded from the start),
closure=unbounded update is prohibited to avoid erasing this 6 hours timeout.

## Lifecycle events

### room.close event
Expand Down
2 changes: 0 additions & 2 deletions docs/src/api/room/update.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

Update a Room which holds Real-Time Connections.



## Multicast request

**Properties**
Expand Down
20 changes: 20 additions & 0 deletions src/app/endpoint/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,26 @@ impl RequestHandler for UpdateHandler {
}
}
}
// the case when new end is unbouned is special
// we must take care not to erase max webinar duration timeout
(new_opened_at, Bound::Unbounded) => {
let (start, end) = room.time();
let new_start = match start {
Bound::Included(opened_at) if *opened_at <= Utc::now() => {
Bound::Included(*opened_at)
}
Bound::Included(_) => new_opened_at,
_ => {
return Err(anyhow!("Invalid room time"))
.error(AppErrorKind::InvalidRoomTime)
}
};
// if new end should be unbounded there are two cases
// old end is unbounded => end doesnt change
// old end is bounded => it means there already has been an attempt to stream or the room was bounded from creation
// thus we cant make the end unbounded again
time = Some((new_start, *end))
}
_ => return Err(anyhow!("Invalid room time")).error(AppErrorKind::InvalidRoomTime),
}
}
Expand Down
79 changes: 78 additions & 1 deletion src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::fmt;
use std::ops::Bound;

use async_std::stream;
use async_trait::async_trait;
use chrono::Duration;
use chrono::Utc;
use serde_derive::{Deserialize, Serialize};
use svc_agent::{
mqtt::{IncomingRequestProperties, IntoPublishableMessage, OutgoingResponse, ResponseStatus},
Expand Down Expand Up @@ -32,6 +35,8 @@ pub(crate) type ConnectResponse = OutgoingResponse<ConnectResponseData>;

////////////////////////////////////////////////////////////////////////////////

const MAX_WEBINAR_DURATION: i64 = 6;

#[derive(Debug, Deserialize)]
pub(crate) struct CreateRequest {
room_id: Uuid,
Expand Down Expand Up @@ -64,7 +69,24 @@ impl RequestHandler for CreateHandler {
// Create an rtc.
let rtc = {
let conn = context.get_conn()?;
db::rtc::InsertQuery::new(room.id()).execute(&conn)?

conn.transaction::<_, diesel::result::Error, _>(|| {
match room.time() {
(start, Bound::Unbounded) => {
let new_time = (
*start,
Bound::Excluded(Utc::now() + Duration::hours(MAX_WEBINAR_DURATION)),
);

db::room::UpdateQuery::new(room.id())
.time(Some(new_time))
.execute(&conn)?;
}
_ => {}
}
let rtc = db::rtc::InsertQuery::new(room.id()).execute(&conn)?;
Ok(rtc)
})?
};

context.add_logger_tags(o!("rtc_id" => rtc.id().to_string()));
Expand Down Expand Up @@ -390,6 +412,9 @@ impl RequestHandler for ConnectHandler {
#[cfg(test)]
mod test {
mod create {
use chrono::{SubsecRound, Utc};

use crate::db::room::FindQueryable;
use crate::db::rtc::Object as Rtc;
use crate::test_helpers::prelude::*;

Expand Down Expand Up @@ -435,6 +460,58 @@ mod test {
});
}

#[test]
fn create_in_unbounded_room() {
async_std::task::block_on(async {
let db = TestDb::new();
let mut authz = TestAuthz::new();

// Insert a room.
let room = db
.connection_pool()
.get()
.map(|conn| {
factory::Room::new()
.audience(USR_AUDIENCE)
.time((
Bound::Included(Utc::now().trunc_subsecs(0)),
Bound::Unbounded,
))
.backend(crate::db::room::RoomBackend::Janus)
.insert(&conn)
})
.unwrap();

// Allow user to create rtcs in the room.
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let room_id = room.id().to_string();
let object = vec!["rooms", &room_id, "rtcs"];
authz.allow(agent.account_id(), object, "create");

// Make rtc.create request.
let mut context = TestContext::new(db, authz);
let payload = CreateRequest { room_id: room.id() };

let messages = handle_request::<CreateHandler>(&mut context, &agent, payload)
.await
.expect("Rtc creation failed");

// Assert response.
let (rtc, respp) = find_response::<Rtc>(messages.as_slice());
assert_eq!(respp.status(), ResponseStatus::CREATED);
assert_eq!(rtc.room_id(), room.id());

// Assert room closure is not unbounded
let conn = context.db().get().expect("Failed to get conn");

let room = db::room::FindQuery::new(room.id())
.execute(&conn)
.expect("Db query failed")
.expect("Room must exist");
assert_ne!(room.time().1, Bound::Unbounded);
});
}

#[test]
fn create_rtc_missing_room() {
async_std::task::block_on(async {
Expand Down

0 comments on commit 1017e3f

Please sign in to comment.