Skip to content

Commit

Permalink
Merge pull request #2 from media-io/dev/return_job_result
Browse files Browse the repository at this point in the history
Jobs return JobResult struct instead of Job ID
  • Loading branch information
MarcAntoine-Arnaud committed Jul 16, 2019
2 parents c51d56e + 1eb9342 commit c4cea69
Show file tree
Hide file tree
Showing 4 changed files with 346 additions and 107 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amqp_worker"
version = "0.5.1"
version = "0.5.2"
authors = [
"Marc-Antoine Arnaud <arnaud.marcantoine@gmail.com>",
"Valentin Noel <valentin.noel@media-io.com>"
Expand Down
283 changes: 191 additions & 92 deletions src/job.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,95 @@
use crate::config;
use crate::MessageError;
use std::path::Path;
use std::thread;

#[derive(Default, Debug, Serialize, Deserialize)]
use reqwest::Error;

use crate::config;
use crate::MessageError;

pub trait ParametersContainer {
fn get_parameters(&self) -> &Vec<Parameter>;
fn get_boolean_parameter(&self, key: &str) -> Option<bool> {
for param in self.get_parameters() {
if let Parameter::BooleanParam { id, default, value } = param {
if id == key {
if let Some(ref v) = value {
return Some(*v);
} else {
return *default;
}
}
}
}
None
}

fn get_credential_parameter(&self, key: &str) -> Option<Credential> {
for param in self.get_parameters() {
if let Parameter::CredentialParam { id, default, value } = param {
if id == key {
if let Some(ref v) = value {
return Some(Credential { key: v.to_string() });
} else {
return default.clone().map(|key| Credential { key });
}
}
}
}
None
}

fn get_integer_parameter(&self, key: &str) -> Option<i64> {
for param in self.get_parameters() {
if let Parameter::IntegerParam { id, default, value } = param {
if id == key {
if let Some(ref v) = value {
return Some(*v);
} else {
return *default;
}
}
}
}
None
}

fn get_string_parameter(&self, key: &str) -> Option<String> {
for param in self.get_parameters() {
if let Parameter::StringParam { id, default, value } = param {
if id == key {
if let Some(ref v) = value {
return Some(v.to_string());
} else {
return default.clone();
}
}
}
}
None
}

fn get_array_of_strings_parameter(&self, key: &str) -> Option<Vec<String>> {
for param in self.get_parameters() {
if let Parameter::ArrayOfStringsParam { id, default, value } = param {
if id == key {
if let Some(ref v) = value {
return Some(v.clone());
} else {
return default.clone();
}
}
}
}
None
}
}

#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq)]
pub struct Requirement {
paths: Option<Vec<String>>,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum Parameter {
#[serde(rename = "array_of_strings")]
Expand Down Expand Up @@ -90,14 +171,14 @@ struct ValueResponseBody {
}

impl Credential {
pub fn request_value(&self, job: &Job) -> Result<String, MessageError> {
pub fn request_value(&self, job: &'static Job) -> Result<String, MessageError> {
let backend_endpoint = config::get_backend_hostname();
let backend_username = config::get_backend_username();
let backend_password = config::get_backend_password();

let session_url = format!("{}/sessions", backend_endpoint);
let credential_url = format!("{}/credentials/{}", backend_endpoint, self.key);
let job_id = job.job_id;
let job_clone = job.clone();

let request_thread = thread::spawn(move || {
let client = reqwest::Client::builder().build().unwrap();
Expand All @@ -113,30 +194,55 @@ impl Credential {
.post(&session_url)
.json(&session_body)
.send()
.map_err(|e| MessageError::ProcessingError(job_id, e.to_string()))?;
.map_err(|e| {
let job_result = JobResult::from(job_clone)
.with_status(JobStatus::Error)
.with_error(e);
MessageError::ProcessingError(job_result)
})?;

let r: SessionResponseBody = response
.json()
.map_err(|e| MessageError::ProcessingError(job_id, e.to_string()))?;
let r: SessionResponseBody = response.json().map_err(|e| {
let job_result = JobResult::from(job_clone)
.with_status(JobStatus::Error)
.with_error(e);
MessageError::ProcessingError(job_result)
})?;
let token = r.access_token;

let mut response = client
.get(&credential_url)
// .bearer_auth(token)
.header("Authorization", token)
.send()
.map_err(|e| MessageError::ProcessingError(job_id, e.to_string()))?;
.map_err(|e| {
let job_result = JobResult::from(job_clone)
.with_status(JobStatus::Error)
.with_error(e);
MessageError::ProcessingError(job_result)
})?;

let resp_value: ValueResponseBody = response
.json()
.map_err(|e| MessageError::ProcessingError(job_id, e.to_string()))?;
let resp_value: ValueResponseBody = response.json().map_err(|e| {
let job_result = JobResult::from(job_clone)
.with_status(JobStatus::Error)
.with_error(e);
MessageError::ProcessingError(job_result)
})?;

Ok(resp_value.data.value)
});

request_thread
.join()
.map_err(|e| MessageError::ProcessingError(job.job_id, format!("{:?}", e)))?
request_thread.join().map_err(|e| {
let job_result = JobResult::from(job_clone)
.with_status(JobStatus::Error)
.with_message(format!("{:?}", e));
MessageError::ProcessingError(job_result)
})?
}
}

impl ParametersContainer for Job {
fn get_parameters(&self) -> &Vec<Parameter> {
&self.parameters
}
}

Expand All @@ -147,81 +253,6 @@ impl Job {
.map_err(|e| MessageError::RuntimeError(format!("unable to parse input message: {:?}", e)))
}

pub fn get_boolean_parameter(&self, key: &str) -> Option<bool> {
for param in self.parameters.iter() {
if let Parameter::BooleanParam { id, default, value } = param {
if id == key {
if let Some(ref v) = value {
return Some(*v);
} else {
return *default;
}
}
}
}
None
}

pub fn get_credential_parameter(&self, key: &str) -> Option<Credential> {
for param in self.parameters.iter() {
if let Parameter::CredentialParam { id, default, value } = param {
if id == key {
if let Some(ref v) = value {
return Some(Credential { key: v.to_string() });
} else {
return default.clone().map(|key| Credential { key });
}
}
}
}
None
}

pub fn get_integer_parameter(&self, key: &str) -> Option<i64> {
for param in self.parameters.iter() {
if let Parameter::IntegerParam { id, default, value } = param {
if id == key {
if let Some(ref v) = value {
return Some(*v);
} else {
return *default;
}
}
}
}
None
}

pub fn get_string_parameter(&self, key: &str) -> Option<String> {
for param in self.parameters.iter() {
if let Parameter::StringParam { id, default, value } = param {
if id == key {
if let Some(ref v) = value {
return Some(v.to_string());
} else {
return default.clone();
}
}
}
}
None
}

pub fn get_array_of_strings_parameter(&self, key: &str) -> Option<Vec<String>> {
for param in self.parameters.iter() {
if let Parameter::ArrayOfStringsParam { id, default, value } = param {
if id == key {
if let Some(ref v) = value {
return Some(v.clone());
} else {
return default.clone();
}
}
}
}
None
}

pub fn check_requirements(&self) -> Result<(), MessageError> {
for param in self.parameters.iter() {
if let Parameter::RequirementParam { id, value, .. } = param {
Expand All @@ -243,3 +274,71 @@ impl Job {
Ok(())
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub enum JobStatus {
#[serde(rename = "unknown")]
Unknown,
#[serde(rename = "completed")]
Completed,
#[serde(rename = "error")]
Error,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct JobResult {
pub job_id: u64,
pub status: JobStatus,
pub parameters: Vec<Parameter>,
}

impl From<Job> for JobResult {
fn from(job: Job) -> JobResult {
JobResult::new(job.job_id, JobStatus::Unknown, vec![])
}
}

impl From<&Job> for JobResult {
fn from(job: &Job) -> JobResult {
JobResult::new(job.job_id, JobStatus::Unknown, vec![])
}
}

impl ParametersContainer for JobResult {
fn get_parameters(&self) -> &Vec<Parameter> {
&self.parameters
}
}

impl JobResult {
pub fn new(job_id: u64, status: JobStatus, parameters: Vec<Parameter>) -> JobResult {
JobResult {
job_id,
status,
parameters,
}
}

pub fn with_status(mut self, status: JobStatus) -> Self {
self.status = status;
self
}

pub fn with_error(mut self, error: Error) -> Self {
self.parameters.push(Parameter::StringParam {
id: "message".to_string(),
default: None,
value: Some(error.to_string()),
});
self
}

pub fn with_message(mut self, message: String) -> Self {
self.parameters.push(Parameter::StringParam {
id: "message".to_string(),
default: None,
value: Some(message),
});
self
}
}

0 comments on commit c4cea69

Please sign in to comment.