Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Coenen <benjamin.coenen@corp.ovh.com>
  • Loading branch information
bnjjj committed Jun 12, 2019
1 parent 53b91a6 commit d73011f
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 378 deletions.
166 changes: 80 additions & 86 deletions contrib/uservices/badge/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contrib/uservices/badge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ serde_derive = "1.0"
serde_json = "1.0.0"
# Actor
actix = "0.8.3"
actix-web = { version = "1.0" }
actix-web = "1.0.0"
tokio = "0.1.11"
# Log
log = "0.4"
Expand Down
69 changes: 33 additions & 36 deletions contrib/uservices/badge/src/badge/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

use actix_web::error;
use actix_web::error::Error;
use actix_web::http::HeaderMap;
use actix_web::web::Data;
use actix_web::{HttpRequest, HttpResponse};
use actix_web::{web, Error, HttpRequest, HttpResponse};
use badge_gen::{Badge, BadgeOptions};
use futures::Future;
use std::collections::HashMap;

use crate::models::StatusEnum;
use crate::run::QueryRun;
Expand All @@ -14,45 +14,42 @@ const GREEN: &str = "#21BA45";
const RED: &str = "#FF4F60";
const BLUE: &str = "#4fa3e3";

pub fn badge_handler(req: HttpRequest) -> impl Future<Item=HttpResponse, Error=Error> {
let web_state: Option<&WebState> = req.app_data::<WebState>();
pub fn badge_handler(
state: web::Data<WebState>,
query: web::Query<HashMap<String, String>>,
req: HttpRequest,
) -> impl Future<Item = HttpResponse, Error = Error> {
let project_key = req.match_info().get("project").unwrap_or_default();
let workflow_name = req.match_info().get("workflow").unwrap_or_default();
let branch = if req.match_info().query("branch").is_empty() {
get_branch_from_referer(req.headers())
} else {
Some(req.match_info().query("branch").to_string())
let branch = query
.get("branch")
.map(|q| q.to_string())
.or_else(|| get_branch_from_referer(req.headers()));

let query = QueryRun {
project_key: project_key.to_string(),
workflow_name: workflow_name.to_string(),
branch,
};

web_state
.unwrap()
.db
.send(QueryRun {
project_key: project_key.to_string(),
workflow_name: workflow_name.to_string(),
branch,
})
.from_err()
.and_then(|res| {
let run = res?;
let color = match StatusEnum::from(run.status.clone()) {
StatusEnum::Success => GREEN.to_string(),
StatusEnum::Building | StatusEnum::Waiting | StatusEnum::Checking => {
String::from(BLUE)
}
StatusEnum::Fail | StatusEnum::Stopped => RED.to_string(),
_ => "grey".to_string(),
};
state.db_actor.send(query).from_err().and_then(move |res| {
let run = res?;
let color = match StatusEnum::from(run.status.clone()) {
StatusEnum::Success => GREEN.to_string(),
StatusEnum::Building | StatusEnum::Waiting | StatusEnum::Checking => String::from(BLUE),
StatusEnum::Fail | StatusEnum::Stopped => RED.to_string(),
_ => "grey".to_string(),
};

let opts = BadgeOptions {
subject: String::from("CDS"),
status: run.status,
color,
};
let badge = Badge::new(opts).map_err(error::ErrorBadRequest)?.to_svg();
let opts = BadgeOptions {
subject: String::from("CDS"),
status: run.status,
color,
};
let badge = Badge::new(opts).map_err(error::ErrorBadRequest)?.to_svg();

Ok(HttpResponse::Ok().content_type("image/svg+xml").body(badge))
})
Ok(HttpResponse::Ok().content_type("image/svg+xml").body(badge))
})
}

fn get_branch_from_referer(headers: &HeaderMap) -> Option<String> {
Expand Down
25 changes: 1 addition & 24 deletions contrib/uservices/badge/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@

use config::{Config, ConfigError, Environment, File};
use sdk_cds::service::APIConfiguration;
use std::str::FromStr;
#[derive(Default, Debug, Deserialize, Clone)]
#[serde(default)]
Expand All @@ -14,8 +13,6 @@ pub struct BadgeConfiguration {
#[serde(with = "url_serde")]
pub url: url::Url,
pub name: String,
pub mode: String,
pub api: APIConfiguration,
pub database: DatabaseConfiguration,
pub kafka: KafkaConfiguration,
pub http: HTTPConfiguration,
Expand All @@ -26,8 +23,6 @@ impl std::default::Default for BadgeConfiguration {
BadgeConfiguration {
url: url::Url::from_str("http://localhost:8086").unwrap(),
name: String::default(),
mode: String::from("kafka"),
api: APIConfiguration::default(),
database: DatabaseConfiguration::default(),
kafka: KafkaConfiguration::default(),
http: HTTPConfiguration::default(),
Expand Down Expand Up @@ -90,24 +85,6 @@ pub fn get_example_config_file() -> &'static str {
# Name of this CDS badge Service
name = "cds-badge"
mode = "kafka"
######################
# CDS API Settings
#######################
[badge.api]
maxHeartbeatFailures = 10
requestTimeout = 10
token = "USECDSAPITOKEN"
[badge.api.grpc]
insecure = true
url = "http://localhost:8082"
[badge.api.http]
insecure = true
url = "http://localhost:8081"
######################
# CDS Badge Database Settings (postgresql)
#######################
Expand All @@ -121,7 +98,7 @@ pub fn get_example_config_file() -> &'static str {
timeout = 3000
######################
# CDS Badge Kafka Settings (kafka mode only)
# CDS Badge Kafka Settings
#######################
[badge.kafka]
broker = "localhost:9092"
Expand Down
22 changes: 21 additions & 1 deletion contrib/uservices/badge/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

use actix::MailboxError;
use actix_web::{HttpResponse, ResponseError};
use diesel::result::Error as DieselError;
use failure::Error;

#[derive(Fail, Debug)]
pub enum BadgeError {
#[fail(display = "Invalid parameter")]
Expand All @@ -9,6 +11,10 @@ pub enum BadgeError {
NoRunAvailable,
#[fail(display = "Unauthorised")]
Unauthorised,
#[fail(display = "Database Error")]
DieselError { cause: DieselError },
#[fail(display = "Mailbox actix Error")]
MailboxError { cause: MailboxError },
#[fail(display = "Unexpected error")]
UnexpectedError { cause: Error },
}
Expand All @@ -19,7 +25,21 @@ impl ResponseError for BadgeError {
BadgeError::Unauthorised => HttpResponse::Unauthorized().json("Unauthorised"),
BadgeError::NoRunAvailable => HttpResponse::NotFound().json("Not run found"),
BadgeError::InvalidParameter => HttpResponse::BadRequest().json("Invalid parameter"),
BadgeError::DieselError{..} => HttpResponse::InternalServerError().json("Database error"),
BadgeError::MailboxError{..} => HttpResponse::InternalServerError().json("Communication error"),
_ => HttpResponse::InternalServerError().json("Unexpected error"),
}
}
}

impl From<DieselError> for BadgeError {
fn from(err: DieselError) -> Self {
BadgeError::DieselError { cause: err }
}
}

impl From<MailboxError> for BadgeError {
fn from(err: MailboxError) -> Self {
BadgeError::MailboxError { cause: err }
}
}
20 changes: 13 additions & 7 deletions contrib/uservices/badge/src/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use actix::{Actor, Addr, Context, Running};
use futures::Future;
use futures::Stream;
use actix::{Actor, Addr, Context, Running, SyncContext};
use futures::{Future, Stream};
use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::stream_consumer::StreamConsumer;
Expand All @@ -10,18 +9,22 @@ use rdkafka::message::Message;
use rdkafka_sys;
use serde_json;


use crate::database::DbExecutor;
use crate::models::Event;
use crate::models::Run;
use crate::models::{Event, Run};
use crate::run::CreateRun;
use crate::web::Pool;


#[derive(Clone)]
pub struct KafkaConsumerActor {
pub brokers: Vec<String>,
pub topic: String,
pub user: String,
pub password: String,
pub group: String,
pub db: Addr<DbExecutor>,
pub db: Pool,
pub db_actor: Addr<DbExecutor>,
}

struct CustomContext;
Expand All @@ -48,6 +51,7 @@ impl Actor for KafkaConsumerActor {
type Context = Context<Self>;

fn started(&mut self, _ctx: &mut Self::Context) {
println!("Kafka consumer starting");
let consumer = create_consumer(
self.user.clone(),
self.password.clone(),
Expand Down Expand Up @@ -96,7 +100,9 @@ impl Actor for KafkaConsumerActor {
status: event.status.clone().into(),
..Default::default()
};
if let Err(err) = self.db.send(CreateRun { run }).flatten().wait() {

if let Err(err) = self.db_actor.send(CreateRun { run }).flatten().wait()
{
eprintln!("future run NOT created in db {:?}", err);
}

Expand Down
32 changes: 18 additions & 14 deletions contrib/uservices/badge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,17 @@ mod configuration;
mod database;
mod error;
mod kafka;
// mod middlewares;
mod models;
mod run;
mod schema;
mod service;
mod web;
mod status;

mod status;
mod web;
use actix::prelude::*;
use actix::{Arbiter, SyncArbiter, System};
use actix::{Actor, Arbiter, SyncArbiter, System};
use clap::{App, Arg, SubCommand};
use diesel::prelude::PgConnection;
use diesel::r2d2::ConnectionManager;
use sdk_cds::service::ServiceTrait;

use database::DbExecutor;
use kafka::KafkaConsumerActor;
Expand Down Expand Up @@ -121,35 +118,42 @@ fn main() {
.build(manager)
.expect("Failed to create pool.");

let addr = SyncArbiter::start(12, move || DbExecutor(pool.clone()));

let brokers: Vec<String> = config.kafka.broker.split(',').map(String::from).collect();
let db_addr = addr.clone();
let kafka_config = config.kafka.clone();
let _kafka_addr: Addr<KafkaConsumerActor> = Arbiter::start(|_| KafkaConsumerActor {
let clone_pool = pool.clone();
let addr = SyncArbiter::start(12, move || DbExecutor(clone_pool.clone()));
let addr_clone = addr.clone();
let clone_pool = pool.clone();
let kafka_actor = KafkaConsumerActor {
brokers,
topic: kafka_config.topic,
group: kafka_config.group,
user: kafka_config.user,
password: kafka_config.password,
db: db_addr,
db: clone_pool,
db_actor: addr.clone(),
};
Arbiter::new().exec_fn(move || {
let _kafka_addr = kafka_actor.start();
});

let host = config.http.addr.clone();
let port = config.http.port;
let mut hash = String::new();

web::http_server(
WebState {
db: addr.clone(),
hash,
db: pool.clone(),
db_actor: addr_clone,
},
host.clone(),
port.to_string(),
);

println!("Server is listening on {}:{}", host, port);
system.run();
if let Err(err) = system.run() {
eprintln!("Error when system run {:?}", err);
}
}
_ => app.write_help(&mut std::io::stdout()).unwrap(),
}
Expand Down
16 changes: 0 additions & 16 deletions contrib/uservices/badge/src/run/handlers.rs

This file was deleted.

Loading

0 comments on commit d73011f

Please sign in to comment.