-
Notifications
You must be signed in to change notification settings - Fork 4
/
core.clj
390 lines (341 loc) · 14.8 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
;; Copyright (C) 2014 Clark & Parsia
;; Copyright (C) 2014 Paula Gearon
;;
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns stardog.core
(:require [clojure.string :as str]
[stardog.values :as values])
(:import [com.complexible.stardog.api
Connection ConnectionPool ConnectionPoolConfig ConnectionConfiguration
Query ReadQuery]
[clojure.lang IFn]
[java.util Map Iterator]
[com.complexible.stardog.reasoning.api ReasoningType]
[com.stardog.stark Values Namespace]
[com.stardog.stark.query SelectQueryResult GraphQueryResult BindingSet Binding]
[com.stardog.stark.impl StatementImpl]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Connection management
(defn create-db-spec
"Helper function to create a dbspec with sensible defaults for nontypical parameters"
[db url user pass reasoning]
{:url url
:db db
:pass pass
:user user
:max-idle 100
:max-pool 200
:min-pool 10
:reasoning reasoning})
(defprotocol Connectable
(connect [c] "Creates a connection with the given parameters"))
(extend-protocol Connectable
java.util.Map
(connect
[{:keys [db user pass url server reasoning]}]
(let [config (ConnectionConfiguration/to db)]
(when user (.credentials config user pass))
(when-let [server-url (or url server)] (.server config server-url))
(when reasoning (.reasoning config reasoning))
(.connect config)))
String
(connect [cs] (ConnectionConfiguration/at cs)))
(defn make-datasource
"Creates a Stardog datasource, i.e. ConnectionPool"
[db-spec]
(let [{:keys [url user pass db
max-idle min-pool max-pool reasoning]} db-spec
con-config (-> (ConnectionConfiguration/to db)
(.server url)
(.credentials user pass)
(.reasoning reasoning))
pool-config (-> (ConnectionPoolConfig/using con-config)
(.minPool min-pool)
(.maxIdle max-idle)
(.minPool min-pool))
pool (.create pool-config)]
{:ds pool}))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Utility functions for marshalling API calls
(defn binding->map
"Converts a BindingSet into a map."
[^IFn key-fn ^IFn value-fn ^BindingSet mb]
(into {} (map (fn [^Binding b] [(key-fn (.name b)) (value-fn (.value b))]))
(iterator-seq (.iterator mb))))
(defn statement->map
"Converts a Statement into a map."
[^IFn value-fn ^StatementImpl mb]
(vector (value-fn (.subject mb))
(value-fn (.predicate mb))
(value-fn (.object mb))))
(defn key-map-results
"Converts a Iteration of bindings into a seq of keymaps."
[^IFn keyfn ^IFn valfn ^Iterator results]
(let [mapper (partial binding->map keyfn valfn)
realized-results (into [] (map mapper) (iterator-seq results))]
(.close results)
realized-results))
(defn vector-map-results
"Converts a Graph of statements into a seq of vectors."
[^IFn valfn ^Iterator results]
(let [mapper (partial statement->map valfn)
realized-results (into [] (map mapper) (iterator-seq results))]
(.close results)
realized-results))
(defprotocol ClojureResult
(clojure-data* [results keyfn valfn]
"Typed dispatched conversion of query results into Clojure data"))
(extend-protocol ClojureResult
GraphQueryResult
(clojure-data* [results keyfn valfn]
(let [namespaces (into {}
(map (fn [^Namespace ns] [(.prefix ns) (.iri ns)]))
(iterator-seq (.. results namespaces iterator)))]
(with-meta (vector-map-results valfn results) {:namespaces namespaces})))
SelectQueryResult
(clojure-data* [results keyfn valfn] (key-map-results keyfn valfn results))
nil
(clojure-data* [results _ valfn] results)
Boolean
(clojure-data* [results _ valfn] results))
(defn clojure-data
"Converts query results into Clojure data. Optionally uses functions for interpreting
names and value bindings in results."
([results] (clojure-data* results keyword values/standardize))
([results keyfn valfn] (clojure-data* results keyfn valfn)))
(defn execute* [^Query q {:keys [key-converter converter]
:or {key-converter keyword converter values/standardize}}]
(clojure-data (.execute q) key-converter converter))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Query APIs
(defn configure-query
"Configures a query is the valid parameters for that type of query"
[^Query q {:keys [parameters reasoning limit offset dataset]}]
(doseq [[k v] parameters] (.parameter q (name k) v))
(when dataset (.dataset q dataset))
(when (instance? ReadQuery q)
(let [^ReadQuery rq q]
(when reasoning (.reasoning rq (boolean reasoning)))
(when limit (.limit rq limit))
(when offset (.offset rq offset))))
q)
(defn create-query
"Creates a query using a map of optional arguments.
new-with-base: Function that creates the query with a base URI.
new-without-base: Function that creates the query without a base URI.
args: A map containing any of the following: base, parameters, reasoning, limit, offset"
^Query
[^IFn new-with-base ^IFn new-without-base ^Map {:keys [base] :as args}]
(let [q (if base (new-with-base base) (new-without-base))]
(configure-query q args)))
(def query-keys #{:base :parameters :reasoning :limit :offset :converter :key-converter :dataset})
(defn check-arg [pred [f & r :as a]] (if (pred f) [f r] [nil a]))
(defn convert-to-map
"Converts an arguments array into a map. The arguments are either positional,
named, or already in map form. This function is a fixpoint."
[[f & r :as args]]
(cond
(and (map? f)
(= 1 (count args))
(every? query-keys (keys f))) f
(keyword? f) (apply map args)
;; walk down the arguments and pull them out positionally
:default (let [[base a] (check-arg string? args)
[params a] (check-arg map? a)
[reasoning a] (check-arg #(or (true? %) (false? %)) a)
[converter a] (check-arg fn? a)
[key-converter [limit offset]] (check-arg fn? a)]
(->> {:base base :parameters params :reasoning reasoning
:limit limit :offset offset
:converter converter :key-converter key-converter}
(filter second)
(into {})))))
(defn query
"Executes a query and returns results.
When constructing a query from text, the parameters are:
- connection: The connection to query over (required).
- text: The text of the connection (String - required).
Remaining argument are optional, and may be positional args,
a map of args, or named args. Mapped and named args use the keys:
- base, parameters, reasoning, limit, offset, converter, key-converter
Positional arguments are in order:
- base: The base URI for the query (String).
- parameters: A parameter map to bind parameters in the query (Map).
- reasoning: boolean true/false for reasoning, or not
- converter: A function to convert returned values with (Function).
- key-converter: A function to convert returned binding names with (Function).
- limit: The limit for the result. Must be present to use offset (integer).
- offset: The offset to start the result (integer)."
[^Connection connection ^String text & args]
(let [args (convert-to-map args)
q (create-query #(.select connection text %) #(.select connection text) args)]
(execute* q args)))
(defn ask
"Executes a boolean query.
Optional parameters may be provided as a map or named parameters.
Parameter names are:
- base, parameters, reasoning, limit, offset, converter, key-converter."
[^Connection connection ^String text & args]
(let [args (convert-to-map args)
q (create-query #(.ask connection text %) #(.ask connection text) args)]
(execute* q args)))
(defn graph
"Executes a graph query.
Optional parameters may be provided as a map or named parameters.
Parameter names are:
- base, parameters, reasoning, limit, offset, converter, key-converter."
[^Connection connection ^String text & args]
(let [args (convert-to-map args)
q (create-query #(.graph connection text %) #(.graph connection text) args)]
(execute* q args)))
(defn update!
"Executes an update operation.
Optional parameters may be provided as a map or named parameters.
Parameter names are:
- base, parameters, reasoning, converter."
[^Connection connection ^String text & args]
(let [args (convert-to-map args)
q (create-query #(.update connection text %) #(.update connection text) args)]
(execute* q args)))
(defn execute
"Executes a query that has already been created and configured.
Valid parameters are key-converter and converter. Query configuration
parameters are ignored."
[^Query q & args]
(execute* q (convert-to-map args)))
(defn insert!
"Inserts a statement (subject, predicate, object) represented as a 3 item vector"
([^Connection connection triple-list]
(insert! connection triple-list Values/DEFAULT_GRAPH))
([^Connection connection triple-list graph-uri]
(when (< (count triple-list) 3) (throw (IllegalArgumentException. "triple-list must have 3 elements")))
(let [adder (.add connection)
subj (-> (first triple-list) (values/as-uri) (values/convert))
pred (-> (second triple-list) (values/as-uri) (values/convert))
obj (-> (nth triple-list 2) (values/convert))
context (if (instance? com.stardog.stark.impl.IRIImpl graph-uri)
graph-uri
(values/convert (values/as-uri graph-uri)))]
(.statement adder (StatementImpl. subj pred obj context)))))
(defn remove!
"Removes a statements (subject, predicate, object) represented as a 3 item vector"
([^Connection connection triple-list]
(remove! connection triple-list Values/DEFAULT_GRAPH))
([^Connection connection triple-list graph-uri]
(when (< (count triple-list) 3) (throw (IllegalArgumentException. "triple-list must have 3 elements")))
(let [remover (.remove connection)
subj (-> (first triple-list) (values/as-uri) (values/convert))
pred (-> (second triple-list) (values/as-uri) (values/convert))
obj (-> (nth triple-list 2) (values/convert))
context (if (instance? com.stardog.stark.impl.IRIImpl graph-uri)
graph-uri
(values/convert (values/as-uri graph-uri)))]
(.statement remover (StatementImpl. subj pred obj context)))))
(defn add-ns!
"Adds a namespace prefix"
[^Connection connection ^String prefix ^String rdf-ns]
(let [ns-api (.namespaces connection)]
(.add ns-api prefix rdf-ns)))
(defn remove-ns!
"Removes a namespace prefix"
[^Connection connection ^String prefix]
(let [ns-api (.namespaces connection)]
(.remove ns-api prefix)))
(defn list-namespaces
"Lists configured namespaces in the database"
[^Connection connection]
(let [ns-api (.namespaces connection)]
(iterator-seq (.iterator ns-api))))
(defn transact
"(transact pool (something con ..))
Executes a function over a connection pool and transaction"
[pool func]
(let [conn (.obtain (:ds pool))
_ (.begin conn)]
(try
(let [result (func conn)]
(.commit conn)
result)
(catch Throwable t
(.rollback conn))
(finally
(.release (:ds pool) conn)))))
(defmacro assert-args
"Duplicates the functionality of the private clojure.core/assert-args"
[& pairs]
`(do (when-not ~(first pairs)
(throw (IllegalArgumentException.
(str (first ~'&form) " requires " ~(second pairs) " in " ~'*ns* ":" (:line (meta ~'&form))))))
~(let [more (nnext pairs)]
(when more
(list* `assert-args more)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Resource evaluation macros
(defmacro with-transaction
"(with-transaction [connection...] body)
Executes the body with a transaction on each of the connections. At completion of the body
the transaction is committed. If the body fails due to exception, the transaction is rolled back.
This macro intentionally restricts connections to be symbols, to encourage them to be
bindings in with-open."
[connections & body]
(assert-args
(vector? connections) "a vector for its connections"
(every? symbol? connections) "symbols for all connections")
(let [begins (for [c connections] `(.begin ~c))
rev (reverse connections)
commits (for [c rev] `(.commit ~c))
rollbacks (for [c rev] `(.rollback ~c))]
`(do
~@begins
(try
~@body
~@commits
(catch Throwable t#
~@rollbacks
(throw t#))))))
(defmacro with-connection-tx
"(with-connection-tx binding-forms body)
Establishes a connection and a transaction to execute the body within."
[bindings & body]
(assert-args
(vector? bindings) "a vector for its binding"
(even? (count bindings)) "an even number of forms in binding vector")
(cond
(empty? bindings) `(do ~@body)
(symbol? (bindings 0)) `(let ~(subvec bindings 0 2)
(try
(with-transaction [~(bindings 0)]
(with-connection-tx ~(subvec bindings 2) ~@body))
(finally
(.close ~(bindings 0)))))
:else (throw (IllegalArgumentException.
"with-connection-tx only allows Symbols in bindings"))))
(defmacro with-connection-pool
"(with-connection-pool [con pool] .. con, body ..)
Evaluates body in the context of an active connection"
[bindings & body]
(assert-args
(vector? bindings) "a vector for its binding"
(even? (count bindings)) "an even number of forms in binding vector")
(cond
(empty? bindings) `(do ~@body)
(symbol? (bindings 0))
`(let [db-spec# ~(second bindings)]
(let [~(first bindings) (.obtain (:ds db-spec#))]
(try
~@body
(finally
(.release (:ds db-spec#) ~(first bindings))))))
:else (throw (IllegalArgumentException.
"with-pool only allows Symbols in bindings"))))