Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add timing
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Oct 29, 2019
1 parent 3d36d10 commit ccf7766
Show file tree
Hide file tree
Showing 15 changed files with 606 additions and 199 deletions.
16 changes: 6 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ chrono = { version = "0.4", features = ["serde"] }
openssl = "*"
diesel = { version = "1.4", features = ["postgres", "uuid", "chrono", "serde_json", "r2d2"] }
diesel-derive-enum = { version = "0.4", features = ["postgres"] }
svc-agent = { version = "0.8", features = ["diesel"] }
svc-agent = { version = "0.9", features = ["diesel"] }
svc-authz = "0.5"
svc-authn = { version = "0.5", features = ["jose", "diesel"] }
svc-error = { version = "0.1", features = ["diesel", "r2d2", "svc-agent", "svc-authn", "svc-authz", "sentry-extension"] }
Expand All @@ -38,6 +38,5 @@ version = "=0.3.0-alpha.18"
package = "futures-channel-preview"

[patch.crates-io]
rumqtt = { git = "https://github.com/AtherEnergy/rumqtt.git", ref = "980e3d7d72344fc1b547a6b6834a7cba4112f93a" }
# Use a branch with synchronous authz until 1.39 becomes stable (07.11.2019)
svc-authz = { git = "https://github.com/netology-group/svc-authz-rs", branch = "sync" }
26 changes: 18 additions & 8 deletions src/app/endpoint/agent.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use chrono::{DateTime, Utc};
use serde_derive::Deserialize;
use svc_agent::mqtt::{IncomingRequest, ResponseStatus};
use svc_error::Error as SvcError;
use uuid::Uuid;

use crate::app::endpoint;
use crate::app::endpoint::shared;
use crate::db::{agent, room, ConnectionPool};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -33,11 +35,15 @@ impl State {
Self { authz, db }
}

pub(crate) async fn list(&self, inreq: ListRequest) -> endpoint::Result {
pub(crate) async fn list(
&self,
inreq: ListRequest,
start_timestamp: DateTime<Utc>,
) -> endpoint::Result {
let room_id = inreq.payload().room_id;

// Authorization: room's owner has to allow the action
{
let authz_time = {
let conn = self.db.get()?;
let room = room::FindQuery::new()
.time(room::now())
Expand All @@ -59,8 +65,8 @@ impl State {
vec!["rooms", &room_id, "agents"],
"list",
)
.map_err(|err| SvcError::from(err))?;
}
.map_err(|err| SvcError::from(err))?
};

let objects = {
let conn = self.db.get()?;
Expand All @@ -77,7 +83,11 @@ impl State {
.execute(&conn)?
};

inreq.to_response(objects, ResponseStatus::OK).into()
let timing = shared::build_short_term_timing(start_timestamp, Some(authz_time));

inreq
.to_response(objects, ResponseStatus::OK, timing)
.into()
}
}

Expand Down Expand Up @@ -145,7 +155,7 @@ mod test {
let state = State::new(authz.into(), db.connection_pool().clone());
let payload = json!({"room_id": online_agent.room_id()});
let request: ListRequest = agent.build_request("agent.list", &payload).unwrap();
let mut result = state.list(request).await.into_result().unwrap();
let mut result = state.list(request, Utc::now()).await.into_result().unwrap();
let message = result.remove(0);

// Assert response.
Expand Down Expand Up @@ -174,7 +184,7 @@ mod test {
let agent = TestAgent::new("web", "user123", AUDIENCE);
let payload = json!({ "room_id": Uuid::new_v4() });
let request: ListRequest = agent.build_request("agent.list", &payload).unwrap();
let result = state.list(request).await.into_result();
let result = state.list(request, Utc::now()).await.into_result();

// Assert 404 error response.
match result {
Expand All @@ -201,7 +211,7 @@ mod test {
let agent = TestAgent::new("web", "user123", AUDIENCE);
let payload = json!({"room_id": room.id()});
let request: ListRequest = agent.build_request("agent.list", &payload).unwrap();
let result = state.list(request).await.into_result();
let result = state.list(request, Utc::now()).await.into_result();

// Assert 403 error response.
match result {
Expand Down
74 changes: 56 additions & 18 deletions src/app/endpoint/message.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use chrono::{DateTime, Utc};
use diesel::pg::PgConnection;
use failure::Error;
use serde_derive::Deserialize;
Expand All @@ -12,7 +13,7 @@ use svc_error::Error as SvcError;
use uuid::Uuid;

use crate::app::endpoint;
use crate::app::endpoint::shared::check_room_presence;
use crate::app::endpoint::shared;
use crate::db::{room, ConnectionPool};
use crate::util::{from_base64, to_base64};

Expand Down Expand Up @@ -51,28 +52,43 @@ impl State {
}

impl State {
pub(crate) async fn broadcast(&self, inreq: BroadcastRequest) -> endpoint::Result {
pub(crate) async fn broadcast(
&self,
inreq: BroadcastRequest,
start_timestamp: DateTime<Utc>,
) -> endpoint::Result {
let conn = self.db.get()?;
let room = find_room(inreq.payload().room_id, &conn)?;
check_room_presence(&room, &inreq.properties().as_agent_id(), &conn)?;
shared::check_room_presence(&room, &inreq.properties().as_agent_id(), &conn)?;

let (long_term_timing, short_term_timing) =
shared::build_timings(inreq.properties(), start_timestamp, None);

let resp = inreq.to_response(json!({}), ResponseStatus::OK);
let resp = inreq.to_response(json!({}), ResponseStatus::OK, short_term_timing.clone());
let resp_box = Box::new(resp) as Box<dyn Publishable>;

let payload = inreq.payload().data.to_owned();
let props = OutgoingEventProperties::new("message.broadcast");

let mut props = OutgoingEventProperties::new("message.broadcast", short_term_timing);

props.set_long_term_timing(long_term_timing);

let to_uri = format!("rooms/{}/events", inreq.payload().room_id);
let event = OutgoingEvent::broadcast(payload, props, &to_uri);
let event_box = Box::new(event) as Box<dyn Publishable>;

vec![resp_box, event_box].into()
}

pub(crate) async fn unicast(&self, inreq: UnicastRequest) -> endpoint::Result {
pub(crate) async fn unicast(
&self,
inreq: UnicastRequest,
start_timestamp: DateTime<Utc>,
) -> endpoint::Result {
let conn = self.db.get()?;
let room = find_room(inreq.payload().room_id, &conn)?;
check_room_presence(&room, &inreq.properties().as_agent_id(), &conn)?;
check_room_presence(&room, &inreq.payload().agent_id, &conn)?;
shared::check_room_presence(&room, &inreq.properties().as_agent_id(), &conn)?;
shared::check_room_presence(&room, &inreq.payload().agent_id, &conn)?;

let to = &inreq.payload().agent_id;
let payload = &inreq.payload().data;
Expand All @@ -93,25 +109,39 @@ impl State {
.build()
})?;

let props = OutgoingRequestProperties::new(
let (long_term_timing, short_term_timing) =
shared::build_timings(inreq.properties(), start_timestamp, None);

let mut props = OutgoingRequestProperties::new(
inreq.properties().method(),
&response_topic,
&correlation_data,
short_term_timing,
);

props.set_long_term_timing(long_term_timing);
OutgoingRequest::unicast(payload.to_owned(), props, to).into()
}

pub(crate) async fn callback(
&self,
inresp: UnicastIncomingResponse,
start_timestamp: DateTime<Utc>,
) -> Result<Vec<Box<dyn Publishable>>, Error> {
let reqp =
from_base64::<IncomingRequestProperties>(inresp.properties().correlation_data())?;

let payload = inresp.payload();

let props =
OutgoingResponseProperties::new(inresp.properties().status(), reqp.correlation_data());
let (long_term_timing, short_term_timing) =
shared::build_timings(inresp.properties(), start_timestamp, None);

let props = OutgoingResponseProperties::new(
inresp.properties().status(),
reqp.correlation_data(),
long_term_timing,
short_term_timing,
);

let message = OutgoingResponse::unicast(payload.to_owned(), props, &reqp);
Ok(vec![Box::new(message) as Box<dyn Publishable>])
Expand Down Expand Up @@ -186,7 +216,11 @@ mod test {
sender.build_request("message.unicast", &payload).unwrap();

let state = State::new(sender.agent_id().clone(), db.connection_pool().clone());
let mut result = state.unicast(request).await.into_result().unwrap();
let mut result = state
.unicast(request, Utc::now())
.await
.into_result()
.unwrap();
let message = result.remove(0);

match message.destination() {
Expand Down Expand Up @@ -219,7 +253,7 @@ mod test {
let state = State::new(sender.agent_id().clone(), db.connection_pool().clone());

// Assert 404 response.
match state.unicast(request).await.into_result() {
match state.unicast(request, Utc::now()).await.into_result() {
Ok(_) => panic!("Expected message.unicast to fail"),
Err(err) => assert_eq!(err.status_code(), ResponseStatus::NOT_FOUND),
}
Expand Down Expand Up @@ -263,7 +297,7 @@ mod test {
let state = State::new(sender.agent_id().clone(), db.connection_pool().clone());

// Assert 404 response.
match state.unicast(request).await.into_result() {
match state.unicast(request, Utc::now()).await.into_result() {
Ok(_) => panic!("Expected message.unicast to fail"),
Err(err) => assert_eq!(err.status_code(), ResponseStatus::NOT_FOUND),
}
Expand Down Expand Up @@ -307,7 +341,7 @@ mod test {
let state = State::new(sender.agent_id().clone(), db.connection_pool().clone());

// Assert 404 response.
match state.unicast(request).await.into_result() {
match state.unicast(request, Utc::now()).await.into_result() {
Ok(_) => panic!("Expected message.unicast to fail"),
Err(err) => assert_eq!(err.status_code(), ResponseStatus::NOT_FOUND),
}
Expand Down Expand Up @@ -345,7 +379,11 @@ mod test {
sender.build_request("message.broadcast", &payload).unwrap();

let state = State::new(sender.agent_id().clone(), db.connection_pool().clone());
let mut result = state.broadcast(request).await.into_result().unwrap();
let mut result = state
.broadcast(request, Utc::now())
.await
.into_result()
.unwrap();

// Assert response.
let message = result.remove(0);
Expand Down Expand Up @@ -385,7 +423,7 @@ mod test {
let state = State::new(sender.agent_id().clone(), db.connection_pool().clone());

// Assert 404 response.
match state.broadcast(request).await.into_result() {
match state.broadcast(request, Utc::now()).await.into_result() {
Ok(_) => panic!("Expected message.broadcast to fail"),
Err(err) => assert_eq!(err.status_code(), ResponseStatus::NOT_FOUND),
}
Expand Down Expand Up @@ -418,7 +456,7 @@ mod test {
let state = State::new(sender.agent_id().clone(), db.connection_pool().clone());

// Assert 404 response.
match state.broadcast(request).await.into_result() {
match state.broadcast(request, Utc::now()).await.into_result() {
Ok(_) => panic!("Expected message.broadcast to fail"),
Err(err) => assert_eq!(err.status_code(), ResponseStatus::NOT_FOUND),
}
Expand Down
Loading

0 comments on commit ccf7766

Please sign in to comment.