-
Notifications
You must be signed in to change notification settings - Fork 29
/
query.cljc
344 lines (307 loc) · 13.7 KB
/
query.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
(ns ^{:doc "Implements a full query engine based on fully indexed data."
:author "Paula Gearon"}
asami.query
#?(:clj (:refer-clojure :exclude [eval]))
(:require [clojure.set :as set]
[clojure.string :as str]
[naga.schema.store-structs :as st
:refer [EPVPattern FilterPattern Pattern
Results Value Axiom
epv-pattern? filter-pattern? op-pattern?]]
[naga.util :as u]
[naga.store :refer [Storage]]
[naga.storage.store-util :as store-util]
[asami.graph :as gr]
[asami.util :refer [c-eval]]
#?(:clj [schema.core :as s]
:cljs [schema.core :as s :include-macros true])
#?(:clj [clojure.core.cache :as c])
#?(:cljs [cljs.core :refer [Symbol PersistentVector List LazySeq]])
#?(:clj [clojure.edn :as edn]
:cljs [cljs.reader :as edn]))
#?(:clj
(:import [clojure.lang Symbol IPersistentVector IPersistentList])))
(declare get-vars)
(declare left-join)
(s/defn without :- [s/Any]
"Returns a sequence minus a specific element"
[e :- s/Any
s :- [s/Any]]
(remove (partial = e) s))
(s/defn find-start :- EPVPattern
"Returns the first pattern with the smallest count"
[pattern-counts :- {EPVPattern s/Num}
patterns :- [EPVPattern]]
(let [local-counts (select-keys pattern-counts patterns)
low-count (reduce min (map second local-counts))
pattern (ffirst (filter #(= low-count (second %)) local-counts))]
;; must use first/filter/= instead of some/#{pattern} because
;; patterns contains metadata and pattern does not
(first (filter (partial = pattern) patterns))))
(s/defn paths :- [[EPVPattern]]
"Returns a seq of all paths through the constraints. A path is defined
by new patterns containing at least one variable common to the patterns
that appeared before it. Patterns must form a group."
([patterns :- [EPVPattern]
pattern-counts :- {EPVPattern s/Num}]
(s/letfn [(remaining-paths :- [[EPVPattern]]
[bound :- #{Symbol}
rpatterns :- [EPVPattern]]
(if (seq rpatterns)
(apply concat
(keep ;; discard paths that can't proceed (they return nil)
(fn [p]
(let [b (get-vars p)]
;; only proceed when the pattern matches what has been bound
(if (or (empty? bound) (seq (set/intersection b bound)))
;; pattern can be added to the path, get the other patterns
(let [remaining (without p rpatterns)]
;; if there are more patterns to add to the path, recurse
(if (seq remaining)
(map (partial cons p)
(seq
(remaining-paths (into bound b) remaining)))
[[p]])))))
rpatterns))
[[]]))]
(let [start (find-start pattern-counts patterns)
all-paths (map (partial cons start)
(remaining-paths (get-vars start) (without start patterns)))]
(assert (every? (partial = (count patterns)) (map count all-paths))
(str "No valid paths through: " (vec patterns)))
all-paths))))
(s/defn merge-filters
"Merges filters into the sequence of patterns, so that they appear
as soon as all their variables are first bound"
[epv-patterns filter-patterns]
(let [filter-vars (u/mapmap get-vars filter-patterns)
all-bound-for? (fn [bound fltr] (every? bound (filter-vars fltr)))]
(loop [plan [] bound #{} [np & rp :as patterns] epv-patterns filters filter-patterns]
(if-not (seq patterns)
;; no patterns left, so apply remaining filters
(concat plan filters)
;; divide the filters into those which are fully bound, and the rest
(let [all-bound? (partial all-bound-for? bound)
nxt-filters (filter all-bound? filters)
remaining-filters (remove all-bound? filters)]
;; if filters were bound, append them, else get the next EPV pattern
(if (seq nxt-filters)
(recur (into plan nxt-filters) bound patterns remaining-filters)
(recur (conj plan np) (into bound (get-vars np)) rp filters)))))))
(s/defn merge-ops
"Merges operator patterns into the sequence of patterns, so that they appear
as soon as all their variables are first bound"
[patterns op-patterns]
;; todo: similar to merge-filters above
;; for now, operators are going to the back
(concat patterns op-patterns))
(s/defn first-group* :- [(s/one [Pattern] "group") (s/one [Pattern] "remainder")]
"Finds a group from a sequence of patterns. A group is defined by every pattern
sharing at least one var with at least one other pattern. Returns a pair.
The first returned element is the Patterns in the group, the second is what was left over."
[[fp & rp] :- [Pattern]]
(letfn [;; Define a reduction step.
;; Accumulates a triple of: known vars; patterns that are part of the group;
;; patterns that are not in the group. Each step looks at a pattern for
;; inclusion or exclusion
(step [[vs included excluded] next-pattern]
(let [new-vars (get-vars next-pattern)]
(if (seq (set/intersection vs new-vars))
[(into vs new-vars) (conj included next-pattern) excluded]
[vs included (conj excluded next-pattern)])))
;; apply the reduction steps, with a given set of known vars, and
;; included patterns. Previously excluded patterns are being scanned
;; again using the new known vars.
(groups [[v i e]] (reduce step [v i []] e))]
;; scan for everything that matches the first pattern, and then iterate until
;; everything that matches the resulting patterns has also been found.
;; Drop the set of vars before returning.
(rest (u/fixpoint groups [(get-vars fp) [fp] rp]))))
(def first-group (memoize first-group*))
(s/defn min-join-path :- [EPVPattern]
"Calculates a plan based on no outer joins (a cross product), and minimized joins.
A plan is the order in which to evaluate constraints and join them to the accumulated
evaluated data. If it is not possible to create a path without a cross product,
then return a plan of the patterns in the provided order."
[patterns :- [Pattern]
count-map :- {EPVPattern s/Num}]
(loop [[grp rmdr] (first-group patterns) ordered []]
(let [all-ordered (->> (paths grp count-map)
(sort-by (partial mapv count-map))
first
(concat ordered))] ;; TODO: order groups, rather than concat as found
(if (empty? rmdr)
all-ordered
(recur (first-group rmdr) all-ordered)))))
(s/defn user-plan :- [EPVPattern]
"Returns the original path specified by the user"
[patterns :- [EPVPattern]
_ :- {EPVPattern s/Num}]
patterns)
(s/defn select-planner
"Selects a query planner function"
[options]
(let [opt (set options)]
(case (get opt :planner)
:user user-plan
:min min-join-path
min-join-path)))
(s/defn modify-pattern :- [s/Any]
"Creates a new EPVPattern from an existing one, based on existing bindings.
Uses the mapping to copy from columns in 'existing' to overwrite variables in 'pattern'.
The variable locations have already been found and are in the 'mapping' argument"
[existing :- [Value]
mapping :- {s/Num s/Num}
pattern :- EPVPattern]
;; TODO: this is in an inner loop. Is it faster to:
;; (reduce (fn [p [f t]] (assoc p f t)) pattern mapping)
(map-indexed (fn [n v]
(if-let [x (mapping n)]
(nth existing x)
v))
pattern))
(s/defn pattern-left-join :- Results
"Takes a partial result, and joins on the resolution of a pattern"
[graph
part :- Results
pattern :- EPVPattern]
(let [cols (:cols (meta part))
total-cols (->> (st/vars pattern)
(remove (set cols))
(concat cols)
(into []))
pattern->left (store-util/matching-vars pattern cols)]
;; iterate over part, lookup pattern
(with-meta
(for [lrow part
:let [lookup (modify-pattern lrow pattern->left pattern)]
rrow (gr/resolve-pattern graph lookup)]
(concat lrow rrow))
{:cols total-cols})))
(s/defn filter-join
"Filters down results."
[graph
part :- Results
[fltr] :- FilterPattern]
(let [m (meta part)
vars (vec (:cols m))
filter-fn (c-eval (list 'fn [vars] fltr))]
(with-meta (filter filter-fn part) m)))
(s/defn binding-join
"Binds a var and adds to results."
[graph
part :- Results
[expr bnd-var] :- FilterPattern]
(let [cols (vec (:cols (meta part)))
binding-fn (c-eval (list 'fn [cols] expr))
new-cols (conj cols bnd-var)]
(with-meta
(map (fn [row] (concat row [(c-eval row)])) part)
{:cols new-cols})))
(def ^:dynamic *plan-options* [:min])
(declare plan-path)
(s/defn minus
"Removes matches."
[graph
part :- Results
[_ & patterns]]
(let [[path _] (plan-path graph patterns *plan-options*) ;; TODO: update optimizer to do this
ljoin #(left-join %2 %1 graph)]
(remove (fn [part-line] (seq (reduce ljoin part path))) part)))
(s/defn disjunction
"NOTE: This is a placeholder implementation. There is no optimization."
[graph
part :- Results
[_ & patterns]]
(apply concat (map #(left-join % part graph) patterns)))
(s/defn find-vars [f] (set (filter st/vartest? f)))
(def operators
{'not {:get-vars #(mapcat get-vars (rest %))
:left-join minus}
'or {:get-vars #(mapcat get-vars (rest %))
:left-join disjunction}})
(defn op-error
[pattern]
(throw (ex-info "Unknown operator" {:op (first pattern)
:args (rest pattern)})))
(defn pattern-error
[pattern]
(throw (ex-info "Unknown pattern type in query" {:pattern pattern})))
(defn get-vars
"Returns all vars used by a pattern"
[pattern]
(cond
(epv-pattern? pattern) (set (st/vars pattern))
(filter-pattern? pattern) (or (:vars (meta pattern)) (find-vars (first pattern)))
(op-pattern? pattern) (if-let [{:keys [get-vars]} (operators (first pattern))]
(get-vars pattern)
(op-error pattern))
:default (pattern-error pattern)))
(defn left-join
"Joins a partial result (on the left) to a pattern (on the right).
The pattern type will determine dispatch."
[pattern results graph]
(cond
(epv-pattern? pattern) (pattern-left-join graph results pattern)
(filter-pattern? pattern) (filter-join graph results pattern)
(op-pattern? pattern) (if-let [{:keys [left-join]} (operators (first pattern))]
(left-join graph results pattern)
(op-error pattern))
:default (pattern-error pattern)))
(s/defn plan-path :- [(s/one [Pattern] "Patterns in planned order")
(s/one {EPVPattern Results} "Single patterns mapped to their resolutions")]
"Determines the order in which to perform the elements that go into a query.
Tries to optimize, so it uses the graph to determine some of the
properties of the query elements. Options can describe which planner to use.
Planning will determine the resolution map, and this is returned with the plan.
By default the min-join-path function is used. This can be overriden with options:
[:planner plan]
The plan can be one of :user, :min.
:min is the default. :user means to execute in provided order."
[graph
patterns :- [Pattern]
options]
(let [epv-patterns (filter epv-pattern? patterns)
filter-patterns (filter filter-pattern? patterns)
op-patterns (filter op-pattern? patterns)
resolution-map (u/mapmap (partial gr/resolve-pattern graph)
epv-patterns)
count-map (u/mapmap (comp count resolution-map) epv-patterns)
query-planner (select-planner options)
;; run the query planner
planned (query-planner epv-patterns count-map)
filtered-plan (merge-filters planned filter-patterns)
plan (merge-ops filtered-plan op-patterns)]
;; result
[plan resolution-map]))
(s/defn join-patterns :- Results
"Joins the resolutions for a series of patterns into a single result."
[graph
patterns :- [Pattern]
& options]
(let [[[fpath & rpath] resolution-map] (plan-path graph patterns options)
;; execute the plan by joining left-to-right
;; left-join has back-to-front params for dispatch reasons
ljoin #(left-join %2 %1 graph)
part-result (with-meta
(resolution-map fpath)
{:cols (st/vars fpath)})]
(reduce ljoin part-result rpath)))
(s/defn add-to-graph
[graph
data :- Results]
(reduce (fn [acc d] (apply gr/graph-add acc d)) graph data))
(s/defn delete-from-graph
[graph
data :- Results]
(reduce (fn [acc d] (apply gr/graph-delete acc d)) graph data))
(s/defn query-map
[query]
(cond
(map? query) query
(string? query) (query-map (edn/read-string query))
(sequential? query) (->> query
(partition-by #{:find :in :with :where})
(partition 2)
(map (fn [[[k] v]] [k v]))
(into {}))))