diff --git a/ocaml/tests/test_cluster_host.ml b/ocaml/tests/test_cluster_host.ml index 960ac11e38a..ddc20b0cddb 100644 --- a/ocaml/tests/test_cluster_host.ml +++ b/ocaml/tests/test_cluster_host.ml @@ -122,7 +122,7 @@ let test_destroy_forbidden_when_sr_attached () = Alcotest.check_raises ("Should raise cluster_stack_in_use: [ " ^ cluster_stack ^ " ] ") Api_errors.(Server_error (cluster_stack_in_use, [ cluster_stack ])) - (fun () -> Xapi_cluster_host.destroy ~__context ~self:cluster_host) + (fun () -> Xapi_cluster_host.force_destroy ~__context ~self:cluster_host) type declare_dead_args = { dead_members: Cluster_interface.address list; diff --git a/ocaml/tests/test_clustering.ml b/ocaml/tests/test_clustering.ml index 7c90ffee609..28214e119f5 100644 --- a/ocaml/tests/test_clustering.ml +++ b/ocaml/tests/test_clustering.ml @@ -230,9 +230,9 @@ let nest_with_clustering_lock_if_needed ~__context ~timeout ~type1 ~type2 ~on_de ~timeout:timeout ~otherwise: on_deadlock (fun () -> - Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type1 + Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type1 __LOC__ (fun () -> - Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type2 + Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type2 __LOC__ (fun () -> on_no_deadlock () ) ) @@ -607,7 +607,7 @@ let test_pool_ha_cluster_stacks_with_ha_with_clustering () = (* Cluster.destroy should set HA cluster stack with HA disabled *) Xapi_cluster_host.enable ~__context ~self:cluster_host; - Xapi_cluster_host.destroy ~__context ~self:cluster_host; + (* can't destroy last cluster_host, must be done through destroying cluster *) Xapi_cluster.destroy ~__context ~self:cluster; (* Cluster.destroy should reset HA cluster stacks *) assert_cluster_stack_is default ~__context; diff --git a/ocaml/xapi/message_forwarding.ml b/ocaml/xapi/message_forwarding.ml index 62a44a9e3c7..9338ee3005b 100644 --- 
a/ocaml/xapi/message_forwarding.ml +++ b/ocaml/xapi/message_forwarding.ml @@ -591,12 +591,12 @@ module Forward = functor(Local: Custom_actions.CUSTOM_ACTIONS) -> struct Local.Pool.ha_failover_plan_exists ~__context ~n let ha_compute_max_host_failures_to_tolerate ~__context = - Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context (fun () -> + Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context __LOC__ (fun () -> info "Pool.ha_compute_max_host_failures_to_tolerate: pool = '%s'" (current_pool_uuid ~__context); Local.Pool.ha_compute_max_host_failures_to_tolerate ~__context) let ha_compute_hypothetical_max_host_failures_to_tolerate ~__context ~configuration = - Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context (fun () -> + Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context __LOC__ (fun () -> info "Pool.ha_compute_hypothetical_max_host_failures_to_tolerate: pool = '%s'; configuration = [ %s ]" (current_pool_uuid ~__context) (String.concat "; " (List.map (fun (vm, p) -> Ref.string_of vm ^ " " ^ p) configuration)); @@ -610,7 +610,7 @@ module Forward = functor(Local: Custom_actions.CUSTOM_ACTIONS) -> struct Local.Pool.ha_compute_vm_failover_plan ~__context ~failed_hosts ~failed_vms let set_ha_host_failures_to_tolerate ~__context ~self ~value = - Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context (fun () -> + Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context __LOC__ (fun () -> info "Pool.set_ha_host_failures_to_tolerate: pool = '%s'; value = %Ld" (pool_uuid ~__context self) value; Local.Pool.set_ha_host_failures_to_tolerate ~__context ~self ~value) diff --git a/ocaml/xapi/xapi_cluster.ml b/ocaml/xapi/xapi_cluster.ml index 85d98efaa07..baf0dd864cd 100644 --- a/ocaml/xapi/xapi_cluster.ml +++ b/ocaml/xapi/xapi_cluster.ml @@ -30,7 +30,7 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout ~token_ (* Currently we only support corosync. 
If we support more cluster stacks, this * should be replaced by a general function that checks the given cluster_stack *) Pool_features.assert_enabled ~__context ~f:Features.Corosync; - with_clustering_lock (fun () -> + with_clustering_lock __LOC__ (fun () -> let dbg = Context.string_of_task __context in validate_params ~token_timeout ~token_timeout_coefficient; let cluster_ref = Ref.make () in @@ -74,7 +74,6 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout ~token_ ) let destroy ~__context ~self = - let dbg = Context.string_of_task __context in let cluster_hosts = Db.Cluster.get_cluster_hosts ~__context ~self in let cluster_host = match cluster_hosts with | [] -> None @@ -84,21 +83,13 @@ let destroy ~__context ~self = raise Api_errors.(Server_error(cluster_does_not_have_one_node, [string_of_int n])) in Xapi_stdext_monadic.Opt.iter (fun ch -> - assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self:ch + assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self:ch; + Xapi_cluster_host.force_destroy ~__context ~self:ch ) cluster_host; - let result = Cluster_client.LocalClient.destroy (rpc ~__context) dbg in - match result with - | Result.Ok () -> - Xapi_stdext_monadic.Opt.iter (fun ch -> - Db.Cluster_host.destroy ~__context ~self:ch - ) cluster_host; - Db.Cluster.destroy ~__context ~self; - D.debug "Cluster destroyed successfully"; - set_ha_cluster_stack ~__context; - Xapi_clustering.Daemon.disable ~__context - | Result.Error error -> - D.warn "Error occurred during Cluster.destroy"; - handle_error error + Db.Cluster.destroy ~__context ~self; + D.debug "Cluster destroyed successfully"; + set_ha_cluster_stack ~__context; + Xapi_clustering.Daemon.disable ~__context let get_network ~__context ~self = get_network_internal ~__context ~self diff --git a/ocaml/xapi/xapi_cluster_host.ml b/ocaml/xapi/xapi_cluster_host.ml index 7f074e97706..55295269e4d 100644 --- a/ocaml/xapi/xapi_cluster_host.ml 
+++ b/ocaml/xapi/xapi_cluster_host.ml @@ -50,7 +50,7 @@ let call_api_function_with_alert ~__context ~msg ~cls ~obj_uuid ~body (* Create xapi db object for cluster_host, resync_host calls clusterd *) let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host = - with_clustering_lock (fun () -> + with_clustering_lock __LOC__ (fun () -> assert_operation_host_target_is_localhost ~__context ~host; assert_pif_attached_to ~host ~pIF ~__context; assert_cluster_host_can_be_created ~__context ~host; @@ -63,7 +63,7 @@ let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host = (* Helper function atomically enables clusterd and joins the cluster_host *) let join_internal ~__context ~self = - with_clustering_lock (fun () -> + with_clustering_lock __LOC__ (fun () -> let pIF = Db.Cluster_host.get_PIF ~__context ~self in fix_pif_prerequisites ~__context pIF; @@ -127,43 +127,39 @@ let create ~__context ~cluster ~host ~pif = resync_host ~__context ~host; cluster_host +let destroy_op ~__context ~self meth = + with_clustering_lock __LOC__ (fun () -> + let dbg = Context.string_of_task __context in + let host = Db.Cluster_host.get_host ~__context ~self in + assert_operation_host_target_is_localhost ~__context ~host; + assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self; + let result = Cluster_client.LocalClient.destroy (rpc ~__context) dbg in + match result with + | Result.Ok () -> + Db.Cluster_host.destroy ~__context ~self; + debug "Cluster_host.%s was successful" meth; + Xapi_clustering.Daemon.disable ~__context + | Result.Error error -> + warn "Error occurred during Cluster_host.%s" meth; + handle_error error) + let force_destroy ~__context ~self = - let dbg = Context.string_of_task __context in - let host = Db.Cluster_host.get_host ~__context ~self in - assert_operation_host_target_is_localhost ~__context ~host; - assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self; - let result = 
Cluster_client.LocalClient.destroy (rpc ~__context) dbg in - match result with - | Result.Ok () -> - Db.Cluster_host.destroy ~__context ~self; - debug "Cluster_host.force_destroy was successful"; - Xapi_clustering.Daemon.disable ~__context - | Result.Error error -> - warn "Error occurred during Cluster_host.force_destroy"; - handle_error error + destroy_op ~__context ~self "force_destroy" let destroy ~__context ~self = - let dbg = Context.string_of_task __context in - let host = Db.Cluster_host.get_host ~__context ~self in - assert_operation_host_target_is_localhost ~__context ~host; - assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self; assert_cluster_host_enabled ~__context ~self ~expected:true; - let result = Cluster_client.LocalClient.leave (rpc ~__context) dbg in - match result with - (* can't include refs in case those were successfully destroyed *) - | Result.Ok () -> - Db.Cluster_host.destroy ~__context ~self; - debug "Cluster_host.destroy was successful"; - Xapi_clustering.Daemon.disable ~__context - | Result.Error error -> - warn "Error occurred during Cluster_host.destroy"; - handle_error error - + let cluster = Db.Cluster_host.get_cluster ~__context ~self in + let () = match Db.Cluster.get_cluster_hosts ~__context ~self:cluster with + | [ _ ] -> + raise Api_errors.(Server_error (cluster_does_not_have_one_node, ["1"])) + | _ -> () + in + destroy_op ~__context ~self "destroy" let ip_of_str str = Cluster_interface.IPv4 str let forget ~__context ~self = - with_clustering_lock (fun () -> + with_clustering_lock __LOC__ (fun () -> let dbg = Context.string_of_task __context in let cluster = Db.Cluster_host.get_cluster ~__context ~self in let pif = Db.Cluster_host.get_PIF ~__context ~self in @@ -187,7 +183,7 @@ let forget ~__context ~self = ) let enable ~__context ~self = - with_clustering_lock (fun () -> + with_clustering_lock __LOC__ (fun () -> let dbg = Context.string_of_task __context in let host = Db.Cluster_host.get_host 
~__context ~self in assert_operation_host_target_is_localhost ~__context ~host; @@ -213,7 +209,7 @@ let enable ~__context ~self = ) let disable ~__context ~self = - with_clustering_lock (fun () -> + with_clustering_lock __LOC__ (fun () -> let dbg = Context.string_of_task __context in let host = Db.Cluster_host.get_host ~__context ~self in assert_operation_host_target_is_localhost ~__context ~host; diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml index 4677837bb6e..a6149092c06 100644 --- a/ocaml/xapi/xapi_clustering.ml +++ b/ocaml/xapi/xapi_clustering.ml @@ -24,17 +24,17 @@ let set_ha_cluster_stack ~__context = Db.Pool.set_ha_cluster_stack ~__context ~self ~value (* host-local clustering lock *) -let clustering_lock_m = Mutex.create () - -let with_clustering_lock f = - debug "Trying to grab host-local clustering lock..."; - Stdext.Threadext.Mutex.execute clustering_lock_m - (fun () -> - Stdext.Pervasiveext.finally - (fun () -> - debug "Grabbed host-local clustering lock; executing function..."; - f ()) - (fun () -> debug "Function execution finished; returned host-local clustering lock.")) +let clustering_lock_m = Locking_helpers.Named_mutex.create "clustering" + +let with_clustering_lock where f = + debug "Trying to grab host-local clustering lock... (%s)" where; + Locking_helpers.Named_mutex.execute clustering_lock_m + (fun () -> Stdext.Pervasiveext.finally + (fun () -> + debug "Grabbed host-local clustering lock; executing function... (%s)" where; + f ()) + (fun () -> + debug "Function execution finished; returned host-local clustering lock. 
(%s)" where)) (* Note we have to add type annotations to network/host here because they're only used in the context of Db.PIF.get_records_where, and they're just strings there *) @@ -107,15 +107,15 @@ let assert_cluster_stack_valid ~cluster_stack = if not (List.mem cluster_stack Constants.supported_smapiv3_cluster_stacks) then raise Api_errors.(Server_error (invalid_cluster_stack, [ cluster_stack ])) -let with_clustering_lock_if_needed ~__context ~sr_sm_type f = +let with_clustering_lock_if_needed ~__context ~sr_sm_type where f = match get_required_cluster_stacks ~__context ~sr_sm_type with | [] -> f () - | _required_cluster_stacks -> with_clustering_lock f + | _required_cluster_stacks -> with_clustering_lock where f -let with_clustering_lock_if_cluster_exists ~__context f = +let with_clustering_lock_if_cluster_exists ~__context where f = match Db.Cluster.get_all ~__context with | [] -> f () - | _ -> with_clustering_lock f + | _ -> with_clustering_lock where f let find_cluster_host ~__context ~host = match Db.Cluster_host.get_refs_where ~__context @@ -199,6 +199,8 @@ let assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__conte then raise Api_errors.(Server_error (cluster_stack_in_use, [ cluster_stack ])) module Daemon = struct + let enabled = ref false + let maybe_call_script ~__context script params = match Context.get_test_clusterd_rpc __context with | Some _ -> debug "in unit test, not calling %s %s" script (String.concat " " params) @@ -211,11 +213,13 @@ module Daemon = struct maybe_call_script ~__context !Xapi_globs.firewall_port_config_script ["open"; port]; maybe_call_script ~__context "/usr/bin/systemctl" [ "enable"; service ]; maybe_call_script ~__context "/usr/bin/systemctl" [ "start"; service ]; + enabled := true; debug "Cluster daemon: enabled & started" let disable ~__context = let port = (string_of_int !Xapi_globs.xapi_clusterd_port) in debug "Disabling and stopping the clustering daemon"; + enabled := false; maybe_call_script 
~__context "/usr/bin/systemctl" [ "disable"; service ]; maybe_call_script ~__context "/usr/bin/systemctl" [ "stop"; service ]; maybe_call_script ~__context !Xapi_globs.firewall_port_config_script ["close"; port]; @@ -228,6 +232,9 @@ end * Instead of returning an empty URL which wouldn't work just raise an * exception. *) let rpc ~__context = + if not !Daemon.enabled then + raise Api_errors.(Server_error(Api_errors.operation_not_allowed, + ["clustering daemon has not been started yet"])); match Context.get_test_clusterd_rpc __context with | Some rpc -> rpc | None -> diff --git a/ocaml/xapi/xapi_ha_vm_failover.ml b/ocaml/xapi/xapi_ha_vm_failover.ml index e6721b54fd5..03940bad6d7 100644 --- a/ocaml/xapi/xapi_ha_vm_failover.ml +++ b/ocaml/xapi/xapi_ha_vm_failover.ml @@ -332,7 +332,7 @@ let compute_max_host_failures_to_tolerate ~__context ?live_set ?protected_vms () (* Make sure the pool is marked as overcommitted and the appropriate alert is generated. Return true if something changed, false otherwise *) let mark_pool_as_overcommitted ~__context ~live_set = - Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context (fun () -> + Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context __LOC__ (fun () -> let pool = Helpers.get_pool ~__context in let overcommitted = Db.Pool.get_ha_overcommitted ~__context ~self:pool in diff --git a/ocaml/xapi/xapi_pbd.ml b/ocaml/xapi/xapi_pbd.ml index 03621f296af..464f611ff08 100644 --- a/ocaml/xapi/xapi_pbd.ml +++ b/ocaml/xapi/xapi_pbd.ml @@ -127,7 +127,7 @@ let plug ~__context ~self = (* This must NOT be done while holding the lock, because the functions that eventually get called also grab the clustering lock. We can call this unconditionally because the operations it calls should be idempotent. 
*) - Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type (fun () -> + Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type __LOC__ (fun () -> Xapi_clustering.assert_cluster_host_is_enabled_for_matching_sms ~__context ~host ~sr_sm_type; check_sharing_constraint ~__context ~sr; let dbg = Ref.string_of (Context.get_task_id __context) in @@ -149,7 +149,7 @@ let unplug ~__context ~self = if currently_attached then let sr = Db.PBD.get_SR ~__context ~self in let sr_sm_type = Db.SR.get_type ~__context ~self:sr in - Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type (fun () -> + Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type __LOC__ (fun () -> let host = Db.PBD.get_host ~__context ~self in if Db.Host.get_enabled ~__context ~self:host then abort_if_storage_attached_to_protected_vms ~__context ~self; diff --git a/ocaml/xapi/xapi_sr.ml b/ocaml/xapi/xapi_sr.ml index 29074c3bdbc..3ebfd309503 100644 --- a/ocaml/xapi/xapi_sr.ml +++ b/ocaml/xapi/xapi_sr.ml @@ -280,7 +280,8 @@ let probe_ext = (* Create actually makes the SR on disk, and introduces it into db, and creates PBD record for current host *) let create ~__context ~host ~device_config ~(physical_size:int64) ~name_label ~name_description ~_type ~content_type ~shared ~sm_config = - let pbds, sr_ref = Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:_type (fun () -> + let pbds, sr_ref = + Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:_type __LOC__ (fun () -> Xapi_clustering.assert_cluster_host_is_enabled_for_matching_sms ~__context ~host ~sr_sm_type:_type; Helpers.assert_rolling_upgrade_not_in_progress ~__context ; debug "SR.create name_label=%s sm_config=[ %s ]" name_label (String.concat "; " (List.map (fun (k, v) -> k ^ " = " ^ v) sm_config));