-
Notifications
You must be signed in to change notification settings - Fork 160
/
tpch.clj
62 lines (54 loc) · 2.67 KB
/
tpch.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
(ns crux.fixtures.tpch
(:require [crux.api :as c]
[crux.fixtures :as fix :refer [*api*]])
(:import [io.airlift.tpch TpchColumn TpchColumnType TpchColumnType$Base TpchEntity TpchTable]))
(def tpch-column-types->crux-calcite-type
  "Lookup table from a TPC-H column base type (`TpchColumnType$Base`) to the
  type keyword used in the `:crux.sql.table/columns` entry of a table schema
  document. Both INTEGER and IDENTIFIER columns map to `:bigint`."
  {TpchColumnType$Base/IDENTIFIER :bigint
   TpchColumnType$Base/INTEGER    :bigint
   TpchColumnType$Base/DOUBLE     :double
   TpchColumnType$Base/VARCHAR    :varchar
   TpchColumnType$Base/DATE       :timestamp})
(defn tpch-table->crux-sql-schema
  "Builds the schema document describing TPC-H table `t`.

  The returned map contains:
    :crux.db/id              - keyword in the `crux.sql.schema` namespace named
                               after the table
    :crux.sql.table/name     - the table name
    :crux.sql.table/query    - a query map whose :find lists one symbol per
                               column and whose :where has one
                               `[e :column column]` clause per column
    :crux.sql.table/columns  - map of column symbol -> type keyword, looked up
                               in `tpch-column-types->crux-calcite-type`"
  [^TpchTable t]
  (let [table-name (.getTableName t)
        columns    (.getColumns t)
        col-symbol (fn [^TpchColumn col] (symbol (.getColumnName col)))]
    {:crux.db/id              (keyword "crux.sql.schema" table-name)
     :crux.sql.table/name     table-name
     :crux.sql.table/query    {:find  (mapv col-symbol columns)
                               :where (mapv (fn [^TpchColumn col]
                                              ['e (keyword (.getColumnName col)) (col-symbol col)])
                                            columns)}
     :crux.sql.table/columns  (into {}
                                    (map (fn [^TpchColumn col]
                                           [(col-symbol col)
                                            (tpch-column-types->crux-calcite-type
                                             (.getBase (.getType col)))]))
                                    columns)}))
(defn tpch-tables->crux-sql-schemas
  "Returns a lazy sequence with one schema document (see
  `tpch-table->crux-sql-schema`) for every table returned by
  `TpchTable/getTables`."
  []
  (for [table (TpchTable/getTables)]
    (tpch-table->crux-sql-schema table)))
(defn tpch-entity->doc
  "Converts one generated TPC-H entity `b` of table `t` into a document map.

  The document gets a freshly generated random UUID as its `:crux.db/id`
  and one entry per table column, keyed by the column name as a keyword. The
  value is read from the entity with the accessor matching the column's base
  type. A column whose base type matches none of the clauses makes `condp`
  throw an IllegalArgumentException."
  [^TpchTable t ^TpchEntity b]
  (let [column-value (fn [^TpchColumn col]
                       ;; Dispatch on the column's base type to pick the
                       ;; matching typed accessor.
                       (condp = (.getBase (.getType col))
                         TpchColumnType$Base/IDENTIFIER (.getIdentifier col b)
                         TpchColumnType$Base/INTEGER    (.getInteger col b)
                         TpchColumnType$Base/VARCHAR    (.getString col b)
                         TpchColumnType$Base/DOUBLE     (.getDouble col b)
                         ;; NOTE(review): stored as whatever .getDate returns,
                         ;; while the schema declares DATE as :timestamp -
                         ;; confirm the representations agree.
                         TpchColumnType$Base/DATE       (.getDate col b)))]
    (reduce (fn [doc ^TpchColumn col]
              (assoc doc (keyword (.getColumnName col)) (column-value col)))
            {:crux.db/id (java.util.UUID/randomUUID)}
            (.getColumns t))))
;; 0.05 = 7500 customers, 75000 orders, 299814 lineitems, 10000 part, 40000 partsupp, 500 supplier, 25 nation, 5 region
(defn tpch-table->docs
  "Returns a lazy sequence of documents (see `tpch-entity->doc`) for every
  entity generated for TPC-H table `t`.

  `scale-factor` is the TPC-H scale factor passed to the table's generator;
  it defaults to 0.05 when omitted, which is the value this fixture has
  always used. The generator is always asked for part 1 of 1, i.e. the whole
  table in a single part."
  ([^TpchTable t]
   (tpch-table->docs t 0.05))
  ([^TpchTable t scale-factor]
   ;; The first table happens to be customers; presumably 150000 rows at
   ;; scale factor 1, which matches 7500 at the default 0.05.
   (map (partial tpch-entity->doc t)
        (seq (.createGenerator t (double scale-factor) 1 1)))))
(defn with-tpch-schema
  "Transacts the schema documents for all TPC-H tables into the node bound to
  `*api*`, then calls the zero-argument function `f` and returns its result.
  The one-function-argument shape suggests use as a test fixture - confirm
  against callers."
  [f]
  (let [schema-docs (tpch-tables->crux-sql-schemas)]
    (fix/transact! *api* schema-docs)
    (f)))
(defn load-docs!
  "Generates the documents for every TPC-H table and submits them to `node`.

  For each table, in the order returned by `TpchTable/getTables`:
    1. the documents are generated and their count is printed together with
       the table name (counting realizes the whole lazy sequence);
    2. the documents are submitted as `:crux.tx/put` operations in
       transactions of at most 1000 documents each;
    3. the last submitted transaction is awaited before moving on to the next
       table.

  If a table yields no documents, nothing is submitted and nothing is awaited
  for it. Returns nil."
  [node]
  (doseq [^TpchTable t (TpchTable/getTables)]
    (let [docs (tpch-table->docs t)]
      (println "Transacting" (count docs) (.getTableName t))
      (let [last-tx (reduce (fn [_ chunk]
                              ;; Only the most recent submission matters, so
                              ;; the accumulator is simply replaced.
                              (c/submit-tx node (mapv (fn [doc] [:crux.tx/put doc]) chunk)))
                            nil
                            (partition-all 1000 docs))]
        ;; last-tx is still the nil seed when there were no chunks; there is
        ;; then no transaction to wait for.
        (when last-tx
          (c/await-tx node last-tx))))))