-
Notifications
You must be signed in to change notification settings - Fork 11
/
scan.clj
64 lines (53 loc) · 2.56 KB
/
scan.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
(ns cbass.scan
(:require [cbass.tools :refer [to-bytes from-bytes]])
(:import [org.apache.hadoop.hbase.filter Filter KeyOnlyFilter FirstKeyOnlyFilter FilterList FilterList$Operator]
[org.apache.hadoop.hbase.client Scan]))
(defn- set-start-row! [^Scan scanner ^String prefix]
(.setStartRow scanner (to-bytes prefix)))
(defn- set-stop-row! [^Scan scanner ^String prefix]
(.setStopRow scanner (to-bytes prefix)))
;; (!) ".setRowPrefixFilter" is only available since "hbase client 0.99.1"
(defn- set-row-prefix! [^Scan scanner ^String prefix]
(when prefix
(.setRowPrefixFilter scanner (to-bytes prefix))))
(defn- set-time-range! [^Scan scanner [from to]]
(let [f (or from 0)
t (or to Long/MAX_VALUE)]
(.setTimeRange scanner f t)))
(defn set-filter! [^Scan scanner ^Filter f]
(when f
(.setFilter scanner f)))
(defn- set-reverse! [^Scan scanner reverse?]
(.setReversed scanner reverse?))
(defn- set-caching! [^Scan scanner limit]
(.setCaching scanner limit))
(defn- add-family [^Scan scanner ^String family]
(.addFamily scanner (to-bytes family)))
(defn- add-columns [^Scan scanner [^String family columns]]
(doseq [^String c columns]
(.addColumn scanner (to-bytes family) (to-bytes (name c)))))
(defn- get-keys-only-filter []
(doto (FilterList. FilterList$Operator/MUST_PASS_ALL)
(.addFilter (KeyOnlyFilter.))
(.addFilter (FirstKeyOnlyFilter.))))
;; doing one family many columns for now
(defn scan-filter [{:keys [keys-only? filter family columns starts-with from to time-range reverse? fetch-size]}]
(when (and keys-only? filter)
(throw (ex-info "filter & keys-only?=true cannot be combined" [keys-only? filter])))
(let [scanner (Scan.)
filter (if keys-only? (get-keys-only-filter) filter)
{:keys [from-ms to-ms]} time-range
params [(if (and family (seq columns))
[add-columns [family columns]]
[add-family family])
[set-filter! filter]
[set-reverse! reverse?]
[set-caching! fetch-size]
[set-row-prefix! starts-with]
[set-start-row! from]
[set-stop-row! to]
[set-time-range! (when (or from-ms to-ms)
[from-ms to-ms])]]]
(doall (map (fn [[f p]]
(when p (f scanner p))) params))
scanner))