Skip to content

Commit

Permalink
Add FileChunkSeq for iterator style folders, namely for Tesser
Browse files Browse the repository at this point in the history
  • Loading branch information
Lassi Immonen committed Aug 5, 2015
1 parent 61154fb commit d197d72
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 24 deletions.
44 changes: 25 additions & 19 deletions src/clj/iota.clj
@@ -1,7 +1,7 @@
(ns iota
"A set of tools for using reducers over potentially very large text files."
(:require [clojure.core.reducers :as r])
(:import (iota FileVector NumberedFileVector FileSeq FileRecordSeq))
(:import (iota FileVector NumberedFileVector FileSeq FileRecordSeq FileChunkSeq))
(:refer-clojure :exclude [vec subvec seq]))

(set! *warn-on-reflection* true)
Expand Down Expand Up @@ -31,7 +31,7 @@
similar to a normal Clojure vector. This is significantly more memory effecient than
a vector of Strings.
You can provide the chunk size and a single char field delimiter as well."
You can provide the chunk size and a single char field delimiter as well."
([^java.lang.String filename] (FileVector. filename))
([^java.lang.String filename chunk-size] (new iota.FileVector filename (int chunk-size)))
([^java.lang.String filename chunk-size byte-separator] (new iota.FileVector filename (int chunk-size) (byte byte-separator))))
Expand All @@ -46,11 +46,17 @@
(byte-array (map byte separator))
(byte-array [(byte separator)])))))

(defn ^iota.FileChunkSeq chunk-seq
  "Wraps the record seq for `filename` in an iota.FileChunkSeq, i.e. a
  sequence whose elements are arrays of records rather than single records.
  Intended for iterable-style folds that do not go through the CollFold
  protocol.

  The optional buffer-size and separator arguments are handed to rec-seq
  unchanged; see rec-seq for their meaning and defaults."
  ([^java.lang.String filename]
   (new iota.FileChunkSeq (rec-seq filename)))
  ([^java.lang.String filename buffer-size]
   (new iota.FileChunkSeq (rec-seq filename buffer-size)))
  ([^java.lang.String filename buffer-size separator]
   (new iota.FileChunkSeq (rec-seq filename buffer-size separator))))

(defn subvec
  "Return a subset of the provided iota FileVector, from index start up to end.
  If end is not provided, it defaults to (count v)."
  ;; Two-argument form: fill in the end index and delegate to the full arity.
  ([^iota.FileVector v start] (subvec v start (count v)))
  ;; Three-argument form: hand off to FileVector's own subvec method.
  ([^iota.FileVector v start end] (.subvec v start end)))

(defn numbered-vec
Expand Down Expand Up @@ -78,29 +84,29 @@
"Utility function to enable reducers for Itoa Vector's"
[^iota.FileVector v n combinef reducef]
(cond
(empty? v) (combinef)
(<= (count v) n) (reduce reducef (combinef) v)
:else
(let [split (quot (count v) 2)
v1 (.subvec v 0 split)
v2 (.subvec v split (count v))
fc (fn [child] #(foldvec child n combinef reducef))]
(fjinvoke
#(let [f1 (fc v1)
t2 (r/fjtask (fc v2))]
(fjfork t2)
(combinef (f1) (fjjoin t2)))))))
(empty? v) (combinef)
(<= (count v) n) (reduce reducef (combinef) v)
:else
(let [split (quot (count v) 2)
v1 (.subvec v 0 split)
v2 (.subvec v split (count v))
fc (fn [child] #(foldvec child n combinef reducef))]
(fjinvoke
#(let [f1 (fc v1)
t2 (r/fjtask (fc v2))]
(fjfork t2)
(combinef (f1) (fjjoin t2)))))))

(defn- foldseq
  "Utility function to enable reducers for Iota Seq's.

  Asks the seq to split itself. When .split yields two halves, each half is
  folded recursively (the second one as a fork/join task) and the two results
  are merged with combinef. When .split yields nothing, the seq is reduced
  directly with reducef, seeded with (combinef).

  n is only threaded through to the recursive calls; the split decision is
  made by the seq itself."
  [^iota.FileSeq s n combinef reducef]
  (if-let [[v1 v2] (.split s)]
    (let [fc (fn [child] #(foldseq child n combinef reducef))]
      (fjinvoke
       #(let [f1 (fc v1)
              t2 (r/fjtask (fc v2))]
          ;; Run the right half as a forked task while the left half is
          ;; computed on the current thread, then merge both results.
          (fjfork t2)
          (combinef (f1) (fjjoin t2)))))
    ;; Not splittable any further: reduce this piece sequentially.
    (reduce reducef (combinef) (.toArray s))))

(defn- foldrecseq
Expand Down
56 changes: 56 additions & 0 deletions src/java/iota/FileChunkSeq.java
@@ -0,0 +1,56 @@
package iota;

import clojure.lang.ASeq;
import clojure.lang.IPersistentMap;
import clojure.lang.ISeq;
import clojure.lang.Obj;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;

/*
* Variation of FileSeq which will produce sequence of arrays of data.
*
* Internally uses FileRecordSeq.
*
* This is intended for fold usages that operates on iterables rather
* than core.reducers CollFold protocol.
*
* One example is Tesser https://github.com/aphyr/tesser.
* */


public class FileChunkSeq extends ASeq {

private final FileRecordSeq seq;
private FileRecordSeq[] parts = null;

public FileChunkSeq(FileRecordSeq seq) {
this.seq = seq;
}

public Object first() {
if (parts == null) {
parts = seq.split(seq.start + seq.bufsize);
}
if (parts != null) {
return parts[0].toArray();
}
return seq.toArray();
}

public ISeq next() {
if (parts == null) {
parts = seq.split(seq.start + seq.bufsize);
}
if (parts == null) {
return null;
}
return new FileChunkSeq(parts[1]);
}

public Obj withMeta(IPersistentMap meta) {
throw new UnsupportedOperationException();
}
}
14 changes: 9 additions & 5 deletions src/java/iota/FileRecordSeq.java
Expand Up @@ -81,7 +81,7 @@ public Chunk setValues(int start, int end, boolean foundSplit, int matchIndex) {
}
}

public Chunk nextSplit(Chunk chunk, int end, byte[] sep) {
private Chunk nextSplit(Chunk chunk, int end, byte[] sep) {
// need to match separators also on the border of chunks
int matchIndex = chunk.matchIndex;
boolean lastMatch;
Expand All @@ -106,7 +106,7 @@ public Chunk nextSplit(Chunk chunk, int end, byte[] sep) {
return chunk.setValues(chunk.end, end, false, matchIndex);
}

public long nextChunkEnd(long start, long end, byte[] sep) {
private long nextChunkEnd(long start, long end, byte[] sep) {

byte[] buf = new byte[bufsize];
Chunk chunk = new Chunk(buf);
Expand All @@ -125,19 +125,23 @@ public long nextChunkEnd(long start, long end, byte[] sep) {
}

/**
 * Splits this record sequence at the midpoint of its [start, end) range.
 *
 * <p>Delegates to {@link #split(long)}, which moves the requested location
 * to a record boundary via nextChunkEnd before cutting.
 *
 * @return whatever {@link #split(long)} returns for the midpoint: a
 *         two-element array (left, right), or {@code null} when it declines
 *         to split
 */
public FileRecordSeq[] split() {
    // Written as start + half-length, the same arithmetic as before.
    final long midpoint = ((end - start) / 2) + start;
    return split(midpoint);
}

public FileRecordSeq[] split(long loc) {

// Only split if buffer is larger than BUFSIZE
if ((end - start) < bufsize) {
return null;
}

// Find midpoint
long eor = nextChunkEnd((((end - start) / 2) + start), end, splitsep);
long eor = nextChunkEnd(loc, end, splitsep);
if (eor == -1 || eor >= end) {
return null;
}

FileRecordSeq[] rv = new FileRecordSeq[2];
// Create new for left and right
rv[0] = new FileRecordSeq(map, start, eor, bufsize, splitsep);
rv[1] = new FileRecordSeq(map, eor, end, bufsize, splitsep);
Expand Down
15 changes: 15 additions & 0 deletions test/iota/file_record_test.clj
Expand Up @@ -23,3 +23,18 @@
(trim (last (io/rec-seq tfile 10 sep)))))

)

(deftest simple-chunk-read

  ;; Expected number of chunks for each buffer size, checked in the same
  ;; order as before: 1000, 15, 10.
  (doseq [[buffer-size expected-chunks] [[1000 1] [15 2] [10 3]]]
    (is (= expected-chunks
           (count (io/chunk-seq tfile buffer-size sep)))))

  ;; First record of the first chunk.
  (is (= "<a>\nfirst\n</a>"
         (first (first (io/chunk-seq tfile 15 sep)))))

  ;; First record of the second chunk, ignoring surrounding whitespace.
  (is (= "<a>\nlast\n</a>"
         (trim (first (nth (io/chunk-seq tfile 15 sep) 1))))))

0 comments on commit d197d72

Please sign in to comment.