(*
* Copyright (C) 2006-2009 Citrix Systems Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only; with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)
(**
* @group API Messaging
*)
open Threadext
open Pervasiveext
open Listext
open Stringext
open Server_helpers
open Client
module D = Debug.Debugger(struct let name="xapi" end)
open D
module Audit = Debug.Debugger(struct let name="audit" end)
let info = Audit.debug
(**************************************************************************************)
(* The master uses a global mutex to mark database records before forwarding messages *)
(** All must fear the global mutex *)
let __internal_mutex = Mutex.create ()
let __number_of_queueing_threads = ref 0
let max_number_of_queueing_threads = 100
let with_global_lock x = Mutex.execute __internal_mutex x
(** Call the function f having incremented the number of queueing threads counter.
If we exceed a built-in threshold, throw TOO_MANY_PENDING_TASKS *)
let queue_thread f =
with_global_lock
(fun () ->
if !__number_of_queueing_threads > max_number_of_queueing_threads
then raise (Api_errors.Server_error(Api_errors.too_many_pending_tasks, []))
else incr __number_of_queueing_threads);
finally f (fun () -> with_global_lock (fun () -> decr __number_of_queueing_threads))
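(* Illustrative sketch, not part of the original module: any blocking wait done
   while retrying should go through [queue_thread], which raises
   TOO_MANY_PENDING_TASKS once more than max_number_of_queueing_threads threads
   are parked on the master. The [_example_] binding below is hypothetical. *)
let _example_queue_thread () =
  queue_thread (fun () -> Thread.delay 0.1)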
module type POLICY = sig
type t
val standard : t
(** Used by operations like VM.start which want to paper over transient glitches but want to fail
quickly if the objects are persistently locked (eg by a VDI.clone) *)
val fail_quickly : t
val wait : __context:Context.t -> t -> exn -> t
end
(* Mechanism for early wakeup of blocked threads. When a thread goes to sleep having got an
'other_operation_in_progress' exception, we use the interruptible sleep 'Delay.*' rather than
'Thread.delay' and provide a mechanism for the owner of the conflicting task to wake us up
on the way out. *)
module Early_wakeup = struct
let table : ((string*string), Delay.t) Hashtbl.t = Hashtbl.create 10
let table_m = Mutex.create ()
let wait ((a, b) as key) time =
(* debug "Early_wakeup wait key = (%s, %s) time = %.2f" a b time; *)
let d = Delay.make () in
Mutex.execute table_m (fun () -> Hashtbl.add table key d);
finally
(fun () ->
let (_: bool) = Delay.wait d time in
()
)(fun () -> Mutex.execute table_m (fun () -> Hashtbl.remove table key))
let broadcast (a, b) =
(*debug "Early_wakeup broadcast key = (%s, %s)" a b;*)
Mutex.execute table_m
(fun () ->
Hashtbl.iter (fun (a, b) d -> (*debug "Signalling thread blocked on (%s, %s)" a b;*) Delay.signal d) table
)
let signal ((a, b) as key) =
(*debug "Early_wakeup signal key = (%s, %s)" a b;*)
Mutex.execute table_m
(fun () ->
if Hashtbl.mem table key then ((*debug "Signalling thread blocked on (%s,%s)" a b;*) Delay.signal (Hashtbl.find table key))
)
end
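(* Illustrative sketch (hypothetical key and timings): a blocked thread waits on
   a (class, objref) key with a timeout, and a completing task signals the same
   key to cut the sleep short; the timeout bounds the wait if the signal races
   ahead of the registration. *)
let _example_early_wakeup () =
  let key = ("VM", "OpaqueRef:example") in
  let signaller = Thread.create (fun () -> Thread.delay 0.1; Early_wakeup.signal key) () in
  Early_wakeup.wait key 30.0; (* returns early once signalled *)
  Thread.join signaller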
module Repeat_with_uniform_backoff : POLICY = struct
type t = {
minimum_delay: float; (* seconds *)
maximum_delay: float; (* maximum backoff time *)
max_total_wait: float; (* max time to wait before failing *)
wait_so_far: float; (* time waited so far *)
}
let standard = {
minimum_delay = 1.0;
maximum_delay = 20.0;
max_total_wait = 3600.0 *. 2.0; (* 2 hours *)
wait_so_far = 0.0;
}
let fail_quickly = {
minimum_delay = 2.;
maximum_delay = 2.;
max_total_wait = 120.;
wait_so_far = 0.
}
let wait ~__context (state: t) (e: exn) =
if state.wait_so_far >= state.max_total_wait then raise e;
let this_timeout = state.minimum_delay +. (state.maximum_delay -. state.minimum_delay) *. (Random.float 1.0) in
debug "Waiting for up to %f seconds before retrying..." this_timeout;
let start = Unix.gettimeofday () in
begin
match e with
| Api_errors.Server_error(code, [ cls; objref ]) when code = Api_errors.other_operation_in_progress ->
Early_wakeup.wait (cls, objref) this_timeout;
| _ ->
Thread.delay this_timeout;
end;
{ state with wait_so_far = state.wait_so_far +. (Unix.gettimeofday () -. start) }
end
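(* With [standard], each retry therefore sleeps a uniformly-random time in
   [minimum_delay, maximum_delay] = [1s, 20s] (mean 10.5s) and gives up once the
   cumulative wait exceeds max_total_wait = 2 hours, i.e. after roughly 700
   average-length sleeps the original exception is re-raised. *)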
(** Could replace this with something fancier which waits for objects to change at the
database level *)
module Policy = Repeat_with_uniform_backoff
(** Attempts to retry a lock-acquiring function multiple times. If it catches another operation
in progress error then it blocks before retrying. *)
let retry ~__context ~doc ?(policy = Policy.standard) f =
(* This is a cancellable operation, so mark the allowed operations on the task *)
TaskHelper.set_cancellable ~__context;
let rec loop state =
let result = ref None in
let state = ref state in
while !result = None do
try
if TaskHelper.is_cancelling ~__context then begin
error "%s locking failed: task has been cancelled" doc;
TaskHelper.cancel ~__context;
raise (Api_errors.Server_error(Api_errors.task_cancelled, [ Ref.string_of (Context.get_task_id __context) ]))
end;
result := Some (f ())
with
| Api_errors.Server_error(code, objref :: _ ) as e when code = Api_errors.other_operation_in_progress ->
debug "%s locking failed: caught transient failure %s" doc (ExnHelper.string_of_exn e);
state := queue_thread (fun () -> Policy.wait ~__context !state e)
done;
match !result with
| Some x -> x
| None -> failwith "this should never happen" in
loop policy
let retry_with_global_lock ~__context ~doc ?policy f =
retry ~__context ~doc ?policy (fun () -> with_global_lock f)
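(* Illustrative sketch (hypothetical action): a lock-acquiring step is typically
   run under the global lock and retried on OTHER_OPERATION_IN_PROGRESS, with
   [doc] used only for logging. [_example_mark] stands in for a real marking
   function such as the with_*_operation helpers below. *)
let _example_retry_usage ~__context () =
  let _example_mark () = () in
  retry_with_global_lock ~__context ~doc:"example.mark" _example_mark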
(**************************************************************************************)
(* WARNING: using persistent+cached connections with retries doesn't work for all messages.
Examples:
1. The callback in Pool.hello will fail with an emergency mode error
2. The no-other-masters check will take /ages/ if a host is offline
So we have two rpc functions: one with retrying and one without.
When doing "normal" calls where the host is expected to be live, we use the retry fn.
When doing "unusual" calls (like pool hellos) where the host may well be down or
marked as down, we use the basic non-retry kind.
*)
(* Use HTTP 1.0, don't use the connection cache and don't pre-verify the connection *)
let remote_rpc_no_retry context hostname (task_opt: API.ref_task option) xml =
let open Xmlrpc_client in
let transport = SSL(SSL.make ?task_id:(may Ref.string_of task_opt) (),
hostname, !Xapi_globs.https_port) in
let http = xmlrpc ?task_id:(may Ref.string_of task_opt) ~version:"1.0" "/" in
XMLRPC_protocol.rpc ~srcstr:"xapi" ~dststr:"dst_xapi" ~transport ~http xml
(* Use HTTP 1.1, use the stunnel cache and pre-verify the connection *)
let remote_rpc_retry context hostname (task_opt: API.ref_task option) xml =
let open Xmlrpc_client in
let transport = SSL(SSL.make ~use_stunnel_cache:true ?task_id:(may Ref.string_of task_opt) (),
hostname, !Xapi_globs.https_port) in
let http = xmlrpc ?task_id:(may Ref.string_of task_opt) ~version:"1.1" "/" in
XMLRPC_protocol.rpc ~srcstr:"xapi" ~dststr:"dst_xapi" ~transport ~http xml
let call_slave_with_session remote_rpc_fn __context host (task_opt: API.ref_task option) f =
let session_id = Xapi_session.login_no_password ~__context ~uname:None ~host ~pool:true ~is_local_superuser:true ~subject:(Ref.null) ~auth_user_sid:"" ~auth_user_name:"" ~rbac_permissions:[] in
let hostname = Db.Host.get_address ~__context ~self:host in
Pervasiveext.finally
(fun ()->f session_id (remote_rpc_fn __context hostname task_opt))
(fun ()->Server_helpers.exec_with_new_task ~session_id "local logout in message forwarder" (fun __context -> Xapi_session.logout ~__context))
let call_slave_with_local_session remote_rpc_fn __context host (task_opt: API.ref_task option) f =
let hostname = Db.Host.get_address ~__context ~self:host in
let session_id = Client.Session.slave_local_login ~rpc:(remote_rpc_fn __context hostname None)
~psecret:!Xapi_globs.pool_secret in
Pervasiveext.finally
(fun () -> f session_id (remote_rpc_fn __context hostname task_opt))
(fun () -> Client.Session.local_logout ~rpc:(remote_rpc_fn __context hostname None) ~session_id)
(* Set the fields on the task record to indicate that forwarding has taken place
and create a task id for the slave to use *)
let set_forwarding_on_task ~__context ~host =
if Context.task_in_database __context
then begin
let rt = Context.get_task_id __context in
Db.Task.set_forwarded ~__context ~self:rt ~value:true;
Db.Task.set_forwarded_to ~__context ~self:rt ~value:host;
Some rt (* slave uses this task for progress/status etc. *)
end else None
let check_live ~__context h =
(* assume that localhost is always live *)
if Helpers.get_localhost ~__context <> h
&& not (Xapi_vm_helpers.is_host_live ~__context h)
then raise (Api_errors.Server_error (Api_errors.host_offline, [Ref.string_of h]))
let check_enabled ~__context h =
(* check host is enabled *)
Xapi_vm_helpers.assert_host_is_enabled ~__context ~host:h
(* Forward op to one of the specified hosts if host!=localhost *)
let do_op_on_common ~local_fn ~__context ~host op f =
try
let localhost=Helpers.get_localhost ~__context in
if localhost=host then local_fn ~__context
else
let task_opt = set_forwarding_on_task ~__context ~host in
f __context host task_opt op
with
| Xmlrpc_client.Connection_reset | Http_client.Http_request_rejected _ ->
warn "Caught Connection_reset when contacting host %s; converting into CANNOT_CONTACT_HOST" (Ref.string_of host);
raise (Api_errors.Server_error (Api_errors.cannot_contact_host, [Ref.string_of host]))
| Xmlrpc_client.Stunnel_connection_failed ->
warn "Caught Stunnel_connection_failed while contacting host %s; converting into CANNOT_CONTACT_HOST" (Ref.string_of host);
raise (Api_errors.Server_error (Api_errors.cannot_contact_host, [Ref.string_of host]))
(* regular forwarding fn, with session and live-check. Used by most calls, will
use the connection cache. *)
(* we don't check "host.enabled" here, because for most messages we want to be able to forward
them even when the host is disabled; vm.start_on and resume_on do their own check for enabled *)
let do_op_on ~local_fn ~__context ~host op =
check_live ~__context host;
do_op_on_common ~local_fn ~__context ~host op
(call_slave_with_session remote_rpc_retry)
(* with session but no live check. Used by the Pool.hello calling back ONLY
Don't use the connection cache or retry logic. *)
let do_op_on_nolivecheck_no_retry ~local_fn ~__context ~host op =
do_op_on_common ~local_fn ~__context ~host op
(call_slave_with_session remote_rpc_no_retry)
(* with a local session and no checking. This is used for forwarding messages to hosts that
we don't know are alive/dead -- e.g. the pool_emergency_* messages.
Don't use the connection cache or retry logic. *)
let do_op_on_localsession_nolivecheck ~local_fn ~__context ~host op =
do_op_on_common ~local_fn ~__context ~host op
(call_slave_with_local_session remote_rpc_no_retry)
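(* Illustrative sketch: forwarding a call with the standard live-checked,
   retrying path. When [host] is localhost the [local_fn] branch runs directly;
   otherwise the closure receives a logged-in session and an rpc function for
   the remote host. Host.get_servertime is used here purely as an example of a
   small forwardable call. *)
let _example_do_op_on ~__context ~host =
  do_op_on ~local_fn:(fun ~__context -> ignore __context) ~__context ~host
    (fun session_id rpc -> ignore (Client.Host.get_servertime rpc session_id host))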
(* Map a function across a list, remove elements which throw an exception *)
let map_with_drop ?(doc = "performing unknown operation") f xs =
let one x =
try [ f x ]
with e ->
debug "Caught exception while %s in message forwarder: %s" (ExnHelper.string_of_exn e) doc; [] in
List.concat (List.map one xs)
(* Iterate a function across a list, ignoring applications which throw an exception *)
let iter_with_drop ?(doc = "performing unknown operation") f xs =
let one x =
try f x
with e ->
debug "Caught exception while %s in message forwarder: %s" doc (ExnHelper.string_of_exn e) in
List.iter one xs
let log_exn ?(doc = "performing unknown operation") f x =
try f x
with e ->
debug "Caught exception while %s in message forwarder: %s" (ExnHelper.string_of_exn e) doc;
raise e
let log_exn_ignore ?(doc = "performing unknown operation") f x =
try f x
with e ->
debug "Ignoring exception while %s in message forwarder: %s" (ExnHelper.string_of_exn e) doc
(**************************************************************************************)
let hosts_with_several_srs ~__context srs =
let hosts = Db.Host.get_all ~__context in
let filterfn host =
try
Xapi_vm_helpers.assert_can_see_specified_SRs ~__context ~reqd_srs:srs ~host;
true
with
_ -> false in
List.filter filterfn hosts
(* Given an SR, return a PBD to use for some storage operation. *)
(* In the case of SR.destroy we need to be able to forward the SR operation when all
PBDs are unplugged - this is the reason for the consider_unplugged_pbds optional
argument below. All other SR ops only consider plugged PBDs... *)
let choose_pbd_for_sr ?(consider_unplugged_pbds=false) ~__context ~self () =
let all_pbds = Db.SR.get_PBDs ~__context ~self in
let plugged_pbds = List.filter (fun pbd->Db.PBD.get_currently_attached ~__context ~self:pbd) all_pbds in
let pbds_to_consider = if consider_unplugged_pbds then all_pbds else plugged_pbds in
if Helpers.is_sr_shared ~__context ~self then
let master = Db.Pool.get_master ~__context ~self:(Helpers.get_pool ~__context) in
let master_pbds = Db.Host.get_PBDs ~__context ~self:master in
(* shared SR operations must happen on the master *)
match Listext.List.intersect pbds_to_consider master_pbds with
| pbd :: _ -> pbd (* ok, master plugged *)
| [] -> raise (Api_errors.Server_error(Api_errors.sr_no_pbds, [ Ref.string_of self ])) (* can't do op, master pbd not plugged *)
else
match pbds_to_consider with
| [] -> raise (Api_errors.Server_error(Api_errors.sr_no_pbds, [ Ref.string_of self ]))
| pbd :: _ -> pbd
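(* Illustrative sketch: a storage operation on an SR is typically forwarded to
   the host of the PBD chosen above. [_example_host_for_sr_op] is hypothetical. *)
let _example_host_for_sr_op ~__context ~sr =
  let pbd = choose_pbd_for_sr ~__context ~self:sr () in
  Db.PBD.get_host ~__context ~self:pbd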
let loadbalance_host_operation ~__context ~hosts ~doc ~op (f: API.ref_host -> unit) =
let task_id = Ref.string_of (Context.get_task_id __context) in
let choice = retry_with_global_lock ~__context ~doc
(fun () ->
let possibilities = List.filter
(fun self -> try Xapi_host_helpers.assert_operation_valid ~__context ~self ~op; true
with _ -> false) hosts in
if possibilities = []
then raise (Api_errors.Server_error(Api_errors.other_operation_in_progress, [ "host"; Ref.string_of (List.hd hosts) ]));
let choice = List.nth possibilities (Random.int (List.length possibilities)) in
Xapi_host_helpers.assert_operation_valid ~__context ~self:choice ~op;
Db.Host.add_to_current_operations ~__context ~self:choice ~key:task_id ~value:op;
Xapi_host_helpers.update_allowed_operations ~__context ~self:choice;
choice) in
(* Then do the action with the lock released *)
finally
(fun () -> f choice)
(* Make sure to clean up at the end *)
(fun () ->
try
Db.Host.remove_from_current_operations ~__context ~self:choice ~key:task_id;
Xapi_host_helpers.update_allowed_operations ~__context ~self:choice;
Early_wakeup.broadcast (Datamodel._host, Ref.string_of choice);
with
_ -> ())
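(* Illustrative sketch (hypothetical body): pick any host from [hosts] on which
   `vm_start is currently valid and run the action on it, relying on
   loadbalance_host_operation to mark/unmark current_operations around it. *)
let _example_loadbalance ~__context ~hosts =
  loadbalance_host_operation ~__context ~hosts ~doc:"example" ~op:`vm_start
    (fun host -> ignore host)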
module Forward = functor(Local: Custom_actions.CUSTOM_ACTIONS) -> struct
(* During certain operations that are executed on a pool slave, the slave can reconfigure
* its management interface and we can lose the connection to it.
* This function catches any "host cannot be contacted" exceptions during such calls and polls
* periodically to see whether the operation has completed on the slave. *)
let tolerate_connection_loss fn success timeout =
try
fn ()
with
| Api_errors.Server_error (ercode, params) when ercode=Api_errors.cannot_contact_host ->
debug "Lost connection with slave during call (expected). Waiting for slave to come up again.";
let time_between_retries = 1. (* seconds *) in
let num_retries = int_of_float (timeout /. time_between_retries) in
let rec poll i =
match i with
| 0 -> raise (Api_errors.Server_error (ercode, params)) (* give up and re-raise exn *)
| i ->
begin
match success () with
| Some result -> debug "Slave is back and has completed the operation!"; result (* success *)
| None -> Thread.delay time_between_retries; poll (i-1)
end
in
poll num_retries
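(* Illustrative sketch (placeholder closures): the first argument performs the
   call that may drop the connection, the second probes whether the slave has
   finished (Some result when it has), and the float bounds the polling in
   seconds. On the normal path the result of the call itself is returned. *)
let _example_tolerate_loss () =
  tolerate_connection_loss
    (fun () -> ()) (* the call that may raise CANNOT_CONTACT_HOST *)
    (fun () -> Some ()) (* how to detect completion afterwards *)
    30. (* seconds to poll before re-raising *)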
let add_brackets s =
if s = "" then
""
else
Printf.sprintf " (%s)" s
let pool_uuid ~__context pool =
try if Pool_role.is_master () then
let name = Db.Pool.get_name_label __context pool in
Printf.sprintf "%s%s" (Db.Pool.get_uuid __context pool) (add_brackets name)
else
Ref.string_of pool
with _ -> "invalid"
let current_pool_uuid ~__context =
if Pool_role.is_master () then
let _, pool = List.hd (Db.Pool.get_all_records ~__context) in
Printf.sprintf "%s%s" pool.API.pool_uuid (add_brackets pool.API.pool_name_label)
else
"invalid"
let host_uuid ~__context host =
try if Pool_role.is_master () then
let name = Db.Host.get_name_label __context host in
Printf.sprintf "%s%s" (Db.Host.get_uuid __context host) (add_brackets name)
else
Ref.string_of host
with _ -> "invalid"
let vm_uuid ~__context vm =
try if Pool_role.is_master () then
let name = Db.VM.get_name_label __context vm in
Printf.sprintf "%s%s" (Db.VM.get_uuid __context vm) (add_brackets name)
else
Ref.string_of vm
with _ -> "invalid"
let vm_appliance_uuid ~__context vm_appliance =
try if Pool_role.is_master () then
let name = Db.VM_appliance.get_name_label __context vm_appliance in
Printf.sprintf "%s%s" (Db.VM_appliance.get_uuid __context vm_appliance) (add_brackets name)
else
Ref.string_of vm_appliance
with _ -> "invalid"
let sr_uuid ~__context sr =
try if Pool_role.is_master () then
let name = Db.SR.get_name_label __context sr in
Printf.sprintf "%s%s" (Db.SR.get_uuid __context sr) (add_brackets name)
else
Ref.string_of sr
with _ -> "invalid"
let vdi_uuid ~__context vdi =
try if Pool_role.is_master () then
Db.VDI.get_uuid __context vdi
else
Ref.string_of vdi
with _ -> "invalid"
let vif_uuid ~__context vif =
try if Pool_role.is_master () then
Db.VIF.get_uuid __context vif
else
Ref.string_of vif
with _ -> "invalid"
let vlan_uuid ~__context vlan =
try if Pool_role.is_master () then
Db.VLAN.get_uuid __context vlan
else
Ref.string_of vlan
with _ -> "invalid"
let tunnel_uuid ~__context tunnel =
try if Pool_role.is_master () then
Db.Tunnel.get_uuid __context tunnel
else
Ref.string_of tunnel
with _ -> "invalid"
let bond_uuid ~__context bond =
try if Pool_role.is_master () then
Db.Bond.get_uuid __context bond
else
Ref.string_of bond
with _ -> "invalid"
let pif_uuid ~__context pif =
try if Pool_role.is_master () then
Db.PIF.get_uuid __context pif
else
Ref.string_of pif
with _ -> "invalid"
let vbd_uuid ~__context vbd =
try if Pool_role.is_master () then
Db.VBD.get_uuid __context vbd
else
Ref.string_of vbd
with _ -> "invalid"
let pbd_uuid ~__context pbd =
try if Pool_role.is_master () then
Db.PBD.get_uuid __context pbd
else
Ref.string_of pbd
with _ -> "invalid"
let task_uuid ~__context task =
try if Pool_role.is_master () then
Db.Task.get_uuid __context task
else
Ref.string_of task
with _ -> "invalid"
let crashdump_uuid ~__context cd =
try if Pool_role.is_master () then
Db.Crashdump.get_uuid __context cd
else
Ref.string_of cd
with _ -> "invalid"
let host_crashdump_uuid ~__context hcd =
try if Pool_role.is_master () then
Db.Host_crashdump.get_uuid __context hcd
else
Ref.string_of hcd
with _ -> "invalid"
let network_uuid ~__context network =
try if Pool_role.is_master () then
Db.Network.get_uuid __context network
else
Ref.string_of network
with _ -> "invalid"
let host_patch_uuid ~__context patch =
try if Pool_role.is_master () then
Db.Host_patch.get_uuid __context patch
else
Ref.string_of patch
with _ -> "invalid"
let pool_patch_uuid ~__context patch =
try if Pool_role.is_master () then
Db.Pool_patch.get_uuid __context patch
else
Ref.string_of patch
with _ -> "invalid"
let pci_uuid ~__context pci =
try if Pool_role.is_master () then
Db.PCI.get_uuid __context pci
else
Ref.string_of pci
with _ -> "invalid"
let pgpu_uuid ~__context pgpu =
try if Pool_role.is_master () then
Db.PGPU.get_uuid __context pgpu
else
Ref.string_of pgpu
with _ -> "invalid"
let gpu_group_uuid ~__context gpu_group =
try if Pool_role.is_master () then
Db.GPU_group.get_uuid __context gpu_group
else
Ref.string_of gpu_group
with _ -> "invalid"
let vgpu_uuid ~__context vgpu =
try if Pool_role.is_master () then
Db.VGPU.get_uuid __context vgpu
else
Ref.string_of vgpu
with _ -> "invalid"
module Session = Local.Session
module Auth = Local.Auth
module Subject = Local.Subject
module Role = Local.Role
module Task = struct
include Local.Task
let cancel ~__context ~task =
let local_fn = cancel ~task in
let forwarded_to = Db.Task.get_forwarded_to ~__context ~self:task in
if Db.is_valid_ref __context forwarded_to
then do_op_on ~local_fn ~__context ~host:(Db.Task.get_forwarded_to ~__context ~self:task)
(fun session_id rpc ->
Client.Task.cancel rpc session_id task
)
else local_fn ~__context
end
module Event = Local.Event
module VMPP = Local.VMPP
module VM_appliance = struct
include Local.VM_appliance
(* Add to the VM_appliance's current operations, call a function and then remove from the *)
(* current operations. Ensure the allowed_operations are kept up to date. *)
let with_vm_appliance_operation ~__context ~self ~doc ~op f =
let task_id = Ref.string_of (Context.get_task_id __context) in
retry_with_global_lock ~__context ~doc
(fun () ->
Xapi_vm_appliance.assert_operation_valid ~__context ~self ~op;
Db.VM_appliance.add_to_current_operations ~__context ~self ~key:task_id ~value:op;
Xapi_vm_appliance.update_allowed_operations ~__context ~self);
(* Then do the action with the lock released *)
finally f
(* Make sure to clean up at the end *)
(fun () ->
try
Db.VM_appliance.remove_from_current_operations ~__context ~self ~key:task_id;
Xapi_vm_appliance.update_allowed_operations ~__context ~self;
Early_wakeup.broadcast (Datamodel._vm_appliance, Ref.string_of self);
with
_ -> ())
let start ~__context ~self ~paused =
info "VM_appliance.start: VM_appliance = '%s'" (vm_appliance_uuid ~__context self);
with_vm_appliance_operation ~__context ~self ~doc:"VM_appliance.start" ~op:`start
(fun () ->
Local.VM_appliance.start ~__context ~self ~paused)
let clean_shutdown ~__context ~self =
info "VM_appliance.clean_shutdown: VM_appliance = '%s'" (vm_appliance_uuid ~__context self);
with_vm_appliance_operation ~__context ~self ~doc:"VM_appliance.clean_shutdown" ~op:`clean_shutdown
(fun () ->
Local.VM_appliance.clean_shutdown ~__context ~self)
let hard_shutdown ~__context ~self =
info "VM_appliance.hard_shutdown: VM_appliance = '%s'" (vm_appliance_uuid ~__context self);
with_vm_appliance_operation ~__context ~self ~doc:"VM_appliance.hard_shutdown" ~op:`hard_shutdown
(fun () ->
Local.VM_appliance.hard_shutdown ~__context ~self)
let shutdown ~__context ~self =
info "VM_appliance.shutdown: VM_appliance = '%s'" (vm_appliance_uuid ~__context self);
with_vm_appliance_operation ~__context ~self ~doc:"VM_appliance.shutdown" ~op:`shutdown
(fun () ->
Local.VM_appliance.shutdown ~__context ~self)
let assert_can_be_recovered ~__context ~self ~session_to =
info "VM_appliance.assert_can_be_recovered: VM_appliance = '%s'" (vm_appliance_uuid ~__context self);
Local.VM_appliance.assert_can_be_recovered ~__context ~self ~session_to
let get_SRs_required_for_recovery ~__context ~self ~session_to =
info "VM_appliance.get_SRs_required_for_recovery: VM_appliance = '%s'" (vm_appliance_uuid ~__context self);
Local.VM_appliance.get_SRs_required_for_recovery ~__context ~self ~session_to
let recover ~__context ~self ~session_to ~force =
info "VM_appliance.recover: VM_appliance = '%s'" (vm_appliance_uuid ~__context self);
Local.VM_appliance.recover ~__context ~self ~session_to ~force
end
module DR_task = Local.DR_task
(* module Alert = Local.Alert *)
module Pool = struct
include Local.Pool
let eject ~__context ~host =
info "Pool.eject: pool = '%s'; host = '%s'" (current_pool_uuid ~__context) (host_uuid ~__context host);
let local_fn = Local.Pool.eject ~host in
do_op_on ~local_fn ~__context ~host (fun session_id rpc -> Client.Pool.eject rpc session_id host)
let designate_new_master ~__context ~host =
info "Pool.designate_new_master: pool = '%s'; host = '%s'" (current_pool_uuid ~__context) (host_uuid ~__context host);
let local_fn = Local.Pool.designate_new_master ~host in
do_op_on ~local_fn ~__context ~host (fun session_id rpc -> Client.Pool.designate_new_master rpc session_id host)
let enable_ha ~__context ~heartbeat_srs ~configuration =
info "Pool.enable_ha: pool = '%s'; heartbeat_srs = [ %s ]; configuration = [ %s ]"
(current_pool_uuid ~__context)
(String.concat ", " (List.map Ref.string_of heartbeat_srs))
(String.concat "; " (List.map (fun (k, v) -> k ^ "=" ^ v) configuration));
Local.Pool.enable_ha __context heartbeat_srs configuration
let disable_ha ~__context =
info "Pool.disable_ha: pool = '%s'" (current_pool_uuid ~__context);
Local.Pool.disable_ha __context
let ha_prevent_restarts_for ~__context ~seconds =
info "Pool.ha_prevent_restarts_for: pool = '%s'; seconds = %Ld" (current_pool_uuid ~__context) seconds;
Local.Pool.ha_prevent_restarts_for ~__context ~seconds
let ha_failover_plan_exists ~__context ~n =
info "Pool.ha_failover_plan_exists: pool = '%s'; n = %Ld" (current_pool_uuid ~__context) n;
Local.Pool.ha_failover_plan_exists ~__context ~n
let ha_compute_max_host_failures_to_tolerate ~__context =
info "Pool.ha_compute_max_host_failures_to_tolerate: pool = '%s'" (current_pool_uuid ~__context);
Local.Pool.ha_compute_max_host_failures_to_tolerate ~__context
let ha_compute_hypothetical_max_host_failures_to_tolerate ~__context ~configuration =
info "Pool.ha_compute_hypothetical_max_host_failures_to_tolerate: pool = '%s'; configuration = [ %s ]"
(current_pool_uuid ~__context)
(String.concat "; " (List.map (fun (vm, p) -> Ref.string_of vm ^ " " ^ p) configuration));
Local.Pool.ha_compute_hypothetical_max_host_failures_to_tolerate ~__context ~configuration
let ha_compute_vm_failover_plan ~__context ~failed_hosts ~failed_vms =
info "Pool.ha_compute_vm_failover_plan: pool = '%s'; failed_hosts = [ %s ]; failed_vms = [ %s ]"
(current_pool_uuid ~__context)
(String.concat "; " (List.map Ref.string_of failed_hosts))
(String.concat "; " (List.map Ref.string_of failed_vms));
Local.Pool.ha_compute_vm_failover_plan ~__context ~failed_hosts ~failed_vms
let set_ha_host_failures_to_tolerate ~__context ~self ~value =
info "Pool.set_ha_host_failures_to_tolerate: pool = '%s'; value = %Ld" (pool_uuid ~__context self) value;
Local.Pool.set_ha_host_failures_to_tolerate ~__context ~self ~value
let ha_schedule_plan_recomputation ~__context =
info "Pool.ha_schedule_plan_recomputation: pool = '%s'" (current_pool_uuid ~__context);
Local.Pool.ha_schedule_plan_recomputation ~__context
let enable_external_auth ~__context ~pool ~config ~service_name ~auth_type =
info "Pool.enable_external_auth: pool = '%s'; service name = '%s'; auth_type = '%s'" (pool_uuid ~__context pool) service_name auth_type;
Local.Pool.enable_external_auth ~__context ~pool ~config ~service_name ~auth_type
let disable_external_auth ~__context ~pool =
info "Pool.disable_external_auth: pool = '%s'" (pool_uuid ~__context pool);
Local.Pool.disable_external_auth ~__context ~pool
let enable_redo_log ~__context ~sr =
info "Pool.enable_redo_log: pool = '%s'; sr_uuid = '%s'"
(current_pool_uuid ~__context) (sr_uuid __context sr);
Local.Pool.enable_redo_log ~__context ~sr
let disable_redo_log ~__context =
info "Pool.disable_redo_log: pool = '%s'" (current_pool_uuid ~__context);
Local.Pool.disable_redo_log ~__context
let set_vswitch_controller ~__context ~address =
info "Pool.set_vswitch_controller: pool = '%s'; address = '%s'" (current_pool_uuid ~__context) address;
Local.Pool.set_vswitch_controller ~__context ~address
let get_license_state ~__context ~self =
info "Pool.get_license_state: pool = '%s'" (pool_uuid ~__context self);
Local.Pool.get_license_state ~__context ~self
let apply_edition ~__context ~self ~edition =
info "Pool.apply_edition: pool = '%s'; edition = '%s'" (pool_uuid ~__context self) edition;
Local.Pool.apply_edition ~__context ~self ~edition
end
module VM = struct
(** Add to the VM's current operations, call a function and then remove from the
current operations. Ensure the allowed_operations are kept up to date. *)
let with_vm_operation ~__context ~self ~doc ~op f =
let task_id = Ref.string_of (Context.get_task_id __context) in
retry_with_global_lock ~__context ~doc
(fun () ->
Xapi_vm_lifecycle.assert_operation_valid ~__context ~self ~op;
Db.VM.add_to_current_operations ~__context ~self ~key:task_id ~value:op;
Xapi_vm_lifecycle.update_allowed_operations ~__context ~self);
(* Then do the action with the lock released *)
finally f
(* Make sure to clean up at the end *)
(fun () ->
try
Db.VM.remove_from_current_operations ~__context ~self ~key:task_id;
Xapi_vm_lifecycle.update_allowed_operations ~__context ~self;
Early_wakeup.broadcast (Datamodel._vm, Ref.string_of self);
with
_ -> ())
let unmark_vbds ~__context ~vbds ~doc ~op =
let task_id = Ref.string_of (Context.get_task_id __context) in
iter_with_drop ~doc:("unmarking VBDs after " ^ doc)
(fun self ->
if Db.is_valid_ref __context self then begin
Db.VBD.remove_from_current_operations ~__context ~self ~key:task_id;
Xapi_vbd_helpers.update_allowed_operations ~__context ~self;
Early_wakeup.broadcast (Datamodel._vbd, Ref.string_of self);
end)
vbds
let mark_vbds ~__context ~vm ~doc ~op : API.ref_VBD list =
let task_id = Ref.string_of (Context.get_task_id __context) in
let vbds = Db.VM.get_VBDs ~__context ~self:vm in
let marked = ref [] in
(* CA-26575: paper over transient VBD glitches caused by SR.lvhd_stop_the_world by throwing the
first OTHER_OPERATION_IN_PROGRESS (or whatever) we encounter and letting the caller deal with it *)
try
List.iter
(fun vbd ->
Xapi_vbd_helpers.assert_operation_valid ~__context ~self:vbd ~op;
Db.VBD.add_to_current_operations ~__context ~self:vbd ~key:task_id ~value:op;
Xapi_vbd_helpers.update_allowed_operations ~__context ~self:vbd;
marked := vbd :: !marked;
) vbds;
vbds
with e ->
debug "Caught exception marking VBD for %s on VM %s: %s" doc (Ref.string_of vm) (ExnHelper.string_of_exn e);
unmark_vbds ~__context ~vbds:!marked ~doc ~op;
raise e
let with_vbds_marked ~__context ~vm ~doc ~op f =
(* CA-26575: paper over transient VBD glitches caused by SR.lvhd_stop_the_world *)
let vbds = retry_with_global_lock ~__context ~doc ~policy:Policy.fail_quickly (fun () -> mark_vbds ~__context ~vm ~doc ~op) in
finally
(fun () -> f vbds)
(fun () -> with_global_lock (fun () -> unmark_vbds ~__context ~vbds ~doc ~op))
let unmark_vifs ~__context ~vifs ~doc ~op =
let task_id = Ref.string_of (Context.get_task_id __context) in
iter_with_drop ~doc:("unmarking VIFs after " ^ doc)
(fun self ->
if Db.is_valid_ref __context self then begin
Db.VIF.remove_from_current_operations ~__context ~self ~key:task_id;
Xapi_vif_helpers.update_allowed_operations ~__context ~self;
Early_wakeup.broadcast (Datamodel._vif, Ref.string_of self);
end)
vifs
let mark_vifs ~__context ~vm ~doc ~op : API.ref_VIF list =
let task_id = Ref.string_of (Context.get_task_id __context) in
let vifs = Db.VM.get_VIFs ~__context ~self:vm in
let marked = map_with_drop ~doc:("marking VIFs for " ^ doc)
(fun vif ->
Xapi_vif_helpers.assert_operation_valid ~__context ~self:vif ~op;
Db.VIF.add_to_current_operations ~__context ~self:vif ~key:task_id ~value:op;
Xapi_vif_helpers.update_allowed_operations ~__context ~self:vif;
vif) vifs in
(* Did we mark them all? *)
if List.length marked <> List.length vifs then begin
unmark_vifs ~__context ~vifs:marked ~doc ~op;
raise (Api_errors.Server_error(Api_errors.operation_not_allowed, ["Failed to lock all VIFs"]))
end else marked
let with_vifs_marked ~__context ~vm ~doc ~op f =
let vifs = retry_with_global_lock ~__context ~doc (fun () -> mark_vifs ~__context ~vm ~doc ~op) in
finally
(fun () -> f vifs)
(fun () -> with_global_lock (fun () -> unmark_vifs ~__context ~vifs ~doc ~op))
(* Some VM operations have side-effects on VBD allowed_operations but don't actually
lock the VBDs themselves (eg suspend) *)
let update_vbd_operations ~__context ~vm =
with_global_lock
(fun () ->
List.iter (fun self ->
Xapi_vbd_helpers.update_allowed_operations ~__context ~self;
try
let vdi = Db.VBD.get_VDI ~__context ~self in
Xapi_vdi.update_allowed_operations ~__context ~self:vdi
with _ -> ())
(Db.VM.get_VBDs ~__context ~self:vm))
let update_vif_operations ~__context ~vm =
with_global_lock
(fun () ->
List.iter (fun self -> Xapi_vif_helpers.update_allowed_operations ~__context ~self)
(Db.VM.get_VIFs ~__context ~self:vm))
(* -------- Forwarding helper functions: ------------------------------------ *)
(* Read the resident_on field from the vm to determine who to forward to *)
let forward_vm_op ~local_fn ~__context ~vm op =
let state = Db.VM.get_power_state ~__context ~self:vm in
match state with
| `Running | `Paused -> do_op_on ~local_fn ~__context ~host:(Db.VM.get_resident_on ~__context ~self:vm) op
| _ -> raise (Api_errors.Server_error(Api_errors.vm_bad_power_state, [Ref.string_of vm; "running"; Record_util.power_to_string state]))
(* Notes on memory checking/reservation logic:
When computing the host's free memory we consider all VMs resident_on (ie running
and consuming resources NOW) and scheduled_to_be_resident_on (ie those which are
starting/resuming/migrating, whose memory has been reserved but may not all be
in use yet).
We generally call 'assert_can_boot_here' with the master forwarding lock held,
which verifies that a host has enough free memory to support the VM and then we
set 'scheduled_to_be_resident_on' which prevents concurrent competing attempts to
use the same resources from succeeding. *)
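(* Illustrative sketch of the accounting described above: a VM counts against a
   host's memory if it is either resident there or scheduled to become resident
   there. This helper is hypothetical, written only to make the rule concrete. *)
let _example_vms_counted_against ~__context ~host =
  List.filter
    (fun vm ->
      Db.VM.get_resident_on ~__context ~self:vm = host
      || Db.VM.get_scheduled_to_be_resident_on ~__context ~self:vm = host)
    (Db.VM.get_all ~__context)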
(* Reserves the resources for a VM by setting it as 'scheduled_to_be_resident_on' a host *)
let allocate_vm_to_host ~__context ~vm ~host ~snapshot ?host_op () =
begin match host_op with
| Some x ->
let task_id = Ref.string_of (Context.get_task_id __context) in
Xapi_host_helpers.assert_operation_valid ~__context ~self:host ~op:x;
Db.Host.add_to_current_operations ~__context ~self:host ~key:task_id ~value:x;
Xapi_host_helpers.update_allowed_operations ~__context ~self:host
| None -> ()
end;
(* Make sure the last_booted record has useful values for later use in memory checking
code. *)
if snapshot.API.vM_power_state = `Halted then begin
Helpers.set_boot_record ~__context ~self:vm snapshot
end;
(* Once this is set concurrent VM.start calls will start checking the memory used by this VM *)
Db.VM.set_scheduled_to_be_resident_on ~__context ~self:vm ~value:host
(* For start/start_on/resume/resume_on/migrate *)
let finally_clear_host_operation ~__context ~host ?host_op () = match host_op with
| Some x ->
let task_id = Ref.string_of (Context.get_task_id __context) in
Db.Host.remove_from_current_operations ~__context ~self:host ~key:task_id;
Xapi_host_helpers.update_allowed_operations ~__context ~self:host;
Early_wakeup.broadcast (Datamodel._host, Ref.string_of host);
| None -> ()
(* README: Note on locking -- forward_to_suitable_host and reserve_memory_for_vm are only
called in a context where the current_operations field for the VM object contains the
operation we're considering. Thus the global_lock in this context is _not_ used to cover
the period where current_operations are set, but is used to ensure that (i) choose_host_for_vm
is executed under mutual exclusion with other incoming operations; and (ii) that scheduled_to_be_resident_on
(which must not change whilst someone is calling choose_host_for_vm) only executes in exclusion with
choose_host_for_vm.
*)
(* Used by VM.start and VM.resume to choose a host with enough resource and to
'allocate_vm_to_host' (ie set the 'scheduled_to_be_resident_on' field) *)
let forward_to_suitable_host ~local_fn ~__context ~vm ~snapshot ?host_op op =
let suitable_host = with_global_lock
(fun () ->
let host = Xapi_vm_helpers.choose_host_for_vm ~__context ~vm ~snapshot in
(* HA overcommit protection: we can either perform 'n' HA plans by including this in
the 'choose_host_for_vm' function or we can be cheapskates by doing it here: *)
Xapi_ha_vm_failover.assert_vm_placement_preserves_ha_plan ~__context ~arriving:[host, (vm, snapshot)] ();
allocate_vm_to_host ~__context ~vm ~host ~snapshot ?host_op ();
host) in
finally
(fun () -> do_op_on ~local_fn ~__context ~host:suitable_host op; suitable_host)
(fun () ->
with_global_lock
(fun () ->
finally_clear_host_operation ~__context ~host:suitable_host ?host_op ();
Db.VM.set_scheduled_to_be_resident_on ~__context ~self:vm ~value:Ref.null))
(* Used by VM.start_on, VM.resume_on, VM.migrate to verify a host has enough resource and to
'allocate_vm_to_host' (ie set the 'scheduled_to_be_resident_on' field) *)
let reserve_memory_for_vm ~__context ~vm ~snapshot ~host ?host_op f =
with_global_lock
(fun () ->
Xapi_vm_helpers.assert_can_boot_here ~__context ~self:vm ~host:host ~snapshot ();
(* NB in the case of migrate although we are about to increase free memory on the sending host
we ignore this because if a failure happens while a VM is in-flight it will still be considered
on both hosts, potentially breaking the failover plan. *)
Xapi_ha_vm_failover.assert_vm_placement_preserves_ha_plan ~__context ~arriving:[host, (vm, snapshot)] ();
allocate_vm_to_host ~__context ~vm ~host ~snapshot ?host_op ());
finally f
(fun () ->
with_global_lock
(fun () ->
finally_clear_host_operation ~__context ~host ?host_op ();
Db.VM.set_scheduled_to_be_resident_on ~__context ~self:vm ~value:Ref.null))
(**
Used by VM.set_memory_dynamic_range to reserve enough memory for
increasing dynamic_min. Although a VM may technically be
outside the range [dynamic_min, dynamic_max], we still ensure that *if*
all VMs obey our commands and balloon down to dynamic_min when we ask,
*then* the sum of the dynamic_mins will fit on the host.
*)
let reserve_memory_for_dynamic_change ~__context ~vm
new_dynamic_min new_dynamic_max f =
let host = Db.VM.get_resident_on ~__context ~self:vm in
let old_dynamic_min = Db.VM.get_memory_dynamic_min ~__context ~self:vm in
let old_dynamic_max = Db.VM.get_memory_dynamic_max ~__context ~self:vm in
let restore_old_values_on_error = ref false in
with_global_lock
(fun () ->
let host_mem_available =
Memory_check.host_compute_free_memory_with_maximum_compression
~__context ~host None in
let dynamic_min_change = Int64.sub old_dynamic_min
new_dynamic_min in
let new_host_mem_available = Int64.add host_mem_available
dynamic_min_change in
if new_host_mem_available < 0L
then raise (Api_errors.Server_error (
Api_errors.host_not_enough_free_memory, [
Int64.to_string (Int64.div (Int64.sub 0L dynamic_min_change) 1024L);
Int64.to_string (Int64.div host_mem_available 1024L);
]));
if dynamic_min_change < 0L then begin
restore_old_values_on_error := true;
Db.VM.set_memory_dynamic_min ~__context ~self:vm
~value:new_dynamic_min;
Db.VM.set_memory_dynamic_max ~__context ~self:vm
~value:new_dynamic_max;
end
);
try
f ()
with exn ->
if !restore_old_values_on_error then begin
Db.VM.set_memory_dynamic_min ~__context ~self:vm
~value:old_dynamic_min;
Db.VM.set_memory_dynamic_max ~__context ~self:vm
~value:old_dynamic_max;
end;
raise exn
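(* Worked example of the check above (illustrative numbers, byte quantities):
   raising dynamic_min from 1 GiB to 3 GiB gives dynamic_min_change = -2 GiB;
   with only 1 GiB of compressible free host memory, new_host_mem_available =
   1 GiB - 2 GiB < 0, so HOST_NOT_ENOUGH_FREE_MEMORY is raised reporting the
   2 GiB needed against the 1 GiB available (both converted to KiB). *)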
let forward_to_access_srs ~local_fn ~__context ~vm op =
let suitable_host =
Xapi_vm_helpers.choose_host ~__context ~vm
~choose_fn:(Xapi_vm_helpers.assert_can_see_SRs ~__context ~self:vm) () in
do_op_on ~local_fn ~__context ~host:suitable_host op
(* Used for the VM.copy when an SR is specified *)
let forward_to_access_srs_and ~local_fn ~__context ?vm ?extra_sr op =
let choose_fn ~host =
begin match vm with
| Some vm ->
Xapi_vm_helpers.assert_can_see_SRs ~__context ~self:vm ~host
| _ -> () end;
begin match extra_sr with
| Some extra_sr ->
Xapi_vm_helpers.assert_can_see_specified_SRs ~__context
~reqd_srs:[extra_sr] ~host
| _ -> () end in
let suitable_host = Xapi_vm_helpers.choose_host ~__context ?vm ~choose_fn () in
do_op_on ~local_fn ~__context ~host:suitable_host op
(* -------------------------------------------------------------------------- *)
(* don't forward create. this just makes a db record *)
let create ~__context ~name_label ~name_description ~user_version ~is_a_template ~affinity ~memory_target ~memory_static_max ~memory_dynamic_max ~memory_dynamic_min ~memory_static_min ~vCPUs_params ~vCPUs_max ~vCPUs_at_startup ~actions_after_shutdown ~actions_after_reboot ~actions_after_crash ~pV_bootloader ~pV_kernel ~pV_ramdisk ~pV_args ~pV_bootloader_args ~pV_legacy_args ~hVM_boot_policy ~hVM_boot_params ~hVM_shadow_multiplier ~platform ~pCI_bus ~other_config ~recommendations ~xenstore_data ~ha_always_run ~ha_restart_priority ~tags ~blocked_operations ~protection_policy ~is_snapshot_from_vmpp ~appliance ~start_delay ~shutdown_delay ~order ~suspend_SR ~version ~generation_id =