Skip to content

Commit

Permalink
cs & thread: fix problems with future-shutdown synchronization
Browse files Browse the repository at this point in the history
Relevant to #2725
  • Loading branch information
mflatt committed Jul 18, 2019
1 parent 91cdc06 commit e4c6a25
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 33 deletions.
37 changes: 37 additions & 0 deletions pkgs/racket-test/tests/future/shutdown-stress.rkt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#lang racket/base
(require racket/future
racket/place)

;; Check shutdown of future threads by exiting a place and via
;; `custodian-shutdown-all`.

(define (go)
(place pch (run)))

(define (run)
(define c (make-custodian))
(define f #f)

(define loop? ((random) . < . 0.2))

(parameterize ([current-custodian c])
(void
(thread (lambda ()
(set! f (future (lambda ()
(if loop?
(let loop () (loop))
10))))))))

(sync (system-idle-evt))
(sleep 0.1)

(when (zero? (random 2))
(custodian-shutdown-all c)))

(module+ main
(for ([i 30])
(printf "ok ~a\n" (current-seconds))
(place-wait (go))))

(module+ test
(require (submod ".." main)))
29 changes: 27 additions & 2 deletions racket/src/thread/custodian-object.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@

(provide (struct-out custodian)
create-custodian

custodian-shut-down?
custodian-shut-down?/other-pthread
set-custodian-shut-down!

initial-place-root-custodian
root-custodian)

(struct custodian (children ; weakly maps maps object to callback
[shut-down? #:mutable]
shut-down?-box ; box of boolean
[shutdown-sema #:mutable]
[need-shutdown #:mutable] ; queued asynchronous shutdown: #f, 'needed, or 'needed/sent-wakeup
[parent-reference #:mutable]
Expand All @@ -22,7 +27,7 @@

(define (create-custodian parent)
(custodian (make-weak-hasheq)
#f ; shut-down?
(box #f) ; shut-down?-box
#f ; shutdown semaphore
#f ; need shutdown?
#f ; parent reference
Expand All @@ -34,6 +39,26 @@
#f ; immediate limit
#f)) ; sync-futures?

;; Call only from a place's main pthread, and only a
;; place's main thread should change the box value.
(define (custodian-shut-down? c)
(unbox* (custodian-shut-down?-box c)))

;; Call only from a place's main pthread
(define (set-custodian-shut-down! c)
(unless (box-cas! (custodian-shut-down?-box c) #f #t)
(set-custodian-shut-down! c)))

;; Call from other pthreads to make sure they synchronize
;; enough with the place's main pthread:
(define (custodian-shut-down?/other-pthread c)
(cond
[(box-cas! (custodian-shut-down?-box c) #f #f)
#f]
[(box-cas! (custodian-shut-down?-box c) #t #t)
#t]
[else (custodian-shut-down?/other-pthread c)]))

(define initial-place-root-custodian (create-custodian #f))

(define-place-local root-custodian initial-place-root-custodian)
2 changes: 1 addition & 1 deletion racket/src/thread/custodian.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@
;; In atomic mode
(define (do-custodian-shutdown-all c)
(unless (custodian-shut-down? c)
(set-custodian-shut-down?! c #t)
(set-custodian-shut-down! c)
(when (custodian-sync-futures? c)
(futures-sync-for-custodian-shutdown))
(for ([(child callback) (in-hash (custodian-children c))])
Expand Down
85 changes: 55 additions & 30 deletions racket/src/thread/future.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@
(internal-error "not running in a future")))

;; called with lock on f held;
;; in a non-main pthread, caller is responsible for logging 'end-work
;; in a non-main pthread, caller is responsible for logging 'end-work;
;; in a non-mail thread, decrements `(current-atomic)` just before starting thunk
(define (run-future f #:was-blocked? [was-blocked? #f])
(set-future*-state! f 'running)
(define thunk (future*-thunk f))
Expand Down Expand Up @@ -123,7 +124,9 @@
;; we only need to handle success
(call-with-values (lambda ()
(call-with-continuation-prompt
thunk
(lambda ()
(current-atomic (sub1 (current-atomic)))
(thunk))
future-start-prompt-tag
(lambda args (void))))
(lambda results
Expand Down Expand Up @@ -360,14 +363,15 @@
[futures-head #:mutable]
[futures-tail #:mutable]
mutex ; guards futures chain; see "future-lock.rkt" for discipline
cond) ; signaled when chain goes from empty to non-empty
cond ; signaled when chain goes from empty to non-empty
ping-cond)
#:authentic)

(struct worker (id
[pthread #:mutable]
current-future-box ; reports current future (for access external to pthread)
[die? #:mutable]
sync-state) ; box used to sync shutdowns: 'idle, 'running, or 'pending
[ping #:mutable]) ; box set to #t when the thread should check in with scheduler
#:authentic)

(define current-scheduler
Expand All @@ -380,7 +384,7 @@
#f ; pthread
(box #f) ; current-future-box
#f ; die?
(box 'idle)))
(box #f)))

;; called in a Racket thread
(define (maybe-start-scheduler)
Expand All @@ -391,6 +395,7 @@
#f ; futures-head
#f ; futures-tail
(host:make-mutex)
(host:make-condition)
(host:make-condition)))
(current-scheduler s)
(define workers
Expand All @@ -407,7 +412,6 @@
(host:mutex-acquire (scheduler-mutex s))
(for ([w (in-list (scheduler-workers s))])
(set-worker-die?! w #t))
(host:condition-broadcast (scheduler-cond s))
(host:mutex-release (scheduler-mutex s))
(futures-sync-for-shutdown)
(current-scheduler #f)))
Expand Down Expand Up @@ -507,14 +511,10 @@
(current-future 'worker)
(host:mutex-acquire (scheduler-mutex s))
(let loop ()
(keep-trying
(or (box-cas! (worker-sync-state w) 'idle 'running)
(box-cas! (worker-sync-state w) 'running 'running)
(box-cas! (worker-sync-state w) 'pending 'running)))
(check-in w)
(cond
[(worker-die? w) ; worker was killed
(host:mutex-release (scheduler-mutex s))
(box-cas! (worker-sync-state w) 'running 'dead)]
(host:mutex-release (scheduler-mutex s))]
[(scheduler-futures-head s)
=> (lambda (f)
(deschedule-future f)
Expand All @@ -527,9 +527,6 @@
(loop))]
[else
;; wait for work
(keep-trying
(or (box-cas! (worker-sync-state w) 'pending 'idle)
(box-cas! (worker-sync-state w) 'running 'idle)))
(host:condition-wait (scheduler-cond s) (scheduler-mutex s))
(loop)])))))
(set-worker-pthread! w th))
Expand All @@ -540,7 +537,7 @@
;; because we may have transitioned from 'pending to
;; 'running without an intervening check
(cond
[(custodian-shut-down? (future*-custodian f))
[(custodian-shut-down?/other-pthread (future*-custodian f))
(set-future*-state! f 'blocked)
(on-transition-to-unfinished)
(lock-release (future*-lock f))]
Expand All @@ -559,16 +556,25 @@
;; still supposed to run.
;; We take advantage of `current-atomic`, which would otherwise
;; be unused, to disable interruptions.
(define e (make-engine (lambda () (run-future f))
(define e (make-engine (lambda ()
;; decrements `(current-atomic)`
(run-future f))
future-scheduler-prompt-tag
void
break-enabled-default-cell
#t))
(current-atomic (add1 (current-atomic)))
(let loop ([e e])
(e TICKS
(lambda ()
;; Check whether the main pthread wants to know we're here
(when (and (zero? (current-atomic))
(worker-pinged? w))
(host:mutex-acquire (scheduler-mutex (current-scheduler)))
(check-in w)
(host:mutex-release (scheduler-mutex (current-scheduler))))
;; Check that the future should still run
(when (and (custodian-shut-down? (future*-custodian f))
(when (and (custodian-shut-down?/other-pthread (future*-custodian f))
(zero? (current-atomic)))
(lock-acquire (future*-lock f))
(set-future*-state! f #f)
Expand All @@ -589,18 +595,37 @@
;; have had a chance to notice a custodian shutdown or a
;; future-scheduler shutdown.
;;
;; Move each 'running worker into the 'pending state:
(for ([w (in-list (scheduler-workers (current-scheduler)))])
(box-cas! (worker-sync-state w) 'running 'pending))
;; A worker that transitions from 'pending to 'running or 'idle
;; is guaranteed to not run a future chose custodian is
;; shutdown or run any future if the worker is terminated
(for ([w (in-list (scheduler-workers (current-scheduler)))])
(define bx (worker-sync-state w))
(let loop ()
(when (box-cas! bx 'pending 'pending)
(host:sleep 0.001) ; not much alternative to spinning
(loop)))))
;; Assert: all works have `ping` as #f.
(define s (current-scheduler))
(host:mutex-acquire (scheduler-mutex s))
(for ([w (in-list (scheduler-workers s))])
;; Although `(worker-ping w) is #f, a worker pthread may be
;; running `(worker-pinged? w)`, so we need a retry loop here
(let retry ()
(unless (box-cas! (worker-ping w) #f #t)
(retry))))
;; Assert: all workers have `ping` as #t.
;; Wake up idle threads so they check in:
(host:condition-broadcast (scheduler-cond s))
;; When a worker sets `ping` to #f, they must broadcast
;; a wakeup for the following loop's benefit
(let loop ()
(host:condition-wait (scheduler-ping-cond s) (scheduler-mutex s))
(when (for/or ([w (in-list (scheduler-workers s))])
(unbox (worker-ping w)))
(loop)))
;; Assert: all workers have `ping` as #f.
(host:mutex-release (scheduler-mutex s)))

;; lock-free synchronization to check whether the box content is #f
(define (worker-pinged? w)
(box-cas! w #f #f))

;; called with scheduler lock
(define (check-in w)
(when (unbox (worker-ping w))
(set-box! (worker-ping w) #f)
(host:condition-broadcast (scheduler-ping-cond (current-scheduler)))))

;; ----------------------------------------

Expand Down

0 comments on commit e4c6a25

Please sign in to comment.