Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Switch to explicit api versions (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Dec 17, 2019
1 parent a3db3c7 commit def43dd
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 14 additions & 6 deletions src/app/endpoint/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ impl State {
let to = &inreq.payload().agent_id;
let payload = &inreq.payload().data;

let response_topic = Subscription::multicast_requests_from(to)
.subscription_topic(&self.me)
let response_topic = Subscription::multicast_requests_from(to, Some("v1"))
.subscription_topic(&self.me, "v2")
.map_err(|_| {
SvcError::builder()
.status(ResponseStatus::UNPROCESSABLE_ENTITY)
Expand All @@ -112,7 +112,7 @@ impl State {
ShortTermTimingProperties::until_now(start_timestamp),
);

OutgoingRequest::unicast(payload.to_owned(), props, to).into()
OutgoingRequest::unicast(payload.to_owned(), props, to, "v1").into()
}

pub(crate) async fn callback(
Expand All @@ -139,8 +139,13 @@ impl State {
inresp.properties().tracking().clone(),
);

let payload = inresp.payload();
let message = OutgoingResponse::unicast(payload.to_owned(), props, &reqp);
let message = OutgoingResponse::unicast(
inresp.payload().to_owned(),
props,
&reqp,
reqp.to_connection().version(),
);

Ok(vec![Box::new(message) as Box<dyn Publishable>])
}
}
Expand Down Expand Up @@ -221,7 +226,10 @@ mod test {
let message = result.remove(0);

match message.destination() {
&Destination::Unicast(ref agent_id) => assert_eq!(agent_id, receiver.agent_id()),
&Destination::Unicast(ref agent_id, ref version) => {
assert_eq!(agent_id, receiver.agent_id());
assert_eq!(version, "v1");
}
_ => panic!("Expected unicast destination"),
}

Expand Down
20 changes: 17 additions & 3 deletions src/app/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,14 @@ where
.build();

let timing = ShortTermTimingProperties::until_now(start_timestamp);
let resp = OutgoingResponse::unicast(err, props.to_response(status, timing), props);

let resp = OutgoingResponse::unicast(
err,
props.to_response(status, timing),
props,
props.to_connection().version(),
);

Ok(vec![Box::new(resp) as Box<dyn Publishable>])
}
}
Expand All @@ -145,7 +152,14 @@ pub(crate) fn handle_error(
}

let timing = ShortTermTimingProperties::until_now(start_timestamp);
let resp = OutgoingResponse::unicast(err, props.to_response(status, timing), props);

let resp = OutgoingResponse::unicast(
err,
props.to_response(status, timing),
props,
props.to_connection().version(),
);

Ok(vec![Box::new(resp) as Box<dyn Publishable>])
}

Expand All @@ -163,7 +177,7 @@ pub(crate) fn handle_unknown_method(
.build();

let timing = ShortTermTimingProperties::until_now(start_timestamp);
let resp = OutgoingResponse::unicast(err, props.to_response(status, timing), props);
let resp = OutgoingResponse::unicast(err, props.to_response(status, timing), props, "v1");
Ok(vec![Box::new(resp) as Box<dyn Publishable>])
}

Expand Down
28 changes: 17 additions & 11 deletions src/app/endpoint/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl State {
// https://github.com/vernemq/vernemq/issues/1326.
// Then we won't need the local state on the broker at all and will be able
// to send a multicast request to the broker.
OutgoingRequest::unicast(payload, props, inreq.properties()).into()
OutgoingRequest::unicast(payload, props, inreq.properties(), "v1").into()
}

pub(crate) async fn leave(
Expand Down Expand Up @@ -359,7 +359,7 @@ impl State {
// https://github.com/vernemq/vernemq/issues/1326.
// Then we won't need the local state on the broker at all and will be able
// to send a multicast request to the broker.
OutgoingRequest::unicast(payload, props, inreq.properties()).into()
OutgoingRequest::unicast(payload, props, inreq.properties(), "v1").into()
}
}

Expand All @@ -371,7 +371,7 @@ mod test {
use diesel::prelude::*;
use failure::format_err;
use serde_json::{json, Value as JsonValue};
use svc_agent::Destination;
use svc_agent::{AccountId, AgentId, Destination};
use svc_authn::Authenticable;

use crate::test_helpers::{
Expand Down Expand Up @@ -857,10 +857,13 @@ mod test {

// Assert outgoing broker request.
match message.destination() {
&Destination::Unicast(ref agent_id) => {
assert_eq!(agent_id.label(), "web");
assert_eq!(agent_id.as_account_id().label(), "user123");
assert_eq!(agent_id.as_account_id().audience(), AUDIENCE);
&Destination::Unicast(ref agent_id, ref version) => {
assert_eq!(
agent_id.to_owned(),
AgentId::new("web", AccountId::new("user123", AUDIENCE))
);

assert_eq!(version, "v1");
}
_ => panic!("Expected unicast destination"),
}
Expand Down Expand Up @@ -1038,10 +1041,13 @@ mod test {

// Assert outgoing broker request.
match message.destination() {
&Destination::Unicast(ref agent_id) => {
assert_eq!(agent_id.label(), "web");
assert_eq!(agent_id.as_account_id().label(), "user123");
assert_eq!(agent_id.as_account_id().audience(), AUDIENCE);
&Destination::Unicast(ref agent_id, ref version) => {
assert_eq!(
agent_id.to_owned(),
AgentId::new("web", AccountId::new("user123", AUDIENCE))
);

assert_eq!(version, "v1");
}
_ => panic!("Expected unicast destination"),
}
Expand Down
20 changes: 12 additions & 8 deletions src/app/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
);

props.set_tracking(tracking.to_owned());
Ok(OutgoingRequest::unicast(payload, props, to))
Ok(OutgoingRequest::unicast(payload, props, to, "v1"))
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -113,7 +113,7 @@ where
);

props.set_tracking(tracking.to_owned());
Ok(OutgoingRequest::unicast(payload, props, to))
Ok(OutgoingRequest::unicast(payload, props, to, "v1"))
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -173,7 +173,7 @@ where
Some(&rtc_handle_id.to_string()),
);

Ok(OutgoingRequest::unicast(payload, props, to))
Ok(OutgoingRequest::unicast(payload, props, to, "v1"))
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -241,7 +241,7 @@ where
Some(jsep),
);

Ok(OutgoingRequest::unicast(payload, props, to))
Ok(OutgoingRequest::unicast(payload, props, to, "v1"))
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -309,7 +309,7 @@ where
Some(jsep),
);

Ok(OutgoingRequest::unicast(payload, props, to))
Ok(OutgoingRequest::unicast(payload, props, to, "v1"))
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -374,7 +374,7 @@ pub(crate) fn upload_stream_request(
);

props.set_tracking(tracking.to_owned());
Ok(OutgoingRequest::unicast(payload, props, to))
Ok(OutgoingRequest::unicast(payload, props, to, "v1"))
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -408,7 +408,7 @@ where
let props = reqp.to_request("janus_trickle.create", IGNORE, IGNORE, short_term_timing);
let transaction = Transaction::Trickle(TrickleTransaction::new(reqp));
let payload = TrickleRequest::new(&to_base64(&transaction)?, session_id, handle_id, jsep);
Ok(OutgoingRequest::unicast(payload, props, to))
Ok(OutgoingRequest::unicast(payload, props, to, "v1"))
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -470,7 +470,7 @@ where
None,
);

Ok(OutgoingRequest::unicast(payload, props, to))
Ok(OutgoingRequest::unicast(payload, props, to, "v1"))
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -541,6 +541,7 @@ pub(crate) async fn handle_response(
ShortTermTimingProperties::until_now(start_timestamp),
),
&reqp,
"v1",
);

Ok(vec![Box::new(resp) as Box<dyn Publishable>])
Expand Down Expand Up @@ -568,6 +569,7 @@ pub(crate) async fn handle_response(
ShortTermTimingProperties::until_now(start_timestamp),
),
tn.reqp.as_agent_id(),
"v1",
);

Ok(vec![Box::new(resp) as Box<dyn Publishable>])
Expand Down Expand Up @@ -627,6 +629,7 @@ pub(crate) async fn handle_response(
endpoint::rtc_signal::CreateResponseData::new(Some(jsep.clone())),
tn.reqp.to_response(ResponseStatus::OK, ShortTermTimingProperties::until_now(start_timestamp)),
tn.reqp.as_agent_id(),
"v1",
);

Ok(vec![Box::new(resp) as Box<dyn Publishable>])
Expand Down Expand Up @@ -688,6 +691,7 @@ pub(crate) async fn handle_response(
endpoint::rtc_signal::CreateResponseData::new(Some(jsep.clone())),
tn.reqp.to_response(ResponseStatus::OK, ShortTermTimingProperties::until_now(start_timestamp)),
tn.reqp.as_agent_id(),
"v1",
);

Ok(vec![Box::new(resp) as Box<dyn Publishable>])
Expand Down
11 changes: 6 additions & 5 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,26 +101,27 @@ pub(crate) async fn run(db: &ConnectionPool) -> Result<(), Error> {
// Create Subscriptions
let route = Arc::new(Route {
janus_status_subscription_topic: {
let subscription = Subscription::broadcast_events(&config.backend_id, "status");
let subscription = Subscription::broadcast_events(&config.backend_id, "v1", "status");
tx.subscribe(&subscription, QoS::AtLeastOnce, Some(&group))
.expect("Error subscribing to backend events topic");

subscription
.subscription_topic(&agent_id)
.subscription_topic(&agent_id, "v1")
.expect("Error building janus events subscription topic")
},
janus_responses_subscription_topic: {
let subscription = Subscription::broadcast_events(&config.backend_id, "responses");
let subscription =
Subscription::broadcast_events(&config.backend_id, "v1", "responses");
tx.subscribe(&subscription, QoS::AtLeastOnce, Some(&group))
.expect("Error subscribing to backend responses topic");

subscription
.subscription_topic(&agent_id)
.subscription_topic(&agent_id, "v1")
.expect("Error building janus responses subscription topic")
},
});
tx.subscribe(
&Subscription::multicast_requests(),
&Subscription::multicast_requests(Some("v1")),
QoS::AtMostOnce,
Some(&group),
)
Expand Down

0 comments on commit def43dd

Please sign in to comment.