From d197d72e27a13dea336a9e30fe8a3d49a2c7cb50 Mon Sep 17 00:00:00 2001 From: Lassi Immonen Date: Wed, 5 Aug 2015 14:50:36 +0300 Subject: [PATCH] Add FileChunkSeq for iterator style folders, namely for Tesser --- src/clj/iota.clj | 44 ++++++++++++++----------- src/java/iota/FileChunkSeq.java | 56 ++++++++++++++++++++++++++++++++ src/java/iota/FileRecordSeq.java | 14 +++++--- test/iota/file_record_test.clj | 15 +++++++++ 4 files changed, 105 insertions(+), 24 deletions(-) create mode 100644 src/java/iota/FileChunkSeq.java diff --git a/src/clj/iota.clj b/src/clj/iota.clj index 1202a73..cd45bdb 100644 --- a/src/clj/iota.clj +++ b/src/clj/iota.clj @@ -1,7 +1,7 @@ (ns iota "A set of tools for using reducers over potentially very large text files." (:require [clojure.core.reducers :as r]) - (:import (iota FileVector NumberedFileVector FileSeq FileRecordSeq)) + (:import (iota FileVector NumberedFileVector FileSeq FileRecordSeq FileChunkSeq)) (:refer-clojure :exclude [vec subvec seq])) (set! *warn-on-reflection* true) @@ -31,7 +31,7 @@ similar to a normal Clojure vector. This is significantly more memory effecient than a vector of Strings. - You can provide the chunk size and a single char field delimiter as well." + You can provide the chunk size and a single char field delimiter as well." ([^java.lang.String filename] (FileVector. filename)) ([^java.lang.String filename chunk-size] (new iota.FileVector filename (int chunk-size))) ([^java.lang.String filename chunk-size byte-separator] (new iota.FileVector filename (int chunk-size) (byte byte-separator)))) @@ -46,11 +46,17 @@ (byte-array (map byte separator)) (byte-array [(byte separator)]))))) +(defn ^iota.FileChunkSeq chunk-seq + "Returns the sequence of arrays from underlying file seq. + Useful for iterable folds which don't support CollFold protocol." + ([^java.lang.String filename] (FileChunkSeq. (rec-seq filename))) + ([^java.lang.String filename buffer-size] (FileChunkSeq. (rec-seq filename buffer-size))) + ([^java.lang.String filename buffer-size separator] (FileChunkSeq. (rec-seq filename buffer-size separator)))) (defn subvec "Return a subset of the provided flatfileclj vector. If end not provided, defaults to (count v)." - ([^iota.FileVector v start] (subvec v start (count v))) + ([^iota.FileVector v start] (subvec v start (count v))) ([^iota.FileVector v start end] (.subvec v start end))) (defn numbered-vec @@ -78,18 +84,18 @@ "Utility function to enable reducers for Itoa Vector's" [^iota.FileVector v n combinef reducef] (cond - (empty? v) (combinef) - (<= (count v) n) (reduce reducef (combinef) v) - :else - (let [split (quot (count v) 2) - v1 (.subvec v 0 split) - v2 (.subvec v split (count v)) - fc (fn [child] #(foldvec child n combinef reducef))] - (fjinvoke - #(let [f1 (fc v1) - t2 (r/fjtask (fc v2))] - (fjfork t2) - (combinef (f1) (fjjoin t2))))))) + (empty? v) (combinef) + (<= (count v) n) (reduce reducef (combinef) v) + :else + (let [split (quot (count v) 2) + v1 (.subvec v 0 split) + v2 (.subvec v split (count v)) + fc (fn [child] #(foldvec child n combinef reducef))] + (fjinvoke + #(let [f1 (fc v1) + t2 (r/fjtask (fc v2))] + (fjfork t2) + (combinef (f1) (fjjoin t2))))))) (defn- foldseq "Utility function to enable reducers for Iota Seq's" @@ -97,10 +103,10 @@ (if-let [[v1 v2] (.split s)] (let [fc (fn [child] #(foldseq child n combinef reducef))] (fjinvoke - #(let [f1 (fc v1) - t2 (r/fjtask (fc v2))] - (fjfork t2) - (combinef (f1) (fjjoin t2))))) + #(let [f1 (fc v1) + t2 (r/fjtask (fc v2))] + (fjfork t2) + (combinef (f1) (fjjoin t2))))) (reduce reducef (combinef) (.toArray s)))) (defn- foldrecseq diff --git a/src/java/iota/FileChunkSeq.java b/src/java/iota/FileChunkSeq.java new file mode 100644 index 0000000..4626aa6 --- /dev/null +++ b/src/java/iota/FileChunkSeq.java @@ -0,0 +1,56 @@ +package iota; + +import clojure.lang.ASeq; +import clojure.lang.IPersistentMap; +import clojure.lang.ISeq; +import clojure.lang.Obj; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; + +/* +* Variation of FileSeq which will produce sequence of arrays of data. +* +* Internally uses FileRecordSeq. +* +* This is intended for fold usages that operates on iterables rather +* than core.reducers CollFold protocol. +* +* One example is Tesser https://github.com/aphyr/tesser. +* */ + + +public class FileChunkSeq extends ASeq { + + private final FileRecordSeq seq; + private FileRecordSeq[] parts = null; + + public FileChunkSeq(FileRecordSeq seq) { + this.seq = seq; + } + + public Object first() { + if (parts == null) { + parts = seq.split(seq.start + seq.bufsize); + } + if (parts != null) { + return parts[0].toArray(); + } + return seq.toArray(); + } + + public ISeq next() { + if (parts == null) { + parts = seq.split(seq.start + seq.bufsize); + } + if (parts == null) { + return null; + } + return new FileChunkSeq(parts[1]); + } + + public Obj withMeta(IPersistentMap meta) { + throw new UnsupportedOperationException(); + } +} diff --git a/src/java/iota/FileRecordSeq.java b/src/java/iota/FileRecordSeq.java index c8959c2..faccac4 100644 --- a/src/java/iota/FileRecordSeq.java +++ b/src/java/iota/FileRecordSeq.java @@ -81,7 +81,7 @@ public Chunk setValues(int start, int end, boolean foundSplit, int matchIndex) { } } - public Chunk nextSplit(Chunk chunk, int end, byte[] sep) { + private Chunk nextSplit(Chunk chunk, int end, byte[] sep) { // need to match separators also on the border of chunks int matchIndex = chunk.matchIndex; boolean lastMatch; @@ -106,7 +106,7 @@ public Chunk nextSplit(Chunk chunk, int end, byte[] sep) { return chunk.setValues(chunk.end, end, false, matchIndex); } - public long nextChunkEnd(long start, long end, byte[] sep) { + private long nextChunkEnd(long start, long end, byte[] sep) { byte[] buf = new byte[bufsize]; Chunk chunk = new Chunk(buf); @@ -125,19 +125,23 @@ public long nextChunkEnd(long start, long end, byte[] sep) { } public FileRecordSeq[] split() { - FileRecordSeq[] rv = new FileRecordSeq[2]; + // Find midpoint + return split((((end - start) / 2) + start)); + } + + public FileRecordSeq[] split(long loc) { // Only split if buffer is larger than BUFSIZE if ((end - start) < bufsize) { return null; } - // Find midpoint - long eor = nextChunkEnd((((end - start) / 2) + start), end, splitsep); + long eor = nextChunkEnd(loc, end, splitsep); if (eor == -1 || eor >= end) { return null; } + FileRecordSeq[] rv = new FileRecordSeq[2]; // Create new for left and right rv[0] = new FileRecordSeq(map, start, eor, bufsize, splitsep); rv[1] = new FileRecordSeq(map, eor, end, bufsize, splitsep); diff --git a/test/iota/file_record_test.clj b/test/iota/file_record_test.clj index 0a7de43..ad2b546 100644 --- a/test/iota/file_record_test.clj +++ b/test/iota/file_record_test.clj @@ -23,3 +23,18 @@ (trim (last (io/rec-seq tfile 10 sep))))) ) + +(deftest simple-chunk-read + + (is (= 1 (count (io/chunk-seq tfile 1000 sep)))) + + (is (= 2 (count (io/chunk-seq tfile 15 sep)))) + + (is (= 3 (count (io/chunk-seq tfile 10 sep)))) + + (is (= "\nfirst\n" + (ffirst (io/chunk-seq tfile 15 sep)))) + + (is (= "\nlast\n" + (trim (first (second (io/chunk-seq tfile 15 sep)))))) + ) \ No newline at end of file