Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 46 additions & 44 deletions ocaml/xapi/server_helpers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -55,57 +55,58 @@ let unknown_rpc_failure func =
let parameter_count_mismatch_failure func expected received =
API.response_of_failure Api_errors.message_parameter_count_mismatch [func; expected; received]

(* Execute fn f in specified __context, marshalling result with "marshaller".
If has_task is set then __context has a real task in it that has to be completed. *)
let exec ?marshaller ?f_forward ~__context f =
(* NB:
1. If we are a slave we process the call locally assuming the locks have
already been taken by the master
2. If we are the master, locks are only necessary for the potentially-forwarded
(ie side-effecting) operations and not things like the database layer *)
try
let result =
if not(Pool_role.is_master ())
then f ~__context (* slaves process everything locally *)
else match f_forward with
| None ->
(* this operation cannot be forwarded (eg database lookup); do it now *)
f ~__context
| Some forward ->
(* use the forwarding layer (NB this might make a local call ultimately) *)
forward ~local_fn:f ~__context
in
begin match marshaller with
| None -> TaskHelper.complete ~__context None
| Some fn -> TaskHelper.complete ~__context (Some (fn result))
end;
result
with
| Api_errors.Server_error (a,b) as e when a = Api_errors.task_cancelled ->
Backtrace.is_important e;
TaskHelper.cancel ~__context;
raise e
| e ->
Backtrace.is_important e;
TaskHelper.failed ~__context e;
raise e

(** WARNING: the context is destroyed when execution is finished if the task is not forwarded, in database and not called asynchronous. *)
(* FIXME: This function should not be used for external call : we should add a proper .mli file to hide it. *)
let exec_with_context ~__context ?marshaller ?f_forward ?(called_async=false) f =
let exec_with_context ~__context ?marshaller ?f_forward ?(called_async=false) ?(has_task=false) f =
(* Execute fn f in specified __context, marshalling result with "marshaller".
If has_task is set then __context has a real task in it that has to be completed. *)
let exec () =
(* NB:
1. If we are a slave we process the call locally assuming the locks have
already been taken by the master
2. If we are the master, locks are only necessary for the potentially-forwarded
(ie side-effecting) operations and not things like the database layer *)

(* For forwarded task, we should not complete it here, the server which forward the task will complete it. However for the task forwarded by client, which param `has_task` is set with `true`, we have to complete it also. *)
let need_complete = has_task || (not (Context.forwarded_task __context)) in
try
let result =
if not(Pool_role.is_master ())
then f ~__context (* slaves process everything locally *)
else match f_forward with
| None ->
(* this operation cannot be forwarded (eg database lookup); do it now *)
f ~__context
| Some forward ->
(* use the forwarding layer (NB this might make a local call ultimately) *)
forward ~local_fn:f ~__context
in
if need_complete then begin
match marshaller with
| None -> TaskHelper.complete ~__context None
| Some fn -> TaskHelper.complete ~__context (Some (fn result))
end;
result
with
| Api_errors.Server_error (a,b) as e when a = Api_errors.task_cancelled ->
Backtrace.is_important e;
if need_complete then TaskHelper.cancel ~__context;
raise e
| e ->
Backtrace.is_important e;
if need_complete then TaskHelper.failed ~__context e;
raise e
in
Locking_helpers.Thread_state.with_named_thread (Context.get_task_name __context) (Context.get_task_id __context)
(fun () ->
Debug.with_thread_associated (Context.string_of_task __context)
(fun () ->
finally
(fun () ->
(* CP-982: promote tracking debug line to info status *)
if called_async then info "spawning a new thread to handle the current task%s" (Context.trackid ~with_brackets:true ~prefix:" " __context);
exec ?marshaller ?f_forward ~__context f)
(fun () ->
if not called_async
then Context.destroy __context
(* else debug "nothing more to process for this thread" *)
(* CP-982: promote tracking debug line to info status *)
if called_async then info "spawning a new thread to handle the current task%s" (Context.trackid ~with_brackets:true ~prefix:" " __context);
finally exec (fun () ->
if not called_async then Context.destroy __context
(* else debug "nothing more to process for this thread" *)
)
)
()
Expand Down Expand Up @@ -146,6 +147,7 @@ let exec_with_new_task ?http_other_config ?quiet ?subtask_of ?session_id ?task_i
let exec_with_forwarded_task ?http_other_config ?session_id ?origin task_id f =
exec_with_context
~__context:(Context.from_forwarded_task ?http_other_config ?session_id ?origin task_id)
~has_task:true
(fun ~__context -> f __context)

let exec_with_subtask ~__context ?task_in_database ?task_description task_name f =
Expand Down