diff --git a/pkgs/racket-test/tests/racket/stress/log-receiver.rkt b/pkgs/racket-test/tests/racket/stress/log-receiver.rkt new file mode 100644 index 00000000000..6fc78e0ff5c --- /dev/null +++ b/pkgs/racket-test/tests/racket/stress/log-receiver.rkt @@ -0,0 +1,23 @@ +#lang racket/base + +(define (do-thread t) (thread t)) +(set! do-thread do-thread) + +(let loop ([i 0]) + (unless (= i 2000000) + (when (zero? (modulo i 10000)) + (printf "~s\n" i)) + (define (spin-a-while) + (let loop ([j (random (add1 (modulo i 100000)))]) + (unless (zero? j) + (loop (sub1 j))))) + (define s (make-log-receiver (current-logger) 'info 'send)) + (define t + (do-thread + (lambda () + (log-message (current-logger) 'info 'send "a" 1)))) + (spin-a-while) + (unless (vector? (sync s)) + (error "not a vector result!")) + (thread-wait t) + (loop (add1 i)))) diff --git a/racket/src/io/host/bootstrap.rkt b/racket/src/io/host/bootstrap.rkt index 756cdcf0c66..56824d84159 100644 --- a/racket/src/io/host/bootstrap.rkt +++ b/racket/src/io/host/bootstrap.rkt @@ -42,12 +42,13 @@ (define (poll-ctx-sched-info ctx) #f) -(struct control-state-evt (evt interrupt abandon retry) +(struct control-state-evt (evt wrap interrupt abandon retry) #:property prop:evt (lambda (cse) (nack-guard-evt (lambda (nack) (thread (lambda () (sync nack) ((control-state-evt-abandon cse)))) - (control-state-evt-evt cse))))) + (wrap-evt (control-state-evt-evt cse) + (control-state-evt-wrap cse)))))) (define current-async-semaphore (make-parameter #f #f 'current-async-semaphore)) diff --git a/racket/src/io/logger/receiver.rkt b/racket/src/io/logger/receiver.rkt index 127d985a62f..2e455931940 100644 --- a/racket/src/io/logger/receiver.rkt +++ b/racket/src/io/logger/receiver.rkt @@ -55,7 +55,8 @@ (increment-receiever-waiters! lr) (queue-add! (queue-log-receiver-waiters lr) b))) (values #f (control-state-evt - (wrap-evt async-evt (lambda (e) (unbox b))) + async-evt + (lambda (e) (unbox b)) (lambda () (queue-remove-node! (queue-log-receiver-waiters lr) n) (decrement-receiever-waiters! lr)) diff --git a/racket/src/thread/api.rkt b/racket/src/thread/api.rkt index 4037e1dd5f2..61b703830a9 100644 --- a/racket/src/thread/api.rkt +++ b/racket/src/thread/api.rkt @@ -79,6 +79,7 @@ (if (evt? v) v (wrap-evt always-evt (lambda () v))))) + values void (lambda () (semaphore-post s)) void)))) diff --git a/racket/src/thread/channel.rkt b/racket/src/thread/channel.rkt index 77f866414ea..b89ad5471b9 100644 --- a/racket/src/thread/channel.rkt +++ b/racket/src/thread/channel.rkt @@ -98,22 +98,21 @@ (current-thread/in-atomic))) (define n (queue-add! gq (cons gw b))) (values #f - (wrap-evt - (control-state-evt async-evt - (lambda () (queue-remove-node! gq n)) - void - (lambda () - ;; Retry: get ready value or requeue - (define pw+v (queue-fremove! pq not-matching-select-waiter)) - (cond + (control-state-evt async-evt + (lambda (v) (unbox b)) + (lambda () (queue-remove-node! gq n)) + void + (lambda () + ;; Retry: get ready value or requeue + (define pw+v (queue-fremove! pq not-matching-select-waiter)) + (cond [pw+v (waiter-resume! (car pw+v) (void)) (set-box! b (cdr pw+v)) (values #t #t)] [else (set! n (queue-add! gq (cons gw b))) - (values #f #f)]))) - (lambda (v) (unbox b))))])) + (values #f #f)]))))])) ;; ---------------------------------------- @@ -161,22 +160,21 @@ (current-thread/in-atomic))) (define n (queue-add! pq (cons pw v))) (values #f - (wrap-evt - (control-state-evt async-evt - (lambda () (queue-remove-node! pq n)) - void - (lambda () - ;; Retry: put ready value or requeue - (define gw+b (queue-fremove! gq not-matching-select-waiter)) - (cond + (control-state-evt async-evt + (lambda (v) self) + (lambda () (queue-remove-node! pq n)) + void + (lambda () + ;; Retry: put ready value or requeue + (define gw+b (queue-fremove! gq not-matching-select-waiter)) + (cond [gw+b (set-box! (cdr gw+b) v) (waiter-resume! (car gw+b) v) (values self #t)] [else (set! n (queue-add! pq (cons pw v))) - (values #f #f)]))) - (lambda (v) self)))])) + (values #f #f)]))))])) (define/who (channel-put-evt ch v) (check who channel? ch) diff --git a/racket/src/thread/evt.rkt b/racket/src/thread/evt.rkt index bf10dfdd62d..d7f563f5dcb 100644 --- a/racket/src/thread/evt.rkt +++ b/racket/src/thread/evt.rkt @@ -135,7 +135,15 @@ ;; semaphore was meanwhile posted). As another example, a ;; `nack-guard-evt`'s result uses `abandon-proc` to post to the NACK ;; event. +;; Beware that it doesn't make sense to use `wrap-evt` around the +;; `control-state-evt` or the `evt` inside for an asynchronously +;; satisfied event (like the way that semaphores are implemented). The +;; event may be selected asynchronously before a wrapper on the inner +;; event is found, so that the result turns out to be an unwrapped +;; event. Or the `interrupt-proc`, etc., callbacks may not be found +;; early enough if the `control-state-evt` is wrapped. (struct control-state-evt (evt + wrap-proc interrupt-proc ; thunk for break/kill initiated or otherwise before `abandon-proc` abandon-proc ; thunk for not selected, including break/kill complete retry-proc) ; thunk for resume from break; return `(values _val _ready?)` diff --git a/racket/src/thread/semaphore.rkt b/racket/src/thread/semaphore.rkt index dc0a23869de..6594ef58c8a 100644 --- a/racket/src/thread/semaphore.rkt +++ b/racket/src/thread/semaphore.rkt @@ -182,19 +182,19 @@ ;; event through a callback. Pair the event with a nack callback ;; to get back out of line. (values #f - (wrap-evt - (control-state-evt async-evt - (lambda () - (assert-atomic-mode) - (queue-remove-node! s n) - (when (queue-empty? s) - (set-semaphore-count! s 0))) ; allow CAS again - void - (lambda () - ;; Retry: decrement or requeue - (assert-atomic-mode) - (define c (semaphore-count s)) - (cond + (control-state-evt async-evt + (lambda (v) result) + (lambda () + (assert-atomic-mode) + (queue-remove-node! s n) + (when (queue-empty? s) + (set-semaphore-count! s 0))) ; allow CAS again + void + (lambda () + ;; Retry: decrement or requeue + (assert-atomic-mode) + (define c (semaphore-count s)) + (cond [(positive? c) (unless peek? (set-semaphore-count! s (sub1 c))) @@ -202,8 +202,7 @@ [else (set! n (queue-add! s w)) (set-semaphore-count! s -1) ; so CAS not tried for `semaphore-post` - (values #f #f)]))) - (lambda (v) result)))])) + (values #f #f)]))))])) ;; Called only when it should immediately succeed: (define (semaphore-wait/atomic s) diff --git a/racket/src/thread/sync.rkt b/racket/src/thread/sync.rkt index 371a8a6e9e4..e7def972d4c 100644 --- a/racket/src/thread/sync.rkt +++ b/racket/src/thread/sync.rkt @@ -374,9 +374,9 @@ (let loop ([sr (syncing-syncers s)] [retries 0] ; count retries on `sr`, and advance if it's too many [polled-all-so-far? #t]) + (start-atomic) (when (syncing-need-retry? s) (syncing-retry! s)) - (start-atomic) (cond [(syncing-selected s) => (lambda (sr) @@ -473,9 +473,12 @@ (end-atomic) (loop sr (add1 retries) polled-all-so-far?)])] [(control-state-evt? new-evt) + (define wrap-proc (control-state-evt-wrap-proc new-evt)) (define interrupt-proc (control-state-evt-interrupt-proc new-evt)) (define abandon-proc (control-state-evt-abandon-proc new-evt)) (define retry-proc (control-state-evt-retry-proc new-evt)) + (unless (eq? wrap-proc values) + (set-syncer-wraps! sr (cons wrap-proc (syncer-wraps sr)))) (unless (eq? interrupt-proc void) (set-syncer-interrupts! sr (cons interrupt-proc (syncer-interrupts sr)))) (unless (eq? abandon-proc void) @@ -726,6 +729,7 @@ ;; represents the instantited attempt to sync on `evt`: (control-state-evt (nested-sync-evt s next orig-evt) + values (lambda () (syncing-interrupt! s)) (lambda () (syncing-abandon! s)) (lambda () (syncing-retry! s))))))) diff --git a/racket/src/thread/thread.rkt b/racket/src/thread/thread.rkt index 2e1c395cf65..79ccd02d71a 100644 --- a/racket/src/thread/thread.rkt +++ b/racket/src/thread/thread.rkt @@ -1000,7 +1000,8 @@ (set-thread-mailbox-wakeup! t (lambda () (wakeup) (receive)))) (add-wakeup-callback!) (values #f (control-state-evt - (wrap-evt async-evt (lambda (v) self)) + async-evt + (lambda (v) self) ;; interrupt (all must be interrupted, so just install `void`): (lambda () (set-thread-mailbox-wakeup! t void)) ;; abandon: