diff --git a/src/clj/iota.clj b/src/clj/iota.clj
index 1202a73..cd45bdb 100644
--- a/src/clj/iota.clj
+++ b/src/clj/iota.clj
@@ -1,7 +1,7 @@
(ns iota
"A set of tools for using reducers over potentially very large text files."
(:require [clojure.core.reducers :as r])
- (:import (iota FileVector NumberedFileVector FileSeq FileRecordSeq))
+ (:import (iota FileVector NumberedFileVector FileSeq FileRecordSeq FileChunkSeq))
(:refer-clojure :exclude [vec subvec seq]))
(set! *warn-on-reflection* true)
@@ -31,7 +31,7 @@
similar to a normal Clojure vector. This is significantly more memory effecient than
a vector of Strings.
- You can provide the chunk size and a single char field delimiter as well."
+ You can provide the chunk size and a single char field delimiter as well."
([^java.lang.String filename] (FileVector. filename))
([^java.lang.String filename chunk-size] (new iota.FileVector filename (int chunk-size)))
([^java.lang.String filename chunk-size byte-separator] (new iota.FileVector filename (int chunk-size) (byte byte-separator))))
@@ -46,11 +46,17 @@
(byte-array (map byte separator))
(byte-array [(byte separator)])))))
+(defn ^iota.FileChunkSeq chunk-seq
+  "Wraps the record seq built by rec-seq in a FileChunkSeq, i.e. a seq of arrays, one array per chunk.
+   Handy for fold implementations that consume plain iterables rather than the CollFold protocol."
+  ([^java.lang.String filename] (new FileChunkSeq (rec-seq filename)))
+  ([^java.lang.String filename buffer-size] (new FileChunkSeq (rec-seq filename buffer-size)))
+  ([^java.lang.String filename buffer-size separator] (new FileChunkSeq (rec-seq filename buffer-size separator))))
(defn subvec
"Return a subset of the provided flatfileclj vector.
If end not provided, defaults to (count v)."
- ([^iota.FileVector v start] (subvec v start (count v)))
+ ([^iota.FileVector v start] (subvec v start (count v)))
([^iota.FileVector v start end] (.subvec v start end)))
(defn numbered-vec
@@ -78,18 +84,18 @@
"Utility function to enable reducers for Itoa Vector's"
[^iota.FileVector v n combinef reducef]
(cond
- (empty? v) (combinef)
- (<= (count v) n) (reduce reducef (combinef) v)
- :else
- (let [split (quot (count v) 2)
- v1 (.subvec v 0 split)
- v2 (.subvec v split (count v))
- fc (fn [child] #(foldvec child n combinef reducef))]
- (fjinvoke
- #(let [f1 (fc v1)
- t2 (r/fjtask (fc v2))]
- (fjfork t2)
- (combinef (f1) (fjjoin t2)))))))
+ (empty? v) (combinef)
+ (<= (count v) n) (reduce reducef (combinef) v)
+ :else
+ (let [split (quot (count v) 2)
+ v1 (.subvec v 0 split)
+ v2 (.subvec v split (count v))
+ fc (fn [child] #(foldvec child n combinef reducef))]
+ (fjinvoke
+ #(let [f1 (fc v1)
+ t2 (r/fjtask (fc v2))]
+ (fjfork t2)
+ (combinef (f1) (fjjoin t2)))))))
(defn- foldseq
"Utility function to enable reducers for Iota Seq's"
@@ -97,10 +103,10 @@
(if-let [[v1 v2] (.split s)]
(let [fc (fn [child] #(foldseq child n combinef reducef))]
(fjinvoke
- #(let [f1 (fc v1)
- t2 (r/fjtask (fc v2))]
- (fjfork t2)
- (combinef (f1) (fjjoin t2)))))
+ #(let [f1 (fc v1)
+ t2 (r/fjtask (fc v2))]
+ (fjfork t2)
+ (combinef (f1) (fjjoin t2)))))
(reduce reducef (combinef) (.toArray s))))
(defn- foldrecseq
diff --git a/src/java/iota/FileChunkSeq.java b/src/java/iota/FileChunkSeq.java
new file mode 100644
index 0000000..4626aa6
--- /dev/null
+++ b/src/java/iota/FileChunkSeq.java
@@ -0,0 +1,75 @@
+package iota;
+
+import clojure.lang.ASeq;
+import clojure.lang.IPersistentMap;
+import clojure.lang.ISeq;
+import clojure.lang.Obj;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+
+/**
+ * Variation of FileSeq which produces a sequence of arrays of data.
+ *
+ * <p>Internally uses FileRecordSeq: each element is the {@code toArray()} of
+ * the head part obtained by splitting the remaining records at
+ * {@code start + bufsize}; the tail part backs the rest of the sequence.
+ *
+ * <p>This is intended for fold usages that operate on iterables rather
+ * than the core.reducers CollFold protocol.
+ *
+ * <p>One example is Tesser https://github.com/aphyr/tesser.
+ */
+public class FileChunkSeq extends ASeq {
+
+    /** Sentinel stored in {@code parts} once a split was attempted and refused. */
+    private static final FileRecordSeq[] NO_SPLIT = new FileRecordSeq[0];
+
+    private final FileRecordSeq seq;
+
+    /** {@code null} until the first split attempt; {@code NO_SPLIT} when {@code seq} is the last chunk. */
+    private FileRecordSeq[] parts = null;
+
+    public FileChunkSeq(FileRecordSeq seq) {
+        this.seq = seq;
+    }
+
+    /**
+     * Splits the underlying seq at most once and remembers the outcome, so a
+     * refused split is not retried on every call to first() or next().
+     *
+     * @return the two-element [head, tail] array, or {@code null} when the split returned null
+     */
+    private FileRecordSeq[] splitOnce() {
+        FileRecordSeq[] result = parts;
+        if (result == null) {
+            result = seq.split(seq.start + seq.bufsize);
+            if (result == null) {
+                result = NO_SPLIT;
+            }
+            parts = result;
+        }
+        return (result == NO_SPLIT) ? null : result;
+    }
+
+    /** @return the head part as an array, or the whole remaining seq as an array when it cannot be split */
+    @Override
+    public Object first() {
+        FileRecordSeq[] split = splitOnce();
+        return (split != null) ? split[0].toArray() : seq.toArray();
+    }
+
+    /** @return a FileChunkSeq over the tail part, or {@code null} when the current chunk is the last one */
+    @Override
+    public ISeq next() {
+        FileRecordSeq[] split = splitOnce();
+        return (split != null) ? new FileChunkSeq(split[1]) : null;
+    }
+
+    /** Metadata is not supported; always throws. */
+    @Override
+    public Obj withMeta(IPersistentMap meta) {
+        throw new UnsupportedOperationException("FileChunkSeq does not support metadata");
+    }
+}
diff --git a/src/java/iota/FileRecordSeq.java b/src/java/iota/FileRecordSeq.java
index c8959c2..faccac4 100644
--- a/src/java/iota/FileRecordSeq.java
+++ b/src/java/iota/FileRecordSeq.java
@@ -81,7 +81,7 @@ public Chunk setValues(int start, int end, boolean foundSplit, int matchIndex) {
}
}
- public Chunk nextSplit(Chunk chunk, int end, byte[] sep) {
+ private Chunk nextSplit(Chunk chunk, int end, byte[] sep) {
// need to match separators also on the border of chunks
int matchIndex = chunk.matchIndex;
boolean lastMatch;
@@ -106,7 +106,7 @@ public Chunk nextSplit(Chunk chunk, int end, byte[] sep) {
return chunk.setValues(chunk.end, end, false, matchIndex);
}
- public long nextChunkEnd(long start, long end, byte[] sep) {
+ private long nextChunkEnd(long start, long end, byte[] sep) {
byte[] buf = new byte[bufsize];
Chunk chunk = new Chunk(buf);
@@ -125,19 +125,23 @@ public long nextChunkEnd(long start, long end, byte[] sep) {
}
public FileRecordSeq[] split() {
- FileRecordSeq[] rv = new FileRecordSeq[2];
+ // Find midpoint
+ return split((((end - start) / 2) + start));
+ }
+
+ public FileRecordSeq[] split(long loc) {
// Only split if buffer is larger than BUFSIZE
if ((end - start) < bufsize) {
return null;
}
- // Find midpoint
- long eor = nextChunkEnd((((end - start) / 2) + start), end, splitsep);
+ long eor = nextChunkEnd(loc, end, splitsep);
if (eor == -1 || eor >= end) {
return null;
}
+ FileRecordSeq[] rv = new FileRecordSeq[2];
// Create new for left and right
rv[0] = new FileRecordSeq(map, start, eor, bufsize, splitsep);
rv[1] = new FileRecordSeq(map, eor, end, bufsize, splitsep);
diff --git a/test/iota/file_record_test.clj b/test/iota/file_record_test.clj
index 0a7de43..ad2b546 100644
--- a/test/iota/file_record_test.clj
+++ b/test/iota/file_record_test.clj
@@ -23,3 +23,18 @@
(trim (last (io/rec-seq tfile 10 sep)))))
)
+
+(deftest simple-chunk-read
+  ;; Number of chunks read from tfile at three different buffer sizes.
+  (is (= 1 (count (io/chunk-seq tfile 1000 sep))))
+  (is (= 2 (count (io/chunk-seq tfile 15 sep))))
+  (is (= 3 (count (io/chunk-seq tfile 10 sep))))
+
+  ;; With buffer size 15, inspect the leading record of each of the two chunks.
+  (let [chunks (io/chunk-seq tfile 15 sep)
+        head-chunk (first chunks)
+        tail-chunk (second chunks)]
+    ;; compared verbatim (no trim)
+    (is (= "\nfirst\n" (first head-chunk)))
+    ;; passed through trim before comparing
+    (is (= "\nlast\n" (trim (first tail-chunk))))))
\ No newline at end of file