This repository has been archived by the owner on Jan 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 206
/
seq.clj
56 lines (49 loc) · 2.21 KB
/
seq.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
(ns onyx.tasks.seq
(:require [onyx.schema :as os]
[onyx.plugin.seq]
[schema.core :as s])
(:import [java.io FileReader BufferedReader]))
(s/defn input-serialized
  "Builds a task bundle for a seq-backed input task named `task-name`.

  The result is a map with a single `:task` key holding:
    :task-map   - the default seq-input task map merged with `opts`
                  (entries in `opts` override the defaults);
    :lifecycles - a one-element vector whose lifecycle entry targets
                  `task-name`, carries `sequential` under `:seq/sequential`,
                  and names `:onyx.plugin.seq/inject-seq-via-lifecycle`
                  as its calls keyword."
  [task-name :- s/Keyword opts sequential]
  (let [default-task-map {:onyx/name task-name
                          :onyx/plugin :onyx.plugin.seq/input
                          :onyx/type :input
                          :onyx/medium :seq
                          :onyx/n-peers 1
                          :onyx/doc "Reads segments from a seq"}
        seq-lifecycle {:lifecycle/task task-name
                       :seq/sequential sequential
                       :lifecycle/calls :onyx.plugin.seq/inject-seq-via-lifecycle}]
    {:task {:task-map (merge default-task-map opts)
            :lifecycles (vector seq-lifecycle)}}))
(s/defn input-materialized
  "Builds a task bundle for a seq-backed input task named `task-name`
  using a caller-supplied lifecycle entry.

  The result is a map with a single `:task` key holding:
    :task-map   - the default seq-input task map merged with `opts`
                  (entries in `opts` override the defaults);
    :lifecycles - a one-element vector containing `lifecycle` as given."
  [task-name :- s/Keyword opts lifecycle]
  (let [default-task-map {:onyx/name task-name
                          :onyx/plugin :onyx.plugin.seq/input
                          :onyx/type :input
                          :onyx/medium :seq
                          :onyx/n-peers 1
                          :onyx/doc "Reads segments from a seq"}
        task-map (merge default-task-map opts)]
    {:task {:task-map task-map
            :lifecycles (vector lifecycle)}}))
(defn inject-in-reader
  "Opens the file named by `:buffered-reader/filename` in `lifecycle` and
  returns a map with:
    :seq/rdr - the open FileReader (kept so it can be closed later);
    :seq/seq - a lazy seq of `{:line <text>}` maps, one per line of the file.

  `event` is accepted but not used."
  [event lifecycle]
  (let [path (:buffered-reader/filename lifecycle)
        file-reader (FileReader. ^String path)
        ;; line-seq reads through a BufferedReader wrapped around the same
        ;; FileReader, so closing :seq/rdr ends the sequence's source.
        lines (line-seq (BufferedReader. file-reader))]
    {:seq/rdr file-reader
     :seq/seq (for [text lines]
                {:line text})}))
(defn close-reader
  "Closes the reader stored under `:seq/rdr` in `event`, if there is one.

  `lifecycle` is accepted but not used. Returns an empty map: Onyx
  lifecycle hooks are expected to return a map that is merged into the
  event, and this hook has nothing to add."
  [event lifecycle]
  ;; Guard against a missing reader (e.g. the start hook never ran) so
  ;; shutdown does not fail with a NullPointerException.
  (when-let [rdr (:seq/rdr event)]
    ;; Hint as Closeable rather than FileReader: avoids reflection without
    ;; tying this hook to one concrete reader class.
    (.close ^java.io.Closeable rdr))
  {})
(def in-file-calls
  "Lifecycle calls map for file-backed seq input: `inject-in-reader` runs
  before the task starts and `close-reader` runs after the task stops."
  {:lifecycle/before-task-start inject-in-reader
   :lifecycle/after-task-stop close-reader})
(s/defn input-file
  "Builds a task bundle for a seq-backed input task named `task-name`
  whose segments come from the lines of the file `filename`.

  The result is a map with a single `:task` key holding:
    :task-map   - the default seq-input task map merged with `opts`
                  (entries in `opts` override the defaults);
    :lifecycles - a one-element vector whose lifecycle entry targets
                  `task-name`, carries `filename` under
                  `:buffered-reader/filename`, and names this namespace's
                  `in-file-calls` as its calls keyword."
  [task-name :- s/Keyword opts filename]
  {:task {:task-map (merge {:onyx/name task-name
                            :onyx/plugin :onyx.plugin.seq/input
                            :onyx/type :input
                            :onyx/medium :seq
                            :onyx/n-peers 1
                            :onyx/doc "Reads segments from a seq"}
                           opts)
          ;; Bind the lifecycle to the task being built. This was previously
          ;; hard-coded to :in, so the reader was only injected when the
          ;; caller happened to name the task :in.
          :lifecycles [{:lifecycle/task task-name
                        :buffered-reader/filename filename
                        :lifecycle/calls ::in-file-calls}]}})