Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add logger tags (#162)
Browse files Browse the repository at this point in the history
* Add logger tags

* Add room scope tag

* Split agent_id tag into labels and audience
  • Loading branch information
feymartynov committed Oct 8, 2020
1 parent f2b841a commit e052bd1
Show file tree
Hide file tree
Showing 18 changed files with 439 additions and 344 deletions.
4 changes: 3 additions & 1 deletion src/app/endpoint/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ impl RequestHandler for ListHandler {
.id(payload.room_id)
.time(db::room::now())
.execute(&conn)?
.ok_or_else(|| format!("the room = '{}' is not found or closed", payload.room_id))
.ok_or_else(|| anyhow!("Room not found or closed"))
.status(ResponseStatus::NOT_FOUND)?
};

shared::add_room_logger_tags(context, &room);

// Authorize agents listing in the room.
let room_id = room.id().to_string();
let object = vec!["rooms", &room_id, "agents"];
Expand Down
21 changes: 10 additions & 11 deletions src/app/endpoint/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,24 @@ impl RequestHandler for UnicastHandler {
payload: Self::Payload,
reqp: &IncomingRequestProperties,
) -> Result {
context.add_logger_tags(o!("room_id" => payload.room_id.to_string()));

{
let conn = context.db().get()?;
let room = find_room(payload.room_id, &conn)?;
shared::add_room_logger_tags(context, &room);
check_room_presence(&room, reqp.as_agent_id(), &conn)?;
check_room_presence(&room, &payload.agent_id, &conn)?;
}

let response_topic =
Subscription::multicast_requests_from(&payload.agent_id, Some(API_VERSION))
.subscription_topic(context.agent_id(), API_VERSION)
.map_err(|err| format!("error building responses subscription topic: {}", err))
.map_err(|err| anyhow!("Error building responses subscription topic: {}", err))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

let correlation_data = to_base64(reqp)
.map_err(|err| format!("error encoding incoming request properties: {}", err))
.map_err(|err| err.context("Error encoding incoming request properties"))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

let props = reqp.to_request(
Expand Down Expand Up @@ -98,9 +101,12 @@ impl RequestHandler for BroadcastHandler {
payload: Self::Payload,
reqp: &IncomingRequestProperties,
) -> Result {
context.add_logger_tags(o!("room_id" => payload.room_id.to_string()));

let room = {
let conn = context.db().get()?;
let room = find_room(payload.room_id, &conn)?;
shared::add_room_logger_tags(context, &room);
check_room_presence(&room, &reqp.as_agent_id(), &conn)?;
room
};
Expand Down Expand Up @@ -146,7 +152,6 @@ impl ResponseHandler for CallbackHandler {
respp: &IncomingResponseProperties,
) -> Result {
let reqp = from_base64::<IncomingRequestProperties>(respp.correlation_data())
.map_err(|err| err.to_string())
.status(ResponseStatus::BAD_REQUEST)?;

let short_term_timing = ShortTermTimingProperties::until_now(context.start_timestamp());
Expand Down Expand Up @@ -178,7 +183,7 @@ fn find_room(id: Uuid, conn: &PgConnection) -> StdResult<Room, SvcError> {
.time(db::room::now())
.id(id)
.execute(&conn)?
.ok_or_else(|| format!("the room = '{}' is not found", id))
.ok_or_else(|| anyhow!("Room not found or closed"))
.status(ResponseStatus::NOT_FOUND)
}

Expand All @@ -193,13 +198,7 @@ fn check_room_presence(
.execute(conn)?;

if results.is_empty() {
let err = format!(
"agent = '{}' is not online in the room = '{}'",
agent_id,
room.id()
);

Err(err).status(ResponseStatus::NOT_FOUND)
Err(anyhow!("Agent is not online in the room")).status(ResponseStatus::NOT_FOUND)
} else {
Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions src/app/endpoint/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl EventHandler for PullHandler {
let mut metrics = if let Some(qc) = context.queue_counter() {
let stats = qc
.get_stats(payload.duration)
.map_err(|err| anyhow!("Failed to get stats: {}", err))
.status(ResponseStatus::BAD_REQUEST)?;

vec![
Expand Down Expand Up @@ -96,11 +97,9 @@ impl EventHandler for PullHandler {
append_db_pool_stats(&mut metrics, context, now);

append_dynamic_stats(&mut metrics, context, now)
.map_err(|err| err.to_string())
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

append_janus_stats(&mut metrics, context, now)
.map_err(|err| err.to_string())
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?;

let metrics2 = metrics
Expand Down
2 changes: 1 addition & 1 deletion src/app/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ mod room;
pub(crate) mod rtc;
pub(crate) mod rtc_signal;
pub(crate) mod rtc_stream;
pub(self) mod shared;
pub(crate) mod shared;
mod subscription;
pub(crate) mod system;

Expand Down
30 changes: 18 additions & 12 deletions src/app/endpoint/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl RequestHandler for CreateHandler {
q.execute(&conn)?
};

context.add_logger_tags(o!("room_id" => room.id().to_string()));
shared::add_room_logger_tags(context, &room);

// Respond and broadcast to the audience topic.
let response = shared::build_response(
Expand Down Expand Up @@ -140,10 +140,12 @@ impl RequestHandler for ReadHandler {
db::room::FindQuery::new()
.id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}'", payload.id))
.ok_or_else(|| anyhow!("Room not found"))
.status(ResponseStatus::NOT_FOUND)?
};

shared::add_room_logger_tags(context, &room);

// Authorize room reading on the tenant.
let room_id = room.id().to_string();
let object = vec!["rooms", &room_id];
Expand Down Expand Up @@ -197,10 +199,12 @@ impl RequestHandler for UpdateHandler {
.time(db::room::since_now())
.id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}' or closed", payload.id))
.ok_or_else(|| anyhow!("Room not found or closed"))
.status(ResponseStatus::NOT_FOUND)?
};

shared::add_room_logger_tags(context, &room);

// Authorize room updating on the tenant.
let room_id = room.id().to_string();
let object = vec!["rooms", &room_id];
Expand Down Expand Up @@ -298,10 +302,12 @@ impl RequestHandler for DeleteHandler {
.time(db::room::since_now())
.id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}' or closed", payload.id))
.ok_or_else(|| anyhow!("Room not found or closed"))
.status(ResponseStatus::NOT_FOUND)?
};

shared::add_room_logger_tags(context, &room);

// Authorize room deletion on the tenant.
let room_id = room.id().to_string();
let object = vec!["rooms", &room_id];
Expand Down Expand Up @@ -363,10 +369,12 @@ impl RequestHandler for EnterHandler {
.id(payload.id)
.time(db::room::now())
.execute(&conn)?
.ok_or_else(|| format!("Room not found or closed, id = '{}'", payload.id))
.ok_or_else(|| anyhow!("Room not found or closed"))
.status(ResponseStatus::NOT_FOUND)?
};

shared::add_room_logger_tags(context, &room);

// Authorize subscribing to the room's events.
let room_id = room.id().to_string();
let object = vec!["rooms", &room_id, "events"];
Expand Down Expand Up @@ -433,9 +441,11 @@ impl RequestHandler for LeaveHandler {
let room = db::room::FindQuery::new()
.id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}'", payload.id))
.ok_or_else(|| anyhow!("Room not found"))
.status(ResponseStatus::NOT_FOUND)?;

shared::add_room_logger_tags(context, &room);

// Check room presence.
let presence = db::agent::ListQuery::new()
.room_id(room.id())
Expand All @@ -446,12 +456,8 @@ impl RequestHandler for LeaveHandler {
};

if presence.is_empty() {
return Err(format!(
"agent = '{}' is not online in the room = '{}'",
reqp.as_agent_id(),
room.id()
))
.status(ResponseStatus::NOT_FOUND);
return Err(anyhow!("Agent is not online in the room"))
.status(ResponseStatus::NOT_FOUND);
}

// Send dynamic subscription deletion request to the broker.
Expand Down
52 changes: 42 additions & 10 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt;

use async_std::stream;
use async_trait::async_trait;
use serde_derive::{Deserialize, Serialize};
Expand Down Expand Up @@ -47,17 +49,21 @@ impl RequestHandler for CreateHandler {
payload: Self::Payload,
reqp: &IncomingRequestProperties,
) -> Result {
context.add_logger_tags(o!("room_id" => payload.room_id.to_string()));

let room = {
let conn = context.db().get()?;

db::room::FindQuery::new()
.time(db::room::now())
.id(payload.room_id)
.execute(&conn)?
.ok_or_else(|| format!("the room = '{}' is not found", payload.room_id))
.ok_or_else(|| anyhow!("Room not found or closed"))
.status(ResponseStatus::NOT_FOUND)?
};

shared::add_room_logger_tags(context, &room);

// Authorize room creation.
let room_id = room.id().to_string();
let object = vec!["rooms", &room_id, "rtcs"];
Expand All @@ -73,6 +79,8 @@ impl RequestHandler for CreateHandler {
db::rtc::InsertQuery::new(room.id()).execute(&conn)?
};

context.add_logger_tags(o!("rtc_id" => rtc.id().to_string()));

// Respond and broadcast to the room topic.
let response = shared::build_response(
ResponseStatus::CREATED,
Expand Down Expand Up @@ -113,17 +121,21 @@ impl RequestHandler for ReadHandler {
payload: Self::Payload,
reqp: &IncomingRequestProperties,
) -> Result {
context.add_logger_tags(o!("rtc_id" => payload.id.to_string()));

let room = {
let conn = context.db().get()?;

db::room::FindQuery::new()
.time(db::room::now())
.rtc_id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("a room for the rtc = '{}' is not found", payload.id))
.ok_or_else(|| anyhow!("Room not found or closed"))
.status(ResponseStatus::NOT_FOUND)?
};

shared::add_room_logger_tags(context, &room);

// Authorize rtc reading.
let rtc_id = payload.id.to_string();
let room_id = room.id().to_string();
Expand All @@ -141,7 +153,7 @@ impl RequestHandler for ReadHandler {
db::rtc::FindQuery::new()
.id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("RTC not found, id = '{}'", payload.id))
.ok_or_else(|| anyhow!("RTC not found"))
.status(ResponseStatus::NOT_FOUND)?
};

Expand Down Expand Up @@ -178,14 +190,16 @@ impl RequestHandler for ListHandler {
payload: Self::Payload,
reqp: &IncomingRequestProperties,
) -> Result {
context.add_logger_tags(o!("room_id" => payload.room_id.to_string()));

let room = {
let conn = context.db().get()?;

db::room::FindQuery::new()
.time(db::room::now())
.id(payload.room_id)
.execute(&conn)?
.ok_or_else(|| format!("the room = '{}' is not found", payload.room_id))
.ok_or_else(|| anyhow!("Room not found or closed"))
.status(ResponseStatus::NOT_FOUND)?
};

Expand Down Expand Up @@ -232,6 +246,15 @@ pub(crate) enum ConnectIntent {
Write,
}

impl fmt::Display for ConnectIntent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Read => write!(f, "read"),
Self::Write => write!(f, "write"),
}
}
}

#[derive(Debug, Deserialize)]
pub(crate) struct ConnectRequest {
id: Uuid,
Expand All @@ -257,21 +280,28 @@ impl RequestHandler for ConnectHandler {
payload: Self::Payload,
reqp: &IncomingRequestProperties,
) -> Result {
context.add_logger_tags(o!(
"rtc_id" => payload.id.to_string(),
"intent" => payload.intent.to_string(),
));

let room = {
let conn = context.db().get()?;

db::room::FindQuery::new()
.time(db::room::now())
.rtc_id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("a room for the rtc = '{}' is not found", payload.id))
.ok_or_else(|| anyhow!("Room not found or closed"))
.status(ResponseStatus::NOT_FOUND)?
};

shared::add_room_logger_tags(context, &room);

// Authorize connecting to the rtc.
if room.backend() != db::room::RoomBackend::Janus {
return Err(format!(
"'rtc.connect' is not implemented for the backend = '{}'.",
return Err(anyhow!(
"'rtc.connect' is not implemented for '{}' backend",
room.backend(),
))
.status(ResponseStatus::NOT_IMPLEMENTED);
Expand Down Expand Up @@ -312,7 +342,7 @@ impl RequestHandler for ConnectHandler {
Some(ref recording) => db::janus_backend::FindQuery::new()
.id(recording.backend_id().to_owned())
.execute(&conn)?
.ok_or("no backend found for stream")
.ok_or_else(|| anyhow!("No backend found for stream"))
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?,
None => match db::janus_backend::most_loaded(room.id(), &conn)? {
Some(backend) => backend,
Expand All @@ -339,7 +369,7 @@ impl RequestHandler for ConnectHandler {

backend
})
.ok_or("no available backends")
.ok_or_else(|| anyhow!("No available backends"))
.status(ResponseStatus::SERVICE_UNAVAILABLE)?,
},
};
Expand All @@ -365,6 +395,8 @@ impl RequestHandler for ConnectHandler {
backend
};

context.add_logger_tags(o!("backend_id" => backend.id().to_string()));

// Send janus handle creation request.
let janus_request_result = context.janus_client().create_rtc_handle_request(
reqp.clone(),
Expand All @@ -387,7 +419,7 @@ impl RequestHandler for ConnectHandler {
let boxed_request = Box::new(req) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_request)))
}
Err(err) => Err(format!("error creating a backend request: {}", err))
Err(err) => Err(err.context("Error creating a backend request"))
.status(ResponseStatus::UNPROCESSABLE_ENTITY),
}
}
Expand Down
Loading

0 comments on commit e052bd1

Please sign in to comment.