;;;; channel.lisp
(in-package #:tpd2.webapp)
(defvar *channels* (make-hash-table :test #'equalp)
  "Registry of channels.  Keys are channel ids, compared with EQUALP; values
are the channel objects stored under those ids.")
;; The CHANNEL class, defined through the project's DEFMYCLASS macro.
;; NOTE(review): each entry below is presumably a slot name followed by the
;; form supplying its initial value, and DEFMYCLASS presumably generates the
;; CHANNEL-ID / CHANNEL-STATE accessors used elsewhere in this file -- confirm
;; against the DEFMYCLASS definition.
(defmyclass channel
;; Key under which the channel is stored in *CHANNELS*.
(id (random-web-sparse-key 10))
;; Counter incremented by NOTIFY; compared against the state a subscriber
;; reports in order to decide whether the subscriber is out of date.
(state 0)
;; Callbacks waiting for the next NOTIFY.
(subscribers nil))
;; After initialization, store the new channel in *CHANNELS* under its id so
;; that FIND-CHANNEL can retrieve it.
(my-defun channel 'initialize-instance :after (&key)
  (let ((key (my id)))
    (setf (gethash key *channels*) me)))
;; Increment the channel's state and call every callback that was subscribed.
;; The subscriber list is detached and reset to NIL before any callback runs,
;; so each callback is called at most once per subscription and anything that
;; subscribes during the walk lands on the fresh list.  Each call is wrapped
;; in WITH-IGNORED-ERRORS (presumably so that one failing callback does not
;; prevent the others from running).  Returns no values.
(my-defun channel notify ()
  (let ((waiting (my subscribers)))
    (setf (my subscribers) nil)
    (incf (my state))
    (dolist (callback waiting)
      (with-ignored-errors (tpd2.io:report-unless-normal-connection-error)
        (funcall callback))))
  (values))
;; Add callback F to the front of this channel's subscriber list.
;; Returns the updated list.
(my-defun channel subscribe (f)
  (setf (my subscribers)
        (cons f (my subscribers))))
;; Remove callback F from this channel's subscriber list.
;; NOTE(review): DELETEF is called here as (item place).  alexandria:deletef
;; takes (place item) -- confirm that the DELETEF in scope in this package is
;; one that expects the item first; otherwise this form would not update the
;; subscriber list.
(my-defun channel unsubscribe (f)
(deletef f (my subscribers)))
(defun find-channel (id)
  "Look up the channel stored under ID in *CHANNELS*.

ID must be of type BYTE-VECTOR; CHECK-TYPE signals a correctable error
otherwise.  Returns the values of GETHASH: the channel (or NIL when there is
no entry) and a flag telling whether an entry was present."
  (check-type id byte-vector)
  (let ((registry *channels*))
    (gethash id registry)))
;; Remove this channel from *CHANNELS*, but only when the entry stored under
;; its id is this very object (EQ).  Returns the result of REMHASH when the
;; entry was removed, NIL otherwise.
(my-defun channel destroy ()
  (let ((key (my id)))
    (when (eq (gethash key *channels*) me)
      (remhash key *channels*))))
(defgeneric channel-update (channel subscriber-state)
  (:documentation
   "Produce the update for CHANNEL to send to a subscriber that last saw
SUBSCRIBER-STATE.  CHANNEL-RESPOND-BODY passes the result to OUTPUT-RAW-ML."))
;; Dispatcher entry point for the channel page; REGISTER-CHANNEL-PAGE installs
;; it on a site's dispatcher.  DISPATCHER is ignored.  CON and DONE are handed
;; to APPLY-PAGE-CALL, which is configured here to call CHANNEL-RESPOND
;; without creating a frame.
;; NOTE(review): (.channels.) presumably names the request parameter that is
;; forwarded as the .CHANNELS. keyword argument of CHANNEL-RESPOND -- confirm
;; against the APPLY-PAGE-CALL macro.
(defun channel-respond-page (dispatcher con done)
(declare (ignore dispatcher))
(apply-page-call (:con con :function 'channel-respond :create-frame nil) con done (.channels.)))
;; Parse CHANNELS into a list of (channel-object . state) conses.
;; NOTE(review): going by the MATCH-BIND pattern, the input is presumably a
;; run of "<channel-id>|<integer-state>" entries separated by ";" -- confirm
;; against MATCH-BIND's pattern syntax, including how it treats input that
;; does not match.
;; Ids for which FIND-CHANNEL returns NIL are skipped.  Entries are collected
;; with PUSH, so the result is in reverse order of collection.
(defun channel-string-to-states (channels)
(let ((channel-states))
(match-bind ( (* channel "|" (state (integer)) (or ";" (last))
;; Runs for each matched entry: keep it only when the channel exists.
'(awhen (find-channel channel) (push (cons it state) channel-states))))
channels)
channel-states))
(defun channel-respond-body (channel-states &key always-body)
  "Build the response body for CHANNEL-STATES, a list of
(channel . subscriber-state) pairs.

For every pair whose subscriber state differs (under EQUALP) from the
channel's current state, output the JS-TO-STRING rendering of a `channel'
call carrying the channel's id and current state, followed by the result of
CHANNEL-UPDATE.  The JS-TO-STRING rendering of \"OK\" is always output last.

Returns the built output when at least one channel was out of date or
ALWAYS-BODY is true; otherwise returns NIL."
  (let* ((changed nil)
         (body
           (with-ml-output
             (loop for (ch . seen-state) in channel-states
                   unless (equalp seen-state (channel-state ch))
                     do (setf changed t)
                        (output-raw-ml
                         (js-to-string
                          (channel
                           (unquote (force-string (channel-id ch)))
                           (unquote (channel-state ch)))))
                        (output-raw-ml (channel-update ch seen-state)))
             (output-raw-ml (js-to-string "OK")))))
    (if (or changed always-body)
        body
        nil)))
;; Answer a channel poll on connection CON.
;;
;; .CHANNELS. is parsed with CHANNEL-STRING-TO-STATES.  If CON is already dead,
;; or CHANNEL-RESPOND-BODY has something to send, the request is dealt with
;; immediately.  Otherwise a callback is subscribed to every listed channel and
;; the connection's timeout and hangup handlers are replaced until one of these
;; happens: a channel notifies, the timeout fires, or the connection hangs up.
;; DONE is passed through to SEND-HTTP-RESPONSE.
(defun channel-respond (con done &key .channels.)
(let ((channel-states (channel-string-to-states .channels.)))
(with-preserve-specials (*webapp-frame* *servestate*)
;; FINISHED returns true when nothing more has to be done for this request:
;; either the connection is dead, or a body was available and has been sent.
(flet ((finished ()
(or (con-dead? con)
(with-specials-restored
(with-frame-site
(awhen (channel-respond-body channel-states)
(start-http-response)
(send-http-response con done it)
t))))))
;; Nothing to send yet: park the request on the channels.
(unless (finished)
;; FUNC is assigned further down; the current handlers are saved so that
;; UNSUBSCRIBE can put them back.
(let (func
(original-timeout-handler
(timeout-func (tpd2.io:con-timeout con)))
(original-hangup-handler
(tpd2.io:con-hangup-hook con)))
;; UNSUBSCRIBE restores the saved timeout and hangup handlers and removes
;; FUNC from every channel in CHANNEL-STATES.
(flet ((unsubscribe ()
(setf (timeout-func (tpd2.io:con-timeout con)) original-timeout-handler
(tpd2.io:con-hangup-hook con) original-hangup-handler)
(loop for (channel ) in channel-states do (channel-unsubscribe channel func))))
;; Channel callback: try to respond again and tear everything down once
;; FINISHED reports true.
(setf func
(lambda() (when (finished) (unsubscribe))))
(loop for (channel ) in channel-states do (channel-subscribe channel func))
;; Timeout handler: tear down first, then, if the connection is still alive,
;; send a "504 Timeout" response whose body is the JS-TO-STRING rendering of
;; "TIMEOUT".  The saved original timeout handler is restored but not called.
(setf (timeout-func (tpd2.io:con-timeout con))
(lambda ()
(unsubscribe)
(unless (con-dead? con)
(with-specials-restored
(with-ignored-errors (tpd2.io:report-unless-normal-connection-error)
(start-http-response
:banner (force-byte-vector "504 Timeout")
;; NOTE(review): the :content-type value is a "Retry-after" header line built
;; at read time -- presumably a way of getting an extra header emitted through
;; this argument; confirm against START-HTTP-RESPONSE.
:content-type #.(byte-vector-cat "Retry-after: 0" +newline+))
(send-http-response con done
(with-sendbuf () (js-to-string "TIMEOUT"))))))))
;; Hangup handler: tear down, then chain to the previously installed hangup
;; handler, if there was one, with the same arguments.
(setf (tpd2.io:con-hangup-hook con)
(lambda (&rest args)
(unsubscribe)
(when original-hangup-handler
(apply original-hangup-handler args)))))))))))
(defun register-channel-page (&optional (url (site-action-page-name (current-site))))
  "Register CHANNEL-RESPOND-PAGE on the current site's dispatcher under URL.

URL defaults to the value of SITE-ACTION-PAGE-NAME for the current site."
  (let ((dispatcher (site-dispatcher (current-site))))
    (dispatcher-register-path dispatcher url (function channel-respond-page))))
;; Render this channel through JS-HTML-SCRIPT as a `channel' call carrying the
;; channel's id (converted with FORCE-STRING) and its current state.
(my-defun channel 'object-to-ml ()
  (let ((id-string (force-string (my id)))
        (current-state (my state)))
    (js-html-script
     (channel (unquote id-string)
              (unquote current-state)))))