Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions lib/xapi-stdext-unix/unixext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ let write_bytes_to_file fname b =
let written = Unix.write fd b 0 len in
if written <> len then (failwith "Short write occured!"))

let write_string_to_file fname s = write_bytes_to_file fname (Bytes.unsafe_of_string s)
let write_string_to_file fname s =
write_bytes_to_file fname (Bytes.unsafe_of_string s)

let execv_get_output cmd args =
let (pipe_exit, pipe_entrance) = Unix.pipe () in
Expand Down Expand Up @@ -480,7 +481,7 @@ exception Timeout
(* Write as many bytes to a file descriptor as possible from data before a given clock time. *)
(* Raises Timeout exception if the number of bytes written is less than the specified length. *)
(* Writes into the file descriptor at the current cursor position. *)
let time_limited_write filedesc length data target_response_time =
let time_limited_write_internal (write : Unix.file_descr -> 'a -> int -> int -> int) filedesc length data target_response_time =
let total_bytes_to_write = length in
let bytes_written = ref 0 in
let now = ref (Unix.gettimeofday()) in
Expand All @@ -489,13 +490,20 @@ let time_limited_write filedesc length data target_response_time =
let (_, ready_to_write, _) = Unix.select [] [filedesc] [] remaining_time in (* Note: there is a possibility that the storage could go away after the select and before the write, so the write would block. *)
if List.mem filedesc ready_to_write then begin
let bytes_to_write = total_bytes_to_write - !bytes_written in
let bytes = (try Unix.write filedesc data !bytes_written bytes_to_write with Unix.Unix_error(Unix.EAGAIN,_,_) | Unix.Unix_error(Unix.EWOULDBLOCK,_,_) -> 0) in (* write from buffer=data from offset=bytes_written, length=bytes_to_write *)
let bytes = (try write filedesc data !bytes_written bytes_to_write with Unix.Unix_error(Unix.EAGAIN,_,_) | Unix.Unix_error(Unix.EWOULDBLOCK,_,_) -> 0) in (* write from buffer=data from offset=bytes_written, length=bytes_to_write *)
bytes_written := bytes + !bytes_written;
end;
now := Unix.gettimeofday()
done;
if !bytes_written = total_bytes_to_write then () else (* we ran out of time *) raise Timeout

let time_limited_write filedesc length data target_response_time =
time_limited_write_internal Unix.write filedesc length data target_response_time

let time_limited_write_substring filedesc length data target_response_time =
time_limited_write_internal Unix.write_substring filedesc length data target_response_time


(* Read as many bytes to a file descriptor as possible before a given clock time. *)
(* Raises Timeout exception if the number of bytes read is less than the desired number. *)
(* Reads from the file descriptor at the current cursor position. *)
Expand All @@ -521,7 +529,7 @@ let time_limited_read filedesc length target_response_time =

(* Read a given number of bytes of data from the fd, or stop at EOF, whichever comes first. *)
(* A negative ~max_bytes indicates that all the data should be read from the fd until EOF. This is the default. *)
let read_data_in_chunks (f : string -> int -> unit) ?(block_size = 1024) ?(max_bytes = -1) from_fd =
let read_data_in_chunks_internal (sub : bytes -> int -> int -> 'a) (f : 'a -> int -> unit) ?(block_size = 1024) ?(max_bytes = -1) from_fd =
let buf = Bytes.make block_size '\000' in
let rec do_read acc =
let remaining_bytes = max_bytes - acc in
Expand All @@ -531,12 +539,18 @@ let read_data_in_chunks (f : string -> int -> unit) ?(block_size = 1024) ?(max_b
let bytes_read = Unix.read from_fd buf 0 bytes_to_read in
if bytes_read = 0 then acc (* we reached EOF *)
else begin
f (Bytes.sub_string buf 0 bytes_read) bytes_read;
f (sub buf 0 bytes_read) bytes_read;
do_read (acc + bytes_read)
end
end in
do_read 0

let read_data_in_string_chunks (f : string -> int -> unit) ?(block_size = 1024) ?(max_bytes = -1) from_fd =
read_data_in_chunks_internal Bytes.sub_string f ~block_size ~max_bytes from_fd

let read_data_in_chunks (f : bytes -> int -> unit) ?(block_size = 1024) ?(max_bytes = -1) from_fd =
read_data_in_chunks_internal Bytes.sub f ~block_size ~max_bytes from_fd

let spawnvp ?(pid_callback=(fun _ -> ())) cmd args =
match Unix.fork () with
| 0 ->
Expand Down Expand Up @@ -658,6 +672,7 @@ let wait_for_path path delay timeout =
let _ = Callback.register_exception "unixext.unix_error" (Unix_error (0))

let send_fd = Fd_send_recv.send_fd
let send_fd_substring = Fd_send_recv.send_fd_substring
let recv_fd = Fd_send_recv.recv_fd

type statvfs_t = {
Expand Down
14 changes: 11 additions & 3 deletions lib/xapi-stdext-unix/unixext.mli
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ val buffer_of_file : string -> Buffer.t
val string_of_file : string -> string

val atomic_write_to_file : string -> Unix.file_perm -> (Unix.file_descr -> 'a) -> 'a

(** Atomically write a string to a file *)
val write_string_to_file : string -> string -> unit

(** Atomically write a bytes to a file *)
val write_bytes_to_file : string -> bytes -> unit
val execv_get_output : string -> string array -> int * Unix.file_descr
val copy_file : ?limit:int64 -> Unix.file_descr -> Unix.file_descr -> int64
Expand Down Expand Up @@ -98,6 +102,7 @@ val string_of_signal : int -> string
val proxy : Unix.file_descr -> Unix.file_descr -> unit
val really_read : Unix.file_descr -> bytes -> int -> int -> unit
val really_read_string : Unix.file_descr -> int -> string

(** [really_write] keeps repeating the write operation until all bytes
* have been written or an error occurs. This is not atomic but is
* robust against EINTR errors.
Expand All @@ -107,8 +112,10 @@ val really_write_string : Unix.file_descr -> string -> unit
val try_read_string : ?limit: int -> Unix.file_descr -> string
exception Timeout
val time_limited_write : Unix.file_descr -> int -> bytes -> float -> unit
val time_limited_write_substring : Unix.file_descr -> int -> string -> float -> unit
val time_limited_read : Unix.file_descr -> int -> float -> string
val read_data_in_chunks : (string -> int -> unit) -> ?block_size:int -> ?max_bytes:int -> Unix.file_descr -> int
val read_data_in_string_chunks : (string -> int -> unit) -> ?block_size:int -> ?max_bytes:int -> Unix.file_descr -> int
val read_data_in_chunks : (bytes -> int -> unit) -> ?block_size:int -> ?max_bytes:int -> Unix.file_descr -> int
val spawnvp :
?pid_callback:(int -> unit) ->
string -> string array -> Unix.process_status
Expand Down Expand Up @@ -145,8 +152,9 @@ end

val wait_for_path : string -> (float -> unit) -> int -> unit

val send_fd : Unix.file_descr -> string -> int -> int -> Unix.msg_flag list -> Unix.file_descr -> int
val recv_fd : Unix.file_descr -> string -> int -> int -> Unix.msg_flag list -> int * Unix.sockaddr * Unix.file_descr
val send_fd : Unix.file_descr -> bytes -> int -> int -> Unix.msg_flag list -> Unix.file_descr -> int
val send_fd_substring : Unix.file_descr -> string -> int -> int -> Unix.msg_flag list -> Unix.file_descr -> int
val recv_fd : Unix.file_descr -> bytes -> int -> int -> Unix.msg_flag list -> int * Unix.sockaddr * Unix.file_descr

type statvfs_t = {
f_bsize : int64;
Expand Down
2 changes: 1 addition & 1 deletion xapi-stdext-unix.opam
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ build: [[ "jbuilder" "build" "-p" name "-j" jobs ]]
depends: [
"jbuilder" {build}
"base-unix"
"fd-send-recv"
"fd-send-recv" {>= "2.0.0"}
"xapi-stdext-pervasives"
"xapi-stdext-std"
]