Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
room.close notification
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Oct 6, 2020
1 parent 0cd7083 commit cc1af9a
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 48 deletions.
18 changes: 18 additions & 0 deletions docs/src/api/room.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,21 @@ created_at | int | _required_ | Room creation timestamp in seconds.
backend | string | _required_ | Room backend, either `janus` or `none`.
reserve | int | _optional_ | The number of slots for agents reserved on the backend.
tags | json | {} | Arbitrary tags object associated with the room.


## Lifecycle events

### room.close event

If either
* the room was updated so that the closure datetime was moved from future into the past,
* the room was vacuumed

`room.close` event will be sent to room topic.
This event is not guaranteed to be unique, that is two `room.close` events could be sent by the service.

**URI:** `rooms/:room_id/events`

**Label:** `room.close`.

**Payload:** [room](#properties) object.
12 changes: 11 additions & 1 deletion docs/src/api/room/update.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,14 @@ A notification is being sent to the _audience_ topic.

**Label:** `room.update`.

**Payload:** [room](../room.md#room) object.
**Payload:** [room](../room.md#properties) object.

If the room closure date had been in the future but was moved by the update into the past, a notification will be sent to the _room_ topic.
Clients should not rely on this notification being unique.
That is this notification can reoccur even if it was sent before.

**URI:** `rooms/:room_id/events`

**Label:** `room.close`.

**Payload:** [room](../room.md#properties) object.
147 changes: 134 additions & 13 deletions src/app/endpoint/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,17 @@ impl RequestHandler for ReadHandler {

///////////////////////////////////////////////////////////////////////////////

pub(crate) type UpdateRequest = db::room::UpdateQuery;
#[derive(Debug, Deserialize, Default)]
pub(crate) struct UpdateRequest {
id: Uuid,
#[serde(default)]
#[serde(with = "crate::serde::ts_seconds_option_bound_tuple")]
time: Option<db::room::Time>,
audience: Option<String>,
backend: Option<db::room::RoomBackend>,
reserve: Option<Option<i32>>,
tags: Option<JsonValue>,
}
pub(crate) struct UpdateHandler;

#[async_trait]
Expand All @@ -178,16 +188,16 @@ impl RequestHandler for UpdateHandler {
payload: Self::Payload,
reqp: &IncomingRequestProperties,
) -> Result {
context.add_logger_tags(o!("room_id" => payload.id().to_string()));
context.add_logger_tags(o!("room_id" => payload.id.to_string()));

let room = {
let conn = context.db().get()?;

db::room::FindQuery::new()
.time(db::room::since_now())
.id(payload.id())
.id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}' or closed", payload.id()))
.ok_or_else(|| format!("Room not found, id = '{}' or closed", payload.id))
.status(ResponseStatus::NOT_FOUND)?
};

Expand All @@ -200,10 +210,19 @@ impl RequestHandler for UpdateHandler {
.authorize(room.audience(), reqp, object, "update")
.await?;

let room_was_open = !room.is_closed();

// Update room.
let room = {
let query = db::room::UpdateQuery::new(room.id())
.time(payload.time)
.audience(payload.audience)
.backend(payload.backend)
.reserve(payload.reserve)
.tags(payload.tags);

let conn = context.db().get()?;
payload.execute(&conn)?
query.execute(&conn)?
};

// Respond and broadcast to the audience topic.
Expand All @@ -218,12 +237,40 @@ impl RequestHandler for UpdateHandler {
let notification = shared::build_notification(
"room.update",
&format!("audiences/{}/events", room.audience()),
room,
room.clone(),
reqp,
context.start_timestamp(),
);

Ok(Box::new(stream::from_iter(vec![response, notification])))
let mut responses = vec![response, notification];

let append_closed_notification = || {
let closed_notification = shared::build_notification(
"room.close",
&format!("rooms/{}/events", room.id()),
room,
reqp,
context.start_timestamp(),
);
responses.push(closed_notification);
};

// Publish room closed notification
if room_was_open {
if let Some(time) = payload.time {
match time.1 {
Bound::Included(t) if Utc::now() > t => {
append_closed_notification();
}
Bound::Excluded(t) if Utc::now() >= t => {
append_closed_notification();
}
_ => {}
}
}
}

Ok(Box::new(stream::from_iter(responses)))
}
}

Expand Down Expand Up @@ -625,6 +672,7 @@ mod test {
use serde_json::json;

use crate::db::room::Object as Room;
use crate::test_helpers::find_event_by_predicate;
use crate::test_helpers::prelude::*;

use super::super::*;
Expand Down Expand Up @@ -666,10 +714,14 @@ mod test {
Bound::Excluded(now + Duration::hours(3)),
);

let payload = UpdateRequest::new(room.id())
.time(time)
.reserve(Some(123))
.tags(json!({"foo": "bar"}));
let payload = UpdateRequest {
id: room.id(),
time: Some(time),
reserve: Some(Some(123)),
tags: Some(json!({"foo": "bar"})),
audience: None,
backend: None,
};

let messages = handle_request::<UpdateHandler>(&mut context, &agent, payload)
.await
Expand All @@ -687,12 +739,78 @@ mod test {
});
}

#[test]
fn update_and_close_room() {
async_std::task::block_on(async {
let db = TestDb::new();
let now = Utc::now().trunc_subsecs(0);

let room = {
let conn = db
.connection_pool()
.get()
.expect("Failed to get DB connection");

// Create room.
factory::Room::new()
.audience(USR_AUDIENCE)
.time((
Bound::Included(now - Duration::hours(1)),
Bound::Excluded(now + Duration::hours(2)),
))
.backend(db::room::RoomBackend::Janus)
.insert(&conn)
};

// Allow agent to update the room.
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let mut authz = TestAuthz::new();
let room_id = room.id().to_string();
authz.allow(agent.account_id(), vec!["rooms", &room_id], "update");

// Make room.update request.
let mut context = TestContext::new(db, authz);

let time = (
Bound::Included(now - Duration::hours(1)),
Bound::Excluded(now - Duration::seconds(5)),
);

let payload = UpdateRequest {
id: room.id(),
time: Some(time),
reserve: Some(Some(123)),
audience: None,
backend: None,
tags: None,
};

let messages = handle_request::<UpdateHandler>(&mut context, &agent, payload)
.await
.expect("Room update failed");

assert_eq!(messages.len(), 3);
let (closed_notification, _, _) =
find_event_by_predicate::<JsonValue, _>(messages.as_slice(), |evp, _| {
evp.label() == "room.close"
})
.expect("Failed to find room.close event");
assert_eq!(
closed_notification.get("id").and_then(|v| v.as_str()),
Some(room.id().to_string()).as_deref()
);
});
}

#[test]
fn update_room_missing() {
async_std::task::block_on(async {
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let mut context = TestContext::new(TestDb::new(), TestAuthz::new());
let payload = UpdateRequest::new(Uuid::new_v4());
let payload = UpdateRequest {
id: Uuid::new_v4(),
..Default::default()
};

let err = handle_request::<UpdateHandler>(&mut context, &agent, payload)
.await
Expand All @@ -719,7 +837,10 @@ mod test {
};

let mut context = TestContext::new(TestDb::new(), TestAuthz::new());
let payload = UpdateRequest::new(room.id());
let payload = UpdateRequest {
id: room.id(),
..Default::default()
};

let err = handle_request::<UpdateHandler>(&mut context, &agent, payload)
.await
Expand Down
48 changes: 38 additions & 10 deletions src/app/endpoint/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ pub(crate) type RoomUploadEvent = OutgoingMessage<RoomUploadEventData>;

////////////////////////////////////////////////////////////////////////////////

#[derive(Serialize)]
struct ClosedRoomNotification {
room_id: Uuid,
}

#[derive(Debug, Deserialize)]
pub(crate) struct VacuumRequest {}

Expand Down Expand Up @@ -98,6 +103,17 @@ impl RequestHandler for VacuumHandler {
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

requests.push(Box::new(backreq) as Box<dyn IntoPublishableMessage + Send>);

// Publish room closed notification
let closed_notification = shared::build_notification(
"room.close",
&format!("rooms/{}/events", room.id()),
room,
reqp,
context.start_timestamp(),
);

requests.push(closed_notification);
}

Ok(Box::new(stream::from_iter(requests)))
Expand Down Expand Up @@ -169,11 +185,12 @@ mod test {
mod vacuum {
use chrono::{Duration, Utc};
use diesel::prelude::*;
use serde_json::Value as JsonValue;

use crate::backend::janus::JANUS_API_VERSION;
use crate::db;
use crate::test_helpers::outgoing_envelope::OutgoingEnvelopeProperties;
use crate::test_helpers::prelude::*;
use crate::test_helpers::{find_event_by_predicate, find_request_by_predicate};

use super::super::*;

Expand Down Expand Up @@ -220,9 +237,10 @@ mod test {

for rtc in rtcs.iter() {
shared_helpers::insert_agent(&conn, agent.agent_id(), rtc.room_id());
shared_helpers::insert_recording(&conn, rtc, &backend);

db::room::UpdateQuery::new(rtc.room_id().to_owned())
.time(time)
.time(Some(time))
.execute(&conn)
.unwrap();
}
Expand All @@ -236,24 +254,27 @@ mod test {
authz.allow(agent.account_id(), vec!["system"], "update");

// Make system.vacuum request.
let mut context = TestContext::new(TestDb::new(), authz);
let mut context = TestContext::new(db, authz);
let payload = VacuumRequest {};

let messages = handle_request::<VacuumHandler>(&mut context, &agent, payload)
.await
.expect("System vacuum failed");

assert!(messages.len() > 0);

let conn = context.db().get().unwrap();

for (message, rtc) in messages.into_iter().zip(rtcs.iter()) {
for rtc in rtcs {
// Assert outgoing Janus stream.upload requests.
match message.properties() {
OutgoingEnvelopeProperties::Request(_) => (),
_ => panic!("Expected outgoing request"),
}
let (payload, _, topic) = find_request_by_predicate::<VacuumJanusRequest, _>(
&messages,
|_reqp, p| p.body.method == "stream.upload" && p.body.id == rtc.id(),
)
.expect("Failed to find stream.upload message for rtc");

assert_eq!(
message.topic(),
topic,
format!(
"agents/{}/api/{}/in/conference.{}",
backend.id(),
Expand All @@ -263,7 +284,7 @@ mod test {
);

assert_eq!(
message.payload::<VacuumJanusRequest>(),
payload,
VacuumJanusRequest {
janus: "message".to_string(),
session_id: backend.session_id(),
Expand All @@ -290,6 +311,13 @@ mod test {
.expect("Failed to get recording from the DB");

assert_eq!(recording.status(), &RecordingStatus::InProgress);

find_event_by_predicate::<JsonValue, _>(&messages, |evp, p| {
evp.label() == "room.close"
&& p.get("id").and_then(|v| v.as_str())
== Some(rtc.room_id().to_string()).as_deref()
})
.expect("Failed to find room.close event for given rtc");
}
});
}
Expand Down
Loading

0 comments on commit cc1af9a

Please sign in to comment.