Skip to content

Commit

Permalink
Use bytes instead of strings for internal buffers (#4)
Browse files Browse the repository at this point in the history
This ensures compatibility with OCaml >= 4.06.
Changed opam package to request OCaml >= 4.02.3.
  • Loading branch information
scemama authored and xavierleroy committed Jun 20, 2018
1 parent c6eaf91 commit 9db4b5c
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 42 deletions.
8 changes: 8 additions & 0 deletions Changes
@@ -1,3 +1,11 @@
Version 1.03:
- Ensure compatibility with OCaml 4.06 and up by using bytes instead of strings for internal buffers.
The API does not change.
(Contributed by Anthony Scemama, review by Timothy Bourke)

Version 1.02:
- OPAM Packaging

Version 1.01:
- Relicensed under the LGPL
- Modernized build and installation procedure.
Expand Down
2 changes: 1 addition & 1 deletion collcomm.c
Expand Up @@ -273,7 +273,7 @@ value caml_mpi_reduce_intarray(value data, value result, value op,
value root, value comm)
{
mlsize_t len = Wosize_val(data);
int i, myrank;
int myrank;
/* Decode data at all nodes in place */
caml_mpi_decode_intarray(data, len);
/* Do the reduce */
Expand Down
77 changes: 38 additions & 39 deletions mpi.ml
Expand Up @@ -181,25 +181,25 @@ external barrier : communicator -> unit = "caml_mpi_barrier"

(* Broadcast *)

external broadcast_string: string -> int -> communicator -> unit
external broadcast_bytes: bytes -> int -> communicator -> unit
= "caml_mpi_broadcast"
external broadcast_int: int -> int -> communicator -> int
= "caml_mpi_broadcast_int"

let broadcast v root comm =
let myself = comm_rank comm in
if myself = root then begin
let data = Marshal.to_string v [Marshal.Closures] in
ignore(broadcast_int (String.length data) root comm);
broadcast_string data root comm;
let data = Marshal.to_bytes v [Marshal.Closures] in
ignore(broadcast_int (Bytes.length data) root comm);
broadcast_bytes data root comm;
v
end else begin
(* Other processes receive length, allocate buffer, receive data,
and unmarshal it. *)
let len = broadcast_int 0 root comm in
let data = String.create len in
broadcast_string data root comm;
Marshal.from_string data 0
let data = Bytes.create len in
broadcast_bytes data root comm;
Marshal.from_bytes data 0
end

let broadcast_opt data root comm =
Expand All @@ -223,8 +223,8 @@ external broadcast_float_array:

(* Scatter *)

external scatter_string:
string -> int array -> string -> int -> communicator -> unit
external scatter_bytes:
bytes -> int array -> bytes -> int -> communicator -> unit
= "caml_mpi_scatter"

external scatter_int: int array -> int -> communicator -> int
Expand All @@ -237,36 +237,36 @@ let scatter data root comm =
(* Check correct length for array *)
if Array.length data <> nprocs
then mpi_error "Mpi.scatter: wrong array size";
(* Marshal data to strings *)
(* Marshal data to bytes *)
let buffers =
Array.map (fun d -> Marshal.to_string d [Marshal.Closures]) data in
(* Determine lengths of strings *)
let lengths = Array.map String.length buffers in
Array.map (fun d -> Marshal.to_bytes d [Marshal.Closures]) data in
(* Determine lengths of bytes *)
let lengths = Array.map Bytes.length buffers in
(* Scatter those lengths *)
ignore(scatter_int lengths root comm);
(* Build single buffer with all data *)
let total_len = Array.fold_left (+) 0 lengths in
let send_buffer = String.create total_len in
let send_buffer = Bytes.create total_len in
let pos = ref 0 in
for i = 0 to nprocs - 1 do
String.blit buffers.(i) 0 send_buffer !pos lengths.(i);
Bytes.blit buffers.(i) 0 send_buffer !pos lengths.(i);
pos := !pos + lengths.(i)
done;
(* Allocate receive buffer *)
let recv_buffer = String.create lengths.(myself) in
let recv_buffer = Bytes.create lengths.(myself) in
(* Do the scatter *)
scatter_string send_buffer lengths recv_buffer root comm;
scatter_bytes send_buffer lengths recv_buffer root comm;
(* Return value for root *)
data.(myself)
end else begin
(* Get our length *)
let len = scatter_int [||] root comm in
(* Allocate receive buffer *)
let recv_buffer = String.create len in
let recv_buffer = Bytes.create len in
(* Do the scatter *)
scatter_string "" [||] recv_buffer root comm;
scatter_bytes Bytes.empty [||] recv_buffer root comm;
(* Return value received *)
Marshal.from_string recv_buffer 0
Marshal.from_bytes recv_buffer 0
end

external scatter_float:
Expand All @@ -292,8 +292,8 @@ let scatter_float_array src dst rank comm =

(* Gather *)

external gather_string:
string -> string -> int array -> int -> communicator -> unit
external gather_bytes:
bytes -> bytes -> int array -> int -> communicator -> unit
= "caml_mpi_gather"

external gather_int: int -> int array -> int -> communicator -> unit
Expand All @@ -302,30 +302,30 @@ external gather_int: int -> int array -> int -> communicator -> unit
let gather data root comm =
let myself = comm_rank comm in
let nprocs = comm_size comm in
let send_buffer = Marshal.to_string data [Marshal.Closures] in
let send_buffer = Marshal.to_bytes data [Marshal.Closures] in
if myself = root then begin
(* Gather lengths for all data *)
let lengths = Array.make nprocs 0 in
gather_int (String.length send_buffer) lengths root comm;
gather_int (Bytes.length send_buffer) lengths root comm;
(* Allocate receive buffer big enough to hold all data *)
let total_len = Array.fold_left (+) 0 lengths in
let recv_buffer = String.create total_len in
let recv_buffer = Bytes.create total_len in
(* Gather the data *)
gather_string send_buffer recv_buffer lengths root comm;
gather_bytes send_buffer recv_buffer lengths root comm;
(* Build array of results *)
let res0 = Marshal.from_string recv_buffer 0 in
let res0 = Marshal.from_bytes recv_buffer 0 in
let res = Array.make nprocs res0 in
let pos = ref 0 in
for i = 1 to nprocs - 1 do
pos := !pos + lengths.(i - 1);
res.(i) <- Marshal.from_string recv_buffer !pos
res.(i) <- Marshal.from_bytes recv_buffer !pos
done;
res
end else begin
(* Send our length *)
gather_int (String.length send_buffer) [||] root comm;
gather_int (Bytes.length send_buffer) [||] root comm;
(* Send our data *)
gather_string send_buffer "" [||] root comm;
gather_bytes send_buffer Bytes.empty [||] root comm;
(* Return dummy results *)
[||]
end
Expand Down Expand Up @@ -365,32 +365,31 @@ let gather_float_array src dst rank comm =

(* Gather to all *)

external allgather_string:
string -> string -> int array -> communicator -> unit
external allgather_bytes:
bytes -> bytes -> int array -> communicator -> unit
= "caml_mpi_allgather"

external allgather_int: int -> int array -> communicator -> unit
= "caml_mpi_allgather_int"

let allgather data comm =
(*let myself = comm_rank comm in*)
let nprocs = comm_size comm in
let send_buffer = Marshal.to_string data [Marshal.Closures] in
let send_buffer = Marshal.to_bytes data [Marshal.Closures] in
(* Gather lengths for all data *)
let lengths = Array.make nprocs 0 in
allgather_int (String.length send_buffer) lengths comm;
allgather_int (Bytes.length send_buffer) lengths comm;
(* Allocate receive buffer big enough to hold all data *)
let total_len = Array.fold_left (+) 0 lengths in
let recv_buffer = String.create total_len in
let recv_buffer = Bytes.create total_len in
(* Gather the data *)
allgather_string send_buffer recv_buffer lengths comm;
allgather_bytes send_buffer recv_buffer lengths comm;
(* Build array of results *)
let res0 = Marshal.from_string recv_buffer 0 in
let res0 = Marshal.from_bytes recv_buffer 0 in
let res = Array.make nprocs res0 in
let pos = ref 0 in
for i = 1 to nprocs - 1 do
pos := !pos + lengths.(i - 1);
res.(i) <- Marshal.from_string recv_buffer !pos
res.(i) <- Marshal.from_bytes recv_buffer !pos
done;
res

Expand Down
4 changes: 3 additions & 1 deletion opam
@@ -1,9 +1,11 @@
opam-version: "1.2"
version: "1.03"
maintainer: "xavier.leroy@inria.fr"
authors: ["Xavier Leroy"]
homepage: "https://github.com/xavierleroy/ocamlmpi"
bug-reports: "https://github.com/xavierleroy/ocamlmpi/issues"
dev-repo: "git://github.com/xavierleroy/ocamlmpi"
license: "LGPL"

build: [
[make "all" "opt"
Expand All @@ -20,4 +22,4 @@ depexts: [
[["debian"] ["mpi-default-dev"]]
[["ubuntu"] ["mpi-default-dev"]]
]

available: [[ ocaml-version >= "4.02.3" ]]
2 changes: 1 addition & 1 deletion test.ml
Expand Up @@ -141,7 +141,7 @@ let test_broadcast broadcastfun printfun data =
if myrank = 0 then begin
printf "0: broadcasting %a" printfun data; print_newline()
end;
let res = broadcastfun data 0 comm_world in
ignore (broadcastfun data 0 comm_world);
printf "%d: received %a" myrank printfun data; print_newline()

let _ =
Expand Down

0 comments on commit 9db4b5c

Please sign in to comment.