diff --git a/ocaml/xapi/message_forwarding.ml b/ocaml/xapi/message_forwarding.ml index bed9656ccd..fe5953123c 100644 --- a/ocaml/xapi/message_forwarding.ml +++ b/ocaml/xapi/message_forwarding.ml @@ -3325,11 +3325,14 @@ end the internal scan function directly. *) Server_helpers.exec_with_new_task "PBD.plug initial SR scan" (fun __scan_context -> - let handle_metadata_vdis () = - (* Only handle metadata VDIs when attaching shared storage to the master. *) + (* Only handle metadata VDIs when attaching shared storage to the master. *) + let should_handle_metadata_vdis = let pbd_host = Db.PBD.get_host ~__context:__scan_context ~self in let master = Db.Pool.get_master ~__context:__scan_context ~self:(Helpers.get_pool ~__context:__scan_context) in - if (pbd_host = master) && (Db.SR.get_shared ~__context:__scan_context ~self:sr) then begin + (pbd_host = master) && (Db.SR.get_shared ~__context:__scan_context ~self:sr) + in + let handle_metadata_vdis () = + if should_handle_metadata_vdis then begin debug "Shared SR %s is being plugged to master - handling metadata VDIs." (sr_uuid ~__context sr); let metadata_vdis = List.filter (fun vdi -> Db.VDI.get_type ~__context:__scan_context ~self:vdi = `metadata) @@ -3354,10 +3357,13 @@ end with e -> debug "Could not re-enable database replication to VDI %s - caught %s" vdi_uuid (Printexc.to_string e) - end + end; + Xapi_dr.signal_sr_is_ready ~__context:__scan_context ~sr end else debug "SR %s is not shared or is being plugged to a slave - not handling metadata VDIs at this point." (sr_uuid ~__context sr) in + if should_handle_metadata_vdis then + Xapi_dr.signal_sr_is_processing ~__context:__scan_context ~sr; Xapi_sr.scan_one ~__context:__scan_context ~callback:handle_metadata_vdis sr) let unplug ~__context ~self = diff --git a/ocaml/xapi/xapi_dr.ml b/ocaml/xapi/xapi_dr.ml index 1c4659ae4c..812bc4590c 100644 --- a/ocaml/xapi/xapi_dr.ml +++ b/ocaml/xapi/xapi_dr.ml @@ -5,6 +5,8 @@ open Threadext module D = Debug.Debugger(struct let name="xapi" end) open D +(* -------------------------- VDI caching ----------------------------------- *) + (* Keep track of foreign metadata VDIs and their database generations and pool UUIDs. *) (* The generation count is used to keep track of metadata_latest of all foreign database VDIs. *) (* The pool uuid is cached so that "xe pool-param-get param-name=metadata-of-pool" can be called without opening the database. *) @@ -113,6 +115,42 @@ let read_vdi_cache_record ~vdi = else None) +(* ------------ Providing signalling that an SR is ready for DR ------------- *) + +let processing_srs : API.ref_SR list ref = ref [] +let processing_srs_m = Mutex.create () +let processing_srs_c = Condition.create () + +let signal_sr_is_processing ~__context ~sr = + debug "Recording that processing of SR %s has started." (Db.SR.get_uuid ~__context ~self:sr); + Mutex.execute processing_srs_m + (fun () -> + let srs = !processing_srs in + if not(List.mem sr srs) then + processing_srs := sr::srs) + +let signal_sr_is_ready ~__context ~sr = + debug "Recording that processing of SR %s has finished." (Db.SR.get_uuid ~__context ~self:sr); + Mutex.execute processing_srs_m + (fun () -> + let srs = !processing_srs in + if List.mem sr srs then begin + processing_srs := (List.filter (fun x -> x <> sr) srs); + Condition.broadcast processing_srs_c + end) + +let wait_until_sr_is_ready ~__context ~sr = + let sr_uuid = Db.SR.get_uuid ~__context ~self:sr in + Mutex.execute processing_srs_m + (fun () -> + debug "Waiting for SR %s to be processed." sr_uuid; + while List.mem sr !processing_srs do + Condition.wait processing_srs_c processing_srs_m + done; + debug "Finished waiting for SR %s to be processed." sr_uuid) + +(* --------------------------------- VM recovery ---------------------------- *) + (* This function uses the VM export functionality to *) (* create the objects required to reimport a list of VMs *) let create_import_objects ~__context ~vms = diff --git a/ocaml/xapi/xapi_dr_task.ml b/ocaml/xapi/xapi_dr_task.ml index 67aba70973..b639e1d869 100644 --- a/ocaml/xapi/xapi_dr_task.ml +++ b/ocaml/xapi/xapi_dr_task.ml @@ -78,8 +78,8 @@ let try_create_sr_from_record ~__context ~_type ~device_config ~dr_task ~sr_reco debug "Attaching SR %s to host %s" sr_record.uuid (Db.Host.get_name_label ~__context ~self:host); let pbd = Client.PBD.create ~rpc ~session_id ~host ~sR:sr ~device_config ~other_config:[] in Client.PBD.plug ~rpc ~session_id ~self:pbd) hosts; - (* Ensure the VDI records are in the database. *) - Client.SR.scan ~rpc ~session_id ~sr; + (* Wait until the asynchronous scan is complete and metadata_latest has been updated for all metadata VDIs. *) + Xapi_dr.wait_until_sr_is_ready ~__context ~sr; Db.SR.set_introduced_by ~__context ~self:sr ~value:dr_task with e -> (* Clean up if anything goes wrong. *)