Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Separate missing from closed room erros (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Oct 9, 2020
1 parent 0c6946e commit cb6c9ae
Show file tree
Hide file tree
Showing 15 changed files with 311 additions and 364 deletions.
9 changes: 6 additions & 3 deletions docs/src/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,18 @@ The following types are a part of the service's API and are guaranteed to mainta
- `capacity_exceeded` – There's no free capacity left on the backend to connect to.
- `database_connection_acquisition_failed` – The service couldn't obtain a DB connection from the pool.
- `database_query_failed` – The database returned an error while executing a query.
- `invalid_jsep_format` – Failed to determine whether the SDP is recvonly.
- `invalid_sdp_type` – Failed to parse SDP type or an SDP answer is received.
- `invalid_subscription_object` – An object for dynamic subscription is not of format `["rooms", UUID, "events"]`.
- `message_building_failed` – An error occurred while building a message to another service.
- `message_handling_failed` – An incoming message is likely to have non-valid JSON payload or missing required properties.
- `message_parsing_failed` – Failed to parse a message from another service.
- `no_available_backends` – No backends found to host the RTC.
- `not_implemented` – The requested feature is not supported.
- `resubscription_failed` – The services has failed to resubscribe to topics after reconnect.
- `stats_collection_failed` – Couldn't collect metrics from one of the sources.
- `publish_failed` – Failed to publish an MQTT message.
- `room_not_found` – A [room](room.md#Room) is missing or closed.
- `resubscription_failed` – The services has failed to resubscribe to topics after reconnect.
- `room_closed` - The [room](room.md#Room) exists but already closed.
- `room_not_found` – The [room](room.md#Room) is missing.
- `rtc_not_found` – An [RTC](rtc.md#Real-time_Connection) is missing or closed.
- `stats_collection_failed` – Couldn't collect metrics from one of the sources.
- `unknown_method` – An unsupported value in `method` property of the request message.
20 changes: 6 additions & 14 deletions src/app/endpoint/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,8 @@ impl RequestHandler for ListHandler {
payload: Self::Payload,
reqp: &IncomingRequestProperties,
) -> Result {
// Check whether the room exists and open.
let room = {
let conn = context.get_conn()?;

db::room::FindQuery::new()
.id(payload.room_id)
.time(db::room::now())
.execute(&conn)?
.ok_or_else(|| anyhow!("Room not found or closed"))
.error(AppErrorKind::RoomNotFound)?
};

shared::add_room_logger_tags(context, &room);
let room =
helpers::find_room_by_id(context, payload.room_id, helpers::RoomTimeRequirement::Open)?;

// Authorize agents listing in the room.
let room_id = room.id().to_string();
Expand All @@ -69,7 +58,7 @@ impl RequestHandler for ListHandler {
};

// Respond with agents list.
Ok(Box::new(stream::once(shared::build_response(
Ok(Box::new(stream::once(helpers::build_response(
ResponseStatus::OK,
agents,
reqp,
Expand Down Expand Up @@ -178,6 +167,7 @@ mod tests {
.expect_err("Unexpected success on agents listing");

assert_eq!(err.status_code(), ResponseStatus::FORBIDDEN);
assert_eq!(err.kind(), "access_denied");
});
}

Expand Down Expand Up @@ -221,6 +211,7 @@ mod tests {
.expect_err("Unexpected success on agents listing");

assert_eq!(err.status_code(), ResponseStatus::NOT_FOUND);
assert_eq!(err.kind(), "room_closed");
});
}

Expand All @@ -241,6 +232,7 @@ mod tests {
.expect_err("Unexpected success on agents listing");

assert_eq!(err.status_code(), ResponseStatus::NOT_FOUND);
assert_eq!(err.kind(), "room_not_found");
});
}
}
Expand Down
140 changes: 140 additions & 0 deletions src/app/endpoint/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use std::ops::Bound;

use anyhow::anyhow;
use chrono::{DateTime, Duration, Utc};
use serde::Serialize;
use svc_agent::mqtt::{
IncomingRequestProperties, IntoPublishableMessage, OutgoingEvent, OutgoingEventProperties,
OutgoingResponse, ResponseStatus, ShortTermTimingProperties,
};
use uuid::Uuid;

use crate::app::context::Context;
use crate::app::error::{Error as AppError, ErrorExt, ErrorKind as AppErrorKind};
use crate::app::API_VERSION;
use crate::db;

///////////////////////////////////////////////////////////////////////////////

pub(crate) fn build_response(
status: ResponseStatus,
payload: impl Serialize + Send + 'static,
reqp: &IncomingRequestProperties,
start_timestamp: DateTime<Utc>,
maybe_authz_time: Option<Duration>,
) -> Box<dyn IntoPublishableMessage + Send> {
let mut timing = ShortTermTimingProperties::until_now(start_timestamp);

if let Some(authz_time) = maybe_authz_time {
timing.set_authorization_time(authz_time);
}

let props = reqp.to_response(status, timing);
Box::new(OutgoingResponse::unicast(payload, props, reqp, API_VERSION))
}

pub(crate) fn build_notification(
label: &'static str,
path: &str,
payload: impl Serialize + Send + 'static,
reqp: &IncomingRequestProperties,
start_timestamp: DateTime<Utc>,
) -> Box<dyn IntoPublishableMessage + Send> {
let timing = ShortTermTimingProperties::until_now(start_timestamp);
let mut props = OutgoingEventProperties::new(label, timing);
props.set_tracking(reqp.tracking().to_owned());
Box::new(OutgoingEvent::broadcast(payload, props, path))
}

////////////////////////////////////////////////////////////////////////////////

pub(crate) enum RoomTimeRequirement {
Any,
NotClosed,
Open,
}

pub(crate) fn find_room_by_id<C: Context>(
context: &mut C,
id: Uuid,
opening_requirement: RoomTimeRequirement,
) -> Result<db::room::Object, AppError> {
context.add_logger_tags(o!("room_id" => id.to_string()));
let query = db::room::FindQuery::new(id);
find_room(context, query, opening_requirement)
}

pub(crate) fn find_room_by_rtc_id<C: Context>(
context: &mut C,
rtc_id: Uuid,
opening_requirement: RoomTimeRequirement,
) -> Result<db::room::Object, AppError> {
context.add_logger_tags(o!("rtc_id" => rtc_id.to_string()));
let query = db::room::FindByRtcIdQuery::new(rtc_id);
find_room(context, query, opening_requirement)
}

fn find_room<C, Q>(
context: &mut C,
query: Q,
opening_requirement: RoomTimeRequirement,
) -> Result<db::room::Object, AppError>
where
C: Context,
Q: db::room::FindQueryable,
{
let conn = context.get_conn()?;

let room = query
.execute(&conn)?
.ok_or_else(|| anyhow!("Room not found"))
.error(AppErrorKind::RoomNotFound)?;

add_room_logger_tags(context, &room);

match opening_requirement {
// Room time doesn't matter.
RoomTimeRequirement::Any => Ok(room),
// Current time must be before room closing, including not yet opened rooms.
RoomTimeRequirement::NotClosed => {
let now = Utc::now();
let (_opened_at, closed_at) = room.time();

match closed_at {
Bound::Included(dt) | Bound::Excluded(dt) if *dt < now => {
Err(anyhow!("Room closed")).error(AppErrorKind::RoomClosed)
}
_ => Ok(room),
}
}
// Current time must be exactly in the room's time range.
RoomTimeRequirement::Open => {
let now = Utc::now();
let (opened_at, closed_at) = room.time();

match opened_at {
Bound::Included(dt) | Bound::Excluded(dt) if *dt >= now => {
Err(anyhow!("Room not opened")).error(AppErrorKind::RoomClosed)
}
_ => Ok(()),
}?;

match closed_at {
Bound::Included(dt) | Bound::Excluded(dt) if *dt < now => {
Err(anyhow!("Room closed")).error(AppErrorKind::RoomClosed)
}
_ => Ok(()),
}?;

Ok(room)
}
}
}

pub(crate) fn add_room_logger_tags<C: Context>(context: &mut C, room: &db::room::Object) {
context.add_logger_tags(o!("room_id" => room.id().to_string()));

if let Some(scope) = room.tags().get("scope") {
context.add_logger_tags(o!("scope" => scope.to_string()));
}
}
38 changes: 19 additions & 19 deletions src/app/endpoint/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ impl RequestHandler for UnicastHandler {
payload: Self::Payload,
reqp: &IncomingRequestProperties,
) -> Result {
context.add_logger_tags(o!("room_id" => payload.room_id.to_string()));

{
let room = helpers::find_room_by_id(
context,
payload.room_id,
helpers::RoomTimeRequirement::Open,
)?;

let conn = context.get_conn()?;
let room = find_room(payload.room_id, &conn)?;
shared::add_room_logger_tags(context, &room);
check_room_presence(&room, reqp.as_agent_id(), &conn)?;
check_room_presence(&room, &payload.agent_id, &conn)?;
}
Expand Down Expand Up @@ -100,12 +102,14 @@ impl RequestHandler for BroadcastHandler {
payload: Self::Payload,
reqp: &IncomingRequestProperties,
) -> Result {
context.add_logger_tags(o!("room_id" => payload.room_id.to_string()));

let room = {
let room = helpers::find_room_by_id(
context,
payload.room_id,
helpers::RoomTimeRequirement::Open,
)?;

let conn = context.get_conn()?;
let room = find_room(payload.room_id, &conn)?;
shared::add_room_logger_tags(context, &room);
check_room_presence(&room, &reqp.as_agent_id(), &conn)?;
room
};
Expand All @@ -117,15 +121,15 @@ impl RequestHandler for BroadcastHandler {
}

// Respond and broadcast to the room topic.
let response = shared::build_response(
let response = helpers::build_response(
ResponseStatus::OK,
json!({}),
reqp,
context.start_timestamp(),
None,
);

let notification = shared::build_notification(
let notification = helpers::build_notification(
"message.broadcast",
&format!("rooms/{}/events", room.id()),
payload.data,
Expand Down Expand Up @@ -177,15 +181,6 @@ impl ResponseHandler for CallbackHandler {

///////////////////////////////////////////////////////////////////////////////

fn find_room(id: Uuid, conn: &PgConnection) -> StdResult<Room, AppError> {
db::room::FindQuery::new()
.time(db::room::now())
.id(id)
.execute(&conn)?
.ok_or_else(|| anyhow!("Room not found or closed"))
.error(AppErrorKind::RoomNotFound)
}

fn check_room_presence(
room: &Room,
agent_id: &AgentId,
Expand Down Expand Up @@ -290,6 +285,7 @@ mod test {
.expect_err("Unexpected success on unicast message sending");

assert_eq!(err.status_code(), ResponseStatus::NOT_FOUND);
assert_eq!(err.kind(), "room_not_found");
});
}

Expand Down Expand Up @@ -330,6 +326,7 @@ mod test {
.expect_err("Unexpected success on unicast message sending");

assert_eq!(err.status_code(), ResponseStatus::NOT_FOUND);
assert_eq!(err.kind(), "agent_not_entered_the_room");
});
}

Expand Down Expand Up @@ -370,6 +367,7 @@ mod test {
.expect_err("Unexpected success on unicast message sending");

assert_eq!(err.status_code(), ResponseStatus::NOT_FOUND);
assert_eq!(err.kind(), "agent_not_entered_the_room");
});
}
}
Expand Down Expand Up @@ -450,6 +448,7 @@ mod test {
.expect_err("Unexpected success on unicast message sending");

assert_eq!(err.status_code(), ResponseStatus::NOT_FOUND);
assert_eq!(err.kind(), "room_not_found");
});
}

Expand Down Expand Up @@ -480,6 +479,7 @@ mod test {
.expect_err("Unexpected success on unicast message sending");

assert_eq!(err.status_code(), ResponseStatus::NOT_FOUND);
assert_eq!(err.kind(), "agent_not_entered_the_room");
});
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/app/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,17 @@ event_routes!(
///////////////////////////////////////////////////////////////////////////////

mod agent;
pub(crate) mod helpers;
mod message;
mod metric;
mod room;
pub(crate) mod rtc;
pub(crate) mod rtc_signal;
pub(crate) mod rtc_stream;
pub(crate) mod shared;
mod subscription;
pub(crate) mod system;

pub(self) mod prelude {
pub(super) use super::{shared, EventHandler, RequestHandler, ResponseHandler, Result};
pub(super) use super::{helpers, EventHandler, RequestHandler, ResponseHandler, Result};
pub(super) use crate::app::error::{Error as AppError, ErrorExt, ErrorKind as AppErrorKind};
}
Loading

0 comments on commit cb6c9ae

Please sign in to comment.