Skip to content

Commit

Permalink
Merge pull request #52 from djs55/optional-sync
Browse files Browse the repository at this point in the history
Allow clients to request different flush behaviour
  • Loading branch information
djs55 committed Sep 21, 2016
2 parents cc511fc + c145b50 commit 7e9cb06
Show file tree
Hide file tree
Showing 10 changed files with 4,076 additions and 3,483 deletions.
2 changes: 1 addition & 1 deletion _oasis
Expand Up @@ -12,7 +12,7 @@ Library mirage_block_unix
Path: lib
Findlibname: mirage-block-unix
Modules: Block
BuildDepends: cstruct (>= 1.3.0), cstruct.lwt, lwt, lwt.unix, mirage-types, logs
BuildDepends: cstruct (>= 1.3.0), cstruct.lwt, lwt, lwt.unix, mirage-types, logs, uri
CSources: odirect_stubs.c, blkgetsize_stubs.c, lseekhole_stubs.c, flush_stubs.c
CCOpt: -I $(pkg_lwt)

Expand Down
9 changes: 8 additions & 1 deletion _tags
@@ -1,5 +1,5 @@
# OASIS_START
# DO NOT EDIT (digest: 2523786f1804452927ad7ed0a182fe0f)
# DO NOT EDIT (digest: b4dad6087354892e863ef19ab7d35982)
# Ignore VCS directories, you can use the same kind of rule outside
# OASIS_START/STOP if you want to exclude directories that contains
# useless stuff for the build process
Expand Down Expand Up @@ -28,30 +28,35 @@ true: annot, bin_annot
<lib/*.ml{,i,y}>: pkg_lwt
<lib/*.ml{,i,y}>: pkg_lwt.unix
<lib/*.ml{,i,y}>: pkg_mirage-types
<lib/*.ml{,i,y}>: pkg_uri
"lib/odirect_stubs.c": pkg_cstruct
"lib/odirect_stubs.c": pkg_cstruct.lwt
"lib/odirect_stubs.c": pkg_logs
"lib/odirect_stubs.c": pkg_lwt
"lib/odirect_stubs.c": pkg_lwt.unix
"lib/odirect_stubs.c": pkg_mirage-types
"lib/odirect_stubs.c": pkg_uri
"lib/blkgetsize_stubs.c": pkg_cstruct
"lib/blkgetsize_stubs.c": pkg_cstruct.lwt
"lib/blkgetsize_stubs.c": pkg_logs
"lib/blkgetsize_stubs.c": pkg_lwt
"lib/blkgetsize_stubs.c": pkg_lwt.unix
"lib/blkgetsize_stubs.c": pkg_mirage-types
"lib/blkgetsize_stubs.c": pkg_uri
"lib/lseekhole_stubs.c": pkg_cstruct
"lib/lseekhole_stubs.c": pkg_cstruct.lwt
"lib/lseekhole_stubs.c": pkg_logs
"lib/lseekhole_stubs.c": pkg_lwt
"lib/lseekhole_stubs.c": pkg_lwt.unix
"lib/lseekhole_stubs.c": pkg_mirage-types
"lib/lseekhole_stubs.c": pkg_uri
"lib/flush_stubs.c": pkg_cstruct
"lib/flush_stubs.c": pkg_cstruct.lwt
"lib/flush_stubs.c": pkg_logs
"lib/flush_stubs.c": pkg_lwt
"lib/flush_stubs.c": pkg_lwt.unix
"lib/flush_stubs.c": pkg_mirage-types
"lib/flush_stubs.c": pkg_uri
# Executable test
<lib_test/test.{native,byte}>: pkg_cstruct
<lib_test/test.{native,byte}>: pkg_cstruct.lwt
Expand All @@ -62,6 +67,7 @@ true: annot, bin_annot
<lib_test/test.{native,byte}>: pkg_lwt.unix
<lib_test/test.{native,byte}>: pkg_mirage-types
<lib_test/test.{native,byte}>: pkg_oUnit
<lib_test/test.{native,byte}>: pkg_uri
<lib_test/test.{native,byte}>: use_mirage_block_unix
<lib_test/*.ml{,i,y}>: pkg_cstruct
<lib_test/*.ml{,i,y}>: pkg_cstruct.lwt
Expand All @@ -72,6 +78,7 @@ true: annot, bin_annot
<lib_test/*.ml{,i,y}>: pkg_lwt.unix
<lib_test/*.ml{,i,y}>: pkg_mirage-types
<lib_test/*.ml{,i,y}>: pkg_oUnit
<lib_test/*.ml{,i,y}>: pkg_uri
<lib_test/*.ml{,i,y}>: use_mirage_block_unix
<lib_test/test.{native,byte}>: custom
# OASIS_STOP
2 changes: 1 addition & 1 deletion appveyor.yml
Expand Up @@ -7,7 +7,7 @@ environment:

install:
- appveyor DownloadFile https://raw.githubusercontent.com/ocaml/ocaml-ci-scripts/master/appveyor-opam.sh
- "%CYG_ROOT%\\setup-x86.exe -qnNdO -R %CYG_ROOT% -s http://cygwin.mirror.constant.com -l C:/cygwin/var/cache/setup -P rsync -P patch -P diffutils -P curl -P make -P unzip -P git -P m4 -P perl -P mingw64-x86_64-gcc-core"
- "%CYG_ROOT%\\setup-x86.exe -qnNdO -R %CYG_ROOT% -s http://cygwin.mirror.constant.com -l C:/cygwin/var/cache/setup -P rsync -P patch -P diffutils -P make -P unzip -P git -P m4 -P perl -P mingw64-x86_64-gcc-core"

build_script:
- "%CYG_BASH% '${APPVEYOR_BUILD_FOLDER}/appveyor-opam.sh'"
4 changes: 2 additions & 2 deletions lib/META
@@ -1,8 +1,8 @@
# OASIS_START
# DO NOT EDIT (digest: 312a616af69e79921b12c9b4fe4c9c17)
# DO NOT EDIT (digest: ef967ecf0e377d8e40ec630b22bf2faf)
version = "2.3.0"
description = "Mirage block driver for Unix"
requires = "cstruct cstruct.lwt lwt lwt.unix mirage-types logs"
requires = "cstruct cstruct.lwt lwt lwt.unix mirage-types logs uri"
archive(byte) = "mirage_block_unix.cma"
archive(byte, plugin) = "mirage_block_unix.cma"
archive(native) = "mirage_block_unix.cmxa"
Expand Down
96 changes: 69 additions & 27 deletions lib/block.ml
Expand Up @@ -60,15 +60,47 @@ type info = {
size_sectors: int64;
}

module Config = struct
type t = {
buffered: bool;
sync: bool;
path: string;
}

let create ?(buffered = false) ?(sync = true) path =
{ buffered; sync; path }

let to_string t =
let query = [
"buffered", [ if t.buffered then "1" else "0" ];
"sync", [ if t.sync then "1" else "0" ];
] in
let u = Uri.make ~scheme:"file" ~path:t.path ~query () in
Uri.to_string u

let of_string x =
let u = Uri.of_string x in
match Uri.scheme u with
| Some "file" ->
let query = Uri.query u in
let buffered = try List.assoc "buffered" query = [ "1" ] with Not_found -> false in
let sync = try List.assoc "sync" query = [ "1" ] with Not_found -> false in
let path = Uri.(pct_decode @@ path u) in
`Ok { buffered; sync; path }
| _ ->
`Error (`Msg "Config.to_string expected a string of the form file://<path>?sync=(0|1)&buffered=(0|1)")
end

type t = {
mutable fd: Lwt_unix.file_descr option;
m: Lwt_mutex.t;
name: string;
mutable info: info;
use_fsync: bool;
config: Config.t;
use_fsync_after_write: bool;
use_fsync_on_flush: bool;
}

let id { name } = name
let to_config { config } = config

module Result = struct
type ('a, 'b) result = [
Expand Down Expand Up @@ -106,18 +138,8 @@ let get_file_size filename fd =
(`Unknown
(Printf.sprintf "get_file_size %s: neither a file nor a block device" filename))

(* prefix which signals we want to use buffered I/O *)
let buffered_prefix = "buffered:"

let remove_prefix prefix x =
let prefix' = String.length prefix and x' = String.length x in
if x' >= prefix' && (String.sub x 0 prefix' = prefix)
then true, String.sub x prefix' (x' - prefix')
else false, x

let connect name =
let buffered, name = remove_prefix buffered_prefix name in
let openfile, use_fsync = match buffered, is_win32 with
let of_config ({ Config.buffered; sync; path } as config) =
let openfile, use_fsync_after_write = match buffered, is_win32 with
| true, _ -> Raw.openfile_buffered, false
| false, false -> Raw.openfile_unbuffered, false
| false, true ->
Expand All @@ -128,10 +150,10 @@ let connect name =
try
let fd, read_write =
try
openfile name true 0o0, true
openfile path true 0o0, true
with _ ->
openfile name false 0o0, false in
match get_file_size name fd with
openfile path false 0o0, false in
match get_file_size path fd with
| `Error e ->
Unix.close fd;
return (`Error e)
Expand All @@ -140,10 +162,28 @@ let connect name =
let size_sectors = Int64.(div x (of_int sector_size)) in
let fd = Lwt_unix.of_unix_file_descr fd in
let m = Lwt_mutex.create () in
return (`Ok { fd = Some fd; m; name; info = { sector_size; size_sectors; read_write }; use_fsync })
let use_fsync_on_flush = sync in
return (`Ok { fd = Some fd; m; info = { sector_size; size_sectors; read_write };
config; use_fsync_after_write; use_fsync_on_flush })
with e ->
Log.err (fun f -> f "connect %s: failed to open file" name);
return (`Error (`Unknown (Printf.sprintf "connect %s: failed to open file" name)))
Log.err (fun f -> f "connect %s: failed to open file" path);
return (`Error (`Unknown (Printf.sprintf "connect %s: failed to open file" path)))

(* prefix which signals we want to use buffered I/O *)
let buffered_prefix = "buffered:"

let remove_prefix prefix x =
let prefix' = String.length prefix and x' = String.length x in
if x' >= prefix' && (String.sub x 0 prefix' = prefix)
then true, String.sub x prefix' (x' - prefix')
else false, x

let connect ?buffered ?sync name =
let legacy_buffered, path = remove_prefix buffered_prefix name in
(* Keep support for the legacy buffered: prefix until version 3.x.y *)
let buffered = if legacy_buffered then Some true else buffered in
let config = Config.create ?buffered ?sync name in
of_config config

let disconnect t = match t.fd with
| Some fd ->
Expand Down Expand Up @@ -172,17 +212,17 @@ let lwt_wrap_exn t op offset ?buffer f =
| Some b ->
let len = Cstruct.len b in
if len mod t.info.sector_size <> 0
then fatalf "%s: buffer length (%d) is not a multiple of sector_size (%d) for file %s" op len t.info.sector_size t.name
then fatalf "%s: buffer length (%d) is not a multiple of sector_size (%d) for file %s" op len t.info.sector_size t.config.Config.path
else Lwt.return (`Ok ())
) >>*= fun () ->
Lwt.catch f
(function
| End_of_file ->
fatalf "%s: End_of_file at file %s offset %Ld %s" op t.name offset (describe_buffer buffer)
fatalf "%s: End_of_file at file %s offset %Ld %s" op t.config.Config.path offset (describe_buffer buffer)
| Unix.Unix_error(code, fn, arg) ->
fatalf "%s: %s in %s '%s' at file %s offset %Ld %s" op (Unix.error_message code) fn arg t.name offset (describe_buffer buffer)
fatalf "%s: %s in %s '%s' at file %s offset %Ld %s" op (Unix.error_message code) fn arg t.config.Config.path offset (describe_buffer buffer)
| e ->
fatalf "%s: %s at file %s offset %Ld %s" op (Printexc.to_string e) t.name offset (describe_buffer buffer)
fatalf "%s: %s at file %s offset %Ld %s" op (Printexc.to_string e) t.config.Config.path offset (describe_buffer buffer)
)

let rec read x sector_start buffers = match buffers with
Expand Down Expand Up @@ -236,7 +276,7 @@ let rec write x sector_start buffers = match buffers with
really_write fd b
end
) >>= fun () ->
( if x.use_fsync then Lwt_unix.fsync fd else Lwt.return () )
( if x.use_fsync_after_write then Lwt_unix.fsync fd else Lwt.return () )
>>= fun () ->
return (`Ok ())
) >>= function
Expand Down Expand Up @@ -272,7 +312,9 @@ let flush t =
| Some fd ->
lwt_wrap_exn t "fsync" 0L
(fun () ->
Lwt_unix.run_job (flush_job (Lwt_unix.unix_file_descr fd))
( if t.use_fsync_on_flush
then Lwt_unix.run_job (flush_job (Lwt_unix.unix_file_descr fd))
else Lwt.return_unit )
>>= fun () ->
return (`Ok ())
)
Expand Down
32 changes: 31 additions & 1 deletion lib/block.mli
Expand Up @@ -32,7 +32,31 @@ val blkgetsize: string -> Unix.file_descr -> [ `Ok of int64 | `Error of error ]
given by [fd]. [path] is only used to construct a human-readable error
message. *)

val connect : string -> [`Ok of t | `Error of error] io
module Config: sig
type t = {
buffered: bool; (** true if I/O hits the OS disk caches, false if "direct" *)
sync: bool; (** true if [flush] flushes all caches, including disk drive caches *)
path: string; (** path to the underlying file *)
}
(** Configuration of a device *)

val create: ?buffered:bool -> ?sync:bool -> string -> t
(** [create ?buffered ?sync path] constructs a configuration referencing the
file stored at [path]/ *)

val to_string: t -> string
(** Marshal a config into a string of the form
file://<path>?sync=(0|1)&buffered=(0|1) *)

val of_string: string -> [ `Ok of t | `Error of [ `Msg of string ] ]
(** Parse the result of a previous [to_string] invocation *)
end

val connect : ?buffered:bool -> ?sync:bool -> string -> [`Ok of t | `Error of error] io
(** [connect ?buffered ?sync path] connects to a block device on the filesystem
at [path]. By default I/O is unbuffered and fully synchronous. These defaults
can be changed by supplying the optional arguments [~buffered:true] and
[~sync:false] *)

val resize : t -> int64 -> [ `Ok of unit | `Error of error ] io
(** [resize t new_size_sectors] attempts to resize the connected device
Expand All @@ -51,3 +75,9 @@ val seek_mapped: t -> int64 -> [ `Ok of int64 | `Error of error ] io
(** [seek_mapped t start] returns the sector offset of the next regoin of the
device which may have data in it (typically this is the next mapped
region) *)

val to_config: t -> Config.t
(** [to_config t] returns the configuration of a device *)

val of_config: Config.t -> [ `Ok of t | `Error of error ] io
(** [of_config config] creates a fresh device from [config] *)
15 changes: 15 additions & 0 deletions lib_test/test.ml
Expand Up @@ -202,6 +202,19 @@ let test_flush () =
) in
Lwt_main.run t

let test_parse_print_config config =
let open Block.Config in
let s = to_string config in
Printf.sprintf "test parse(print(x)) == x for %s" s
>:: (fun () ->
match of_string s with
| `Error (`Msg m) -> failwith m
| `Ok config' ->
assert_equal ~printer:string_of_bool config.buffered config'.buffered;
assert_equal ~printer:string_of_bool config.sync config'.sync;
assert_equal ~printer:(fun x -> x) config.path config'.path;
)

let not_implemented_on_windows = [
"test resize" >:: test_resize;
]
Expand All @@ -214,6 +227,8 @@ let tests = [
*)
"test read/write after last sector" >:: test_eof;
"test flush" >:: test_flush;
test_parse_print_config { Block.Config.buffered = true; sync = false; path = "C:\\cygwin" };
test_parse_print_config { Block.Config.buffered = false; sync = true; path = "/var/tmp/foo.qcow2" };
"test write then read" >:: test_write_read;
"test that writes fail if the buffer has a bad length" >:: test_buffer_wrong_length;
] @ (if Sys.os_type <> "Win32" then not_implemented_on_windows else [])
Expand Down

0 comments on commit 7e9cb06

Please sign in to comment.