## Beam word count example

In [9]:
from __future__ import absolute_import

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
import re
import pandas as pd

The `ReadFromText` transform is applied directly to the `Pipeline` object and produces a `PCollection` as output. Each element of that `PCollection` is one line of text from the input file.

The first pipeline below only reads `kinglear.txt` and writes the lines back out unchanged. The word count pipeline after it adds a split transform, which turns the `PCollection` of lines into a `PCollection` in which each element is an individual word of the text.

In [6]:
p = beam.Pipeline()

lines = p | 'read' >> ReadFromText("kinglear.txt")
lines | 'write' >> WriteToText("copy-of-kinglear.txt")

result = p.run()
result.wait_until_finish()

'DONE'

In [7]:
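class ExtractWordsDoFn(beam.DoFn):
    """Split one line of text into its words."""
    def process(self, element):
        text_line = element.strip()
        # A word is a run of word characters and apostrophes; punctuation is dropped.
        words = re.findall(r'[\w\']+', text_line, re.UNICODE)
        # Returning a list emits each word as its own output element.
        return words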
    

# Count the occurrences of each word.
def count_ones(word_ones):
    (word, ones) = word_ones
    return (word, sum(ones))    

# Format the counts into a PCollection of strings.
def format_result(word_count):
    (word, count) = word_count
    return '%s\t%d' % (word, count)
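
To see what these helpers do to a single element without running a pipeline, the same logic can be called as plain Python. This is a minimal sketch: `split_words` is a hypothetical stand-in for the body of `ExtractWordsDoFn.process`, and the sample line and the `('there', [1, 1, 1])` tuple are chosen for illustration only.

```python
import re

def split_words(line):
    # Same regular expression as ExtractWordsDoFn.process: a word is a run of
    # word characters and apostrophes, so punctuation is dropped.
    return re.findall(r"[\w']+", line.strip())

def count_ones(word_ones):
    # After grouping, each element is (word, iterable of ones).
    word, ones = word_ones
    return (word, sum(ones))

def format_result(word_count):
    # Tab-separated "word<TAB>count", one line per word in the output file.
    word, count = word_count
    return '%s\t%d' % (word, count)

# Illustrative inputs, not read from kinglear.txt.
sample_words = split_words("  Who's there, besides foul weather?  ")
sample_count = count_ones(('there', [1, 1, 1]))
sample_line = format_result(sample_count)

print(sample_words)
print(repr(sample_line))
```

The apostrophe in the character class is what keeps `Who's` together as one word instead of splitting it in two.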


In [8]:
# Creating a pipeline
p = beam.Pipeline()

lines = p | 'read' >> ReadFromText("kinglear.txt")

counts = (
    lines
    | 'split' >> beam.ParDo(ExtractWordsDoFn())
    | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
    | 'group' >> beam.GroupByKey()
    | 'count' >> beam.Map(count_ones)
)

output = counts | 'format' >> beam.Map(format_result)
output | 'write' >> WriteToText("counts.txt")

result = p.run()
result.wait_until_finish()

'DONE'
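
What the `pair_with_one`, `group`, and `count` steps compute can be mimicked on a small in-memory list. The sketch below is plain Python, not Beam: `group_by_key` is a hypothetical helper that imitates what `beam.GroupByKey` does to key/value pairs, and the word list is invented for illustration.

```python
from collections import defaultdict

def group_by_key(pairs):
    # Collect all values that share a key, similar to what beam.GroupByKey
    # does for a PCollection of (key, value) tuples.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return list(grouped.items())

# Hypothetical input: what the 'split' step might emit.
words = ['the', 'king', 'the', 'fool', 'the', 'king']

pairs = [(word, 1) for word in words]                         # 'pair_with_one'
grouped = group_by_key(pairs)                                 # 'group'
word_counts = [(word, sum(ones)) for word, ones in grouped]   # 'count'

print(grouped)
print(word_counts)
```

Unlike this list-based version, Beam does not guarantee any element order within a `PCollection`, which is why the counts are sorted with pandas after the file is read back. Beam also provides `beam.combiners.Count.PerElement()`, which covers pairing, grouping, and summing in a single transform.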

In [11]:
data = pd.read_csv('counts.txt-00000-of-00001', sep = '\t', header = None)
data

| | 0 | 1 |
|---|---|---|
| 0 | 1606 | 1 |
| 1 | THE | 7 |
| 2 | TRAGEDY | 1 |
| 3 | OF | 16 |
| 4 | KING | 1 |
| ... | ... | ... |
| 4902 | journey | 1 |
| 4903 | weight | 1 |
| 4904 | ought | 1 |
| 4905 | oldest | 1 |


In [15]:
data.sort_values(by=[1], ascending=False)

| | 0 | 1 |
|---|---|---|
| 128 | the | 705 |
| 119 | I | 620 |
| 122 | and | 587 |
| 13 | of | 456 |
| 27 | to | 430 |
| ... | ... | ... |
| 2576 | outface | 1 |
| 2578 | persecutions | 1 |
| 2579 | sky | 1 |
| 2581 | precedent | 1 |


## Task

Copy the Beam job above and adjust it so that it ignores a set of stop words.

Use the filename `counts-nostop.txt` for the output.
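
One possible starting point, sketched in plain Python rather than as a finished Beam job, is a predicate that decides whether a word should be kept. The name `is_not_stopword` and the choice to lowercase each word before the lookup are assumptions, not part of the task statement. Without lowercasing, capitalized forms such as `THE` in the counts above would not match a lowercase stop word set.

```python
STOPWORDS = {'the', 'and', 'of', 'to'}

def is_not_stopword(word, stopwords=STOPWORDS):
    # Assumption: matching is case-insensitive, so 'The' and 'THE'
    # are dropped along with 'the'.
    return word.lower() not in stopwords

# Hypothetical word list, for illustration only.
words = ['THE', 'TRAGEDY', 'OF', 'KING', 'the', 'fool', 'and', 'I']
kept = [word for word in words if is_not_stopword(word)]

print(kept)
```

In a Beam pipeline, a predicate like this can be passed to `beam.Filter`, placed after the `split` step so that stop words are removed before they are counted.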


In [None]:
stopwords = set(['the', 'and', 'of', 'to'])

# TODO copy and adjust beam job

In [None]:
data = pd.read_csv('counts-nostop.txt-00000-of-00001', sep = '\t', header = None)
data.sort_values(by=[1], ascending=False)