Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Reserve and limit overhaul (#123)
Browse files Browse the repository at this point in the history
* Rename room subscribers limit to reserve

* Remove subscribers count checking on room enter

* Overhaul limiting and reserve consideration
  • Loading branch information
feymartynov committed Aug 5, 2020
1 parent 843f4f6 commit 11808eb
Show file tree
Hide file tree
Showing 12 changed files with 263 additions and 333 deletions.
12 changes: 6 additions & 6 deletions docs/src/api/room/create.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ method | String | _required_ | Always `room.create`.

**Payload**

Name | Type | Default | Description
----------------- | ---------- | ---------- | ------------------
time | [i64, i64) | _required_ | A [lt, rt) range of unix time (seconds) or null (unbounded).
audience | String | _required_ | The room audience.
backend | String | none | The room backend. Available values: janus, none.
subscribers_limit | i32 | _optional_ | The maximum number of simultaneously entered subscribers allowed.
Name | Type | Default | Description
-------- | ---------- | ---------- | ------------------
time | [i64, i64) | _required_ | A [lt, rt) range of unix time (seconds) or null (unbounded).
audience | String | _required_ | The room audience.
backend | String | none | The room backend. Available values: janus, none.
reserve | i32 | _optional_ | The number of slots for subscribers to reserve on the server.


## Unicast response
Expand Down
14 changes: 7 additions & 7 deletions docs/src/api/room/update.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ method | String | _required_ | Always `room.update`.

**Payload**

Name | Type | Default | Description
----------------- | ---------- | ---------- | ------------------
id | String | _required_ | The room identifier. The room must not be expired.
time | [i64, i64) | _optional_ | A [lt, rt) range of unix time (seconds) or null (unbounded).
audience | String | _optional_ | The room audience.
backend | String | _optional_ | The room backend. Available values: janus, none.
subscribers_limit | i32 | _optional_ | The maximum number of simultaneously entered subscribers allowed.
Name | Type | Default | Description
-------- | ---------- | ---------- | ------------------
id | String | _required_ | The room identifier. The room must not be expired.
time | [i64, i64) | _optional_ | A [lt, rt) range of unix time (seconds) or null (unbounded).
audience | String | _optional_ | The room audience.
backend | String | _optional_ | The room backend. Available values: janus, none.
reserve | i32 | _optional_ | The number of slots for subscribers to reserve on the server.


## Unicast response
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE room RENAME COLUMN reserve TO subscribers_limit;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE room RENAME COLUMN subscribers_limit TO reserve;
95 changes: 11 additions & 84 deletions src/app/endpoint/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(crate) struct CreateRequest {
audience: String,
#[serde(default = "CreateRequest::default_backend")]
backend: db::room::RoomBackend,
subscribers_limit: Option<i32>,
reserve: Option<i32>,
}

impl CreateRequest {
Expand Down Expand Up @@ -75,8 +75,8 @@ impl RequestHandler for CreateHandler {
let mut q =
db::room::InsertQuery::new(payload.time, &payload.audience, payload.backend);

if let Some(subscribers_limit) = payload.subscribers_limit {
q = q.subscribers_limit(subscribers_limit);
if let Some(reserve) = payload.reserve {
q = q.reserve(reserve);
}

let conn = context.db().get()?;
Expand Down Expand Up @@ -300,28 +300,12 @@ impl RequestHandler for EnterHandler {
let conn = context.db().get()?;

// Find opened room.
let room = db::room::FindQuery::new()
db::room::FindQuery::new()
.id(payload.id)
.time(db::room::now())
.execute(&conn)?
.ok_or_else(|| format!("Room not found or closed, id = '{}'", payload.id))
.status(ResponseStatus::NOT_FOUND)?;

// Check if not full house in the room.
if let Some(subscribers_limit) = room.subscribers_limit() {
let agents_count = db::agent::RoomCountQuery::new(room.id()).execute(&conn)?;

if agents_count > subscribers_limit as i64 {
let err = format!(
"Subscribers limit in the room has been reached ({})",
subscribers_limit
);

return Err(err).status(ResponseStatus::SERVICE_UNAVAILABLE)?;
}
}

room
.status(ResponseStatus::NOT_FOUND)?
};

// Authorize subscribing to the room's events.
Expand Down Expand Up @@ -482,7 +466,7 @@ mod test {
time: time.clone(),
audience: USR_AUDIENCE.to_owned(),
backend: db::room::RoomBackend::Janus,
subscribers_limit: Some(123),
reserve: Some(123),
};

let messages = handle_request::<CreateHandler>(&context, &agent, payload)
Expand All @@ -495,7 +479,7 @@ mod test {
assert_eq!(room.audience(), USR_AUDIENCE);
assert_eq!(room.time(), &time);
assert_eq!(room.backend(), db::room::RoomBackend::Janus);
assert_eq!(room.subscribers_limit(), Some(123));
assert_eq!(room.reserve(), Some(123));

// Assert notification.
let (room, evp, topic) = find_event::<Room>(messages.as_slice());
Expand All @@ -504,7 +488,7 @@ mod test {
assert_eq!(room.audience(), USR_AUDIENCE);
assert_eq!(room.time(), &time);
assert_eq!(room.backend(), db::room::RoomBackend::Janus);
assert_eq!(room.subscribers_limit(), Some(123));
assert_eq!(room.reserve(), Some(123));
});
}

Expand All @@ -519,7 +503,7 @@ mod test {
time: (Bound::Included(Utc::now()), Bound::Unbounded),
audience: USR_AUDIENCE.to_owned(),
backend: db::room::RoomBackend::Janus,
subscribers_limit: None,
reserve: None,
};

let err = handle_request::<CreateHandler>(&context, &agent, payload)
Expand Down Expand Up @@ -664,9 +648,7 @@ mod test {
Bound::Excluded(now + Duration::hours(3)),
);

let payload = UpdateRequest::new(room.id())
.time(time)
.subscribers_limit(Some(123));
let payload = UpdateRequest::new(room.id()).time(time).reserve(Some(123));

let messages = handle_request::<UpdateHandler>(&context, &agent, payload)
.await
Expand All @@ -679,7 +661,7 @@ mod test {
assert_eq!(resp_room.audience(), room.audience());
assert_eq!(resp_room.time(), &time);
assert_eq!(resp_room.backend(), db::room::RoomBackend::Janus);
assert_eq!(resp_room.subscribers_limit(), Some(123));
assert_eq!(resp_room.reserve(), Some(123));
});
}

Expand Down Expand Up @@ -820,8 +802,6 @@ mod test {
}

mod enter {
use chrono::SubsecRound;

use crate::test_helpers::prelude::*;

use super::super::*;
Expand Down Expand Up @@ -878,59 +858,6 @@ mod test {
});
}

#[test]
fn enter_room_full_house() {
async_std::task::block_on(async {
let db = TestDb::new();

// Create room and fill it up with agents.
let room = {
let conn = db
.connection_pool()
.get()
.expect("Failed to get DB connection");

let now = Utc::now().trunc_subsecs(0);

let room = factory::Room::new()
.audience(USR_AUDIENCE)
.time((Bound::Included(now), Bound::Unbounded))
.backend(db::room::RoomBackend::Janus)
.subscribers_limit(1)
.insert(&conn);

let publisher = TestAgent::new("web", "publisher", USR_AUDIENCE);
shared_helpers::insert_agent(&conn, publisher.agent_id(), room.id());

let subscriber = TestAgent::new("web", "subscriber", USR_AUDIENCE);
shared_helpers::insert_agent(&conn, subscriber.agent_id(), room.id());

room
};

// Allow agent to subscribe to the rooms' events.
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let mut authz = TestAuthz::new();
let room_id = room.id().to_string();

authz.allow(
agent.account_id(),
vec!["rooms", &room_id, "events"],
"subscribe",
);

// Make room.enter request.
let context = TestContext::new(db, authz);
let payload = EnterRequest { id: room.id() };

let err = handle_request::<EnterHandler>(&context, &agent, payload)
.await
.expect_err("Unexpected success on room entering");

assert_eq!(err.status_code(), ResponseStatus::SERVICE_UNAVAILABLE);
});
}

#[test]
fn enter_room_not_authorized() {
async_std::task::block_on(async {
Expand Down
Loading

0 comments on commit 11808eb

Please sign in to comment.