Permalink
Browse files

Lwt: do not use deprecated functions

  • Loading branch information...
vbmithr authored and klakplok committed Dec 29, 2017
1 parent 0ce7c79 commit 58ad60f38a4c4f9c084f03d04e57eef497648abd
@@ -120,7 +120,8 @@ end = struct
}

and status =
| Pending of { wakener : value tzresult Lwt.u ;
| Pending of { waiter : value tzresult Lwt.t ;
wakener : value tzresult Lwt.u ;
mutable waiters : int ;
param : param }
| Found of value
@@ -216,20 +217,20 @@ end = struct
| exception Not_found -> begin
let waiter, wakener = Lwt.wait () in
Memory_table.add s.memory k
(Pending { wakener ; waiters = 1 ; param }) ;
(Pending { waiter ; wakener ; waiters = 1 ; param }) ;
Scheduler.request s.scheduler peer k ;
wrap s k ?timeout waiter
end
| Pending data ->
Scheduler.request s.scheduler peer k ;
data.waiters <- data.waiters + 1 ;
wrap s k ?timeout (Lwt.waiter_of_wakener data.wakener)
wrap s k ?timeout data.waiter
| Found v -> return v
end
| Pending data ->
Scheduler.request s.scheduler peer k ;
data.waiters <- data.waiters + 1 ;
wrap s k ?timeout (Lwt.waiter_of_wakener data.wakener)
wrap s k ?timeout data.waiter
| Found v -> return v

let prefetch s ?peer ?timeout k param =
@@ -19,6 +19,7 @@ type t = {
timeout: timeout ;
bootstrap_threshold: int ;
mutable bootstrapped: bool ;
bootstrapped_waiter: unit Lwt.t ;
bootstrapped_wakener: unit Lwt.u ;
valid_block_input: State.Block.t Lwt_watcher.input ;
global_valid_block_input: State.Block.t Lwt_watcher.input ;
@@ -130,7 +131,7 @@ let rec create
let valid_block_input = Lwt_watcher.create_input () in
let new_head_input = Lwt_watcher.create_input () in
let canceler = Lwt_canceler.create () in
let _, bootstrapped_wakener = Lwt.wait () in
let bootstrapped_waiter, bootstrapped_wakener = Lwt.wait () in
let nv = {
db ; net_state ; net_db ; block_validator ;
prevalidator ;
@@ -139,6 +140,7 @@ let rec create
new_head_input ;
parent ; max_child_ttl ; child = None ;
bootstrapped = (bootstrap_threshold <= 0) ;
bootstrapped_waiter ;
bootstrapped_wakener ;
bootstrap_threshold ;
active_peers =
@@ -336,8 +338,8 @@ let validate_block nv ?(force = false) hash block operations =
else
failwith "Fitness too low"

let bootstrapped { bootstrapped_wakener } =
Lwt.protected (Lwt.waiter_of_wakener bootstrapped_wakener)
let bootstrapped { bootstrapped_waiter } =
Lwt.protected bootstrapped_waiter

let valid_block_watcher { valid_block_input } =
Lwt_watcher.create_stream valid_block_input
@@ -14,7 +14,7 @@ exception Closed
type 'a t =
{ mutable data : 'a option ;
mutable closed : bool ;
mutable put_waiter : unit Lwt.u option ;
mutable put_waiter : (unit Lwt.t * unit Lwt.u) option ;
}

let create () =
@@ -26,9 +26,9 @@ let create () =
let notify_put dropbox =
match dropbox.put_waiter with
| None -> ()
| Some w ->
| Some (_waiter, wakener) ->
dropbox.put_waiter <- None ;
Lwt.wakeup_later w ()
Lwt.wakeup_later wakener ()

let put dropbox elt =
if dropbox.closed then
@@ -48,14 +48,14 @@ let close dropbox =

let wait_put ~timeout dropbox =
match dropbox.put_waiter with
| Some w ->
| Some (waiter, _wakener) ->
Lwt.choose [
timeout ;
Lwt.protected (Lwt.waiter_of_wakener w)
Lwt.protected waiter
]
| None ->
let waiter, wakener = Lwt.wait () in
dropbox.put_waiter <- Some wakener ;
dropbox.put_waiter <- Some (waiter, wakener) ;
Lwt.choose [
timeout ;
Lwt.protected waiter ;
@@ -65,58 +65,58 @@ end
type trigger =
| Absent
| Present
| Waiting of unit Lwt.u
| Waiting of unit Lwt.t * unit Lwt.u

let trigger () : (unit -> unit) * (unit -> unit Lwt.t) =
let state = ref Absent in
let trigger () =
match !state with
| Absent -> state := Present
| Present -> ()
| Waiting u ->
| Waiting (_waiter, wakener) ->
state := Absent;
Lwt.wakeup u ()
Lwt.wakeup wakener ()
in
let wait () =
match !state with
| Absent ->
let waiter, u = Lwt.wait () in
state := Waiting u;
let waiter, wakener = Lwt.wait () in
state := Waiting (waiter, wakener) ;
waiter
| Present ->
state := Absent;
Lwt.return_unit
| Waiting u ->
Lwt.waiter_of_wakener u
| Waiting (waiter, _wakener) ->
waiter
in
trigger, wait

type 'a queue =
| Absent
| Present of 'a list ref
| Waiting of 'a list Lwt.u
| Waiting of ('a list Lwt.t * 'a list Lwt.u)

let queue () : ('a -> unit) * (unit -> 'a list Lwt.t) =
let state = ref Absent in
let queue v =
match !state with
| Absent -> state := Present (ref [v])
| Present r -> r := v :: !r
| Waiting u ->
| Waiting (_waiter, wakener) ->
state := Absent;
Lwt.wakeup u [v]
Lwt.wakeup wakener [v]
in
let wait () =
match !state with
| Absent ->
let waiter, u = Lwt.wait () in
state := Waiting u;
let waiter, wakener = Lwt.wait () in
state := Waiting (waiter, wakener) ;
waiter
| Present r ->
state := Absent;
Lwt.return (List.rev !r)
| Waiting u ->
Lwt.waiter_of_wakener u
| Waiting (waiter, _wakener) ->
waiter
in
queue, wait

0 comments on commit 58ad60f

Please sign in to comment.