Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 820 lines (648 sloc) 33.404 kb
6a7b76a @lisp external build, take 1
authored
1 ;;; -*- Package: de.setf.amqp.implementation; -*-
2
3 (in-package :de.setf.amqp.implementation)
4
c2728ca @lisp renaming protocol directories, phase one
authored
5 (:documentation "This file defines the wire-level frame model for the `de.setf.amqp` Common Lisp library."
6a7b76a @lisp external build, take 1
authored
6 (copyright
7 "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
8 "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
9 of the GNU Affero General Public License as published by the Free Software Foundation.
10
11 'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
12 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 See the Affero General Public License for more details.
14
15 A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
16 If not, see the GNU [site](http://www.gnu.org/licenses/).")
17
18 (long-description "Each protocol version packages the data on the wire in a slightly different form. The
19 `frame` is specialized for each version to implement operators which marshall/unmarshall frame data
20 as per the version, and operators which map between abstract object and method class names and the
21 version-specific numeric identifiers."))
22
23
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
(defclass amqp:frame ()
  ((connection
    ;; defaults to nil rather than signalling "connection required."
    :initarg :connection :initform nil
    :reader frame-connection)
   (header
    ;; defaults to nil rather than signalling "header required"
    :initarg :header :initform nil
    :reader frame-header)
   (data
    ;; defaults to nil rather than signalling "buffer required."
    :initarg :buffer :initarg :data :initform nil
    :reader frame-data :writer setf-frame-data :writer (setf frame-data)
    ;; the initform is nil, so the declared type must admit it; a type
    ;; which excludes the initial value has undefined consequences.
    :type (or null (simple-array (unsigned-byte 8) (*)))
    :documentation "A byte buffer serves to capture the i/o for any frames
 which are not streamed. the encoding operators use the buffer if it is present.")
   (end-marker :initform #xce :allocation :class
               :reader frame-end-marker))
  (:documentation "Comprises the header and payload for a single AMQP frame.
 Specializations are defined for input and output and for the respective
 protocol version. The former distinguish methods to treat the data as
 received or sent while the latter distinguish header format.
 The connection, header and data slots all default to nil; the end-marker
 is shared by all instances."))
43
44
897590f @lisp relocate frame operators from classes.lisp to frames.lisp to correct loa...
authored
(defmethod amqp:ensure-method ((class amqp:object) (frame amqp:frame) &rest initargs)
  "Given a frame as the method designator, extract its method code and
 continue with that code in place of the frame."
  (declare (dynamic-extent initargs))
  (let ((method-code (frame-method-code frame)))
    (apply #'amqp:ensure-method class method-code initargs)))
48
(defmethod amqp:ensure-object ((context t) (frame amqp:frame) &rest initargs)
  "Given a frame as the object designator, extract its class name and
 continue with that name in place of the frame."
  (declare (dynamic-extent initargs))
  (let ((class-name (frame-class-name frame)))
    (apply #'amqp:ensure-object context class-name initargs)))
52
53
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
(defclass output-frame (amqp:frame)
  ()
  (:documentation "Mixin which marks a frame as an output frame. It adds no
 slots; it serves as a specializer, for example for release-frame and
 put-encoded-frame."))
56
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
(defclass input-frame (amqp:frame)
  ()
  (:documentation "Mixin which marks a frame as an input frame. It adds no
 slots; it serves as a specializer, for example for release-frame and
 put-read-frame."))
59
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
(defclass 7-byte-header-frame (amqp:frame)
  ;; the dimension spec must be the list (7): a bare integer in an array
  ;; type specifier denotes the rank, that is, a seven-dimensional array.
  ((input-buffer :type (simple-array (unsigned-byte 8) (7))))
  (:documentation "A frame with a seven-byte header."))

(defclass amqp.u:7-byte-header-input-frame (7-byte-header-frame input-frame)
  ()
  (:documentation "The input variant of the seven-byte-header frame."))

(defclass amqp.u:7-byte-header-output-frame (7-byte-header-frame output-frame)
  ()
  (:documentation "The output variant of the seven-byte-header frame."))
65
66
(defclass 8-byte-header-frame (amqp:frame)
  ;; the dimension spec must be the list (8): a bare integer in an array
  ;; type specifier denotes the rank, that is, an eight-dimensional array.
  ((input-buffer :type (simple-array (unsigned-byte 8) (8))))
  (:documentation "A frame with an eight-byte header."))

(defclass amqp.u:8-byte-header-input-frame (8-byte-header-frame input-frame)
  ()
  (:documentation "The input variant of the eight-byte-header frame."))

(defclass amqp.u:8-byte-header-output-frame (8-byte-header-frame output-frame)
  ()
  (:documentation "The output variant of the eight-byte-header frame."))
72
73
(defclass 12-byte-header-frame (amqp:frame)
  ;; the dimension spec must be the list (12): a bare integer in an array
  ;; type specifier denotes the rank, that is, a twelve-dimensional array.
  ((input-buffer :type (simple-array (unsigned-byte 8) (12))))
  (:documentation "A frame with a twelve-byte header."))

(defclass amqp.u:12-byte-header-input-frame (12-byte-header-frame input-frame)
  ()
  (:documentation "The input variant of the twelve-byte-header frame."))

(defclass amqp.u:12-byte-header-output-frame (12-byte-header-frame output-frame)
  ()
  (:documentation "The output variant of the twelve-byte-header frame."))
79
80
b20e82d @lisp continued corrections plus adjustments to rabbotmq for 0.9.1
authored
(defparameter *frame-format-byte-count* 12
  "Limits the payload bytes which format-frame prints: when non-nil, at most
 this many leading and this many trailing bytes appear; when nil, all bytes
 up to the frame size appear.")
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
82
6a7b76a @lisp external build, take 1
authored
(defun format-frame-prefix (frame stream)
  "Write to STREAM that part of a frame's printed form which format-frame
 and format-frame-header share: the bracketed header summary (when a header
 is present), a period, and, for method frames, the {class.method} names.
 Return the frame header, or nil if the frame has none."
  (let ((header (when (slot-boundp frame 'header) (frame-header frame)))
        (connection (when (slot-boundp frame 'connection) (frame-connection frame)))
        (class-name nil))
    (when header
      ;; the type class name is captured on the way in order to decide,
      ;; below, whether class and method names can be decoded.
      (format stream "[(~:[-~;+~])~:[?~;~d|~d|c.~d|t.~d~]]"
              connection
              header
              (setf class-name (frame-type-class-name frame))
              (frame-cycle frame)
              (frame-channel-number frame)
              (frame-track-number frame)))
    (write-string "." stream)
    (when (eq class-name 'amqp:method)
      (format stream "{~a.~a}"
              (frame-class-name frame)
              (frame-method-name frame)))
    header))


(defgeneric format-frame (frame stream)
  (:documentation "Write a summary of FRAME to STREAM: the header summary,
 followed by the declared and actual data sizes and a dump of the payload
 bytes, abbreviated according to *frame-format-byte-count*.")

  (:method ((frame amqp::frame) stream)
    (let ((header (format-frame-prefix frame stream))
          (data (when (slot-boundp frame 'data) (frame-data frame))))
      (format stream "[~:[?~;~@[~a~]/~d~]:"
              data
              (when header (frame-size frame))
              (length data))
      ;; never dump beyond the buffer, even if the header claims more.
      (let ((size (if header (min (frame-size frame) (length data)) (length data))))
        (flet ((write-bytes (start count)
                 ;; NOTE(review): "~0,'2d" specifies mincol 0 with pad
                 ;; character 2, which never pads; "~2,'0d" may have been
                 ;; intended. retained as-is in order to preserve the output.
                 (dotimes (i count)
                   (format stream " ~0,'2d" (aref data (+ start i))))))
          (if *frame-format-byte-count*
              (let* ((beginning (min size *frame-format-byte-count*))
                     (end (min *frame-format-byte-count* (- size beginning))))
                (write-bytes 0 beginning)
                (when (< (+ beginning end) size)
                  (write-string " ..." stream))
                (write-bytes (- size end) end))
              (write-bytes 0 size))))
      (write-string "]" stream))))

(defgeneric format-frame-header (frame stream)
  (:documentation "Write just the header summary of FRAME to STREAM, followed
 by a closing bracket.")

  (:method ((frame amqp::frame) stream)
    (format-frame-prefix frame stream)
    ;; NOTE(review): this bracket has no opening counterpart in the header-only
    ;; output; it is retained in order to preserve the existing printed form.
    (write-string "]" stream)))
137
138
(defmethod print-object ((object amqp::frame) (stream t))
  "Print a frame unreadably, with its type and identity wrapped around the
 summary which format-frame produces."
  (print-unreadable-object (object stream :identity t :type t)
    (format-frame object stream)))
142
143
897590f @lisp relocate frame operators from classes.lisp to frames.lisp to correct loa...
authored
144 ;;;
145 ;;;
146
(defgeneric release-frame (frame)
  (:documentation "Release the frame.
 This is specialized for input, respective output frames to return the
 frame back to the respective free pool of its connection, once its channel
 number has been reset to 0. A frame which is bound to no connection has no
 pool to return to and is left alone.")

  (:method ((frame input-frame))
    (let ((connection (frame-connection frame)))
      ;; the enqueue belongs inside the guard, as in the output method:
      ;; without a connection there is no free-frame queue to look up.
      (when connection
        (setf (frame-channel-number frame) 0)
        (enqueue frame (device-free-input-frames connection)))))

  (:method ((frame output-frame))
    (let ((connection (frame-connection frame)))
      (when connection
        (setf (frame-channel-number frame) 0)
        (enqueue frame (device-free-output-frames connection))))))
161
162
(defmacro with-input-frame ((frame device) &body body)
  "Execute BODY with FRAME bound to an input frame claimed from DEVICE.
 The frame is released when control leaves BODY, whether normally or not."
  `(let ((,frame (claim-input-frame ,device)))
     (unwind-protect
          (progn ,@body)
       (release-frame ,frame))))
167
168
(defgeneric make-input-frame (connection &rest args)
  (:documentation "Allocate a new input frame and bind it to the
 connection. The concrete frame class is that which
 connection-input-frame-class returns for the connection.
 Given a channel, delegate to the respective connection.")

  (:method ((channel amqp:channel) &rest args)
    (declare (dynamic-extent args))
    (apply #'make-input-frame (channel-connection channel) args))

  (:method ((connection amqp:connection) &rest args)
    (declare (dynamic-extent args))
    ;; :connection precedes the caller's initargs, so it takes precedence.
    (apply #'make-instance (connection-input-frame-class connection)
           :connection connection
           args)))
6a7b76a @lisp external build, take 1
authored
183
897590f @lisp relocate frame operators from classes.lisp to frames.lisp to correct loa...
authored
184
(defgeneric make-output-frame (connection &rest args)
  (:documentation "Allocate a new output frame and bind it to the
 connection. The concrete frame class is that which
 connection-output-frame-class returns for the connection.
 Given a channel, delegate to the respective connection.")

  (:method ((channel amqp:channel) &rest args)
    (declare (dynamic-extent args))
    (apply #'make-output-frame (channel-connection channel) args))

  (:method ((connection amqp:connection) &rest args)
    (declare (dynamic-extent args))
    ;; :connection precedes the caller's initargs, so it takes precedence.
    (apply #'make-instance (connection-output-frame-class connection)
           :connection connection
           args)))
199
200
(defgeneric put-encoded-frame (connection frame)
  (:documentation "Place an encoded frame in the connection's queue of encoded
 frames. The enqueue is given an :if-empty function which drains the queue:
 it writes and then releases each queued frame. Given a channel, delegate to
 its connection.")

  (:method ((channel amqp:channel) (frame output-frame))
    (put-encoded-frame (channel-connection channel) frame))

  (:method ((connection amqp:connection) (frame output-frame))
    (flet ((drain-encoded-frames ()
             ;; stop once the queue yields nothing or the connection is no
             ;; longer open; the open test follows the dequeue.
             (loop
               (let ((next (dequeue (device-encoded-frames connection) :if-empty nil)))
                 (when (or (null next) (not (open-stream-p connection)))
                   (return))
                 (write-frame connection next)
                 (release-frame next)))))
      (declare (dynamic-extent #'drain-encoded-frames))
      ;; presumably enqueue calls the :if-empty function when the queue was
      ;; empty, so as to write through - verify against enqueue.
      (enqueue frame (device-encoded-frames connection)
               :if-empty #'drain-encoded-frames))))
218
219
(defgeneric get-encoded-frame (connection)
  (:documentation "Take the next frame from the connection's queue of encoded
 frames. The dequeue is passed :if-empty nil; according to the original
 documentation the result is nil if the queue is empty.")

  (:method ((connection amqp:connection))
    (let ((encoded-frames (device-encoded-frames connection)))
      (dequeue encoded-frames :if-empty nil))))
226
227
(defgeneric put-read-frame (connection frame)
  (:documentation "Append an input frame to the read-frame queue of the given
 connection or channel.")

  (:method ((channel amqp:channel) (frame input-frame))
    (let ((read-frames (device-read-frames channel)))
      (enqueue frame read-frames)))

  (:method ((connection amqp:connection) (frame input-frame))
    (let ((read-frames (device-read-frames connection)))
      (enqueue frame read-frames))))
236
237
(defgeneric get-read-frame (connection &key wait)
  (:documentation "Return the next read input frame for the given context.
 Both methods first try to dequeue a frame with a matching channel number
 from the context's read-frame queue: a channel matches its own number, a
 connection matches 0. Unless WAIT is false and stream-listen reports
 nothing pending, the dequeue is also given a function which reads frames
 from the connection until one matches and queues the others.")

  (:method ((connection amqp:connection) &key (wait t))
    (labels ((for-connection-p (frame)
               (eql (frame-channel-number frame) 0))
             (read-until-match ()
               (loop
                 (let ((frame (read-frame connection (claim-input-frame connection))))
                   (when (for-connection-p frame)
                     (return frame))
                   ;; intended for some channel: save it for later.
                   (put-read-frame connection frame)))))
      (declare (dynamic-extent #'for-connection-p #'read-until-match))
      (dequeue (device-read-frames connection)
               :test #'for-connection-p
               :if-empty (when (or wait (stream-listen connection))
                           #'read-until-match))))

  (:method ((channel amqp:channel) &key (wait t))
    (labels ((for-channel-p (frame)
               (eql (frame-channel-number frame) (channel-number channel)))
             (read-until-match ()
               (let ((connection (channel-connection channel)))
                 (loop
                   ;; give up, with nil, once the connection has closed.
                   (unless (open-stream-p connection)
                     (return nil))
                   (let ((frame (read-frame connection (claim-input-frame channel))))
                     (when (for-channel-p frame)
                       (return frame))
                     ;; according to the original note, this is the same
                     ;; queue as that of the connection.
                     (put-read-frame channel frame))))))
      (declare (dynamic-extent #'for-channel-p #'read-until-match))
      (dequeue (device-read-frames channel)
               :test #'for-channel-p
               :if-empty (when (or wait (stream-listen channel))
                           #'read-until-match)))))
d772d24 @lisp changes to streamed framing to eliminate extra messages
authored
273
(defgeneric unget-read-frame (channel frame)
  (:documentation "Hand a frame back to the channel's read-frame queue by
 means of undequeue.")

  (:method ((channel amqp:channel) frame)
    (let ((read-frames (device-read-frames channel)))
      (undequeue read-frames frame))))
277
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
278 ;;;
279 ;;; frame accessors
280
(defgeneric frame-overhead (frame)
  (:documentation "Return the constant byte count associated with the frame's
 header layout: 8 for a 7-byte header, 9 for an 8-byte header, and 12 for a
 12-byte header.")

  (:method ((frame 12-byte-header-frame))
    12)

  (:method ((frame 8-byte-header-frame))
    9)

  (:method ((frame 7-byte-header-frame))
    8))
290
6a7b76a @lisp external build, take 1
authored
291
(defgeneric frame-decoder (frame)
  (:documentation "Return the prototype instance of the class which the
 frame's type names.")

  (:method ((frame amqp:frame))
    (let* ((type-class-name (frame-type-class-name frame))
           (type-class (find-class type-class-name)))
      ;; finalize the class before asking for its prototype.
      (c2mop:ensure-finalized type-class)
      (c2mop:class-prototype type-class))))
6a7b76a @lisp external build, take 1
authored
297
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
(defgeneric frame-type (frame)
  (:documentation "Return the frame type byte from the header. It is at
 offset 0 in 7- and 8-byte headers and at offset 1 in 12-byte headers.")

  (:method ((frame 12-byte-header-frame))
    (let ((header (frame-header frame)))
      (buffer-unsigned-byte-8 header 1)))

  (:method ((frame 8-byte-header-frame))
    (let ((header (frame-header frame)))
      (buffer-unsigned-byte-8 header 0)))

  (:method ((frame 7-byte-header-frame))
    (let ((header (frame-header frame)))
      (buffer-unsigned-byte-8 header 0))))
307
(defgeneric (setf frame-type) (type frame)
  (:documentation "Store the frame type byte in the header, at offset 0 in
 7- and 8-byte headers and at offset 1 in 12-byte headers.")

  (:method (type (frame 12-byte-header-frame))
    (let ((header (frame-header frame)))
      (setf (buffer-unsigned-byte-8 header 1) type)))

  (:method (type (frame 8-byte-header-frame))
    (let ((header (frame-header frame)))
      (setf (buffer-unsigned-byte-8 header 0) type)))

  (:method (type (frame 7-byte-header-frame))
    (let ((header (frame-header frame)))
      (setf (buffer-unsigned-byte-8 header 0) type))))
317
318
(defgeneric frame-type-class-name (frame)
  (:documentation "Return the class name which the frame's connection
 associates with the frame's type code.")

  (:method ((frame amqp:frame))
    (let ((connection (frame-connection frame))
          (type-code (frame-type frame)))
      (connection-frame-type-class-name connection type-code))))
322
(defgeneric connection-frame-type-class-name (connection type-code)
  (:documentation "Map a frame type code to a class name in the context of
 the given connection. No methods are defined here."))
324
325
6a7b76a @lisp external build, take 1
authored
(defgeneric setf-frame-type-class-name (name frame)
  (:documentation "Set the frame's type from a class name: the frame's
 connection maps the name to a type code, which is stored in the header.
 Return the name.")

  (:method ((name t) (frame amqp:frame))
    (let* ((connection (frame-connection frame))
           (type-code (connection-class-name-frame-type connection name)))
      (setf (frame-type frame) type-code)
      name)))
330
(defgeneric connection-class-name-frame-type (connection type-code)
  (:documentation "Map a class name to a frame type code in the context of
 the given connection. Despite its name, the second argument is the class
 name, as the call in setf-frame-type-class-name shows. No methods are
 defined here."))
332
6a7b76a @lisp external build, take 1
authored
;;; install the generic function also under the setf name, so that
;;; (setf (frame-type-class-name frame) name) works.
(setf (fdefinition '(setf frame-type-class-name))
      (fdefinition 'setf-frame-type-class-name))
335
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
336
6a7b76a @lisp external build, take 1
authored
(defgeneric frame-cycle (frame)
  (:documentation "Return the frame's cycle. Only the 8-byte header carries
 it, as the byte at offset 1; for 7- and 12-byte headers the result is
 always 0.")

  (:method ((frame 8-byte-header-frame))
    (let ((header (frame-header frame)))
      (buffer-unsigned-byte-8 header 1)))

  (:method ((frame 7-byte-header-frame))
    0)

  (:method ((frame 12-byte-header-frame))
    0))
346
6a7b76a @lisp external build, take 1
authored
347
(defgeneric setf-frame-cycle (cycle frame)
  (:documentation "Store the frame's cycle. Only the 8-byte header has room
 for it, as the byte at offset 1; for 7- and 12-byte headers nothing is
 stored and the cycle is just returned.")

  (:method (cycle (frame 8-byte-header-frame))
    (let ((header (frame-header frame)))
      (setf (buffer-unsigned-byte-8 header 1) cycle)))

  (:method (cycle (frame 7-byte-header-frame))
    cycle)

  (:method (cycle (frame 12-byte-header-frame))
    cycle))
357
6a7b76a @lisp external build, take 1
authored
358
56d42db @lisp consolidate stream api in de.setf.utility.codec module
authored
(defmethod frame-channel-number ((frame 7-byte-header-frame))
  "The channel number is the 16-bit field at header offset 1."
  (let ((header (frame-header frame)))
    (buffer-unsigned-byte-16 header 1)))

(defmethod frame-channel-number ((frame 8-byte-header-frame))
  "The channel number is the 16-bit field at header offset 2."
  (let ((header (frame-header frame)))
    (buffer-unsigned-byte-16 header 2)))

(defmethod frame-channel-number ((frame 12-byte-header-frame))
  "The channel number is the 16-bit field at header offset 6."
  (let ((header (frame-header frame)))
    (buffer-unsigned-byte-16 header 6)))
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
367
6a7b76a @lisp external build, take 1
authored
368
56d42db @lisp consolidate stream api in de.setf.utility.codec module
authored
(defmethod (setf frame-channel-number) (number (frame 7-byte-header-frame))
  "Store the channel number in the 16-bit field at header offset 1."
  (setf (buffer-unsigned-byte-16 (frame-header frame) 1) number))

(defmethod (setf frame-channel-number) (number (frame 8-byte-header-frame))
  "Store the channel number in the 16-bit field at header offset 2."
  (setf (buffer-unsigned-byte-16 (frame-header frame) 2) number))

(defmethod (setf frame-channel-number) (number (frame 12-byte-header-frame))
  "Store the channel number in the 16-bit field at header offset 6."
  ;; this mirrors the reader, which decodes the same field at offset 6.
  ;; previously this signaled an NYI error, which also prevented
  ;; release-frame from resetting the channel number of such a frame.
  (setf (buffer-unsigned-byte-16 (frame-header frame) 6) number))
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
378
56d42db @lisp consolidate stream api in de.setf.utility.codec module
authored
;;; make the setf function available also under a plain name, in line with
;;; the other setf-frame-* operators.
(setf (fdefinition 'setf-frame-channel-number)
      (fdefinition '(setf frame-channel-number)))
6a7b76a @lisp external build, take 1
authored
381
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
382
6a7b76a @lisp external build, take 1
authored
(defgeneric frame-track-number (frame)
  (:documentation "Return the frame's track number. Only the 12-byte header
 carries it, as the low four bits of the byte at offset 5; for 7- and 8-byte
 headers the result is always 0.")

  (:method ((frame 12-byte-header-frame))
    (let ((track-byte (buffer-unsigned-byte-8 (frame-header frame) 5)))
      (logand track-byte #x0f)))

  (:method ((frame 7-byte-header-frame))
    0)

  (:method ((frame 8-byte-header-frame))
    0))
6a7b76a @lisp external build, take 1
authored
392
(defgeneric setf-frame-track-number (number frame)
  (:documentation "Store the frame's track number. Only the 12-byte header
 has room for it: the low four bits of the byte at offset 5 are replaced,
 the high four bits are preserved, and the result is that of storing the
 combined byte. For 7- and 8-byte headers nothing is stored and the number
 is just returned.")

  (:method (number (frame 12-byte-header-frame))
    (let* ((header (frame-header frame))
           (high-bits (logand (buffer-unsigned-byte-8 header 5) #xf0))
           (low-bits (logand number #x0f)))
      (setf (buffer-unsigned-byte-8 header 5)
            (logior low-bits high-bits))))

  (:method (number (frame 7-byte-header-frame))
    number)

  (:method (number (frame 8-byte-header-frame))
    number))
404
6a7b76a @lisp external build, take 1
authored
405
(defgeneric frame-size (frame)
  (:documentation "Return the size recorded in the frame header: a 32-bit
 field at offset 3 in a 7-byte header, a 32-bit field at offset 4 in an
 8-byte header, and a 16-bit field at offset 2 in a 12-byte header.")

  (:method ((frame 12-byte-header-frame))
    (let ((header (frame-header frame)))
      (buffer-unsigned-byte-16 header 2)))

  (:method ((frame 8-byte-header-frame))
    (let ((header (frame-header frame)))
      (buffer-unsigned-byte-32 header 4)))

  (:method ((frame 7-byte-header-frame))
    (let ((header (frame-header frame)))
      (buffer-unsigned-byte-32 header 3))))
6a7b76a @lisp external build, take 1
authored
415
(defgeneric setf-frame-size (size frame)
  (:documentation "Record the size in the frame header: in a 32-bit field at
 offset 3 in a 7-byte header, a 32-bit field at offset 4 in an 8-byte
 header, and a 16-bit field at offset 2 in a 12-byte header.")

  (:method (size (frame 12-byte-header-frame))
    (let ((header (frame-header frame)))
      (setf (buffer-unsigned-byte-16 header 2) size)))

  (:method (size (frame 8-byte-header-frame))
    (let ((header (frame-header frame)))
      (setf (buffer-unsigned-byte-32 header 4) size)))

  (:method (size (frame 7-byte-header-frame))
    (let ((header (frame-header frame)))
      (setf (buffer-unsigned-byte-32 header 3) size))))
425
6a7b76a @lisp external build, take 1
authored
426
(defgeneric frame-class-code (frame)
  (:documentation "Return the class code from the start of the frame data:
 a 16-bit field at offset 0 for 7- and 8-byte-header frames, a single byte
 at offset 0 for 12-byte-header frames.")

  (:method ((frame 12-byte-header-frame))
    (let ((data (frame-data frame)))
      (buffer-unsigned-byte-8 data 0)))

  (:method ((frame 8-byte-header-frame))
    (let ((data (frame-data frame)))
      (buffer-unsigned-byte-16 data 0)))

  (:method ((frame 7-byte-header-frame))
    (let ((data (frame-data frame)))
      (buffer-unsigned-byte-16 data 0))))
6a7b76a @lisp external build, take 1
authored
436
(defgeneric setf-frame-class-code (code frame)
  (:documentation "Store the class code at the start of the frame data:
 as a 16-bit field at offset 0 for 7- and 8-byte-header frames, as a single
 byte at offset 0 for 12-byte-header frames.")

  (:method (code (frame 12-byte-header-frame))
    (let ((data (frame-data frame)))
      (setf (buffer-unsigned-byte-8 data 0) code)))

  (:method (code (frame 8-byte-header-frame))
    (let ((data (frame-data frame)))
      (setf (buffer-unsigned-byte-16 data 0) code)))

  (:method (code (frame 7-byte-header-frame))
    (let ((data (frame-data frame)))
      (setf (buffer-unsigned-byte-16 data 0) code))))
6a7b76a @lisp external build, take 1
authored
446
447
(defgeneric frame-class-name (frame)
  (:documentation "Return the class name which the frame's connection
 associates with the class code in the frame data.")

  (:method ((frame amqp::frame))
    (let ((connection (frame-connection frame))
          (class-code (frame-class-code frame)))
      (connection-class-code-class-name connection class-code))))
6a7b76a @lisp external build, take 1
authored
451
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
(defgeneric (setf frame-class-name) (name frame)
  (:documentation "Set the frame's class from a class name: the frame's
 connection maps the name to a class code, which is stored in the frame
 data. Return the name.")

  (:method (name (frame amqp::frame))
    (setf-frame-class-code (connection-class-name-class-code (frame-connection frame) name)
                           frame)
    ;; a setf function is to return the new value, that is, the name rather
    ;; than the numeric code; this matches setf-frame-type-class-name.
    name))
6a7b76a @lisp external build, take 1
authored
456
457
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
(defgeneric frame-method-code (frame)
  (:documentation "Return the method code from the frame data: a 16-bit
 field at offset 2 for 7- and 8-byte-header frames, a single byte at
 offset 1 for 12-byte-header frames.")

  (:method ((frame 12-byte-header-frame))
    (let ((data (frame-data frame)))
      (buffer-unsigned-byte-8 data 1)))

  (:method ((frame 8-byte-header-frame))
    (let ((data (frame-data frame)))
      (buffer-unsigned-byte-16 data 2)))

  (:method ((frame 7-byte-header-frame))
    (let ((data (frame-data frame)))
      (buffer-unsigned-byte-16 data 2))))
6a7b76a @lisp external build, take 1
authored
467
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
(defgeneric setf-frame-method-code (code frame)
  (:documentation "Store the method code in the frame data: as a 16-bit
 field at offset 2 for 7- and 8-byte-header frames, as a single byte at
 offset 1 for 12-byte-header frames.")

  (:method (code (frame 12-byte-header-frame))
    (let ((data (frame-data frame)))
      (setf (buffer-unsigned-byte-8 data 1) code)))

  (:method (code (frame 8-byte-header-frame))
    (let ((data (frame-data frame)))
      (setf (buffer-unsigned-byte-16 data 2) code)))

  (:method (code (frame 7-byte-header-frame))
    (let ((data (frame-data frame)))
      (setf (buffer-unsigned-byte-16 data 2) code))))
6a7b76a @lisp external build, take 1
authored
477
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
(defgeneric frame-method-name (frame)
  (:documentation "Return the method name which the frame's connection
 associates with the class code and method code in the frame data.")

  (:method ((frame amqp::frame))
    (let ((connection (frame-connection frame))
          (class-code (frame-class-code frame))
          (method-code (frame-method-code frame)))
      (connection-method-code-method-name connection class-code method-code))))
6a7b76a @lisp external build, take 1
authored
481
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
(defgeneric (setf frame-method-name) (name frame)
  (:documentation "Set the frame's method from a method name: the frame's
 connection maps the name, together with the class code already present in
 the frame data, to a method code, which is stored in the frame data.
 Return the name.")

  (:method ((name symbol) (frame amqp::frame))
    (setf-frame-method-code (connection-method-name-method-code (frame-connection frame)
                                                                (frame-class-code frame)
                                                                name)
                            frame)
    ;; a setf function is to return the new value, that is, the name rather
    ;; than the numeric code.
    name))
6a7b76a @lisp external build, take 1
authored
488
489
(defgeneric content-header-class-id (frame)
  (:documentation "Return the 16-bit class id at offset 0 of a content
 header. The argument is either a frame, in which case its data is used, or
 the byte vector itself.")

  (:method ((frame amqp:frame))
    (let ((data (frame-data frame)))
      (buffer-unsigned-byte-16 data 0)))

  (:method ((buffer vector))
    (buffer-unsigned-byte-16 buffer 0)))
495
(defgeneric setf-content-header-class-id (id frame)
  (:documentation "Store the 16-bit class id at offset 0 of a content
 header. The target is either a frame, in which case its data is modified,
 or the byte vector itself.")

  (:method (id (frame amqp:frame))
    (let ((data (frame-data frame)))
      (setf (buffer-unsigned-byte-16 data 0) id)))

  (:method (id (buffer vector))
    (setf (buffer-unsigned-byte-16 buffer 0) id)))
501
(defgeneric content-header-class-name (frame)
  (:documentation "Return the class name which the frame's connection
 associates with the class id in the frame's content header.")

  (:method ((frame amqp::frame))
    (let ((connection (frame-connection frame))
          (class-id (content-header-class-id frame)))
      (connection-class-code-class-name connection class-id))))
6a7b76a @lisp external build, take 1
authored
505
506
(defgeneric content-weight (frame)
  (:documentation "Return the 16-bit weight at offset 2 of a content header.
 The argument is either a frame, in which case its data is used, or the
 byte vector itself.")

  (:method ((frame amqp:frame))
    (let ((data (frame-data frame)))
      (buffer-unsigned-byte-16 data 2)))

  (:method ((buffer vector))
    (buffer-unsigned-byte-16 buffer 2)))
512
513
(defgeneric setf-content-weight (id frame)
  (:documentation "Store the 16-bit weight, passed as ID, at offset 2 of a
 content header. The target is either a frame, in which case its data is
 modified, or the byte vector itself.")

  (:method (id (frame amqp:frame))
    (let ((data (frame-data frame)))
      (setf (buffer-unsigned-byte-16 data 2) id)))

  (:method (id (buffer vector))
    (setf (buffer-unsigned-byte-16 buffer 2) id)))
519
520
(defgeneric content-body-size (frame)
  (:documentation "Return the 64-bit body size at offset 4 of a content
 header. The argument is either a frame, in which case its data is used, or
 the byte vector itself.")

  (:method ((frame amqp:frame))
    (let ((data (frame-data frame)))
      (buffer-unsigned-byte-64 data 4)))

  (:method ((buffer vector))
    (buffer-unsigned-byte-64 buffer 4)))
526
527
(defgeneric setf-content-body-size (id frame)
  (:documentation "Store the 64-bit body size, passed as ID, at offset 4 of
 a content header. The target is either a frame, in which case its data is
 modified, or the byte vector itself.")

  (:method (id (frame amqp:frame))
    (let ((data (frame-data frame)))
      (setf (buffer-unsigned-byte-64 data 4) id)))

  (:method (id (buffer vector))
    (setf (buffer-unsigned-byte-64 buffer 4) id)))
533
534
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
535
536
537 ;;;
538 ;;; frame initialization -
539 ;;; leave them as individually specialized until know if there is some reliable system...
540
(defmethod initialize-instance ((instance 8-byte-header-frame)
                                &rest initargs
                                &key
                                connection
                                ;; the payload buffer is the frame size less
                                ;; the 8 byte header and the 1 end byte
                                ;; (cf. http://dev.rabbitmq.com/wiki/Amqp08To091)
                                (frame-size (connection-frame-size connection))
                                (header (make-array 8 :element-type '(unsigned-byte 8)))
                                (data (make-array (- frame-size 9) :element-type '(unsigned-byte 8))))
  "Supply default header and data buffers for an 8-byte-header frame."
  (declare (dynamic-extent initargs))
  ;; the defaulted buffers precede the caller's initargs; an explicit
  ;; :header or :data is already reflected in the respective variable.
  (apply #'call-next-method instance
         (list* :header header :data data initargs)))
556
(defmethod initialize-instance ((instance 7-byte-header-frame)
                                &rest initargs
                                &key
                                connection
                                ;; the payload buffer is the frame size less
                                ;; the 7 byte header and the 1 end byte
                                ;; (cf. http://dev.rabbitmq.com/wiki/Amqp08To091)
                                (frame-size (connection-frame-size connection))
                                (header (make-array 7 :element-type '(unsigned-byte 8)))
                                (data (make-array (- frame-size 8) :element-type '(unsigned-byte 8))))
  "Supply default header and data buffers for a 7-byte-header frame."
  (declare (dynamic-extent initargs))
  ;; the defaulted buffers precede the caller's initargs; an explicit
  ;; :header or :data is already reflected in the respective variable.
  (apply #'call-next-method instance
         (list* :header header :data data initargs)))
572
(defmethod initialize-instance ((instance 12-byte-header-frame)
                                &rest initargs
                                &key
                                connection
                                ;; the payload buffer is the frame size less
                                ;; the 12 byte header
                                (frame-size (connection-frame-size connection))
                                (header (make-array 12 :element-type '(unsigned-byte 8)))
                                (data (make-array (- frame-size 12) :element-type '(unsigned-byte 8))))
  "Supply default header and data buffers for a 12-byte-header frame."
  (declare (dynamic-extent initargs))
  ;; the defaulted buffers precede the caller's initargs; an explicit
  ;; :header or :data is already reflected in the respective variable.
  (apply #'call-next-method instance
         (list* :header header :data data initargs)))
585
586 ;;;
587 ;;; frame input operators
588
897590f @lisp relocate frame operators from classes.lisp to frames.lisp to correct loa...
authored
589
(defgeneric read-frame (connection frame &key start end)
  (:documentation "Read from the connection socket into the given frame.
 This varies per protocol as the header layout varies.
 The around method for connections signals end-of-file if the connection is
 not open, logs the result, records the read timestamp, and advances the
 connection's frame position by the length which the primary method returns
 as its second value.")

  ;; #+amqp.log-frames
  (:method :around ((connection amqp:connection) (frame t) &key start end)
    (when (not (open-stream-p connection))
      (error 'end-of-file :stream connection))
    (multiple-value-bind (result octet-count)
                         (call-next-method)
      (amqp:log :debug connection "read-frame: (~a,~a) ~s = ~d" start end result octet-count)
      (setf (connection-read-frame-timestamp connection) (get-universal-time))
      (incf (device-frame-position connection) octet-count)
      (values result octet-count))))
897590f @lisp relocate frame operators from classes.lisp to frames.lisp to correct loa...
authored
604
(defgeneric write-frame (connection frame &key start end)
  (:documentation "Write from the given frame to the connection socket.
 This varies per protocol as the header layout varies.
 For connections, an around method logs the frame before it is written and
 an after method records the write timestamp. Given a channel, delegate to
 its connection.")

  (:method ((channel amqp:channel) (frame amqp:frame) &rest args)
    (declare (dynamic-extent args))
    (let ((connection (channel-connection channel)))
      (apply #'write-frame connection frame args)))

  ;; #+amqp.log-frames
  (:method :around ((connection amqp:connection) (frame t) &key start end)
    (amqp:log :debug connection "write-frame: (~a,~a) ~s" start end frame)
    (call-next-method))

  (:method :after ((connection amqp:connection) (frame t) &key start end)
    (declare (ignore start end))
    (let ((now (get-universal-time)))
      (setf (connection-write-frame-timestamp connection) now))))
621
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
622
(defmethod read-frame ((connection amqp:connection) (frame amqp.u:7-byte-header-input-frame) &rest args)
  "Read a 7-byte-header frame from the connection's input handle; any
 keyword arguments are passed through to read-7-byte-header-frame."
  (declare (dynamic-extent args))
  (let ((input (stream-input-handle connection)))
    (apply #'read-7-byte-header-frame frame input args)))
627
(defmethod write-frame ((connection amqp:connection) (frame amqp.u:7-byte-header-output-frame) &key (start 0) (end nil))
  "Write the frame's header, its data up to END, and the end byte #xCE to
 the connection's output handle, then force the output. END defaults to the
 size recorded in the frame header. Return END plus 7 plus 1, which is also
 the amount by which the connection's frame position is advanced."
  (let* ((stream (stream-output-handle connection))
         (data-end (or end (frame-size frame)))
         ;; data, plus 7 header bytes, plus 1 end byte
         (total-count (+ data-end 7 1)))
    ;; the position is advanced before anything is written.
    (incf (device-frame-position connection) total-count)
    ;; NOTE(review): START is applied to the header rather than to the
    ;; data - confirm that this is intended.
    (write-sequence (frame-header frame) stream :start start)
    (write-sequence (frame-data frame) stream :end data-end)
    (write-sequence #(#xCE) stream)
    (force-output stream)
    total-count))
637
a9bec0d @lisp correct epoch valus to use a valid time string
authored
#+(or)                                  ; for debugging
(defmethod write-frame ((connection amqp:connection) (frame amqp.u:7-byte-header-output-frame) &key (start 0) (end nil))
  "Debugging variant of the 7-byte-header write-frame method (disabled by the
 #+(or) feature expression). It behaves like the regular method, but traces a
 set of SBCL-internal output functions while the payload and end octet are
 written and untraces them afterwards, even on a non-local exit.
 START is the offset into the header; END bounds the payload and defaults to
 the frame's size.
 VALUE : END + 7 (header) + 1 (end octet)."
  (let ((stream (stream-output-handle connection)))
    (setf end (or end (frame-size frame)))
    (incf (device-frame-position connection) (+ end 7 1))
    (write-sequence (frame-header frame) stream :start start)
    (unwind-protect
        (let ((sequence (frame-data frame)))
          ;; START applies to the header only. The payload copy must begin at 0,
          ;; otherwise the copy is shorter than END and the write below would
          ;; run past its end.
          (unless (= end (length sequence))
            (setf sequence (subseq sequence 0 end)))
          (trace sb-impl::ansi-stream-write-sequence
                 sb-impl::fd-stream-p
                 sb-impl::compatible-vector-and-stream-element-types-p
                 sb-impl::buffer-output
                 sb-impl::write-or-buffer-output
                 sb-impl::write-output-from-queue
                 sb-impl::%queue-and-replace-output-buffer
                 sb-impl::flush-output-buffer)
          (write-sequence sequence stream :start 0 :end end)
          (write-sequence #(#xCE) stream)
          (force-output stream))
      (untrace sb-impl::ansi-stream-write-sequence
               sb-impl::fd-stream-p
               sb-impl::compatible-vector-and-stream-element-types-p
               sb-impl::buffer-output
               sb-impl::write-or-buffer-output
               sb-impl::write-output-from-queue
               sb-impl::%queue-and-replace-output-buffer
               sb-impl::flush-output-buffer))
    (+ end 7 1)))
#+(or)
(progn
  ;; Debugging scratch pad (never read, because of the #+(or) above).
  ;; It redefines sb-unix::unix-write so that writes to file descriptors >= 12
  ;; are echoed to *trace-output* before the system call is made.
  (in-package :spocq.i)
  (main)
  (stop)
  ;;
  (in-package :sb-unix)
  (defparameter *trace-sap-buffer-lock* (bt:make-lock "sap-lock"))
  (defun trace-sap-buffer (fd sap offset len)
    ;; Print LEN bytes starting at OFFSET from SAP as characters, replacing
    ;; non-graphic ones with a dot. The lock keeps output from concurrent
    ;; callers from interleaving.
    (bt:with-lock-held (*trace-sap-buffer-lock*)
      (format *trace-output* "~&[[~a ~a ~8x ~a]: '"
              (de.setf.utility::iso-time) fd (sb-sys:sap-int sap)
              (sb-thread:thread-name sb-thread:*current-thread*))
      (loop for i from offset for count from 0 below len
            do (let* ((byte (sb-impl::sap-ref-8 sap i))
                      (char (code-char byte)))
                 (write-char (if (graphic-char-p char) char #\.) *trace-output*)))
      (format *trace-output* "]~%")))

  (defun unix-write (fd buf offset len)
    (declare (type unix-fd fd)
             (type (unsigned-byte 32) offset len))
    (flet ((%write (sap)
             (declare (system-area-pointer sap))
             (when (>= fd 12)
               (trace-sap-buffer fd sap offset len))
             (int-syscall ("write" int (* char) int)
                          fd
                          (with-alien ((ptr (* char) sap))
                            (addr (deref ptr offset)))
                          len)))
      (etypecase buf
        ((simple-array * (*))
         (with-pinned-objects (buf)
           (%write (vector-sap buf))))
        (system-area-pointer
         (%write buf)))))
  ;; set up the tracer
  ;; /development/downloads/rabbitmq-java-client-bin-2.4.1/runjava.sh com.rabbitmq.tools.Tracer
  ;; change the broker uri
  (in-package :spocq.i)
  ;; NOTE(review): the user name and password that were embedded here have been
  ;; replaced with placeholders. Credentials must not be committed to source;
  ;; substitute real values locally, and rotate the ones that were published.
  (setq *broker-uri* (puri:uri "amqp://USER:PASSWORD@hetzner.dydra.com:5673"))
  (setq *log-level* :warn)
  (run)
  )
bbae9ab @lisp rabbitmq interoperability: framing independent of protocol version, conv...
authored
713
(defmethod read-frame ((connection amqp:connection) (frame amqp.u:8-byte-header-input-frame) &rest args)
  "Fill FRAME from the input handle of CONNECTION by delegating to
 read-8-byte-header-frame. ARGS are passed through unchanged."
  (declare (dynamic-extent args))
  (let ((input (stream-input-handle connection)))
    (apply #'read-8-byte-header-frame frame input args)))
718
(defmethod write-frame ((connection amqp:connection) (frame amqp.u:8-byte-header-output-frame) &key (start 0) (end nil))
  "Write FRAME to the output handle of CONNECTION as header, payload and a
 single #xCE end octet, then force the output.
 START is the offset into the header at which writing begins.
 END bounds the payload and defaults to the frame's size.
 VALUE : END + 8 (header) + 1 (end octet); the same amount is added to the
         connection's device-frame-position."
  (let* ((output (stream-output-handle connection))
         (payload-end (or end (frame-size frame)))
         ;; 8 header octets plus the one-octet end marker
         (total (+ payload-end 8 1)))
    (incf (device-frame-position connection) total)
    (write-sequence (frame-header frame) output :start start)
    (write-sequence (frame-data frame) output :end payload-end)
    (write-sequence #(#xCE) output)
    (force-output output)
    total))
728
729
(defmethod read-frame ((connection amqp:connection) (frame amqp.u:12-byte-header-input-frame) &rest args)
  "Fill FRAME from the input handle of CONNECTION by delegating to
 read-12-byte-header-frame. ARGS are passed through unchanged."
  (declare (dynamic-extent args))
  (let ((input (stream-input-handle connection)))
    (apply #'read-12-byte-header-frame frame input args)))
734
(defmethod write-frame ((connection amqp:connection) (frame amqp.u:12-byte-header-output-frame) &key (start 0) (end nil))
  "Write FRAME to the output handle of CONNECTION as header followed by
 payload. This method writes no end octet and does not force the output.
 START is the offset into the header at which writing begins.
 END bounds the payload and defaults to the frame's size.
 VALUE : END + 12 (header); the same amount is added to the connection's
         device-frame-position."
  (let* ((output (stream-output-handle connection))
         (payload-end (or end (frame-size frame)))
         (total (+ payload-end 12)))
    (incf (device-frame-position connection) total)
    (write-sequence (frame-header frame) output :start start)
    (write-sequence (frame-data frame) output :end payload-end)
    total))
742
6a7b76a @lisp external build, take 1
authored
743
c2728ca @lisp renaming protocol directories, phase one
authored
744 (:documentation (read-7-byte-header-frame read-8-byte-header-frame read-12-byte-header-frame)
6a7b76a @lisp external build, take 1
authored
745 "The abstract frame structure (header . payload), is implemented variously in the respective versions.
746 The header size varies. The end marker is eliminated. The field sizes change. To allow for this, each
747 connection uses a specialized frame class and that class implements the general frame format.
748 This includes the buffer access as well as mediating translation between protocol-specific wire codes
749 for operations and classes and the generic names. Each protocol implements its logic in its
750 'abstract-classes' and 'classes' files.
751 nb. some stream classes which appear as handles may not specialize the respective `stream-read-*`
752 operators.")
753
754
(defun read-7-byte-header-frame (frame stream &key (start 0))
  "Read a frame with a 7-byte header into a FRAME from a binary STREAM.
 Permit a START offset to allow for the first already-read byte of an initial
 start frame.
 VALUES : the frame
          the frame length = header + payload, not counting the end marker
 CONDITIONS : signals end-of-file if either the header or the payload is incomplete;
              signals an error (via assert) if the payload length exceeds the
              frame's data buffer or the octet after the payload is not #xCE."
  (unless (eql 7 (read-sequence (frame-header frame) stream :start start :end 7))
    (error 'end-of-file :stream stream))
  (let* ((payload-length (buffer-unsigned-byte-32 (frame-header frame) 3))
         ;; the end marker is not counted in the reported length
         (frame-length (+ 7 payload-length))
         (frame-end-marker nil)
         (data (frame-data frame)))
    ;; report the buffer's size rather than printing the entire buffer
    (assert (<= payload-length (length data)) ()
            "Frame length exceeds buffer size: ~s, ~s"
            payload-length (length data))
    (unless (eql payload-length (read-sequence data stream :end payload-length))
      (error 'end-of-file :stream stream))
    (assert (eql (setf frame-end-marker (read-byte stream)) #xCE) ()
            "Invalid frame end: ~s" frame-end-marker)
    (values frame frame-length)))
777
778
(defun read-8-byte-header-frame (frame stream &key (start 0))
  "Read a frame with an 8-byte header into a FRAME from a binary STREAM.
 Permit a START offset to allow for the first already-read byte of an initial
 start frame.
 VALUES : the frame
          the frame length = header + payload, not counting the end marker
 CONDITIONS : signals end-of-file if either the header or the payload is incomplete;
              signals an error (via assert) if the payload length exceeds the
              frame's data buffer or the octet after the payload is not #xCE."
  (unless (eql 8 (read-sequence (frame-header frame) stream :start start :end 8))
    (error 'end-of-file :stream stream))
  (let* ((payload-length (buffer-unsigned-byte-32 (frame-header frame) 4))
         ;; the end marker is not counted in the reported length
         (frame-length (+ 8 payload-length))
         (frame-end-marker nil)
         (data (frame-data frame)))
    ;; report the buffer's size rather than printing the entire buffer
    (assert (<= payload-length (length data)) ()
            "Frame length exceeds buffer size: ~s, ~s"
            payload-length (length data))
    (unless (eql payload-length (read-sequence data stream :end payload-length))
      (error 'end-of-file :stream stream))
    (assert (eql (setf frame-end-marker (read-byte stream)) #xCE) ()
            "Invalid frame end: ~s" frame-end-marker)
    (values frame frame-length)))
801
(defun read-12-byte-header-frame (frame stream &key (start 0))
  "Read a frame with a 12-byte header into a FRAME from a binary STREAM.
 Permit a START offset to allow for the first already-read byte of an initial
 start frame.
 VALUES : the frame
          the complete frame length as read from the header; the payload
          length is computed as that length less the 12 header bytes
 CONDITIONS : signals end-of-file if either the header or the payload is incomplete;
              signals an error (via assert) if the declared frame length is
              smaller than the header or the payload exceeds the frame's data
              buffer.
 No end marker is read for this frame format."
  (unless (eql 12 (read-sequence (frame-header frame) stream :start start :end 12))
    (error 'end-of-file :stream stream))
  (let* ((frame-length (buffer-unsigned-byte-16 (frame-header frame) 2))
         (payload-length (- frame-length 12))
         (data (frame-data frame)))
    ;; a declared length below the header size would yield a negative payload
    ;; length and an invalid :end argument for read-sequence
    (assert (<= 0 payload-length) ()
            "Frame length is less than the header size: ~s"
            frame-length)
    (assert (<= payload-length (length data)) ()
            "Frame length exceeds buffer size: ~s, ~s"
            payload-length (length data))
    (unless (eql payload-length (read-sequence data stream :end payload-length))
      (error 'end-of-file :stream stream))
    (values frame frame-length)))
Something went wrong with that request. Please try again.