-
-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathmessage.lisp
239 lines (202 loc) · 8.41 KB
/
message.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
(in-package #:jupyter)
#|
# Representation and manipulation of kernel messages #
|#
#|
## IPython messages ##
|#
(defvar *message* nil)
(defvar *suspended-message* nil)
(defclass message ()
((header
:initarg :header
:initform (make-hash-table :test #'equal)
:accessor message-header)
(parent-header
:initarg :parent-header
:initform (make-hash-table :test #'equal)
:accessor message-parent-header)
(identities
:initarg :identities
:initform (list (make-uuid t))
:accessor message-identities)
(metadata
:initarg :metadata
:initform (make-hash-table :test #'equal)
:accessor message-metadata)
(content
:initarg :content
:initform (make-hash-table :test #'equal)
:accessor message-content)
(buffers
:initarg :buffers
:initform nil
:accessor message-buffers))
(:documentation "Representation of IPython messages"))
#+(or); abcl allegro ccl clasp cmu ecl lispworks sbcl)
(defmethod initialize-instance :after ((instance message) &rest initargs &key &allow-other-keys)
(declare (ignore initargs))
(let ((buffers (message-buffers instance)))
(when buffers
(trivial-garbage:finalize
instance
(lambda () (mapcar (lambda (buffer)
(static-vectors:free-static-vector buffer))
buffers))))))
(defun date-now ()
(multiple-value-bind (s m h dt mth yr day)
(get-decoded-time)
(declare (ignore day))
(format nil "~4,'0D-~2,'0D-~2,'0DT~2,'0D:~2,'0D:~2,'0DZ" yr mth dt h m s)))
(defun make-message (session-id msg-type content &key metadata buffers (parent *message*))
(if parent
(let ((hdr (message-header parent))
(identities (message-identities parent)))
(make-instance 'message
:header `(:object-alist
("msg_id" . ,(make-uuid))
("username" . ,(gethash "username" hdr))
("session" . ,session-id)
("msg_type" . ,msg-type)
("date" . ,(date-now))
("version" . ,+KERNEL-PROTOCOL-VERSION+))
:parent-header hdr
:identities identities
:content content
:metadata (or metadata :empty-object)
:buffers buffers))
(make-instance 'message
:header `(:object-alist
("msg_id" . ,(make-uuid))
("username" . "")
("session" . ,session-id)
("msg_type" . ,msg-type)
("date" . ,(date-now))
("version" . ,+KERNEL-PROTOCOL-VERSION+))
:content content
:metadata (or metadata :empty-object)
:buffers buffers)))
;; XXX: should be a defconstant but strings are not EQL-able...
(defvar +IDS-MSG-DELIMITER+ (babel:string-to-octets "<IDS|MSG>"))
(defvar +BODY-LENGTH+ 5)
#|
### Sending and receiving messages ###
|#
(defun send-string-part (ch part more)
(pzmq:send (channel-socket ch) part :sndmore more))
(defun send-binary-part (ch part more)
(let ((len (length part)))
(cond
((typep part '(array single-float *))
(cffi:with-foreign-array (m part (list :array :float len))
(pzmq:send (channel-socket ch) m :len (* 4 len) :sndmore more)))
(t
(cffi:with-foreign-array (m part (list :array :uint8 len))
(pzmq:send (channel-socket ch) m :len len :sndmore more))))))
(defun send-parts (ch identities body buffers)
(with-slots (send-lock socket) ch
(bordeaux-threads:with-lock-held (send-lock)
(dolist (part identities)
(send-binary-part ch part t))
(send-binary-part ch +IDS-MSG-DELIMITER+ t)
(trivial-do:dolist* (index part body)
(send-string-part ch part (or (< index (1- (length body)))
buffers)))
(trivial-do:dolist* (index part buffers)
(send-binary-part ch part (< index (1- (length buffers))))))))
(defun message-send (ch msg)
(with-slots (mac) ch
(with-slots (identities header parent-header metadata content buffers) msg
(let* ((*print-pretty* nil)
(*read-default-float-format* 'double-float)
(tail (mapcar (lambda (value)
(shasht:write-json value nil))
(list header parent-header metadata content))))
(send-parts ch identities
(cons (compute-signature mac tail) tail)
buffers)))))
(defun more-parts (ch msg)
(declare (ignore ch))
(not (zerop (pzmq::%msg-more msg))))
(defun read-binary-part (ch msg)
(pzmq:msg-recv msg (channel-socket ch))
(cffi:foreign-array-to-lisp (pzmq:msg-data msg)
(list :array :uint8 (pzmq:msg-size msg))
; explicitly defined element type is needed for CLISP
:element-type '(unsigned-byte 8)))
#+(or); abcl allegro ccl clasp cmu ecl lispworks sbcl)
(defun read-buffer-part (ch msg)
(pzmq:msg-recv msg (channel-socket ch))
(let* ((size (pzmq:msg-size msg))
(result (static-vectors:make-static-vector size :element-type '(unsigned-byte 8))))
(static-vectors:replace-foreign-memory (static-vectors:static-vector-pointer result)
(pzmq:msg-data msg)
size)
result))
(defun read-string-part (ch msg)
(pzmq:msg-recv msg (channel-socket ch))
(handler-case
(cffi:foreign-string-to-lisp (pzmq:msg-data msg)
:count (pzmq:msg-size msg)
:encoding :utf-8)
(babel-encodings:character-decoding-error ()
(inform :warn ch "Unable to decode message part.")
"")))
(defun recv-parts (ch)
(with-slots (recv-lock) ch
(bordeaux-threads:with-lock-held (recv-lock)
(pzmq:with-message msg
(values
; Read the identities first
(prog (part parts)
next
(when (equalp +IDS-MSG-DELIMITER+ (setf part (read-binary-part ch msg)))
(return (nreverse parts)))
(push part parts)
; This test should probably be at the beginning but the flag isn't set until you read
; the first part.
(unless (more-parts ch msg)
(inform :warn ch "No identities/message delimiter found in message parts")
(return (nreverse parts)))
(go next))
; Read the message body
(prog (parts (i 0))
next
(unless (more-parts ch msg)
(inform :warn ch "Incomplete message body.")
(return (nreverse parts)))
(push (read-string-part ch msg) parts)
(when (= +BODY-LENGTH+ (incf i))
(return (nreverse parts)))
(go next))
; The remaining parts should be binary buffers
(prog (parts)
next
(unless (more-parts ch msg)
(return (nreverse parts)))
(push
#-(or)#| abcl allegro ccl clasp cmu ecl lispworks sbcl)|# (read-binary-part ch msg)
#+(or)#| abcl allegro ccl clasp cmu ecl lispworks sbcl)|# (read-buffer-part ch msg)
parts)
(go next)))))))
(defun message-recv (ch)
(multiple-value-bind (identities body buffers) (recv-parts ch)
(unless (equal (car body) (compute-signature (channel-mac ch) (cdr body)))
(inform :warn ch "Signature mismatch on received message."))
(destructuring-bind (header parent-header metadata content)
(let ((*read-default-float-format* 'double-float))
(mapcar #'shasht:read-json (cdr body)))
(make-instance 'message :identities identities
:header header
:parent-header parent-header
:metadata metadata
:content content
:buffers buffers))))
(defun recv-heartbeat (ch)
(with-slots (recv-lock) ch
(bordeaux-threads:with-lock-held (recv-lock)
(pzmq:with-message msg
(read-binary-part ch msg)))))
(defun send-heartbeat (ch part)
(with-slots (send-lock) ch
(send-binary-part ch part nil)))