-
Notifications
You must be signed in to change notification settings - Fork 10
/
db.clj
218 lines (201 loc) · 9.88 KB
/
db.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
(ns skyscraper.db
(:require
[clojure.core.async :as async]
[clojure.core.strint :refer [<<]]
[clojure.java.io :as io]
[clojure.java.jdbc :as jdbc]
[clojure.set :as set]
[clojure.string :as string]
[skyscraper.context :as context]
[skyscraper.data :refer [separate]]
[taoensso.timbre :refer [debugf warnf]]))
(defn- keyword->db-name
  "Turns a keyword naming a DB table or column into the string used for
  it in SQL queries: the keyword's name with every hyphen replaced by
  an underscore."
  [k]
  (-> k
      name
      (string/replace "-" "_")))
(defn- db-name->keyword
  "The inverse of keyword->db-name: converts a DB table or column name
  (a string) into a keyword, replacing every underscore with a hyphen."
  ;; The parameter is deliberately not called `str`, so that
  ;; clojure.core/str is not shadowed inside this function.
  [s]
  (keyword (string/replace s "_" "-")))
(defn- create-index-ddl
  "Builds the SQL statement that creates a unique index over
  key-column-names on table-name. The index is named idx_<table-name>:
  Skyscraper creates only one index per table, so naming it after the
  table is sufficient."
  [table-name key-column-names]
  (let [column-list (string/join ", " key-column-names)]
    (str "CREATE UNIQUE INDEX " "idx_" table-name
         " ON " table-name
         " (" column-list ")")))
(defn- create-index
  "Executes against db the DDL that creates a unique index on
  key-column-names of table-name. Does nothing and returns nil when
  key-column-names is empty."
  [db table-name key-column-names]
  (when-not (empty? key-column-names)
    (let [ddl (create-index-ddl table-name key-column-names)]
      (jdbc/execute! db ddl))))
(defn- create-table
  "Creates table-name in db with an integer primary key column id, an
  integer column parent, and one text column for each of column-names
  other than parent. Afterwards calls create-index, which adds a unique
  index on key-column-names when that collection is non-empty."
  [db table-name column-names key-column-names]
  (let [fixed-specs [["id" :integer "primary key"]
                     ["parent" :integer]]
        ;; parent is already declared above as an integer column
        text-specs (->> column-names
                        (remove #(= % "parent"))
                        (map (fn [col] [col :text])))
        ddl (jdbc/create-table-ddl table-name (into fixed-specs text-specs))]
    (jdbc/execute! db ddl)
    (create-index db table-name key-column-names)))
(defn- query-context-ids
  "Queries table-name in db for the rows whose key columns match those
  of ctxs, or that have a null in any key column, so that the
  database-assigned IDs of the upserted contexts can be read back.
  key-columns are the keys looked up in each context to produce the
  query parameters; key-column-names are the matching SQL column names.
  Column names in the returned rows are converted with db-name->keyword."
  [db table-name key-columns key-column-names ctxs]
  (let [comma-join (partial string/join ", ")
        key-part (comma-join key-column-names)
        ;; one (?, ?, ...) tuple per context, one ? per key column
        placeholders (comma-join (repeat (count key-column-names) "?"))
        row-tuple (str "(" placeholders ")")
        values (comma-join (repeat (count ctxs) row-tuple))
        ;; XXX: this might return too broad a result set
        null-clause (->> key-column-names
                         (map #(str % " is null"))
                         (string/join " or "))
        ;; XXX: only select id + key columns, not *
        query (<< "select * from ~{table-name} where (~{key-part}) in (values~{values}) or ~{null-clause}")
        key-values (apply juxt key-columns)
        sql-params (into [query] (mapcat key-values) ctxs)]
    (jdbc/query db sql-params {:identifiers db-name->keyword})))
(defn- upsert-multi-row-sql
  "Returns the SQL-and-parameters vector for upsert-multi!: the statement
  string followed by each element of values (one parameter group per
  row).

  The statement is INSERT INTO table-name (column-names) VALUES (?, ...).
  When key-column-names is non-empty, an ON CONFLICT (key-column-names)
  clause is appended: DO UPDATE SET col = excluded.col for every non-key
  column, listed in the order they appear in column-names, or DO NOTHING
  when every column is a key column.

  Throws IllegalArgumentException when values is empty, when the rows in
  values differ in length, or when column-names is non-empty and its
  length differs from that of the rows."
  [table-name column-names key-column-names values]
  ;; Reject empty input up front: the row width is taken from the first
  ;; row, so without one there is no well-formed statement to build.
  (when (empty? values)
    (throw (IllegalArgumentException. "upsert called with no rows")))
  (let [nc (count column-names)
        vcs (map count values)
        width (first vcs)]
    (when-not (and (or (zero? nc) (= nc width))
                   (apply = vcs))
      (throw (IllegalArgumentException. "insert! called with inconsistent number of columns / values")))
    (let [comma-join (partial string/join ", ")
          ;; Filter column-names instead of taking a set difference, so the
          ;; SET clause follows the declared column order and the generated
          ;; SQL is identical on every call.
          key-column? (set key-column-names)
          non-key-column-names (distinct (remove key-column? column-names))
          insert-sql (str "INSERT INTO " table-name
                          " (" (comma-join column-names) ")"
                          " VALUES (" (comma-join (repeat width "?")) ")")
          conflict-sql (when (seq key-column-names)
                         (str " ON CONFLICT (" (comma-join key-column-names) ") DO "
                              (if (empty? non-key-column-names)
                                "NOTHING"
                                (str "UPDATE SET "
                                     (comma-join (map #(str % " = excluded." %)
                                                      non-key-column-names))))))]
      (into [(str insert-sql conflict-sql)] values))))
(defn- upsert-multi!
  "Like clojure.java.jdbc/insert-multi!, except that rows whose
  key-column-names values match an existing row update that row instead
  of inserting a new one. rows must be a sequence of vectors. The
  statement is not wrapped in a transaction. With empty
  key-column-names this is equivalent to insert-multi!.

  Note: implemented as INSERT ... ON CONFLICT DO UPDATE, which requires
  a DBMS supporting that syntax (SQLite 3.24+ or PostgreSQL 9.5+)."
  [db table-name column-names key-column-names rows]
  (let [sql-params (upsert-multi-row-sql table-name column-names key-column-names rows)
        transaction? false]
    ;; :multi? makes each row a separate parameter group of one statement
    (jdbc/db-do-prepared db transaction? sql-params {:multi? true})))
(defn- upsert-multi-ensure-table!
  "Tries an upsert-multi!. If it fails with an org.sqlite.SQLiteException
  whose message reports a missing table, creates the table and tries
  again; if the message reports that the ON CONFLICT clause matches no
  constraint, creates the unique index and tries again. Any other
  SQLiteException is rethrown."
  [db table-name column-names key-column-names rows]
  (let [attempt! #(upsert-multi! db table-name column-names key-column-names rows)]
    (try
      (attempt!)
      (catch org.sqlite.SQLiteException e
        (let [message (.getMessage e)]
          (cond
            (string/includes? message "no such table")
            (do
              (create-table db table-name column-names key-column-names)
              (attempt!))

            (string/includes? message "ON CONFLICT clause does not match any PRIMARY KEY or UNIQUE constraint")
            (do
              (create-index db table-name key-column-names)
              (attempt!))

            :else
            (throw e)))))))
(defn- ensure-types-single
  "Checks context against columns and returns it with every column it
  lacks added with a nil value. columns is tested with contains?, so it
  should be a set of keys (upsert-contexts passes one).

  Emits a warning for each entry of context whose key is in columns and
  whose value has an unexpected type (:parent must satisfy int?, any
  other column must be nil or a string), and for each column other than
  :parent that context does not contain."
  [columns context]
  (let [valid? (fn [k v]
                 (if (= k :parent)
                   (int? v)
                   (or (nil? v) (string? v))))
        absent? (fn [column]
                  (and (not= column :parent)
                       (not (contains? context column))))]
    (doseq [[k v] context]
      (when (and (contains? columns k)
                 (not (valid? k v)))
        (warnf "Wrong type for key %s, value %s" k v)))
    (doseq [column (filter absent? columns)]
      (warnf "Context contains no value for key %s: %s" column (context/describe context)))
    ;; nil defaults first, so values present in context take precedence
    (merge (zipmap columns (repeat nil))
           context)))
(defn- ensure-types
  "Applies ensure-types-single with columns to every context in ctxs and
  returns the results as a vector."
  [columns ctxs]
  (into [] (map #(ensure-types-single columns %)) ctxs))
(defn- extract-ids
  "Takes ctxs that are assumed to be present in table-name of db, looks
  them up there, and returns them with each one's row id assoc'ed as
  :parent. Rows and contexts are matched on the values of key-columns;
  a context with no matching row gets a nil :parent."
  ;; This runs after the processor's :process-fn, so storing the id
  ;; under :parent puts it where the child processors expect to find it.
  [db table-name key-columns key-column-names ctxs]
  (let [key-of (fn [m] (select-keys m key-columns))
        rows (query-context-ids db table-name key-columns key-column-names ctxs)
        id-by-key (reduce (fn [acc row]
                            (assoc acc (key-of row) (:id row)))
                          {}
                          rows)]
    (for [ctx ctxs]
      (assoc ctx :parent (get id-by-key (key-of ctx))))))
(defn- extract-ids-from-last-rowid
  "Takes ctxs that have just been inserted successfully and returns them
  with each one's DB id assoc'ed as :parent. The ids are derived from
  last_insert_rowid() (SQLite-specific): the last context gets that
  value, and the preceding contexts get the consecutive values before
  it."
  [db ctxs]
  (let [last-id (->> "select last_insert_rowid() rowid"
                     (jdbc/query db)
                     first
                     :rowid)
        first-id (inc (- last-id (count ctxs)))
        ids (range first-id (inc last-id))]
    (map (fn [ctx id] (assoc ctx :parent id))
         ctxs
         ids)))
(defn upsert-contexts
  "Inserts ctxs into the db table named by `table`, storing the given
  `columns`, and returns the contexts augmented with `:parent` set to
  the DB-generated primary keys. If `key-columns` (a vector of column
  names) is provided, performs an upsert rather than an insert,
  detecting conflicts on those columns and updating db accordingly.
  The table is created on demand if it does not exist. Returns nil when
  `ctxs` is empty."
  [db table key-columns columns ctxs]
  (debugf "Upserting %s rows" (count ctxs))
  (when-not (empty? ctxs)
    (let [checked (ensure-types (set columns) ctxs)
          table-name (keyword->db-name table)
          column-names (into [] (map keyword->db-name) columns)
          key-column-names (into [] (map keyword->db-name) key-columns)
          row-of (apply juxt columns)
          rows (map row-of checked)]
      (upsert-multi-ensure-table! db table-name column-names key-column-names rows)
      ;; Without key columns the rows were plainly inserted, so their ids
      ;; are derived from the last rowid; otherwise they are queried by key.
      (if (empty? key-column-names)
        (extract-ids-from-last-rowid db checked)
        (extract-ids db table-name key-columns key-column-names checked)))))
(defn maybe-store-in-db
  "Wraps upsert-contexts, skipping contexts that contain ::skip.
  When both db and the processor's ::columns are present, the remaining
  contexts are upserted into the table named by the processor's name,
  with :parent added to the columns, and the result is a vector of the
  skipped contexts followed by the upserted ones. The processor's
  ::key-columns are not used when ignore-db-keys is truthy. Otherwise
  contexts is returned unchanged."
  [db {:keys [name ::columns ::key-columns] :as processor} ignore-db-keys contexts]
  (if-not (and db columns)
    contexts
    (let [all-columns (distinct (conj columns :parent))
          [skipped inserted] (separate ::skip contexts)
          effective-keys (when-not ignore-db-keys key-columns)
          stored (upsert-contexts db name effective-keys all-columns inserted)]
      (into (vec skipped) stored))))
(defn enhancer
  "An enhancer that upserts supplied batches of contexts into the
  database. Inside a single DB transaction, repeatedly takes an item
  from enhancer-input-chan, stores its :skyscraper.core/new-items via
  maybe-store-in-db using the item's :skyscraper.core/current-processor,
  and puts the item with the updated new-items on enhancer-output-chan.
  Stops when the take from the input channel returns nil."
  [{:keys [db ignore-db-keys]} {:keys [enhancer-input-chan enhancer-output-chan]}]
  (jdbc/with-db-transaction [tx db]
    (loop [item (async/<!! enhancer-input-chan)]
      (when item
        (let [processor (:skyscraper.core/current-processor item)
              pending (:skyscraper.core/new-items item)
              stored (maybe-store-in-db tx processor ignore-db-keys pending)]
          (async/>!! enhancer-output-chan (assoc item :skyscraper.core/new-items stored))
          (recur (async/<!! enhancer-input-chan)))))))
(defn initialize-db-options
  "Sets up DB-related options and returns a map of :db, :enhancer and
  :ignore-db-keys.

  :db is the supplied :db or, failing that, a SQLite db-spec built from
  :db-file (nil when neither is given). :enhancer is the enhancer
  function when there is a db, else nil. :ignore-db-keys is the supplied
  value when truthy; otherwise it is autodetected as true when the
  database file does not exist or is empty.

  Throws an Exception when :db is neither a map nor a string."
  [{:keys [db db-file ignore-db-keys]}]
  (let [db-spec (cond
                  db db
                  db-file {:classname "org.sqlite.JDBC"
                           :subprotocol "sqlite"
                           :subname db-file}
                  :else nil)
        spec->file (fn [spec]
                     (cond
                       (nil? spec) nil
                       (map? spec) (io/file (:subname spec))
                       ;; yuck! accessing innards of clojure.java.jdbc
                       (string? spec) (let [uri (java.net.URI. (#'jdbc/strip-jdbc spec))
                                            props (#'jdbc/parse-properties-uri uri)]
                                        (io/file (:subname props)))
                       :otherwise (throw (Exception. ":db needs to be a map or a string"))))
        file (spec->file db-spec)]
    {:db db-spec
     :enhancer (when db-spec enhancer)
     :ignore-db-keys (or ignore-db-keys
                         (and file
                              (or (not (.exists file))
                                  (zero? (.length file)))))}))