Skip to content
Merged
2 changes: 1 addition & 1 deletion _oasis
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Library xenvmidl
CompiledObject: best
Path: idl
Findlibname: xenvmidl
Modules: Xenvm_interface, Xenvm_client, Log, Result, Errors, ResizeRequest
Modules: Xenvm_interface, Xenvm_client, Log, Result, Errors, ResizeRequest, ResizeResponse, Pidfile
BuildDepends: rpclib, rpclib.syntax, sexplib, sexplib.syntax, lvm, cohttp.lwt, threads, mirage-block-unix, devmapper, bisect

Executable "xenvmd"
Expand Down
14 changes: 14 additions & 0 deletions idl/pidfile.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
(* We wish to ensure at-most-one copy of the program is started *)

let write_pid pidfile =
let txt = string_of_int (Unix.getpid ()) in
try
let fd = Unix.openfile pidfile [ Unix.O_WRONLY; Unix.O_CREAT ] 0o0644 in
Unix.lockf fd Unix.F_TLOCK (String.length txt);
let (_: int) = Unix.write fd txt 0 (String.length txt) in
()
with e ->
Printf.fprintf stderr "%s\n" (Printexc.to_string e);
Printf.fprintf stderr "The pidfile %s is locked: you cannot start the program twice!\n" pidfile;
Printf.fprintf stderr "If the process was shutdown cleanly then verify and remove the pidfile.\n%!";
exit 1
13 changes: 13 additions & 0 deletions idl/resizeResponse.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
open Sexplib.Std

module T = struct
type t =
| Device_mapper_device_does_not_exist of string
| Request_for_no_segments of int64
| Success
with sexp
(** Response from xenvm-local-allocator to xenvm *)
end

include SexpToCstruct.Make(T)
include T
4 changes: 3 additions & 1 deletion idl/xenvmidl.mldylib
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# OASIS_START
# DO NOT EDIT (digest: d1d1ec0a025cbfc9d6a3157d3e0422d2)
# DO NOT EDIT (digest: 57b376d9a9ea46b70acab11aab5f012c)
Xenvm_interface
Xenvm_client
Log
Result
Errors
ResizeRequest
ResizeResponse
Pidfile
# OASIS_STOP
4 changes: 3 additions & 1 deletion idl/xenvmidl.mllib
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# OASIS_START
# DO NOT EDIT (digest: d1d1ec0a025cbfc9d6a3157d3e0422d2)
# DO NOT EDIT (digest: 57b376d9a9ea46b70acab11aab5f012c)
Xenvm_interface
Xenvm_client
Log
Result
Errors
ResizeRequest
ResizeResponse
Pidfile
# OASIS_STOP
54 changes: 42 additions & 12 deletions xenvm-local-allocator/local_allocator.ml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ module FreePool = struct
return ()
)

(* Allocate up to [nr_extents]. Blocks if there is no space free. Can return
a partial allocation. *)
let remove nr_extents =
Lwt_mutex.with_lock m
(fun () ->
Expand All @@ -161,10 +163,17 @@ module FreePool = struct
| `Ok x ->
free := Lvm.Pv.Allocator.sub !free x;
return x
| _ ->
| `Error (`OnlyThisMuchFree 0L) ->
Lwt_condition.wait ~mutex:m c
>>= fun () ->
wait () in
wait ()
| `Error (`OnlyThisMuchFree n) ->
begin match Lvm.Pv.Allocator.find !free n with
| `Ok x ->
free := Lvm.Pv.Allocator.sub !free x;
return x
| _ -> assert false
end in
wait ()
)

Expand Down Expand Up @@ -269,7 +278,7 @@ let extend_volume device vg lv extents =
(Lvm.Pv.Name.to_string pvname);
next_sector, segments, targets
) (next_sector, [], []) extents in
segments, targets
List.rev segments, List.rev targets

let stat x =
match Devmapper.stat x with
Expand All @@ -289,6 +298,8 @@ let main config daemon socket journal fromLVM toLVM =
debug "Loaded configuration: %s" (Sexplib.Sexp.to_string_hum (Config.sexp_of_t config));
if daemon then Lwt_daemon.daemonize ();

Pidfile.write_pid (config.Config.socket ^ ".lock");

let t =
Device.read_sector_size config.Config.devices
>>= fun sector_size ->
Expand Down Expand Up @@ -390,26 +401,31 @@ let main config daemon socket journal fromLVM toLVM =
fun { ResizeRequest.local_dm_name = device; action } ->
Lwt_mutex.with_lock m
(fun () ->
( match Devmapper.stat device with
(* We may need to enlarge in multiple chunks if the free pool is depleted *)
let rec expand action = match Devmapper.stat device with
| None ->
(* Log this kind of error. This tapdisk may block but at least
others will keep going *)
error "Couldn't find device mapper device: %s" device;
return ()
return (ResizeResponse.Device_mapper_device_does_not_exist device)
| Some data_volume ->
let sector_size = Int64.of_int sector_size in
let current = Int64.mul sector_size (sizeof data_volume) in
let extent_b = Int64.mul sector_size extent_size in
(* NB: make sure we round up to the next extent *)
let nr_extents = match action with
| `Absolute x ->
Int64.(div (div (sub x current) sector_size) extent_size)
Int64.(div (add (sub x current) (sub extent_b 1L)) extent_b)
| `IncreaseBy x ->
Int64.(div (div x sector_size) extent_size) in
if nr_extents < 0L then begin
error "Request for -ve number of extents";
return ()
Int64.(div (add x extent_b) extent_b) in
if nr_extents <= 0L then begin
error "Request for %Ld (<= 0) segments" nr_extents;
return (ResizeResponse.Request_for_no_segments nr_extents)
end else begin
FreePool.remove nr_extents
>>= fun extents ->
(* This may have allocated short *)
let nr_extents' = Lvm.Pv.Allocator.size extents in
let segments, targets = extend_volume vg_device metadata data_volume extents in
let _, volume = Mapper.vg_lv_of_name device in
let volume = { ExpandVolume.volume; segments } in
Expand All @@ -419,8 +435,15 @@ let main config daemon socket journal fromLVM toLVM =
(* The operation is now in the journal *)
wait ()
(* The operation is now complete *)
end
)
>>= fun () ->
let action = match action with
| `Absolute x -> `Absolute x
| `IncreaseBy x -> `IncreaseBy Int64.(sub x (mul nr_extents' (mul sector_size extent_size))) in
if nr_extents = nr_extents'
then return ResizeResponse.Success
else expand action
end in
expand action
) in

let ls = Devmapper.ls () in
Expand All @@ -431,6 +454,8 @@ let main config daemon socket journal fromLVM toLVM =
>>= fun device ->
let r = { ResizeRequest.local_dm_name = device; action = `IncreaseBy 1L } in
handler r
>>= fun resp ->
Lwt_io.write_line Lwt_io.stdout (Sexplib.Sexp.to_string_hum (ResizeResponse.sexp_of_t resp))
>>= fun () ->
stdin () in
debug "Creating Unix domain socket %s" config.Config.socket;
Expand All @@ -446,11 +471,16 @@ let main config daemon socket journal fromLVM toLVM =
Lwt_unix.accept s
>>= fun (fd, _) ->
let ic = Lwt_io.of_fd ~mode:Lwt_io.input fd in
let oc = Lwt_io.of_fd ~mode:Lwt_io.output ~close:return fd in
(* read one line *)
Lwt_io.read_line ic
>>= fun message ->
let r = ResizeRequest.t_of_sexp (Sexplib.Sexp.of_string message) in
handler r
>>= fun resp ->
Lwt_io.write_line oc (Sexplib.Sexp.to_string (ResizeResponse.sexp_of_t resp))
>>= fun () ->
Lwt_io.flush oc
>>= fun () ->
Lwt_io.close ic
>>= fun () ->
Expand Down
35 changes: 29 additions & 6 deletions xenvm/lvresize.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ let lvresize copts live (vg_name,lv_opt) real_size percent_size =
| Some info -> info.local_device (* If we've got a default, use that *)
| None -> failwith "Need to know the local device!" in

let existing_size = Int64.(mul (mul 512L vg.Lvm.Vg.extent_size) (Lvm.Lv.size_in_extents lv)) in

let device_is_active =
let name = Mapper.name_of vg lv in
let all = Devmapper.ls () in
Expand Down Expand Up @@ -48,18 +50,39 @@ let lvresize copts live (vg_name,lv_opt) real_size percent_size =
let r = { ResizeRequest.local_dm_name = name; action = size } in
Lwt_io.write_line oc (Sexplib.Sexp.to_string (ResizeRequest.sexp_of_t r))
>>= fun () ->
Lwt_io.close oc in

let ic = Lwt_io.of_fd ~mode:Lwt_io.input ~close:return s in
Lwt_io.read_line ic
>>= fun txt ->
let resp = ResizeResponse.t_of_sexp (Sexplib.Sexp.of_string txt) in
Lwt_io.close oc
>>= fun () ->
match resp with
| ResizeResponse.Device_mapper_device_does_not_exist name ->
Printf.fprintf stderr "Device mapper device does not exist: %s\n%!" name;
exit 1
| ResizeResponse.Request_for_no_segments nr ->
Printf.fprintf stderr "Request for an illegal number of segments: %Ld\n%!" nr;
exit 2
| ResizeResponse.Success ->
return () in
match live, info with
| true, Some { Xenvm_common.local_allocator_path = Some allocator } ->
if device_is_active
then resize_locally allocator
else resize_remotely ()
if device_is_active then begin
match size with
| `Absolute size ->
(* The local allocator can only allocate. When in this state we cannot shrink:
deactivate the device first. *)
if size < existing_size
then failwith (Printf.sprintf "Existing size is %Ld: cannot decrease to %Ld" existing_size size);
if size = existing_size
then return ()
else resize_locally allocator
| _ -> resize_locally allocator
end else resize_remotely ()
| _, _ ->
(* safe to allocate remotely *)
resize_remotely ()
)

let live_arg =
let doc = "Resize a live device using the local allocator" in
Arg.(value & flag & info ["live"] ~doc)
Expand Down
50 changes: 42 additions & 8 deletions xenvmd/xenvmd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,19 @@ module FromLVM = struct
module R = Shared_block.Ring.Make(Log)(Vg_IO.Volume)(FreeAllocation)
let create ~disk () =
fatal_error "FromLVM.create" (R.Producer.create ~disk ())
let rec attach ~name ~disk () = R.Producer.attach ~queue:(name ^ " FromLVM Producer") ~client:"xenvmd" ~disk () >>= function
| `Error `Suspended ->
debug "FromLVM.attach got `Suspended; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
attach ~name ~disk ()
| x -> fatal_error "FromLVM.attach" (return x)
let attach ~name ~disk () =
let initial_state = ref `Running in
let rec loop () = R.Producer.attach ~queue:(name ^ " FromLVM Producer") ~client:"xenvmd" ~disk () >>= function
| `Error `Suspended ->
debug "FromLVM.attach got `Suspended; sleeping";
Lwt_unix.sleep 5.
>>= fun () ->
initial_state := `Suspended;
loop ()
| x -> fatal_error "FromLVM.attach" (return x) in
loop ()
>>= fun x ->
return (!initial_state, x)
let state t = fatal_error "FromLVM.state" (R.Producer.state t)
let rec push t item = R.Producer.push ~t ~item () >>= function
| `Error (`Msg x) -> fatal_error_t (Printf.sprintf "Error pushing to the FromLVM queue: %s" x)
Expand Down Expand Up @@ -271,7 +277,25 @@ module VolumeManager = struct
| `Error _ -> fail (Failure (Printf.sprintf "Failed to open %s" fromLVM))
| `Ok disk ->
FromLVM.attach ~name ~disk ()
>>= fun from_LVM ->
>>= fun (initial_state, from_LVM) ->
( if initial_state = `Suspended then begin
debug "The FromLVM queue was already suspended: resending the free blocks";
( match Vg_IO.find vg freeLVM with
| Some lv -> return lv
| None -> assert false ) >>= fun lv ->
let allocation = Lvm.Lv.to_allocation (Vg_IO.Volume.metadata_of lv) in
FromLVM.push from_LVM allocation
>>= fun pos ->
FromLVM.advance from_LVM pos
>>= fun () ->
debug "Free blocks pushed";
return ()
end else begin
debug "The FromLVM queue was running: no need to resend the free blocks";
return ()
end )
>>= fun () ->
debug "querying state";
FromLVM.state from_LVM
>>= fun state ->
debug "FromLVM queue is currently %s" (match state with `Running -> "Running" | `Suspended -> "Suspended");
Expand All @@ -290,12 +314,15 @@ module VolumeManager = struct
let to_lvm = List.assoc name !to_LVMs in
ToLVM.pop to_lvm
>>= fun (pos, items) ->
debug "FromLVM queue %s has %d items" name (List.length items);
Lwt_list.iter_s (function { ExpandVolume.volume; segments } ->
write (fun vg ->
debug "Expanding volume %s" volume;
let id = (Lvm.Vg.LVs.find_by_name volume vg.Lvm.Vg.lvs).Lvm.Lv.id in
Lvm.Vg.do_op vg (Lvm.Redo.Op.(LvExpand(id, { lvex_segments = segments })))
) >>= fun () ->
write (fun vg ->
debug "Removing free blocks from %s free LV" name;
let (_,freeid) = (List.assoc name !free_LVs) in
Lvm.Vg.do_op vg (Lvm.Redo.Op.(LvCrop(freeid, { lvc_segments = segments })))
)
Expand Down Expand Up @@ -618,6 +645,13 @@ let run port sock_path config daemon =
let config = { config with Config.listenPort = match port with None -> config.Config.listenPort | Some x -> x } in
let config = { config with Config.listenPath = match sock_path with None -> config.Config.listenPath | Some x -> Some x } in
if daemon then Lwt_daemon.daemonize ();
( match config.Config.listenPath with
| None ->
(* don't need a lock file because we'll fail to bind to the port *)
()
| Some path ->
info "Writing pidfile to %s" path;
Pidfile.write_pid (path ^ ".lock") );
let t =
info "Started with configuration: %s" (Sexplib.Sexp.to_string_hum (Config.sexp_of_t config));
VolumeManager.vgopen ~devices:config.Config.devices
Expand Down