Skip to content

Commit

Permalink
fix: error handling in thread
Browse files Browse the repository at this point in the history
Signed-off-by: Malo Polese <malo.polese@gmail.com>
  • Loading branch information
MaloPolese committed May 7, 2023
1 parent e4da759 commit 8acf04a
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 25 deletions.
48 changes: 30 additions & 18 deletions controller/src/api/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use tiny_http::{Request, Server as TinyServer};

use tracing::{event, Level};

use super::RikError;

pub struct Server {
internal_sender: Sender<ApiChannel>,
}
Expand All @@ -20,11 +22,11 @@ impl Server {
Server { internal_sender }
}

pub fn run(&self, db: Arc<RikDataBase>) {
self.run_server(db);
pub fn run(&self, db: Arc<RikDataBase>) -> Result<(), RikError> {
self.run_server(db)
}

fn run_server(&self, db: Arc<RikDataBase>) {
fn run_server(&self, db: Arc<RikDataBase>) -> Result<(), RikError> {
let host = String::from("0.0.0.0");
dotenv().ok();
let port: usize = match std::env::var("PORT") {
Expand All @@ -41,28 +43,38 @@ impl Server {
let db = db.clone();
let internal_sender = self.internal_sender.clone();

let guard = thread::spawn(move || loop {
let router = routes::Router::new();
let connection = db.open().unwrap();
let guard = thread::spawn(move || -> Result<(), RikError> {
loop {
let router = routes::Router::new();
let connection = db.open().map_err(RikError::DatabaseError)?;

let mut req: Request = server.recv().unwrap();
let mut req: Request = server.recv().unwrap();

if let Some(res) = router.handle(&mut req, &connection, &internal_sender) {
req.respond(res).unwrap();
continue;
if let Some(res) = router.handle(&mut req, &connection, &internal_sender) {
req.respond(res).unwrap();
continue;
}
event!(
Level::INFO,
"Route {} ({}) could not be found",
req.url(),
req.method()
);
req.respond(tiny_http::Response::empty(tiny_http::StatusCode::from(404)))
.unwrap();
}
event!(
Level::INFO,
"Route {} ({}) could not be found",
req.url(),
req.method()
);
req.respond(tiny_http::Response::empty(tiny_http::StatusCode::from(404)))
.unwrap();
});

guards.push(guard);
}

for guard in guards {
guard
.join()
.expect("Couldn't join on the associated thread")?
}

event!(Level::INFO, "Server running on http://{}:{}", host, port);
Ok(())
}
}
5 changes: 5 additions & 0 deletions controller/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use definition::workload::WorkloadDefinition;
use std::fmt::{Debug, Display, Formatter, Result};
use thiserror::Error;

use crate::database::DatabaseError;

#[derive(Debug)]
pub enum Crud {
Create = 0,
Expand All @@ -23,6 +25,9 @@ impl From<i32> for Crud {

#[derive(Debug, Error)]
pub enum RikError {
#[error("Database error {0}")]
DatabaseError(DatabaseError),

#[error("Internal communication error: {0}")]
InternalCommunicationError(String),

Expand Down
24 changes: 17 additions & 7 deletions controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ mod tests;
use std::sync::mpsc::channel;
use std::thread;

use crate::database::RikDataBase;
use crate::{api::RikError, database::RikDataBase};
use api::{external, ApiChannel};
use tracing::{event, metadata::LevelFilter, Level};
use tracing::{error, event, metadata::LevelFilter, Level};
use tracing_subscriber::{
fmt, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt, EnvFilter,
};
Expand All @@ -32,7 +32,9 @@ async fn main() {
logger_setup();
event!(Level::INFO, "Starting Rik");
let db = RikDataBase::new(String::from("rik"));
db.init_tables().unwrap();
if let Err(e) = db.init_tables() {
error!("Error while table initialization {}", e)
}

let (legacy_sender, legacy_receiver) = channel::<ApiChannel>();

Expand All @@ -42,18 +44,26 @@ async fn main() {
let external_api = external::Server::new(legacy_sender);
let mut threads = Vec::new();

threads.push(thread::spawn(move || {
threads.push(thread::spawn(move || -> Result<(), RikError> {
let future = async move { internal_api.listen_notification(legacy_receiver).await };
Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(future)
.block_on(future);
Ok(())
}));

threads.push(thread::spawn(move || external_api.run(db)));
threads.push(thread::spawn(move || -> Result<(), RikError> {
external_api.run(db)
}));

for thread in threads {
thread.join().unwrap();
if let Err(e) = thread
.join()
.expect("Couldn't join on the associated thread")
{
error!("An error occured {}", e)
}
}
}

0 comments on commit 8acf04a

Please sign in to comment.