Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Get rid of dynamic subscription workaround
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Mar 5, 2021
1 parent ec9d72d commit b7ceab9
Show file tree
Hide file tree
Showing 14 changed files with 910 additions and 607 deletions.
2 changes: 1 addition & 1 deletion src/app/endpoint/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {
.expect("Agents listing failed");

// Assert response.
let (agents, respp) = find_response::<Vec<Agent>>(messages.as_slice());
let (agents, respp, _) = find_response::<Vec<Agent>>(messages.as_slice());
assert_eq!(respp.status(), ResponseStatus::OK);
assert_eq!(agents.len(), 1);
assert_eq!(&agents[0].agent_id, agent.agent_id());
Expand Down
37 changes: 24 additions & 13 deletions src/app/endpoint/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::result::Result as StdResult;
use async_std::stream;
use async_trait::async_trait;
use diesel::pg::PgConnection;
use serde_derive::Deserialize;
use serde_derive::{Deserialize, Serialize};
use serde_json::{json, Value as JsonValue};
use svc_agent::mqtt::{
IncomingRequestProperties, IncomingResponseProperties, IntoPublishableMessage, OutgoingRequest,
Expand All @@ -17,10 +17,20 @@ use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::app::API_VERSION;
use crate::db::{self, room::Object as Room};
use crate::util::{from_base64, to_base64};

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct CorrelationDataPayload {
reqp: IncomingRequestProperties,
}

impl CorrelationDataPayload {
pub(crate) fn new(reqp: IncomingRequestProperties) -> Self {
Self { reqp }
}
}

#[derive(Debug, Deserialize)]
pub(crate) struct UnicastRequest {
agent_id: AgentId,
Expand Down Expand Up @@ -58,14 +68,16 @@ impl RequestHandler for UnicastHandler {
.map_err(|err| anyhow!("Error building responses subscription topic: {}", err))
.error(AppErrorKind::MessageBuildingFailed)?;

let correlation_data = to_base64(reqp)
.map_err(|err| err.context("Error encoding incoming request properties"))
let corr_data_payload = CorrelationDataPayload::new(reqp.to_owned());

let corr_data = CorrelationData::MessageUnicast(corr_data_payload)
.dump()
.error(AppErrorKind::MessageBuildingFailed)?;

let props = reqp.to_request(
reqp.method(),
&response_topic,
&correlation_data,
&corr_data,
ShortTermTimingProperties::until_now(context.start_timestamp()),
);

Expand Down Expand Up @@ -143,20 +155,19 @@ impl RequestHandler for BroadcastHandler {

////////////////////////////////////////////////////////////////////////////////

pub(crate) struct CallbackHandler;
pub(crate) struct UnicastResponseHandler;

#[async_trait]
impl ResponseHandler for CallbackHandler {
impl ResponseHandler for UnicastResponseHandler {
type Payload = JsonValue;
type CorrelationData = CorrelationDataPayload;

async fn handle<C: Context>(
context: &mut C,
payload: Self::Payload,
respp: &IncomingResponseProperties,
corr_data: &Self::CorrelationData,
) -> Result {
let reqp = from_base64::<IncomingRequestProperties>(respp.correlation_data())
.error(AppErrorKind::MessageParsingFailed)?;

let short_term_timing = ShortTermTimingProperties::until_now(context.start_timestamp());

let long_term_timing = respp
Expand All @@ -166,14 +177,14 @@ impl ResponseHandler for CallbackHandler {

let props = OutgoingResponseProperties::new(
respp.status(),
reqp.correlation_data(),
corr_data.reqp.correlation_data(),
long_term_timing,
short_term_timing,
respp.tracking().clone(),
respp.local_tracking_label().clone(),
);

let resp = OutgoingResponse::unicast(payload, props, &reqp, API_VERSION);
let resp = OutgoingResponse::unicast(payload, props, &corr_data.reqp, API_VERSION);
let boxed_resp = Box::new(resp) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_resp)))
}
Expand Down Expand Up @@ -393,7 +404,7 @@ mod test {
.expect("Broadcast message sending failed");

// Assert response.
let (_, respp) = find_response::<JsonValue>(messages.as_slice());
let (_, respp, _) = find_response::<JsonValue>(messages.as_slice());
assert_eq!(respp.status(), ResponseStatus::OK);

// Assert broadcast event.
Expand Down
60 changes: 49 additions & 11 deletions src/app/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,29 +70,67 @@ request_routes!(

///////////////////////////////////////////////////////////////////////////////

use serde_derive::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
pub(crate) enum CorrelationData {
SubscriptionCreate(subscription::CorrelationDataPayload),
SubscriptionDelete(subscription::CorrelationDataPayload),
MessageUnicast(message::CorrelationDataPayload),
}

#[async_trait]
pub(crate) trait ResponseHandler {
type Payload: Send + DeserializeOwned;
type CorrelationData: Sync;

async fn handle<C: Context>(
context: &mut C,
payload: Self::Payload,
respp: &IncomingResponseProperties,
corr_data: &Self::CorrelationData,
) -> Result;
}

pub(crate) async fn route_response<C: Context>(
context: &mut C,
resp: &IncomingResponse<String>,
topic: &str,
) -> Option<MessageStream> {
if topic == context.janus_topics().responses_topic() {
Some(janus::handle_response::<C>(context, resp).await)
} else {
Some(message::CallbackHandler::handle_envelope::<C>(context, resp).await)
macro_rules! response_routes {
($($c: tt => $h: ty),*) => {
#[allow(unused_variables)]
pub(crate) async fn route_response<C: Context>(
context: &mut C,
response: &IncomingResponse<String>,
corr_data: &str,
topic: &str,
) -> MessageStream {
// TODO: Refactor janus response handler to use common pattern.
if topic == context.janus_topics().responses_topic() {
janus::handle_response::<C>(context, response).await
} else {
let corr_data = match CorrelationData::parse(corr_data) {
Ok(corr_data) => corr_data,
Err(err) => {
warn!(
context.logger(),
"Failed to parse response correlation data '{}': {}", corr_data, err
);
return Box::new(async_std::stream::empty()) as MessageStream;
}
};
match corr_data {
$(
CorrelationData::$c(cd) => <$h>::handle_envelope::<C>(context, response, &cd).await,
)*
}
}
}
}
}

response_routes!(
SubscriptionCreate => subscription::CreateResponseHandler,
SubscriptionDelete => subscription::DeleteResponseHandler,
MessageUnicast => message::UnicastResponseHandler
);

///////////////////////////////////////////////////////////////////////////////

#[async_trait]
Expand Down Expand Up @@ -133,8 +171,7 @@ macro_rules! event_routes {
// Event routes configuration: label => EventHandler
event_routes!(
"metric.pull" => metric::PullHandler,
"subscription.delete" => subscription::DeleteHandler,
"subscription.create" => subscription::CreateHandler
"subscription.delete" => subscription::DeleteEventHandler
);

///////////////////////////////////////////////////////////////////////////////
Expand All @@ -152,5 +189,6 @@ pub(crate) mod system;

pub(self) mod prelude {
pub(super) use super::{helpers, EventHandler, RequestHandler, ResponseHandler, Result};
pub(super) use crate::app::endpoint::CorrelationData;
pub(super) use crate::app::error::{Error as AppError, ErrorExt, ErrorKind as AppErrorKind};
}
Loading

0 comments on commit b7ceab9

Please sign in to comment.