Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Replace connected agent status w/ agent connection (#212)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov authored Jan 28, 2021
1 parent 477d78d commit c1a7058
Show file tree
Hide file tree
Showing 16 changed files with 346 additions and 259 deletions.
21 changes: 21 additions & 0 deletions migrations/2020-12-29-232109_create_agent_connection/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Put back `agent_stream` table.
CREATE TABLE agent_stream (
id UUID DEFAULT gen_random_uuid(),
sent_by UUID NOT NULL,
label TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

FOREIGN KEY (sent_by) REFERENCES agent (id) ON DELETE CASCADE,
PRIMARY KEY (id)
);

-- Put back `connected` status.
ALTER TYPE agent_status RENAME TO agent_status_old;
CREATE TYPE agent_status AS ENUM ('in_progress', 'ready', 'connected');
ALTER TABLE agent ALTER COLUMN status DROP DEFAULT;
ALTER TABLE agent ALTER COLUMN status TYPE agent_status USING status::text::agent_status;
ALTER TABLE agent ALTER COLUMN status SET DEFAULT 'in_progress';
DROP TYPE agent_status_old;

-- Drop new `agent_connection table`.
DROP TABLE agent_connection;
21 changes: 21 additions & 0 deletions migrations/2020-12-29-232109_create_agent_connection/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Create `agent_connection`.
CREATE TABLE agent_connection (
agent_id UUID NOT NULL,
handle_id BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

FOREIGN KEY (agent_id) REFERENCES agent (id) ON DELETE CASCADE,
PRIMARY KEY (agent_id)
);

-- Remove `connected` status since it's now indicated by `agent_connection` row presence.
UPDATE agent SET status = 'ready' WHERE status = 'connected';
ALTER TYPE agent_status RENAME TO agent_status_old;
CREATE TYPE agent_status AS ENUM ('in_progress', 'ready');
ALTER TABLE agent ALTER COLUMN status DROP DEFAULT;
ALTER TABLE agent ALTER COLUMN status TYPE agent_status USING status::text::agent_status;
ALTER TABLE agent ALTER COLUMN status SET DEFAULT 'in_progress';
DROP TYPE agent_status_old;

-- Unused table.
DROP TABLE agent_stream;
27 changes: 4 additions & 23 deletions src/app/endpoint/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,8 @@ mod test {
.get()
.map(|conn| {
let room = shared_helpers::insert_room(&conn);

factory::Agent::new()
.room_id(room.id())
.agent_id(sender.agent_id())
.insert(&conn);

factory::Agent::new()
.room_id(room.id())
.agent_id(receiver.agent_id())
.insert(&conn);

shared_helpers::insert_agent(&conn, sender.agent_id(), room.id());
shared_helpers::insert_agent(&conn, receiver.agent_id(), room.id());
room
})
.expect("Failed to insert room");
Expand Down Expand Up @@ -302,12 +293,7 @@ mod test {
.get()
.map(|conn| {
let room = shared_helpers::insert_room(&conn);

factory::Agent::new()
.room_id(room.id())
.agent_id(receiver.agent_id())
.insert(&conn);

shared_helpers::insert_agent(&conn, receiver.agent_id(), room.id());
room
})
.expect("Failed to insert room");
Expand Down Expand Up @@ -343,12 +329,7 @@ mod test {
.get()
.map(|conn| {
let room = shared_helpers::insert_room(&conn);

factory::Agent::new()
.room_id(room.id())
.agent_id(sender.agent_id())
.insert(&conn);

shared_helpers::insert_agent(&conn, sender.agent_id(), room.id());
room
})
.expect("Failed to insert room");
Expand Down
99 changes: 37 additions & 62 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use async_trait::async_trait;
use chrono::Duration;
use chrono::Utc;
use serde_derive::{Deserialize, Serialize};
use svc_agent::{
mqtt::{IncomingRequestProperties, IntoPublishableMessage, OutgoingResponse, ResponseStatus},
Addressable,
use svc_agent::mqtt::{
IncomingRequestProperties, IntoPublishableMessage, OutgoingResponse, ResponseStatus,
};
use uuid::Uuid;

Expand Down Expand Up @@ -381,6 +380,7 @@ impl RequestHandler for ConnectHandler {
reqp.clone(),
Uuid::new_v4(),
payload.id,
room.id(),
backend.session_id(),
backend.id(),
context.start_timestamp(),
Expand All @@ -389,12 +389,6 @@ impl RequestHandler for ConnectHandler {

match janus_request_result {
Ok(req) => {
let conn = context.get_conn()?;

db::agent::UpdateQuery::new(reqp.as_agent_id(), room.id())
.status(db::agent::Status::Connected)
.execute(&conn)?;

let boxed_request = Box::new(req) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_request)))
}
Expand Down Expand Up @@ -862,7 +856,7 @@ mod test {
let _rtc1 = shared_helpers::insert_rtc_with_room(&conn, &room1);

let s1a1 = TestAgent::new("web", "s1a1", USR_AUDIENCE);
shared_helpers::insert_agent(&conn, s1a1.agent_id(), room1.id());
shared_helpers::insert_connected_agent(&conn, s1a1.agent_id(), room1.id());

// The second backend has 2 agents.
let room2 =
Expand All @@ -871,10 +865,10 @@ mod test {
let _rtc2 = shared_helpers::insert_rtc_with_room(&conn, &room2);

let s2a1 = TestAgent::new("web", "s2a1", USR_AUDIENCE);
shared_helpers::insert_agent(&conn, s2a1.agent_id(), room2.id());
shared_helpers::insert_connected_agent(&conn, s2a1.agent_id(), room2.id());

let s2a2 = TestAgent::new("web", "s2a2", USR_AUDIENCE);
shared_helpers::insert_agent(&conn, s2a2.agent_id(), room2.id());
shared_helpers::insert_connected_agent(&conn, s2a2.agent_id(), room2.id());

// The new rtc for which we will balance the stream.
let rtc3 = shared_helpers::insert_rtc(&conn);
Expand Down Expand Up @@ -1286,8 +1280,13 @@ mod test {
let rtc = shared_helpers::insert_rtc_with_room(&conn, &room);

// Insert active agents.
shared_helpers::insert_agent(&conn, writer.agent_id(), room.id());
shared_helpers::insert_agent(&conn, reader1.agent_id(), room.id());
shared_helpers::insert_connected_agent(&conn, writer.agent_id(), room.id());

shared_helpers::insert_connected_agent(
&conn,
reader1.agent_id(),
room.id(),
);

factory::Agent::new()
.agent_id(reader2.agent_id())
Expand Down Expand Up @@ -1462,38 +1461,23 @@ mod test {
let rtc3 = shared_helpers::insert_rtc_with_room(&conn, &room3);

// Insert writer for room 1 @ backend 1
factory::Agent::new()
.agent_id(TestAgent::new("web", "writer1", USR_AUDIENCE).agent_id())
.room_id(room1.id())
.status(AgentStatus::Connected)
.insert(&conn);
let agent = TestAgent::new("web", "writer1", USR_AUDIENCE);
shared_helpers::insert_connected_agent(&conn, agent.agent_id(), room1.id());

// Insert two readers for room 1 @ backend 1
factory::Agent::new()
.agent_id(TestAgent::new("web", "reader1-1", USR_AUDIENCE).agent_id())
.room_id(room1.id())
.status(AgentStatus::Connected)
.insert(&conn);
let agent = TestAgent::new("web", "reader1-1", USR_AUDIENCE);
shared_helpers::insert_connected_agent(&conn, agent.agent_id(), room1.id());

factory::Agent::new()
.agent_id(TestAgent::new("web", "reader1-2", USR_AUDIENCE).agent_id())
.room_id(room1.id())
.status(AgentStatus::Connected)
.insert(&conn);
let agent = TestAgent::new("web", "reader1-2", USR_AUDIENCE);
shared_helpers::insert_connected_agent(&conn, agent.agent_id(), room1.id());

// Insert writer for room 2 @ backend 2
factory::Agent::new()
.agent_id(TestAgent::new("web", "writer2", USR_AUDIENCE).agent_id())
.room_id(room2.id())
.status(AgentStatus::Connected)
.insert(&conn);
let agent = TestAgent::new("web", "writer2", USR_AUDIENCE);
shared_helpers::insert_connected_agent(&conn, agent.agent_id(), room2.id());

// Insert reader for room 2 @ backend 2
factory::Agent::new()
.agent_id(TestAgent::new("web", "reader2", USR_AUDIENCE).agent_id())
.room_id(room2.id())
.status(AgentStatus::Connected)
.insert(&conn);
let agent = TestAgent::new("web", "reader2", USR_AUDIENCE);
shared_helpers::insert_connected_agent(&conn, agent.agent_id(), room2.id());

(rtc3, backend2)
})
Expand Down Expand Up @@ -1601,37 +1585,28 @@ mod test {
let rtc3 = shared_helpers::insert_rtc_with_room(&conn, &room3);

// Insert writer for room 1
factory::Agent::new()
.agent_id(TestAgent::new("web", "writer1", USR_AUDIENCE).agent_id())
.room_id(room1.id())
.status(AgentStatus::Connected)
.insert(&conn);
let agent = TestAgent::new("web", "writer1", USR_AUDIENCE);
shared_helpers::insert_connected_agent(&conn, agent.agent_id(), room1.id());

// Insert 450 readers for room 1
for i in 0..450 {
factory::Agent::new()
.agent_id(
TestAgent::new("web", &format!("reader1-{}", i), USR_AUDIENCE)
.agent_id(),
)
.room_id(room1.id())
.status(AgentStatus::Connected)
.insert(&conn);
let agent =
TestAgent::new("web", &format!("reader1-{}", i), USR_AUDIENCE);

shared_helpers::insert_connected_agent(
&conn,
agent.agent_id(),
room1.id(),
);
}

// Insert writer for room 3
factory::Agent::new()
.agent_id(TestAgent::new("web", "writer3", USR_AUDIENCE).agent_id())
.room_id(room2.id())
.status(AgentStatus::Connected)
.insert(&conn);
let agent = TestAgent::new("web", "writer3", USR_AUDIENCE);
shared_helpers::insert_connected_agent(&conn, agent.agent_id(), room2.id());

// Insert reader for room 3
factory::Agent::new()
.agent_id(TestAgent::new("web", "reader3", USR_AUDIENCE).agent_id())
.room_id(room3.id())
.status(AgentStatus::Connected)
.insert(&conn);
let agent = TestAgent::new("web", "reader3", USR_AUDIENCE);
shared_helpers::insert_connected_agent(&conn, agent.agent_id(), room3.id());

([rtc1, rtc2, rtc3], backend)
})
Expand Down
9 changes: 3 additions & 6 deletions src/app/endpoint/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,12 @@ impl EventHandler for DeleteHandler {
if stream.sent_by() == &payload.subject {
// Stop the stream.
db::janus_rtc_stream::stop(stream.id(), &conn)?;

// Put stream readers into `ready` status since the stream has gone.
db::agent::BulkStatusUpdateQuery::new(db::agent::Status::Ready)
.room_id(room_id)
.status(db::agent::Status::Connected)
.execute(&conn)?;
}
}

// Disconnect stream readers since the stream has gone.
db::agent_connection::BulkDisconnectByRoomQuery::new(room_id).execute(&conn)?;

// Send agent.leave requests to those backends where the agent is connected to.
let mut backend_ids = streams
.iter()
Expand Down
5 changes: 2 additions & 3 deletions src/app/metrics/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ fn append_janus_stats(
context: &impl GlobalContext,
now: DateTime<Utc>,
) -> anyhow::Result<()> {
use crate::db::agent;
use crate::db::agent_connection;
use anyhow::Context;

match context.get_conn() {
Expand Down Expand Up @@ -261,8 +261,7 @@ fn append_janus_stats(
));

// The number of agents connect to an RTC.
let connected_agents_count = agent::CountQuery::new()
.status(agent::Status::Connected)
let connected_agents_count = agent_connection::CountQuery::new()
.execute(&conn)
.context("Failed to get connected agents count")?;

Expand Down
56 changes: 42 additions & 14 deletions src/backend/janus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::app::error::{Error as AppError, ErrorExt, ErrorKind as AppErrorKind};
use crate::app::handle_id::HandleId;
use crate::app::message_handler::MessageStream;
use crate::app::API_VERSION;
use crate::db::{agent, janus_backend, janus_rtc_stream, recording, room, rtc};
use crate::diesel::Connection;
use crate::db::{agent, agent_connection, janus_backend, janus_rtc_stream, recording, room, rtc};
use crate::diesel::{Connection, Identifiable};
use crate::util::from_base64;

use self::events::{IncomingEvent, StatusEvent};
Expand Down Expand Up @@ -114,17 +114,51 @@ async fn handle_response_impl<C: Context>(
}
// Rtc Handle has been created
Transaction::CreateRtcHandle(tn) => {
let agent_id = respp.as_agent_id();
let handle_id = inresp.data().id();
let backend_id = respp.as_agent_id();
let room_id = tn.room_id();
let reqp = tn.reqp();
let agent_id = reqp.as_agent_id();

{
let conn = context.get_conn()?;

conn.transaction::<_, AppError, _>(|| {
// Find agent in the DB who made the original `rtc.connect` request.
let maybe_agent = agent::ListQuery::new()
.agent_id(agent_id)
.room_id(room_id)
.status(agent::Status::Ready)
.limit(1)
.execute(&conn)?;

if let Some(agent) = maybe_agent.first() {
// Create agent connection in the DB.
agent_connection::UpsertQuery::new(*agent.id(), handle_id)
.execute(&conn)?;

Ok(())
} else {
context.add_logger_tags(o!(
"agent_id" => agent_id.to_string(),
"room_id" => room_id.to_string(),
));

// Agent may be already gone.
Err(anyhow!("Agent not found"))
.error(AppErrorKind::AgentNotEnteredTheRoom)
}
})?;
}

// Returning Real-Time connection handle
let resp = endpoint::rtc::ConnectResponse::unicast(
endpoint::rtc::ConnectResponseData::new(HandleId::new(
tn.rtc_stream_id(),
tn.rtc_id(),
inresp.data().id(),
handle_id,
tn.session_id(),
agent_id.clone(),
backend_id.to_owned(),
)),
reqp.to_response(
ResponseStatus::OK,
Expand Down Expand Up @@ -557,12 +591,8 @@ fn handle_hangup_detach<C: Context, E: OpaqueId>(
// Publish the update event only if the stream object has been changed.
// If there's no actual media stream, the object wouldn't contain its start time.
if rtc_stream.time().is_some() {
// Put connected `agents` back into `ready` status since the stream has gone and
// they haven't been connected anymore.
agent::BulkStatusUpdateQuery::new(agent::Status::Ready)
.room_id(room.id())
.status(agent::Status::Connected)
.execute(&conn)?;
// Disconnect agents.
agent_connection::BulkDisconnectByRoomQuery::new(room.id()).execute(&conn)?;

// Send rtc_stream.update event.
let event = endpoint::rtc_stream::update_event(
Expand Down Expand Up @@ -625,9 +655,7 @@ async fn handle_status_event_impl<C: Context>(
.backend_id(evp.as_agent_id())
.execute(&conn)?;

agent::BulkStatusUpdateQuery::new(agent::Status::Ready)
.backend_id(evp.as_agent_id())
.status(agent::Status::Connected)
agent_connection::BulkDisconnectByBackendQuery::new(evp.as_agent_id())
.execute(&conn)?;

janus_backend::DeleteQuery::new(evp.as_agent_id()).execute(&conn)?;
Expand Down
Loading

0 comments on commit c1a7058

Please sign in to comment.