-
-
Notifications
You must be signed in to change notification settings - Fork 647
/
streams.rkt
71 lines (63 loc) · 2.44 KB
/
streams.rkt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#lang racket/base
(require racket/private/port)
(provide if-stream-out
if-stream-in
streamify-in
streamify-out
pump-ports)
(define (if-stream-out who p [sym-ok? #f])
(cond [(and sym-ok? (eq? p 'stdout)) p]
[(or (not p) (and (output-port? p) (file-stream-port? p))) p]
[(output-port? p) #f]
[else (raise-type-error who
(if sym-ok?
"output port, #f, or 'stdout"
"output port or #f")
p)]))
(define (if-stream-in who p)
(cond [(or (not p) (and (input-port? p) (file-stream-port? p))) p]
[(input-port? p) #f]
[else (raise-type-error who "input port or #f" p)]))
(define (streamify-in cin in ready-for-break)
(if (and cin (not (file-stream-port? cin)))
(parameterize-break #f
(thread (lambda ()
(dynamic-wind
void
(lambda ()
(with-handlers ([exn:break? void])
(parameterize-break #t
(ready-for-break #t)
(copy-port cin in)
(ready-for-break #f))))
(lambda () (close-output-port in)))
(ready-for-break #t))))
in))
(define (streamify-out cout out)
(if (and cout
(not (eq? cout 'stdout))
(not (file-stream-port? cout)))
(thread (lambda ()
(dynamic-wind
void
(lambda () (copy-port out cout))
(lambda () (close-input-port out)))))
out))
(define (pump-ports evt pin pout perr in out err)
(define who 'pump-ports)
(define it-ready (make-semaphore))
(define inpump (streamify-in in
(if-stream-out who pin)
(lambda (ok?)
(if ok?
(semaphore-post it-ready)
(semaphore-wait it-ready)))))
(define outpump (streamify-out out (if-stream-in who pout)))
(define errpump (streamify-out err (if-stream-in who perr)))
(when (thread? inpump)
;; Wait for place to end, then stop copying input:
(thread (lambda ()
(sync evt inpump)
(semaphore-wait it-ready)
(break-thread inpump))))
(values inpump outpump errpump))