-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
streams.clj
168 lines (135 loc) · 6.09 KB
/
streams.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
(ns taoensso.telemere.streams
"Intake support for standard stream/s -> Telemere."
(:refer-clojure :exclude [binding])
(:require
[taoensso.encore :as enc :refer [binding have have?]]
[taoensso.telemere.impl :as impl]))
(enc/defonce ^:private orig-*out* "Original `*out*` on ns load" *out*)
(enc/defonce ^:private orig-*err* "Original `*err*` on ns load" *err*)
(enc/defonce ^:no-doc ^:dynamic prev-*out* "Previous `*out*` (prior to any Telemere binds)" nil)
(enc/defonce ^:no-doc ^:dynamic prev-*err* "Previous `*err*` (prior to any Telemere binds)" nil)
(def ^:private ^:const default-out-opts {:kind :system/out, :level :info})
(def ^:private ^:const default-err-opts {:kind :system/err, :level :error})
(defn ^:no-doc osw
"Private, don't use."
^java.io.OutputStreamWriter [x]
(java.io.OutputStreamWriter. x))
(defn ^:no-doc telemere-print-stream
"Private, don't use.
Returns a `java.io.PrintStream` that will flush to Telemere signals with given opts."
^java.io.PrintStream [{:as sig-opts :keys [kind level id]}]
(let [baos
(proxy [java.io.ByteArrayOutputStream] []
(flush []
(let [^java.io.ByteArrayOutputStream this this]
(proxy-super flush)
(let [msg (.trim (.toString this))]
(proxy-super reset)
(when-not (.isEmpty msg)
(binding [*out* (or prev-*out* orig-*out*)
*err* (or prev-*err* orig-*err*)]
(impl/signal!
{:location nil
:ns nil
:kind kind
:level level
:id id
:msg msg})))))))]
(java.io.PrintStream. baos true ; Auto flush
java.nio.charset.StandardCharsets/UTF_8)))
;;;;
(defmacro ^:public with-out->telemere
"Executes form with `*out*` bound to flush to Telemere signals with given opts."
([ form] `(with-out->telemere nil ~form))
([opts form]
`(binding [prev-*out* (or prev-*out* *out*)
*out* (osw (telemere-print-stream ~(conj default-out-opts opts)))]
~form)))
(defmacro ^:public with-err->telemere
"Executes form with `*err*` bound to flush to Telemere signals with given opts."
([ form] `(with-err->telemere nil ~form))
([opts form]
`(binding [prev-*err* (or prev-*err* *err*)
*err* (osw (telemere-print-stream ~(conj default-err-opts opts)))]
~form)))
(defmacro ^:public with-streams->telemere
"Executes form with `*out*` and/or `*err*` bound to flush to Telemere signals
with given opts."
([form] `(with-streams->telemere nil ~form))
([{:keys [out err]
:or {out default-out-opts
err default-err-opts}} form]
`(binding [prev-*out* (or prev-*out* *out*)
prev-*err* (or prev-*err* *err*)
*out* (if-let [out# ~out] (osw (telemere-print-stream out#)) *out*)
*err* (if-let [err# ~err] (osw (telemere-print-stream err#)) *err*)]
~form)))
(comment (impl/with-signal (with-out->telemere (println "hello"))))
(enc/defonce ^:private orig-out_ "Original `System/out`, or nil" (atom nil))
(enc/defonce ^:private orig-err_ "Original `System/err`, or nil" (atom nil))
(let [monitor (Object.)]
(defn ^:public streams->reset!
"Experimental, subject to change without notice!
Resets `System/out` and `System/err` to their original value (prior to any
`streams->telemere!` call)."
[]
(let [[orig-out _] (reset-vals! orig-out_ nil)
[orig-err _] (reset-vals! orig-err_ nil)]
(impl/signal!
{:kind :event
:level :info
:id :taoensso.telemere/streams->telemere!
:msg "Disabling intake: standard stream/s -> Telemere"
:data {:system/out? (boolean orig-out)
:system/err? (boolean orig-err)}})
(locking monitor
(when orig-out (System/setOut orig-out))
(when orig-err (System/setErr orig-err)))
(boolean (or orig-out orig-err))))
(defn ^:public streams->telemere!
"Experimental, subject to change without notice!
When given `out`, sets JVM's `System/out` to flush to Telemere signals with those opts.
When given `err`, sets JVM's `System/err` to flush to Telemere signals with those opts.
Note that setting `System/out` won't necessarily affect Clojure's `*out*`,
and setting `System/err` won't necessarily affect Clojure's `*err*`.
See also:
`with-out->telemere`,
`with-err->telemere`,
`with-streams->telemere`."
([] (streams->telemere! nil))
([{:keys [out err]
:or {out default-out-opts
err default-err-opts}}]
(when (or out err)
(let [out (when out (telemere-print-stream out))
err (when err (telemere-print-stream err))]
(impl/signal!
{:kind :event
:level :info
:id :taoensso.telemere/streams->telemere!
:msg "Enabling intake: standard stream/s -> Telemere"
:data {:system/out? (boolean out)
:system/err? (boolean err)}})
(locking monitor
(when out (compare-and-set! orig-out_ nil System/out) (System/setOut out))
(when err (compare-and-set! orig-err_ nil System/err) (System/setErr err)))
true)))))
(comment
(streams->telemere?)
(streams->telemere! {})
(streams->reset!))
;;;;
(defn check-out-intake
"Returns {:keys [sending->telemere? telemere-receiving?]}."
[]
(let [sending? (boolean @orig-out_)
receiving? (and sending? (impl/test-intake! "`System/out` -> Telemere" #(.println System/out %)))]
{:sending->telemere? sending?, :telemere-receiving? receiving?}))
(defn check-err-intake
"Returns {:keys [sending->telemere? telemere-receiving?]}."
[]
(let [sending? (boolean @orig-err_)
receiving? (and sending? (impl/test-intake! "`System/err` -> Telemere" #(.println System/err %)))]
{:sending->telemere? sending?, :telemere-receiving? receiving?}))
(impl/add-intake-check! :system/out check-out-intake)
(impl/add-intake-check! :system/err check-err-intake)