Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Switch to anyhow from failure (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed May 22, 2020
1 parent e6ab003 commit 9f5bc75
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 54 deletions.
8 changes: 7 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ authors = ["Andrei Nesterov <ae.nesterov@gmail.com>"]
edition = "2018"

[dependencies]
anyhow = "1.0"
async-std = { version = "= 1.6.0-beta.1", features = ["attributes"] }
async-trait = "0.1"
base64 = "0.10"
Expand All @@ -13,7 +14,6 @@ config = "0.9"
diesel = { version = "1.4", features = ["postgres", "uuid", "chrono", "serde_json", "r2d2"] }
diesel-derive-enum = { version = "0.4", features = ["postgres"] }
env_logger = "0.6"
failure = "0.1"
futures = "0.3"
futures-channel = "0.3"
futures-util = "0.3"
Expand Down
12 changes: 7 additions & 5 deletions src/app/endpoint/rtc_signal.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::result::Result as StdResult;

use anyhow::format_err;
use async_std::stream;
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use failure::{err_msg, format_err, Error};
use serde_derive::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use svc_agent::mqtt::{
Expand Down Expand Up @@ -201,7 +201,7 @@ enum SdpType {
IceCandidate,
}

fn parse_sdp_type(jsep: &JsonValue) -> StdResult<SdpType, Error> {
fn parse_sdp_type(jsep: &JsonValue) -> anyhow::Result<SdpType> {
// '{"type": "offer", "sdp": _}' or '{"type": "answer", "sdp": _}'
let sdp_type = jsep.get("type");
// '{"sdpMid": _, "sdpMLineIndex": _, "candidate": _}' or '{"completed": true}' or 'null'
Expand All @@ -226,14 +226,16 @@ fn parse_sdp_type(jsep: &JsonValue) -> StdResult<SdpType, Error> {
}
}

fn is_sdp_recvonly(jsep: &JsonValue) -> StdResult<bool, Error> {
fn is_sdp_recvonly(jsep: &JsonValue) -> anyhow::Result<bool> {
use webrtc_sdp::{attribute_type::SdpAttributeType, parse_sdp};

let sdp = jsep.get("sdp").ok_or_else(|| err_msg("missing sdp"))?;
let sdp = jsep.get("sdp").ok_or_else(|| format_err!("missing sdp"))?;

let sdp = sdp
.as_str()
.ok_or_else(|| format_err!("invalid sdp = '{}'", sdp))?;
let sdp = parse_sdp(sdp, false).map_err(|_| err_msg("invalid sdp"))?;

let sdp = parse_sdp(sdp, false).map_err(|_| format_err!("invalid sdp"))?;

// Returning true if all media section contains 'recvonly' attribute
Ok(sdp.media.iter().all(|item| {
Expand Down
4 changes: 1 addition & 3 deletions src/app/endpoint/system.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::ops::Bound;
use std::result::Result as StdResult;

use async_std::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use failure::Error;
use serde_derive::{Deserialize, Serialize};
use svc_agent::mqtt::{
IncomingRequestProperties, IntoPublishableDump, OutgoingEvent, OutgoingEventProperties,
Expand Down Expand Up @@ -124,7 +122,7 @@ pub(crate) fn upload_event<I>(
rtcs_and_recordings: I,
start_timestamp: DateTime<Utc>,
tracking: &TrackingProperties,
) -> StdResult<RoomUploadEvent, Error>
) -> anyhow::Result<RoomUploadEvent>
where
I: Iterator<Item = (db::rtc::Object, db::recording::Object)>,
{
Expand Down
2 changes: 1 addition & 1 deletion src/app/handle_id.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt;
use std::str::FromStr;

use failure::{format_err, Error};
use anyhow::{format_err, Error};
use svc_agent::AgentId;
use uuid::Uuid;

Expand Down
22 changes: 11 additions & 11 deletions src/app/janus.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::str::FromStr;

use anyhow::{Context as AnyhowContext, Result};
use async_std::stream;
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use failure::{format_err, Error};
use log::{error, info, warn};
use serde_derive::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
Expand Down Expand Up @@ -62,7 +62,7 @@ pub(crate) fn create_session_request<M>(
evp: &IncomingEventProperties,
me: &M,
start_timestamp: DateTime<Utc>,
) -> Result<OutgoingRequest<CreateSessionRequest>, Error>
) -> Result<OutgoingRequest<CreateSessionRequest>>
where
M: Addressable,
{
Expand Down Expand Up @@ -104,7 +104,7 @@ pub(crate) fn create_handle_request<M>(
session_id: i64,
me: &M,
start_timestamp: DateTime<Utc>,
) -> Result<OutgoingRequest<CreateHandleRequest>, Error>
) -> Result<OutgoingRequest<CreateHandleRequest>>
where
M: Addressable,
{
Expand Down Expand Up @@ -169,7 +169,7 @@ pub(crate) fn create_rtc_handle_request<A, M>(
me: &M,
start_timestamp: DateTime<Utc>,
authz_time: Duration,
) -> Result<OutgoingRequest<CreateHandleRequest>, Error>
) -> Result<OutgoingRequest<CreateHandleRequest>>
where
A: Addressable,
M: Addressable,
Expand Down Expand Up @@ -246,7 +246,7 @@ pub(crate) fn create_stream_request<A, M>(
me: &M,
start_timestamp: DateTime<Utc>,
authz_time: Duration,
) -> Result<OutgoingRequest<MessageRequest>, Error>
) -> Result<OutgoingRequest<MessageRequest>>
where
A: Addressable,
M: Addressable,
Expand Down Expand Up @@ -321,7 +321,7 @@ pub(crate) fn read_stream_request<A, M>(
me: &M,
start_timestamp: DateTime<Utc>,
authz_time: Duration,
) -> Result<OutgoingRequest<MessageRequest>, Error>
) -> Result<OutgoingRequest<MessageRequest>>
where
A: Addressable,
M: Addressable,
Expand Down Expand Up @@ -400,7 +400,7 @@ pub(crate) fn upload_stream_request<A, M>(
to: &A,
me: &M,
start_timestamp: DateTime<Utc>,
) -> Result<OutgoingRequest<MessageRequest>, Error>
) -> Result<OutgoingRequest<MessageRequest>>
where
A: Addressable,
M: Addressable,
Expand Down Expand Up @@ -454,7 +454,7 @@ pub(crate) fn trickle_request<A, M>(
me: &M,
start_timestamp: DateTime<Utc>,
authz_time: Duration,
) -> Result<OutgoingRequest<TrickleRequest>, Error>
) -> Result<OutgoingRequest<TrickleRequest>>
where
A: Addressable,
M: Addressable,
Expand Down Expand Up @@ -515,7 +515,7 @@ pub(crate) fn agent_leave_request<T, M>(
to: &T,
me: &M,
tracking: &TrackingProperties,
) -> Result<OutgoingRequest<MessageRequest>, Error>
) -> Result<OutgoingRequest<MessageRequest>>
where
T: Addressable,
M: Addressable,
Expand Down Expand Up @@ -548,14 +548,14 @@ where
))
}

fn response_topic<T, M>(to: &T, me: &M) -> Result<String, Error>
fn response_topic<T, M>(to: &T, me: &M) -> Result<String>
where
T: Addressable,
M: Addressable,
{
Subscription::unicast_responses_from(to)
.subscription_topic(me, JANUS_API_VERSION)
.map_err(|err| format_err!("Failed to build subscription topic: {}", err))
.context("Failed to build subscription topic")
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
42 changes: 18 additions & 24 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::thread;

use anyhow::{Context as AnyhowContext, Result};
use async_std::task;
use chrono::Utc;
use futures::StreamExt;
Expand All @@ -27,10 +28,7 @@ pub(crate) const API_VERSION: &str = "v1";

////////////////////////////////////////////////////////////////////////////////

pub(crate) async fn run(
db: &ConnectionPool,
authz_cache: Option<AuthzCache>,
) -> Result<(), String> {
pub(crate) async fn run(db: &ConnectionPool, authz_cache: Option<AuthzCache>) -> Result<()> {
// Config
let config = config::load().expect("Failed to load config");
info!("App config: {:?}", config);
Expand All @@ -44,15 +42,15 @@ pub(crate) async fn run(
.subject(&agent_id)
.key(config.id_token.algorithm, config.id_token.key.as_slice())
.build()
.map_err(|err| format!("Error creating an id token: {}", err))?;
.context("Error creating an id token")?;

let mut agent_config = config.mqtt.clone();
agent_config.set_password(&token);

let (mut agent, rx) = AgentBuilder::new(agent_id.clone(), API_VERSION)
.connection_mode(ConnectionMode::Service)
.start(&agent_config)
.map_err(|err| format!("Failed to create an agent: {}", err))?;
.context("Failed to create an agent")?;

// Event loop for incoming messages of MQTT Agent
let (mq_tx, mut mq_rx) = futures_channel::mpsc::unbounded::<Notification>();
Expand All @@ -67,7 +65,7 @@ pub(crate) async fn run(

// Authz
let authz = svc_authz::ClientMap::new(&config.id, authz_cache, config.authz.clone())
.map_err(|err| format!("Error converting authz config to clients: {}", err))?;
.context("Error converting authz config to clients")?;

// Sentry
if let Some(sentry_config) = config.sentry.as_ref() {
Expand Down Expand Up @@ -114,11 +112,7 @@ pub(crate) async fn run(
Ok(())
}

fn subscribe(
agent: &mut Agent,
agent_id: &AgentId,
config: &Config,
) -> Result<JanusTopics, String> {
fn subscribe(agent: &mut Agent, agent_id: &AgentId, config: &Config) -> Result<JanusTopics> {
let group = SharedGroup::new("loadbalancer", agent_id.as_account_id().clone());

// Multicast requests
Expand All @@ -128,7 +122,7 @@ fn subscribe(
QoS::AtMostOnce,
Some(&group),
)
.map_err(|err| format!("Error subscribing to multicast requests: {}", err))?;
.context("Error subscribing to multicast requests")?;

// Unicast requests
agent
Expand All @@ -137,40 +131,40 @@ fn subscribe(
QoS::AtMostOnce,
Some(&group),
)
.map_err(|err| format!("Error subscribing to unicast requests: {}", err))?;
.context("Error subscribing to unicast requests")?;

// Janus status events
let subscription = Subscription::broadcast_events(&config.backend_id, API_VERSION, "status");

agent
.subscribe(&subscription, QoS::AtLeastOnce, Some(&group))
.map_err(|err| format!("Error subscribing to backend events topic: {}", err))?;
.context("Error subscribing to backend events topic")?;

let janus_status_events_topic = subscription
.subscription_topic(agent_id, API_VERSION)
.map_err(|err| format!("Error building janus events subscription topic: {}", err))?;
.context("Error building janus events subscription topic")?;

// Janus events
let subscription = Subscription::broadcast_events(&config.backend_id, API_VERSION, "events");

agent
.subscribe(&subscription, QoS::AtLeastOnce, Some(&group))
.map_err(|err| format!("Error subscribing to backend events topic: {}", err))?;
.context("Error subscribing to backend events topic")?;

let janus_events_topic = subscription
.subscription_topic(agent_id, API_VERSION)
.map_err(|err| format!("Error building janus events subscription topic: {}", err))?;
.context("Error building janus events subscription topic")?;

// Janus responses
let subscription = Subscription::unicast_responses_from(&config.backend_id);

agent
.subscribe(&subscription, QoS::AtLeastOnce, Some(&group))
.map_err(|err| format!("Error subscribing to backend responses topic: {}", err))?;
.context("Error subscribing to backend responses topic")?;

let janus_responses_topic = subscription
.subscription_topic(agent_id, API_VERSION)
.map_err(|err| format!("Error building janus responses subscription topic: {}", err))?;
.context("Error building janus responses subscription topic")?;

// Kruonis
if let KruonisConfig {
Expand All @@ -188,20 +182,20 @@ fn subscribe(
))
}

fn subscribe_to_kruonis(kruonis_id: &AccountId, agent: &mut Agent) -> Result<(), String> {
fn subscribe_to_kruonis(kruonis_id: &AccountId, agent: &mut Agent) -> Result<()> {
let timing = ShortTermTimingProperties::new(Utc::now());

let topic = Subscription::unicast_requests_from(kruonis_id)
.subscription_topic(agent.id(), API_VERSION)
.map_err(|err| format!("Failed to build subscription topic: {:?}", err))?;
.context("Failed to build subscription topic")?;

let props = OutgoingRequestProperties::new("kruonis.subscribe", &topic, "", timing);
let event = OutgoingRequest::multicast(json!({}), props, kruonis_id);
let message = Box::new(event) as Box<dyn IntoPublishableDump + Send>;

let dump = message
.into_dump(agent.address())
.map_err(|err| format!("Failed to dump message: {}", err))?;
.context("Failed to dump message")?;

info!(
"Outgoing message = '{}' sending to the topic = '{}'",
Expand All @@ -211,7 +205,7 @@ fn subscribe_to_kruonis(kruonis_id: &AccountId, agent: &mut Agent) -> Result<(),

agent
.publish_dump(dump)
.map_err(|err| format!("Failed to publish message: {}", err))?;
.context("Failed to publish message")?;

Ok(())
}
Expand Down
8 changes: 3 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ extern crate diesel;
extern crate diesel_derive_enum;

use std::env::var;
use std::io::{Error, ErrorKind};

use anyhow::Result;
use svc_authz::cache::{create_pool, Cache};

#[async_std::main]
async fn main() -> std::io::Result<()> {
async fn main() -> Result<()> {
env_logger::init();

let db = {
Expand Down Expand Up @@ -63,9 +63,7 @@ async fn main() -> std::io::Result<()> {
Cache::new(create_pool(&url, size, timeout), expiration_time)
});

app::run(&db, authz_cache)
.await
.map_err(|err| Error::new(ErrorKind::Other, err))
app::run(&db, authz_cache).await
}

mod app;
Expand Down
Loading

0 comments on commit 9f5bc75

Please sign in to comment.