Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add room subscribers limit
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Jul 8, 2020
1 parent 04a56ab commit 747678e
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table room drop column subscribers_limit;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table room add column subscribers_limit int;
93 changes: 87 additions & 6 deletions src/app/endpoint/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub(crate) struct CreateRequest {
audience: String,
#[serde(default = "CreateRequest::default_backend")]
backend: db::room::RoomBackend,
subscribers_limit: Option<i32>,
}

impl CreateRequest {
Expand Down Expand Up @@ -71,10 +72,15 @@ impl RequestHandler for CreateHandler {

// Create a room.
let room = {
let conn = context.db().get()?;
let mut q =
db::room::InsertQuery::new(payload.time, &payload.audience, payload.backend);

db::room::InsertQuery::new(payload.time, &payload.audience, payload.backend)
.execute(&conn)?
if let Some(subscribers_limit) = payload.subscribers_limit {
q = q.subscribers_limit(subscribers_limit);
}

let conn = context.db().get()?;
q.execute(&conn)?
};

// Respond and broadcast to the audience topic.
Expand Down Expand Up @@ -293,12 +299,25 @@ impl RequestHandler for EnterHandler {
let room = {
let conn = context.db().get()?;

db::room::FindQuery::new()
// Find opened room.
let room = db::room::FindQuery::new()
.id(payload.id)
.time(db::room::now())
.execute(&conn)?
.ok_or_else(|| format!("Room not found or closed, id = '{}'", payload.id))
.status(ResponseStatus::NOT_FOUND)?
.status(ResponseStatus::NOT_FOUND)?;

// Check if not full house in the room.
if let Some(subscribers_limit) = room.subscribers_limit() {
let agents_count = db::agent::CountQuery::new(room.id()).execute(&conn)?;

if agents_count > subscribers_limit as i64 {
let err = format!("Subscribers limit has been reached ({})", subscribers_limit);
return Err(err).status(ResponseStatus::SERVICE_UNAVAILABLE)?;
}
}

room
};

// Authorize subscribing to the room's events.
Expand Down Expand Up @@ -459,6 +478,7 @@ mod test {
time: time.clone(),
audience: USR_AUDIENCE.to_owned(),
backend: db::room::RoomBackend::Janus,
subscribers_limit: Some(123),
};

let messages = handle_request::<CreateHandler>(&context, &agent, payload)
Expand All @@ -471,6 +491,7 @@ mod test {
assert_eq!(room.audience(), USR_AUDIENCE);
assert_eq!(room.time(), &time);
assert_eq!(room.backend(), db::room::RoomBackend::Janus);
assert_eq!(room.subscribers_limit(), Some(123));

// Assert notification.
let (room, evp, topic) = find_event::<Room>(messages.as_slice());
Expand All @@ -479,6 +500,7 @@ mod test {
assert_eq!(room.audience(), USR_AUDIENCE);
assert_eq!(room.time(), &time);
assert_eq!(room.backend(), db::room::RoomBackend::Janus);
assert_eq!(room.subscribers_limit(), Some(123));
});
}

Expand All @@ -493,6 +515,7 @@ mod test {
time: (Bound::Included(Utc::now()), Bound::Unbounded),
audience: USR_AUDIENCE.to_owned(),
backend: db::room::RoomBackend::Janus,
subscribers_limit: None,
};

let err = handle_request::<CreateHandler>(&context, &agent, payload)
Expand Down Expand Up @@ -637,7 +660,9 @@ mod test {
Bound::Excluded(now + Duration::hours(3)),
);

let payload = UpdateRequest::new(room.id()).time(time);
let payload = UpdateRequest::new(room.id())
.time(time)
.subscribers_limit(Some(123));

let messages = handle_request::<UpdateHandler>(&context, &agent, payload)
.await
Expand All @@ -650,6 +675,7 @@ mod test {
assert_eq!(resp_room.audience(), room.audience());
assert_eq!(resp_room.time(), &time);
assert_eq!(resp_room.backend(), db::room::RoomBackend::Janus);
assert_eq!(resp_room.subscribers_limit(), Some(123));
});
}

Expand Down Expand Up @@ -790,6 +816,8 @@ mod test {
}

mod enter {
use chrono::SubsecRound;

use crate::test_helpers::prelude::*;

use super::super::*;
Expand Down Expand Up @@ -846,6 +874,59 @@ mod test {
});
}

#[test]
fn enter_room_full_house() {
async_std::task::block_on(async {
let db = TestDb::new();

// Create room and fill it up with agents.
let room = {
let conn = db
.connection_pool()
.get()
.expect("Failed to get DB connection");

let now = Utc::now().trunc_subsecs(0);

let room = factory::Room::new()
.audience(USR_AUDIENCE)
.time((Bound::Included(now), Bound::Unbounded))
.backend(db::room::RoomBackend::Janus)
.subscribers_limit(1)
.insert(&conn);

let publisher = TestAgent::new("web", "publisher", USR_AUDIENCE);
shared_helpers::insert_agent(&conn, publisher.agent_id(), room.id());

let subscriber = TestAgent::new("web", "subscriber", USR_AUDIENCE);
shared_helpers::insert_agent(&conn, subscriber.agent_id(), room.id());

room
};

// Allow agent to subscribe to the rooms' events.
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let mut authz = TestAuthz::new();
let room_id = room.id().to_string();

authz.allow(
agent.account_id(),
vec!["rooms", &room_id, "events"],
"subscribe",
);

// Make room.enter request.
let context = TestContext::new(db, authz);
let payload = EnterRequest { id: room.id() };

let err = handle_request::<EnterHandler>(&context, &agent, payload)
.await
.expect_err("Unexpected success on room entering");

assert_eq!(err.status_code(), ResponseStatus::SERVICE_UNAVAILABLE);
});
}

#[test]
fn enter_room_not_authorized() {
async_std::task::block_on(async {
Expand Down
21 changes: 21 additions & 0 deletions src/db/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,27 @@ impl<'a> ListQuery<'a> {

////////////////////////////////////////////////////////////////////////////////

pub(crate) struct CountQuery {
room_id: Uuid,
}

impl CountQuery {
pub(crate) fn new(room_id: Uuid) -> Self {
Self { room_id }
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<i64, Error> {
use diesel::prelude::*;

agent::table
.filter(agent::room_id.eq(self.room_id))
.select(diesel::dsl::count(agent::id))
.get_result(conn)
}
}

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Insertable)]
#[table_name = "agent"]
pub(crate) struct InsertQuery<'a> {
Expand Down
27 changes: 27 additions & 0 deletions src/db/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type AllColumns = (
room::audience,
room::created_at,
room::backend,
room::subscribers_limit,
);

const ALL_COLUMNS: AllColumns = (
Expand All @@ -51,6 +52,7 @@ const ALL_COLUMNS: AllColumns = (
room::audience,
room::created_at,
room::backend,
room::subscribers_limit,
);

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -83,6 +85,8 @@ pub(crate) struct Object {
#[serde(with = "ts_seconds")]
created_at: DateTime<Utc>,
backend: RoomBackend,
#[serde(skip_serializing_if = "Option::is_none")]
subscribers_limit: Option<i32>,
}

impl Object {
Expand All @@ -102,6 +106,10 @@ impl Object {
pub(crate) fn backend(&self) -> RoomBackend {
self.backend
}

pub(crate) fn subscribers_limit(&self) -> Option<i32> {
self.subscribers_limit
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -198,6 +206,7 @@ pub(crate) struct InsertQuery<'a> {
time: Time,
audience: &'a str,
backend: RoomBackend,
subscribers_limit: Option<i32>,
}

impl<'a> InsertQuery<'a> {
Expand All @@ -206,6 +215,14 @@ impl<'a> InsertQuery<'a> {
time,
audience,
backend,
subscribers_limit: None,
}
}

pub(crate) fn subscribers_limit(self, value: i32) -> Self {
Self {
subscribers_limit: Some(value),
..self
}
}

Expand Down Expand Up @@ -246,6 +263,7 @@ pub(crate) struct UpdateQuery {
time: Option<Time>,
audience: Option<String>,
backend: Option<RoomBackend>,
subscribers_limit: Option<Option<i32>>,
}

impl UpdateQuery {
Expand All @@ -256,6 +274,7 @@ impl UpdateQuery {
time: None,
audience: None,
backend: None,
subscribers_limit: None,
}
}

Expand All @@ -271,6 +290,14 @@ impl UpdateQuery {
}
}

#[cfg(test)]
pub(crate) fn subscribers_limit(self, value: Option<i32>) -> Self {
Self {
subscribers_limit: Some(value),
..self
}
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<Object, Error> {
use diesel::prelude::*;

Expand Down
1 change: 1 addition & 0 deletions src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ table! {
audience -> Text,
created_at -> Timestamptz,
backend -> Room_backend,
subscribers_limit -> Nullable<Int4>,
}
}

Expand Down
19 changes: 16 additions & 3 deletions src/test_helpers/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub(crate) struct Room {
audience: Option<String>,
time: Option<db::room::Time>,
backend: db::room::RoomBackend,
subscribers_limit: Option<i32>,
}

impl Room {
Expand All @@ -22,6 +23,7 @@ impl Room {
audience: None,
time: None,
backend: db::room::RoomBackend::None,
subscribers_limit: None,
}
}

Expand All @@ -43,13 +45,24 @@ impl Room {
Self { backend, ..self }
}

pub(crate) fn subscribers_limit(self, subscribers_limit: i32) -> Self {
Self {
subscribers_limit: Some(subscribers_limit),
..self
}
}

pub(crate) fn insert(self, conn: &PgConnection) -> db::room::Object {
let audience = self.audience.expect("Audience not set");
let time = self.time.expect("Time not set");

db::room::InsertQuery::new(time, &audience, self.backend)
.execute(conn)
.expect("Failed to insert room")
let mut q = db::room::InsertQuery::new(time, &audience, self.backend);

if let Some(subscribers_limit) = self.subscribers_limit {
q = q.subscribers_limit(subscribers_limit);
}

q.execute(conn).expect("Failed to insert room")
}
}

Expand Down

0 comments on commit 747678e

Please sign in to comment.