Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add tracking (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Nov 20, 2019
1 parent 5bd5baa commit b66ebdf
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 24 deletions.
19 changes: 15 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ svc-error = { version = "0.1", features = ["diesel", "r2d2", "svc-agent", "svc-a

[dev-dependencies]
rand = "0.7"

1 change: 1 addition & 0 deletions src/app/endpoint/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ impl State {
reqp.correlation_data(),
long_term_timing,
short_term_timing,
inresp.properties().tracking().clone(),
);

let payload = inresp.payload();
Expand Down
6 changes: 4 additions & 2 deletions src/app/endpoint/rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use chrono::{DateTime, Utc};
use serde_derive::Deserialize;
use svc_agent::mqtt::{
IncomingRequest, OutgoingEvent, OutgoingEventProperties, ResponseStatus,
ShortTermTimingProperties,
ShortTermTimingProperties, TrackingProperties,
};
use svc_error::Error as SvcError;
use uuid::Uuid;
Expand Down Expand Up @@ -117,11 +117,13 @@ pub(crate) fn update_event(
room_id: Uuid,
object: janus_rtc_stream::Object,
start_timestamp: DateTime<Utc>,
tracking: &TrackingProperties,
) -> Result<ObjectUpdateEvent, SvcError> {
let uri = format!("rooms/{}/events", room_id);

let timing = ShortTermTimingProperties::until_now(start_timestamp);
let props = OutgoingEventProperties::new("rtc_stream.update", timing);
let mut props = OutgoingEventProperties::new("rtc_stream.update", timing);
props.set_tracking(tracking.to_owned());
Ok(OutgoingEvent::broadcast(object, props, &uri))
}

Expand Down
3 changes: 2 additions & 1 deletion src/app/endpoint/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ pub(crate) fn respond<R, O: 'static + Clone + Serialize>(
let mut messages: Vec<Box<dyn Publishable>> = vec![Box::new(resp)];

if let Some((label, topic)) = notification {
let props = OutgoingEventProperties::new(label, short_term_timing);
let mut props = OutgoingEventProperties::new(label, short_term_timing);
props.set_tracking(inreq.properties().tracking().to_owned());
messages.push(Box::new(OutgoingEvent::broadcast(object, props, topic)));
}

Expand Down
7 changes: 5 additions & 2 deletions src/app/endpoint/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use failure::Error;
use serde_derive::{Deserialize, Serialize};
use svc_agent::mqtt::{
IncomingRequest, OutgoingEvent, OutgoingEventProperties, Publishable, ResponseStatus,
ShortTermTimingProperties,
ShortTermTimingProperties, TrackingProperties,
};
use svc_authn::AccountId;
use svc_error::Error as SvcError;
Expand Down Expand Up @@ -103,6 +103,7 @@ impl State {
),
backend.id(),
start_timestamp,
inreq.properties().tracking(),
)
.map_err(|_| {
// TODO: Send the error as an event to "app/${APP}/audiences/${AUD}" topic
Expand All @@ -126,6 +127,7 @@ pub(crate) fn upload_event<I>(
room: &room::Object,
rtcs_and_recordings: I,
start_timestamp: DateTime<Utc>,
tracking: &TrackingProperties,
) -> Result<RoomUploadEvent, Error>
where
I: Iterator<Item = (rtc::Object, recording::Object)>,
Expand All @@ -152,7 +154,8 @@ where

let uri = format!("audiences/{}/events", room.audience());
let timing = ShortTermTimingProperties::until_now(start_timestamp);
let props = OutgoingEventProperties::new("room.upload", timing);
let mut props = OutgoingEventProperties::new("room.upload", timing);
props.set_tracking(tracking.to_owned());

let event = RoomUploadEventData {
id: room.id(),
Expand Down
55 changes: 40 additions & 15 deletions src/app/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;
use svc_agent::mqtt::{
compat::{into_event, IncomingEnvelope},
IncomingRequestProperties, OutgoingRequest, OutgoingRequestProperties, Publishable,
ResponseStatus, ShortTermTimingProperties,
ResponseStatus, ShortTermTimingProperties, TrackingProperties,
};
use svc_agent::{Addressable, AgentId};
use svc_error::Error as SvcError;
Expand Down Expand Up @@ -54,20 +54,22 @@ impl CreateSessionTransaction {
pub(crate) fn create_session_request<A>(
to: &A,
start_timestamp: DateTime<Utc>,
tracking: &TrackingProperties,
) -> Result<OutgoingRequest<CreateSessionRequest>, Error>
where
A: Addressable,
{
let transaction = Transaction::CreateSession(CreateSessionTransaction::new());
let payload = CreateSessionRequest::new(&to_base64(&transaction)?);

let props = OutgoingRequestProperties::new(
let mut props = OutgoingRequestProperties::new(
"janus_session.create",
IGNORE,
IGNORE,
ShortTermTimingProperties::until_now(start_timestamp),
);

props.set_tracking(tracking.to_owned());
Ok(OutgoingRequest::unicast(payload, props, to))
}

Expand All @@ -88,6 +90,7 @@ pub(crate) fn create_handle_request<A>(
session_id: i64,
to: &A,
start_timestamp: DateTime<Utc>,
tracking: &TrackingProperties,
) -> Result<OutgoingRequest<CreateHandleRequest>, Error>
where
A: Addressable,
Expand All @@ -101,12 +104,14 @@ where
None,
);

let props = OutgoingRequestProperties::new(
let mut props = OutgoingRequestProperties::new(
"janus_handle.create",
IGNORE,
IGNORE,
ShortTermTimingProperties::until_now(start_timestamp),
);

props.set_tracking(tracking.to_owned());
Ok(OutgoingRequest::unicast(payload, props, to))
}

Expand Down Expand Up @@ -342,6 +347,7 @@ pub(crate) fn upload_stream_request(
body: UploadStreamRequestBody,
to: &AgentId,
start_timestamp: DateTime<Utc>,
tracking: &TrackingProperties,
) -> Result<OutgoingRequest<MessageRequest>, Error> {
let transaction = Transaction::UploadStream(UploadStreamTransaction::new(body.id));

Expand All @@ -353,13 +359,14 @@ pub(crate) fn upload_stream_request(
None,
);

let props = OutgoingRequestProperties::new(
let mut props = OutgoingRequestProperties::new(
"janus_conference_stream.upload",
IGNORE,
IGNORE,
ShortTermTimingProperties::until_now(start_timestamp),
);

props.set_tracking(tracking.to_owned());
Ok(OutgoingRequest::unicast(payload, props, to))
}

Expand Down Expand Up @@ -423,11 +430,14 @@ pub(crate) async fn handle_response(
match from_base64::<Transaction>(&inresp.transaction())? {
// Session has been created
Transaction::CreateSession(_tn) => {
let session_id = inresp.data().id();

// Creating Handle
let backreq =
create_handle_request(session_id, message.properties(), start_timestamp)?;
let backreq = create_handle_request(
inresp.data().id(),
message.properties(),
start_timestamp,
message.properties().tracking(),
)?;

Ok(vec![Box::new(backreq) as Box<dyn Publishable>])
}
// Handle has been created
Expand Down Expand Up @@ -776,8 +786,14 @@ pub(crate) async fn handle_response(

match maybe_rtcs_and_recordings {
Some(rtcs_and_recordings) => {
let event = endpoint::system::upload_event(&room, rtcs_and_recordings.into_iter(), start_timestamp)
.map_err(|e| format_err!("error creating a system event, {}", e))?;
let event = endpoint::system::upload_event(
&room,
rtcs_and_recordings.into_iter(),
start_timestamp,
message.properties().tracking(),
).map_err(|e| {
format_err!("error creating a system event, {}", e)
})?;

Ok(vec![Box::new(event) as Box<dyn Publishable>])
}
Expand Down Expand Up @@ -824,8 +840,12 @@ pub(crate) async fn handle_response(
.execute(&conn)?
.ok_or_else(|| format_err!("a room for rtc = '{}' is not found", &rtc_id))?;

let event =
endpoint::rtc_stream::update_event(room.id(), rtc_stream, start_timestamp)?;
let event = endpoint::rtc_stream::update_event(
room.id(),
rtc_stream,
start_timestamp,
message.properties().tracking(),
)?;

Ok(vec![Box::new(event) as Box<dyn Publishable>])
} else {
Expand All @@ -851,8 +871,12 @@ pub(crate) async fn handle_response(
// Publish the update event if only stream object has been changed
// (if there was't any actual media stream, the object won't contain its start time)
if let Some(_) = rtc_stream.time() {
let event =
endpoint::rtc_stream::update_event(room.id(), rtc_stream, start_timestamp)?;
let event = endpoint::rtc_stream::update_event(
room.id(),
rtc_stream,
start_timestamp,
message.properties().tracking(),
)?;

return Ok(vec![Box::new(event) as Box<dyn Publishable>]);
}
Expand Down Expand Up @@ -887,7 +911,8 @@ pub(crate) async fn handle_status(
let agent_id = inev.properties().as_agent_id();

if let true = inev.payload().online() {
let event = create_session_request(agent_id, start_timestamp)?;
let tracking = inev.properties().tracking();
let event = create_session_request(agent_id, start_timestamp, tracking)?;
Ok(vec![Box::new(event) as Box<dyn Publishable>])
} else {
let conn = janus.db.get()?;
Expand Down
4 changes: 4 additions & 0 deletions src/test_helpers/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ impl TestAgent {
"broker_timestamp": now,
"broker_processing_timestamp": now,
"broker_initial_processing_timestamp": now,
"tracking_id": "16911d40-0b13-11ea-8171-60f81db6d53e",
"session_tracking_label": "16cc4294-0b13-11ea-91ae-60f81db6d53e.16ee876e-0b13-11ea-8c32-60f81db6d53e 2565f962-0b13-11ea-9359-60f81db6d53e.25c2b97c-0b13-11ea-9f20-60f81db6d53e",
}
});

Expand Down Expand Up @@ -100,6 +102,8 @@ impl TestAgent {
"broker_timestamp": now,
"broker_processing_timestamp": now,
"broker_initial_processing_timestamp": now,
"tracking_id": "16911d40-0b13-11ea-8171-60f81db6d53e",
"session_tracking_label": "16cc4294-0b13-11ea-91ae-60f81db6d53e.16ee876e-0b13-11ea-8c32-60f81db6d53e 2565f962-0b13-11ea-9359-60f81db6d53e.25c2b97c-0b13-11ea-9f20-60f81db6d53e",
}
});

Expand Down

0 comments on commit b66ebdf

Please sign in to comment.