Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

CompositeBuffer transferTo + bug fixes

  • Loading branch information...
commit 3e3a03950c867e24acfb5f875f3b4dbf9d74bf52 1 parent f90cd53
@carllerche carllerche authored
View
50 src/jvm/momentum/buffer/BufferRope.java
@@ -1,5 +1,7 @@
package momentum.buffer;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
@@ -175,6 +177,14 @@ final private BufferRope growTo(int idx) {
abstract BufferRope concat(Buffer buf);
abstract BufferRope slice(int off, int len);
abstract BufferRope grow(int newBufSize);
+ abstract int transferTo(WritableByteChannel chan, int idx, int len) throws IOException;
+
+ /**
+ * Only called from persistent composite buffers
+ */
+ final BufferRope concat(BufferRope other) {
+ return new Concat(this, other);
+ }
/**
* Branch node, children point either to another concat node or a flat node.
@@ -253,6 +263,24 @@ else if (left.length >= off + len) {
BufferRope grow(int newBufSize) {
return new Growable(this, Buffer.allocate(newBufSize));
}
+
+ int transferTo(WritableByteChannel chan, int idx, int len) throws IOException {
+ if (idx >= left.length) {
+ return right.transferTo(chan, idx - left.length, len);
+ }
+ else if (left.length >= idx + len) {
+ return left.transferTo(chan, idx, len);
+ }
+ else {
+ int maxLeft = left.length - idx;
+ int ret = left.transferTo(chan, idx, maxLeft);
+
+ if (ret != maxLeft)
+ return ret;
+
+ return ret + right.transferTo(chan, 0, len - ret);
+ }
+ }
}
/**
@@ -333,6 +361,24 @@ BufferRope grow(int newBufSize) {
new Concat(left, new Flat(right)),
Buffer.allocate(newBufSize));
}
+
+ int transferTo(WritableByteChannel chan, int idx, int len) throws IOException {
+ if (idx >= left.length) {
+ return right._transferTo(chan, idx - left.length, len);
+ }
+ else if (left.length >= idx + len) {
+ return left.transferTo(chan, idx, len);
+ }
+ else {
+ int maxLeft = left.length - idx;
+ int ret = left.transferTo(chan, idx, maxLeft);
+
+ if (ret != maxLeft)
+ return ret;
+
+ return ret + right._transferTo(chan, 0, len - ret);
+ }
+ }
}
/**
@@ -392,6 +438,10 @@ BufferRope slice(int off, int len) {
BufferRope grow(int newBufSize) {
return new Growable(this, Buffer.allocate(newBufSize));
}
+
+ int transferTo(WritableByteChannel chan, int idx, int len) throws IOException {
+ return buffer._transferTo(chan, idx, len);
+ }
}
/*
View
19 src/jvm/momentum/buffer/PersistentBuffer.java
@@ -45,8 +45,23 @@ protected PersistentBuffer _appendBuffer(PersistentBuffer b) {
}
public PersistentBuffer cons(Object o) {
- if (o instanceof PersistentBuffer)
- return _appendBuffer((PersistentBuffer) o);
+ if (o instanceof PersistentBuffer) {
+ PersistentBuffer b = (PersistentBuffer) o;
+
+ int newCount = count + b.count;
+
+ if (newCount > 32) {
+ return _appendBuffer(b);
+ }
+ else {
+ TransientBuffer ret = new TransientHeapBuffer(newCount);
+
+ ret._write(0, this, 0, count);
+ ret._write(count, b, 0, b.count);
+
+ return ret.persistent();
+ }
+ }
throw new IllegalArgumentException("Can only cons persistent buffers");
}
View
11 src/jvm/momentum/buffer/PersistentCompositeBuffer.java
@@ -3,6 +3,7 @@
import clojure.lang.*;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
+import java.util.Iterator;
final class PersistentCompositeBuffer extends PersistentBuffer {
@@ -27,7 +28,12 @@ protected byte _read(int idx) {
}
protected PersistentBuffer _appendBuffer(PersistentBuffer b) {
- return new PersistentCompositeBuffer(rope.concat(b));
+ if (b instanceof PersistentCompositeBuffer) {
+ return new PersistentCompositeBuffer(rope.concat(((PersistentCompositeBuffer)b).rope));
+ }
+ else {
+ return new PersistentCompositeBuffer(rope.concat(b));
+ }
}
protected PersistentBuffer _slice(int idx, int len) {
@@ -43,8 +49,7 @@ public TransientBuffer asTransient() {
}
protected int _transferTo(WritableByteChannel chan, int idx, int len) throws IOException {
- return 0;
- // return impl.transferTo(chan, idx, len);
+ return rope.transferTo(chan, idx, len);
}
}
View
2  src/jvm/momentum/buffer/TransientCompositeBuffer.java
@@ -48,7 +48,7 @@ protected PersistentBuffer _persistent() {
}
protected int _transferTo(WritableByteChannel chan, int idx, int len) throws IOException {
- return 0;
+ return rope.transferTo(chan, idx, len);
}
}
View
37 test/momentum/test/core/buffer.clj
@@ -236,9 +236,7 @@
(is (= expect-2 (skip buf 10)))
(is (= expect-3 (slice buf 10 80)))
- (comment
- ;; TODO: Get this working
- (is (= (class buf) (class (slice buf 10 80)))))
+ (is (= (class buf) (class (slice buf 10 80))))
(let [sliced (limit buf 10)
new-buf (persistent! sliced)]
@@ -325,30 +323,29 @@
composite-increasing-buffer))
(deftest buffer-writing
- (comment
- (test-writing
- #(transient-buffer 100) 100)
+ (test-writing
+ #(transient-buffer 100) 100)
- (test-writing
- #(transient (buffer decreasing)) 100)
+ (test-writing
+ #(transient (buffer decreasing)) 100)
- (test-writing
- #(transient-buffer 50) 50)
+ (test-writing
+ #(transient-buffer 50) 50)
- (test-writing
- #(transient-buffer 200) 200)
+ (test-writing
+ #(transient-buffer 200) 200)
- (test-writing
- #(transient-buffer 500) 500)
+ (test-writing
+ #(transient-buffer 500) 500)
- (test-writing
- #(direct-buffer 100) 100)
+ (test-writing
+ #(direct-buffer 100) 100)
- (test-writing
- #(transient (buffer (ByteBuffer/allocateDirect 100))) 100)
+ (test-writing
+ #(transient (buffer (ByteBuffer/allocateDirect 100))) 100)
- (test-writing
- #(transient composite-increasing-buffer) Integer/MAX_VALUE))
+ (test-writing
+ #(transient composite-increasing-buffer) Integer/MAX_VALUE)
(test-writing
#(transient (reduce conj (repeatedly 10 (constantly (limit blank-buffer 10)))))
View
21 test/momentum/test/net/server.clj
@@ -368,3 +368,24 @@
ch1
:open addr-info
:abort #(instance? Exception %)))))
+
+(def composite-msg
+ (conj (buffer "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
+ (buffer "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
+ (buffer "cccccccccccccccccccccccccccccccc")
+ (buffer "dddddddddddddddddddddddddddddddd")
+ (buffer "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
+ (buffer "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
+ (buffer "cccccccccccccccccccccccccccccccc")
+ (buffer "dddddddddddddddddddddddddddddddd")))
+
+(defcoretest sends-composite-buffers-down-the-wire
+ [ch1]
+ (start
+ (fn [dn _]
+ (fn [evt val]
+ (when (= :open evt)
+ (dn :message composite-msg)))))
+
+ (with-socket
+ (is (receiving (to-string composite-msg)))))
Please sign in to comment.
Something went wrong with that request. Please try again.