forked from cs327e-fall2019/snippets
-
Notifications
You must be signed in to change notification settings - Fork 0
/
oscars_2.py
30 lines (23 loc) · 988 Bytes
/
oscars_2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
# DoFn: parse one line of the input file, return (entity name, 1) for acting categories
class ActorCountFn(beam.DoFn):
    """Turn one tab-separated input line into an ``(entity, 1)`` pair.

    The line is split on tabs and read positionally as
    ``year, category, winner, entity``. Only rows whose category text
    contains ``'ACTOR'`` or ``'ACTRESS'`` produce output.
    """

    def process(self, element):
        """Parse ``element`` and emit a pair for acting categories.

        Args:
            element: One line of text from the input file.

        Returns:
            A one-element list ``[(entity, 1)]`` when the category contains
            ``'ACTOR'`` or ``'ACTRESS'``; otherwise ``None``, which yields no
            output for this element.
        """
        # strip() removes surrounding whitespace, tabs included, so a row
        # whose first or last column is empty ends up with fewer fields.
        values = element.strip().split('\t')

        # Blank or truncated lines do not have the four expected columns;
        # skip them instead of failing the whole pipeline with IndexError.
        if len(values) < 4:
            return None

        category = values[1]
        entity = values[3]

        if 'ACTOR' in category or 'ACTRESS' in category:
            return [(entity, 1)]
        return None
# Build the pipeline on the local DirectRunner and run it when the
# `with` block exits.
with beam.Pipeline('DirectRunner') as pipeline:
    # Steps, in order:
    #   1. read the input file into a PCollection of lines
    #   2. ParDo each line through ActorCountFn
    #   3. GroupByKey to gather the values emitted for each key
    #   4. write the grouped PCollection to the output file
    (
        pipeline
        | 'Read File' >> ReadFromText('oscars_input.tsv')
        | 'Extract Actor' >> beam.ParDo(ActorCountFn())
        | 'Group by Actor' >> beam.GroupByKey()
        | 'Write File' >> WriteToText('oscars_output.txt')
    )