-
Notifications
You must be signed in to change notification settings - Fork 24
/
core.clj
321 lines (289 loc) · 13.9 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
(ns puppetlabs.jdbc-util.core
(:import [com.zaxxer.hikari HikariDataSource]
[java.util.regex Pattern]
[org.postgresql.util PGobject PSQLException]
[org.postgresql.core Utils])
(:require [clojure.java.jdbc :as jdbc]
[clojure.string :as str]
[clojure.tools.logging :as log]
[puppetlabs.i18n.core :refer [trs trsn]]
[puppetlabs.kitchensink.core :as ks]))
(defn pg-escape-string
  "Escapes the SQL quoting meta-characters in the arbitrary string `s` and
  returns the result wrapped in single quotes, i.e. formatted as an SQL string
  literal that can be naively interpolated into an SQL string. Intended only
  for unplanned statements (anything besides SELECT, INSERT, UPDATE, and
  DELETE), since those do not accept query parameters."
  [s]
  (str "'"
       (.toString (Utils/escapeLiteral nil s true))
       "'"))
(defn pg-escape-identifier
  "Escapes the arbitrary string `s` so that it can be naively interpolated
  into an SQL string as an identifier. Prefer not to use this function when an
  alternative exists."
  [s]
  (let [escaped (Utils/escapeIdentifier nil s)]
    (.toString escaped)))
(defn connection-pool
  "Builds a pooled DB spec from `db-spec`, a map containing the :subprotocol,
  :subname, :user, and :password keys. The result is a map with a single
  :datasource key whose value is a HikariDataSource configured with a JDBC URL
  of the form jdbc:<subprotocol>:<subname> and the given credentials; it can be
  passed directly as the first argument to clojure.java.jdbc's functions.

  The pool's initialization fail timeout is set to -1.

  NOTE(review): earlier documentation of this function states that it times
  out after 30 seconds and throws org.postgresql.util.PSQLException. No such
  timeout is configured here, so it presumably comes from the pool's defaults
  -- confirm."
  [db-spec]
  (let [pool (HikariDataSource.)
        jdbc-url (str "jdbc:"
                      (:subprotocol db-spec) ":"
                      (:subname db-spec))]
    (.setJdbcUrl pool jdbc-url)
    (.setUsername pool (:user db-spec))
    (.setPassword pool (:password db-spec))
    (.setInitializationFailTimeout pool -1)
    {:datasource pool}))
(defmacro with-timeout
  "Evaluates `body` in a future and waits up to `timeout-s` seconds for it to
  finish. Returns the value of `body` if it completes in time, otherwise
  `default`. The future is cancelled after the wait in either case."
  [timeout-s default & body]
  `(let [task# (future ~@body)
         outcome# (deref task# (* 1000 ~timeout-s) ~default)]
     (future-cancel task#)
     outcome#))
(defn has-role?
  "Asks the database (via pg_has_role with the 'MEMBER' privilege) whether
  `user` may act as a member of `role`. Returns true only when the database
  answers exactly true, and false otherwise."
  [db-spec user role]
  (let [rows (jdbc/query db-spec ["SELECT pg_has_role(?, ?, 'MEMBER')" user role])
        answer (:pg_has_role (first rows))]
    (true? answer)))
;; Number of seconds `db-up?` waits for its status query before giving up.
(def db-status-timeout-secs 4)

(defn db-up?
  "Checks whether the database behind `db-spec` responds to queries by running
  a small arithmetic SELECT whose answer should be 42. Returns true when the
  query yields 42. Returns false when the query yields anything else, when it
  throws (the exception is logged as a warning), or when it does not finish
  within `db-status-timeout-secs` seconds (also logged as a warning)."
  [db-spec]
  (let [probe-sql "SELECT (a - b) AS answer FROM (VALUES ((7 * 7), 7)) AS x(a, b)"
        outcome (with-timeout db-status-timeout-secs :timeout
                  (try
                    (-> (jdbc/query db-spec [probe-sql])
                        first
                        :answer
                        (= 42))
                    (catch Exception e
                      (log/warn e (trs "Status check of db failed with error:"))
                      false)))]
    (if-not (= :timeout outcome)
      outcome
      (do
        (log/warn (trs "Database status check timed out after 4 seconds."))
        false))))
(defn db-exists?
  "Returns a boolean indicating whether a database named `db-name` exists,
  determined by looking it up in pg_database. `admin-db-spec` should connect
  to a database other than `db-name`."
  [admin-db-spec db-name]
  (let [rows (jdbc/query admin-db-spec
                         ["SELECT 1 AS exists FROM pg_database WHERE datname = ?" db-name])]
    (= 1 (:exists (first rows)))))
(defn create-db!
  "Given a DB spec, the database's name, and the name of the user that will own
  the database, creates the database `db-name` owned by `db-owner`, with the
  DB's encoding set to UTF-8. The DB spec should connect to a different database
  than the one being created, and the user in the DB spec must have the CREATEDB
  permission. If the `db-owner` differs from the user in the DB spec, then the
  user in the DB spec must either be a member of the `db-owner` role, or have
  the CREATEROLE permission, or be a superuser.

  When the DB spec's user is not already a member of `db-owner` (as reported by
  `has-role?`), the statement sent is GRANT <owner> TO <user>, followed by the
  CREATE DATABASE, followed by REVOKE <owner> FROM <user>.

  NB: this function is not thread-safe when multiple threads create databases
  with the same `db-owner`, unless the :user specified in `admin-db-spec` is
  `db-owner`, is otherwise a member of the `db-owner` role, or is a superuser.

  Returns the result of the underlying `jdbc/execute!` call."
  [admin-db-spec db-name db-owner]
  (let [admin-user (:user admin-db-spec)
        safe-db-name (pg-escape-identifier db-name)
        safe-owner (pg-escape-identifier db-owner)
        safe-user (pg-escape-identifier admin-user)
        ;; The SQL is assembled with `str` rather than by splicing this
        ;; statement into a second `format` pattern: the escaped names may
        ;; legitimately contain `%`, which `format` would try to interpret as
        ;; a conversion specifier.
        create-db-statement (str "CREATE DATABASE " safe-db-name
                                 " WITH OWNER " safe-owner
                                 " ENCODING 'UTF8'")
        sql (if (has-role? admin-db-spec admin-user db-owner)
              create-db-statement
              (str "GRANT " safe-owner " TO " safe-user
                   ";" create-db-statement
                   ";REVOKE " safe-owner " FROM " safe-user))]
    ;; Executed outside of a transaction, as in `drop-db!`.
    (jdbc/execute! admin-db-spec [sql] {:transaction? false})))
(defn drop-db!
  "Drops the database named `db-name` if it exists. `admin-db-spec` must
  connect to a database other than `db-name`, with a user that has permission
  to drop `db-name`. The statement is executed outside of a transaction.
  Always returns nil."
  [admin-db-spec db-name]
  (let [target (pg-escape-identifier db-name)
        statement (format "DROP DATABASE IF EXISTS %s" target)]
    (jdbc/execute! admin-db-spec [statement] {:transaction? false}))
  nil)
(defn user-exists?
  "Returns a boolean indicating whether a role named `username` exists,
  determined by looking it up in pg_roles. `admin-db-spec` should connect with
  a user other than `username`."
  [admin-db-spec username]
  (let [rows (jdbc/query admin-db-spec
                         ["SELECT 1 AS present FROM pg_roles WHERE rolname = ?" username])]
    (= 1 (:present (first rows)))))
(defn create-user!
  "Creates the login role `username` with the given `password`, using a DB
  spec whose user is not `username` and has permission to create roles.

  An optional `options` map may be supplied as the last argument; when its
  :superuser? value is exactly `true` (merely truthy values do not count) the
  new role is created as a superuser. Note that only superusers can create
  more superusers. Always returns nil."
  ([admin-db-spec username password]
   (create-user! admin-db-spec username password {}))
  ([admin-db-spec username password options]
   (let [{make-superuser :superuser?} options
         superuser-clause (when (true? make-superuser) " SUPERUSER")
         template (str "CREATE ROLE %s WITH LOGIN PASSWORD %s" superuser-clause)
         statement (format template
                           (pg-escape-identifier username)
                           (pg-escape-string password))]
     (jdbc/execute! admin-db-spec [statement])
     nil)))
(defn drop-user!
  "Removes the role named `username` if it exists. `admin-db-spec` must
  connect with a user other than `username` that has permission to drop that
  role. Note that only superusers can remove other superusers; other users can
  be removed by anyone with the CREATEROLE permission. Always returns nil."
  [admin-db-spec username]
  (let [target (pg-escape-identifier username)
        statement (format "DROP ROLE IF EXISTS %s" target)]
    (jdbc/execute! admin-db-spec [statement]))
  nil)
(defn public-tables
  "Returns the table names listed in information_schema.tables whose schema
  name, lower-cased, is 'public'."
  [db-spec]
  (jdbc/query db-spec
              ["SELECT table_name FROM information_schema.tables WHERE LOWER(table_schema) = 'public'"]
              {:row-fn :table_name}))
(defn drop-public-tables!
  "Drops (with CASCADE) every table returned by `public-tables`. Super
  dangerous. Does nothing and returns nil when there are no such tables;
  otherwise returns the result of `jdbc/db-do-commands`."
  [db-spec]
  (when-let [tables (seq (public-tables db-spec))]
    ;; Table names are escaped as identifiers so that names containing
    ;; upper-case letters, spaces, quotes or other special characters are
    ;; dropped correctly instead of being case-folded or breaking the SQL.
    (jdbc/db-do-commands db-spec
                         (map #(format "DROP TABLE %s CASCADE" (pg-escape-identifier %))
                              tables))))
(defn public-functions
  "Returns one string per function in the 'public' namespace, each of the
  form <namespace>.<function name>(<argument types>)."
  [db-spec]
  (let [select-clause "SELECT ns.nspname || '.' || proname || '(' || oidvectortypes(proargtypes) || ')' AS function"
        from-clause " FROM pg_proc INNER JOIN pg_namespace ns ON (pg_proc.pronamespace = ns.oid)"
        where-clause " WHERE ns.nspname = 'public'"
        sql (str select-clause from-clause where-clause)]
    (jdbc/query db-spec [sql] {:row-fn :function})))
(defn drop-public-functions!
  "Drops (with CASCADE) every function returned by `public-functions`. Super
  dangerous. Does nothing and returns nil when there are no such functions."
  [db-spec]
  (let [signatures (seq (public-functions db-spec))]
    (when signatures
      (jdbc/db-do-commands db-spec
                           (map (fn [signature]
                                  (format "DROP FUNCTION %s CASCADE" signature))
                                signatures)))))
(defn convert-result-arrays
  "Applies `f` (eg. vec, set; defaults to vec) to every Java array or JDBC
  array value found in the rows of `result-set`. For JDBC arrays, `f` receives
  the underlying array obtained via getArray. All other values are left
  untouched. Returns a lazy sequence of the converted rows."
  ([result-set]
   (convert-result-arrays vec result-set))
  ([f result-set]
   (letfn [(unpack [v]
             (if (ks/array? v)
               (f v)
               (if (instance? java.sql.Array v)
                 (f (.getArray v))
                 v)))
           (convert-row [row]
             (ks/mapvals unpack row))]
     (map convert-row result-set))))
(defn convert-result-pgobjects
  "Replaces every PGobject value in the rows of `result-set` with the value it
  contains (as returned by getValue). Values which aren't PGobjects are
  unchanged. Returns a lazy sequence of the converted rows."
  [result-set]
  (letfn [(unwrap [v]
            (if-not (instance? PGobject v)
              v
              (.getValue v)))
          (convert-row [row]
            (ks/mapvals unwrap row))]
    (map convert-row result-set)))
(defn query
  "Runs `sql-and-params` against `db` and returns a fully realized result: the
  rows are forced with doall, array values are converted to vectors, and
  PGobjects are replaced by the values they contain."
  [db sql-and-params]
  (letfn [(realize-rows [rs]
            (doall
              (convert-result-pgobjects
                (convert-result-arrays vec
                                       (jdbc/result-set-seq rs)))))]
    (jdbc/db-query-with-resultset db sql-and-params realize-rows)))
(defn ordered-group-by
  "Groups the elements of `coll` by the result of applying `f` to each, like
  `group-by`, but keeps track of the order in which the groups first appeared.

  Returns a sequence of [key group] pairs sorted by the position in `coll` of
  each group's first member. Each `group` is a vector whose first element is
  that position (a zero-based index) and whose remaining elements are the
  members of the group, in their original order. Returns an empty sequence for
  an empty or nil `coll`."
  [f coll]
  (let [grouped-w-index (loop [i 0, groups (transient {}), remaining (seq coll)]
                          ;; Test the remaining seq itself rather than its
                          ;; first element, so that nil and false members are
                          ;; grouped instead of ending the loop early.
                          (if remaining
                            (let [x (first remaining)
                                  k (f x)
                                  ;; a new group starts with the index of its first member
                                  group (get groups k [i])
                                  groups' (assoc! groups k (conj group x))]
                              (recur (inc i) groups' (next remaining)))
                            ;; else (nothing left in coll)
                            (persistent! groups)))]
    (->> (seq grouped-w-index)
         ; sort the groups by the index where the first member appeared
         (sort-by #(get-in % [1 0])))))
(defn aggregate-submap-by
  "Takes `results`, a sequence of maps that each contain the keys `agg-key`
  and `agg-val`, and merges the maps that are identical apart from the values
  under those two keys. Each merged map has `agg-key` and `agg-val` removed
  and gains `under-key`, whose value is a map from the group's `agg-key`
  values to the corresponding `agg-val` values; entries whose `agg-key` value
  is nil are left out. The merged maps are returned lazily, in order of first
  appearance in `results`."
  [agg-key agg-val under-key results]
  (let [without-agg #(dissoc % agg-key agg-val)
        key-val-pair (juxt agg-key agg-val)
        nil-key? (comp nil? first)]
    (map (fn [[shared [_ & members]]]
           (assoc shared under-key (into {}
                                         (remove nil-key?
                                                 (map key-val-pair members)))))
         (ordered-group-by without-agg results))))
(defn aggregate-column
  "Takes `results`, a sequence of rows as maps, and merges the rows that are
  equal apart from their value for `column`. Each merged row has `column`
  removed and gains `under`, a sequence of the group's `column` values in
  their original order. The merged rows are returned lazily, in order of first
  appearance. Useful for consolidating the results of an outer join."
  [column under results]
  (let [without-column #(dissoc % column)]
    (map (fn [[shared [_ & members]]]
           (assoc shared under (map #(get % column) members)))
         (ordered-group-by without-column results))))
(defn- sequence-placeholder
  "Returns a parenthesized, comma-separated list containing one `?` for each
  element of `xs`, e.g. \"(?,?,?)\" for a three-element collection."
  [xs]
  (let [question-marks (repeat (count xs) "?")]
    (str "(" (str/join "," question-marks) ")")))
(defn- replace-nth-?
  "Returns the string `s` with its `n`th `?` (zero-based) replaced by
  `replacement`. The `?` is located with a pattern that requires each `?` to
  be preceded by at least one non-`?` character. Throws an
  IllegalArgumentException when the pattern cannot find `n` + 1 such `?`s in
  `s`."
  [^String s n replacement]
  (let [;; number of '?'s that must be present for the nth one to exist
        needed (inc n)
        through-?-pattern (Pattern/compile (format "([^?]+?\\?){%d}" needed))]
    (if-let [[match] (re-find through-?-pattern s)]
      (let [tail (.substring s (.length ^String match) (.length s))
            ;; `match` ends with the target '?'; swap just that one out
            replaced (str/replace match #"(.*)\?$" (str "$1" replacement))]
        (str replaced tail))
      ;; Pluralize on the number of '?'s that were required rather than on the
      ;; zero-based index, so that asking for the first '?' reports that there
      ;; are none and asking for the second reports that there are not 2.
      (throw (IllegalArgumentException. (trsn "There are no '?'s in the given string"
                                              "There are not {0} '?'s in the given string"
                                              needed))))))
(defn expand-seq-params
  "A helper for prepared SQL statements that have sequential parameters.
  Takes a vector of an SQL string followed by its parameters, and returns a
  new vector in which every `?` that corresponded to a sequential parameter
  has been expanded to a tuple of `?`s of the matching length, followed by the
  flattened parameters."
  [[sql & parameters]]
  (let [indexed-seq-params (keep-indexed (fn [idx param]
                                           (when (sequential? param)
                                             [idx param]))
                                         parameters)
        ;; `offset` tracks how many extra '?'s earlier expansions have added,
        ;; so that later parameter positions still line up with the SQL.
        expand-one (fn [[statement offset] [idx param]]
                     [(replace-nth-? statement
                                     (+ idx offset)
                                     (sequence-placeholder param))
                      (+ offset (dec (count param)))])
        expanded-sql (first (reduce expand-one [sql 0] indexed-seq-params))]
    (into [expanded-sql] (flatten parameters))))
(defn has-extension?
  "Returns true when pg_extension contains at least one row whose extname is
  `extension`, and false otherwise."
  [db extension]
  (let [rows (jdbc/query db ["select count(*) from pg_extension where extname = ?" extension])
        matches (:count (first rows))]
    (pos? matches)))
(defn get-sequence-name-for-column
  "Looks up, via pg_get_serial_sequence, the name of the sequence associated
  with `column` of `table`. Returns that name, or nil if there is no
  sequence."
  [db table column]
  (let [rows (jdbc/query db ["SELECT pg_get_serial_sequence(?, ?)" table column])]
    (:pg_get_serial_sequence (first rows))))
(defn quoted
  "Quotes a psql identifier such as public.table-name or column-name with
  double quotes so that it is suitable for use in jdbc queries.
  E.g., public.table-name -> \"public\".\"table-name\"
        column-name -> \"column-name\""
  [id]
  (let [double-quote (jdbc/quoted \")]
    (jdbc/as-sql-name double-quote id)))
(defn reconcile-sequence-for-column!
  "Finds the sequence associated with the given column and compares its last
  value to the max value in the column. If the sequence is lower, sets it
  equal to the max value. The table is locked in EXCLUSIVE mode for the
  duration of the transaction in which the comparison and update happen.

  Returns the result of the query that performs the comparison and update.
  If the column has no associated sequence, throws an Exception."
  [db table column]
  (if-let [sequence-name (get-sequence-name-for-column db table column)]
    (let [quoted-table (quoted table)
          select-last-value (str "(SELECT last_value FROM " (quoted sequence-name) ")")
          select-max-in-column (str "(SELECT MAX(" (quoted column) ") FROM " quoted-table ")")]
      (jdbc/with-db-transaction [txn-db db]
        ;; Quote the table the same way the MAX query does, so that
        ;; schema-qualified names such as public.table-name are locked as
        ;; "public"."table-name" rather than as the single identifier
        ;; "public.table-name".
        (jdbc/execute! txn-db [(str "LOCK TABLE " quoted-table " IN EXCLUSIVE MODE")])
        (jdbc/query txn-db [(str "SELECT"
                                 " CASE"
                                 " WHEN " select-max-in-column " > " select-last-value
                                 " THEN setval('" sequence-name "', " select-max-in-column ")"
                                 " END")])))
    ;; arguments are given in the order the message names them: column, then table
    (throw (Exception. (format "No sequence found for column %s on table %s." column table)))))