Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Fix capacity edge case (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Aug 18, 2020
1 parent 2fdc33d commit 0bb614a
Showing 1 changed file with 80 additions and 16 deletions.
96 changes: 80 additions & 16 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ impl RequestHandler for ConnectHandler {

// Check that the backend's capacity is not exceeded for readers.
if payload.intent == ConnectIntent::Read {
if let Some(limit) = backend.capacity() {
if let Some(capacity) = backend.capacity() {
let agents_count = db::janus_backend::agents_count(backend.id(), &conn)?;

if agents_count >= limit.into() {
if agents_count > capacity.into() {
let err = SvcError::builder()
.status(ResponseStatus::SERVICE_UNAVAILABLE)
.kind("capacity_exceeded", "Capacity exceeded")
Expand Down Expand Up @@ -945,11 +945,13 @@ mod test {
}

#[test]
fn connect_to_rtc_full_server() {
fn connect_to_rtc_as_last_reader() {
async_std::task::block_on(async {
let mut rng = rand::thread_rng();
let db = TestDb::new();
let mut authz = TestAuthz::new();
let writer = TestAgent::new("web", "writer", USR_AUDIENCE);
let reader = TestAgent::new("web", "reader", USR_AUDIENCE);

let rtc = db
.connection_pool()
Expand All @@ -965,7 +967,7 @@ mod test {
};

let backend = factory::JanusBackend::new(backend_id, rng.gen(), rng.gen())
.capacity(1)
.capacity(2)
.insert(&conn);

// Insert active stream.
Expand All @@ -974,20 +976,19 @@ mod test {
.rtc(&rtc)
.insert(&conn);

// Insert active agent.
let agent = TestAgent::new("web", "user456", USR_AUDIENCE);
shared_helpers::insert_agent(&conn, agent.agent_id(), rtc.room_id());
// Insert active agents.
shared_helpers::insert_agent(&conn, writer.agent_id(), rtc.room_id());
shared_helpers::insert_agent(&conn, reader.agent_id(), rtc.room_id());

rtc
})
.unwrap();

// Allow user to read the rtc.
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(agent.account_id(), object, "read");
authz.allow(reader.account_id(), object, "read");

// Make rtc.connect request.
let context = TestContext::new(db, authz);
Expand All @@ -997,7 +998,69 @@ mod test {
intent: ConnectIntent::Read,
};

let err = handle_request::<ConnectHandler>(&context, &agent, payload)
handle_request::<ConnectHandler>(&context, &reader, payload)
.await
.expect("RTC connect failed");
});
}

#[test]
fn connect_to_rtc_full_server_as_reader() {
async_std::task::block_on(async {
let mut rng = rand::thread_rng();
let db = TestDb::new();
let mut authz = TestAuthz::new();
let writer = TestAgent::new("web", "writer", USR_AUDIENCE);
let reader1 = TestAgent::new("web", "reader1", USR_AUDIENCE);
let reader2 = TestAgent::new("web", "reader2", USR_AUDIENCE);

let rtc = db
.connection_pool()
.get()
.map(|conn| {
// Insert rtc.
let rtc = shared_helpers::insert_rtc(&conn);

// Insert backend.
let backend_id = {
let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE);
agent.agent_id().to_owned()
};

let backend = factory::JanusBackend::new(backend_id, rng.gen(), rng.gen())
.capacity(2)
.insert(&conn);

// Insert active stream.
factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend)
.rtc(&rtc)
.insert(&conn);

// Insert active agents.
shared_helpers::insert_agent(&conn, writer.agent_id(), rtc.room_id());
shared_helpers::insert_agent(&conn, reader1.agent_id(), rtc.room_id());
shared_helpers::insert_agent(&conn, reader2.agent_id(), rtc.room_id());

rtc
})
.unwrap();

// Allow user to read the rtc.
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(reader2.account_id(), object, "read");

// Make rtc.connect request.
let context = TestContext::new(db, authz);

let payload = ConnectRequest {
id: rtc.id(),
intent: ConnectIntent::Read,
};

let err = handle_request::<ConnectHandler>(&context, &reader2, payload)
.await
.expect_err("Unexpected success on rtc connecting");

Expand All @@ -1012,6 +1075,8 @@ mod test {
let mut rng = rand::thread_rng();
let db = TestDb::new();
let mut authz = TestAuthz::new();
let writer = TestAgent::new("web", "writer", USR_AUDIENCE);
let reader = TestAgent::new("web", "reader", USR_AUDIENCE);

let rtc = db
.connection_pool()
Expand All @@ -1036,20 +1101,19 @@ mod test {
.rtc(&rtc)
.insert(&conn);

// Insert active agent.
let agent = TestAgent::new("web", "user456", USR_AUDIENCE);
shared_helpers::insert_agent(&conn, agent.agent_id(), rtc.room_id());
// Insert active agents.
shared_helpers::insert_agent(&conn, writer.agent_id(), rtc.room_id());
shared_helpers::insert_agent(&conn, reader.agent_id(), rtc.room_id());

rtc
})
.unwrap();

// Allow user to read the rtc.
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(agent.account_id(), object, "update");
authz.allow(writer.account_id(), object, "update");

// Make rtc.connect request.
let context = TestContext::new(db, authz);
Expand All @@ -1059,7 +1123,7 @@ mod test {
intent: ConnectIntent::Write,
};

handle_request::<ConnectHandler>(&context, &agent, payload)
handle_request::<ConnectHandler>(&context, &writer, payload)
.await
.expect("RTC connect failed");
});
Expand Down

0 comments on commit 0bb614a

Please sign in to comment.