/
arche.clj
113 lines (97 loc) · 3.64 KB
/
arche.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
(ns troy-west.arche
(:refer-clojure :exclude [derive resolve])
(:require [qbits.alia :as alia]
[qbits.alia.udt :as alia.udt]
[qbits.alia.enum :as alia.enum]
[qbits.alia.codec.default :as codec.default])
(:import (com.datastax.driver.core Cluster BatchStatement)
(clojure.lang Keyword)))
(defprotocol StatementResolver
(resolve [this connection opts]))
(extend-protocol StatementResolver
Object
(resolve [this _ _] this)
Keyword
(resolve [this connection _]
(get-in connection [:statements this :prepared])))
(defn prepare-statements
[session config]
(reduce-kv (fn [ret k v]
(let [cql (or (:cql v) v)
opts (:opts v)
prepared (alia/prepare session cql)]
(assoc ret k (cond-> {:cql cql
:prepared prepared}
opts (assoc :opts opts)))))
{}
config))
(defn prepare-encoders
[session udts]
(reduce-kv (fn [ret k v]
(assoc ret k (alia.udt/encoder session
(:name v)
(or (:codec v) codec.default/codec))))
{}
udts))
(defn connect
([cluster]
(connect cluster nil))
([^Cluster cluster {:keys [keyspace statements udts]}]
(let [session (if keyspace
(alia/connect cluster keyspace)
(alia/connect cluster))
statements (if (map? statements) statements (apply merge statements))
udts (if (map? udts) udts (apply merge udts))]
{:session session
:keyspace keyspace
:statements (prepare-statements session statements)
:udt-encoders (prepare-encoders session udts)
:fetch-size (-> cluster .getConfiguration .getQueryOptions .getFetchSize)})))
(defn derive
[connection {:keys [statements udts]}]
(let [{:keys [session keyspace fetch-size]} connection
statements (if (map? statements) statements (apply merge statements))
udts (if (map? udts) udts (apply merge udts))]
{:session session
:keyspace keyspace
:statements (prepare-statements session statements)
:udt-encoders (prepare-encoders session udts)
:fetch-size fetch-size}))
(defn disconnect
[connection]
(alia/shutdown (:session connection)))
(defn encode-udt
[connection udt-key value]
(let [encoder (get-in connection [:udt-encoders udt-key])]
(encoder value)))
(defn options
[connection stmt-key opts]
(or (some-> (get-in connection [:statements stmt-key :opts])
(merge opts))
opts))
(defn batch
([connection stmt-key values-seq]
(batch connection stmt-key values-seq :unlogged))
([connection stmt-key values-seq type]
(let [stmt (get-in connection [:statements stmt-key :prepared])
bs (BatchStatement. (alia.enum/batch-statement-type type))]
(doseq [values values-seq]
(.add bs (alia/bind stmt values)))
bs)))
(defn execute*
([f connection executable]
(execute* f connection executable nil))
([f connection executable opts]
(f (:session connection)
(resolve executable connection opts)
(options connection executable opts))))
(defn execute
([connection executable]
(execute* alia/execute connection executable))
([connection executable opts]
(execute* alia/execute connection executable opts)))
(defn execute-async
([connection executable]
(execute* alia/execute-async connection executable))
([connection executable opts]
(execute* alia/execute-async connection executable opts)))