Skip to content

Commit

Permalink
cs & io: fix log receiver synchronization
Browse files Browse the repository at this point in the history
A misplaced `wrap-evt` could allow the result from `sync` on
a log receiver to be an opaque event, instead of a vector.

In other cases, a differently misplaced `wrap-evt` could also cause an
internal instance of `control-state-evt` to not be unregistered
correctly.

The solution to both problems is to add a wrapper procedure to
`control-state-evt`.

Closes #2664
  • Loading branch information
mflatt committed Oct 10, 2019
1 parent 2ad4c6f commit 067dda5
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 40 deletions.
23 changes: 23 additions & 0 deletions pkgs/racket-test/tests/racket/stress/log-receiver.rkt
@@ -0,0 +1,23 @@
#lang racket/base

(define (do-thread t) (thread t))
(set! do-thread do-thread)

(let loop ([i 0])
(unless (= i 2000000)
(when (zero? (modulo i 10000))
(printf "~s\n" i))
(define (spin-a-while)
(let loop ([j (random (add1 (modulo i 100000)))])
(unless (zero? j)
(loop (sub1 j)))))
(define s (make-log-receiver (current-logger) 'info 'send))
(define t
(do-thread
(lambda ()
(log-message (current-logger) 'info 'send "a" 1))))
(spin-a-while)
(unless (vector? (sync s))
(error "not a vector result!"))
(thread-wait t)
(loop (add1 i))))
5 changes: 3 additions & 2 deletions racket/src/io/host/bootstrap.rkt
Expand Up @@ -42,12 +42,13 @@

(define (poll-ctx-sched-info ctx) #f)

(struct control-state-evt (evt interrupt abandon retry)
(struct control-state-evt (evt wrap interrupt abandon retry)
#:property prop:evt (lambda (cse)
(nack-guard-evt
(lambda (nack)
(thread (lambda () (sync nack) ((control-state-evt-abandon cse))))
(control-state-evt-evt cse)))))
(wrap-evt (control-state-evt-evt cse)
(control-state-evt-wrap cse))))))

(define current-async-semaphore (make-parameter #f #f 'current-async-semaphore))

Expand Down
3 changes: 2 additions & 1 deletion racket/src/io/logger/receiver.rkt
Expand Up @@ -55,7 +55,8 @@
(increment-receiever-waiters! lr)
(queue-add! (queue-log-receiver-waiters lr) b)))
(values #f (control-state-evt
(wrap-evt async-evt (lambda (e) (unbox b)))
async-evt
(lambda (e) (unbox b))
(lambda ()
(queue-remove-node! (queue-log-receiver-waiters lr) n)
(decrement-receiever-waiters! lr))
Expand Down
1 change: 1 addition & 0 deletions racket/src/thread/api.rkt
Expand Up @@ -79,6 +79,7 @@
(if (evt? v)
v
(wrap-evt always-evt (lambda () v)))))
values
void
(lambda () (semaphore-post s))
void))))
Expand Down
38 changes: 18 additions & 20 deletions racket/src/thread/channel.rkt
Expand Up @@ -98,22 +98,21 @@
(current-thread/in-atomic)))
(define n (queue-add! gq (cons gw b)))
(values #f
(wrap-evt
(control-state-evt async-evt
(lambda () (queue-remove-node! gq n))
void
(lambda ()
;; Retry: get ready value or requeue
(define pw+v (queue-fremove! pq not-matching-select-waiter))
(cond
(control-state-evt async-evt
(lambda (v) (unbox b))
(lambda () (queue-remove-node! gq n))
void
(lambda ()
;; Retry: get ready value or requeue
(define pw+v (queue-fremove! pq not-matching-select-waiter))
(cond
[pw+v
(waiter-resume! (car pw+v) (void))
(set-box! b (cdr pw+v))
(values #t #t)]
[else
(set! n (queue-add! gq (cons gw b)))
(values #f #f)])))
(lambda (v) (unbox b))))]))
(values #f #f)]))))]))

;; ----------------------------------------

Expand Down Expand Up @@ -161,22 +160,21 @@
(current-thread/in-atomic)))
(define n (queue-add! pq (cons pw v)))
(values #f
(wrap-evt
(control-state-evt async-evt
(lambda () (queue-remove-node! pq n))
void
(lambda ()
;; Retry: put ready value or requeue
(define gw+b (queue-fremove! gq not-matching-select-waiter))
(cond
(control-state-evt async-evt
(lambda (v) self)
(lambda () (queue-remove-node! pq n))
void
(lambda ()
;; Retry: put ready value or requeue
(define gw+b (queue-fremove! gq not-matching-select-waiter))
(cond
[gw+b
(set-box! (cdr gw+b) v)
(waiter-resume! (car gw+b) v)
(values self #t)]
[else
(set! n (queue-add! pq (cons pw v)))
(values #f #f)])))
(lambda (v) self)))]))
(values #f #f)]))))]))

(define/who (channel-put-evt ch v)
(check who channel? ch)
Expand Down
8 changes: 8 additions & 0 deletions racket/src/thread/evt.rkt
Expand Up @@ -135,7 +135,15 @@
;; semaphore was meanwhile posted). As another example, a
;; `nack-guard-evt`'s result uses `abandon-proc` to post to the NACK
;; event.
;; Beware that it doesn't make sense to use `wrap-evt` around the
;; `control-state-evt` or the `evt` inside for an asynchronously
;; satisfied event (like the way that semaphores are implemented). The
;; event may be selected asynchronously before a wrapper on the inner
;; event is found, so that the result turns out to be an unwrapped
;; event. Or the `interrupt-proc`, etc., callbacks may not be found
;; early enough if the `control-state-evt` is wrapped.
(struct control-state-evt (evt
wrap-proc
interrupt-proc ; thunk for break/kill initiated or otherwise before `abandon-proc`
abandon-proc ; thunk for not selected, including break/kill complete
retry-proc) ; thunk for resume from break; return `(values _val _ready?)`
Expand Down
29 changes: 14 additions & 15 deletions racket/src/thread/semaphore.rkt
Expand Up @@ -182,28 +182,27 @@
;; event through a callback. Pair the event with a nack callback
;; to get back out of line.
(values #f
(wrap-evt
(control-state-evt async-evt
(lambda ()
(assert-atomic-mode)
(queue-remove-node! s n)
(when (queue-empty? s)
(set-semaphore-count! s 0))) ; allow CAS again
void
(lambda ()
;; Retry: decrement or requeue
(assert-atomic-mode)
(define c (semaphore-count s))
(cond
(control-state-evt async-evt
(lambda (v) result)
(lambda ()
(assert-atomic-mode)
(queue-remove-node! s n)
(when (queue-empty? s)
(set-semaphore-count! s 0))) ; allow CAS again
void
(lambda ()
;; Retry: decrement or requeue
(assert-atomic-mode)
(define c (semaphore-count s))
(cond
[(positive? c)
(unless peek?
(set-semaphore-count! s (sub1 c)))
(values result #t)]
[else
(set! n (queue-add! s w))
(set-semaphore-count! s -1) ; so CAS not tried for `semaphore-post`
(values #f #f)])))
(lambda (v) result)))]))
(values #f #f)]))))]))

;; Called only when it should immediately succeed:
(define (semaphore-wait/atomic s)
Expand Down
6 changes: 5 additions & 1 deletion racket/src/thread/sync.rkt
Expand Up @@ -374,9 +374,9 @@
(let loop ([sr (syncing-syncers s)]
[retries 0] ; count retries on `sr`, and advance if it's too many
[polled-all-so-far? #t])
(start-atomic)
(when (syncing-need-retry? s)
(syncing-retry! s))
(start-atomic)
(cond
[(syncing-selected s)
=> (lambda (sr)
Expand Down Expand Up @@ -473,9 +473,12 @@
(end-atomic)
(loop sr (add1 retries) polled-all-so-far?)])]
[(control-state-evt? new-evt)
(define wrap-proc (control-state-evt-wrap-proc new-evt))
(define interrupt-proc (control-state-evt-interrupt-proc new-evt))
(define abandon-proc (control-state-evt-abandon-proc new-evt))
(define retry-proc (control-state-evt-retry-proc new-evt))
(unless (eq? wrap-proc values)
(set-syncer-wraps! sr (cons wrap-proc (syncer-wraps sr))))
(unless (eq? interrupt-proc void)
(set-syncer-interrupts! sr (cons interrupt-proc (syncer-interrupts sr))))
(unless (eq? abandon-proc void)
Expand Down Expand Up @@ -726,6 +729,7 @@
;; represents the instantited attempt to sync on `evt`:
(control-state-evt
(nested-sync-evt s next orig-evt)
values
(lambda () (syncing-interrupt! s))
(lambda () (syncing-abandon! s))
(lambda () (syncing-retry! s)))))))
Expand Down
3 changes: 2 additions & 1 deletion racket/src/thread/thread.rkt
Expand Up @@ -1000,7 +1000,8 @@
(set-thread-mailbox-wakeup! t (lambda () (wakeup) (receive))))
(add-wakeup-callback!)
(values #f (control-state-evt
(wrap-evt async-evt (lambda (v) self))
async-evt
(lambda (v) self)
;; interrupt (all must be interrupted, so just install `void`):
(lambda () (set-thread-mailbox-wakeup! t void))
;; abandon:
Expand Down

0 comments on commit 067dda5

Please sign in to comment.