# Word Count using DataFrames

Spark DataFrames, a relatively new feature, provide a higher-level interface, similar to pandas DataFrames, that abstracts away much of the detail of working with Spark. This exercise reimplements the word count example from the previous RDD exercise using DataFrames.

We will cover:

1. Creating DataFrames
2. Counting with `.groupBy()` and `.count()`
3. Finding unique words and a mean value
4. Applying word count to a file

For reference, you can look up the details of the relevant methods in [Spark's Python API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD).

In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import types
import pyspark.sql.functions as func

## (1) Creating a Spark DataFrame

In this part of the exercise, we will explore creating a DataFrame with `createDataFrame`.

### (1a) Create a base DataFrame

We'll start by generating a base DataFrame by using a pandas DataFrame and the `spark.createDataFrame` method.  Then we'll print out the type of the DataFrame.

In [2]:
import pandas as pd

wordsPandasDF = pd.DataFrame({'word': ['cat', 'elephant', 'rat', 'rat', 'cat']})
wordsDF = spark.createDataFrame(wordsPandasDF)

# Print out the type of wordsDF
print(type(wordsDF))

<class 'pyspark.sql.dataframe.DataFrame'>


In [3]:
# Convert back to a pandas DataFrame for pretty display
wordsDF.toPandas()

Unnamed: 0,word
0,cat
1,elephant
2,rat
3,rat
4,cat


### (1b) Pluralize and test

As in the previous exercise, let's create a function to pluralize a word.

In [4]:
# One way of completing the function
def makePlural(word):
    return word + 's'

print(makePlural('cat'))

cats


In [5]:
# Load in the testing code and check to see if your answer is correct
# If incorrect it will report back '1 test failed' for each failed test
# Make sure to rerun any cell you change before trying the test again
from test_helper import Test
# TEST Pluralize and test (1b)
Test.assertEquals(makePlural('rat'), 'rats', 'incorrect result: makePlural does not add an s')

1 test passed.


### (1c) Apply `makePlural` to the base DataFrame

In order to apply `makePlural` to a column of a Spark DataFrame, we must first create a Spark function. [pyspark.sql.functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) provides a number of built-in functions you can use, as well as [udf()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.udf), which allows you to create user-defined functions.

`udf()` takes two arguments: the Python function you want to use and the Spark type of its return value. The available types are defined in the [pyspark.sql.types](http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#module-pyspark.sql.types) module.

In [6]:
import pyspark.sql.functions as func
from pyspark.sql import types

# Create a Spark User Defined Function
makePluralUDF = func.udf(makePlural, types.StringType())

You can then use `.select()` on a DataFrame to return a new DataFrame with the modified column:

In [7]:
# Generate a new DataFrame with a pluralized column
pluralDF = wordsDF.select(makePluralUDF('word'))
pluralDF.toPandas()

Unnamed: 0,makePlural(word)
0,cats
1,elephants
2,rats
3,rats
4,cats


In [8]:
# TEST Apply makePlural to the base DataFrame (1c)
Test.assertEquals(list(pluralDF.toPandas()['makePlural(word)']),
                  ['cats', 'elephants', 'rats', 'rats', 'cats'],
                  'incorrect values for pluralDF')

1 test passed.
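
Conceptually, a UDF applies an ordinary Python function to each value in a column, one row at a time. As a rough plain-Python analogue (no Spark involved; `words` and `make_plural` are illustrative names, not part of the exercise), the `select` above computes something like this:

```python
# Plain-Python analogue of applying a UDF to a column (illustrative only).
def make_plural(word):
    """Return the word with an 's' appended."""
    return word + 's'

# Stand-in for the single 'word' column of the DataFrame.
words = ['cat', 'elephant', 'rat', 'rat', 'cat']

# A UDF is evaluated once per row; a list comprehension mimics that locally.
plurals = [make_plural(w) for w in words]
print(plurals)
```

Spark evaluates the function on its workers rather than in a local loop, so treat this only as a mental model of the result.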


### (1d) Make a UDF from a `lambda` function

You can also create a UDF without first defining a named function with `def`, by using a Python lambda function. Create the same UDF as above using a lambda function and apply it to the DataFrame.

In [9]:
# TODO: Replace <FILL IN> with appropriate code
# Hint: Don't forget to pass in both the lambda function and the output type, e.g. types.StringType()
makePluralLambdaUDF = func.udf(lambda x: x+'s',types.StringType())

pluralLambdaDF = wordsDF.select(makePluralLambdaUDF('word'))
pluralLambdaDF.toPandas()

Unnamed: 0,<lambda>(word)
0,cats
1,elephants
2,rats
3,rats
4,cats


Note as well that you can control the name of the column in the output DataFrame using `.alias()` on the UDF call:

In [10]:
pluralLambdaDF = wordsDF.select(makePluralLambdaUDF('word').alias('plural'))
pluralLambdaDF.toPandas()

Unnamed: 0,plural
0,cats
1,elephants
2,rats
3,rats
4,cats


In [11]:
# TEST Make a UDF from a lambda function (1d)
Test.assertEquals(list(pluralLambdaDF.toPandas()['plural']),
                  ['cats', 'elephants', 'rats', 'rats', 'cats'],
                  'incorrect values for pluralLambdaDF (1d)')

1 test passed.


### (1e) Length of each word

Now construct a UDF to return the number of characters in each word.

In [14]:
# TODO: Replace <FILL IN> with appropriate code
# Hint: Don't forget to pass in both a Python function and the output type.
# len returns an integer, so the output type here is types.IntegerType()
lenUDF = func.udf(len, types.IntegerType())

lengthsDF = pluralLambdaDF.select('*', lenUDF('plural'))
lengthsDF.toPandas()

Unnamed: 0,plural,len(plural)
0,cats,4
1,elephants,9
2,rats,4
3,rats,4
4,cats,4


In [15]:
# TEST Length of each word (1e)
Test.assertEquals(list(lengthsDF.toPandas()['len(plural)']),
                  [4, 9, 4, 4, 4],
                  'incorrect values for pluralLengths')

1 test passed.
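
The type passed to `udf()` should describe what the Python function actually returns. `len` returns an `int`, so a test that compares against integers will only match an integer-typed column. The following plain-Python sketch (no Spark; it assumes a string-typed column would hold the text form of each length) shows why the distinction matters:

```python
plurals = ['cats', 'elephants', 'rats', 'rats', 'cats']

# What len actually returns: Python integers.
int_lengths = [len(w) for w in plurals]

# Assumption for illustration: a string-typed column holds the text form.
str_lengths = [str(n) for n in int_lengths]

print(int_lengths == [4, 9, 4, 4, 4])  # integers compare equal
print(str_lengths == [4, 9, 4, 4, 4])  # '4' is not equal to 4
```

The two lists look identical when displayed in a table, which makes a mismatched return type easy to overlook.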


## (2) Counting with DataFrames

### (2a) `.groupBy()`

When using DataFrames, we have a more abstract interface for performing complex operations on data. The `.groupBy()` method on a DataFrame returns a [GroupedData](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData) object, which supports a number of aggregation methods on grouped data.

In [None]:
groupedWords = wordsDF.groupBy('word')
help(groupedWords)

### (2b) `.groupBy().count()`

In our case, where we have a single column of strings, only the `.count()` method really makes sense. This returns a new DataFrame with two columns, one with the words and one with the number of times they appeared in the source:

In [16]:
wordCounts = wordsDF.groupBy('word').count()
wordCounts.toPandas()

Unnamed: 0,word,count
0,rat,2
1,cat,2
2,elephant,1


In [17]:
# TEST word counts (2b)
wordCountsPandasDF = wordCounts.toPandas()
Test.assertEquals(sorted(zip(wordCountsPandasDF['word'], wordCountsPandasDF['count'])),
                  [('cat', 2), ('elephant', 1), ('rat', 2)],
                  'incorrect value for wordCountsCollected')

1 test passed.
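
For intuition, grouping by a column and counting is the same computation as tallying values in a local collection. A minimal plain-Python sketch using `collections.Counter` (an analogue only; Spark performs the aggregation across partitions rather than in one process):

```python
from collections import Counter

words = ['cat', 'elephant', 'rat', 'rat', 'cat']

# Tally occurrences of each distinct word, like groupBy('word').count().
word_counts = Counter(words)

# Sort before displaying so the output order is stable.
for word, count in sorted(word_counts.items()):
    print(word, count)
```

The row order of a grouped Spark result is not guaranteed, which is why the test above sorts the pairs before comparing them.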


## (3) Finding unique words and a mean value

### (3a) Unique words

Calculate the number of unique words in `wordsDF`.  You can use other DataFrames that you have already created to make this easier.

In [19]:
# TODO: Replace <FILL IN> with appropriate code
numUniqueWords = wordCounts.count()
print(numUniqueWords)

3


In [20]:
# TEST Unique words (3a)
Test.assertEquals(numUniqueWords, 3, 'incorrect count of numUniqueWords')

1 test passed.


### (3b) Mean word count

Find the mean number of occurrences of each word in `wordCounts`.

In [21]:
# TODO: Replace <FILL IN> with appropriate code
averageDF = wordCounts.select(func.avg('count'))

# Collect DataFrame and get first value out of it
average = np.array(averageDF.toPandas())[0, 0]
average

1.6666666666666667

In [22]:
# TEST Mean word count (3b)
Test.assertEqualsTol(average, 1.6666667, 0.01, 'incorrect value of average')

1 test passed.
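
The two quantities in this section are related: the number of unique words is the number of rows in `wordCounts`, and the mean is the sum of the counts divided by that number. A plain-Python check using the counts from (2b), written for Python 3:

```python
# Counts taken from the (2b) output above.
counts = {'cat': 2, 'elephant': 1, 'rat': 2}

num_unique = len(counts)          # one entry per distinct word
total = sum(counts.values())      # total number of words in the source
mean_count = total / num_unique   # mean occurrences per distinct word

print(num_unique, total, mean_count)
```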


## (4) Apply word count to a file

In this section we will finish developing our word count application.  We'll have to build the `wordCount` function, deal with real world problems like capitalization and punctuation, load in our data source, and compute the word count on the new data.

### (4a) `wordCount` function

First, define a function for word counting.  This function should take in a DataFrame with a single `word` column, such as `wordsDF`, and return a DataFrame that has all of the words and their associated counts.

In [24]:
# TODO: Replace <FILL IN> with appropriate code
def wordCount(wordListDF):
    """Creates a DataFrame with word counts from a DataFrame of words.

    Args:
        wordListDF (DataFrame): A DataFrame consisting of words.

    Returns:
        DataFrame: A Spark DataFrame with word, count columns.
    """
    return wordListDF.groupBy('word').count()

wordCount(wordsDF).toPandas()

Unnamed: 0,word,count
0,rat,2
1,cat,2
2,elephant,1


In [25]:
# TEST wordCount function (4a)
wordCountsPandasDF = wordCount(wordsDF).toPandas()
Test.assertEquals(sorted(zip(wordCountsPandasDF['word'], wordCountsPandasDF['count'])),
                  [('cat', 2), ('elephant', 1), ('rat', 2)],
                  'incorrect value for wordCountsCollected')

1 test passed.


### (4b) Capitalization and punctuation

Here we define the `removePunctuation` function from the previous exercise, which preprocesses raw text into cleaned words:

In [35]:
# TODO: Replace <FILL IN> with appropriate code
import re

def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    # Lowercase, drop everything except letters, digits and spaces, then trim the ends
    return re.sub(r'[^a-z0-9 ]', '', text.lower()).strip()

print(removePunctuation('Hi, you!'))
print(removePunctuation(' No under_score!'))
print(removePunctuation(" The Elephant's 4 cats. "))

hi you
no underscore
the elephants 4 cats


In [36]:
# TEST Capitalization and punctuation (4b)
Test.assertEquals(removePunctuation(" The Elephant's 4 cats. "),
                  'the elephants 4 cats',
                  'incorrect definition for removePunctuation function')

1 test passed.
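
The docstring's note about ordering matters when punctuation sits between the edge of the string and its first or last word. The sketch below compares the two possible orders; both helper names are hypothetical and exist only for this comparison:

```python
import re

def strip_then_clean(text):
    # Trim first, then drop punctuation: can leave a stray edge space.
    return re.sub(r'[^a-z0-9 ]', '', text.lower().strip())

def clean_then_strip(text):
    # Drop punctuation first, then trim: no edge spaces remain.
    return re.sub(r'[^a-z0-9 ]', '', text.lower()).strip()

sample = ' - Hi, you! - '
print(repr(strip_then_clean(sample)))
print(repr(clean_then_strip(sample)))
```

A stray edge space becomes an empty element once the line is split on spaces, so the order chosen affects how many elements the split produces.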


### (4c) Load a text file

For the next part, we will use the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page). To read a text file into a DataFrame, we use `spark.read.text()`, which returns a DataFrame with one row per line of the file in a single column named `value`. We then wrap the `removePunctuation()` function defined above in a UDF and apply it with `.select()` to strip out the punctuation and change all text to lowercase.  Since the file is large, we convert to pandas and use `.head(10)` so that we only display the first 10 lines.

Let's start by fetching the data.

In [37]:
!rm -f shakespeare.txt*
!wget https://s3-eu-west-1.amazonaws.com/asi-training-data/spark/shakespeare.txt

--2017-09-24 20:43:00--  https://s3-eu-west-1.amazonaws.com/asi-training-data/spark/shakespeare.txt
Resolving s3-eu-west-1.amazonaws.com (s3-eu-west-1.amazonaws.com)... 52.218.17.4
Connecting to s3-eu-west-1.amazonaws.com (s3-eu-west-1.amazonaws.com)|52.218.17.4|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5327978 (5.1M) [binary/octet-stream]
Saving to: ‘shakespeare.txt’


2017-09-24 20:43:01 (87.6 MB/s) - ‘shakespeare.txt’ saved [5327978/5327978]



In [38]:
filename = "shakespeare.txt"

# Create a UDF for removePunctuation
removePunctuationUDF = func.udf(removePunctuation, types.StringType())

# Read the data from file and apply removePunctuation
shakespeareDF = spark.read.text(filename).select(removePunctuationUDF('value').alias('line'))

shakespeareDF.toPandas().head(10)

Unnamed: 0,line
0,1609
1,
2,the sonnets
3,
4,by william shakespeare
5,
6,
7,
8,1
9,from fairest creatures we desire increase


### (4d) Words from lines 

Before we can use the `wordCount()` function, we have to address two issues with the format of the DataFrame:

* The first issue is that we need to split each line by its spaces.
* The second issue is that we need to filter out the empty elements that result from blank lines and repeated spaces.
 
Apply a transformation that will split each element of the DataFrame by its spaces. For each element of the DataFrame, you should apply Python's string [split()](https://docs.python.org/3/library/stdtypes.html#str.split) function. You might think that applying a UDF to a column directly is the way to do this, but think about what the result of the `split()` function will be.
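
To see why a plain UDF is not enough, note that splitting a line yields a list of words, so a split column holds one list per row rather than one word per row. The rows then need to be flattened, which is the role `explode` from `pyspark.sql.functions` plays for array columns. A plain-Python sketch of the same steps (no Spark; the sample lines are made up):

```python
# Made-up sample lines standing in for the 'line' column.
lines = ['the sonnets', '', 'by  william shakespeare']

# Step 1: split each line on single spaces, giving one list per line.
split_lines = [line.split(' ') for line in lines]
print(split_lines)

# Step 2: flatten the lists so that each word becomes its own element.
words = [word for parts in split_lines for word in parts]
print(words)

# Blank lines and repeated spaces leave empty strings behind.
non_empty = [word for word in words if word != '']
print(non_empty)
```

The empty strings in the flattened list are the elements that the filtering step in (4e) removes.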

In [42]:
# TODO: Replace <FILL IN> with appropriate code
# Note: .alias('word') must be applied to the exploded column, not to the DataFrame
shakespeareWordsDF = (shakespeareDF
                      .select(func.explode(func.split('line', ' ')).alias('word')))

shakespeareWordsDF.toPandas().head()

Unnamed: 0,word
0,1609
1,
2,the
3,sonnets
4,


In [43]:
# TEST Words from lines (4d)
# This test allows for leading spaces to be removed either before or after
# punctuation is removed.
Test.assertTrue(shakespeareWordsDF.count() in [927631, 928908],
                'incorrect number of words in shakespeareWordsDF')

1 test passed.


### (4e) Remove empty elements

The next step is to filter out the empty elements.  Remove all entries where the word is `''`.

In [52]:
# TODO: Replace <FILL IN> with appropriate code
shakeWordsDF = shakespeareWordsDF.filter(func.length('word') > 0)
shakeWordCount = shakeWordsDF.count()
print(shakeWordCount)

In [47]:
# TEST Remove empty elements (4e)
Test.assertEquals(shakeWordCount, 882996, 'incorrect value for shakeWordCount')

### (4f) Count the words

We now have a DataFrame that contains only words.  Next, let's apply the `wordCount()` function to produce a DataFrame of word counts. We can view the top 15 words by sorting on the `count` column in descending order with `.sort()` and then taking the first 15 rows with `.head(15)`.

You'll notice that many of the words are common English words. These are called stopwords. In a later lab, we will see how to eliminate them from the results.

Use the `wordCount()` function, `.sort()`, and `.head()` to obtain the fifteen most common words and their counts.

In [51]:
# TODO: Replace <FILL IN> with appropriate code
# Count first, then sort the counts in descending order and keep the first 15 rows
top15WordsAndCounts = wordCount(shakeWordsDF).sort('count', ascending=False).head(15)

for row in top15WordsAndCounts:
    print('{0}: {1}'.format(*row))

In [None]:
# TEST Count the words (4f)
Test.assertEquals(top15WordsAndCounts,
                  [(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463),
                   (u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890),
                   (u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678)],
                  'incorrect value for top15WordsAndCounts')
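
As a local analogue of sorting the counts in descending order and keeping the first rows, `collections.Counter.most_common` does the same on a small in-memory list. The sample sentence is made up for illustration:

```python
from collections import Counter

text = 'the cat and the rat and the elephant'
counts = Counter(text.split(' '))

# Similar in spirit to sorting by count (descending) and taking the top 2.
top2 = counts.most_common(2)
for word, count in top2:
    print('{0}: {1}'.format(word, count))
```

Even in this tiny sample the most frequent entries are stopwords, which mirrors what the full text shows.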