Skip to content

Commit

Permalink
Fixes #294
Browse files Browse the repository at this point in the history
  • Loading branch information
cnuernber committed Mar 9, 2022
1 parent c1e0035 commit 369e5ef
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 8 deletions.
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
:scope "provided"]
[org.lz4/lz4-java "1.8.0" :scope "provided"]
[com.cnuernber/jarrow "1.000"]
[net.java.dev.jna/jna "5.10.0" :scope "provided"]

[uncomplicate/neanderthal "0.43.3" :scope "provided"]
;;Geni dependencies
Expand Down
52 changes: 45 additions & 7 deletions src/tech/v3/libs/arrow.clj
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
;;Compression codecs
[org.lz4/lz4-java \"1.8.0\"]
;;Required for decompressing lz4 streams with dependent blocks.
[net.java.dev.jna/jna \"5.10.0\" :scope \"provided\"]
[com.github.luben/zstd-jni \"1.5.1-1\"]
```"
(:require [tech.v3.datatype.mmap :as mmap]
Expand All @@ -47,6 +49,7 @@
[tech.v3.datatype.casting :as casting]
[tech.v3.datatype.packing :as packing]
[tech.v3.datatype.array-buffer :as array-buffer]
[tech.v3.datatype.ffi :as dt-ffi]
[tech.v3.dataset.impl.column :as col-impl]
[tech.v3.protocols.column :as col-proto]
[tech.v3.dataset.impl.dataset :as ds-impl]
Expand Down Expand Up @@ -225,15 +228,50 @@
{:writer-cache dstbuf
:dst-buffer final-bytes}))))


;; Deferred, run-once initialization of the native liblz4 bindings.  The
;; `tech.v3.libs.arrow.liblz4` namespace is only required when this delay is
;; first dereferenced, so merely loading this file does not pull it in.
(defonce init-liblz4*
  (delay
    (let [initialize! (requiring-resolve 'tech.v3.libs.arrow.liblz4/initialize!)]
      (initialize!))))


(defn- ensure-native-buffer
  "Return `nbuf` as a NativeBuffer.

  When `dtype/as-native-buffer` can produce a native buffer for `nbuf`, that
  converted buffer is returned.  Otherwise the contents of `nbuf` are copied
  into a freshly made `:native-heap` `:int8` container created with
  `{:resource-type :stack}`.

  NOTE(review): the `:stack` resource type presumably ties the copy's lifetime
  to an enclosing `resource/stack-resource-context` - confirm callers always
  invoke this inside one."
  ^NativeBuffer [nbuf]
  ;; Return the *converted* buffer, not the input: the input may be convertible
  ;; to a NativeBuffer without being one, which would violate the return hint.
  (or (dtype/as-native-buffer nbuf)
      (dtype/make-container :native-heap :int8 {:resource-type :stack} nbuf)))

(defn- create-jpnz-lz4-frame-decompressor
  "Build and return a `(fn [srcbuf dstbuf])` that decompresses an LZ4 frame held
  in `srcbuf` into `dstbuf`.

  The native liblz4 bindings from `tech.v3.libs.arrow.liblz4` are tried first.
  If they cannot be initialized or resolved, a warning is logged and a
  decompressor built on `net.jpountz.lz4.LZ4FrameInputStream` is returned
  instead; per the warning text, that path does not support dependent block
  frames.

  The native decompressor throws an `Exception` carrying the liblz4 error name
  when `LZ4F_isError` reports a failure."
  []
  (let [;; Pure-JVM fallback decompressor.
        jpountz-decompressor
        (fn [srcbuf dstbuf]
          (let [n-bytes (long (dtype/ecount dstbuf))
                temp-dstbuf (byte-array n-bytes)]
            (with-open [is (net.jpountz.lz4.LZ4FrameInputStream.
                            (ByteArrayInputStream. (dtype/->byte-array srcbuf)))]
              ;; A single InputStream.read call may return fewer bytes than
              ;; requested, so keep reading until the buffer is full or the
              ;; stream reports end-of-data.
              (loop [offset 0]
                (when (< offset n-bytes)
                  (let [n-read (.read is temp-dstbuf
                                      (int offset)
                                      (int (- n-bytes offset)))]
                    (when (pos? n-read)
                      (recur (+ offset n-read)))))))
            (dtype/copy! temp-dstbuf dstbuf)))
        ;; Logs the degradation once and hands back the fallback decompressor.
        fallback
        (fn []
          (log/warn "Unable to load native lz4 library, falling back to jpounz.
Dependent block frames are not supported!!")
          jpountz-decompressor)]
    (try
      @init-liblz4*
      (let [ctx-fn (requiring-resolve 'tech.v3.libs.arrow.liblz4/create-decomp-ctx)
            decomp-fn (requiring-resolve 'tech.v3.libs.arrow.liblz4/LZ4F_decompress)
            is-err-int (requiring-resolve 'tech.v3.libs.arrow.liblz4/LZ4F_isError)
            err-str (requiring-resolve 'tech.v3.libs.arrow.liblz4/LZ4F_getErrorName)
            ;; One context is created here and reused by every call of the
            ;; returned function.
            decomp-ctx (ctx-fn)]
        (fn [srcbuf dstbuf]
          (resource/stack-resource-context
           (let [srcbuf (ensure-native-buffer srcbuf)
                 n-dstbuf (ensure-native-buffer dstbuf)
                 ;; Sizes are handed to liblz4 by pointer, initialized to the
                 ;; element counts of the respective buffers.
                 srcsize (dt-ffi/make-ptr :int64 (dtype/ecount srcbuf))
                 dstsize (dt-ffi/make-ptr :int64 (dtype/ecount n-dstbuf))
                 ;; NOTE(review): a non-zero, non-error return value is not
                 ;; inspected - confirm one call always consumes the whole frame.
                 errcode (decomp-fn decomp-ctx
                                    n-dstbuf dstsize
                                    srcbuf srcsize
                                    nil)]
             (when-not (== 0 (long (is-err-int errcode)))
               (throw (Exception. (str (err-str errcode)))))
             ;; If decompression went into a temporary native buffer, copy the
             ;; result back into the caller's destination.
             (when-not (identical? n-dstbuf dstbuf)
               (dtype/copy! n-dstbuf dstbuf))
             dstbuf))))
      (catch Exception _
        (fallback))
      ;; Missing native libraries / JNA classes raise LinkageError subclasses
      ;; (UnsatisfiedLinkError, NoClassDefFoundError), which are not Exceptions.
      (catch LinkageError _
        (fallback)))))


(def ^:private compression-info
Expand Down
Binary file added test/data/tweets_sentiment.feather
Binary file not shown.
4 changes: 3 additions & 1 deletion test/tech/v3/libs/arrow_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@
"test/data/alldtypes.arrow-feather-v1" ;v1
]]
(doseq [file all-files]
(is (= 1000 (ds/row-count (arrow/stream->dataset file)))))))
(is (= 1000 (ds/row-count (arrow/stream->dataset file)))))
;; lz4 with dependent frames))))))
(is (= 31962 (ds/row-count (arrow/stream->dataset "test/data/tweets_sentiment.feather"))))))


(deftest base-ds-seq-test
Expand Down

0 comments on commit 369e5ef

Please sign in to comment.