-
Notifications
You must be signed in to change notification settings - Fork 0
/
async-channel-threads.rkt
85 lines (77 loc) · 3.1 KB
/
async-channel-threads.rkt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#lang typed/racket/base
(require "typed-tcp.rkt"
"typed-async-channel.rkt")
;; A connection handler: a procedure called with a connection's input port
;; and output port, returning nothing.
(define-type Handler (-> Input-Port Output-Port Void))
;; Aliases of Thread that name the role a thread plays.
;; NOTE(review): Timeout-Thread is not referenced anywhere in the visible code.
(define-type Timeout-Thread Thread)
(define-type Worker-Thread Thread)
;; An asynchronous channel whose elements are `request` structs.
(define-type Request-Channel (Async-Channelof request))
;; A group of worker threads together with a request channel.
;;   workers  - the worker threads; may hold any number of threads
;;              (`Listof`, not `List`, which would mean exactly one element)
;;   requests - channel carrying `request` values
;; NOTE(review): `pool` is not constructed anywhere in the visible code;
;; presumably the workers are meant to consume from `requests` - confirm.
(struct: pool ([workers : (Listof Worker-Thread)]
               [requests : Request-Channel]))
;; Pairs an input port with an output port.
;; NOTE(review): presumably the two ends of one accepted connection, but
;; `request` is not constructed anywhere in the visible code - confirm.
(struct: request ([in : Input-Port]
[out : Output-Port]))
;; Module-level asynchronous channel of `request` values, created once when
;; the module is instantiated.
;; NOTE(review): nothing in the visible code puts to or reads from this
;; channel - confirm whether it is still needed.
(: request-channel Request-Channel)
(define request-channel (make-async-channel))
;; Start a TCP server and return a thunk that stops it.
;;
;; A listener is opened on `port-number` with a backlog of four pending
;; connections per worker, and `num-workers` acceptor threads are started on
;; it.  The listener and the acceptor threads are all created under one fresh
;; custodian, so shutting that custodian down stops the whole server.
;;
;;   #:port                 - TCP port to listen on (default 3000)
;;   #:handler              - called with each connection's ports (default `handle`)
;;   #:num-workers          - number of acceptor threads (default 10)
;;   #:worker-timeout       - seconds a connection may live (default 10)
;;   #:worker-memory-limit  - memory limit handed to each connection (default 50 MiB)
;;
;; Returns: a zero-argument procedure that shuts down the server's custodian.
(: serve (->* () (#:port Integer
                  #:handler Handler
                  #:num-workers Nonnegative-Integer
                  #:worker-timeout Nonnegative-Integer
                  #:worker-memory-limit Nonnegative-Integer) (-> Void)))
(define (serve #:port [port-number 3000]
               #:handler [handler handle]
               #:num-workers [num-workers 10]
               #:worker-timeout [worker-timeout 10]
               #:worker-memory-limit [worker-memory-limit (* 50 1024 1024)])
  (define server-custodian (make-custodian))
  ;; Stopping the server is just a matter of shutting down the custodian that
  ;; owns the listener and every acceptor thread.
  (: shutdown! (-> Void))
  (define (shutdown!)
    (custodian-shutdown-all server-custodian))
  (parameterize ([current-custodian server-custodian])
    (define listener (tcp-listen port-number (* 4 num-workers)))
    ;; `serve-loop` is called once per thread so that each acceptor gets its
    ;; own loop thunk.
    (for ([worker-index (in-range num-workers)])
      (thread (serve-loop listener
                          handler
                          num-workers
                          worker-timeout
                          worker-memory-limit))))
  shutdown!)
;; Build the body of one acceptor thread.
;;
;; Returns a thunk that loops forever.  Each iteration calls `accept`, which
;; blocks until a client connects and then serves that connection on its own
;; threads; afterwards the loop prints its id and goes around again.  The
;; thunk never returns normally.
;;
;;   listener             - listening socket shared by the acceptor threads
;;   handler              - called with each connection's input and output ports
;;   num-workers          - only used as the bound for this loop's random id
;;   worker-timeout       - passed to `accept` as the connection timeout
;;   worker-memory-limit  - passed to `accept` as the connection memory limit
(: serve-loop (-> TCP-Listener Handler Nonnegative-Integer Nonnegative-Integer Nonnegative-Integer (-> Void)))
(define (serve-loop listener handler num-workers worker-timeout worker-memory-limit)
  ;; Debug label only: drawn at random, so two loops may print the same id.
  (define i (random num-workers))
  (: inner (-> Void))
  (define (inner)
    ;; No `tcp-accept-ready?` polling here.  `accept` calls `tcp-accept`,
    ;; which blocks until a connection is pending, so the thread sleeps while
    ;; idle instead of spinning at full CPU.
    (accept listener handler worker-timeout worker-memory-limit)
    ;; Log which loop picked up the connection.
    (display (string-append "id: " (number->string i) "\n"))
    (inner))
  inner)
;; Accept one connection and serve it under its own custodian.
;;
;; Blocks in `tcp-accept` until a client connects.  The connection's ports and
;; the thread running `handler` are created under a fresh custodian that is
;; limited to `memory-limit`, so shutting that custodian down reclaims
;; everything belonging to the connection.  A second thread, created outside
;; that custodian, shuts it down after `timeout` seconds whether or not the
;; handler has finished.
;;
;;   listener      - listening socket to accept from
;;   handler       - called with the connection's input and output ports
;;   timeout       - seconds before the connection's custodian is shut down
;;   memory-limit  - limit given to `custodian-limit-memory`
;;
;; Returns: the timeout thread.
(: accept (-> TCP-Listener (-> Input-Port Output-Port Void) Nonnegative-Integer Nonnegative-Integer Thread))
(define (accept listener handler timeout memory-limit)
  (define custodian (make-custodian))
  (custodian-limit-memory custodian memory-limit)
  (parameterize ([current-custodian custodian])
    (define-values (in out) (tcp-accept listener))
    (thread (lambda ()
              ;; Close the ports even when the handler raises, so a failing
              ;; handler does not leave the client hanging until the timeout.
              (dynamic-wind
               void
               (lambda () (handler in out))
               (lambda ()
                 (close-input-port in)
                 (close-output-port out))))))
  ;; Watchdog: runs under the caller's custodian so that it survives long
  ;; enough to shut the connection's custodian down.
  (thread (lambda ()
            (sleep timeout)
            (custodian-shutdown-all custodian))))
;; Placeholder handler used when `serve` is given no #:handler.
;;
;; Reads the first line from `in`.  If it looks like an HTTP GET request line,
;; the rest of the header is skipped and a fixed "Hello" response is written
;; to `out`; otherwise nothing is written.
(: handle Handler)
(define (handle in out)
  (define first-line (read-line in))
  ;; An end-of-file before any input is treated as an empty request line.
  (define request-line (if (eof-object? first-line) "" first-line))
  ;; Non-#f only when the first line is a GET request line.
  (define req
    (regexp-match #rx"^GET (.+) HTTP/[0-9]+\\.[0-9]+" request-line))
  (cond
    [req
     ;; Consume the remaining header lines, up to and including the blank line.
     (regexp-match #rx"(\r\n|^)\r\n" in)
     ;; Status line, headers, then the body.
     (display "HTTP/1.0 200 Okay\r\n" out)
     (display "Server: k\r\nContent-Type: text/html\r\n\r\n" out)
     (display "Hello" out)]
    [else (void)]))