Skip to content

Commit

Permalink
move batched ds into examples directory
Browse files Browse the repository at this point in the history
  • Loading branch information
koonwen committed Aug 27, 2024
1 parent 2d455ee commit 968bfb4
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 1,127 deletions.
952 changes: 0 additions & 952 deletions ds/btree.ml

This file was deleted.

5 changes: 0 additions & 5 deletions ds/dune

This file was deleted.

27 changes: 0 additions & 27 deletions ds/hashtbl.ml

This file was deleted.

104 changes: 0 additions & 104 deletions ds/set.ml

This file was deleted.

39 changes: 6 additions & 33 deletions examples/ds/batched_counter.ml
Original file line number Diff line number Diff line change
@@ -1,37 +1,10 @@
open Picos

(* Higher level concurrency primatives implemented on top of Picos *)
let parallel_for_reduce ?(n_fibers = 1) ~start ~finish ~body reduce_fn init =
let chunk_size =
let sz = (finish - start + 1) / n_fibers in
max sz 1
in
let rec work bundle s e =
if e - s < chunk_size then
let rec loop i acc =
if i > e then acc else loop (i + 1) (reduce_fn acc (body i))
in
loop (s + 1) (body s)
else
let d = s + ((e - s) / 2) in
let p =
Picos_structured.Bundle.fork_as_promise bundle (fun _ ->
work bundle s d)
in
let right = work bundle (d + 1) e in
let left = Picos_structured.Promise.await p in
reduce_fn left right
in
if finish < start then init
else
reduce_fn init
(Picos_structured.Bundle.join_after (fun bundle ->
work bundle start finish))

module Batched = struct
type t = int Atomic.t

let init ~ctx:_ = Atomic.make 0
type cfg = unit
let init ?cfg:_ () = Atomic.make 0

type _ op = Incr : unit op | Decr : unit op | Get : int op
type wrapped_op = Mk : 'a op * 'a Computation.t -> wrapped_op
Expand All @@ -41,7 +14,7 @@ module Batched = struct
Logs.info (fun m -> m "Processing batch of %d operations" len);
let start = Atomic.get t in
let delta =
parallel_for_reduce
Utils.parallel_for_reduce
~n_fibers:(Domain.recommended_domain_count () - 1)
~start:0 ~finish:(len - 1)
~body:(fun i ->
Expand All @@ -65,16 +38,16 @@ include Obatcher.Make (Batched)

let incr t =
Logs.debug (fun m -> m "Incr requested");
apply t Incr;
exec t Incr;
Logs.debug (fun m -> m "Incr completed")

let decr t =
Logs.debug (fun m -> m "Decr requested");
apply t Decr;
exec t Decr;
Logs.debug (fun m -> m "Decr completed")

let get t =
Logs.debug (fun m -> m "Got requested");
let got = apply t Get in
let got = exec t Get in
Logs.debug (fun m -> m "Got completed with %d" got);
got
34 changes: 34 additions & 0 deletions examples/ds/batched_hashtbl.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
(* Flat-combining hashtable, with no fancy internal parallelism *)

open Picos

module Batched = struct
type t = (int, string) Stdlib.Hashtbl.t

type 'a op =
| Add : int * string -> unit op
| Replace : int * string -> unit op
| Remove : int -> unit op
| Find : int -> string op

type wrapped_op = Mk : 'a op * 'a Computation.t -> wrapped_op
type cfg = { random : bool option; initial_size : int }

let init ?(cfg = { random = None; initial_size = 0 }) () =
Stdlib.Hashtbl.create ?random:cfg.random cfg.initial_size

let run t (batch : wrapped_op array) =
Array.iter
(function
| Mk (Add (k, v), comp) ->
Computation.return comp (Stdlib.Hashtbl.add t k v)
| Mk (Replace (k, v), comp) ->
Computation.return comp (Stdlib.Hashtbl.replace t k v)
| Mk (Remove k, comp) ->
Computation.return comp (Stdlib.Hashtbl.remove t k)
| Mk (Find k, comp) -> Computation.return comp (Stdlib.Hashtbl.find t k))
batch
end

(* Set up implicit batching *)
include Obatcher.Make (Batched)
File renamed without changes.
10 changes: 9 additions & 1 deletion examples/ds/dune
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
(library
(name ds)
(public_name obatcher.ds)
(libraries picos picos.structured obatcher logs)
(synopsis "Batched Data Structures")
(modules batched_counter batched_hashtbl batched_skiplist utils))

(executable
(name run_counter)
(libraries obatcher logs logs.fmt fmt.tty unix picos.fifos picos.threaded picos.randos))
(libraries ds logs logs.fmt fmt.tty picos.threaded picos.randos picos.fifos)
(modules run_counter))
7 changes: 4 additions & 3 deletions examples/ds/run_counter.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Counter = Batched_counter
module Counter = Ds.Batched_counter

(* Break each request into it's own fiber *)
let n_fiber_incr t n () =
Expand All @@ -14,7 +14,7 @@ let n_fiber_get t n () =
Picos_structured.Run.all thunks

let main () =
let counter = Counter.init ~ctx:() in
let counter = Counter.init () in
Picos_structured.Run.all
[
n_fiber_incr counter 10_000;
Expand Down Expand Up @@ -49,6 +49,7 @@ let () =
done
in
spawn_ndoms_with_nthreads extra extra;
Logs.info (fun m -> m "Spawning %d domains with extra %d threads per domain" extra extra);
Logs.info (fun m ->
m "Spawning %d domains with extra %d threads per domain" extra extra);
Picos_randos.run ~context main
| _ -> Printf.eprintf "Usage: %s <threaded | fifos | randos>\n" Sys.argv.(0)
File renamed without changes.
9 changes: 7 additions & 2 deletions examples/uring/burcp.ml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
(* copy function but using uring with implicit batching enabled *)
open Cmdliner

let setup_log style_renderer level =
Expand All @@ -13,12 +14,16 @@ let scheduler_conv =
| "threaded" -> Ok Threaded
| "fifos" -> Ok Fifos
| "randos" -> Ok Randos
| _ -> Error (`Msg "Unknown option, please provide one of <threaded | fifos | randos>")
| _ ->
Error
(`Msg
"Unknown option, please provide one of <threaded | fifos | randos>")
in
let printer ppf = function
| Threaded -> Format.fprintf ppf "Threaded"
| Fifos -> Format.fprintf ppf "Fifos"
| Randos -> Format.fprintf ppf "Randos" in
| Randos -> Format.fprintf ppf "Randos"
in
Arg.conv (parser, printer)

let copy backend infile outfile () =
Expand Down

0 comments on commit 968bfb4

Please sign in to comment.