In [None]:
# The order of this notebook has been arranged to simulate user modification and out-of-order re-execution.
# Simply running the notebook top-down is sufficient to demonstrate the steps in task_4.
import re

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.interactive.interactive_beam import *


class ReadWordsFromText(beam.PTransform):
    """Composite transform that reads text and emits the words it contains.

    Every line read is stripped of surrounding whitespace and scanned for
    runs of word characters and apostrophes; each run becomes one output
    element.

    Args:
        file_pattern: Value handed unchanged to ``beam.io.ReadFromText``.
    """

    def __init__(self, file_pattern):
        # Initialize the PTransform base class before storing our own state.
        super(ReadWordsFromText, self).__init__()
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        """Applies the read and tokenize steps on the input's pipeline.

        Args:
            pcoll: Input value; only its ``pipeline`` attribute is used.

        Returns:
            The result of ``ReadFromText`` followed by a ``FlatMap`` that
            yields every regex match found in each stripped line.
        """
        lines = pcoll.pipeline | beam.io.ReadFromText(self._file_pattern)
        return lines | beam.FlatMap(
            lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE))

In [None]:
# Create a pipeline backed by the InteractiveRunner, apply the word-reading
# transform (labelled 'read') to the King Lear sample text, and display the
# resulting `words` collection with show().
# NOTE(review): show() presumably comes from the interactive_beam star
# import at the top of the notebook - confirm.
p = beam.Pipeline(InteractiveRunner())
words = p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
show(words)

In [None]:
# Count the occurrences of each element in `words` (step labelled 'count')
# and display the result. Later cells unpack each result as (word, count).
counts = words | 'count' >> beam.combiners.Count.PerElement()
show(counts)

In [None]:
# Turn each (word, count) pair into a "word: count" string (step labelled
# 'format') and display the formatted lines.
output = counts | 'format' >> beam.MapTuple(lambda word, count: '%s: %d' % (word, count))
show(output)

In [None]:
# Attach a text sink (step labelled 'write') that targets /tmp/output.txt.
output | 'write' >> beam.io.WriteToText('/tmp/output.txt')

In [None]:
# Run the pipeline `p` with the steps applied to it so far.
p.run()

In [None]:
# Give output a new format: rebind `output` to a "word -> count" rendering of
# `counts` and display it.
# NOTE(review): the 'format' label is also used by the earlier formatting
# step on the same pipeline; presumably Interactive Beam keeps labels from
# different cells distinct - confirm.
output = counts | 'format' >> beam.MapTuple(lambda word, count: '%s -> %d' % (word, count))
show(output)

In [None]:
# Also output to new targets: a local file and a GCS bucket.
# WriteToText is reached through `beam.io`, matching the earlier 'write'
# cell; no visible import binds the bare name, so using it unqualified
# would raise NameError.
output | 'write to local' >> beam.io.WriteToText('/tmp/count.txt')
output | 'write to gcs' >> beam.io.WriteToText('gs://bucket_xyz/count.txt')

In [None]:
# Run the pipeline `p` again so the steps added since the previous run
# take part in the execution.
p.run()