# WordCount Example

In this example we'll be using the Back to the Future transcript, in which each line is formatted as `Character: Line`. For example:

`Doc: Marty, is that you?`

In the first part we'll count how often each word appears in the transcript (ignoring the character names) and sort the words from most to least frequently used.

In the second part we'll filter out common words, known as stop words, using a Python package installed with pip.

Finally, we'll find the most common words used by each character.


## Part 1: Simple Word Count

In [None]:
# Import the regular expression package
import re

In [None]:
# Load the transcript using SparkContext.textFile
# This will return an RDD of strings - one for each line in the transcript 
lines = sc.textFile("file:///usr/data/backtothefuture_transcript.txt")

In [None]:
# This function will be called for each line in the transcript
# We strip out the character name (e.g. "Marty:")
# We also strip out special characters, keeping letters, spaces, and apostrophes
# Finally, we return a list of lowercase words
def parseLine(line):
    line = re.sub(r"^[^:]+:", "", line)
    line = re.sub(r"[^a-zA-Z ']", "", line)
    lineWords = re.split(r"\s+", line.lower())
    # Drop the empty strings that re.split leaves at the edges
    return [w for w in lineWords if w]
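
To see what the parsing steps do without starting Spark, here is a minimal standalone sketch applied to the sample line from the introduction. The name `parse_line_sketch` is made up for this illustration; it is meant to mirror the function above, not replace it.

```python
import re

def parse_line_sketch(line):
    # Drop the leading "Character:" prefix, if there is one
    line = re.sub(r"^[^:]+:", "", line)
    # Keep only letters, spaces, and apostrophes
    line = re.sub(r"[^a-zA-Z ']", "", line)
    # Lowercase, split on whitespace, and discard empty strings
    return [w for w in re.split(r"\s+", line.lower()) if w]

sample_words = parse_line_sketch("Doc: Marty, is that you?")
print(sample_words)  # ['marty', 'is', 'that', 'you']
```

The leading space left behind after the name is removed would otherwise produce an empty first element, which is why the empty strings are filtered out.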

In [None]:
# flatMap can map each input to 0 or more outputs
# In this case each line of text will be mapped to 0 or more words
words = lines.flatMap(parseLine)
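
The difference between `map` and `flatMap` can be modeled in plain Python. This is only a local sketch of the semantics, not how Spark implements them; `local_map` and `local_flat_map` are hypothetical helper names.

```python
from itertools import chain

def local_map(func, items):
    # Exactly one output element per input element
    return [func(item) for item in items]

def local_flat_map(func, items):
    # Each input yields an iterable; the results are concatenated
    return list(chain.from_iterable(func(item) for item in items))

sample_lines = ["great scott", "", "this is heavy"]
mapped = local_map(str.split, sample_lines)
flat_mapped = local_flat_map(str.split, sample_lines)
print(mapped)       # [['great', 'scott'], [], ['this', 'is', 'heavy']]
print(flat_mapped)  # ['great', 'scott', 'this', 'is', 'heavy']
```

Note that the empty line contributes nothing to the flattened result, which is the "0 or more outputs" case.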

In [None]:
# Map each word in the RDD to a (key, 1) pair where key is the word
wordCounts = words.map(lambda x: (x, 1))

In [None]:
# reduceByKey takes the values of 2 pairs with the same key and combines them
# into a single value using the lambda function, repeating until one pair per key remains
# In this case that value is x + y giving us the total count for each word (the key)
wordCounts = wordCounts.reduceByKey(lambda x, y: x + y)
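
As a rough mental model, `reduceByKey` groups the values that share a key and folds each group pairwise with the supplied function. The sketch below imitates that locally with a dictionary; `local_reduce_by_key` is a hypothetical helper, and real Spark combines values across partitions in no guaranteed order, which is why the function should be associative and commutative.

```python
from functools import reduce

def local_reduce_by_key(func, pairs):
    # Collect the values belonging to each key
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    # Fold each key's values pairwise with func
    return {key: reduce(func, values) for key, values in grouped.items()}

pairs = [("marty", 1), ("doc", 1), ("marty", 1), ("marty", 1)]
totals = local_reduce_by_key(lambda x, y: x + y, pairs)
print(totals)  # {'marty': 3, 'doc': 1}
```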

In [None]:
# Here we swap the elements of each pair, so instead of (word, count)
# they will be stored as (count, word)
# This will allow us to sort by the key (count)
wordCountsReversed = wordCounts.map(lambda x: (x[1], x[0]))

In [None]:
# Sort by key (which is now count) descending
wordCountsSorted = wordCountsReversed.sortByKey(ascending=False)

In [None]:
# Find the top 10 words
wordCountsSorted.take(10)
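
Swapping each pair before sorting is one way to rank by count. Another is to leave the pairs as `(word, count)` and rank with a key function. The plain-Python sketch below compares the two on made-up counts. In PySpark, `RDD.sortBy` and `RDD.takeOrdered` accept a key function in a similar way, for example `wordCounts.takeOrdered(10, key=lambda pair: -pair[1])`, though that variant is a suggestion and not what this notebook uses.

```python
import heapq

# Made-up counts for illustration only
counts = [("the", 5), ("marty", 3), ("doc", 4), ("future", 1)]

# Approach used above: swap to (count, word), sort descending, take the first n
swapped = [(count, word) for word, count in counts]
top_by_swap = sorted(swapped, reverse=True)[:2]

# Alternative: keep (word, count) and rank with a key function
top_by_key = heapq.nlargest(2, counts, key=lambda pair: pair[1])

print(top_by_swap)  # [(5, 'the'), (4, 'doc')]
print(top_by_key)   # [('the', 5), ('doc', 4)]
```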

## Part 2: Filter Out Stop Words

In [None]:
# Install the stop-words package
!pip3 install stop-words

In [None]:
# Import the stop_words package (note the underscore)
import stop_words

In [None]:
# In addition to stripping out character names and special characters
# this time we also strip out stop words
# Build the stop word set once, rather than on every call; a set makes lookups fast
stopWords = set(stop_words.get_stop_words('en'))
def parseLine2(line):
    line = re.sub(r"^[^:]+:", "", line)
    line = re.sub(r"[^a-zA-Z ']", "", line)
    lineWords = re.split(r"\s+", line.lower())
    # Drop empty strings and stop words
    return [w for w in lineWords if w and w not in stopWords]
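
The filtering step itself is just a membership test. The sketch below uses a tiny hand-written set as a stand-in for the real stop word list, so it runs without installing anything; `SAMPLE_STOP_WORDS` and `remove_stop_words` are hypothetical names, and the actual list returned by the `stop_words` package is much longer.

```python
# Hypothetical stand-in for the real English stop word list
SAMPLE_STOP_WORDS = {"is", "that", "you", "the", "a"}

def remove_stop_words(words, stop_word_set):
    # Keep only the words that are not in the stop word set
    return [w for w in words if w not in stop_word_set]

parsed = ["marty", "is", "that", "you"]
kept = remove_stop_words(parsed, SAMPLE_STOP_WORDS)
print(kept)  # ['marty']
```

Using a set rather than a list keeps each lookup fast, which matters when the test runs once per word in the transcript.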

In [None]:
# Get the words from the lines
words = lines.flatMap(parseLine2)

In [None]:
# Map and reduce by key to get total word counts
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

In [None]:
# Reverse and sort by key (count) descending
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)

In [None]:
# Find the top 10 words - not including stop words
wordCountsSorted.take(10)

## Part 3: Word Counts by Character

In [None]:
# Get the entire contents of the file, so we can group words by character
# wholeTextFiles returns an RDD formatted as (fileName, fileContents)
contents = sc.wholeTextFiles("file:///usr/data/backtothefuture_transcript.txt")

In [None]:
# In order to group words by character we need to parse the entire contents
# of the transcript at once. This way we can keep track of the last character specified
# as we handle each new line (some lines don't specify a character)
def parseContents(contents):
    # A character name is a run of lowercase letters at the start of a line, followed by a colon
    pattern = re.compile(r'(^[a-z]+:)', re.UNICODE | re.MULTILINE)
    # Because the pattern has a capturing group, split keeps the names in the result
    parts = pattern.split(contents.lower())
    key = None
    tuples = []
    for part in parts:
        if pattern.match(part) is not None:
            key = part[:-1] # get rid of the ending colon
        elif key is not None:
            for word in parseLine2(part):
                tuples.append((key, word))
    return tuples
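
The function above relies on `re.split` keeping the matched text in the result when the pattern contains a capturing group. Here is a minimal sketch of that behavior on a hypothetical three-line excerpt, where the second line has no speaker prefix and so stays attached to the previous speaker. The excerpt and the name `speaker_pattern` are assumptions for illustration.

```python
import re

# Hypothetical excerpt; the second line does not name a speaker
sample = "doc: marty, is that you?\nit's me\nmarty: yeah"

speaker_pattern = re.compile(r"(^[a-z]+:)", re.MULTILINE)

# The capturing group makes split return the speaker names as well as the text between them
parts = speaker_pattern.split(sample)
print(parts)
# ['', 'doc:', " marty, is that you?\nit's me\n", 'marty:', ' yeah']

speaker = None
pairs = []
for part in parts:
    if speaker_pattern.fullmatch(part):
        speaker = part[:-1]  # drop the trailing colon
    elif speaker is not None:
        pairs.append((speaker, part.strip()))
print(pairs)
# [('doc', "marty, is that you?\nit's me"), ('marty', 'yeah')]
```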

In [None]:
# Parse the contents of the text file
# No Spark magic here - just using Python to create a list of (character, word) tuples
characterWordTuples = parseContents(contents.values().first())

In [None]:
# Use SparkContext.parallelize to convert the list of tuples to an RDD
characterWords = sc.parallelize(characterWordTuples)

In [None]:
# Let's take a look at a few of them
characterWords.take(10)

In [None]:
# Similarly to how we did our generic word count we are going to
# map each element to (key, 1) where key is the (character, word) tuple
characterWordCounts = characterWords.map(lambda x: (x, 1))

In [None]:
# We'll reduce by key the same way we did before
characterWordCounts = characterWordCounts.reduceByKey(lambda x, y: x + y)
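
Using a tuple as the key works because tuples are hashable, so each distinct `(character, word)` combination gets its own count. The same idea can be shown locally with `collections.Counter` on a few made-up pairs. This is a plain-Python analogy, not the Spark code path.

```python
from collections import Counter

# Made-up (character, word) pairs for illustration only
character_words = [("doc", "marty"), ("marty", "doc"), ("doc", "marty"), ("doc", "great")]

# Each distinct tuple is counted separately
pair_counts = Counter(character_words)
print(pair_counts[("doc", "marty")])  # 2
print(pair_counts.most_common(1))     # [(('doc', 'marty'), 2)]
```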

In [None]:
# Let's see what our RDD looks like now
characterWordCounts.take(10)

In [None]:
# Swap the elements of each pair, so we can sort by count as the key
characterWordCountsReversed = characterWordCounts.map(lambda x: (x[1], x[0]))

In [None]:
# Sort by count descending
characterWordCountsSorted = characterWordCountsReversed.sortByKey(ascending=False)

In [None]:
# Find the top 10 character/word combinations and display them
characterWordCountsSorted.take(10)

In [None]:
# What we really want is the top word for each character, not the top character/word combinations
# We'll start by making the character the key
# We'll map the RDD above (count, (character, word)) to (character, (word, count)) 
characterWordCounts2 = characterWordCountsSorted.map(lambda x: (x[1][0], (x[1][1],x[0])))

In [None]:
# Let's see what it looks like
characterWordCounts2.take(10)

In [None]:
# Now, we'll reduceByKey, but this time we won't add the values
# We'll take the (word, count) tuple that has the highest count
characterWordCounts2 = characterWordCounts2.reduceByKey(lambda x, y: (x if x[1] > y[1] else y))
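
The reducer here keeps a maximum instead of computing a sum. The sketch below models that locally on made-up records; `local_reduce_by_key` and `keep_larger` are hypothetical names, and the counts are invented.

```python
def local_reduce_by_key(func, pairs):
    # Fold each new value into the running result for its key
    reduced = {}
    for key, value in pairs:
        reduced[key] = func(reduced[key], value) if key in reduced else value
    return reduced

# Made-up (character, (word, count)) records
records = [
    ("doc", ("marty", 12)),
    ("marty", ("doc", 9)),
    ("doc", ("great", 4)),
    ("marty", ("heavy", 3)),
]

# Keep whichever (word, count) tuple has the larger count
keep_larger = lambda x, y: x if x[1] > y[1] else y

top_word = local_reduce_by_key(keep_larger, records)
print(top_word)  # {'doc': ('marty', 12), 'marty': ('doc', 9)}
```

When two words tie on count, this reducer returns its second argument, so in Spark the winner depends on the order in which values happen to be combined.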

In [None]:
# Let's restructure the pairs as (count, (character, word)), so we can sort by how often each character's top word was used
characterWordCounts2Reversed = characterWordCounts2.map(lambda x: (x[1][1], (x[0],x[1][0])))

In [None]:
# Sort by key (count) descending
characterWordCounts2Sorted = characterWordCounts2Reversed.sortByKey(ascending=False)

In [None]:
# Finally! Here are the top words for the 10 characters whose top word was used most often
characterWordCounts2Sorted.take(10)