-
Notifications
You must be signed in to change notification settings - Fork 0
/
transducers.clj
99 lines (90 loc) · 3.28 KB
/
transducers.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
;; # Error Handling Transducers
(ns danger-mouse.transducers
(:require [danger-mouse.utils :as utils]
[clojure.string :as str]
[schema.core :as s]
[danger-mouse.schema :as dm-schema]
[clojure.string :as str]))
;; ## Transducers
(def contain-errors-xf
"Like `danger-mouse.catch-errors#catch-errors`, but instead of collecting errors as it goes,
presents them in `danger-mouse` error format for later handling.
If further transducers are used, `chain` should be used to compose them."
(fn [rf]
(fn
([] (rf))
([result] (try (rf result)
(catch Exception e
(dm-schema/as-error {:error-msg (.getMessage e)
:error e
:input result}))))
([result input] (try (rf result input)
(catch Exception e
(rf result
(dm-schema/as-error {:error-msg (.getMessage e)
:error e
:input input}))))))))
(defn handle-errors-xf
"Handle errors as part of the transduction process via `handler`, removing them
from later stages."
[handler]
(fn [rf]
(fn
([] (rf))
([result] (rf result))
([result input]
(utils/resolve
(fn [error] (handler error) result)
(fn [success] (rf result success))
input)))))
(defn handle-and-continue-xf
"Tranducer transformer that takes an existing transducer `xf`, and applies it to
unmarked values while using `handler` to deal with and remove errored values."
[handler xf]
(fn [rf]
(fn
([] (rf))
([result] (rf result))
([result input]
(utils/resolve
(fn [error] (handler error) result)
(fn [success] ((xf rf) result success))
input)))))
(defn carry-errors-xf
"Propogates errors as errors, and otherwise applies the marked transducer `xf`."
[xf]
(fn [rf]
(let [transformed (xf rf)]
(fn
([]
(transformed))
([result]
(transformed result))
([result input]
(utils/resolve
(fn [error]
(rf result (dm-schema/as-error error)))
(fn [success]
(transformed result success))
input))))))
;; ## Transducer Helper Functions
(defn chain
"Takes a splat of transducers `xfs` and wraps them to carry errors forward and
otherwise act on values as normal. As no grouping is necessary, this can be used
on arbitrary collections, including streams and infinite lists."
[& xfs]
(apply comp (map carry-errors-xf xfs)))
(defn collect
"Takes a splat of transducers `xfs`. Any errors encountered will be thrown
into a side channel `errors` and returned as part of a `GroupedResults`.
Blocks until transduction is complete, so not appropriate for streaming."
[& xfs]
(fn [coll]
(let [errors (transient [])
handler (handle-errors-xf (fn [e]
(conj! errors e)))
result (into []
(apply comp (interleave (cons handler xfs) (repeat handler)))
coll)]
{:errors (persistent! errors)
:result result})))