Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ocaml/tests/test_cluster_host.ml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ let test_destroy_forbidden_when_sr_attached () =
Alcotest.check_raises
("Should raise cluster_stack_in_use: [ " ^ cluster_stack ^ " ] ")
Api_errors.(Server_error (cluster_stack_in_use, [ cluster_stack ]))
(fun () -> Xapi_cluster_host.destroy ~__context ~self:cluster_host)
(fun () -> Xapi_cluster_host.force_destroy ~__context ~self:cluster_host)

type declare_dead_args = {
dead_members: Cluster_interface.address list;
Expand Down
6 changes: 3 additions & 3 deletions ocaml/tests/test_clustering.ml
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ let nest_with_clustering_lock_if_needed ~__context ~timeout ~type1 ~type2 ~on_de
~timeout:timeout
~otherwise: on_deadlock
(fun () ->
Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type1
Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type1 __LOC__
(fun () ->
Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type2
Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type2 __LOC__
(fun () -> on_no_deadlock ()
)
)
Expand Down Expand Up @@ -607,7 +607,7 @@ let test_pool_ha_cluster_stacks_with_ha_with_clustering () =

(* Cluster.destroy should set HA cluster stack with HA disabled *)
Xapi_cluster_host.enable ~__context ~self:cluster_host;
Xapi_cluster_host.destroy ~__context ~self:cluster_host;
(* can't destroy last cluster_host, must be done through destroying cluster *)
Xapi_cluster.destroy ~__context ~self:cluster;
(* Cluster.destroy should reset HA cluster stacks *)
assert_cluster_stack_is default ~__context;
Expand Down
6 changes: 3 additions & 3 deletions ocaml/xapi/message_forwarding.ml
Original file line number Diff line number Diff line change
Expand Up @@ -591,12 +591,12 @@ module Forward = functor(Local: Custom_actions.CUSTOM_ACTIONS) -> struct
Local.Pool.ha_failover_plan_exists ~__context ~n

let ha_compute_max_host_failures_to_tolerate ~__context =
Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context (fun () ->
Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context __LOC__ (fun () ->
info "Pool.ha_compute_max_host_failures_to_tolerate: pool = '%s'" (current_pool_uuid ~__context);
Local.Pool.ha_compute_max_host_failures_to_tolerate ~__context)

let ha_compute_hypothetical_max_host_failures_to_tolerate ~__context ~configuration =
Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context (fun () ->
Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context __LOC__ (fun () ->
info "Pool.ha_compute_hypothetical_max_host_failures_to_tolerate: pool = '%s'; configuration = [ %s ]"
(current_pool_uuid ~__context)
(String.concat "; " (List.map (fun (vm, p) -> Ref.string_of vm ^ " " ^ p) configuration));
Expand All @@ -610,7 +610,7 @@ module Forward = functor(Local: Custom_actions.CUSTOM_ACTIONS) -> struct
Local.Pool.ha_compute_vm_failover_plan ~__context ~failed_hosts ~failed_vms

let set_ha_host_failures_to_tolerate ~__context ~self ~value =
Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context (fun () ->
Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context __LOC__ (fun () ->
info "Pool.set_ha_host_failures_to_tolerate: pool = '%s'; value = %Ld" (pool_uuid ~__context self) value;
Local.Pool.set_ha_host_failures_to_tolerate ~__context ~self ~value)

Expand Down
23 changes: 7 additions & 16 deletions ocaml/xapi/xapi_cluster.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout ~token_
(* Currently we only support corosync. If we support more cluster stacks, this
* should be replaced by a general function that checks the given cluster_stack *)
Pool_features.assert_enabled ~__context ~f:Features.Corosync;
with_clustering_lock (fun () ->
with_clustering_lock __LOC__(fun () ->
let dbg = Context.string_of_task __context in
validate_params ~token_timeout ~token_timeout_coefficient;
let cluster_ref = Ref.make () in
Expand Down Expand Up @@ -74,7 +74,6 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout ~token_
)

let destroy ~__context ~self =
let dbg = Context.string_of_task __context in
let cluster_hosts = Db.Cluster.get_cluster_hosts ~__context ~self in
let cluster_host = match cluster_hosts with
| [] -> None
Expand All @@ -84,21 +83,13 @@ let destroy ~__context ~self =
raise Api_errors.(Server_error(cluster_does_not_have_one_node, [string_of_int n]))
in
Xapi_stdext_monadic.Opt.iter (fun ch ->
assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self:ch
assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self:ch;
Xapi_cluster_host.force_destroy ~__context ~self:ch
) cluster_host;
let result = Cluster_client.LocalClient.destroy (rpc ~__context) dbg in
match result with
| Result.Ok () ->
Xapi_stdext_monadic.Opt.iter (fun ch ->
Db.Cluster_host.destroy ~__context ~self:ch
) cluster_host;
Db.Cluster.destroy ~__context ~self;
D.debug "Cluster destroyed successfully";
set_ha_cluster_stack ~__context;
Xapi_clustering.Daemon.disable ~__context
| Result.Error error ->
D.warn "Error occurred during Cluster.destroy";
handle_error error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good, we were unnecessarily duplicating code.

Db.Cluster.destroy ~__context ~self;
D.debug "Cluster destroyed successfully";
set_ha_cluster_stack ~__context;
Xapi_clustering.Daemon.disable ~__context

let get_network ~__context ~self =
get_network_internal ~__context ~self
Expand Down
62 changes: 29 additions & 33 deletions ocaml/xapi/xapi_cluster_host.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ let call_api_function_with_alert ~__context ~msg ~cls ~obj_uuid ~body

(* Create xapi db object for cluster_host, resync_host calls clusterd *)
let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host =
with_clustering_lock (fun () ->
with_clustering_lock __LOC__ (fun () ->
assert_operation_host_target_is_localhost ~__context ~host;
assert_pif_attached_to ~host ~pIF ~__context;
assert_cluster_host_can_be_created ~__context ~host;
Expand All @@ -63,7 +63,7 @@ let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host =

(* Helper function atomically enables clusterd and joins the cluster_host *)
let join_internal ~__context ~self =
with_clustering_lock (fun () ->
with_clustering_lock __LOC__ (fun () ->

let pIF = Db.Cluster_host.get_PIF ~__context ~self in
fix_pif_prerequisites ~__context pIF;
Expand Down Expand Up @@ -127,43 +127,39 @@ let create ~__context ~cluster ~host ~pif =
resync_host ~__context ~host;
cluster_host

(* Common body of Cluster_host.destroy and Cluster_host.force_destroy.
   Everything runs inside [with_clustering_lock]: the localhost-target and
   no-attached-SR assertions are evaluated first, then
   [Cluster_client.LocalClient.destroy] is called. When that call succeeds the
   Cluster_host database record is destroyed and the clustering daemon is
   disabled; when it fails the error is handed to [handle_error].
   [meth] is only interpolated into the log messages to name the calling
   operation. *)
let destroy_op ~__context ~self meth =
  with_clustering_lock __LOC__ (fun () ->
    let task_dbg = Context.string_of_task __context in
    let target_host = Db.Cluster_host.get_host ~__context ~self in
    assert_operation_host_target_is_localhost ~__context ~host:target_host;
    assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self;
    match Cluster_client.LocalClient.destroy (rpc ~__context) task_dbg with
    | Result.Error err ->
      warn "Error occurred during Cluster_host.%s" meth;
      handle_error err
    | Result.Ok () ->
      (* Only drop the DB record once the daemon call has reported success. *)
      Db.Cluster_host.destroy ~__context ~self;
      debug "Cluster_host.%s was successful" meth;
      Xapi_clustering.Daemon.disable ~__context)

let force_destroy ~__context ~self =
let dbg = Context.string_of_task __context in
let host = Db.Cluster_host.get_host ~__context ~self in
assert_operation_host_target_is_localhost ~__context ~host;
assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self;
let result = Cluster_client.LocalClient.destroy (rpc ~__context) dbg in
match result with
| Result.Ok () ->
Db.Cluster_host.destroy ~__context ~self;
debug "Cluster_host.force_destroy was successful";
Xapi_clustering.Daemon.disable ~__context
| Result.Error error ->
warn "Error occurred during Cluster_host.force_destroy";
handle_error error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also nicely done!

destroy_op ~__context ~self "force_destroy"

let destroy ~__context ~self =
let dbg = Context.string_of_task __context in
let host = Db.Cluster_host.get_host ~__context ~self in
assert_operation_host_target_is_localhost ~__context ~host;
assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self;
assert_cluster_host_enabled ~__context ~self ~expected:true;
let result = Cluster_client.LocalClient.leave (rpc ~__context) dbg in
match result with
(* can't include refs in case those were successfully destroyed *)
| Result.Ok () ->
Db.Cluster_host.destroy ~__context ~self;
debug "Cluster_host.destroy was successful";
Xapi_clustering.Daemon.disable ~__context
| Result.Error error ->
warn "Error occurred during Cluster_host.destroy";
handle_error error

let cluster = Db.Cluster_host.get_cluster ~__context ~self in
let () = match Db.Cluster.get_cluster_hosts ~__context ~self:cluster with
| [ _ ] ->
raise Api_errors.(Server_error (cluster_does_not_have_one_node, ["1"]))
| _ -> ()
in
destroy_op ~__context ~self "destroy"

let ip_of_str str = Cluster_interface.IPv4 str

let forget ~__context ~self =
with_clustering_lock (fun () ->
with_clustering_lock __LOC__ (fun () ->
let dbg = Context.string_of_task __context in
let cluster = Db.Cluster_host.get_cluster ~__context ~self in
let pif = Db.Cluster_host.get_PIF ~__context ~self in
Expand All @@ -187,7 +183,7 @@ let forget ~__context ~self =
)

let enable ~__context ~self =
with_clustering_lock (fun () ->
with_clustering_lock __LOC__ (fun () ->
let dbg = Context.string_of_task __context in
let host = Db.Cluster_host.get_host ~__context ~self in
assert_operation_host_target_is_localhost ~__context ~host;
Expand All @@ -213,7 +209,7 @@ let enable ~__context ~self =
)

let disable ~__context ~self =
with_clustering_lock (fun () ->
with_clustering_lock __LOC__ (fun () ->
let dbg = Context.string_of_task __context in
let host = Db.Cluster_host.get_host ~__context ~self in
assert_operation_host_target_is_localhost ~__context ~host;
Expand Down
37 changes: 22 additions & 15 deletions ocaml/xapi/xapi_clustering.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ let set_ha_cluster_stack ~__context =
Db.Pool.set_ha_cluster_stack ~__context ~self ~value

(* host-local clustering lock *)
let clustering_lock_m = Mutex.create ()

let with_clustering_lock f =
debug "Trying to grab host-local clustering lock...";
Stdext.Threadext.Mutex.execute clustering_lock_m
(fun () ->
Stdext.Pervasiveext.finally
(fun () ->
debug "Grabbed host-local clustering lock; executing function...";
f ())
(fun () -> debug "Function execution finished; returned host-local clustering lock."))
let clustering_lock_m = Locking_helpers.Named_mutex.create "clustering"

(* [with_clustering_lock where f] runs [f ()] through
   [Locking_helpers.Named_mutex.execute] on [clustering_lock_m], logging before
   the attempt, once inside the protected section, and again from the
   [finally] handler. [where] is a caller-supplied location string (call sites
   pass [__LOC__]) appended to every log line so they can be attributed.
   NOTE(review): the "returned" message is emitted from inside the function
   given to [Named_mutex.execute], so presumably the mutex is still held at
   that point - confirm against Locking_helpers. *)
let with_clustering_lock where f =
  let run_locked () =
    debug "Grabbed host-local clustering lock; executing function... (%s)" where;
    f ()
  in
  let log_finished () =
    debug "Function execution finished; returned host-local clustering lock. (%s)" where
  in
  debug "Trying to grab host-local clustering lock... (%s)" where;
  Locking_helpers.Named_mutex.execute clustering_lock_m (fun () ->
    Stdext.Pervasiveext.finally run_locked log_finished)

(* Note we have to add type annotations to network/host here because they're only used in the context of
Db.PIF.get_records_where, and they're just strings there *)
Expand Down Expand Up @@ -107,15 +107,15 @@ let assert_cluster_stack_valid ~cluster_stack =
if not (List.mem cluster_stack Constants.supported_smapiv3_cluster_stacks)
then raise Api_errors.(Server_error (invalid_cluster_stack, [ cluster_stack ]))

let with_clustering_lock_if_needed ~__context ~sr_sm_type f =
let with_clustering_lock_if_needed ~__context ~sr_sm_type where f =
match get_required_cluster_stacks ~__context ~sr_sm_type with
| [] -> f ()
| _required_cluster_stacks -> with_clustering_lock f
| _required_cluster_stacks -> with_clustering_lock where f

let with_clustering_lock_if_cluster_exists ~__context f =
let with_clustering_lock_if_cluster_exists ~__context where f =
match Db.Cluster.get_all ~__context with
| [] -> f ()
| _ -> with_clustering_lock f
| _ -> with_clustering_lock where f

let find_cluster_host ~__context ~host =
match Db.Cluster_host.get_refs_where ~__context
Expand Down Expand Up @@ -199,6 +199,8 @@ let assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__conte
then raise Api_errors.(Server_error (cluster_stack_in_use, [ cluster_stack ]))

module Daemon = struct
let enabled = ref false

let maybe_call_script ~__context script params =
match Context.get_test_clusterd_rpc __context with
| Some _ -> debug "in unit test, not calling %s %s" script (String.concat " " params)
Expand All @@ -211,11 +213,13 @@ module Daemon = struct
maybe_call_script ~__context !Xapi_globs.firewall_port_config_script ["open"; port];
maybe_call_script ~__context "/usr/bin/systemctl" [ "enable"; service ];
maybe_call_script ~__context "/usr/bin/systemctl" [ "start"; service ];
enabled := true;
debug "Cluster daemon: enabled & started"

let disable ~__context =
let port = (string_of_int !Xapi_globs.xapi_clusterd_port) in
debug "Disabling and stopping the clustering daemon";
enabled := false;
maybe_call_script ~__context "/usr/bin/systemctl" [ "disable"; service ];
maybe_call_script ~__context "/usr/bin/systemctl" [ "stop"; service ];
maybe_call_script ~__context !Xapi_globs.firewall_port_config_script ["close"; port];
Expand All @@ -228,6 +232,9 @@ end
* Instead of returning an empty URL which wouldn't work just raise an
* exception. *)
let rpc ~__context =
if not !Daemon.enabled then
raise Api_errors.(Server_error(Api_errors.operation_not_allowed,
["clustering daemon has not been started yet"]));
match Context.get_test_clusterd_rpc __context with
| Some rpc -> rpc
| None ->
Expand Down
2 changes: 1 addition & 1 deletion ocaml/xapi/xapi_ha_vm_failover.ml
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ let compute_max_host_failures_to_tolerate ~__context ?live_set ?protected_vms ()
(* Make sure the pool is marked as overcommitted and the appropriate alert is generated. Return
true if something changed, false otherwise *)
let mark_pool_as_overcommitted ~__context ~live_set =
Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context (fun () ->
Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context __LOC__ (fun () ->
let pool = Helpers.get_pool ~__context in

let overcommitted = Db.Pool.get_ha_overcommitted ~__context ~self:pool in
Expand Down
4 changes: 2 additions & 2 deletions ocaml/xapi/xapi_pbd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ let plug ~__context ~self =
(* This must NOT be done while holding the lock, because the functions that
eventually get called also grab the clustering lock. We can call this
unconditionally because the operations it calls should be idempotent. *)
Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type (fun () ->
Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type __LOC__ (fun () ->
Xapi_clustering.assert_cluster_host_is_enabled_for_matching_sms ~__context ~host ~sr_sm_type;
check_sharing_constraint ~__context ~sr;
let dbg = Ref.string_of (Context.get_task_id __context) in
Expand All @@ -149,7 +149,7 @@ let unplug ~__context ~self =
if currently_attached then
let sr = Db.PBD.get_SR ~__context ~self in
let sr_sm_type = Db.SR.get_type ~__context ~self:sr in
Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type (fun () ->
Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type __LOC__ (fun () ->
let host = Db.PBD.get_host ~__context ~self in
if Db.Host.get_enabled ~__context ~self:host
then abort_if_storage_attached_to_protected_vms ~__context ~self;
Expand Down
3 changes: 2 additions & 1 deletion ocaml/xapi/xapi_sr.ml
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ let probe_ext =
(* Create actually makes the SR on disk, and introduces it into db, and creates PBD record for current host *)
let create ~__context ~host ~device_config ~(physical_size:int64) ~name_label ~name_description
~_type ~content_type ~shared ~sm_config =
let pbds, sr_ref = Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:_type (fun () ->
let pbds, sr_ref =
Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:_type __LOC__ (fun () ->
Xapi_clustering.assert_cluster_host_is_enabled_for_matching_sms ~__context ~host ~sr_sm_type:_type;
Helpers.assert_rolling_upgrade_not_in_progress ~__context ;
debug "SR.create name_label=%s sm_config=[ %s ]" name_label (String.concat "; " (List.map (fun (k, v) -> k ^ " = " ^ v) sm_config));
Expand Down