-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathParallelParse.spl
166 lines (142 loc) · 5.24 KB
/
ParallelParse.spl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/*
* Licensed Materials - Property of IBM
* Copyright IBM Corp. 2015
*/
/*
 * Wide record schema used by the parsing composites in this file:
 * 50 rstring attributes (str00..str49) followed by 50 float64 attributes
 * (num00..num49), 100 attributes in total. It is the output schema of the
 * csv-format FileSource and Parse invocations below.
 */
type ManyAttributes = tuple <
rstring str00, rstring str01, rstring str02, rstring str03, rstring str04,
rstring str05, rstring str06, rstring str07, rstring str08, rstring str09,
rstring str10, rstring str11, rstring str12, rstring str13, rstring str14,
rstring str15, rstring str16, rstring str17, rstring str18, rstring str19,
rstring str20, rstring str21, rstring str22, rstring str23, rstring str24,
rstring str25, rstring str26, rstring str27, rstring str28, rstring str29,
rstring str30, rstring str31, rstring str32, rstring str33, rstring str34,
rstring str35, rstring str36, rstring str37, rstring str38, rstring str39,
rstring str40, rstring str41, rstring str42, rstring str43, rstring str44,
rstring str45, rstring str46, rstring str47, rstring str48, rstring str49,
float64 num00, float64 num01, float64 num02, float64 num03, float64 num04,
float64 num05, float64 num06, float64 num07, float64 num08, float64 num09,
float64 num10, float64 num11, float64 num12, float64 num13, float64 num14,
float64 num15, float64 num16, float64 num17, float64 num18, float64 num19,
float64 num20, float64 num21, float64 num22, float64 num23, float64 num24,
float64 num25, float64 num26, float64 num27, float64 num28, float64 num29,
float64 num30, float64 num31, float64 num32, float64 num33, float64 num34,
float64 num35, float64 num36, float64 num37, float64 num38, float64 num39,
float64 num40, float64 num41, float64 num42, float64 num43, float64 num44,
float64 num45, float64 num46, float64 num47, float64 num48, float64 num49
>;
/*
 * Terminal sink: counts every tuple received on In and, once the final
 * marker arrives on that port, prints the total to standard output.
 */
composite CountProcessed(input In) {
    graph
        () as Sink = Custom(In) {
            logic
                state: {
                    // Number of tuples seen so far on In.
                    mutable uint64 _seen = 0ul;
                }
                onTuple In: {
                    ++_seen;
                }
                onPunct In: {
                    // Window punctuation is ignored; only the final marker reports.
                    if (currentPunct() == Sys.FinalMarker) {
                        printStringLn("parsed tuples processed: " + (rstring)_seen);
                    }
                }
        }
}
/*
 * Variant: the FileSource reads "data.csv" and parses it as csv directly
 * into ManyAttributes tuples, which are then counted.
 */
composite FileSourceOnly {
    graph
        stream<ManyAttributes> Parsed = FileSource() {
            param
                file    : "data.csv";
                format  : csv;
                parsing : fast;
        }

        // Count the parsed tuples and report at end of stream.
        () as Sink = CountProcessed(Parsed) {
        }
}
/*
 * Variant: the FileSource reads "data.csv" one line at a time into a
 * single rstring attribute (no csv parsing), and the lines are counted.
 */
composite FileSourceString {
    graph
        stream<rstring line> Parsed = FileSource() {
            param
                file    : "data.csv";
                format  : line;
                parsing : fast;
        }

        // Count the raw lines and report at end of stream.
        () as Sink = CountProcessed(Parsed) {
        }
}
/*
 * Variant: the file is read as raw 1024-byte blocks and handed to a single
 * Parse operator, which converts the byte stream into ManyAttributes tuples.
 * The Parse input port is a threaded port with a blocking (Sys.Wait) queue.
 */
composite SequentialParse {
    graph
        // Raw, unparsed blocks of the input file.
        stream<blob raw> Raw = FileSource() {
            param
                file      : "data.csv";
                format    : block;
                blockSize : 1024u;
        }

        // csv parsing of the block stream.
        stream<ManyAttributes> Parsed = Parse(Raw) {
            param
                format  : csv;
                parsing : fast;
            config
                threadedPort : queue(Raw, Sys.Wait);
        }

        () as Sink = CountProcessed(Parsed) {
        }
}
/*
 * Variant: the file is split into lines, each line is re-encoded as a blob
 * terminated by "\n", and the blobs are parsed as csv inside a parallel
 * region. The region width comes from the "parallelism" submission-time value.
 */
composite ParallelParse {
    graph
        // One tuple per input line.
        stream<rstring line> Lines = FileSource() {
            param
                file    : "data.csv";
                format  : line;
                parsing : fast;
        }

        // Parse consumes blobs, so wrap each line (plus a newline) in one.
        stream<blob bytes> LinesAsBytes = Functor(Lines) {
            output
                LinesAsBytes : bytes = convertToBlob(Lines.line + "\n");
        }

        // csv parsing replicated across the parallel channels.
        @parallel(width=(int32)getSubmissionTimeValue("parallelism"))
        stream<ManyAttributes> Parsed = Parse(LinesAsBytes) {
            param
                format  : csv;
                parsing : fast;
        }

        () as Sink = CountProcessed(Parsed) {
        }
}
/*
 * Re-establishes sequence order on a stream whose tuples may arrive out of
 * order (for example after a parallel region).
 *
 * Parameters:
 *   key - uint64 attribute of In holding the tuple's sequence number. The
 *         first expected sequence number is 1 and numbers are expected to
 *         increase by 1.
 *
 * Behavior:
 *   - A tuple whose key equals the next expected number is submitted
 *     immediately, followed by any directly following buffered tuples.
 *   - Any other tuple is buffered under its key until its turn comes.
 *   - On the final marker, tuples still buffered (which can only happen when
 *     some sequence number never arrived) are flushed in ascending key order
 *     instead of being silently discarded.
 */
composite ParallelMerge(input In; output Out) {
    param
        attribute $key;
    graph
        stream<In> Out = Custom(In) {
            logic
                state: {
                    // Out-of-order tuples waiting for their turn, keyed by sequence number.
                    mutable map<uint64, tuple<In>> _tuples;
                    // Next sequence number that may be submitted.
                    mutable uint64 _next = 1ul;
                }
                onTuple In: {
                    if (_next == $key) {
                        submit(In, Out);
                        ++_next;
                        // Drain every buffered tuple that is now in sequence.
                        while (_next in _tuples) {
                            submit(_tuples[_next], Out);
                            removeM(_tuples, _next);
                            ++_next;
                        }
                    }
                    else {
                        _tuples[$key] = In;
                    }
                }
                onPunct In: {
                    // End of stream: a gap in the sequence would otherwise strand
                    // everything buffered behind it. Emit the leftovers in key order.
                    if (currentPunct() == Sys.FinalMarker && size(_tuples) > 0) {
                        mutable list<uint64> leftover;
                        for (uint64 pending in _tuples) {
                            appendM(leftover, pending);
                        }
                        for (uint64 pending in sort(leftover)) {
                            submit(_tuples[pending], Out);
                        }
                        clearM(_tuples);
                    }
                }
        }
}
/*
 * Variant: like the parallel line-based parse, but each line is tagged with a
 * sequence number (appended as an extra csv column) before entering the
 * parallel region, and ParallelMerge uses that number to restore the
 * original order after parsing.
 */
composite ParallelParseOrdered {
    graph
        // One tuple per input line.
        stream<rstring line> Lines = FileSource() {
            param
                file    : "data.csv";
                format  : line;
                parsing : fast;
        }

        // Append ",<seqno>" to every line and re-encode it as a blob for Parse.
        stream<blob bytes> LinesAsBytes = Custom(Lines) {
            logic
                state: {
                    // Sequence number of the most recently emitted line; first line gets 1.
                    mutable uint64 _lastSeqno = 0ul;
                }
                onTuple Lines: {
                    ++_lastSeqno;
                    rstring numbered = rtrim(Lines.line, "\n") + "," + (rstring)_lastSeqno + "\n";
                    submit({bytes = convertToBlob(numbered)}, LinesAsBytes);
                }
        }

        // csv parsing in parallel; the trailing column lands in seqno.
        @parallel(width=(int32)getSubmissionTimeValue("parallelism"))
        stream<ManyAttributes, tuple<uint64 seqno>> Parsed = Parse(LinesAsBytes) {
            param
                format  : csv;
                parsing : fast;
        }

        // Put the parsed tuples back into sequence-number order.
        stream<ManyAttributes, tuple<uint64 seqno>> Merged = ParallelMerge(Parsed) {
            param
                key : Parsed.seqno;
        }

        () as Sink = CountProcessed(Merged) {
        }
}