-
Notifications
You must be signed in to change notification settings - Fork 19
/
raw_reporter.clj
74 lines (64 loc) · 2.7 KB
/
raw_reporter.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
(ns trombi.reporters.raw-reporter
(:require [clojure.java.io :as io])
(:import (java.io BufferedWriter File FileWriter)))
(defn collector
  "Builds the in-memory raw collector. The single argument is ignored.

  Returns a map with:
    :collect - two-argument fn; ignores its first argument and returns the
               :batch value of its second argument unchanged.
    :combine - `concat`, used to join two collected results."
  [_]
  {:collect (fn [_ batch-result]
              (let [{:keys [batch]} batch-result]
                batch))
   :combine concat})
(defn generator
  "Builds the in-memory raw generator. The single argument is ignored.

  Returns a map with:
    :generate - `identity`; the collected data is the report itself.
    :as-str   - one-argument fn producing a summary line that counts every
                entry found under the :requests key of each collected item."
  [_]
  {:generate identity
   :as-str (fn [collected]
             (let [request-count (count (mapcat :requests collected))]
               (str "Finished " request-count " requests.")))})
;; Reporter definition for the in-memory raw reporter.
;; :reporter-key identifies the reporter as :raw; :collector and :generator
;; are fully qualified symbols naming the `collector` and `generator` vars of
;; this namespace (presumably resolved by the consumer at runtime - confirm
;; against the caller).
(def in-memory-reporter
{:reporter-key :raw
:collector 'trombi.reporters.raw-reporter/collector
:generator 'trombi.reporters.raw-reporter/generator})
(defn- write-edns-as-lines
  "Writes every element of `lines` to `file-name` as its `pr-str`
  representation, one element per line. Lines are separated by the platform
  line separator and no separator is written after the last element.

  An empty (or nil) `lines` produces an empty file instead of a file
  containing the text \"nil\".

  The file is written through `io/writer`, which encodes as UTF-8 by default,
  so the content round-trips through `io/reader` (also UTF-8 by default)
  regardless of the platform default charset. An existing file is truncated."
  [file-name lines]
  (with-open [^BufferedWriter wtr (io/writer (io/file file-name))]
    (when-let [edns (seq lines)]
      (.write wtr ^String (pr-str (first edns)))
      ;; Separator goes before each following element, so the file never
      ;; ends with a trailing newline.
      (doseq [edn (rest edns)]
        (.newLine wtr)
        (.write wtr ^String (pr-str edn))))))
(defn- append-edns-as-lines
  "Writes the `pr-str` form of each element of `edns` to `writer`, following
  every element (including the last) with `.newLine`. The writer is neither
  flushed nor closed here. Returns nil."
  [writer edns]
  (loop [pending (seq edns)]
    (when pending
      (.write writer (pr-str (first pending)))
      (.newLine writer)
      (recur (next pending)))))
(defn- file->lines-seq
  "Opens `file-name` for reading and returns a two-element vector:

    [edns cleanup-fn]

  `edns` is a lazy sequence of the values obtained by applying `read-string`
  to each line of the file, with nil results dropped. Because the sequence is
  lazy it must be consumed before the reader is closed.

  `cleanup-fn` is a zero-argument fn that closes the reader and then deletes
  the file; it returns the result of the delete call."
  [file-name]
  (let [rdr (io/reader file-name)
        ;; `keep` drops only nil results, so a line holding `false` survives.
        edns (keep read-string (line-seq rdr))
        cleanup-fn (fn []
                     (.close rdr)
                     (.delete (File. file-name)))]
    [edns cleanup-fn]))
(defn- raw-file-name
  "Returns the path of the combined raw log: `base-path` followed by
  \"/raw.log\"."
  [base-path]
  (-> base-path
      (str "/raw.log")))
;; Builds the file-backed raw collector for the given `results-dir`.
;;
;; Creating the collector opens (and truncates) the combined raw log
;; `<results-dir>/raw.log`. The writer is created with `io/writer`, which
;; encodes as UTF-8 by default; the raw log is later read back with
;; `io/reader` (UTF-8 by default as well), so the content no longer depends on
;; the platform default charset the way a bare `FileWriter` does.
;;
;; Returned map:
;;   :collect - writes the batch to its own file
;;              `<results-dir>/batch-<node-id>-<batch-id>.log` and returns
;;              [writer batch-file-name].
;;   :combine - takes two [writer file-name] pairs; every pair that does not
;;              already point at the raw log has its file appended to the raw
;;              log and is then closed and deleted. Returns
;;              [writer raw-file-name].
;;
;; The raw log writer is not closed here.
(def file-collector
  (fn [{:keys [results-dir]}]
    (let [raw-file (raw-file-name results-dir)
          writer (io/writer (io/file raw-file))
          merge-into-raw! (fn [[out file-name]]
                            ;; A pair that already refers to the raw log has
                            ;; been merged before; appending it again would
                            ;; duplicate data and delete the log itself.
                            (when-not (= file-name raw-file)
                              (let [[lines-seq cleanup-fn] (file->lines-seq file-name)]
                                (append-edns-as-lines out lines-seq)
                                (cleanup-fn))))]
      {:collect (fn [_ {:keys [batch node-id batch-id]}]
                  (let [file-name (str results-dir "/batch-" node-id "-" batch-id ".log")]
                    (write-edns-as-lines file-name batch)
                    [writer file-name]))
       :combine (fn [param1 param2]
                  (doseq [collected [param1 param2]]
                    (merge-into-raw! collected))
                  [writer raw-file])})))
;; Builds the file-backed raw generator for the given `results-dir`.
;;
;; Returned map:
;;   :generate - takes the [writer file-name] pair produced by the collector,
;;               closes the writer and returns the lazy sequence of values
;;               read from `file-name`.
;;   :as-str   - fn that ignores its arguments and returns a message naming
;;               the raw report file.
(def file-generator
  (fn [{:keys [results-dir]}]
    (let [summary (str "Generated raw report to " (raw-file-name results-dir))]
      {:generate (fn [collected]
                   (let [[writer file-name] collected]
                     ;; No new results are produced once generation starts,
                     ;; so the collector's log writer can be closed now.
                     (.close writer)
                     ;; Note! The reader behind the returned sequence is never
                     ;; closed. For now this is by design.
                     (let [[edns _cleanup-fn] (file->lines-seq file-name)]
                       edns)))
       :as-str (fn [& _] summary)})))
;; Reporter definition for the file-backed raw reporter.
;; Uses the same :reporter-key (:raw) as `in-memory-reporter`; :collector and
;; :generator are fully qualified symbols naming the `file-collector` and
;; `file-generator` vars of this namespace (presumably resolved by the
;; consumer at runtime - confirm against the caller).
(def file-reporter
{:reporter-key :raw
:collector 'trombi.reporters.raw-reporter/file-collector
:generator 'trombi.reporters.raw-reporter/file-generator})