# Try Apache Beam - Python

In this notebook, we set up your development environment and work through a simple example using the [DirectRunner](https://beam.apache.org/documentation/runners/direct/). You can explore other runners with the [Beam Capability Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).

To navigate through different sections, use the table of contents. From the **View** drop-down list, select **Table of contents**.

To run a code cell, click the **Run cell** button at the top left of the cell, or select the cell and press **`Shift+Enter`**. Try modifying a code cell and re-running it to see what happens.

To learn more about Colab, see [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).

# Setup

First, set up your environment: install `apache-beam` and download a text file from Cloud Storage to your local file system. You use this file to test your pipeline.

In [3]:
!pip install apache-beam

Collecting apache-beam
  Downloading apache_beam-2.69.0.tar.gz (2.9 MB)
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
    Preparing wheel metadata ... done
Collecting hdfs<3.0.0,>=2.1.0
  Downloading hdfs-2.7.3.tar.gz (43 kB)
Collecting objsize<0.8.0,>=0.6.1
  Downloading objsize-0.7.1-py3-none-any.whl (11 kB)
Collecting pydot<2,>=1.2.0
  Downloading pydot-1.4.2-py2.py3-none-any.whl (21 kB)
Collecting jsonpickle<4.0.0,>=3.0.0
  Downloading jsonpickle-3.4.2-py3-none-any.whl (46 kB)
Collecting crcmod<2.0,>=1.7
  Downloading crcmod-1.7.tar.gz (89 kB)
Collecting cryptography<48.0.0,>=39.0.0
  Using cached crypto

In [2]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Install apache-beam.
# run('pip install --quiet apache-beam')

# Copy the input file into the local file system.
run('mkdir -p data')
run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')

>> mkdir -p data

>> gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/
zsh:1: command not found: gsutil
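
The `gsutil cp` step above failed because `gsutil` is not installed on this machine. As one possible fallback, this sketch fetches the same object over HTTPS with Python's standard library. It assumes the `dataflow-samples` bucket allows anonymous reads, in which case Cloud Storage serves a public object at `https://storage.googleapis.com/<bucket>/<object>`. The helper names `gcs_to_https` and `download_if_missing` are hypothetical.

```python
import os
import urllib.request


def gcs_to_https(gcs_path):
    """Turn gs://bucket/object into the public HTTPS URL for that object."""
    prefix = 'gs://'
    if not gcs_path.startswith(prefix):
        raise ValueError('expected a gs:// path, got {!r}'.format(gcs_path))
    return 'https://storage.googleapis.com/' + gcs_path[len(prefix):]


def download_if_missing(gcs_path, dest_dir='data'):
    """Download gcs_path into dest_dir unless a file with that name exists.

    Assumes the object is publicly readable (no credentials are sent).
    Returns the local file path.
    """
    os.makedirs(dest_dir, exist_ok=True)
    local_path = os.path.join(dest_dir, os.path.basename(gcs_path))
    if not os.path.exists(local_path):
        urllib.request.urlretrieve(gcs_to_https(gcs_path), local_path)
    return local_path


# Usage (needs network access):
# download_if_missing('gs://dataflow-samples/shakespeare/kinglear.txt')
```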



# Word count with comments

Below is a word count pipeline, with comments explaining every line in detail.

In [5]:
import apache_beam as beam
import re

inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  # Store the word counts in a PCollection.
  # Each element is a tuple of (word, count) of types (str, int).
  word_counts = (
      # Start from the pipeline itself; the read transform creates the first PCollection.
      pipeline

      # Read lines from a text file.
      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
      # Element type: str - text line

      # Use a regular expression to iterate over all words in the line.
      # FlatMap outputs one element for each item in the iterable the function returns.
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
      # Element type: str - word

      # Create key-value pairs where the value is 1, so that grouping by word
      # and adding up the 1s gives the count for every word.
      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
      # Element type: (str, int) - key: word, value: 1

      # Group by key while combining the value using the sum() function.
      | 'Group and sum' >> beam.CombinePerKey(sum)
      # Element type: (str, int) - key: word, value: counts
  )

  # A PCollection can be the input to further transforms in the same pipeline.
  (
      # The input PCollection is the word_counts created from the previous step.
      word_counts

      # Format the results into a string so we can write them to a file.
      | 'Format results' >> beam.Map(lambda word_count: str(word_count))
      # Element type: str - text line

      # Finally, write the results to a file.
      | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

# Print the first 200 results from the first output shard; remember there are no ordering guarantees.
run('head -n 200 {}-00000-of-*'.format(outputs_prefix))



>> head -n 200 outputs/part-00000-of-*
('KING', 243)
('LEAR', 236)
('DRAMATIS', 1)
('PERSONAE', 1)
('king', 65)
('of', 447)
('Britain', 2)
('OF', 15)
('FRANCE', 10)
('DUKE', 3)
('BURGUNDY', 8)
('CORNWALL', 63)
('ALBANY', 67)
('EARL', 2)
('KENT', 156)
('GLOUCESTER', 141)
('EDGAR', 126)
('son', 29)
('to', 438)
('Gloucester', 26)
('EDMUND', 99)
('bastard', 7)
('CURAN', 6)
('a', 366)
('courtier', 1)
('Old', 13)
('Man', 11)
('tenant', 3)
('Doctor', 12)
('Fool', 73)
('OSWALD', 53)
('steward', 2)
('Goneril', 12)
('A', 51)
('Captain', 12)
('employed', 1)
('by', 69)
('Edmund', 32)
('Gentleman', 48)
('attendant', 1)
('on', 93)
('Cordelia', 22)
('Herald', 6)
('Servants', 9)
('Cornwall', 12)
('First', 7)
('Servant', 11)
('Second', 4)
('Third', 4)
('GONERIL', 71)
('REGAN', 86)
('daughters', 24)
('Lear', 17)
('CORDELIA', 42)
('Knights', 2)
("Lear's", 4)
('train', 9)
('Captains', 1)
('Messengers', 1)
('Soldiers', 7)
('and', 594)
('Attendants', 8)
('Knight', 8)
('Messenger', 10)
('SCENE', 27)
('ACT', 
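
`head` only shows the first shard (`-00000-of-*`), and `WriteToText` may split the results across several files in no particular order. A minimal sketch for collecting every shard and ranking the words, assuming each line was written with `str()` on a `(word, count)` tuple as in the pipeline above, so that `ast.literal_eval` can parse it back. The helper name `top_words` is hypothetical.

```python
import ast
import glob


def top_words(prefix='outputs/part', n=10):
    """Read every output shard and return the n most frequent (word, count) pairs."""
    counts = []
    # Shard files are named like outputs/part-00000-of-00001.
    for path in sorted(glob.glob(prefix + '-*-of-*')):
        with open(path) as f:
            for line in f:
                line = line.strip()
                if line:
                    # Each line is the repr of a tuple such as "('KING', 243)".
                    counts.append(ast.literal_eval(line))
    # Sort by count (descending), then by word so ties come out in a fixed order.
    counts.sort(key=lambda wc: (-wc[1], wc[0]))
    return counts[:n]


for word, count in top_words():
    print(word, count)
```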