@@ -17,23 +17,23 @@ open Lwt.Infix
17
17
+-----------------------------------------------------------------+ *)
18
18
19
19
(* Minimum number of preemptive threads: *)
20
- let min_threads : int ref = ref 0
20
+ let min_threads : int Atomic.t = Atomic. make 0
21
21
22
22
(* Maximum number of preemptive threads: *)
23
- let max_threads : int ref = ref 0
23
+ let max_threads : int Atomic.t = Atomic. make 0
24
24
25
25
(* Size of the waiting queue: *)
26
- let max_thread_queued = ref 1000
26
+ let max_thread_queued = Atomic. make 1000
27
27
28
28
let get_max_number_of_threads_queued _ =
29
- ! max_thread_queued
29
+ Atomic. get max_thread_queued
30
30
31
31
let set_max_number_of_threads_queued n =
32
32
if n < 0 then invalid_arg " Lwt_preemptive.set_max_number_of_threads_queued" ;
33
- max_thread_queued := n
33
+ Atomic. set max_thread_queued n
34
34
35
35
(* The total number of preemptive threads currently running: *)
36
- let threads_count = ref 0
36
+ let threads_count = Atomic. make 0
37
37
38
38
(* +-----------------------------------------------------------------+
39
39
| Preemptive threads management |
@@ -102,14 +102,14 @@ let rec worker_loop worker =
102
102
task () ;
103
103
(* If there is too much threads, exit. This can happen if the user
104
104
decreased the maximum: *)
105
- if ! threads_count > ! max_threads then worker.reuse < - false ;
105
+ if Atomic. get threads_count > Atomic. get max_threads then worker.reuse < - false ;
106
106
(* Tell the main thread that work is done: *)
107
107
Lwt_unix. send_notification (Domain. self () ) id;
108
108
if worker.reuse then worker_loop worker
109
109
110
110
(* create a new worker: *)
111
111
let make_worker () =
112
- incr threads_count;
112
+ Atomic. incr threads_count;
113
113
let worker = {
114
114
task_cell = CELL. make () ;
115
115
thread = Thread. self () ;
@@ -130,7 +130,7 @@ let add_worker worker =
130
130
let get_worker () =
131
131
if not (Queue. is_empty workers) then
132
132
Lwt. return (Queue. take workers)
133
- else if ! threads_count < ! max_threads then
133
+ else if Atomic. get threads_count < Atomic. get max_threads then
134
134
Lwt. return (make_worker () )
135
135
else
136
136
(Lwt. add_task_r [@ ocaml.warning " -3" ]) waiters
@@ -139,33 +139,33 @@ let get_worker () =
139
139
| Initialisation, and dynamic parameters reset |
140
140
+-----------------------------------------------------------------+ *)
141
141
142
- let get_bounds () = (! min_threads, ! max_threads)
142
+ let get_bounds () = (Atomic. get min_threads, Atomic. get max_threads)
143
143
144
144
let set_bounds (min , max ) =
145
145
if min < 0 || max < min then invalid_arg " Lwt_preemptive.set_bounds" ;
146
- let diff = min - ! threads_count in
147
- min_threads := min;
148
- max_threads := max;
146
+ let diff = min - Atomic. get threads_count in
147
+ Atomic. set min_threads min;
148
+ Atomic. set max_threads max;
149
149
(* Launch new workers: *)
150
150
for _i = 1 to diff do
151
151
add_worker (make_worker () )
152
152
done
153
153
154
- let initialized = ref false
154
+ let initialized = Atomic. make false
155
155
156
156
let init min max _errlog =
157
- initialized := true ;
157
+ Atomic. set initialized true ;
158
158
set_bounds (min, max)
159
159
160
160
let simple_init () =
161
- if not ! initialized then begin
162
- initialized := true ;
161
+ if not ( Atomic. get initialized) then begin
162
+ Atomic. set initialized true ;
163
163
set_bounds (0 , 4 )
164
164
end
165
165
166
- let nbthreads () = ! threads_count
166
+ let nbthreads () = Atomic. get threads_count
167
167
let nbthreadsqueued () = Lwt_sequence. fold_l (fun _ x -> x + 1 ) waiters 0
168
- let nbthreadsbusy () = ! threads_count - Queue. length workers
168
+ let nbthreadsbusy () = Atomic. get threads_count - Queue. length workers
169
169
170
170
(* +-----------------------------------------------------------------+
171
171
| Detaching |
@@ -199,7 +199,7 @@ let detach f args =
199
199
(* Put back the worker to the pool: *)
200
200
add_worker worker
201
201
else begin
202
- decr threads_count;
202
+ Atomic. decr threads_count;
203
203
(* Or wait for the thread to terminates, to free its associated
204
204
resources: *)
205
205
Thread. join worker.thread
0 commit comments