Skip to content

Commit

Permalink
[test] failing insert test (#10)
Browse files Browse the repository at this point in the history
* `[test]` failing insert test
 - Note, if rn=8, then it works fine

* simplest possible fix for insert test

* Somewhat more efficient formulation - attempting to fully task out the machine during insert

* Optimized copy ops that avoid dispatch costs of dtype/copy

* Better parallelization converting result chunks into jvm-based datasets.

---------

Co-authored-by: Chris Nuernberger <chris@techascent.com>
  • Loading branch information
harold and cnuernber committed Sep 7, 2023
1 parent 1e32523 commit 9583c57
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 55 deletions.
3 changes: 2 additions & 1 deletion deps.edn
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{:paths ["src"]
:deps {techascent/tech.ml.dataset {:mvn/version "7.006"}
techascent/tech.ml.dataset.sql {:mvn/version "7.000-beta-52"}
cnuernber/dtype-next {:mvn/version "10.011"}
net.java.dev.jna/jna {:mvn/version "5.13.0"}}
:aliases
{:build
Expand All @@ -23,7 +24,7 @@
:exec-fn codox.main/-main
:exec-args {:group-id "com.techascent"
:artifact-id "tmducken"
:version "0.8.1-11"
:version "0.8.1-12-SNAPSHOT"
:name "TMDucken"
:description "Clojure bindings for duckdb"
:metadata {:doc/format :markdown}
Expand Down
137 changes: 83 additions & 54 deletions src/tmducken/duckdb.clj
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ _unnamed [5 3]:
[java.util.function Supplier]
[java.time LocalDate LocalTime Instant]
[tech.v3.datatype.ffi Pointer]
[tech.v3.datatype UnsafeUtil]
[ham_fisted ITypedReduce IFnDef$LO Casts IFnDef]
[tech.v3.datatype ObjectReader]
[org.roaringbitmap RoaringBitmap]
Expand Down Expand Up @@ -341,7 +342,8 @@ tmducken.duckdb> (get-config-options)
(let [row-offset (* chunk chunk-size)
row-count (rem n-rows chunk-size)
n-valid (quot (+ row-count 63) 64)
string-allocs (ArrayList.)]
string-allocs (ArrayList.)
reduce-groups (ArrayList.)]
;;Strings are tracked in bulk to ease the burden on the resource system.
(resource/track string-allocs
{:track-type :stack
Expand Down Expand Up @@ -405,30 +407,32 @@ tmducken.duckdb> (get-config-options)
(:string :text)
(let [stable (hamf/java-concurrent-hashmap)
nbuf (wrap-addr daddr (* 16 row-count) :int8)]
(hamf/pgroups row-count
(fn [^long sidx ^long eidx]
(let [ne (- eidx sidx)]
(dotimes [idx ne]
(let [idx (+ sidx idx)
sval (str (subcol idx))]
(if-let [init-addr (.get stable sval)]
(dt/copy! (wrap-addr init-addr 16 :uint8)
(dt/sub-buffer nbuf (* 16 idx) 16))
(let [bval (.getBytes sval)
slen (alength bval)
bufoff (* 16 idx)]
(native-buffer/write-int nbuf bufoff slen)
(if (<= slen 12)
(let [bufoff (+ bufoff 4)]
(dt/copy! bval (dt/sub-buffer nbuf bufoff slen)))
(let [bufoff (+ bufoff 8)
valbuf (native-buffer/malloc slen {:resource-type nil
:uninitialized? true})
_ (locking string-allocs (.add string-allocs valbuf))
bufaddr (ptr->addr valbuf)]
(dt/copy! bval valbuf)
(native-buffer/write-long nbuf bufoff bufaddr)))
(.put stable sval (+ daddr bufoff)))))))))))))
(.add reduce-groups
(hamf/pgroups row-count
(fn [^long sidx ^long eidx]
(let [ne (- eidx sidx)]
(dotimes [idx ne]
(let [idx (+ sidx idx)
sval (str (subcol idx))]
(if-let [init-addr (.get stable sval)]
(UnsafeUtil/copyBytes (long init-addr) (+ daddr (* 16 idx)) 16)
(let [bval (.getBytes sval)
slen (alength bval)
bufoff (* 16 idx)]
(native-buffer/write-int nbuf bufoff slen)
(if (<= slen 12)
(let [bufoff (+ bufoff 4)]
(UnsafeUtil/copyBytes bval (+ daddr bufoff) slen))
(let [bufoff (+ bufoff 8)
valbuf (native-buffer/malloc slen {:resource-type nil
:uninitialized? true})
_ (locking string-allocs (.add string-allocs valbuf))
bufaddr (ptr->addr valbuf)]
(UnsafeUtil/copyBytes bval bufaddr slen)
(native-buffer/write-long nbuf bufoff bufaddr)))
(.put stable sval (+ daddr bufoff))))))))))))))
;;Force all parallelization requests to finish by now.
(reduce #(dorun %2) nil reduce-groups)
(check-error (duckdb-ffi/duckdb_append_data_chunk appender write-chunk))
(duckdb-ffi/duckdb_data_chunk_reset write-chunk))))
n-rows
Expand Down Expand Up @@ -463,6 +467,15 @@ tmducken.duckdb> (get-config-options)
rval)))


;; Protocol for clone operations that are requested now but completed later.
;; `delayed-clone` returns a derefable value; the implementations visible in
;; this file return a `delay` whose deref yields the cloned data.  Callers
;; request every column's delayed clone first and deref them afterwards, so an
;; implementation may start parallel work before it is forced.
(defprotocol ^:private PDelayedClone
(delayed-clone [this]))

;; Fallback implementation for any object without a specialised delayed clone:
;; no work is started up front, the plain `dt/clone` simply runs when the
;; returned delay is first dereferenced.
(extend-protocol PDelayedClone
  Object
  (delayed-clone [item]
    (delay (dt/clone item))))


(deftype ^:private StringReader [^IFnDef$LO accessor ^long sidx ^long eidx]
ObjectReader
(elemwiseDatatype [this] :string)
Expand All @@ -480,18 +493,23 @@ tmducken.duckdb> (get-config-options)
(if (and (< idx eidx) (not (reduced? acc)))
(recur (unchecked-inc idx) (rfn acc (accessor idx)))
(if (reduced? acc) @acc acc))))
tech.v3.datatype.protocols/PClone
(clone [this]
PDelayedClone
(delayed-clone [this]
(let [ne (- eidx sidx)
^objects sdata (make-array String ne)
gfn (fn [^long group-sidx group-eidx]
(let [group-ne (- group-eidx group-sidx)
group-sidx (+ group-sidx sidx)]
(dotimes [idx group-ne]
(let [lidx (+ idx group-sidx)]
(aset sdata lidx (.invokePrim accessor lidx))))))]
(dorun (hamf/pgroups ne gfn))
(hamf/wrap-array sdata))))
(aset sdata lidx (.invokePrim accessor lidx))))))
group-data (hamf/pgroups ne gfn)]
(delay
(dorun group-data)
(hamf/wrap-array sdata))))
tech.v3.datatype.protocols/PClone
(clone [this]
@(delayed-clone this)))

(defn- coldata->buffer
[^RoaringBitmap missing ^long n-rows ^long duckdb-type ^long data-ptr]
Expand Down Expand Up @@ -727,33 +745,44 @@ tmducken.duckdb> (get-config-options)
(hamf/range n-cols))
;;This function gets called a *lot*
realize-chunk (fn [data-chunk clone?]
(try
(let [n-rows (duckdb-ffi/duckdb_data_chunk_get_size data-chunk)
colmap (hamf/mut-map)
key-fn (get options :key-fn identity)
columns
(->> (hamf/range n-cols)
(hamf/mapv (fn [cidx]
(let [vdata (duckdb-ffi/duckdb_data_chunk_get_vector data-chunk cidx)
^Pointer data-ptr (duckdb-ffi/duckdb_vector_get_data vdata)
missing (validity->missing
n-rows
(duckdb-ffi/duckdb_vector_get_validity vdata))
coldata (coldata->buffer missing
(let [n-rows (duckdb-ffi/duckdb_data_chunk_get_size data-chunk)
colmap (hamf/mut-map)
key-fn (get options :key-fn identity)
;;Columns are processed in two stages to make sure we use all parallelism
;;available if we have to do a nontrivial clone op such as copying strings
;;into the jvm. Since the chunksize is fairly small we attempt to launch all
;;parallel clone ops before we force any of them to complete.
columns
(->> (hamf/range n-cols)
(hamf/mapv (fn [cidx]
(let [vdata (duckdb-ffi/duckdb_data_chunk_get_vector data-chunk cidx)
^Pointer data-ptr (duckdb-ffi/duckdb_vector_get_data vdata)
missing (validity->missing
n-rows
(duckdb-ffi/duckdb_vector_get_validity vdata))
coldata (try
(coldata->buffer missing
n-rows
(type-ids cidx)
(.-address data-ptr))
cname (key-fn (names cidx))
col
(Column. missing
(if clone? (dt/clone coldata) coldata)
{:name cname}
nil)]
(.put colmap cname cidx)
col))))]
(Dataset. columns (persistent! colmap) {:data-chunk data-chunk
:name :_unnamed}
0 0))))
(catch Exception e
(throw (RuntimeException.
(str "Error processing column " (names cidx)) e))))
cname (key-fn (names cidx))
delayed-data (if clone?
(delayed-clone coldata)
(delay coldata))]
(.put colmap cname cidx)
(delay
(Column. missing
(deref delayed-data)
{:name cname}
nil))))))]
(Dataset. (hamf/mapv deref columns)
(persistent! colmap)
{:data-chunk data-chunk
:name :_unnamed}
0 0)))
reduce-type (get options :reduce-type :clone)]
(if (== 0 (long (duckdb-ffi/duckdb_result_is_streaming duckdb-result)))
(RealizedResultChunks. sql
Expand Down
22 changes: 22 additions & 0 deletions test/tmducken/duckdb_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,25 @@
(try
(duckdb/drop-table! @conn* "testdb")
(catch Throwable e nil)))))

(deftest insert-test
  ;; Regression test for inserting string columns: creates table "t" from a
  ;; dataset of `cn` columns x `rn` rows of random UUID strings, inserts two
  ;; further datasets of the same shape and checks that both inserts landed.
  ;; The commit notes report that the original failure did not reproduce with
  ;; rn=8, so keep `rn` large.
  (let [cn 4
        rn 1024
        ;; Column names "c0" .. "c<cn-1>", in the order the table should have.
        cnames (mapv #(str "c" %) (range cn))
        ;; Builds a fresh dataset named "t"; select-columns pins column order
        ;; because the intermediate map does not guarantee it.
        ds-fn (fn []
                (-> (into {} (for [cname cnames]
                               [cname (for [_ (range rn)] (str (random-uuid)))]))
                    (ds/->dataset {:dataset-name "t"})
                    (ds/select-columns cnames)))
        ;; Best-effort drop: the table may legitimately not exist.
        drop-t! (fn []
                  (try
                    (duckdb/drop-table! @conn* "t")
                    (catch Throwable _ nil)))]
    ;; Start from a clean slate in case an earlier run left the table behind.
    (drop-t!)
    (try
      (duckdb/create-table! @conn* (ds-fn))
      (duckdb/insert-dataset! @conn* (ds-fn))
      (duckdb/insert-dataset! @conn* (ds-fn))
      (is (= (* 2 rn) (-> (duckdb/sql->dataset @conn* "from t")
                          (ds/row-count))))
      (finally
        ;; Do not leak table "t" into other tests sharing this connection.
        (drop-t!)))))

0 comments on commit 9583c57

Please sign in to comment.