-
-
Notifications
You must be signed in to change notification settings - Fork 59
/
client.clj
487 lines (421 loc) · 17.2 KB
/
client.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
(ns datalevin.client
"Datalevin client to Datalevin server, blocking API, with a connection pool"
(:require [datalevin.util :as u]
[datalevin.constants :as c]
[clojure.string :as s]
[datalevin.bits :as b]
[datalevin.protocol :as p])
(:import [java.nio ByteBuffer BufferOverflowException]
[java.nio.channels SocketChannel]
[java.util UUID]
[java.util.concurrent ConcurrentLinkedQueue]
[java.net InetSocketAddress StandardSocketOptions URI]))
(defprotocol ^:no-doc IConnection
(send-n-receive [conn msg]
"Send a message to server and return the response, a blocking call")
(send-only [conn msg] "Send a message without waiting for a response")
(receive [conn] "Receive a message, a blocking call")
(close [conn]))
(deftype ^:no-doc Connection [^SocketChannel ch
^:volatile-mutable ^ByteBuffer bf]
IConnection
(send-n-receive [this msg]
(try
(locking bf
(p/write-message-blocking ch bf msg)
(.clear bf)
(let [[resp bf'] (p/receive-ch ch bf)]
(when-not (identical? bf' bf) (set! bf bf'))
resp))
(catch BufferOverflowException _
(let [size (* ^long c/+buffer-grow-factor+ (.capacity bf))]
(set! bf (b/allocate-buffer size))
(send-n-receive this msg)))
(catch Exception e
(u/raise "Error sending message and receiving response: "
(ex-message e) {:msg msg}))))
(send-only [this msg]
(try
(p/write-message-blocking ch bf msg)
(catch BufferOverflowException _
(let [size (* ^long c/+buffer-grow-factor+ (.capacity bf))]
(set! bf (b/allocate-buffer size))
(send-only this msg)))
(catch Exception e
(u/raise "Error sending message: " (ex-message e) {:msg msg}))))
(receive [this]
(try
(let [[resp bf'] (p/receive-ch ch bf)]
(when-not (identical? bf' bf) (set! bf bf'))
resp)
(catch Exception e
(u/raise "Error receiving data:" (ex-message e) {}))))
(close [this]
(.close ch)))
(defn- ^SocketChannel connect-socket
"connect to server and return the client socket channel"
[^String host port]
(try
(doto (SocketChannel/open)
(.setOption StandardSocketOptions/SO_KEEPALIVE true)
(.setOption StandardSocketOptions/TCP_NODELAY true)
(.connect (InetSocketAddress. host ^int port)))
(catch Exception e
(u/raise "Unable to connect to server: " (ex-message e)
{:host host :port port}))))
(defn- new-connection
[host port]
(->Connection (connect-socket host port)
(b/allocate-buffer c/+default-buffer-size+)))
(defn- set-client-id
[conn client-id]
(let [{:keys [type message]}
(send-n-receive conn {:type :set-client-id
:client-id client-id})]
(when-not (= type :set-client-id-ok) (u/raise message {}))))
(defprotocol ^:no-doc IConnectionPool
(get-connection [this] "Get a connection from the pool")
(release-connection [this connection] "Return the connection back to pool")
(close-pool [this])
(closed-pool? [this]))
(deftype ^:no-doc ConnectionPool [host port client-id pool-size time-out
^ConcurrentLinkedQueue available
^ConcurrentLinkedQueue used]
IConnectionPool
(get-connection [this]
(if (closed-pool? this)
(u/raise "This client is closed" {:client-id client-id})
(let [start (System/currentTimeMillis)]
(loop []
(if (.isEmpty available)
(if (>= (- (System/currentTimeMillis) start) ^long time-out)
(u/raise "Timeout in obtaining a connection" {})
(do (Thread/sleep 1000)
(recur)))
(let [^Connection conn (.poll available)]
(if (.isOpen ^SocketChannel (.-ch conn))
(do (.add used conn)
conn)
(let [conn (new-connection host port)]
(set-client-id conn client-id)
(.add used conn)
conn))))))))
(release-connection [this conn]
(locking this
(when (.contains used conn)
(.remove used conn)
(.add available conn))))
(close-pool [this]
(dotimes [_ (.size used)] (close ^Connection (.poll used)))
(.clear used)
(dotimes [_ (.size used)] (close ^Connection (.poll available)))
(.clear available))
(closed-pool? [this]
(and (.isEmpty used) (.isEmpty available))))
(defn- authenticate
"Send an authenticate message to server, and wait to receive the response.
If authentication succeeds, return a client id.
Otherwise, close connection, raise exception"
[host port username password]
(let [conn (new-connection host port)
{:keys [type client-id message]}
(send-n-receive conn {:type :authentication
:username username
:password password})]
(close conn)
(if (= type :authentication-ok)
client-id
(u/raise "Authentication failure: " message {}))))
(defn- new-connectionpool
[host port client-id pool-size time-out]
(assert (> ^long pool-size 0)
"Number of connections must be greater than zero")
(let [^ConnectionPool pool (->ConnectionPool
host port client-id
pool-size time-out
(ConcurrentLinkedQueue.)
(ConcurrentLinkedQueue.))
^ConcurrentLinkedQueue available (.-available pool)]
(dotimes [_ pool-size]
(let [conn (new-connection host port)]
(set-client-id conn client-id)
(.add available conn)))
pool))
(defprotocol ^:no-doc IClient
(request [client req]
"Send a request to server and return the response. The response could
also initiate a copy out")
(copy-in [client req data batch-size]
"Copy data to the server. `req` is a request type message,
`data` is a sequence, `batch-size` decides how to partition the data
so that each batch fits in buffers along the way. The response could
also initiate a copy out")
(disconnect [client])
(disconnected? [client])
(get-pool [client])
(get-id [client]))
(defn ^:no-doc parse-user-info
[^URI uri]
(when-let [user-info (.getUserInfo uri)]
(when-let [[_ username password] (re-find #"(.+):(.+)" user-info)]
{:username username :password password})))
(defn ^:no-doc parse-port
[^URI uri]
(let [p (.getPort uri)] (if (= -1 p) c/default-port p)))
(defn ^:no-doc parse-db
"Extract the identifier of database from URI. A database is uniquely
identified by its name (after being converted to its kebab case)."
[^URI uri]
(let [path (.getPath uri)]
(when-not (or (s/blank? path) (= path "/"))
(u/lisp-case (subs path 1)))))
(defn ^:no-doc parse-query
[^URI uri]
(when-let [query (.getQuery uri)]
(->> (s/split query #"&")
(map #(s/split % #"="))
(into {}))))
(defn- copy-out [conn req]
(try
(let [data (transient [])]
(loop []
(let [msg (receive conn)]
(if (map? msg)
(let [{:keys [type]} msg]
(if (= type :copy-done)
{:type :command-complete :result (persistent! data)}
(u/raise "Server error while copying out data" {:msg msg})))
(do (doseq [d msg] (conj! data d))
(recur))))))
(catch Exception e
(u/raise "Unable to receive copy:" (ex-message e) {:req req}))))
(defn- copy-in*
[conn req data batch-size ]
(try
(doseq [batch (partition batch-size batch-size nil data)]
(send-only conn batch))
(let [{:keys [type] :as result} (send-n-receive conn {:type :copy-done})]
(if (= type :copy-out-response)
(copy-out conn req)
result))
(catch Exception e
(send-n-receive conn {:type :copy-fail})
(u/raise "Unable to copy in:" (ex-message e)
{:req req :count (count data)}))))
(deftype ^:no-doc Client [username password host port pool-size time-out
^:volatile-mutable ^UUID id
^:volatile-mutable ^ConnectionPool pool]
IClient
(request [client req]
(let [success? (volatile! false)
start (System/currentTimeMillis)]
(loop []
(let [conn (get-connection pool)
res (when-let [{:keys [type] :as result}
(try
(send-n-receive conn req)
(catch Exception e
(close conn)
nil)
(finally (release-connection pool conn)))]
(vreset! success? true)
(case type
:copy-out-response (copy-out conn req)
:command-complete result
:error-response result
:reconnect
(let [client-id
(authenticate host port username password)]
(close conn)
(vreset! success? false)
(set! id client-id)
(set! pool (new-connectionpool host port client-id
pool-size time-out)))))]
(if (>= (- (System/currentTimeMillis) start) ^long (.-time-out pool))
(u/raise "Timeout in making request" {})
(if @success?
res
(recur)))))))
(copy-in [client req data batch-size]
(let [conn (get-connection pool)]
(try
(let [{:keys [type]} (send-n-receive conn req)]
(if (= type :copy-in-response)
(copy-in* conn req data batch-size)
(u/raise "Server refuses to accept copy in" {:req req})))
(finally (release-connection pool conn)))))
(disconnect [client]
(let [conn (get-connection pool)]
(send-only conn {:type :disconnect})
(release-connection pool conn))
(close-pool pool))
(disconnected? [client]
(closed-pool? pool))
(get-pool [client] pool)
(get-id [client] id))
(defn open-database
"Open a database on server. `db-type` can be \"datalog\" or \"kv\""
([client db-name db-type]
(open-database client db-name db-type nil nil))
([client db-name db-type opts]
(open-database client db-name db-type nil opts))
([client db-name db-type schema opts]
(let [{:keys [type message]}
(request client (if (= db-type c/db-store-kv)
{:type :open-kv :db-name db-name :opts opts}
(cond-> {:type :open :db-name db-name}
schema (assoc :schema schema)
opts (assoc :opts (assoc opts :db-name db-name)))))]
(when (= type :error-response)
(u/raise "Unable to open database:" db-name
{:message message})))))
(defn new-client
"Create a new client that maintains pooled connections to a remote
Datalevin database server. This operation takes at least 0.5 seconds
in order to perform a secure password hashing that defeats cracking.
Fields in the `uri-str` should be properly URL encoded, e.g. user and
password need to be URL encoded if they contain special characters.
The following can be set in the optional map:
* `:pool-size` determines number of connections maintained in the connection
pool, default is 3.
* `:time-out` specifies the time (milliseconds) before an exception is thrown
when obtaining an open network connection, default is 60000."
([uri-str]
(new-client uri-str {:pool-size c/connection-pool-size
:time-out c/connection-timeout}))
([uri-str {:keys [pool-size time-out]
:or {pool-size c/connection-pool-size
time-out c/connection-timeout}}]
(let [uri (URI. uri-str)
{:keys [username password]} (parse-user-info uri)
host (.getHost uri)
port (parse-port uri)
client-id (authenticate host port username password)
pool (new-connectionpool host port client-id pool-size time-out)]
(->Client username password host port pool-size time-out client-id pool))))
(defn ^:no-doc normal-request
"Send request to server and returns results. Does not use the
copy-in protocol. `call` is a keyword, `args` is a vector,
`writing?` is a boolean indicating if write-txn should be used"
([client call args]
(normal-request client call args false))
([client call args writing?]
(let [req {:type call :args args :writing? writing?}
{:keys [type message result]} (request client req)]
(if (= type :error-response)
(u/raise "Request to Datalevin server failed: " message req)
result))))
;; we do input validation and normalization in the server, as
;; 3rd party clients may be written
(defn create-user
"Create a user that can login. `username` will be converted to Kebab case
(i.e. all lower case and words connected with dashes)."
[client username password]
(normal-request client :create-user [username password]))
(defn reset-password
"Reset a user's password."
[client username password]
(normal-request client :reset-password [username password]))
(defn drop-user
"Delete a user."
[client username]
(normal-request client :drop-user [username]))
(defn list-users
"List all users."
[client]
(normal-request client :list-users []))
(defn create-role
"Create a role. `role-key` is a keyword."
[client role-key]
(normal-request client :create-role [role-key]))
(defn drop-role
"Delete a role. `role-key` is a keyword."
[client role-key]
(normal-request client :drop-role [role-key]))
(defn list-roles
"List all roles."
[client]
(normal-request client :list-roles []))
(defn create-database
"Create a database. `db-type` can be `:datalog` or `:key-value`.
`db-name` will be converted to Kebab case (i.e. all lower case and
words connected with dashes)."
[client db-name db-type]
(normal-request client :create-database [db-name db-type]))
(defn close-database
"Force close a database. Connected clients that are using it
will be disconnected.
See [[disconnect-client]]"
[client db-name]
(normal-request client :close-database [db-name]))
(defn drop-database
"Delete a database. May not be successful if currently in use.
See [[close-database]]"
[client db-name]
(normal-request client :drop-database [db-name]))
(defn list-databases
"List all databases."
[client]
(normal-request client :list-databases []))
(defn list-databases-in-use
"List databases that are in use."
[client]
(normal-request client :list-databases-in-use []))
(defn assign-role
"Assign a role to a user. "
[client role-key username]
(normal-request client :assign-role [role-key username]))
(defn withdraw-role
"Withdraw a role from a user. "
[client role-key username]
(normal-request client :withdraw-role [role-key username]))
(defn list-user-roles
"List the roles assigned to a user. "
[client username]
(normal-request client :list-user-roles [username]))
(defn grant-permission
"Grant a permission to a role.
`perm-act` indicates the permitted action. It can be one of
`:datalevin.server/view`, `:datalevin.server/alter`,
`:datalevin.server/create`, or `:datalevin.server/control`, with each
subsumes the former.
`perm-obj` indicates the object type of the securable. It can be one of
`:datalevin.server/database`, `:datalevin.server/user`,
`:datalevin.server/role`, or `:datalevin.server/server`, where the last one
subsumes all the others.
`perm-tgt` indicate the concrete securable target. It can be a database name,
a username, or a role key, depending on `perm-obj`. If it is `nil`, the
permission applies to all securables in that object type."
[client role-key perm-act perm-obj perm-tgt]
(normal-request client :grant-permission
[role-key perm-act perm-obj perm-tgt]))
(defn revoke-permission
"Revoke a permission from a role.
See [[grant-permission]]."
[client role-key perm-act perm-obj perm-tgt]
(normal-request client :revoke-permission
[role-key perm-act perm-obj perm-tgt]))
(defn list-role-permissions
"List the permissions granted to a role.
See [[grant-permission]]."
[client role-key]
(normal-request client :list-role-permissions [role-key]))
(defn list-user-permissions
"List the permissions granted to a user through the roles assigned."
[client username]
(normal-request client :list-user-permissions [username]))
(defn query-system
"Issue arbitrary Datalog query to the system database on the server.
Note that unlike `q` function, the arguments here should NOT include db,
as the server will supply it."
[client query & arguments]
(normal-request client :query-system [query arguments]))
(defn show-clients
"Show information about the currently connected clients on the server."
[client]
(normal-request client :show-clients []))
(defn disconnect-client
"Force disconnect a client from the server."
[client client-id]
(assert (instance? UUID client-id) "")
(normal-request client :disconnect-client [client-id]))