Skip to content

Commit

Permalink
move all I/O operation to gloss.io
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Nov 27, 2010
1 parent fa40114 commit 3d39387
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 56 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject gloss "0.1.0-SNAPSHOT"
(defproject gloss "0.1.1-SNAPSHOT"
:description "speaks in bytes, so that you don't have to"
:license {:name "Eclipse Public License - v 1.0"
:url "http://www.eclipse.org/legal/epl-v10.html"
Expand Down
51 changes: 2 additions & 49 deletions src/gloss/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
(:use
potemkin
[gloss.core protocols]
[gloss.data primitives])
[gloss.data primitives]
[gloss.core.formats :only (to-byte-buffer to-buf-seq)])
(:require
[gloss.data.bytes :as bytes]
[gloss.core.formats :as formats]
Expand All @@ -34,54 +35,6 @@

;;;

(import-fn #'formats/to-byte-buffer)
(import-fn #'formats/to-buf-seq)

(defn contiguous
"Takes a sequence of ByteBuffers and returns a single contiguous ByteBuffer."
[buf-seq]
(when buf-seq
(bytes/take-contiguous-bytes (bytes/byte-count buf-seq) buf-seq)))

(defn encode
"Turns a frame value into a sequence of ByteBuffers."
[codec val]
(when val
(write-bytes codec nil val)))

(defn encode-all
"Turns a sequence of frame values into a sequence of ByteBuffers."
[codec vals]
(apply concat
(map #(write-bytes codec nil %) vals)))

(defn decode
"Turns bytes into a single frame value. If there are too few or too many bytes
for the frame, an exception is thrown."
[codec bytes]
(let [buf-seq (bytes/dup-bytes (to-buf-seq bytes))
[success val remainder] (read-bytes codec buf-seq)]
(when-not success
(throw (Exception. "Insufficient bytes to decode frame.")))
(when-not (empty? remainder)
(throw (Exception. "Bytes left over after decoding frame.")))
val))

(defn decode-all
"Turns bytes into a sequence of frame values. If there are bytes left over at the end
of the sequence, an exception is thrown."
[codec bytes]
(let [buf-seq (bytes/dup-bytes (to-buf-seq bytes))]
(loop [buf-seq buf-seq, vals []]
(if (empty? buf-seq)
vals
(let [[success val remainder] (read-bytes codec buf-seq)]
(when-not success
(throw (Exception. "Bytes left over after decoding sequence of frames.")))
(recur remainder (conj vals val)))))))

;;;

(import-fn codecs/enum)
(import-fn codecs/ordered-map)

Expand Down
88 changes: 83 additions & 5 deletions src/gloss/io.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,88 @@

(ns gloss.io
(:use
[gloss.core codecs structure protocols formats]
[lamina core]))
[gloss.core codecs structure protocols]
[potemkin]
[lamina core])
(:require
[gloss.core.formats :as formats]
[gloss.data.bytes :as bytes])
(:import
[java.nio.channels
Channels]
[java.nio
ByteBuffer]))

(defn- decode-stream [codec reader buf-seq]
;;;

(import-fn #'formats/to-byte-buffer)
(import-fn #'formats/to-buf-seq)

(defn contiguous
"Takes a sequence of ByteBuffers and returns a single contiguous ByteBuffer."
[buf-seq]
(when buf-seq
(bytes/take-contiguous-bytes (bytes/byte-count buf-seq) buf-seq)))

;;;

(defn encode
"Turns a frame value into a sequence of ByteBuffers."
[codec val]
(when val
(write-bytes codec nil val)))

(defn encode-to-buffer
"Encodes a sequence of values, and writes them to a ByteBuffer."
[codec buf vals]
(assert (sizeof codec))
(doseq [v vals]
(write-bytes codec buf v)))

(defn encode-all
"Turns a sequence of frame values into a sequence of ByteBuffers."
[codec vals]
(if-let [size (sizeof codec)]
(let [buf (ByteBuffer/allocate (* size (count vals)))]
(encode-to-buffer codec buf vals))
(apply concat
(map #(write-bytes codec nil %) vals))))

(defn encode-to-stream
"Encodes a sequence of values, and writes them to an OutputStream."
[codec output-stream vals]
(let [channel (Channels/newChannel output-stream)]
(doseq [buf (encode-all codec vals)]
(.write channel ^ByteBuffer buf))))

;;;

(defn decode
"Turns bytes into a single frame value. If there are too few or too many bytes
for the frame, an exception is thrown."
[codec bytes]
(let [buf-seq (bytes/dup-bytes (to-buf-seq bytes))
[success val remainder] (read-bytes codec buf-seq)]
(when-not success
(throw (Exception. "Insufficient bytes to decode frame.")))
(when-not (empty? remainder)
(throw (Exception. "Bytes left over after decoding frame.")))
val))

(defn decode-all
"Turns bytes into a sequence of frame values. If there are bytes left over at the end
of the sequence, an exception is thrown."
[codec bytes]
(let [buf-seq (bytes/dup-bytes (to-buf-seq bytes))]
(loop [buf-seq buf-seq, vals []]
(if (empty? buf-seq)
vals
(let [[success val remainder] (read-bytes codec buf-seq)]
(when-not success
(throw (Exception. "Bytes left over after decoding sequence of frames.")))
(recur remainder (conj vals val)))))))

(defn- decode-byte-sequence [codec reader buf-seq]
(loop [buf-seq buf-seq, vals [], reader reader]
(if (empty? buf-seq)
[vals codec nil]
Expand All @@ -20,7 +98,7 @@
(recur remainder (conj vals x) codec)
[vals x remainder])))))

(defn decoder-channel [codec ch]
(defn decode-channel [codec ch]
(let [src (fork ch)
dst (channel)]
(run-pipeline {:reader codec :bytes nil}
Expand All @@ -32,7 +110,7 @@
(if-not (closed? src)
(enqueue dst nil)
(enqueue-and-close dst nil))
(let [[s reader remainder] (decode-stream
(let [[s reader remainder] (decode-byte-sequence
codec
(:reader state)
(concat (:bytes state) (to-buf-seq bytes)))]
Expand Down
2 changes: 1 addition & 1 deletion test/gloss/test/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
(defn test-stream-roundtrip [frame vals]
(let [bytes (split-bytes 1 (encode frame vals))
in (channel)
out (decoder-channel frame in)]
out (decode-channel frame in)]
(doseq [b bytes]
(enqueue in b))
(let [s (convert-char-sequences (channel-seq out))]
Expand Down

0 comments on commit 3d39387

Please sign in to comment.