Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add RTC sharing policy and created_by (#228)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Mar 2, 2021
1 parent edac2e8 commit ec9d72d
Show file tree
Hide file tree
Showing 16 changed files with 527 additions and 107 deletions.
22 changes: 14 additions & 8 deletions docs/src/api/room/create.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@ method | String | _required_ | Always `room.create`.

**Payload**

Name | Type | Default | Description
-------- | ---------- | ---------- | ------------------
time | [i64, i64) | _required_ | A [lt, rt) range of unix time (seconds) or null (unbounded).
audience | String | _required_ | The room audience.
backend | String | none | The room backend. Available values: janus, none.
reserve | i32 | _optional_ | The number of slots for subscribers to reserve on the server.
tags | json | {} | Arbitrary tags object associated with the room.

Name | Type | Default | Description
------------------ | ---------- | ---------- | ------------------
time | [i64, i64) | _required_ | A [lt, rt) range of unix time (seconds) or null (unbounded).
audience | String | _required_ | The room audience.
backend | String | none | [DEPRECATED] The room backend. Available values: janus, none.
rtc_sharing_policy | String | none | RTC sharing mode. Available values: none, shared, owned.
reserve | i32 | _optional_ | The number of slots for subscribers to reserve on the server.
tags | json | {} | Arbitrary tags object associated with the room.

**Deprecation warning**

`backend` property is deprecated, use `rtc_sharing_policy` instead. For compatibility purpose
`backend = janus` implies `rtc_sharing_policy = shared` and `backend = none` implies
`rtc_sharing_policy = none`. If `rtc_sharing_policy` is specified then `backend` is being ignored.

## Unicast response

Expand Down
1 change: 0 additions & 1 deletion docs/src/api/room/update.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ Name | Type | Default | Description
id | String | _required_ | The room identifier. The room must not be expired.
time | [i64, i64) | _optional_ | A [lt, rt) range of unix time (seconds) or null (unbounded).
audience | String | _optional_ | The room audience.
backend | String | _optional_ | The room backend. Available values: janus, none.
reserve | i32 | _optional_ | The number of slots for subscribers to reserve on the server.
tags | json | {} | Arbitrary tags object associated with the room.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
ALTER TABLE room DROP column rtc_sharing_policy;
DROP TYPE rtc_sharing_policy;

ALTER TABLE room ADD CONSTRAINT room_backend_id_check CHECK (backend_id IS NULL OR backend != 'janus');
CREATE UNIQUE INDEX rtc_unique_room_id ON rtc (room_id);

DROP TRIGGER rtc_insert_trigger ON rtc;
DROP FUNCTION on_rtc_insert;

ALTER TABLE rtc DROP COLUMN created_by;
60 changes: 60 additions & 0 deletions migrations/2021-03-01-092819_add_rtc_sharing_policy_to_room/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
-- Add rtc.created_by.
ALTER TABLE rtc ADD COLUMN created_by agent_id NULL;
UPDATE rtc SET created_by = '("(anonymous,example.com)",web)'::agent_id;
ALTER TABLE rtc ALTER COLUMN created_by SET NOT NULL;

UPDATE rtc
SET created_by = jrs.sent_by
FROM (
SELECT DISTINCT ON (rtc_id) *
FROM janus_rtc_stream
ORDER BY rtc_id, created_at
) AS jrs
WHERE jrs.rtc_id = rtc.id;

-- Add room.rtc_sharing_policy.
CREATE TYPE rtc_sharing_policy AS ENUM ('none', 'shared', 'owned');
ALTER TABLE room ADD COLUMN rtc_sharing_policy rtc_sharing_policy NOT NULL DEFAULT 'none';
UPDATE room SET rtc_sharing_policy = 'shared' WHERE backend = 'janus';

-- Update room_backend_id check constraint.
ALTER TABLE room DROP CONSTRAINT room_backend_id_check;
ALTER TABLE room ADD CONSTRAINT room_backend_id_check CHECK (backend_id IS NULL OR rtc_sharing_policy != 'none');

-- Apply RTC sharing policy within a trigger to limit RTCs number in the room:
DROP INDEX rtc_unique_room_id;

CREATE OR REPLACE FUNCTION on_rtc_insert() RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
rtc_sharing_policy rtc_sharing_policy;
BEGIN
CASE (SELECT r.rtc_sharing_policy FROM room AS r WHERE id = NEW.room_id)
WHEN 'none' THEN
-- RTC creation not allowed.
RAISE EXCEPTION 'creating RTC within a room with `none` RTC sharing policy is not allowed';
WHEN 'shared' THEN
-- Only single RTC allowed in the room.
IF (SELECT COUNT(id) FROM rtc WHERE room_id = NEW.room_id) = 0 THEN
RETURN NEW;
ELSE
RAISE EXCEPTION 'creating multiple RTCs within a room with `shared` RTC sharing policy is not allowed';
END IF;
WHEN 'owned' THEN
-- Only single RTC per agent allowed in the room.
IF (SELECT COUNT(id) FROM rtc WHERE room_id = NEW.room_id AND created_by = NEW.created_by) = 0 THEN
RETURN NEW;
ELSE
RAISE EXCEPTION 'creating multiple RTCs per agent within a room with `owned` RTC sharing policy is not allowed';
END IF;
END CASE;
END;
$$;

DO $$ BEGIN
CREATE TRIGGER rtc_insert_trigger BEFORE INSERT
ON rtc FOR EACH ROW EXECUTE FUNCTION on_rtc_insert();
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
55 changes: 29 additions & 26 deletions src/app/endpoint/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::db;
use crate::db::room::RoomBackend;
use crate::db::rtc::SharingPolicy as RtcSharingPolicy;

///////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -43,18 +44,15 @@ pub(crate) struct CreateRequest {
#[serde(with = "crate::serde::ts_seconds_bound_tuple")]
time: (Bound<DateTime<Utc>>, Bound<DateTime<Utc>>),
audience: String,
#[serde(default = "CreateRequest::default_backend")]
backend: RoomBackend,
// Deprecated in favor of `rtc_sharing_policy`.
#[serde(default)]
backend: Option<RoomBackend>,
#[serde(default)]
rtc_sharing_policy: Option<RtcSharingPolicy>,
reserve: Option<i32>,
tags: Option<JsonValue>,
}

impl CreateRequest {
fn default_backend() -> RoomBackend {
RoomBackend::None
}
}

pub(crate) struct CreateHandler;

#[async_trait]
Expand All @@ -67,6 +65,12 @@ impl RequestHandler for CreateHandler {
payload: Self::Payload,
reqp: &IncomingRequestProperties,
) -> Result {
// Prefer `rtc_sharing_policy` with fallback to `backend` and `None` as default.
let rtc_sharing_policy = payload
.rtc_sharing_policy
.or_else(|| payload.backend.map(|b| b.into()))
.unwrap_or(RtcSharingPolicy::None);

// Authorize room creation on the tenant.
let authz_time = context
.authz()
Expand All @@ -76,7 +80,7 @@ impl RequestHandler for CreateHandler {
// Create a room.
let room = {
let mut q =
db::room::InsertQuery::new(payload.time, &payload.audience, payload.backend);
db::room::InsertQuery::new(payload.time, &payload.audience, rtc_sharing_policy);

if let Some(reserve) = payload.reserve {
q = q.reserve(reserve);
Expand Down Expand Up @@ -164,7 +168,6 @@ pub(crate) struct UpdateRequest {
#[serde(with = "crate::serde::ts_seconds_option_bound_tuple")]
time: Option<db::room::Time>,
audience: Option<String>,
backend: Option<db::room::RoomBackend>,
reserve: Option<Option<i32>>,
tags: Option<JsonValue>,
}
Expand Down Expand Up @@ -240,7 +243,6 @@ impl RequestHandler for UpdateHandler {
db::room::UpdateQuery::new(room.id())
.time(time)
.audience(payload.audience)
.backend(payload.backend)
.reserve(payload.reserve)
.tags(payload.tags)
.execute(&conn)?
Expand Down Expand Up @@ -515,7 +517,8 @@ mod test {
let payload = CreateRequest {
time: time.clone(),
audience: USR_AUDIENCE.to_owned(),
backend: db::room::RoomBackend::Janus,
backend: None,
rtc_sharing_policy: Some(db::rtc::SharingPolicy::Shared),
reserve: Some(123),
tags: Some(json!({ "foo": "bar" })),
};
Expand All @@ -529,7 +532,7 @@ mod test {
assert_eq!(respp.status(), ResponseStatus::OK);
assert_eq!(room.audience(), USR_AUDIENCE);
assert_eq!(room.time(), &time);
assert_eq!(room.backend(), db::room::RoomBackend::Janus);
assert_eq!(room.rtc_sharing_policy(), db::rtc::SharingPolicy::Shared);
assert_eq!(room.reserve(), Some(123));
assert_eq!(room.tags(), &json!({ "foo": "bar" }));

Expand All @@ -539,7 +542,7 @@ mod test {
assert_eq!(evp.label(), "room.create");
assert_eq!(room.audience(), USR_AUDIENCE);
assert_eq!(room.time(), &time);
assert_eq!(room.backend(), db::room::RoomBackend::Janus);
assert_eq!(room.rtc_sharing_policy(), db::rtc::SharingPolicy::Shared);
assert_eq!(room.reserve(), Some(123));
assert_eq!(room.tags(), &json!({ "foo": "bar" }));
});
Expand All @@ -555,7 +558,8 @@ mod test {
let payload = CreateRequest {
time: (Bound::Included(Utc::now()), Bound::Unbounded),
audience: USR_AUDIENCE.to_owned(),
backend: db::room::RoomBackend::Janus,
backend: None,
rtc_sharing_policy: Some(db::rtc::SharingPolicy::Shared),
reserve: None,
tags: None,
};
Expand Down Expand Up @@ -610,7 +614,7 @@ mod test {
assert_eq!(respp.status(), ResponseStatus::OK);
assert_eq!(resp_room.audience(), room.audience());
assert_eq!(resp_room.time(), room.time());
assert_eq!(resp_room.backend(), room.backend());
assert_eq!(resp_room.rtc_sharing_policy(), room.rtc_sharing_policy());
});
}

Expand Down Expand Up @@ -687,7 +691,7 @@ mod test {
factory::Room::new()
.audience(USR_AUDIENCE)
.time((Bound::Unbounded, Bound::Unbounded))
.backend(db::room::RoomBackend::Janus)
.rtc_sharing_policy(db::rtc::SharingPolicy::Shared)
.insert(&conn)
};

Expand All @@ -711,7 +715,6 @@ mod test {
reserve: Some(Some(123)),
tags: Some(json!({"foo": "bar"})),
audience: None,
backend: None,
};

let messages = handle_request::<UpdateHandler>(&mut context, &agent, payload)
Expand All @@ -724,7 +727,10 @@ mod test {
assert_eq!(resp_room.id(), room.id());
assert_eq!(resp_room.audience(), room.audience());
assert_eq!(resp_room.time(), &time);
assert_eq!(resp_room.backend(), db::room::RoomBackend::Janus);
assert_eq!(
resp_room.rtc_sharing_policy(),
db::rtc::SharingPolicy::Shared
);
assert_eq!(resp_room.reserve(), Some(123));
assert_eq!(resp_room.tags(), &json!({"foo": "bar"}));
});
Expand All @@ -749,7 +755,7 @@ mod test {
Bound::Included(now - Duration::hours(1)),
Bound::Excluded(now + Duration::hours(2)),
))
.backend(db::room::RoomBackend::Janus)
.rtc_sharing_policy(db::rtc::SharingPolicy::Shared)
.insert(&conn)
};

Expand All @@ -773,7 +779,6 @@ mod test {
reserve: Some(Some(123)),
tags: Some(json!({"foo": "bar"})),
audience: None,
backend: None,
};

handle_request::<UpdateHandler>(&mut context, &agent, payload)
Expand Down Expand Up @@ -801,7 +806,7 @@ mod test {
Bound::Included(now - Duration::hours(1)),
Bound::Excluded(now + Duration::hours(5)),
))
.backend(db::room::RoomBackend::Janus)
.rtc_sharing_policy(db::rtc::SharingPolicy::Shared)
.insert(&conn)
};

Expand All @@ -824,7 +829,6 @@ mod test {
time: Some(time),
reserve: Some(Some(123)),
audience: None,
backend: None,
tags: None,
};

Expand Down Expand Up @@ -876,7 +880,7 @@ mod test {
factory::Room::new()
.audience(USR_AUDIENCE)
.time((Bound::Included(now - Duration::hours(1)), Bound::Unbounded))
.backend(db::room::RoomBackend::Janus)
.rtc_sharing_policy(db::rtc::SharingPolicy::Shared)
.insert(&conn)
};

Expand All @@ -899,7 +903,6 @@ mod test {
time: Some(time),
reserve: None,
audience: None,
backend: None,
tags: None,
};

Expand Down Expand Up @@ -1004,7 +1007,7 @@ mod test {
assert_eq!(respp.status(), ResponseStatus::OK);
assert_eq!(resp_room.audience(), room.audience());
assert_eq!(resp_room.time(), room.time());
assert_eq!(resp_room.backend(), room.backend());
assert_eq!(resp_room.rtc_sharing_policy(), room.rtc_sharing_policy());

// Assert room absence in the DB.
let conn = context.get_conn().unwrap();
Expand Down
Loading

0 comments on commit ec9d72d

Please sign in to comment.