# Apache Beam Text Analysis Pipeline

This notebook demonstrates an enhanced text analysis pipeline using Apache Beam with the [DirectRunner](https://beam.apache.org/documentation/runners/direct/). We'll perform advanced word counting, filtering, and statistical analysis on text data.

**Key Features:**
- Case-insensitive word counting
- Stop word filtering
- Word length analysis
- Top N word extraction
- Statistical summaries

To navigate between sections, use the table of contents: from the **View** drop-down menu, select **Table of contents**.

To run a code cell, click the **Run cell** button at the top left of the cell, or select it and press **`Shift+Enter`**.

# Environment Setup

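This section prepares the environment and the input data. Apache Beam must already be installed in the kernel; uncomment the `pip install` line below if it is not. We then download the text of Shakespeare's *King Lear* from the public `gs://dataflow-samples` bucket into a local `data/` directory.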

In [3]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Install apache-beam.
# run('pip install --quiet apache-beam')

# Copy the input file into the local file system.
run('mkdir -p data')
run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')

>> mkdir -p data

>> gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/
Copying gs://dataflow-samples/shakespeare/kinglear.txt...
/ [1 files][153.6 KiB/153.6 KiB]                                                
Operation completed over 1 objects/153.6 KiB.                                    



# Enhanced Text Analysis Pipeline

This pipeline performs multiple text analysis operations:
1. **Case-insensitive word counting** - Normalizes all words to lowercase
2. **Stop word filtering** - Removes common words that carry little meaning, as well as words shorter than three letters
3. **Word length analysis** - Builds a distribution of word lengths (how many words have each length)
4. **Top N extraction** - Identifies the most frequent meaningful words

In [None]:
import apache_beam as beam
import re
from collections import Counter

# Configuration
inputs_pattern = 'data/*'
outputs_prefix = 'outputs/word_counts'
word_length_output = 'outputs/word_lengths'
top_words_output = 'outputs/top_words'

# How many of the most frequent meaningful words to keep, and how many to print.
TOP_N = 50
TOP_N_DISPLAY = 30
# Words shorter than this are dropped along with STOP_WORDS.
MIN_WORD_LENGTH = 3

# A word is a run of letters, optionally joined by single *internal*
# apostrophes ("lear's", "o'er"). Leading/trailing apostrophes are treated as
# quotation marks rather than part of the word, and a bare apostrophe is never
# a word. Applied to lower-cased text, so [a-z] suffices.
WORD_RE = re.compile(r"[a-z]+(?:'[a-z]+)*")

# Common stop words to filter out (each listed once). 's', 't' and 'don' only
# matter for tokenizers that split contractions; WORD_RE keeps them whole.
STOP_WORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
              'of', 'with', 'by', 'from', 'as', 'is', 'was', 'are', 'were', 'been',
              'be', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would',
              'could', 'should', 'may', 'might', 'must', 'can', 'this', 'that',
              'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we', 'they',
              'his', 'her', 'its', 'our', 'their', 'my', 'your', 'me', 'him',
              'us', 'them', 'what', 'which', 'who', 'whom', 'whose', 'where',
              'when', 'why', 'how', 'all', 'each', 'every', 'both', 'few',
              'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not',
              'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't',
              'just', 'don', 'now'}


def extract_words(line):
  """Return the lower-cased words of `line` (as defined by WORD_RE), in order."""
  return WORD_RE.findall(line.lower())


def is_meaningful(word_count):
  """True when a (word, count) pair is neither a stop word nor too short."""
  word, _ = word_count
  return word not in STOP_WORDS and len(word) >= MIN_WORD_LENGTH


def format_word_count(word_count):
  """Render a (word, count) pair as 'word: count'."""
  word, count = word_count
  return f"{word}: {count}"


def format_top_words(top_pairs):
  """Render the Top-N list as one newline-joined string.

  Most frequent first; ties broken alphabetically so the file is deterministic.
  """
  ordered = sorted(top_pairs, key=lambda wc: (-wc[1], wc[0]))
  return '\n'.join(format_word_count(wc) for wc in ordered)


def format_length_histogram(length_counts):
  """Render (length, count) pairs as 'Length L: C words' lines, shortest first."""
  return '\n'.join(
      f"Length {length}: {count} words" for length, count in sorted(length_counts)
  )


# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  # Step 1: Read lines and split them into lower-cased words.
  words = (
      pipeline
      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
      | 'Extract words' >> beam.FlatMap(extract_words)
  )

  # Step 2: Count each word (case-insensitive) -> (word, count).
  word_counts = words | 'Count words' >> beam.combiners.Count.PerElement()

  # Step 3: Drop stop words and short words.
  meaningful_words = word_counts | 'Filter stop words' >> beam.Filter(is_meaningful)

  # Step 4: Histogram of word lengths over all words -> (length, count).
  word_lengths = (
      words
      | 'Get word lengths' >> beam.Map(len)
      | 'Count by length' >> beam.combiners.Count.PerElement()
  )

  # Step 5: Keep the TOP_N most frequent meaningful words. Top.Of emits a single
  # list; keying on (count, word) makes the cut-off deterministic on ties.
  top_words = (
      meaningful_words
      | 'Top N words' >> beam.combiners.Top.Of(TOP_N, key=lambda wc: (wc[1], wc[0]))
  )

  # Write outputs
  (
      meaningful_words
      | 'Format word counts' >> beam.Map(format_word_count)
      | 'Write word counts' >> beam.io.WriteToText(outputs_prefix)
  )

  # The length histogram and the top-N list are small and meant to be read in
  # order, so each is rendered into a single string and written to one shard:
  # PCollection element order is not guaranteed, and the display below only
  # reads shard 00000.
  (
      word_lengths
      | 'Collect lengths' >> beam.combiners.ToList()
      | 'Format lengths' >> beam.Map(format_length_histogram)
      | 'Write lengths' >> beam.io.WriteToText(word_length_output, num_shards=1)
  )

  (
      top_words
      | 'Format top words' >> beam.Map(format_top_words)
      | 'Write top words' >> beam.io.WriteToText(top_words_output, num_shards=1)
  )

# Display results
print("=" * 60)
print(f"TOP {TOP_N_DISPLAY} MOST FREQUENT MEANINGFUL WORDS:")
print("=" * 60)
run('head -n {} {}-00000-of-*'.format(TOP_N_DISPLAY, top_words_output))

print("\n" + "=" * 60)
print("WORD LENGTH DISTRIBUTION (first 20):")
print("=" * 60)
run('head -n 20 {}-00000-of-*'.format(word_length_output))



>> head -n 200 outputs/part-00000-of-*
('KING', 243)
('LEAR', 236)
('DRAMATIS', 1)
('PERSONAE', 1)
('king', 65)
('of', 447)
('Britain', 2)
('OF', 15)
('FRANCE', 10)
('DUKE', 3)
('BURGUNDY', 8)
('CORNWALL', 63)
('ALBANY', 67)
('EARL', 2)
('KENT', 156)
('GLOUCESTER', 141)
('EDGAR', 126)
('son', 29)
('to', 438)
('Gloucester', 26)
('EDMUND', 99)
('bastard', 7)
('CURAN', 6)
('a', 366)
('courtier', 1)
('Old', 13)
('Man', 11)
('tenant', 3)
('Doctor', 12)
('Fool', 73)
('OSWALD', 53)
('steward', 2)
('Goneril', 12)
('A', 51)
('Captain', 12)
('employed', 1)
('by', 69)
('Edmund', 32)
('Gentleman', 48)
('attendant', 1)
('on', 93)
('Cordelia', 22)
('Herald', 6)
('Servants', 9)
('Cornwall', 12)
('First', 7)
('Servant', 11)
('Second', 4)
('Third', 4)
('GONERIL', 71)
('REGAN', 86)
('daughters', 24)
('Lear', 17)
('CORDELIA', 42)
('Knights', 2)
("Lear's", 4)
('train', 9)
('Captains', 1)
('Messengers', 1)
('Soldiers', 7)
('and', 594)
('Attendants', 8)
('Knight', 8)
('Messenger', 10)
('SCENE', 27)
('ACT', 

In [None]:
# Character frequency analysis: how often each letter occurs, case-insensitively.
char_output = 'outputs/char_frequency'


def format_char_frequency(char_counts):
  """Render (char, count) pairs as 'char: count' lines.

  Most frequent first, ties broken alphabetically, so `head -n 26` below
  deterministically shows the 26 most common letters.
  """
  ordered = sorted(char_counts, key=lambda cc: (-cc[1], cc[0]))
  return '\n'.join(f"{char}: {count}" for char, count in ordered)


with beam.Pipeline() as pipeline:
  # The whole table is gathered into one sorted string and written to a single
  # shard: PCollection order is not guaranteed and the display reads shard 00000.
  char_freq = (
      pipeline
      | 'Read lines for chars' >> beam.io.ReadFromText(inputs_pattern)
      | 'Extract characters' >> beam.FlatMap(
          lambda line: [char.lower() for char in line if char.isalpha()]
      )
      | 'Count chars' >> beam.combiners.Count.PerElement()
      | 'Collect char counts' >> beam.combiners.ToList()
      | 'Format char frequency' >> beam.Map(format_char_frequency)
      | 'Write char frequency' >> beam.io.WriteToText(char_output, num_shards=1)
  )

print("=" * 60)
print("CHARACTER FREQUENCY ANALYSIS:")
print("=" * 60)
run('head -n 26 {}-00000-of-*'.format(char_output))
