
Commit

cnuernber committed Mar 10, 2024
1 parent bd61150 commit 0a2b29c
Showing 2 changed files with 79 additions and 41 deletions.
2 changes: 1 addition & 1 deletion deps.edn
@@ -14,7 +14,7 @@
:exec-fn codox.main/-main
:exec-args {:group-id "techascent"
:artifact-id "tech.ml.dataset"
:version "7.028"
:version "7.029-SNAPSHOT"
:name "TMD"
:description "A Clojure high performance data processing system"
:metadata {:doc/format :markdown}
118 changes: 78 additions & 40 deletions src/tech/v3/libs/parquet.clj
@@ -133,6 +133,9 @@ org.xerial.snappy/snappy-java {:mvn/version \"1.1.8.4\"}
[tech.v3.datatype.protocols :as dtype-proto]
[tech.v3.datatype :as dtype]
[tech.v3.parallel.for :as parallel-for]
[ham-fisted.lazy-noncaching :as lznc]
[ham-fisted.impl :as hamf-impl]
[ham-fisted.api :as hamf]
[clojure.string :as s]
[clojure.set :as set]
[clojure.tools.logging :as log])
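
The newly required ham-fisted namespaces back the rest of this change: lznc supplies lazy, non-caching sequence operations and hamf supplies the parallel map used in scatter-rows below. A minimal sketch of the non-caching behavior, with illustrative values and assuming ham-fisted's documented semantics:

(require '[ham-fisted.lazy-noncaching :as lznc])

;; lznc/map returns a lazy, non-caching sequence: elements are recomputed on
;; every traversal rather than cached, so nothing is pinned in memory.
(def squares (lznc/map #(* % %) [1 2 3 4]))

(reduce + 0 squares)  ;; => 30
(vec squares)         ;; traverses (and recomputes) again => [1 4 9 16]
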
@@ -624,18 +627,18 @@ org.xerial.snappy/snappy-java {:mvn/version \"1.1.8.4\"}
(aset max-rep-counts (int row-idx) (int max-val)))))
(->>
columns
(pmap (fn [column]
(let [original-missing (ds-col/missing column)
[col-indexes new-missing]
(if-let [col-rep-counts (:row-rep-counts (meta column))]
(rep-count-indexes max-rep-counts original-missing col-rep-counts)
(rep-count-indexes max-rep-counts original-missing))
;;We clear the old missing set because that was taken care of above,
;;and select, when it has a missing set, has to go index by index and
;;make a new missing set.
new-col (-> (ds-col/set-missing column nil)
(ds-col/select col-indexes))]
(ds-col/set-missing new-col new-missing))))
(hamf/pmap (fn [column]
(let [original-missing (ds-col/missing column)
[col-indexes new-missing]
(if-let [col-rep-counts (:row-rep-counts (meta column))]
(rep-count-indexes max-rep-counts original-missing col-rep-counts)
(rep-count-indexes max-rep-counts original-missing))
;;We clear the old missing set because that was taken care of above,
;;and select, when it has a missing set, has to go index by index and
;;make a new missing set.
new-col (-> (ds-col/set-missing column nil)
(ds-col/select col-indexes))]
(ds-col/set-missing new-col new-missing))))
(vec))))
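
scatter-rows now hands the per-column work to hamf/pmap rather than clojure.core/pmap. A hedged one-liner, assuming hamf/pmap keeps pmap's ordered, one-result-per-input behavior while running on ham-fisted's thread pool:

;; illustrative only: square each value in parallel, preserving input order
(hamf/pmap #(* % %) (range 5))  ;; => (0 1 4 9 16)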


@@ -654,13 +657,13 @@ org.xerial.snappy/snappy-java {:mvn/version \"1.1.8.4\"}
(set (or (:column-blocklist options) (:column-blacklist options))))
col-parser (partial parse-parquet-column column-allowlist column-blocklist
col-read-store n-rows parse-context key-fn)
initial-columns (->> (map col-parser
(.getColumns schema)
(:columns block-metadata))
(remove nil?)
initial-columns (->> (lznc/map col-parser
(.getColumns schema)
(:columns block-metadata))
(lznc/remove nil?)
(vec))
rep-counts (->> (map (comp :row-rep-counts meta) initial-columns)
(remove nil?)
rep-counts (->> (lznc/map (comp :row-rep-counts meta) initial-columns)
(lznc/remove nil?)
(vec))
columns (if (seq rep-counts)
(scatter-rows initial-columns rep-counts)
@@ … @@
(vary-meta retval assoc :parquet-metadata (dissoc block-metadata :columns))))


(defn- read-next-dataset
[^ParquetFileReader reader options block-metadata block-metadata-seq]
(if-let [row-group (.readNextRowGroup reader)]
(try
(cons (row-group->ds row-group reader options block-metadata)
(lazy-seq (read-next-dataset reader options
(first block-metadata-seq)
(rest block-metadata-seq))))
(catch Throwable e
(.close reader)
(throw e)))
(.close reader)))


(def ^:private comp-code-map
{CompressionCodecName/BROTLI :brotli
CompressionCodecName/GZIP :gzip
@@ -737,10 +726,10 @@ org.xerial.snappy/snappy-java {:mvn/version \"1.1.8.4\"}



(defn- parquet-reader->metadata
(defn ^:no-doc parquet-reader->metadata
[^ParquetFileReader reader]
(->> (.getRowGroups reader)
(map (fn [^BlockMetaData block]
(mapv (fn [^BlockMetaData block]
{:path (.getPath block)
:num-rows (.getRowCount block)
:total-byte-size (.getTotalByteSize block)
@@ -774,7 +763,7 @@ org.xerial.snappy/snappy-java {:mvn/version \"1.1.8.4\"}
set)}))))}))))


(defn- ->file-reader
(defn ^:no-doc ->file-reader
^ParquetFileReader [data]
(cond
(instance? ParquetFileReader data)
@@ … @@
(errors/throwf "Unrecognized parquet input type: %s" (type data))))


(deftype ParquetRowGroupSupplier [^{:unsynchronized-mutable true
:tag ParquetFileReader} rdr
^{:unsynchronized-mutable true
:tag long} idx
metadata]
clojure.lang.IMeta
(meta [this] {:row-groups metadata})
java.util.function.Supplier
(get [this]
(when rdr
(let [rv (.readNextRowGroup rdr)
lidx idx
rrdr rdr]
(if-not rv
(.close this)
(do
(set! idx (inc idx))
[rrdr rv (metadata lidx)])))))
java.lang.AutoCloseable
(close [this]
(when rdr
(.close rdr)
(set! rdr nil)))
Iterable
(iterator [this] (ham_fisted.impl.SupplierIterator. this (.get this)))
ham_fisted.ITypedReduce
(reduce [this rfn acc]
(if rdr
(loop [rf (.get this)
acc acc]
(if rf
(recur (.get this) (rfn acc rf))
acc))
acc)))


(defn parquet->metadata-seq
"Given a local parquet file, return a sequence of metadata, one for each row-group.
A row-group maps directly to a dataset."
[path]
(with-open [reader (->file-reader path)]
(parquet-reader->metadata reader)))
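
For example (the file name is hypothetical; the keys are the ones assembled by parquet-reader->metadata above):

;; one metadata map per row group; each row group becomes one dataset
(parquet->metadata-seq "data.parquet")
;; => [{:path ..., :num-rows ..., :total-byte-size ..., :columns [...]} ...]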

(defn ->row-group-supplier
"Recommended way of low-level reading the file. The metadata of the supplier contains a
`:row-group` member that contains a vector of row group metadata.
The supplier implements java.util.Supplier java.util.Iterable and clojure.lang.IReduce.
Each time it is called it returns a tuple of [ParquetFileReader, PageReadStore, row-group-metadata]."
^java.lang.AutoCloseable [path]
(let [fr (->file-reader path)
md (parquet-reader->metadata fr)]
(ParquetRowGroupSupplier. fr 0 md)))
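
A usage sketch, assuming the supplier behaves exactly as defined above (the file name is hypothetical). Each step of the reduction receives a [reader page-read-store row-group-metadata] tuple until the underlying reader is exhausted, at which point it is closed:

(with-open [supplier (->row-group-supplier "data.parquet")]
  ;; total row count across all row groups, using only the per-group metadata
  (reduce (fn [total [_reader _page-store rg-meta]]
            (+ total (:num-rows rg-meta)))
          0
          supplier))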


(defn parquet->ds-seq
"Given a string, hadoop path, or a parquet InputFile, return a sequence of datasets.
Column will have parquet metadata merged into their normal metadata.
Reader will be closed upon termination of the sequence."
Reader will be closed upon termination of the sequence.
The return value can be efficiently reduced over and iterated without leaking memory.
See ham-fisted's lazy noncaching namespace for help."
([path options]
(let [reader (->file-reader path)
metadata (parquet-reader->metadata reader)]
(read-next-dataset reader options (first metadata) (rest metadata))))
(let [s (->row-group-supplier path)]
(lznc/map (fn [[fr rg m]]
(row-group->ds rg fr options m))
s)))
([^String path]
(parquet->ds-seq path nil)))
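
A usage sketch (hypothetical path). Because the returned sequence is lazy and non-caching it can be reduced over without realizing every dataset at once, and the reader is closed once the sequence is exhausted:

(require '[tech.v3.dataset :as ds]
         '[tech.v3.libs.parquet :as parquet])

;; sum the row counts of every row-group dataset in the file
(reduce (fn [total dataset]
          (+ total (ds/row-count dataset)))
        0
        (parquet/parquet->ds-seq "data.parquet"))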

@@ … @@
(.exists data-file)
"Only on-disk files work with parquet. %s does not resolve to a file"
input)
dataset-seq (parquet->ds-seq (.getCanonicalPath data-file) options)]
dataset-seq (vec (parquet->ds-seq (.getCanonicalPath data-file) options))]
(when-not (or (:disable-parquet-warn-on-multiple-datasets options)
(== 1 (count dataset-seq)))
(log/warnf "Concatenating multiple datasets (%d) into one.
