Skip to content
This repository has been archived by the owner on May 12, 2020. It is now read-only.

Commit

Permalink
Merge pull request #12 from media-cloud-ai/add_destination_paths
Browse files Browse the repository at this point in the history
Add destination paths
  • Loading branch information
valnoel committed Dec 27, 2019
2 parents e17cc5e + c63d5b0 commit 80068a9
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 154 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2018"

[dependencies]
libc = "0.2.66"
amqp_worker = "0.6.0"
amqp_worker = "0.7.0"
log = "0.4.8"
semver = "^0.9"
serde_json = "^1.0"
Expand Down
7 changes: 7 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pub static GET_NAME_FUNCTION: &str = "get_name";
pub static GET_SHORT_DESCRIPTION_FUNCTION: &str = "get_short_description";
pub static GET_DESCRIPTION_FUNCTION: &str = "get_description";
pub static GET_VERSION_FUNCTION: &str = "get_version";
pub static GET_PARAMETERS_SIZE_FUNCTION: &str = "get_parameters_size";
pub static GET_PARAMETERS_FUNCTION: &str = "get_parameters";
pub static PROCESS_FUNCTION: &str = "process";
47 changes: 16 additions & 31 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ extern crate libloading;
#[macro_use]
extern crate log;

mod constants;
mod process_return;
mod worker;

use amqp_worker::job::*;
Expand All @@ -19,19 +21,19 @@ struct CWorkerEvent {}

impl MessageEvent for CWorkerEvent {
fn get_name(&self) -> String {
get_worker_function_string_value(GET_NAME_FUNCTION)
get_worker_function_string_value(constants::GET_NAME_FUNCTION)
}

fn get_short_description(&self) -> String {
get_worker_function_string_value(GET_SHORT_DESCRIPTION_FUNCTION)
get_worker_function_string_value(constants::GET_SHORT_DESCRIPTION_FUNCTION)
}

fn get_description(&self) -> String {
get_worker_function_string_value(GET_DESCRIPTION_FUNCTION)
get_worker_function_string_value(constants::GET_DESCRIPTION_FUNCTION)
}

fn get_version(&self) -> Version {
let version = get_worker_function_string_value(GET_VERSION_FUNCTION);
let version = get_worker_function_string_value(constants::GET_VERSION_FUNCTION);
Version::parse(&version).unwrap_or_else(|_| {
panic!(
"unable to parse version {} (please use SemVer format)",
Expand All @@ -40,11 +42,6 @@ impl MessageEvent for CWorkerEvent {
})
}

fn get_git_version(&self) -> Version {
// TODO get real git version?
self.get_version()
}

fn get_parameters(&self) -> Vec<Parameter> {
get_worker_parameters()
}
Expand All @@ -53,27 +50,13 @@ impl MessageEvent for CWorkerEvent {
let job = Job::new(message)?;
debug!("received message: {:?}", job);

match job.check_requirements() {
Ok(_) => {}
Err(message) => {
return Err(message);
}
}
job.check_requirements()?;

let job_id = job.job_id;
debug!("Process job: {:?}", job_id);
let process_return = call_worker_process(job);
debug!("Returned: {:?}", process_return);
match process_return {
ProcessReturn { code: 0, message } => {
Ok(JobResult::new(job_id, JobStatus::Completed, vec![]).with_message(message))
}
ProcessReturn { code, message } => {
let result = JobResult::new(job_id, JobStatus::Error, vec![])
.with_message(format!("{} (code: {:?})", message, code));
Err(MessageError::ProcessingError(result))
}
}
process_return.as_result(job_id)
}
}

Expand All @@ -90,17 +73,15 @@ pub fn test_c_binding_worker_info() {
let name = C_WORKER_EVENT.get_name();
let short_description = C_WORKER_EVENT.get_short_description();
let description = C_WORKER_EVENT.get_description();
let version = C_WORKER_EVENT.get_version();
let git_version = C_WORKER_EVENT.get_git_version();
let version = C_WORKER_EVENT.get_version().to_string();

assert_eq!(name, "my_c_worker".to_string());
assert_eq!(short_description, "My C Worker".to_string());
assert_eq!(
description,
"This is my long description \nover multilines".to_string()
);
assert_eq!(version.to_string(), "0.1.0".to_string());
assert_eq!(git_version, version);
assert_eq!(version, "0.1.0".to_string());

let parameters = C_WORKER_EVENT.get_parameters();
assert_eq!(1, parameters.len());
Expand Down Expand Up @@ -138,8 +119,12 @@ pub fn test_process() {
let result = C_WORKER_EVENT.process(message);
assert!(result.is_ok());
let job_result = result.unwrap();
assert_eq!(123, job_result.job_id);
assert_eq!(JobStatus::Completed, job_result.status);
assert_eq!(job_result.get_job_id(), 123);
assert_eq!(job_result.get_status(), &JobStatus::Completed);
assert_eq!(
job_result.get_destination_paths(),
&vec!["/path/out.mxf".to_string()]
);
}

#[test]
Expand Down
104 changes: 104 additions & 0 deletions src/process_return.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use amqp_worker::job::*;
use amqp_worker::MessageError;

#[derive(Debug)]
pub struct ProcessReturn {
code: i32,
message: String,
output_paths: Vec<String>,
}

impl ProcessReturn {
pub fn new(code: i32, message: &str) -> Self {
ProcessReturn {
code,
message: message.to_string(),
output_paths: vec![],
}
}

pub fn new_error(message: &str) -> Self {
ProcessReturn::new(ProcessReturn::get_error_code(), message)
}

pub fn with_output_paths(mut self, output_paths: Vec<String>) -> Self {
self.output_paths = output_paths;
self
}

pub fn get_error_code() -> i32 {
1
}

#[allow(dead_code)]
pub fn get_code(&self) -> i32 {
self.code
}

#[allow(dead_code)]
pub fn get_message(&self) -> &String {
&self.message
}

#[allow(dead_code)]
pub fn get_output_paths(&self) -> &Vec<String> {
&self.output_paths
}

pub fn as_result(&self, job_id: u64) -> Result<JobResult, MessageError> {
if self.code == 0 {
let mut output_paths = self.output_paths.clone();

let job_result = JobResult::new(job_id, JobStatus::Completed)
.with_destination_paths(&mut output_paths)
.with_message(&self.message);

Ok(job_result)
} else {
let result = JobResult::new(job_id, JobStatus::Error)
.with_message(&format!("{} (code: {:?})", self.message, self.code));

Err(MessageError::ProcessingError(result))
}
}
}

#[test]
pub fn process_return_new() {
let process_return = ProcessReturn::new(123, "this is a message");
assert_eq!(123, process_return.get_code());
assert_eq!(
&"this is a message".to_string(),
process_return.get_message()
);
assert_eq!(0, process_return.get_output_paths().len());
}

#[test]
pub fn process_return_new_with_output_paths() {
let output_path = "/path/to/output";
let mut output_paths = vec![];
output_paths.push(output_path.to_string());
let process_return = ProcessReturn::new(123, "this is a message").with_output_paths(output_paths);
assert_eq!(123, process_return.get_code());
assert_eq!(
&"this is a message".to_string(),
process_return.get_message()
);
assert_eq!(1, process_return.get_output_paths().len());
assert_eq!(
output_path,
process_return.get_output_paths().get(0).unwrap()
);
}

#[test]
pub fn process_return_new_error() {
let process_return = ProcessReturn::new_error("this is an error message");
assert_eq!(ProcessReturn::get_error_code(), process_return.get_code());
assert_eq!(
&"this is an error message".to_string(),
process_return.get_message()
);
assert_eq!(0, process_return.get_output_paths().len());
}
Loading

0 comments on commit 80068a9

Please sign in to comment.