Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
svc-agent update
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Jun 1, 2020
1 parent 9f5bc75 commit 884d356
Show file tree
Hide file tree
Showing 15 changed files with 195 additions and 231 deletions.
23 changes: 22 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ rand = "0.7"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
svc-agent = { version = "0.12", features = ["diesel"] }
svc-agent = { version = "0.13", features = ["diesel"] }
svc-authn = { version = "0.5", features = ["jose", "diesel"] }
svc-authz = "0.10"
svc-error = { version = "0.1", features = ["diesel", "r2d2", "svc-agent", "svc-authn", "svc-authz", "sentry-extension"] }
Expand Down
6 changes: 3 additions & 3 deletions src/app/endpoint/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use diesel::pg::PgConnection;
use serde_derive::Deserialize;
use serde_json::{json, Value as JsonValue};
use svc_agent::mqtt::{
IncomingRequestProperties, IncomingResponseProperties, IntoPublishableDump, OutgoingRequest,
IncomingRequestProperties, IncomingResponseProperties, IntoPublishableMessage, OutgoingRequest,
OutgoingResponse, OutgoingResponseProperties, ResponseStatus, ShortTermTimingProperties,
SubscriptionTopic,
};
Expand Down Expand Up @@ -74,7 +74,7 @@ impl RequestHandler for UnicastHandler {
API_VERSION,
);

let boxed_req = Box::new(req) as Box<dyn IntoPublishableDump + Send>;
let boxed_req = Box::new(req) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_req)))
}
}
Expand Down Expand Up @@ -157,7 +157,7 @@ impl ResponseHandler for CallbackHandler {
);

let resp = OutgoingResponse::unicast(payload.to_owned(), props, &reqp, API_VERSION);
let boxed_resp = Box::new(resp) as Box<dyn IntoPublishableDump + Send>;
let boxed_resp = Box::new(resp) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_resp)))
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/app/endpoint/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use chrono::{serde::ts_seconds, DateTime, Utc};
use serde_derive::{Deserialize, Serialize};
use svc_agent::mqtt::{
IncomingEventProperties, IntoPublishableDump, OutgoingEvent, ShortTermTimingProperties,
IncomingEventProperties, IntoPublishableMessage, OutgoingEvent, ShortTermTimingProperties,
};

use crate::app::context::Context;
Expand Down Expand Up @@ -54,7 +54,8 @@ impl EventHandler for PullHandler {
let props = evp.to_event("metric.create", short_term_timing);
let outgoing_event =
OutgoingEvent::multicast(outgoing_event_payload, props, account_id);
let boxed_event = Box::new(outgoing_event) as Box<dyn IntoPublishableDump + Send>;
let boxed_event =
Box::new(outgoing_event) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_event)))
}

Expand Down
37 changes: 13 additions & 24 deletions src/app/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::de::DeserializeOwned;
use svc_agent::mqtt::{
compat::IncomingEnvelope, IncomingEventProperties, IncomingRequestProperties,
IncomingResponseProperties,
IncomingEvent, IncomingEventProperties, IncomingRequest, IncomingRequestProperties,
IncomingResponse, IncomingResponseProperties,
};
use svc_error::Error as SvcError;

Expand Down Expand Up @@ -37,15 +37,14 @@ macro_rules! request_routes {
($($m: pat => $h: ty),*) => {
pub(crate) async fn route_request<C: Context>(
context: &C,
envelope: IncomingEnvelope,
reqp: &IncomingRequestProperties,
request: &IncomingRequest<String>,
_topic: &str,
start_timestamp: DateTime<Utc>,
) -> Option<MessageStream> {
match reqp.method() {
match request.properties().method() {
$(
$m => Some(
<$h>::handle_envelope::<C>(context, envelope, reqp, start_timestamp).await
<$h>::handle_envelope::<C>(context, request, start_timestamp).await
),
)*
_ => None,
Expand Down Expand Up @@ -90,23 +89,14 @@ pub(crate) trait ResponseHandler {

pub(crate) async fn route_response<C: Context>(
context: &C,
envelope: IncomingEnvelope,
respp: &IncomingResponseProperties,
resp: &IncomingResponse<String>,
topic: &str,
start_timestamp: DateTime<Utc>,
) -> Option<MessageStream> {
if topic == context.janus_topics().responses_topic() {
Some(janus::handle_response::<C>(context, envelope, respp, start_timestamp).await)
Some(janus::handle_response::<C>(context, resp, start_timestamp).await)
} else {
Some(
message::CallbackHandler::handle_envelope::<C>(
context,
envelope,
respp,
start_timestamp,
)
.await,
)
Some(message::CallbackHandler::handle_envelope::<C>(context, resp, start_timestamp).await)
}
}

Expand All @@ -129,20 +119,19 @@ macro_rules! event_routes {
#[allow(unused_variables)]
pub(crate) async fn route_event<C: Context>(
context: &C,
envelope: IncomingEnvelope,
evp: &IncomingEventProperties,
event: &IncomingEvent<String>,
topic: &str,
start_timestamp: DateTime<Utc>,
) -> Option<MessageStream> {
if topic == context.janus_topics().events_topic() {
Some(janus::handle_event::<C>(context, envelope, evp, start_timestamp).await)
Some(janus::handle_event::<C>(context, event, start_timestamp).await)
} else if topic == context.janus_topics().status_events_topic() {
Some(janus::handle_status_event::<C>(context, envelope, evp, start_timestamp).await)
Some(janus::handle_status_event::<C>(context, event, start_timestamp).await)
} else {
match evp.label() {
match event.properties().label() {
$(
Some($l) => Some(
<$h>::handle_envelope::<C>(context, envelope, evp, start_timestamp).await
<$h>::handle_envelope::<C>(context, event, start_timestamp).await
),
)*
_ => None,
Expand Down
6 changes: 3 additions & 3 deletions src/app/endpoint/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chrono::{DateTime, Utc};
use serde_derive::{Deserialize, Serialize};
use std::ops::Bound;
use svc_agent::mqtt::{
IncomingRequestProperties, IntoPublishableDump, OutgoingRequest, ResponseStatus,
IncomingRequestProperties, IntoPublishableMessage, OutgoingRequest, ResponseStatus,
ShortTermTimingProperties,
};
use svc_agent::{Addressable, AgentId};
Expand Down Expand Up @@ -339,7 +339,7 @@ impl RequestHandler for EnterHandler {
// Then we won't need the local state on the broker at all and will be able
// to send a multicast request to the broker.
let outgoing_request = OutgoingRequest::unicast(payload, props, reqp, MQTT_GW_API_VERSION);
let boxed_request = Box::new(outgoing_request) as Box<dyn IntoPublishableDump + Send>;
let boxed_request = Box::new(outgoing_request) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_request)))
}
}
Expand Down Expand Up @@ -413,7 +413,7 @@ impl RequestHandler for LeaveHandler {
// Then we won't need the local state on the broker at all and will be able
// to send a multicast request to the broker.
let outgoing_request = OutgoingRequest::unicast(payload, props, reqp, MQTT_GW_API_VERSION);
let boxed_request = Box::new(outgoing_request) as Box<dyn IntoPublishableDump + Send>;
let boxed_request = Box::new(outgoing_request) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_request)))
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde_derive::{Deserialize, Serialize};
use svc_agent::mqtt::{
IncomingRequestProperties, IntoPublishableDump, OutgoingResponse, ResponseStatus,
IncomingRequestProperties, IntoPublishableMessage, OutgoingResponse, ResponseStatus,
};
use uuid::Uuid;

Expand Down Expand Up @@ -312,7 +312,7 @@ impl RequestHandler for ConnectHandler {

match janus_request_result {
Ok(req) => {
let boxed_request = Box::new(req) as Box<dyn IntoPublishableDump + Send>;
let boxed_request = Box::new(req) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_request)))
}
Err(err) => Err(format!("error creating a backend request: {}", err))
Expand Down
8 changes: 4 additions & 4 deletions src/app/endpoint/rtc_signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use chrono::{DateTime, Duration, Utc};
use serde_derive::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use svc_agent::mqtt::{
IncomingRequestProperties, IntoPublishableDump, OutgoingResponse, ResponseStatus,
IncomingRequestProperties, IntoPublishableMessage, OutgoingResponse, ResponseStatus,
};
use svc_agent::Addressable;
use svc_error::Error as SvcError;
Expand Down Expand Up @@ -85,7 +85,7 @@ impl RequestHandler for CreateHandler {
start_timestamp,
authz_time,
)
.map(|req| Box::new(req) as Box<dyn IntoPublishableDump + Send>)
.map(|req| Box::new(req) as Box<dyn IntoPublishableMessage + Send>)
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?
} else {
Expand Down Expand Up @@ -124,7 +124,7 @@ impl RequestHandler for CreateHandler {
start_timestamp,
authz_time,
)
.map(|req| Box::new(req) as Box<dyn IntoPublishableDump + Send>)
.map(|req| Box::new(req) as Box<dyn IntoPublishableMessage + Send>)
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?
}
Expand All @@ -145,7 +145,7 @@ impl RequestHandler for CreateHandler {
start_timestamp,
authz_time,
)
.map(|req| Box::new(req) as Box<dyn IntoPublishableDump + Send>)
.map(|req| Box::new(req) as Box<dyn IntoPublishableMessage + Send>)
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?
}
Expand Down
6 changes: 3 additions & 3 deletions src/app/endpoint/rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde_derive::Deserialize;
use svc_agent::mqtt::{
IncomingRequestProperties, OutgoingEvent, OutgoingEventProperties, ResponseStatus,
ShortTermTimingProperties, TrackingProperties,
IncomingRequestProperties, OutgoingEvent, OutgoingEventProperties, OutgoingMessage,
ResponseStatus, ShortTermTimingProperties, TrackingProperties,
};
use svc_error::Error as SvcError;
use uuid::Uuid;
Expand Down Expand Up @@ -104,7 +104,7 @@ impl RequestHandler for ListHandler {

////////////////////////////////////////////////////////////////////////////////

pub(crate) type ObjectUpdateEvent = OutgoingEvent<db::janus_rtc_stream::Object>;
pub(crate) type ObjectUpdateEvent = OutgoingMessage<db::janus_rtc_stream::Object>;

pub(crate) fn update_event(
room_id: Uuid,
Expand Down
6 changes: 3 additions & 3 deletions src/app/endpoint/shared.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use chrono::{DateTime, Duration, Utc};
use serde::Serialize;
use svc_agent::mqtt::{
IncomingRequestProperties, IntoPublishableDump, OutgoingEvent, OutgoingEventProperties,
IncomingRequestProperties, IntoPublishableMessage, OutgoingEvent, OutgoingEventProperties,
OutgoingResponse, ResponseStatus, ShortTermTimingProperties,
};

Expand All @@ -15,7 +15,7 @@ pub(crate) fn build_response(
reqp: &IncomingRequestProperties,
start_timestamp: DateTime<Utc>,
maybe_authz_time: Option<Duration>,
) -> Box<dyn IntoPublishableDump + Send> {
) -> Box<dyn IntoPublishableMessage + Send> {
let mut timing = ShortTermTimingProperties::until_now(start_timestamp);

if let Some(authz_time) = maybe_authz_time {
Expand All @@ -32,7 +32,7 @@ pub(crate) fn build_notification(
payload: impl Serialize + Send + 'static,
reqp: &IncomingRequestProperties,
start_timestamp: DateTime<Utc>,
) -> Box<dyn IntoPublishableDump + Send> {
) -> Box<dyn IntoPublishableMessage + Send> {
let timing = ShortTermTimingProperties::until_now(start_timestamp);
let mut props = OutgoingEventProperties::new(label, timing);
props.set_tracking(reqp.tracking().to_owned());
Expand Down
6 changes: 3 additions & 3 deletions src/app/endpoint/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use chrono::{DateTime, Utc};
use serde_derive::{Deserialize, Serialize};
use svc_agent::{
mqtt::{
IncomingEventProperties, IntoPublishableDump, OutgoingEvent, ResponseStatus,
IncomingEventProperties, IntoPublishableMessage, OutgoingEvent, ResponseStatus,
ShortTermTimingProperties,
},
AgentId, Authenticable,
Expand Down Expand Up @@ -101,7 +101,7 @@ impl EventHandler for CreateHandler {
let props = evp.to_event("room.enter", short_term_timing);
let to_uri = format!("rooms/{}/events", room_id);
let outgoing_event = OutgoingEvent::broadcast(outgoing_event_payload, props, &to_uri);
let boxed_event = Box::new(outgoing_event) as Box<dyn IntoPublishableDump + Send>;
let boxed_event = Box::new(outgoing_event) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_event)))
}
}
Expand Down Expand Up @@ -145,7 +145,7 @@ impl EventHandler for DeleteHandler {
let props = evp.to_event("room.leave", short_term_timing);
let to_uri = format!("rooms/{}/events", room_id);
let outgoing_event = OutgoingEvent::broadcast(outgoing_event_payload, props, &to_uri);
let boxed_event = Box::new(outgoing_event) as Box<dyn IntoPublishableDump + Send>;
let boxed_event = Box::new(outgoing_event) as Box<dyn IntoPublishableMessage + Send>;
let mut messages = vec![boxed_event];

// `agent.leave` requests to Janus instances that host active streams in this room.
Expand Down
8 changes: 4 additions & 4 deletions src/app/endpoint/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde_derive::{Deserialize, Serialize};
use svc_agent::mqtt::{
IncomingRequestProperties, IntoPublishableDump, OutgoingEvent, OutgoingEventProperties,
ResponseStatus, ShortTermTimingProperties, TrackingProperties,
IncomingRequestProperties, IntoPublishableMessage, OutgoingEvent, OutgoingEventProperties,
OutgoingMessage, ResponseStatus, ShortTermTimingProperties, TrackingProperties,
};
use svc_authn::Authenticable;
use uuid::Uuid;
Expand Down Expand Up @@ -47,7 +47,7 @@ struct RtcUploadEventData {
uri: Option<String>,
}

pub(crate) type RoomUploadEvent = OutgoingEvent<RoomUploadEventData>;
pub(crate) type RoomUploadEvent = OutgoingMessage<RoomUploadEventData>;

////////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -107,7 +107,7 @@ impl RequestHandler for VacuumHandler {
.map_err(|err| format!("error creating a backend request: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

requests.push(Box::new(backreq) as Box<dyn IntoPublishableDump + Send>);
requests.push(Box::new(backreq) as Box<dyn IntoPublishableMessage + Send>);
}
}

Expand Down
Loading

0 comments on commit 884d356

Please sign in to comment.