From 63bb125180626717fa5919640752b8d3c0eb1884 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 31 May 2018 16:47:01 +0000 Subject: [PATCH 1/3] CP-28406: use named mutex for clustering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Locking_helpers provides a nice list of active locks as part of `xe host-get-thread-diagnostics`, which helps debugging. Signed-off-by: Edwin Török --- ocaml/xapi/xapi_clustering.ml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml index 4677837bb6e..3724c9d394b 100644 --- a/ocaml/xapi/xapi_clustering.ml +++ b/ocaml/xapi/xapi_clustering.ml @@ -24,11 +24,11 @@ let set_ha_cluster_stack ~__context = Db.Pool.set_ha_cluster_stack ~__context ~self ~value (* host-local clustering lock *) -let clustering_lock_m = Mutex.create () +let clustering_lock_m = Locking_helpers.Named_mutex.create "clustering" let with_clustering_lock f = debug "Trying to grab host-local clustering lock..."; - Stdext.Threadext.Mutex.execute clustering_lock_m + Locking_helpers.Named_mutex.execute clustering_lock_m (fun () -> Stdext.Pervasiveext.finally (fun () -> From c0b51a6393e63ec37d36c31f5d82f078c6dd7980 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 31 May 2018 16:57:17 +0000 Subject: [PATCH 2/3] CP-28406: log caller of locking function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is quite useful for tracing what Xapi_cluster_host methods are called, and together with the previous commit for tracking deadlocks. Signed-off-by: Edwin Török --- ocaml/tests/test_clustering.ml | 4 ++-- ocaml/xapi/message_forwarding.ml | 6 +++--- ocaml/xapi/xapi_cluster.ml | 2 +- ocaml/xapi/xapi_cluster_host.ml | 10 +++++----- ocaml/xapi/xapi_clustering.ml | 24 ++++++++++++------------ ocaml/xapi/xapi_ha_vm_failover.ml | 2 +- ocaml/xapi/xapi_pbd.ml | 4 ++-- ocaml/xapi/xapi_sr.ml | 3 ++- 8 files changed, 28 insertions(+), 27 deletions(-) diff --git a/ocaml/tests/test_clustering.ml b/ocaml/tests/test_clustering.ml index 7c90ffee609..28597e85f20 100644 --- a/ocaml/tests/test_clustering.ml +++ b/ocaml/tests/test_clustering.ml @@ -230,9 +230,9 @@ let nest_with_clustering_lock_if_needed ~__context ~timeout ~type1 ~type2 ~on_de ~timeout:timeout ~otherwise: on_deadlock (fun () -> - Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type1 + Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type1 __LOC__ (fun () -> - Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type2 + Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:type2 __LOC__ (fun () -> on_no_deadlock () ) ) diff --git a/ocaml/xapi/message_forwarding.ml b/ocaml/xapi/message_forwarding.ml index 62a44a9e3c7..9338ee3005b 100644 --- a/ocaml/xapi/message_forwarding.ml +++ b/ocaml/xapi/message_forwarding.ml @@ -591,12 +591,12 @@ module Forward = functor(Local: Custom_actions.CUSTOM_ACTIONS) -> struct Local.Pool.ha_failover_plan_exists ~__context ~n let ha_compute_max_host_failures_to_tolerate ~__context = - Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context (fun () -> + Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context __LOC__ (fun () -> info "Pool.ha_compute_max_host_failures_to_tolerate: pool = '%s'" (current_pool_uuid ~__context); Local.Pool.ha_compute_max_host_failures_to_tolerate ~__context) let ha_compute_hypothetical_max_host_failures_to_tolerate ~__context ~configuration = - Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context (fun () -> + Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context __LOC__ (fun () -> info "Pool.ha_compute_hypothetical_max_host_failures_to_tolerate: pool = '%s'; configuration = [ %s ]" (current_pool_uuid ~__context) (String.concat "; " (List.map (fun (vm, p) -> Ref.string_of vm ^ " " ^ p) configuration)); @@ -610,7 +610,7 @@ module Forward = functor(Local: Custom_actions.CUSTOM_ACTIONS) -> struct Local.Pool.ha_compute_vm_failover_plan ~__context ~failed_hosts ~failed_vms let set_ha_host_failures_to_tolerate ~__context ~self ~value = - Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context (fun () -> + Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context __LOC__ (fun () -> info "Pool.set_ha_host_failures_to_tolerate: pool = '%s'; value = %Ld" (pool_uuid ~__context self) value; Local.Pool.set_ha_host_failures_to_tolerate ~__context ~self ~value) diff --git a/ocaml/xapi/xapi_cluster.ml b/ocaml/xapi/xapi_cluster.ml index 85d98efaa07..3f59a35201f 100644 --- a/ocaml/xapi/xapi_cluster.ml +++ b/ocaml/xapi/xapi_cluster.ml @@ -30,7 +30,7 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout ~token_ (* Currently we only support corosync. If we support more cluster stacks, this * should be replaced by a general function that checks the given cluster_stack *) Pool_features.assert_enabled ~__context ~f:Features.Corosync; - with_clustering_lock (fun () -> + with_clustering_lock __LOC__(fun () -> let dbg = Context.string_of_task __context in validate_params ~token_timeout ~token_timeout_coefficient; let cluster_ref = Ref.make () in diff --git a/ocaml/xapi/xapi_cluster_host.ml b/ocaml/xapi/xapi_cluster_host.ml index 7f074e97706..edc7369e8cf 100644 --- a/ocaml/xapi/xapi_cluster_host.ml +++ b/ocaml/xapi/xapi_cluster_host.ml @@ -50,7 +50,7 @@ let call_api_function_with_alert ~__context ~msg ~cls ~obj_uuid ~body (* Create xapi db object for cluster_host, resync_host calls clusterd *) let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host = - with_clustering_lock (fun () -> + with_clustering_lock __LOC__ (fun () -> assert_operation_host_target_is_localhost ~__context ~host; assert_pif_attached_to ~host ~pIF ~__context; assert_cluster_host_can_be_created ~__context ~host; @@ -63,7 +63,7 @@ let create_internal ~__context ~cluster ~host ~pIF : API.ref_Cluster_host = (* Helper function atomically enables clusterd and joins the cluster_host *) let join_internal ~__context ~self = - with_clustering_lock (fun () -> + with_clustering_lock __LOC__ (fun () -> let pIF = Db.Cluster_host.get_PIF ~__context ~self in fix_pif_prerequisites ~__context pIF; @@ -163,7 +163,7 @@ let destroy ~__context ~self = let ip_of_str str = Cluster_interface.IPv4 str let forget ~__context ~self = - with_clustering_lock (fun () -> + with_clustering_lock __LOC__ (fun () -> let dbg = Context.string_of_task __context in let cluster = Db.Cluster_host.get_cluster ~__context ~self in let pif = Db.Cluster_host.get_PIF ~__context ~self in @@ -187,7 +187,7 @@ let forget ~__context ~self = ) let enable ~__context ~self = - with_clustering_lock (fun () -> + with_clustering_lock __LOC__ (fun () -> let dbg = Context.string_of_task __context in let host = Db.Cluster_host.get_host ~__context ~self in assert_operation_host_target_is_localhost ~__context ~host; @@ -213,7 +213,7 @@ let enable ~__context ~self = ) let disable ~__context ~self = - with_clustering_lock (fun () -> + with_clustering_lock __LOC__ (fun () -> let dbg = Context.string_of_task __context in let host = Db.Cluster_host.get_host ~__context ~self in assert_operation_host_target_is_localhost ~__context ~host; diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml index 3724c9d394b..7292a921fbb 100644 --- a/ocaml/xapi/xapi_clustering.ml +++ b/ocaml/xapi/xapi_clustering.ml @@ -26,15 +26,15 @@ let set_ha_cluster_stack ~__context = (* host-local clustering lock *) let clustering_lock_m = Locking_helpers.Named_mutex.create "clustering" -let with_clustering_lock f = - debug "Trying to grab host-local clustering lock..."; +let with_clustering_lock where f = + debug "Trying to grab host-local clustering lock... (%s)" where; Locking_helpers.Named_mutex.execute clustering_lock_m - (fun () -> - Stdext.Pervasiveext.finally - (fun () -> - debug "Grabbed host-local clustering lock; executing function..."; - f ()) - (fun () -> debug "Function execution finished; returned host-local clustering lock.")) + (fun () -> Stdext.Pervasiveext.finally + (fun () -> + debug "Grabbed host-local clustering lock; executing function... (%s)" where; + f ()) + (fun () -> + debug "Function execution finished; returned host-local clustering lock. (%s)" where)) (* Note we have to add type annotations to network/host here because they're only used in the context of Db.PIF.get_records_where, and they're just strings there *) @@ -107,15 +107,15 @@ let assert_cluster_stack_valid ~cluster_stack = if not (List.mem cluster_stack Constants.supported_smapiv3_cluster_stacks) then raise Api_errors.(Server_error (invalid_cluster_stack, [ cluster_stack ])) -let with_clustering_lock_if_needed ~__context ~sr_sm_type f = +let with_clustering_lock_if_needed ~__context ~sr_sm_type where f = match get_required_cluster_stacks ~__context ~sr_sm_type with | [] -> f () - | _required_cluster_stacks -> with_clustering_lock f + | _required_cluster_stacks -> with_clustering_lock where f -let with_clustering_lock_if_cluster_exists ~__context f = +let with_clustering_lock_if_cluster_exists ~__context where f = match Db.Cluster.get_all ~__context with | [] -> f () - | _ -> with_clustering_lock f + | _ -> with_clustering_lock where f let find_cluster_host ~__context ~host = match Db.Cluster_host.get_refs_where ~__context diff --git a/ocaml/xapi/xapi_ha_vm_failover.ml b/ocaml/xapi/xapi_ha_vm_failover.ml index e6721b54fd5..03940bad6d7 100644 --- a/ocaml/xapi/xapi_ha_vm_failover.ml +++ b/ocaml/xapi/xapi_ha_vm_failover.ml @@ -332,7 +332,7 @@ let compute_max_host_failures_to_tolerate ~__context ?live_set ?protected_vms () (* Make sure the pool is marked as overcommitted and the appropriate alert is generated. Return true if something changed, false otherwise *) let mark_pool_as_overcommitted ~__context ~live_set = - Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context (fun () -> + Xapi_clustering.with_clustering_lock_if_cluster_exists ~__context __LOC__ (fun () -> let pool = Helpers.get_pool ~__context in let overcommitted = Db.Pool.get_ha_overcommitted ~__context ~self:pool in diff --git a/ocaml/xapi/xapi_pbd.ml b/ocaml/xapi/xapi_pbd.ml index 03621f296af..464f611ff08 100644 --- a/ocaml/xapi/xapi_pbd.ml +++ b/ocaml/xapi/xapi_pbd.ml @@ -127,7 +127,7 @@ let plug ~__context ~self = (* This must NOT be done while holding the lock, because the functions that eventually get called also grab the clustering lock. We can call this unconditionally because the operations it calls should be idempotent. *) - Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type (fun () -> + Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type __LOC__ (fun () -> Xapi_clustering.assert_cluster_host_is_enabled_for_matching_sms ~__context ~host ~sr_sm_type; check_sharing_constraint ~__context ~sr; let dbg = Ref.string_of (Context.get_task_id __context) in @@ -149,7 +149,7 @@ let unplug ~__context ~self = if currently_attached then let sr = Db.PBD.get_SR ~__context ~self in let sr_sm_type = Db.SR.get_type ~__context ~self:sr in - Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type (fun () -> + Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type __LOC__ (fun () -> let host = Db.PBD.get_host ~__context ~self in if Db.Host.get_enabled ~__context ~self:host then abort_if_storage_attached_to_protected_vms ~__context ~self; diff --git a/ocaml/xapi/xapi_sr.ml b/ocaml/xapi/xapi_sr.ml index 29074c3bdbc..3ebfd309503 100644 --- a/ocaml/xapi/xapi_sr.ml +++ b/ocaml/xapi/xapi_sr.ml @@ -280,7 +280,8 @@ let probe_ext = (* Create actually makes the SR on disk, and introduces it into db, and creates PBD record for current host *) let create ~__context ~host ~device_config ~(physical_size:int64) ~name_label ~name_description ~_type ~content_type ~shared ~sm_config = - let pbds, sr_ref = Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:_type (fun () -> + let pbds, sr_ref = + Xapi_clustering.with_clustering_lock_if_needed ~__context ~sr_sm_type:_type __LOC__ (fun () -> Xapi_clustering.assert_cluster_host_is_enabled_for_matching_sms ~__context ~host ~sr_sm_type:_type; Helpers.assert_rolling_upgrade_not_in_progress ~__context ; debug "SR.create name_label=%s sm_config=[ %s ]" name_label (String.concat "; " (List.map (fun (k, v) -> k ^ " = " ^ v) sm_config)); From 171bbd34e50e900abab8de3ae690fe9abde65c18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 31 May 2018 17:35:18 +0000 Subject: [PATCH 3/3] CP-28406: refactor Cluster/Cluster_host.destroy and forbid RPCs if clustering daemon is down MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cluster.destroy did not work if you destroyed the last cluster host (like some of the unit tests actually did). Cluster_host.destroy on the last node is special: there is no cluster after you leave (leave would fail in reality), so just destroy the cluster. Refactor all 3 destroy operations into one, choose automatically based on number of hosts. Could've introduced a new API error to forbid destroying the last cluster host, but it is better if XAPI is able to automatically do the right thing than to tell the user it should call some other API instead. Also with Cluster_host.force_destroy you could have already ended up in a situation where you have no cluster hosts and want to destroy the cluster, which would hang indefinitely because the daemon was stopped. We always try to enable the daemon on startup, so keep track on whether we think it should be running, and if we know we stopped it then just raise an error when trying to do an RPC. This is useful in debugging situations where we try to send RPCs too early too (e.g. before we started the daemon). Signed-off-by: Edwin Török --- ocaml/tests/test_cluster_host.ml | 2 +- ocaml/tests/test_clustering.ml | 2 +- ocaml/xapi/xapi_cluster.ml | 21 ++++--------- ocaml/xapi/xapi_cluster_host.ml | 52 +++++++++++++++----------------- ocaml/xapi/xapi_clustering.ml | 7 +++++ 5 files changed, 39 insertions(+), 45 deletions(-) diff --git a/ocaml/tests/test_cluster_host.ml b/ocaml/tests/test_cluster_host.ml index 960ac11e38a..ddc20b0cddb 100644 --- a/ocaml/tests/test_cluster_host.ml +++ b/ocaml/tests/test_cluster_host.ml @@ -122,7 +122,7 @@ let test_destroy_forbidden_when_sr_attached () = Alcotest.check_raises ("Should raise cluster_stack_in_use: [ " ^ cluster_stack ^ " ] ") Api_errors.(Server_error (cluster_stack_in_use, [ cluster_stack ])) - (fun () -> Xapi_cluster_host.destroy ~__context ~self:cluster_host) + (fun () -> Xapi_cluster_host.force_destroy ~__context ~self:cluster_host) type declare_dead_args = { dead_members: Cluster_interface.address list; diff --git a/ocaml/tests/test_clustering.ml b/ocaml/tests/test_clustering.ml index 28597e85f20..28214e119f5 100644 --- a/ocaml/tests/test_clustering.ml +++ b/ocaml/tests/test_clustering.ml @@ -607,7 +607,7 @@ let test_pool_ha_cluster_stacks_with_ha_with_clustering () = (* Cluster.destroy should set HA cluster stack with HA disabled *) Xapi_cluster_host.enable ~__context ~self:cluster_host; - Xapi_cluster_host.destroy ~__context ~self:cluster_host; + (* can't destroy last cluster_host, must be done through destroying cluster *) Xapi_cluster.destroy ~__context ~self:cluster; (* Cluster.destroy should reset HA cluster stacks *) assert_cluster_stack_is default ~__context; diff --git a/ocaml/xapi/xapi_cluster.ml b/ocaml/xapi/xapi_cluster.ml index 3f59a35201f..baf0dd864cd 100644 --- a/ocaml/xapi/xapi_cluster.ml +++ b/ocaml/xapi/xapi_cluster.ml @@ -74,7 +74,6 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout ~token_ ) let destroy ~__context ~self = - let dbg = Context.string_of_task __context in let cluster_hosts = Db.Cluster.get_cluster_hosts ~__context ~self in let cluster_host = match cluster_hosts with | [] -> None @@ -84,21 +83,13 @@ let destroy ~__context ~self = raise Api_errors.(Server_error(cluster_does_not_have_one_node, [string_of_int n])) in Xapi_stdext_monadic.Opt.iter (fun ch -> - assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self:ch + assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self:ch; + Xapi_cluster_host.force_destroy ~__context ~self:ch ) cluster_host; - let result = Cluster_client.LocalClient.destroy (rpc ~__context) dbg in - match result with - | Result.Ok () -> - Xapi_stdext_monadic.Opt.iter (fun ch -> - Db.Cluster_host.destroy ~__context ~self:ch - ) cluster_host; - Db.Cluster.destroy ~__context ~self; - D.debug "Cluster destroyed successfully"; - set_ha_cluster_stack ~__context; - Xapi_clustering.Daemon.disable ~__context - | Result.Error error -> - D.warn "Error occurred during Cluster.destroy"; - handle_error error + Db.Cluster.destroy ~__context ~self; + D.debug "Cluster destroyed successfully"; + set_ha_cluster_stack ~__context; + Xapi_clustering.Daemon.disable ~__context let get_network ~__context ~self = get_network_internal ~__context ~self diff --git a/ocaml/xapi/xapi_cluster_host.ml b/ocaml/xapi/xapi_cluster_host.ml index edc7369e8cf..55295269e4d 100644 --- a/ocaml/xapi/xapi_cluster_host.ml +++ b/ocaml/xapi/xapi_cluster_host.ml @@ -127,38 +127,34 @@ let create ~__context ~cluster ~host ~pif = resync_host ~__context ~host; cluster_host +let destroy_op ~__context ~self meth = + with_clustering_lock __LOC__ (fun () -> + let dbg = Context.string_of_task __context in + let host = Db.Cluster_host.get_host ~__context ~self in + assert_operation_host_target_is_localhost ~__context ~host; + assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self; + let result = Cluster_client.LocalClient.destroy (rpc ~__context) dbg in + match result with + | Result.Ok () -> + Db.Cluster_host.destroy ~__context ~self; + debug "Cluster_host.%s was successful" meth; + Xapi_clustering.Daemon.disable ~__context + | Result.Error error -> + warn "Error occurred during Cluster_host.%s" meth; + handle_error error) + let force_destroy ~__context ~self = - let dbg = Context.string_of_task __context in - let host = Db.Cluster_host.get_host ~__context ~self in - assert_operation_host_target_is_localhost ~__context ~host; - assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self; - let result = Cluster_client.LocalClient.destroy (rpc ~__context) dbg in - match result with - | Result.Ok () -> - Db.Cluster_host.destroy ~__context ~self; - debug "Cluster_host.force_destroy was successful"; - Xapi_clustering.Daemon.disable ~__context - | Result.Error error -> - warn "Error occurred during Cluster_host.force_destroy"; - handle_error error + destroy_op ~__context ~self "force_destroy" let destroy ~__context ~self = - let dbg = Context.string_of_task __context in - let host = Db.Cluster_host.get_host ~__context ~self in - assert_operation_host_target_is_localhost ~__context ~host; - assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__context ~self; assert_cluster_host_enabled ~__context ~self ~expected:true; - let result = Cluster_client.LocalClient.leave (rpc ~__context) dbg in - match result with - (* can't include refs in case those were successfully destroyed *) - | Result.Ok () -> - Db.Cluster_host.destroy ~__context ~self; - debug "Cluster_host.destroy was successful"; - Xapi_clustering.Daemon.disable ~__context - | Result.Error error -> - warn "Error occurred during Cluster_host.destroy"; - handle_error error - + let cluster = Db.Cluster_host.get_cluster ~__context ~self in + let () = match Db.Cluster.get_cluster_hosts ~__context ~self:cluster with + | [ _ ] -> + raise Api_errors.(Server_error (cluster_does_not_have_one_node, ["1"])) + | _ -> () + in + destroy_op ~__context ~self "destroy" let ip_of_str str = Cluster_interface.IPv4 str diff --git a/ocaml/xapi/xapi_clustering.ml b/ocaml/xapi/xapi_clustering.ml index 7292a921fbb..a6149092c06 100644 --- a/ocaml/xapi/xapi_clustering.ml +++ b/ocaml/xapi/xapi_clustering.ml @@ -199,6 +199,8 @@ let assert_cluster_host_has_no_attached_sr_which_requires_cluster_stack ~__conte then raise Api_errors.(Server_error (cluster_stack_in_use, [ cluster_stack ])) module Daemon = struct + let enabled = ref false + let maybe_call_script ~__context script params = match Context.get_test_clusterd_rpc __context with | Some _ -> debug "in unit test, not calling %s %s" script (String.concat " " params) @@ -211,11 +213,13 @@ module Daemon = struct maybe_call_script ~__context !Xapi_globs.firewall_port_config_script ["open"; port]; maybe_call_script ~__context "/usr/bin/systemctl" [ "enable"; service ]; maybe_call_script ~__context "/usr/bin/systemctl" [ "start"; service ]; + enabled := true; debug "Cluster daemon: enabled & started" let disable ~__context = let port = (string_of_int !Xapi_globs.xapi_clusterd_port) in debug "Disabling and stopping the clustering daemon"; + enabled := false; maybe_call_script ~__context "/usr/bin/systemctl" [ "disable"; service ]; maybe_call_script ~__context "/usr/bin/systemctl" [ "stop"; service ]; maybe_call_script ~__context !Xapi_globs.firewall_port_config_script ["close"; port]; @@ -228,6 +232,9 @@ end * Instead of returning an empty URL which wouldn't work just raise an * exception. *) let rpc ~__context = + if not !Daemon.enabled then + raise Api_errors.(Server_error(Api_errors.operation_not_allowed, + ["clustering daemon has not been started yet"])); match Context.get_test_clusterd_rpc __context with | Some rpc -> rpc | None ->