Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add intent parameter to rtc.connect (#129)
Browse files Browse the repository at this point in the history
* Add role parameter to rtc.connect

* Return specific error type for subscriber limit excess

* Rename connect role to intent

* Rename subscribers limit to capacity

* Fix active agents count query
  • Loading branch information
feymartynov committed Aug 17, 2020
1 parent be8b163 commit 2fdc33d
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 129 deletions.
16 changes: 8 additions & 8 deletions docs/src/api/room.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

## Properties

Name | Type | Default | Description
----------------- | ---------- | ---------- | ----------------------------------------------------
id | uuid | _required_ | The room identifier.
audience | string | _required_ | The audience of the room.
time | [int, int] | _required_ | Opening and closing timestamps in seconds.
created_at | int | _required_ | Room creation timestamp in seconds.
backend | string | _required_ | Room backend, either `janus` or `none`.
subscribers_limit | int | _optional_ | The maximum number of simultaneously entered subscribers allowed.
Name | Type | Default | Description
-----------| ---------- | ---------- | ----------------------------------------------------
id | uuid | _required_ | The room identifier.
audience | string | _required_ | The audience of the room.
time | [int, int] | _required_ | Opening and closing timestamps in seconds.
created_at | int | _required_ | Room creation timestamp in seconds.
backend | string | _required_ | Room backend, either `janus` or `none`.
reserve | int | _optional_ | The number of slots for agents reserved on the backend.
7 changes: 4 additions & 3 deletions docs/src/api/rtc/connect.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ method | String | _required_ | Always `rtc.connect`.

**Payload**

Name | Type | Default | Description
---------- | ------ | ---------- | ------------------
id | String | _required_ | A real-time connection identifier.
Name | Type | Default | Description
------ | ------ | ---------- | ------------------
id | String | _required_ | A real-time connection identifier.
intent | String | read | `write` or `read`.



Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE janus_backend RENAME capacity TO subscribers_limit;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE janus_backend RENAME subscribers_limit TO capacity;
155 changes: 132 additions & 23 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde_derive::{Deserialize, Serialize};
use svc_agent::mqtt::{
IncomingRequestProperties, IntoPublishableMessage, OutgoingResponse, ResponseStatus,
};
use svc_error::Error as SvcError;
use uuid::Uuid;

use crate::app::context::Context;
Expand Down Expand Up @@ -228,9 +229,24 @@ impl RequestHandler for ListHandler {

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub(crate) enum ConnectIntent {
Read,
Write,
}

#[derive(Debug, Deserialize)]
pub(crate) struct ConnectRequest {
id: Uuid,
#[serde(default = "ConnectRequest::default_intent")]
intent: ConnectIntent,
}

impl ConnectRequest {
fn default_intent() -> ConnectIntent {
ConnectIntent::Read
}
}

pub(crate) struct ConnectHandler;
Expand Down Expand Up @@ -270,22 +286,27 @@ impl RequestHandler for ConnectHandler {
let room_id = room.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];

let action = match payload.intent {
ConnectIntent::Read => "read",
ConnectIntent::Write => "update",
};

let authz_time = context
.authz()
.authorize(room.audience(), reqp, object, "read")
.authorize(room.audience(), reqp, object, action)
.await?;

// Choose backend to connect.
let backend = {
let conn = context.db().get()?;

// There are 3 cases:
// 1. Connecting as publisher with no previous stream. Select the least loaded backend
// 1. Connecting as writer with no previous stream. Select the least loaded backend
// that is capable to host the room's reservation.
// 2. Connecting as subscriber with existing stream. Choose the backend of the active
// 2. Connecting as reader with existing stream. Choose the backend of the active
// stream because Janus doesn't support clustering and it must be the same server
// that the stream's publisher is connected to.
// 3. Reconnecting as publisher with previous stream. Select the backend of the previous
// that the stream's writer is connected to.
// 3. Reconnecting as writer with previous stream. Select the backend of the previous
// stream to avoid partitioning the record across multiple servers.
let maybe_rtc_stream = db::janus_rtc_stream::FindQuery::new()
.rtc_id(payload.id)
Expand All @@ -302,16 +323,20 @@ impl RequestHandler for ConnectHandler {
.status(ResponseStatus::SERVICE_UNAVAILABLE)?,
};

// Check that the backend's subscribers limit is not reached.
if let Some(limit) = backend.subscribers_limit() {
let agents_count = db::agent::CountQuery::new()
.room_id(room.id())
.status(db::agent::Status::Ready)
.execute(&conn)?;
// Check that the backend's capacity is not exceeded for readers.
if payload.intent == ConnectIntent::Read {
if let Some(limit) = backend.capacity() {
let agents_count = db::janus_backend::agents_count(backend.id(), &conn)?;

if agents_count >= limit.into() {
return Err("subscribers limit reached")
.status(ResponseStatus::SERVICE_UNAVAILABLE);
if agents_count >= limit.into() {
let err = SvcError::builder()
.status(ResponseStatus::SERVICE_UNAVAILABLE)
.kind("capacity_exceeded", "Capacity exceeded")
.detail("active agents number on the backend exceeded its capacity")
.build();

return Err(err);
}
}
}

Expand Down Expand Up @@ -713,7 +738,11 @@ mod test {

// Make rtc.connect request.
let context = TestContext::new(db, authz);
let payload = ConnectRequest { id: rtc.id() };

let payload = ConnectRequest {
id: rtc.id(),
intent: ConnectIntent::Read,
};

let messages = handle_request::<ConnectHandler>(&context, &agent, payload)
.await
Expand Down Expand Up @@ -790,7 +819,11 @@ mod test {

// Make rtc.connect request.
let context = TestContext::new(db, authz);
let payload = ConnectRequest { id: rtc.id() };

let payload = ConnectRequest {
id: rtc.id(),
intent: ConnectIntent::Read,
};

let messages = handle_request::<ConnectHandler>(&context, &agent, payload)
.await
Expand Down Expand Up @@ -849,7 +882,7 @@ mod test {

let backend1 =
factory::JanusBackend::new(backend1_id, rng.gen(), rng.gen())
.subscribers_limit(20)
.capacity(20)
.insert(&conn);

// The second backend is too small but has no load.
Expand All @@ -859,7 +892,7 @@ mod test {
};

factory::JanusBackend::new(backend2_id, rng.gen(), rng.gen())
.subscribers_limit(5)
.capacity(5)
.insert(&conn);

// Insert stream.
Expand All @@ -886,7 +919,11 @@ mod test {

// Make rtc.connect request.
let context = TestContext::new(db, authz);
let payload = ConnectRequest { id: rtc.id() };

let payload = ConnectRequest {
id: rtc.id(),
intent: ConnectIntent::Read,
};

let messages = handle_request::<ConnectHandler>(&context, &agent, payload)
.await
Expand Down Expand Up @@ -928,7 +965,7 @@ mod test {
};

let backend = factory::JanusBackend::new(backend_id, rng.gen(), rng.gen())
.subscribers_limit(1)
.capacity(1)
.insert(&conn);

// Insert active stream.
Expand All @@ -954,13 +991,77 @@ mod test {

// Make rtc.connect request.
let context = TestContext::new(db, authz);
let payload = ConnectRequest { id: rtc.id() };

let payload = ConnectRequest {
id: rtc.id(),
intent: ConnectIntent::Read,
};

let err = handle_request::<ConnectHandler>(&context, &agent, payload)
.await
.expect_err("Unexpected success on rtc connecting");

assert_eq!(err.status_code(), ResponseStatus::SERVICE_UNAVAILABLE);
assert_eq!(err.kind(), "capacity_exceeded");
});
}

#[test]
fn connect_to_rtc_full_server_as_writer() {
async_std::task::block_on(async {
let mut rng = rand::thread_rng();
let db = TestDb::new();
let mut authz = TestAuthz::new();

let rtc = db
.connection_pool()
.get()
.map(|conn| {
// Insert rtc.
let rtc = shared_helpers::insert_rtc(&conn);

// Insert backend.
let backend_id = {
let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE);
agent.agent_id().to_owned()
};

let backend = factory::JanusBackend::new(backend_id, rng.gen(), rng.gen())
.capacity(1)
.insert(&conn);

// Insert active stream.
factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend)
.rtc(&rtc)
.insert(&conn);

// Insert active agent.
let agent = TestAgent::new("web", "user456", USR_AUDIENCE);
shared_helpers::insert_agent(&conn, agent.agent_id(), rtc.room_id());

rtc
})
.unwrap();

// Allow user to read the rtc.
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(agent.account_id(), object, "update");

// Make rtc.connect request.
let context = TestContext::new(db, authz);

let payload = ConnectRequest {
id: rtc.id(),
intent: ConnectIntent::Write,
};

handle_request::<ConnectHandler>(&context, &agent, payload)
.await
.expect("RTC connect failed");
});
}

Expand All @@ -980,7 +1081,11 @@ mod test {
};

let context = TestContext::new(db, TestAuthz::new());
let payload = ConnectRequest { id: rtc.id() };

let payload = ConnectRequest {
id: rtc.id(),
intent: ConnectIntent::Read,
};

let err = handle_request::<ConnectHandler>(&context, &agent, payload)
.await
Expand All @@ -995,7 +1100,11 @@ mod test {
async_std::task::block_on(async {
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let context = TestContext::new(TestDb::new(), TestAuthz::new());
let payload = ConnectRequest { id: Uuid::new_v4() };

let payload = ConnectRequest {
id: Uuid::new_v4(),
intent: ConnectIntent::Read,
};

let err = handle_request::<ConnectHandler>(&context, &agent, payload)
.await
Expand Down
Loading

0 comments on commit 2fdc33d

Please sign in to comment.