Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 527 lines (410 sloc) 21.275 kb
6a7b76a @lisp external build, take 1
authored
1 ;;; -*- Package: de.setf.amqp.implementation; -*-
2
3 (in-package :de.setf.amqp.implementation)
4
c2728ca @lisp renaming protocol directories, phase one
authored
5 (:documentation "This file defines a stream interface for AMQP channel and connection instances for the
6 'de.setf.amqp' library."
7
6a7b76a @lisp external build, take 1
authored
8 (copyright
9 "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
10 "'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
11 of the GNU Affero General Public License as published by the Free Software Foundation.
12
13 'setf.amqp' is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
14 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 See the Affero General Public License for more details.
16
17 A copy of the GNU Affero General Public License should be included with 'de.setf.amqp' as `AMQP:agpl.txt`.
18 If not, see the GNU [site](http://www.gnu.org/licenses/).")
19
20 (long-description "The goal here is to hook the channel/connection stream operations into standard
21 stream operations. this file implements
22
23 - gray stream compatibility
24 - stream-ty* support
25
26 the original "gray streams" proposal source[1] has not been forthcoming, but there is a copy on the clozure
27 ftp site.[2] kmp has notes[3], corman lisp publishes an implementation[4], and clim discusses an interim gray
28 streams implementation[5], to be used until a real implementation is available.
29
30
31 the stream uses a channel's dual-channel-simple-stream device implementation to manage body length,
32 position and buffering together with its operators for character coding. the interface
33 supports (vector (unsigned byte 8)) and string element types and (vector (unsigned byte 8))
34 i/o buffers. codecs are always present - even for ascii, and are used to implement unread-char.
35
36 the protocol is
37 on input:
38
39 buffer is the buffer
40 buf-len is the full buffer size
41 buffer-ptr is the length of read data, which is given by the frame payload size
42 buffpos is the position from which to take the next byte
43 body-position is that position wrt the entire body
44 body-length is the length of the entire body
45
46 the decoders maintain buffpos and body-position, and use the relation between buffpos
47 and buffer-ptr to refresh the buffer iff needed. if buffer-ptr is negative, that indicates that eof
48 has already been reached. the relation between body-position and body-length can indicate
49 the end of the current content, depending on whether the channel expects chunked content.
50 (see device-read)
51
52 on output:
53
54 out-buffer is the buffer
55 max-out-pos is the full buffer size
56 outpos is the position at which to put the next byte
57 body-position is that position wrt the entire body
58 body-length is the length of the entire body
59
60 the encoders maintain outpos and body-position and use the relation between outpos and
61 max-out-pos to flush the buffer iff needed. the relation between body-position and
62 body-length indicates the end of intended content and is used to effect chunking. (see device-write)
63
64 body-position and body-length can be multiplexed between input and output so long as the channels
65 remain half-duplex. the application must adhere to the single-channel per process constraint and
66 limit stream i/o to device-read/write-content or the respective request-publish/get. should interleaved
67 input and output be required, the channel will need distinct state for each direction.
68
69 ---
70 [1]: ftp://parcftp.xerox.com/pub/cl/cleanup/mail/stream-definition-by-user.mail
6260875 @lisp correct link to gray stream document
authored
71 [2]: ftp://ftp.clozure.com/pub/stream-definition-by-user.mail
72 [3]: http://www.nhplace.com/kent/CL/Issues/stream-definition-by-user-notes.html
6a7b76a @lisp external build, take 1
authored
73 [4]: http://www.grumblesmurf.org/corman-patches/modules/gray-streams.lisp
74 [5]: http://www.cs.cmu.edu/afs/cs/project/clisp/hackers/phg/clim/src/utils/cl-streams.lisp"))
75
76
77 ;;;
78 ;;; input
79
fd51e77 @lisp additions and corrections in connection with spocq sae development.
authored
(defun amqp-stream-read-char (stream)
  "Decode and return the next character from STREAM using the decoder stored in
 its DECODER slot. The decoder is handed a byte source which draws from the
 input buffer and calls device-read when the buffer is exhausted.
 If the decoder yields NIL, return the result of stream-eof-marker instead."
  (flet ((next-byte (device)
           ;; yields the next buffered byte, or NIL when no more input is available
           (with-slots (buffer buffpos buffer-ptr body-position) device
             (cond ((or (< buffpos buffer-ptr)
                        (and (>= buffer-ptr 0)
                             ;; resets buff-pos / renews buffer content unless eof
                             (plusp (device-read device nil 0 nil t))))
                    (incf body-position)
                    (let ((byte (aref buffer buffpos)))
                      (incf buffpos)
                      byte))
                   (t
                    nil)))))
    (let ((char (funcall (slot-value stream 'decoder) #'next-byte stream)))
      (if char
          char
          (stream-eof-marker stream)))))
94
fd51e77 @lisp additions and corrections in connection with spocq sae development.
authored
(defmethod stream-read-char ((stream amqp-device))
  "Gray-stream character input for an amqp-device; delegates to
 amqp-stream-read-char and returns its result."
  (amqp-stream-read-char stream))
97
98
6a7b76a @lisp external build, take 1
authored
99 ;;;!!! for multi-byte encodings the content-encoding needs to supply this
100 ;;;!!! in order that the position is properly maintained.
101 ;;;!!! it's also not clear what happens when this backs up over a frame boundary
(defmethod stream-unread-char ((stream amqp-device) char)
  "Arrange for CHAR to be returned by the next decode operation on STREAM.
 Both BUFFPOS and BODY-POSITION are moved back by one and a one-shot decoder is
 installed. When next invoked, that decoder reinstates the previous decoder,
 advances both positions by one again and returns CHAR without reading any
 byte. Returns NIL."
  (let ((saved-decoder (device-decoder stream)))
    ;; step back by one position (single-byte assumption, see the note above
    ;; this method in the file)
    (with-slots (buffpos body-position) stream
      (decf body-position)
      (decf buffpos))
    (setf-device-decoder
     (lambda (byte-reader device)
       (declare (ignore byte-reader))
       ;; one-shot: restore the real decoder before anything else
       (setf-device-decoder saved-decoder device)
       (with-slots (buffpos body-position) device
         (incf body-position)
         (incf buffpos))
       char)
     stream)
    nil))
116
117
(defmethod stream-read-char-no-hang ((stream amqp-device))
  "Non-blocking character read. Once BODY-POSITION has reached BODY-LENGTH,
 return the stream-eof-marker. Otherwise read a character only if stream-listen
 reports input on the stream's input handle; if it does not, return NIL."
  (with-slots (body-position body-length) stream
    (cond ((>= body-position body-length)
           (stream-eof-marker stream))
          ((stream-listen (stream-input-handle stream))
           (stream-read-char stream))
          (t
           nil))))
126
127
(defmethod stream-peek-char ((stream amqp-device))
  "Return the next character from STREAM without consuming it: the character
 is decoded as for a normal read and then handed to stream-unread-char.
 If the decoder yields NIL, return the stream-eof-marker."
  ;; cannot do this manipulating buffer position, as a multi-byte encoding
  ;; might cross a frame boundary
  (flet ((next-byte (device)
           (with-slots (buffer buffpos buffer-ptr body-position) device
             (cond ((or (< buffpos buffer-ptr)
                        (and (>= buffer-ptr 0)
                             ;; resets buff-pos / renews buffer content unless eof
                             (plusp (device-read device nil 0 nil t))))
                    (incf body-position)
                    (let ((byte (aref buffer buffpos)))
                      (incf buffpos)
                      byte))
                   (t
                    nil)))))
    (let ((char (funcall (slot-value stream 'decoder) #'next-byte stream)))
      (if char
          (progn (stream-unread-char stream char)
                 char)
          (stream-eof-marker stream)))))
6a7b76a @lisp external build, take 1
authored
147
148
(defmethod stream-listen ((stream amqp-device))
  "Delegate the listen query to device-listen on the device itself."
  ;; sbcl has no stream-listen for the input stream
  (device-listen stream))
6a7b76a @lisp external build, take 1
authored
152
153
(defmethod stream-read-line ((stream amqp-device))
  "Decode characters from STREAM until the stream-eol-marker is met and return
 them, without the marker, as a fresh string. Characters accumulate in the
 stream's line buffer, which is reset first and copied for the result.
 If the decoder yields NIL before an eol marker is seen, return two values:
 the characters read so far and the stream-eof-marker."
  (let ((terminator (stream-eol-marker stream))
        (accumulator (stream-line-buffer stream)))
    (setf (fill-pointer accumulator) 0)
    (flet ((next-byte (device)
             (with-slots (buffer buffpos buffer-ptr body-position) device
               (cond ((or (< buffpos buffer-ptr)
                          (and (>= buffer-ptr 0)
                               ;; resets buff-pos / renews buffer content unless eof
                               (plusp (device-read device nil 0 nil t))))
                      (incf body-position)
                      (let ((byte (aref buffer buffpos)))
                        (incf buffpos)
                        byte))
                     (t
                      nil)))))
      (loop
        (let ((char (funcall (slot-value stream 'decoder) #'next-byte stream)))
          (cond ((eql char terminator)
                 (return (copy-seq accumulator)))
                ((null char)
                 (return (values (copy-seq accumulator)
                                 (stream-eof-marker stream))))
                (t
                 (vector-push-extend char accumulator))))))))
178
179
(defmethod stream-clear-input ((stream amqp-device))
  "Delegate to device-clear-input, passing T as its second argument."
  (device-clear-input stream t))
183
184
(defmethod stream-line-column ((stream amqp-device))
  "Always returns NIL for an amqp-device."
  nil)
188
189
(defmethod stream-start-line-p ((stream amqp-device))
  "Always returns NIL for an amqp-device."
  nil)
193
194
(defun amqp-stream-write-char (stream character)
  "Encode CHARACTER with the encoder stored in STREAM's ENCODER slot. The
 encoder is handed a byte sink which stores each byte in the output buffer and
 calls device-write once OUTPOS reaches MAX-OUT-POS. Returns CHARACTER."
  (flet ((put-byte (device byte)
           (with-slots (out-buffer outpos max-out-pos body-position) device
             (setf (aref out-buffer outpos) byte)
             (incf body-position)
             (incf outpos)
             (unless (< outpos max-out-pos)
               ;; resets outpos / flushes out-buffer content
               (device-write device nil 0 nil t)))))
    (funcall (slot-value stream 'encoder) character #'put-byte stream)
    character))
207
(defmethod stream-write-char ((stream amqp-device) character)
  "Gray-stream character output for an amqp-device; delegates to
 amqp-stream-write-char, which encodes CHARACTER with the stream's encoder."
  (amqp-stream-write-char stream character))
211
212
9bdf42b @lisp documentation edits
authored
(defun amqp-stream-write-string (stream string start end)
  "Encode the characters of STRING from START (default 0) up to, but not
 including, END (default the string's length) with STREAM's encoder. Each
 resulting byte is stored in the output buffer, and device-write is called
 whenever OUTPOS reaches MAX-OUT-POS. Returns STRING."
  (flet ((put-byte (device byte)
           (with-slots (out-buffer outpos max-out-pos body-position) device
             (setf (aref out-buffer outpos) byte)
             (incf body-position)
             (incf outpos)
             (unless (< outpos max-out-pos)
               (device-write device nil 0 nil t)))))
    (loop for index from (or start 0) below (or end (length string))
          do (funcall (slot-value stream 'encoder)
                      (char string index) #'put-byte stream)))
  string)
228
(defmethod stream-write-string ((stream amqp-device) string #-mcl &optional start end)
  "Gray-stream string output for an amqp-device; delegates to
 amqp-stream-write-string, passing START and END through unchanged."
  (amqp-stream-write-string stream string start end))
6a7b76a @lisp external build, take 1
authored
232
233
(defmethod stream-terpri ((stream amqp-device))
  "Write the stream's eol marker, as given by stream-eol-marker, through
 stream-write-char."
  (stream-write-char stream (stream-eol-marker stream)))
6a7b76a @lisp external build, take 1
authored
236
237
238 ;; stream-fresh-line :
239 ;; default suffices
240
241
(defmethod stream-finish-output ((stream amqp-device))
  "First force the device's own output, then call stream-finish-output on the
 stream's output handle and return that call's result."
  (stream-force-output stream)
  (stream-finish-output (stream-output-handle stream)))
246
247
(defmethod stream-force-output ((stream amqp-device))
  "Delegate to device-flush; no check for pending content is made here.
 (see device-flush)"
  (device-flush stream))
251
252
(defmethod stream-clear-output ((stream amqp-device))
  "Drop whatever is in the current output buffer: when OUTPOS is positive,
 move BODY-POSITION back by OUTPOS and reset OUTPOS to zero.
 Returns 0 if anything was discarded, otherwise NIL."
  (with-slots (outpos body-position) stream
    (unless (<= outpos 0)
      (setf body-position (- body-position outpos) ; back up in the global stream
            outpos 0))))                           ; reset the current frame
261
262
13b6a7a @lisp correct parameters to stream-advance-to-column
authored
(defmethod stream-advance-to-column ((stream amqp-device) (column t))
  "Always returns NIL, whatever COLUMN is; nothing is written."
  nil)
265
266
267 ;;; close :
268 ;;; see extremely-simple-streams.lisp
269
270
#-mcl
(defmethod open-stream-p ((stream amqp-device))
  "True unless stream-direction reports :closed for the device."
  ;; This replicates the mcl definition, so that a single stream-direction suffices.
  (not (eq :closed (stream-direction stream))))
277
(defmethod stream-direction ((stream amqp-device))
  "Derive the direction from the device's handles: :io when both an input and
 an output handle are present, :input or :output when only one is, and :closed
 when neither is."
  (let ((readable (stream-input-handle stream))
        (writable (stream-output-handle stream)))
    (cond ((and readable writable) :io)
          (readable :input)
          (writable :output)
          (t :closed))))
287
288
91e34d1 @lisp correct character readers to actually read characters; delegate stream-l...
authored
#+ccl
(defmethod stream-eofp ((stream amqp-device))
  "True once BODY-POSITION has reached or passed BODY-LENGTH."
  (with-slots (body-length body-position) stream
    (not (< body-position body-length))))
293
6a7b76a @lisp external build, take 1
authored
294 ;; input-stream-p :
295 ;; mcl version is not generic, it is based on direction, which suffices
296
297 ;; output-stream-p :
298 ;; mcl version is not generic, it is based on direction, which suffices
299
(defmethod stream-element-type ((stream amqp-device))
  "Return the element type reported by device-element-type."
  (device-element-type stream))
302
e183bc3 @lisp implement stream-file-position for channels to accumulate across message...
authored
(defmethod stream-position ((stream amqp-device) &optional new)
  "Query-only position: with NEW absent or NIL, return device-file-position.
 With a non-NIL NEW nothing is changed and NIL is returned."
  (unless new
    (device-file-position stream)))
306
6b79164 @lisp lispworks stream imports
authored
#-lispworks
(defmethod stream-file-position ((stream amqp-device) &optional new)
  "Query-only file position: with NEW absent or NIL, return
 device-file-position. With a non-NIL NEW nothing is changed and NIL is returned."
  (unless new
    (device-file-position stream)))
6b79164 @lisp lispworks stream imports
authored
#+lispworks
(defmethod stream-file-position ((stream amqp-device))
  "LispWorks variant, which takes no position argument: return
 device-file-position."
  (device-file-position stream))
6a7b76a @lisp external build, take 1
authored
314
315 ;; pathname : NYI
316
317
318 ;; truename : NYI
319
320
fd51e77 @lisp additions and corrections in connection with spocq sae development.
authored
(defun amqp-stream-read-byte (stream)
  "Return the next byte from STREAM's input buffer, advancing BUFFPOS and
 BODY-POSITION. When the buffer is exhausted and BUFFER-PTR is not negative,
 device-read is called to refill it. If no byte can be obtained, return the
 stream-eof-marker."
  (with-slots (buffer buffpos buffer-ptr body-position) stream
    (cond ((or (< buffpos buffer-ptr)
               (and (>= buffer-ptr 0)
                    ;; resets buff-pos / buffer content unless eof
                    (plusp (device-read stream nil 0 nil t))))
           (let ((byte (aref buffer buffpos)))
             (incf buffpos)
             (incf body-position)
             byte))
          (t
           (stream-eof-marker stream)))))
333
fd51e77 @lisp additions and corrections in connection with spocq sae development.
authored
(defmethod stream-read-byte ((stream amqp-device))
  "Gray-stream byte input for an amqp-device; delegates to
 amqp-stream-read-byte and returns its result."
  (amqp-stream-read-byte stream))
336
6a7b76a @lisp external build, take 1
authored
337
fd51e77 @lisp additions and corrections in connection with spocq sae development.
authored
(defun amqp-stream-write-byte (stream byte)
  "Store BYTE at the current output buffer position and advance both OUTPOS
 and BODY-POSITION. Once OUTPOS reaches MAX-OUT-POS, call device-write to flush
 the buffer. Returns BYTE."
  (with-slots (out-buffer outpos max-out-pos body-position) stream
    (setf (aref out-buffer outpos) byte)
    (incf body-position)
    (incf outpos)
    (unless (< outpos max-out-pos)
      (device-write stream nil 0 nil t)))
  byte)
348
fd51e77 @lisp additions and corrections in connection with spocq sae development.
authored
(defmethod stream-write-byte ((stream amqp-device) byte)
  "Gray-stream byte output for an amqp-device; delegates to
 amqp-stream-write-byte and returns its result."
  (amqp-stream-write-byte stream byte))
351
6a7b76a @lisp external build, take 1
authored
352
353
d84e1d0 @lisp lispworks stream imports
authored
(defmethod stream-read-sequence
    #+ccl ((stream amqp-device) (sequence vector) &key (start 0) end)
    #+lispworks ((stream amqp-device) (sequence vector) start end)
    #-(or ccl lispworks) ((stream amqp-device) (sequence vector) #-ccl &optional (start 0) end)
  "Fill the vector SEQUENCE between START and END (default: its length) by
 handing it directly to device-read. If device-read reports a positive count,
 return START plus that count; otherwise return NIL."
  (let* ((limit (or end (length sequence)))
         (count (device-read stream sequence start limit nil)))
    (and (plusp count)
         (+ start count))))
363
364
365
d84e1d0 @lisp lispworks stream imports
authored
(defmethod stream-read-sequence
    #+ccl ((stream amqp-device) (sequence string) &key (start 0) end)
    #+lispworks ((stream amqp-device) (sequence string) start end)
    #-(or ccl lispworks) ((stream amqp-device) (sequence string) #-ccl &optional (start 0) end)
  "Fill the string SEQUENCE between START and END (default: its length) with
 characters decoded from the device buffer. Returns the index after the last
 character stored. If the range is empty, END is returned unchanged. If the
 first decode yields no character, return NIL."
  (let ((limit (or end (length sequence))))
    (flet ((next-byte (device)
             (with-slots (buffer buffpos buffer-ptr body-position) device
               (when (or (< buffpos buffer-ptr)
                         (and (>= buffer-ptr 0)
                              ;; resets buff-pos / buffer content unless eof
                              (plusp (device-read device nil 0 nil t))))
                 (let ((byte (aref buffer buffpos)))
                   (incf body-position)
                   (incf buffpos)
                   byte)))))
      (cond ((<= limit start)
             limit)
            (t
             (let ((first-char (funcall (slot-value stream 'decoder) #'next-byte stream)))
               (when first-char
                 (setf (char sequence start) first-char)
                 (loop for index from (1+ start) below limit
                       do (let ((char (funcall (slot-value stream 'decoder)
                                               #'next-byte stream)))
                            (if char
                                (setf (char sequence index) char)
                                ;; input ran out: report how far we got
                                (return index)))
                       finally (return limit)))))))))
393
394
395
d84e1d0 @lisp lispworks stream imports
authored
(defmethod stream-write-sequence
    #+ccl ((stream amqp-device) (sequence vector) &key (start 0) end)
    #+lispworks ((stream amqp-device) (sequence vector) start end)
    #-(or ccl lispworks) ((stream amqp-device) (sequence vector) #-ccl &optional (start 0) end)
  "Write the bytes of the vector SEQUENCE between START and END (default: its
 length) by handing the sequence directly to device-write. Returns whatever
 device-write returns."
  (let ((limit (or end (length sequence))))
    (device-write stream sequence start limit 0)))
404
405
d84e1d0 @lisp lispworks stream imports
authored
(defmethod stream-write-sequence
    #+ccl ((stream amqp-device) (sequence string) &key (start 0) end)
    #+lispworks ((stream amqp-device) (sequence string) start end)
    #-(or ccl lispworks) ((stream amqp-device) (sequence string) #-ccl &optional (start 0) end)
  "Write the characters of the string SEQUENCE between START and END (default:
 its length) by delegating to stream-write-string. Returns that call's result."
  (stream-write-string stream sequence start (or end (length sequence))))
414
415
416
#+clozure
(progn
  (defmethod ccl:stream-read-vector ((stream amqp-device) (sequence vector) start end)
    "Fill the vector SEQUENCE between START (default 0) and END (default: its
 length) via device-read. If the count is positive, return the effective start
 plus that count; otherwise return NIL."
    ;; default START once and reuse it: the result is computed from it, and
    ;; adding a NIL start to the count would signal a type error.
    (let* ((start (or start 0))
           (end (or end (length sequence)))
           (count (device-read stream sequence start end nil)))
      (when (plusp count) (+ start count))))

  (defmethod ccl:stream-read-vector ((stream amqp-device) (sequence string) start end)
    "Read characters into the string SEQUENCE by delegating to
 stream-read-sequence with defaulted bounds."
    (stream-read-sequence stream sequence :start (or start 0) :end (or end (length sequence))))

  (defmethod ccl:stream-write-vector ((stream amqp-device) (sequence vector) start end)
    "Write the vector SEQUENCE between START (default 0) and END (default: its
 length) via device-write."
    (device-write stream sequence (or start 0) (or end (length sequence)) t))

  (defmethod ccl:stream-write-vector ((stream amqp-device) (sequence string) start end)
    "Write the string SEQUENCE by delegating to stream-write-sequence with
 defaulted bounds."
    (stream-write-sequence stream sequence :start (or start 0) :end (or end (length sequence)))))
431
432
(defmethod stream-tyo ((stream amqp-device) character)
  "stream-ty* output: forwards CHARACTER to stream-write-char."
  ;; (stream-tyo *trace-output* character)
  (stream-write-char stream character))
436
(defmethod stream-tyi ((stream amqp-device))
  "stream-ty* input: read via stream-read-char and return the result only if
 it is a character; anything else (such as an eof marker) becomes NIL."
  (let ((result (stream-read-char stream)))
    (when (characterp result)
      result)))
442
(defmethod stream-untyi ((stream amqp-device) character)
  "stream-ty* unread: forwards CHARACTER to stream-unread-char."
  (stream-unread-char stream character))
445
446
447
448 ;;;
449 ;;; fu interface
450
(defmethod device-allocate-buffer ((stream amqp-device) &key
                                   (length (device-buffer-length stream))
                                   (initial-contents nil))
  "Allocate and return a new frame buffer of LENGTH elements (default: the
 stream's device-buffer-length). When INITIAL-CONTENTS is supplied it must have
 exactly LENGTH elements; it is written into the new buffer through
 stream-write-sequence, so it passes through the stream's own output path.
 The stream's output state (buffer, positions, body length) is redirected to
 the new buffer for the duration of the write and always restored afterwards."
  ;; the description does not make this obvious, but it
  ;; makes sense for the initial contents to be in the application domain
  ;; - that is, decoded
  (let ((new-buffer (make-frame-buffer length)))
    (when initial-contents
      (assert (= (length initial-contents) length) ()
              "Inconsistent lengths: ~d, ~d" length (length initial-contents))
      (with-slots (out-buffer outpos max-out-pos body-length body-position) stream
        ;; save the live output state so that it can be restored on any exit
        (let ((.out-buffer out-buffer)
              (.outpos outpos)
              (.max-out-pos max-out-pos)
              (.body-length body-length)
              (.body-position body-position))
          (unwind-protect
               (progn (setf out-buffer new-buffer
                            outpos 0
                            max-out-pos (1+ length) ; prevent writing
                            body-length (1+ length)
                            body-position 0)
                      ;; the lispworks methods take required START and END
                      ;; arguments; elsewhere they are optional or keyword and
                      ;; default to the whole sequence.
                      #+lispworks (stream-write-sequence stream initial-contents 0 length)
                      #-lispworks (stream-write-sequence stream initial-contents))
            (setf out-buffer .out-buffer
                  outpos .outpos
                  max-out-pos .max-out-pos
                  body-length .body-length
                  body-position .body-position)))))
    new-buffer))
480
481
(defmethod device-input-element-type ((stream amqp-device))
  "Return device-element-type; the same element type serves both directions."
  ;; one only
  (device-element-type stream))
485
486
(defmethod device-output-element-type ((stream amqp-device))
  "Return device-element-type; the same element type serves both directions."
  ;; one only
  (device-element-type stream))
490
(defmethod device-encoded-length ((stream amqp-device) buffer &optional start end)
  "Always returns NIL; BUFFER, START and END are ignored."
  (declare (ignore buffer start end))
  nil)
494
495 ;;;
496 ;;; the description does not make this obvious, but it
497 ;;; [http://paste.lisp.org/display/65229], and the names do imply that
498 ;;; the buffers are in the device domain - that is the content is encoded
499 ;;; thus device-write rather than stream-write-sequence
500
(defmethod device-write-buffers ((stream amqp-device) &rest buffer-specs)
  "Write each specified buffer range through device-write. Returns NIL.
 BUFFER-SPECS may be given either as a series of (buffer start end) lists or
 as a flat series of buffer, start, end arguments.
 NOTE(review): the intended calling convention is not visible in this file -
 confirm against the callers. The earlier `in ... by #'cddr` iteration skipped
 every second list-form spec and could not destructure the flat form."
  (if (consp (first buffer-specs))
      ;; each argument is itself a (buffer start end) list
      (loop for (buffer start end) in buffer-specs
            do (device-write stream buffer start end t))
      ;; flat argument list: consume three arguments per buffer
      (loop for (buffer start end) on buffer-specs by #'cdddr
            do (device-write stream buffer start end t))))
504
(defmethod device-read-buffers ((stream amqp-device) &rest buffer-specs)
  "Fill each specified buffer range through device-read. Returns NIL.
 BUFFER-SPECS may be given either as a series of (buffer start end) lists or
 as a flat series of buffer, start, end arguments.
 NOTE(review): the intended calling convention is not visible in this file -
 confirm against the callers. The earlier `in ... by #'cddr` iteration skipped
 every second list-form spec and could not destructure the flat form."
  (if (consp (first buffer-specs))
      ;; each argument is itself a (buffer start end) list
      (loop for (buffer start end) in buffer-specs
            do (device-read stream buffer start end t))
      ;; flat argument list: consume three arguments per buffer
      (loop for (buffer start end) on buffer-specs by #'cdddr
            do (device-read stream buffer start end t))))
91e34d1 @lisp correct character readers to actually read characters; delegate stream-l...
authored
508
509
fd51e77 @lisp additions and corrections in connection with spocq sae development.
authored
510
511 ;;;
512 ;;; stream-reader/writer
513
(defmethod stream-reader ((stream amqp-device))
  "Return two values: a reader function and the stream to apply it to. The
 function is amqp-stream-read-byte when the channel's content type is a
 mime:binary, and amqp-stream-read-char otherwise."
  (values (if (typep (amqp.u:channel-content-type stream) 'mime:binary)
              #'amqp-stream-read-byte
              #'amqp-stream-read-char)
          stream))
520
(defmethod stream-writer ((stream amqp-device))
  "Return two values: a writer function and the stream to apply it to. The
 function is amqp-stream-write-byte when the channel's content type is a
 mime:binary, and amqp-stream-write-char otherwise."
  (values (if (typep (amqp.u:channel-content-type stream) 'mime:binary)
              #'amqp-stream-write-byte
              #'amqp-stream-write-char)
          stream))
Something went wrong with that request. Please try again.