# obatcher

OCaml design framework for building batch-parallel "services", based
on the paper ["Concurrent structures made
easy"](https://www.arxiv.org/abs/2408.13779). Whilst the paper focuses
on batched **data structures**, the mechanism generalizes to any kind
of "service", where a "service" is any modular component of software
that we interact with through an API.

### Contents
* [Description](#description)
* [Benefits of batch-parallel service design](#benefits-of-batch-parallel-service-design)
* [Example usage](#example-usage)
* [Batching in the wild](#batching-in-the-wild)
* [Results](#results)

# Description
At its core, **obatcher** is an approach to designing efficient
concurrent services. The key observation is that _"processing a batch
of a priori known operations in parallel is easier than optimising
performance for a stream of arbitrary asynchronous concurrent
requests"_. However, a service whose API expects users to pass in an
explicit batch of operations (e.g. an array or list) is unergonomic
and forces the surrounding system to be designed around this
pattern. **obatcher** solves this by wrapping explicitly batched
services and using the scheduler to implicitly batch operations under
the hood before passing them to the service. From the client's
perspective, the batched service looks like any other service that
takes plain, atomic requests.
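
In signature terms (both signatures appear in full under
[Example usage](#example-usage) below), the service author implements a
single entry point over a whole batch, while clients of the wrapped
service keep making ordinary single-operation calls:

```ocaml
(* What the service author writes: one call per batch of operations. *)
val run : t -> wrapped_op array -> unit

(* What clients see after wrapping with the functor: plain
   single-operation calls, batched behind the scenes. *)
val exec : t -> 'a op -> 'a
```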

## Benefits of batch-parallel service design
* [Picos scheduler agnostic](#picos-scheduler-agnostic)
* [Thread-safe](#thread-safe)
* [Batch optimization & Incremental parallelism](#batch-optimization--incremental-parallelism)
* [Easy to test and reason about](#easy-to-test-and-reason-about)

### Picos scheduler agnostic
**obatcher** depends on scheduler primitives to perform its
transformation on services, which would normally tie it to one
particular scheduler. To avoid this, **obatcher** is built on top of
[picos](https://www.github.com/polytipic/picos), which provides the
low-level building blocks for writing schedulers. Because **obatcher**
is implemented against the same picos primitives, any picos-based
scheduler is automatically compatible with **obatcher**.

### Thread-safe
A defining invariant of batched services is that only **a single batch
of operations runs at any time**. To guarantee this, **obatcher** puts
an efficient lock-free queue in front of the service to collect
operations into batches before submitting them to the service. This
design takes inspiration from
[**Flat-combining**](https://people.csail.mit.edu/shanir/publications/Flat%20Combining%20SPAA%2010.pdf),
whose results show that this style of synchronization scales better
than coarse-grained locking.
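
To make the scheme concrete, here is a deliberately simplified,
lock-based sketch of the combining idea. It is illustrative only: the
real implementation uses a lock-free queue and completes a Picos
computation per operation instead of blocking callers on a mutex.

```ocaml
(* Simplified sketch: one combiner drains the queue and runs batches,
   everyone else just enqueues.  Results are not returned here; the
   real library completes a Picos computation per operation. *)
module Naive_batcher (S : sig
  type t
  type op
  val run : t -> op array -> unit
end) = struct
  type t = {
    service : S.t;
    pending : S.op Queue.t;   (* operations waiting for the next batch *)
    lock : Mutex.t;
    mutable running : bool;   (* is a batch currently in flight? *)
  }

  let make service =
    { service; pending = Queue.create ();
      lock = Mutex.create (); running = false }

  let submit t op =
    Mutex.lock t.lock;
    Queue.push op t.pending;
    if t.running then
      (* Another caller is the combiner; our op joins the next batch. *)
      Mutex.unlock t.lock
    else begin
      (* We become the combiner: drain whatever has queued up and hand
         it to the service as one batch, repeating until it is empty. *)
      t.running <- true;
      while not (Queue.is_empty t.pending) do
        let batch = Array.of_seq (Queue.to_seq t.pending) in
        Queue.clear t.pending;
        Mutex.unlock t.lock;
        S.run t.service batch;   (* only ever one batch at a time *)
        Mutex.lock t.lock
      done;
      t.running <- false;
      Mutex.unlock t.lock
    end
end
```
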
### Batch optimization & Incremental parallelism
The benefit of taking a batch of operations as input is that it
enables a whole slew of optimizations based on the spread of
operations in the batch. Furthermore, pre-analysis of the batch can
guide where to add parallelism, which can then be evolved over time
rather than having to guarantee safety across every operation on the
service up front. The optimized counter in
[Example usage](#example-usage) below shows both ideas on a small
scale.

### Easy to test and reason about
Because a service only ever handles a single batch at a time, it is
fairly easy to write reproducible tests: simply fix the input batch
(a test sketch for the counter below shows this).

# Example usage
Concretely, the **obatcher** library consists of two things: a service
signature and a functor over that signature. The signature is simple
but crucially expects users to implement their services to handle
batches of operations. Services may then perform optimizations based
on the spread of operations in the batch and can furthermore leverage
parallelism, via the primitives provided by the underlying picos
scheduler, to speed up processing.

```ocaml
module type Service = sig
  type t
  type cfg

  type 'a op
  type wrapped_op =
    | Mk : 'a op * 'a Picos.Computation.t -> wrapped_op
  (** [wrapped_op] binds the operation on the service with its
      corresponding suspended continuation to run after its
      completion. *)

  val init : ?cfg:cfg -> unit -> t
  val run : t -> wrapped_op array -> unit
end
```
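
The role of [wrapped_op] is easiest to see from the client's side. A
rough sketch of what the wrapping functor does for each call (the
[enqueue] parameter is a hypothetical stand-in for joining the batch
currently being formed; the actual functor also decides when to launch
the batch):

```ocaml
(* Sketch only: pair the operation with a fresh computation, hand it to
   the batch being formed, then suspend until the service completes it. *)
let exec_sketch enqueue (op : 'a op) : 'a =
  let comp = Picos.Computation.create () in
  enqueue (Mk (op, comp));
  Picos.Computation.await comp
```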

A simple example of a counter that abides by this signature is

```ocaml
open Picos

module BatchedCounter = struct
  type t = int ref
  type cfg = unit

  let init ?cfg:_ () = ref 0

  type _ op = Incr : unit op | Decr : unit op | Get : int op
  type wrapped_op = Mk : 'a op * 'a Computation.t -> wrapped_op

  let run (t : t) (ops : wrapped_op array) =
    Array.iter
      (function
        | Mk (Incr, comp) -> incr t; Computation.return comp ()
        | Mk (Decr, comp) -> decr t; Computation.return comp ()
        | Mk (Get, comp) -> Computation.return comp !t)
      ops
end
```

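Because `run` takes an ordinary array, the counter can already be
tested deterministically without any scheduler: fix a batch by hand
and call `run` on it directly. A minimal sketch (assuming the
`Picos.Computation` API used above, with `create` making a fresh
computation and `await` returning immediately once it has been
completed):

```ocaml
let () =
  let t = BatchedCounter.init () in
  let get_result : int Computation.t = Computation.create () in
  let batch =
    BatchedCounter.
      [| Mk (Incr, Computation.create ());
         Mk (Incr, Computation.create ());
         Mk (Decr, Computation.create ());
         Mk (Get, get_result) |]
  in
  BatchedCounter.run t batch;
  (* Two increments and one decrement leave the counter at 1. *)
  assert (!t = 1);
  (* With the sequential [run] above, [Get] observes the counter after
     the operations that precede it in the batch. *)
  assert (Computation.await get_result = 1)
```
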
The issue with this counter is that users now need to form explicit
batches of requests. This becomes incredibly complicated as software
grows, and it is also a delicate balancing act between optimizing for
latency and optimizing for throughput. For that reason, we provide a
functor that wraps your batched service so that batching happens
implicitly, letting clients go back to interfacing with the service
through individual operations.

```ocaml
module Make : functor (S : Service) -> sig
  type t
  val init : ?cfg:S.cfg -> unit -> t

  val exec : t -> 'a S.op -> 'a
  (** [exec t op] submits a single operation to the service; operations
      are automatically batched before being passed to the service. *)
end

include Obatcher.Make (BatchedCounter)

let incr t = exec t Incr
let decr t = exec t Decr
let get t = exec t Get
```

Now, running your program on a Picos scheduler, you have a thread-safe
concurrent counter which synchronizes parallel requests and batches
them before submitting them to the counter. Our batched counter is not
very smart: it just processes requests in order, like a
**Flat-combiner**. To demonstrate the kind of optimization we could
do, first observe that sequential consistency is preserved even if
operations within a batch are reordered. In particular, we can decide
that all `Get` requests are processed at the beginning of the
batch. `Incr` and `Decr` are commutative, which means we can use a
parallel-for-reduce to gather the total change to the counter. As such
we now have:

```ocaml
  let run (t : t) (ops : wrapped_op array) =
    let len = Array.length ops in
    let start = !t in
    let delta =
      Utils.parallel_for_reduce
        ~n_fibers:(Domain.recommended_domain_count () - 1)
        ~start:0 ~finish:(len - 1)
        ~body:(fun i ->
          match ops.(i) with
          | Mk (Incr, comp) ->
            Computation.return comp ();
            1
          | Mk (Decr, comp) ->
            Computation.return comp ();
            -1
          | Mk (Get, comp) ->
            Computation.return comp start;
            0)
        ( + ) 0
    in
    t := start + delta
```
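
As a rough usage sketch, the wrapped counter can then be used from many
fibers at once. `My_scheduler.run`, `spawn` and `await` below are
hypothetical stand-ins for whichever Picos-compatible scheduler and
fiber helpers you use; they are not part of obatcher:

```ocaml
(* Hypothetical sketch: substitute your scheduler's entry point and
   fiber spawning/awaiting helpers for My_scheduler.run/spawn/await. *)
let () =
  My_scheduler.run @@ fun () ->
  let t = init () in
  (* Fibers that submit while a batch is in flight are collected into
     the next batch automatically. *)
  let fibers = List.init 1000 (fun _ -> spawn (fun () -> incr t)) in
  List.iter await fibers;
  assert (get t = 1000)
```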
# Batching in the wild
Databases commonly use batching to service many small IO requests. In
such cases, the IO bandwidth of the system ends up being
under-utilized, with many cycles wasted on the latency of each
individual request. Collecting a batch of requests and sending them in
one shot effectively amortizes the cost of each individual request.

[Nagle's
algorithm](https://en.wikipedia.org/wiki/Nagle%27s_algorithm) is the
network equivalent of request batching. The algorithm solves the
small-packet problem by batching packets before sending them out, and
is used in many efficient TCP/IP stacks.

With Spectre and Meltdown mitigations, the cost of each syscall
context switch has increased. Linux's new asynchronous interface,
**io_uring**, uses batch processing to help applications reduce the
overhead of system calls: applications queue up multiple IO requests
for the kernel to perform and post them all with a single system
call. See [examples/uring](examples/uring) for an example of how
obatcher can wrap uring to get implicit batching.

**obatcher** is based on "batch-parallel data structures
(BPDS)". Unlike typical concurrent structures that use locks or
careful ordering to prevent data races, a BPDS specifies that only one
batch runs at a time. Synchronization of parallel operations is
performed upon entry to the BPDS, and requests made while the BPDS is
busy are collected to form the batch that runs next.

# Results
Our approach is still new and unfortunately has few benchmarks of its
own. However, **obatcher** is designed almost identically to the
system described in ["Concurrent structures made
easy"](https://www.arxiv.org/abs/2408.13779); the results in that
paper show that batched structures scale much better than those where
threads race for mutual exclusion.