From 19667a895667444c154806b587d4542aa65f1f32 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Arnaud Date: Tue, 24 Dec 2019 05:22:17 +0100 Subject: [PATCH 1/9] refactor some code --- Cargo.lock | 4 +--- Cargo.toml | 2 +- src/constants.rs | 8 ++++++++ src/main.rs | 43 ++++++++++++------------------------------- src/process_return.rs | 37 +++++++++++++++++++++++++++++++++++++ src/worker.rs | 39 ++++++++++----------------------------- 6 files changed, 69 insertions(+), 64 deletions(-) create mode 100644 src/constants.rs create mode 100644 src/process_return.rs diff --git a/Cargo.lock b/Cargo.lock index 926c612..ccf89e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,7 +69,6 @@ dependencies = [ [[package]] name = "amqp_worker" version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "amq-protocol-types 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "amq-protocol-uri 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -193,7 +192,7 @@ dependencies = [ name = "c_amqp_worker" version = "0.2.0" dependencies = [ - "amqp_worker 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "amqp_worker 0.6.0", "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", "libloading 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1866,7 +1865,6 @@ dependencies = [ "checksum amq-protocol-tcp 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e4a039afb7f213011c3c471adc75b876ab2f9642cbe23682de65e89c4a30ab19" "checksum amq-protocol-types 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7dfcfbaac30db6586c7837b5c62ef7a00247c664bba29f548ab353481b1ad0fc" "checksum amq-protocol-uri 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5735ea31591bb8680556706fd4121fbb7039653d5adfc905fe9a3a6535c3c52f" -"checksum amqp_worker 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "41bc749b5c1307124bfd71fda13a0ee83884dc942838d96f70e9a38df1871b0b" "checksum arrayvec 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "cd9fd44efafa8690358b7408d253adf110036b88f55672a933f01d616ad9b1b9" "checksum atty 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "1803c647a3ec87095e7ae7acfca019e98de5ec9a7d01343f611cf3152ed71a90" "checksum autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2" diff --git a/Cargo.toml b/Cargo.toml index 233b309..2adda08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] libc = "0.2.66" -amqp_worker = "0.6.0" +amqp_worker = {version = "0.6.0", path = "../rs_amqp_worker"} log = "0.4.8" semver = "^0.9" serde_json = "^1.0" diff --git a/src/constants.rs b/src/constants.rs new file mode 100644 index 0000000..d3ddd9e --- /dev/null +++ b/src/constants.rs @@ -0,0 +1,8 @@ + +pub static GET_NAME_FUNCTION: &str = "get_name"; +pub static GET_SHORT_DESCRIPTION_FUNCTION: &str = "get_short_description"; +pub static GET_DESCRIPTION_FUNCTION: &str = "get_description"; +pub static GET_VERSION_FUNCTION: &str = "get_version"; +pub static GET_PARAMETERS_SIZE_FUNCTION: &str = "get_parameters_size"; +pub static GET_PARAMETERS_FUNCTION: &str = "get_parameters"; +pub static PROCESS_FUNCTION: &str = "process"; diff --git a/src/main.rs b/src/main.rs index 05ef091..936a727 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,9 @@ extern crate libloading; #[macro_use] extern crate log; +mod constants; mod worker; +mod process_return; use amqp_worker::job::*; use amqp_worker::start_worker; @@ -19,19 +21,19 @@ struct CWorkerEvent {} impl MessageEvent for CWorkerEvent { fn get_name(&self) -> String { - get_worker_function_string_value(GET_NAME_FUNCTION) + get_worker_function_string_value(constants::GET_NAME_FUNCTION) } fn get_short_description(&self) -> String { - get_worker_function_string_value(GET_SHORT_DESCRIPTION_FUNCTION) + get_worker_function_string_value(constants::GET_SHORT_DESCRIPTION_FUNCTION) } fn get_description(&self) -> String { - get_worker_function_string_value(GET_DESCRIPTION_FUNCTION) + get_worker_function_string_value(constants::GET_DESCRIPTION_FUNCTION) } fn get_version(&self) -> Version { - let version = get_worker_function_string_value(GET_VERSION_FUNCTION); + let version = get_worker_function_string_value(constants::GET_VERSION_FUNCTION); Version::parse(&version).unwrap_or_else(|_| { panic!( "unable to parse version {} (please use SemVer format)", @@ -40,11 +42,6 @@ impl MessageEvent for CWorkerEvent { }) } - fn get_git_version(&self) -> Version { - // TODO get real git version? - self.get_version() - } - fn get_parameters(&self) -> Vec { get_worker_parameters() } @@ -53,27 +50,13 @@ impl MessageEvent for CWorkerEvent { let job = Job::new(message)?; debug!("received message: {:?}", job); - match job.check_requirements() { - Ok(_) => {} - Err(message) => { - return Err(message); - } - } + job.check_requirements()?; let job_id = job.job_id; debug!("Process job: {:?}", job_id); let process_return = call_worker_process(job); debug!("Returned: {:?}", process_return); - match process_return { - ProcessReturn { code: 0, message } => { - Ok(JobResult::new(job_id, JobStatus::Completed, vec![]).with_message(message)) - } - ProcessReturn { code, message } => { - let result = JobResult::new(job_id, JobStatus::Error, vec![]) - .with_message(format!("{} (code: {:?})", message, code)); - Err(MessageError::ProcessingError(result)) - } - } + process_return.as_result(job_id) } } @@ -90,8 +73,7 @@ pub fn test_c_binding_worker_info() { let name = C_WORKER_EVENT.get_name(); let short_description = C_WORKER_EVENT.get_short_description(); let description = C_WORKER_EVENT.get_description(); - let version = C_WORKER_EVENT.get_version(); - let git_version = C_WORKER_EVENT.get_git_version(); + let version = C_WORKER_EVENT.get_version().to_string(); assert_eq!(name, "my_c_worker".to_string()); assert_eq!(short_description, "My C Worker".to_string()); @@ -99,8 +81,7 @@ pub fn test_c_binding_worker_info() { description, "This is my long description \nover multilines".to_string() ); - assert_eq!(version.to_string(), "0.1.0".to_string()); - assert_eq!(git_version, version); + assert_eq!(version, "0.1.0".to_string()); let parameters = C_WORKER_EVENT.get_parameters(); assert_eq!(1, parameters.len()); @@ -138,8 +119,8 @@ pub fn test_process() { let result = C_WORKER_EVENT.process(message); assert!(result.is_ok()); let job_result = result.unwrap(); - assert_eq!(123, job_result.job_id); - assert_eq!(JobStatus::Completed, job_result.status); + assert_eq!(job_result.get_job_id(), 123); + assert_eq!(job_result.get_status(), &JobStatus::Completed); } #[test] diff --git a/src/process_return.rs b/src/process_return.rs new file mode 100644 index 0000000..5658788 --- /dev/null +++ b/src/process_return.rs @@ -0,0 +1,37 @@ + +use amqp_worker::job::*; +use amqp_worker::MessageError; + +#[derive(Debug)] +pub struct ProcessReturn { + pub code: i32, + pub message: String, +} + +impl ProcessReturn { + pub fn new(code: i32, message: &str) -> Self { + ProcessReturn { + code, + message: message.to_string(), + } + } + + pub fn new_error(message: &str) -> Self { + ProcessReturn::new(1, message) + } + + pub fn as_result(&self, job_id: u64) -> Result { + if self.code == 0 { + let job_result = + JobResult::new(job_id, JobStatus::Completed) + .with_message(&self.message); + + Ok(job_result) + } else { + let result = JobResult::new(job_id, JobStatus::Error) + .with_message(&format!("{} (code: {:?})", self.message, self.code)); + + Err(MessageError::ProcessingError(result)) + } + } +} diff --git a/src/worker.rs b/src/worker.rs index 8da293a..d9abde5 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -7,6 +7,8 @@ use amqp_worker::job::Job; use amqp_worker::worker::{Parameter, ParameterType}; use amqp_worker::ParametersContainer; use libloading::Library; +use crate::constants; +use crate::process_return::ProcessReturn; thread_local!(static LAST_ERROR: RefCell> = RefCell::new(None)); @@ -56,18 +58,10 @@ type ProcessFunc = unsafe fn( type CheckLastError = extern "C" fn() -> c_int; -pub static GET_NAME_FUNCTION: &str = "get_name"; -pub static GET_SHORT_DESCRIPTION_FUNCTION: &str = "get_short_description"; -pub static GET_DESCRIPTION_FUNCTION: &str = "get_description"; -pub static GET_VERSION_FUNCTION: &str = "get_version"; -pub static GET_PARAMETERS_SIZE_FUNCTION: &str = "get_parameters_size"; -pub static GET_PARAMETERS_FUNCTION: &str = "get_parameters"; -pub static PROCESS_FUNCTION: &str = "process"; - extern "C" fn check_error() -> c_int { let last_error = LAST_ERROR.with(|last_error| last_error.replace(None)); if let Some(error_message) = last_error { - return_with_error(error_message).code + ProcessReturn::new_error(&error_message).code } else { 0 } @@ -103,16 +97,6 @@ extern "C" fn log(value: *const c_char) { * Utility functions ************************/ -#[derive(Debug)] -pub struct ProcessReturn { - pub code: i32, - pub message: String, -} - -fn return_with_error(message: String) -> ProcessReturn { - ProcessReturn { code: 1, message } -} - fn get_parameter_type_from_c_str(c_str: &CStr) -> ParameterType { match c_str.to_str() { Ok(c_str) => { @@ -185,7 +169,7 @@ pub fn get_worker_parameters() -> Vec { Ok(worker_lib) => unsafe { // Retrieve number of parameters from the worker getter function let get_parameters_size_func: libloading::Symbol = - get_library_function(&worker_lib, GET_PARAMETERS_SIZE_FUNCTION) + get_library_function(&worker_lib, constants::GET_PARAMETERS_SIZE_FUNCTION) .unwrap_or_else(|error| panic!(error)); let parameters_size = get_parameters_size_func() as usize; @@ -194,7 +178,7 @@ pub fn get_worker_parameters() -> Vec { as *mut WorkerParameter; let get_parameters_func: libloading::Symbol = - get_library_function(&worker_lib, GET_PARAMETERS_FUNCTION) + get_library_function(&worker_lib, constants::GET_PARAMETERS_FUNCTION) .unwrap_or_else(|error| panic!(error)); get_parameters_func(worker_parameters); @@ -221,7 +205,7 @@ pub fn call_worker_process(job: Job) -> ProcessReturn { debug!("Call worker process from library: {}", library); match libloading::Library::new(library) { Ok(worker_lib) => unsafe { - match get_library_function(&worker_lib, PROCESS_FUNCTION) + match get_library_function(&worker_lib, constants::PROCESS_FUNCTION) as Result, String> { Ok(process_func) => { @@ -257,18 +241,15 @@ pub fn call_worker_process(job: Job) -> ProcessReturn { let message = get_c_string!(message_ptr); libc::free(message_ptr as *mut libc::c_void); - ProcessReturn { - code: return_code, - message, - } + ProcessReturn::new(return_code, &message) } - Err(error) => return_with_error(format!( + Err(error) => ProcessReturn::new_error(&format!( "Could not access {:?} function from worker library: {:?}", - PROCESS_FUNCTION, error + constants::PROCESS_FUNCTION, error )), } }, - Err(error) => return_with_error(format!( + Err(error) => ProcessReturn::new_error(&format!( "Could not load worker dynamic library: {:?}", error )), From 9f992f5689782ce521eaa677ab995148b72bdfd4 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Arnaud Date: Tue, 24 Dec 2019 05:29:36 +0100 Subject: [PATCH 2/9] pass ProcessReturn parameters as private --- src/process_return.rs | 18 +++++++++++++++--- src/worker.rs | 12 ++++++------ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/src/process_return.rs b/src/process_return.rs index 5658788..d4a5c8f 100644 --- a/src/process_return.rs +++ b/src/process_return.rs @@ -4,8 +4,8 @@ use amqp_worker::MessageError; #[derive(Debug)] pub struct ProcessReturn { - pub code: i32, - pub message: String, + code: i32, + message: String, } impl ProcessReturn { @@ -17,7 +17,19 @@ impl ProcessReturn { } pub fn new_error(message: &str) -> Self { - ProcessReturn::new(1, message) + ProcessReturn::new(ProcessReturn::get_error_code(), message) + } + + pub fn get_error_code() -> i32 { + 1 + } + + pub fn get_code(&self) -> i32 { + self.code + } + + pub fn get_message(&self) -> &String { + &self.message } pub fn as_result(&self, job_id: u64) -> Result { diff --git a/src/worker.rs b/src/worker.rs index d9abde5..69b08f2 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -60,8 +60,8 @@ type CheckLastError = extern "C" fn() -> c_int; extern "C" fn check_error() -> c_int { let last_error = LAST_ERROR.with(|last_error| last_error.replace(None)); - if let Some(error_message) = last_error { - ProcessReturn::new_error(&error_message).code + if last_error.is_some() { + ProcessReturn::get_error_code() } else { 0 } @@ -271,8 +271,8 @@ pub fn test_c_binding_process() { let job = Job::new(message).unwrap(); let returned_code = call_worker_process(job); - assert_eq!(0, returned_code.code); - assert_eq!("Everything worked well!", returned_code.message); + assert_eq!(returned_code.get_code(), 0); + assert_eq!(returned_code.get_message(), "Everything worked well!"); } #[test] @@ -290,6 +290,6 @@ pub fn test_c_binding_failing_process() { let job = Job::new(message).unwrap(); let returned_code = call_worker_process(job); - assert_eq!(1, returned_code.code); - assert_eq!("Something went wrong...", returned_code.message); + assert_eq!(returned_code.get_code(), 1); + assert_eq!(returned_code.get_message(), "Something went wrong..."); } From 93a9290cd37229a4b9507aaddb07ce1484d5b8d6 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Arnaud Date: Tue, 24 Dec 2019 10:41:09 +0100 Subject: [PATCH 3/9] add return of output_paths --- src/main.rs | 2 + src/process_return.rs | 14 ++++++ src/worker.rs | 99 ++++++++++++++++++++++++++----------------- worker.cpp | 59 ++++++++++++++------------ 4 files changed, 109 insertions(+), 65 deletions(-) diff --git a/src/main.rs b/src/main.rs index 936a727..b62f94e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -121,6 +121,7 @@ pub fn test_process() { let job_result = result.unwrap(); assert_eq!(job_result.get_job_id(), 123); assert_eq!(job_result.get_status(), &JobStatus::Completed); + assert_eq!(job_result.get_destination_paths(), &vec!["/path/out.mxf".to_string()]); } #[test] @@ -139,4 +140,5 @@ pub fn test_failing_process() { let result = C_WORKER_EVENT.process(message); assert!(result.is_err()); let _message_error = result.unwrap_err(); + } diff --git a/src/process_return.rs b/src/process_return.rs index d4a5c8f..c0832be 100644 --- a/src/process_return.rs +++ b/src/process_return.rs @@ -6,6 +6,7 @@ use amqp_worker::MessageError; pub struct ProcessReturn { code: i32, message: String, + output_paths: Vec, } impl ProcessReturn { @@ -13,6 +14,7 @@ impl ProcessReturn { ProcessReturn { code, message: message.to_string(), + output_paths: vec![], } } @@ -20,6 +22,11 @@ impl ProcessReturn { ProcessReturn::new(ProcessReturn::get_error_code(), message) } + pub fn with_output_paths(mut self, output_paths: Vec) -> Self { + self.output_paths = output_paths; + self + } + pub fn get_error_code() -> i32 { 1 } @@ -32,10 +39,17 @@ impl ProcessReturn { &self.message } + pub fn get_output_paths(&self) -> &Vec { + &self.output_paths + } + pub fn as_result(&self, job_id: u64) -> Result { if self.code == 0 { + let mut output_paths = self.output_paths.clone(); + let job_result = JobResult::new(job_id, JobStatus::Completed) + .with_destination_paths(&mut output_paths) .with_message(&self.message); Ok(job_result) diff --git a/src/worker.rs b/src/worker.rs index 69b08f2..0cc546d 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,4 +1,4 @@ -use std::cell::RefCell; + use std::collections::HashMap; use std::ffi::{CStr, CString}; use std::os::raw::{c_char, c_int, c_uint, c_void}; @@ -10,21 +10,13 @@ use libloading::Library; use crate::constants; use crate::process_return::ProcessReturn; -thread_local!(static LAST_ERROR: RefCell> = RefCell::new(None)); - -macro_rules! handle_error { - ($name:expr) => { - LAST_ERROR.with(|last_error| { - last_error.replace(Some($name)); - }); - }; -} - macro_rules! get_c_string { ($name:expr) => { - CString::from(CStr::from_ptr($name)) - .into_string() - .expect("cannot convert C string to String") + if $name.is_null() { + "".to_string() + } else { + std::str::from_utf8_unchecked(CStr::from_ptr($name).to_bytes()).to_string() + } }; } @@ -47,26 +39,15 @@ type GetParametersSizeFunc = unsafe fn() -> c_uint; type GetParametersFunc = unsafe fn(parameters: *mut WorkerParameter); type GetParameterValueCallback = extern "C" fn(*mut c_void, *const c_char) -> *const c_char; -type LogCallback = extern "C" fn(*const c_char); +type LoggerCallback = extern "C" fn(*const c_char, *const c_char); type ProcessFunc = unsafe fn( job: *mut c_void, callback: GetParameterValueCallback, - check_error: CheckLastError, - logger: LogCallback, - output_message: *mut c_char, + logger: LoggerCallback, + output_message: &*const c_char, + output_paths: &*mut *const c_char ) -> c_int; -type CheckLastError = extern "C" fn() -> c_int; - -extern "C" fn check_error() -> c_int { - let last_error = LAST_ERROR.with(|last_error| last_error.replace(None)); - if last_error.is_some() { - ProcessReturn::get_error_code() - } else { - 0 - } -} - #[allow(unused_assignments)] extern "C" fn get_parameter_value( mut c_worker_job: *mut c_void, @@ -74,22 +55,33 @@ extern "C" fn get_parameter_value( ) -> *const c_char { let job_params_ptrs: Box> = unsafe { Box::from_raw(c_worker_job as *mut HashMap) }; + let key = unsafe { get_c_string!(parameter_id) }; debug!("Get parameter value from id: {:?}", key); + let param_value = if let Some(value) = job_params_ptrs.get(&key) { *value } else { - handle_error!(format!("No worker_job parameter for id: {}.", key)); std::ptr::null() }; + // reset job parameters pointer c_worker_job = Box::into_raw(job_params_ptrs) as *mut c_void; param_value } -extern "C" fn log(value: *const c_char) { +extern "C" fn logger(level: *const c_char, raw_value: *const c_char) { unsafe { - debug!("[Worker] {}", get_c_string!(value)); + let level = get_c_string!(level); + let value = get_c_string!(raw_value); + + match level.as_str() { + "trace" => {trace!("[Worker] {}", value);}, + "debug" => {debug!("[Worker] {}", value);}, + "info" => {info!("[Worker] {}", value);}, + "error" => {error!("[Worker] {}", value);}, + _ => {} + } } } @@ -225,23 +217,50 @@ pub fn call_worker_process(job: Job) -> ProcessReturn { let boxed_job_params_ptrs = Box::new(job_params_ptrs); let job_params_ptrs_ptr = Box::into_raw(boxed_job_params_ptrs); - // Get output message pointer - let message_ptr = libc::malloc(1024 * 1024) as *mut c_char; // 1MB max. sized message + let message_ptr = std::ptr::null(); + + let mut output_paths_ptr = vec![std::ptr::null()]; + let ptr = output_paths_ptr.as_mut_ptr(); // Call C worker process function let return_code = process_func( job_params_ptrs_ptr as *mut c_void, get_parameter_value, - check_error, - log, - message_ptr, + logger, + &message_ptr, + &ptr ); + let mut output_paths = vec![]; + + if !ptr.is_null() { + let mut offset = 0; + loop { + let cur_ptr = *ptr.offset(offset); + if cur_ptr.is_null() { + break; + } + + output_paths.push(get_c_string!(cur_ptr)); + + libc::free(cur_ptr as *mut libc::c_void); + offset += 1; + } + + if offset > 0 { + libc::free(ptr as *mut libc::c_void); + } + } + // Retrieve message as string and free pointer - let message = get_c_string!(message_ptr); - libc::free(message_ptr as *mut libc::c_void); + let mut message = "".to_string(); + if !message_ptr.is_null() { + message = get_c_string!(message_ptr); + libc::free(message_ptr as *mut libc::c_void); + } ProcessReturn::new(return_code, &message) + .with_output_paths(output_paths) } Err(error) => ProcessReturn::new_error(&format!( "Could not access {:?} function from worker library: {:?}", @@ -273,6 +292,7 @@ pub fn test_c_binding_process() { let returned_code = call_worker_process(job); assert_eq!(returned_code.get_code(), 0); assert_eq!(returned_code.get_message(), "Everything worked well!"); + assert_eq!(returned_code.get_output_paths(), &vec!["/path/out.mxf".to_string()]); } #[test] @@ -292,4 +312,5 @@ pub fn test_c_binding_failing_process() { let returned_code = call_worker_process(job); assert_eq!(returned_code.get_code(), 1); assert_eq!(returned_code.get_message(), "Something went wrong..."); + assert!(returned_code.get_output_paths().is_empty()); } diff --git a/worker.cpp b/worker.cpp index fe2d048..c947199 100644 --- a/worker.cpp +++ b/worker.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -19,24 +20,15 @@ typedef struct Parameter { /** * Job parameters handler */ -typedef void* JobParameters; +typedef void* JobHandle; /** * Get job parameter value callback */ -typedef char* (*GetParameterValueCallback)(JobParameters, const char*); +typedef char* (*GetParameterValueCallback)(JobHandle, const char*); /** - * Rust logger callback + * Rust Logger */ -typedef void* (*Logger)(const char*); -/** - * Check error callback - */ -typedef int* (*CheckError)(); -/** - * Message to return as a response - */ -typedef char* OutputMessage; - +typedef void* (*Logger)(const char*, const char*); /** * Get worker name @@ -98,28 +90,43 @@ void get_parameters(Parameter* parameters) { * Worker main process function * @param job Job parameters handler * @param parametersValueGetter Get job parameter value callback - * @param checkError Check error callback - * @param logger Rust logger callback + * @param logger Rust Logger */ -int process(JobParameters job, GetParameterValueCallback parametersValueGetter, CheckError checkError, Logger logger, OutputMessage message) { - // Print message through the Rust internal logger - logger("Start C Worker process..."); +int process( + JobHandle job_handle, + GetParameterValueCallback parametersValueGetter, + Logger logger, + const char** message, + const char*** output_paths + ) { + // Print message through the Rust Logger + logger("debug", "Start C Worker process..."); // Retrieve "path" job parameter value - char* value = parametersValueGetter(job, "path"); + char* value = parametersValueGetter(job_handle, "path"); // Check whether an error occurred parsing job parameters - if(checkError() != 0) { - const char* message_str = "Something went wrong..."; - memcpy(message, message_str, strlen(message_str)); + if(value == NULL) { + const char* message_str = "Something went wrong...\0"; + size_t length = strlen(message_str) + 1; + *message = (const char *)malloc(length); + memcpy((void*)*message, message_str, length); return 1; } - // Print value through the Rust internal logger - logger(value); + // Print value through the Rust Logger + logger("debug", value); + + const char* message_str = "Everything worked well!\0"; + size_t length = strlen(message_str) + 1; + *message = (const char *)malloc(length); + memcpy((void*)*message, message_str, length); + + output_paths[0] = (const char **)malloc(sizeof(int) * 2); + output_paths[0][0] = (const char *)malloc(20); + output_paths[0][1] = 0; + memcpy((void*)output_paths[0][0], "/path/out.mxf\0", 13); - const char* message_str = "Everything worked well!"; - memcpy(message, message_str, strlen(message_str)); return 0; } From ad20d27908097cabf4969c12af629564f5d6b40b Mon Sep 17 00:00:00 2001 From: Marc-Antoine Arnaud Date: Tue, 24 Dec 2019 10:55:25 +0100 Subject: [PATCH 4/9] declare worker header --- worker.cpp | 42 +++---------------------- worker.h | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 37 deletions(-) create mode 100644 worker.h diff --git a/worker.cpp b/worker.cpp index c947199..99f42d4 100644 --- a/worker.cpp +++ b/worker.cpp @@ -1,35 +1,10 @@ -#include -#include -#include + +#include "worker.h" #ifdef __cplusplus extern "C" { #endif -/** - * Worker parameter type - */ -typedef struct Parameter { - char* identifier; - char* label; - unsigned int kind_size; - char** kind; - int required; -} Parameter; - -/** - * Job parameters handler - */ -typedef void* JobHandle; -/** - * Get job parameter value callback - */ -typedef char* (*GetParameterValueCallback)(JobHandle, const char*); -/** - * Rust Logger - */ -typedef void* (*Logger)(const char*, const char*); - /** * Get worker name */ @@ -107,25 +82,18 @@ int process( // Check whether an error occurred parsing job parameters if(value == NULL) { - const char* message_str = "Something went wrong...\0"; - size_t length = strlen(message_str) + 1; - *message = (const char *)malloc(length); - memcpy((void*)*message, message_str, length); + set_str_on_ptr(message, "Something went wrong...\0"); return 1; } // Print value through the Rust Logger logger("debug", value); - const char* message_str = "Everything worked well!\0"; - size_t length = strlen(message_str) + 1; - *message = (const char *)malloc(length); - memcpy((void*)*message, message_str, length); + set_str_on_ptr(message, "Everything worked well!\0"); output_paths[0] = (const char **)malloc(sizeof(int) * 2); - output_paths[0][0] = (const char *)malloc(20); + set_str_on_ptr(&output_paths[0][0], "/path/out.mxf\0"); output_paths[0][1] = 0; - memcpy((void*)output_paths[0][0], "/path/out.mxf\0", 13); return 0; } diff --git a/worker.h b/worker.h new file mode 100644 index 0000000..336f49d --- /dev/null +++ b/worker.h @@ -0,0 +1,91 @@ +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Worker parameter type + */ +typedef struct Parameter { + char* identifier; + char* label; + unsigned int kind_size; + char** kind; + int required; +} Parameter; + +/** + * Job parameters handler + */ +typedef void* JobHandle; +/** + * Get job parameter value callback + */ +typedef char* (*GetParameterValueCallback)(JobHandle, const char*); +/** + * Rust Logger + */ +typedef void* (*Logger)(const char*, const char*); + +/** + * Get worker name + */ +char* get_name(); + +/** + * Get worker short description + */ +char* get_short_description(); + +/** + * Get worker long description + */ +char* get_description(); + +/** + * Get worker version + */ +char* get_version(); + +/** + * Get number of worker parameters + */ +unsigned int get_parameters_size(); + +/** + * Retrieve worker parameters + * @param parameters Output parameters array pointer + */ +void get_parameters(Parameter* parameters); + +/** + * Worker main process function + * @param job_handle Job parameters handler + * @param parameters_value_getter Get job parameter value callback + * @param logger Rust Logger + */ +int process( + JobHandle job_handle, + GetParameterValueCallback parameters_value_getter, + Logger logger, + const char** message, + const char*** output_paths + ); + +/** + * Set the C string to the pointer + * @param message Pointer on the const char* + * @param value c string with 0 ending + */ +void set_str_on_ptr(const char** message, const char* value) { + size_t length = strlen(value) + 1; + *message = (const char *)malloc(length); + memcpy((void*)*message, value, length); +} + +#ifdef __cplusplus +} +#endif From 8327574b41df7bb57bf5750131ab7562ba7819ca Mon Sep 17 00:00:00 2001 From: Marc-Antoine Arnaud Date: Fri, 27 Dec 2019 10:43:52 +0100 Subject: [PATCH 5/9] use new version of amqp worker --- Cargo.lock | 6 ++++-- Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ccf89e5..4e6ae5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,7 +68,8 @@ dependencies = [ [[package]] name = "amqp_worker" -version = "0.6.0" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "amq-protocol-types 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "amq-protocol-uri 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -192,7 +193,7 @@ dependencies = [ name = "c_amqp_worker" version = "0.2.0" dependencies = [ - "amqp_worker 0.6.0", + "amqp_worker 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", "libloading 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1865,6 +1866,7 @@ dependencies = [ "checksum amq-protocol-tcp 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e4a039afb7f213011c3c471adc75b876ab2f9642cbe23682de65e89c4a30ab19" "checksum amq-protocol-types 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7dfcfbaac30db6586c7837b5c62ef7a00247c664bba29f548ab353481b1ad0fc" "checksum amq-protocol-uri 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5735ea31591bb8680556706fd4121fbb7039653d5adfc905fe9a3a6535c3c52f" +"checksum amqp_worker 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9f935f55af0a654979ed92932bcb806225a0fc6e310846361a7306a8b8cb8dd8" "checksum arrayvec 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "cd9fd44efafa8690358b7408d253adf110036b88f55672a933f01d616ad9b1b9" "checksum atty 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "1803c647a3ec87095e7ae7acfca019e98de5ec9a7d01343f611cf3152ed71a90" "checksum autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2" diff --git a/Cargo.toml b/Cargo.toml index 2adda08..85fbe5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] libc = "0.2.66" -amqp_worker = {version = "0.6.0", path = "../rs_amqp_worker"} +amqp_worker = "0.7.0" log = "0.4.8" semver = "^0.9" serde_json = "^1.0" From 77ef71f70e8777f5b53e0b52bba5a0345964afc1 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Arnaud Date: Fri, 27 Dec 2019 11:00:36 +0100 Subject: [PATCH 6/9] format code --- src/constants.rs | 1 - src/main.rs | 8 +++++--- src/process_return.rs | 4 +--- src/worker.rs | 36 +++++++++++++++++++++++------------- 4 files changed, 29 insertions(+), 20 deletions(-) diff --git a/src/constants.rs b/src/constants.rs index d3ddd9e..fadda3f 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -1,4 +1,3 @@ - pub static GET_NAME_FUNCTION: &str = "get_name"; pub static GET_SHORT_DESCRIPTION_FUNCTION: &str = "get_short_description"; pub static GET_DESCRIPTION_FUNCTION: &str = "get_description"; diff --git a/src/main.rs b/src/main.rs index b62f94e..a88fb02 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,8 +4,8 @@ extern crate libloading; extern crate log; mod constants; -mod worker; mod process_return; +mod worker; use amqp_worker::job::*; use amqp_worker::start_worker; @@ -121,7 +121,10 @@ pub fn test_process() { let job_result = result.unwrap(); assert_eq!(job_result.get_job_id(), 123); assert_eq!(job_result.get_status(), &JobStatus::Completed); - assert_eq!(job_result.get_destination_paths(), &vec!["/path/out.mxf".to_string()]); + assert_eq!( + job_result.get_destination_paths(), + &vec!["/path/out.mxf".to_string()] + ); } #[test] @@ -140,5 +143,4 @@ pub fn test_failing_process() { let result = C_WORKER_EVENT.process(message); assert!(result.is_err()); let _message_error = result.unwrap_err(); - } diff --git a/src/process_return.rs b/src/process_return.rs index c0832be..47d7a9c 100644 --- a/src/process_return.rs +++ b/src/process_return.rs @@ -1,4 +1,3 @@ - use amqp_worker::job::*; use amqp_worker::MessageError; @@ -47,8 +46,7 @@ impl ProcessReturn { if self.code == 0 { let mut output_paths = self.output_paths.clone(); - let job_result = - JobResult::new(job_id, JobStatus::Completed) + let job_result = JobResult::new(job_id, JobStatus::Completed) .with_destination_paths(&mut output_paths) .with_message(&self.message); diff --git a/src/worker.rs b/src/worker.rs index 0cc546d..956437a 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,14 +1,13 @@ - use std::collections::HashMap; use std::ffi::{CStr, CString}; use std::os::raw::{c_char, c_int, c_uint, c_void}; +use crate::constants; +use crate::process_return::ProcessReturn; use amqp_worker::job::Job; use amqp_worker::worker::{Parameter, ParameterType}; use amqp_worker::ParametersContainer; use libloading::Library; -use crate::constants; -use crate::process_return::ProcessReturn; macro_rules! get_c_string { ($name:expr) => { @@ -45,7 +44,7 @@ type ProcessFunc = unsafe fn( callback: GetParameterValueCallback, logger: LoggerCallback, output_message: &*const c_char, - output_paths: &*mut *const c_char + output_paths: &*mut *const c_char, ) -> c_int; #[allow(unused_assignments)] @@ -76,10 +75,18 @@ extern "C" fn logger(level: *const c_char, raw_value: *const c_char) { let value = get_c_string!(raw_value); match level.as_str() { - "trace" => {trace!("[Worker] {}", value);}, - "debug" => {debug!("[Worker] {}", value);}, - "info" => {info!("[Worker] {}", value);}, - "error" => {error!("[Worker] {}", value);}, + "trace" => { + trace!("[Worker] {}", value); + } + "debug" => { + debug!("[Worker] {}", value); + } + "info" => { + info!("[Worker] {}", value); + } + "error" => { + error!("[Worker] {}", value); + } _ => {} } } @@ -228,7 +235,7 @@ pub fn call_worker_process(job: Job) -> ProcessReturn { get_parameter_value, logger, &message_ptr, - &ptr + &ptr, ); let mut output_paths = vec![]; @@ -259,12 +266,12 @@ pub fn call_worker_process(job: Job) -> ProcessReturn { libc::free(message_ptr as *mut libc::c_void); } - ProcessReturn::new(return_code, &message) - .with_output_paths(output_paths) + ProcessReturn::new(return_code, &message).with_output_paths(output_paths) } Err(error) => ProcessReturn::new_error(&format!( "Could not access {:?} function from worker library: {:?}", - constants::PROCESS_FUNCTION, error + constants::PROCESS_FUNCTION, + error )), } }, @@ -292,7 +299,10 @@ pub fn test_c_binding_process() { let returned_code = call_worker_process(job); assert_eq!(returned_code.get_code(), 0); assert_eq!(returned_code.get_message(), "Everything worked well!"); - assert_eq!(returned_code.get_output_paths(), &vec!["/path/out.mxf".to_string()]); + assert_eq!( + returned_code.get_output_paths(), + &vec!["/path/out.mxf".to_string()] + ); } #[test] From d9528fb5ddb7b19b5d323b7281cea3cc531234e7 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Arnaud Date: Fri, 27 Dec 2019 11:02:15 +0100 Subject: [PATCH 7/9] allow dead code to fix clippy --- src/process_return.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/process_return.rs b/src/process_return.rs index 47d7a9c..2ad565b 100644 --- a/src/process_return.rs +++ b/src/process_return.rs @@ -30,14 +30,17 @@ impl ProcessReturn { 1 } + #[allow(dead_code)] pub fn get_code(&self) -> i32 { self.code } + #[allow(dead_code)] pub fn get_message(&self) -> &String { &self.message } + #[allow(dead_code)] pub fn get_output_paths(&self) -> &Vec { &self.output_paths } From bde47ff8bc8ed08255879dc72ad1063e84b236de Mon Sep 17 00:00:00 2001 From: Valentin NOEL Date: Fri, 27 Dec 2019 14:21:34 +0100 Subject: [PATCH 8/9] Add unit tests --- src/process_return.rs | 40 ++++++++++++++++++++++++++++++++++++++++ src/worker.rs | 31 +++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/src/process_return.rs b/src/process_return.rs index 2ad565b..d04397e 100644 --- a/src/process_return.rs +++ b/src/process_return.rs @@ -62,3 +62,43 @@ impl ProcessReturn { } } } + +#[test] +pub fn process_return_new() { + let process_return = ProcessReturn::new(123, "this is a message"); + assert_eq!(123, process_return.get_code()); + assert_eq!( + &"this is a message".to_string(), + process_return.get_message() + ); + assert_eq!(0, process_return.get_output_paths().len()); +} + +#[test] +pub fn process_return_new_with_output_paths() { + let output_path = "/path/to/output"; + let mut output_paths = vec![]; + output_paths.push(output_path.to_string()); + let process_return = ProcessReturn::new(123, "this is a message").with_output_paths(output_paths); + assert_eq!(123, process_return.get_code()); + assert_eq!( + &"this is a message".to_string(), + process_return.get_message() + ); + assert_eq!(1, process_return.get_output_paths().len()); + assert_eq!( + output_path, + process_return.get_output_paths().get(0).unwrap() + ); +} + +#[test] +pub fn process_return_new_error() { + let process_return = ProcessReturn::new_error("this is an error message"); + assert_eq!(ProcessReturn::get_error_code(), process_return.get_code()); + assert_eq!( + &"this is an error message".to_string(), + process_return.get_message() + ); + assert_eq!(0, process_return.get_output_paths().len()); +} diff --git a/src/worker.rs b/src/worker.rs index 956437a..85c0d64 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -324,3 +324,34 @@ pub fn test_c_binding_failing_process() { assert_eq!(returned_code.get_message(), "Something went wrong..."); assert!(returned_code.get_output_paths().is_empty()); } + +#[test] +pub fn test_c_worker_library_env_var() { + let fake_path = "/path/to/nowhere/where/there/is/nothing"; + std::env::set_var("WORKER_LIBRARY_FILE", fake_path); + let library_path = get_library_file_path(); + assert_eq!(library_path, fake_path); + std::env::remove_var("WORKER_LIBRARY_FILE"); +} + +#[test] +pub fn test_c_worker_library_failure_on_get_worker_function_string_value() { + std::env::set_var( + "WORKER_LIBRARY_FILE", + "/path/to/nowhere/where/there/is/nothing", + ); + let result = std::panic::catch_unwind(|| get_worker_function_string_value("fake_function")); + assert!(result.is_err()); + std::env::remove_var("WORKER_LIBRARY_FILE"); +} + +#[test] +pub fn test_c_worker_library_failure_on_get_worker_parameters() { + std::env::set_var( + "WORKER_LIBRARY_FILE", + "/path/to/nowhere/where/there/is/nothing", + ); + let result = std::panic::catch_unwind(|| get_worker_parameters()); + assert!(result.is_err()); + std::env::remove_var("WORKER_LIBRARY_FILE"); +} From c63d5b0e47da82187bbfb8828001e344ea0a3c38 Mon Sep 17 00:00:00 2001 From: Valentin NOEL Date: Fri, 27 Dec 2019 15:29:04 +0100 Subject: [PATCH 9/9] Remove unit tests that access concurrently to env var --- src/worker.rs | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/src/worker.rs b/src/worker.rs index 85c0d64..956437a 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -324,34 +324,3 @@ pub fn test_c_binding_failing_process() { assert_eq!(returned_code.get_message(), "Something went wrong..."); assert!(returned_code.get_output_paths().is_empty()); } - -#[test] -pub fn test_c_worker_library_env_var() { - let fake_path = "/path/to/nowhere/where/there/is/nothing"; - std::env::set_var("WORKER_LIBRARY_FILE", fake_path); - let library_path = get_library_file_path(); - assert_eq!(library_path, fake_path); - std::env::remove_var("WORKER_LIBRARY_FILE"); -} - -#[test] -pub fn test_c_worker_library_failure_on_get_worker_function_string_value() { - std::env::set_var( - "WORKER_LIBRARY_FILE", - "/path/to/nowhere/where/there/is/nothing", - ); - let result = std::panic::catch_unwind(|| get_worker_function_string_value("fake_function")); - assert!(result.is_err()); - std::env::remove_var("WORKER_LIBRARY_FILE"); -} - -#[test] -pub fn test_c_worker_library_failure_on_get_worker_parameters() { - std::env::set_var( - "WORKER_LIBRARY_FILE", - "/path/to/nowhere/where/there/is/nothing", - ); - let result = std::panic::catch_unwind(|| get_worker_parameters()); - assert!(result.is_err()); - std::env::remove_var("WORKER_LIBRARY_FILE"); -}