Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: momentumclj/momentum
base: 16067487c5
...
head fork: momentumclj/momentum
compare: 4a8388e1dd
  • 2 commits
  • 5 files changed
  • 0 commit comments
  • 1 contributor
View
2  doc/getting_started.md
@@ -10,7 +10,7 @@ performance and ease of use. (Say more...)
## About this guide
-This guide is a quick tutoria to help you get started writing network
+This guide is a quick tutorial to help you get started writing network
applications with Momentum. It should take no more than 20 minutes to
go through.
View
55 src/clj/momentum/core/buffer.clj
@@ -11,6 +11,51 @@
[java.util
Collection]))
+(declare
+ transient-buffer)
+
+(defprotocol Buffers
+ (^{:private true} ^Buffer to-buffer [bufferable])
+ (^{:private true} buffer-count [bufferable])
+ (^{:private true} into-buffer [bufferable buffer]))
+
+(extend-protocol Buffers
+ (class (byte-array 0))
+ (to-buffer [bytes] (Buffer/wrap bytes))
+ (buffer-count [bytes] (alength bytes))
+ (into-buffer [bytes dest] (.write dest (to-buffer bytes)))
+
+ Buffer
+ (to-buffer [buffer] buffer)
+ (buffer-count [buffer] (count buffer))
+ (into-buffer [buffer dest] (.write dest buffer))
+
+ ByteBuffer
+ (to-buffer [byte-buffer] (Buffer/wrap byte-buffer))
+ (buffer-count [byte-buffer] (.capacity byte-buffer))
+ (into-buffer [byte-buffer dest] (.write dest (to-buffer byte-buffer)))
+
+ String
+ (to-buffer [string] (Buffer/wrap (.getBytes string)))
+ (buffer-count [string] (* 2 (.length string)))
+ (into-buffer [string dest] (.write dest (to-buffer string)))
+
+ clojure.lang.Seqable
+ (to-buffer [seqable]
+ (let [cnt (buffer-count seqable)]
+ (if (> cnt 2048)
+ (Buffer/wrap (seq seqable))
+ (-> (transient-buffer cnt)
+ (into-buffer seqable)
+ persistent!))))
+ (buffer-count [seqable] (reduce #(+ %1 (buffer-count %2)) seqable))
+ (into-buffer [seqable dest] (reduce into-buffer dest seqable))
+
+ nil
+ (to-buffer [_] nil)
+ (buffer-count [_] 0)
+ (int-buffer [_ dest] dest))
+
(defn ^Buffer transient-buffer
([] (Buffer/allocate 1024))
([size] (Buffer/allocate size)))
@@ -20,13 +65,9 @@
([size] (Buffer/allocateDirect size)))
(defn buffer
- [b]
- (cond
- (instance? ByteBuffer b)
- (Buffer/wrap ^ByteBuffer b)
-
- :else
- (Buffer/wrap ^bytes b)))
+ ([] Buffer/EMPTY)
+ ([bufferable] (to-buffer bufferable))
+ ([bufferable & bufferables] (to-buffer (cons bufferable bufferables))))
(defn buffer?
[maybe-buffer] (instance? Buffer maybe-buffer))
View
27 src/jvm/momentum/buffer/Buffer.java
@@ -4,6 +4,8 @@
import java.io.UnsupportedEncodingException;
import java.nio.*;
import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Iterator;
import clojure.lang.*;
public abstract class Buffer implements Indexed {
@@ -12,14 +14,14 @@
* An empty persistent buffer, this is returned when calling empty() on any
* given buffer.
*/
- static final PersistentBuffer EMPTY = new PersistentHeapBuffer();
+ public static final PersistentBuffer EMPTY = new PersistentHeapBuffer();
/*
* Returns a freshly allocated transient buffer backed by a byte array of the
* requested size.
*/
public static final Buffer allocate(int size) {
- return new TransientHeapBuffer(new byte[size], 0, size);
+ return new TransientHeapBuffer(new byte[size], 0, 0);
}
/*
@@ -62,6 +64,27 @@ public final static Buffer wrap(ByteBuffer buf) {
}
}
+ public final static Buffer wrap(Collection<Buffer> bufs) {
+ PersistentBuffer [] persistentBufs = new PersistentBuffer[bufs.size()];
+
+ Iterator<Buffer> i = bufs.iterator();
+
+ int idx = 0;
+
+ while (i.hasNext()) {
+ Buffer curr = i.next();
+
+ if (curr instanceof PersistentBuffer) {
+ persistentBufs[idx++] = (PersistentBuffer) curr;
+ }
+ else {
+ throw new IllegalArgumentException("Can only wrap a collection of persistent buffers");
+ }
+ }
+
+ return new PersistentCompositeBuffer(persistentBufs);
+ }
+
/*
* The size of the buffer, this should only change for transient buffers
*/
View
12 src/jvm/momentum/buffer/CompositeBufferImpl.java
@@ -25,7 +25,7 @@
int size = Math.max(MIN_IDX_ARR_LEN, bufArr.length * 2);
bufs = new Buffer[size];
- indices = new int[size];
+ indices = new int[size + 1];
bufCount = bufArr.length;
for (int i = 0; i < bufArr.length; ++i) {
@@ -124,18 +124,18 @@ private int bufferIndexFor(int idx) {
private int growTo(int idx) {
// Calculate the size of the new buffer. This will be at least twice
// the size of the current buffer.
- int newCount = idx + 1,
- lastBufIdx = bufCount - 1,
- lastBufCount = bufs[lastBufIdx].count;
+ int newCount = idx + 1,
+ lastBufIdx = bufCount - 1,
+ lastBufCap = bufs[lastBufIdx].capacity();
// If the last buffer can hold the new size, then just use that
- if (indices[lastBufIdx] + lastBufCount >= newCount) {
+ if (indices[lastBufIdx] + lastBufCap >= newCount) {
indices[bufCount] = count = newCount;
return lastBufIdx;
}
// Ensure that the last buffer is fully used
- indices[bufCount] = indices[lastBufIdx] + lastBufCount;
+ indices[bufCount] = indices[lastBufIdx] + lastBufCap;
// The size of the new buffer
int newBufSize =
View
28 test/momentum/test/core/buffer.clj
@@ -10,6 +10,12 @@
;; ==== Helpers
+(def blank-buffer
+ (let [buf (transient-buffer 4096)]
+ (-> (transient-buffer 4096)
+ (write-byte 4095 (byte 0))
+ persistent!)))
+
(defn- mk-byte-array
[from to]
(let [len (- to from)
@@ -211,12 +217,11 @@
(buffer increasing-byte-buffer)))
(deftest transient-composite-buffer-writing
- (let [empty-buffer (persistent! (transient-buffer 40))]
- (test-write-increasing
- (transient
- (conj (limit empty-buffer 30)
- (limit empty-buffer 30)
- (limit empty-buffer 40))))))
+ (test-write-increasing
+ (transient
+ (conj (limit blank-buffer 30)
+ (limit blank-buffer 30)
+ (limit blank-buffer 40)))))
(deftest persistent-heap-buffer-manipulation
(test-increasing-manipulation (buffer increasing)))
@@ -244,8 +249,7 @@
(test-conjoining-buffers increasing-buffer hello-buffer decreasing-buffer))
(deftest growing-composite-buffers
- (let [empty-buffer (persistent! (transient-buffer 2048))
- composite-buffer (conj empty-buffer empty-buffer)
+ (let [composite-buffer (conj (limit blank-buffer 2048) (limit blank-buffer 2048))
expected (ByteBuffer/allocate 5096)]
(.position expected 4096)
@@ -253,6 +257,8 @@
(.put expected (byte (mod i 127))))
(.clear expected)
- (is (= (reduce #(write-byte %1 (+ 4096 %2) (byte (mod %2 127))) composite-buffer (take 1000 increasing-seq))
- (buffer expected)))))
-
+ (is (= (buffer expected)
+ (reduce
+ #(write-byte %1 (+ 4096 %2) (byte (mod %2 127)))
+ composite-buffer
+ (take 1000 increasing-seq))))))

No commit comments for this range

Something went wrong with that request. Please try again.