-
Notifications
You must be signed in to change notification settings - Fork 0
/
streams.clj
99 lines (83 loc) · 3.64 KB
/
streams.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
(ns gelfino.streams
(:use
(gelfino.drools dsl bridging straping)
[clojure.tools.logging :only (trace info debug error)]
(gelfino compression constants chunked header)
lamina.core)
(:require
[cheshire.core :as cheshire]
[clojure.walk :as walk]
[gelfino.statistics :as stats])
(:import gelfino.drools.bridging.Message
java.util.concurrent.TimeUnit))
;; Map of the core processing channels. `initialize-channels` fills it with
;; :input (raw packets), :output (decompressed payloads) and :jsons
;; (the :output channel mapped through `into-json`).
(def base-channels (atom {}))
;; Map of stream name (keyword) -> function taking the :jsons channel.
;; Entries are registered by the code that `defstream` expands to and are
;; invoked by `initialize-channels`; `close-channels` empties the map.
(def stream-channels (atom {}))
(defn route-handling
  "Dispatches a raw packet according to the type reported by `gelf-type`.

  zlib and gzip payloads are decompressed and enqueued on the :output
  channel; chunked payloads are handed to `handle-chunked` together with the
  :input channel. Each of those runs inside a `future`. Any other type is
  logged as an error."
  [data]
  (let [kind (gelf-type data)
        {:keys [input output]} @base-channels]
    (trace kind)
    (cond
      (= zlib-header-id kind)
      (future (enqueue output (decompress-zlib data)))

      (= gzip-header-id kind)
      (future (enqueue output (decompress-gzip data)))

      (= chunked-header-id kind)
      (future (handle-chunked data input))

      :else
      (error (str "No matching handling found for " kind)))))
(defn- into-json
  "Parses the JSON string `s` into a map with keyword keys, logs the result
  at debug level, bumps the processed-messages statistic and returns the map."
  [s]
  (let [parsed (cheshire/parse-string s true)]
    (debug parsed)
    (stats/inc-processed)
    parsed))
(defn initialize-channels
  "Creates fresh :input and :output channels, derives the :jsons channel from
  :output via `into-json`, hands the :jsons channel to every registered
  stream function and finally routes everything arriving on :input through
  `route-handling`."
  []
  (let [in  (channel)
        out (channel)]
    (reset! base-channels {:input in :output out})
    (let [parsed (map* into-json out)]
      (swap! base-channels assoc :jsons parsed)
      ;; wire every stream registered so far onto the parsed-message channel
      (doseq [attach (vals @stream-channels)]
        (attach parsed))
      (receive-all in (fn [packet] (route-handling packet))))))
(defn close-channels
  "Closes every channel currently stored in `base-channels` and clears the
  registered stream functions."
  []
  (doseq [[_ ch] @base-channels]
    (close ch))
  (reset! stream-channels {}))
(defn feed-fn
  "Entry point for a received packet: traces it, bumps the received-packets
  statistic and enqueues it on the :input channel."
  [packet]
  (trace (str "recieved packet " packet))
  (stats/inc-received)
  (let [{:keys [input]} @base-channels]
    (enqueue input packet)))
(defn into-pred
  "Turns a selector form into a one-argument predicate over a message part.

  * a regex yields a predicate that is true when the whole part matches it;
  * a string yields a predicate that is true when the part contains it;
  * any other form is evaluated exactly once and must produce something
    callable (clojure.lang.IFn), which is returned as the predicate.

  Regex and string predicates return false (instead of throwing) when the
  message part is nil or is not text.

  Throws Exception when the selector is none of the above."
  [selector]
  (cond
    ;; Literal selectors are checked first so they are never passed to eval.
    (instance? java.util.regex.Pattern selector)
    (fn [v]
      (boolean (and (instance? CharSequence v)
                    (re-matches selector v))))

    (instance? java.lang.String selector)
    (fn [v]
      (and (string? v)
           (.contains ^String v ^String selector)))

    :else
    ;; Evaluate once: the form may be expensive or have side effects.
    (let [evaluated (eval selector)]
      (if (instance? clojure.lang.IFn evaluated)
        evaluated
        (throw (Exception. (str "Bad predicate type " selector)))))))
(defn filter-fn
  "Builds a predicate over a message map from a flat sequence of
  key/selector pairs.

  `pred-pairs` is consumed two items at a time (a trailing unpaired item is
  ignored); each selector is converted with `into-pred`. The returned
  function takes a message map `m` and is true when every predicate accepts
  the value found under its key.

  Selectors are converted eagerly, so evaluating them and rejecting an
  invalid one happens here, in the caller's thread, rather than lazily when
  the first message is filtered."
  [pred-pairs]
  (let [preds (doall
                (for [[k selector] (partition 2 pred-pairs)]
                  [k (into-pred selector)]))]
    (fn [m]
      (every? (fn [[k pred]] (pred (m k))) preds))))
(defn apply-sym
  "Wraps `form` in a one-argument `fn` form. Every occurrence of the symbol
  `orig` inside `form` is replaced by a freshly generated symbol (prefixed
  with the name of `orig`), which becomes the function's parameter."
  [orig form]
  (let [fresh  (gensym (name orig))
        rename (fn [node] (if (= node orig) fresh node))
        body   (walk/postwalk rename form)]
    (list 'fn [fresh] body)))
(defn drools-stream
  "Returns the code that registers a rule-backed stream under the keyword
  form of `name`.

  The generated code opens a drools session for `rule`, looks up its
  \"event-stream\" entry point and stores, in `stream-channels`, a function
  that subscribes to the parsed-message channel. For every message carrying
  both :timestamp and :level it inserts a `Message` into the entry point and
  fires all rules; a message missing either value is logged as an error."
  [name rule]
  `(let [session# (drools-session :pkgs [~rule])
         entry#   (.getWorkingMemoryEntryPoint session# "event-stream")]
     (swap! stream-channels assoc ~(keyword name)
       (fn [jsons#]
         (receive-all jsons#
           (fn [m#]
             ;; Check the raw values: calling .longValue before the nil test
             ;; would throw on a missing timestamp instead of logging it.
             (let [timestamp# (m# :timestamp)
                   level#     (m# :level)]
               (if (or (nil? timestamp#) (nil? level#))
                 (error "no timestamp or level value given for message")
                 (do
                   (info (str m#))
                   (.insert entry# (Message. level# (.longValue timestamp#)))
                   (.fireAllRules session#))))))))))
(defn selectors-stream
  "Returns the code that registers a selector-filtered stream under the
  keyword form of `name`.

  `rest` holds the selector key/value pairs followed by a handler form. The
  generated code stores, in `stream-channels`, a function that filters the
  parsed-message channel with `filter-fn` (given the quoted `rest`) and
  calls the handler for every message that passes; inside the handler the
  symbol `message` refers to that message."
  [name rest]
  ;; No intermediate channel is created: the stream subscribes directly to
  ;; the filtered view of the parsed-message channel.
  `(swap! stream-channels assoc ~(keyword name)
     (fn [jsons#]
       (receive-all (filter* (filter-fn '~rest) jsons#)
                    ~(apply-sym 'message (last rest))))))
(defmacro defstream
  "Defines a stream of messages filtered out of all the messages received.

  Selector form: (defstream name key selector ... handler)
  The definition takes key/value pairs, where the key is the message part to
  filter on and the value is one of:
  * A predicate function that accepts the part and returns true if it matches.
  * A regex against which the part is matched.
  * A substring of the matched message part.
  The last form is the handler; inside it the symbol `message` refers to the
  matching message.

  Rule form: (defstream name :rule rule)
  When the first argument after the name is :rule, the stream is backed by
  the given rule instead of selectors."
  [name & rest]
  (let [[marker rule] rest]
    (if (= :rule marker)
      (drools-stream name rule)
      (selectors-stream name rest))))
;examples
#_(macroexpand '(defstream not-too-long :short_message #" not too long " (println message)))