# pardomap_example.py
#
# Demonstrates extracting the first CSV field with Apache Beam three ways:
# a naive (buggy) ParDo, a correct Map, and a corrected ParDo.
from __future__ import absolute_import
import argparse
import logging
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
def run(argv=None):
    """Run three small Beam pipelines that extract the first CSV column.

    The three pipelines illustrate a common DoFn pitfall:
      1. A naive ParDo whose ``process`` returns a plain string — Beam
         iterates the return value, so each *character* is emitted as a
         separate element (the bug being demonstrated).
      2. A ``beam.Map``, which wraps the single return value correctly.
      3. A corrected ParDo whose ``process`` returns a one-element list.

    Args:
        argv: Optional list of command-line arguments; ``None`` means
            ``sys.argv`` (standard argparse behavior). Unrecognized
            arguments are forwarded to ``PipelineOptions``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        # Default matches the file the example ships with, so running
        # without flags behaves exactly as before.
        default='./data/dates.csv',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        default='./outputs/pardo',
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    # --- Naive ParDo -------------------------------------------------------
    # DoFn.process must return an iterable of output elements. A str IS an
    # iterable (of 1-char strings), so this version silently emits one
    # element per character — intentionally wrong, for illustration.
    with beam.Pipeline(options=pipeline_options) as p:
        class DateExtractor(beam.DoFn):
            def process(self, data_item):
                # BUG (deliberate): returns a str, which Beam iterates
                # character by character.
                return (str(data_item).split(','))[0]
        (p
         | 'ReadMyFile' >> ReadFromText(known_args.input)
         | 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
         | 'Output' >> WriteToText(known_args.output + "_pardo"))

    # --- Map ---------------------------------------------------------------
    # beam.Map wraps the callable's single return value as one element,
    # so the same expression yields the correct result here.
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'ReadMyFile' >> ReadFromText(known_args.input)
         | 'Splitter using beam.Map' >> beam.Map(lambda record: (record.split(','))[0])
         | 'Output' >> WriteToText(known_args.output + "_map")
         )

    # --- Corrected ParDo ---------------------------------------------------
    # Wrapping the value in a one-element list gives process() the iterable
    # Beam expects, emitting exactly one element per input record.
    with beam.Pipeline(options=pipeline_options) as p:
        class DateExtractorCorrected(beam.DoFn):
            def process(self, data_item):
                return [(str(data_item).split(','))[0]]
        (p
         | 'ReadMyFile' >> ReadFromText(known_args.input)
         | 'Splitter using beam.ParDo 02' >> beam.ParDo(DateExtractorCorrected())
         | 'Output' >> WriteToText(known_args.output + "_pardo_corrected"))
if __name__ == '__main__':
    # Surface INFO-level messages so pipeline progress is visible when
    # the example is executed as a script.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()