# Word Counter Application using PySpark

### This is an app for a simple word counter.

In [None]:
# I have developed the app to calculate the most common words
# in the Complete Works of William Shakespeare from
# Project Gutenberg. http://www.gutenberg.org/files/100/100-0.txt
# This could be scaled to find the most common words on the internet.

In [1]:
import os
import sys
from pyspark import SparkContext

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [8]:
# Establishing Spark Context and spark session

sc.stop()
conf = SparkConf().setAppName('appName').setMaster('local')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)


In [13]:
# Created a Resilient Distributed Dataset

wordsList = ['cat', 'elephant', 'rat', 'cat', 'rat']
wordsRDD = sc.parallelize(wordsList, 4)

In [17]:
# Write a function to pluralize each word 

def add_s_to_word(word):
    return word + "s"

In [113]:
# Pass each item in the RDD into a map() transformation that applies 
# makePlural() function to each item

pluralRDD = wordsRDD.map(add_s_to_word)
print(pluralRDD.collect())

['dogs', 'bats', 'bears', 'lions', 'bears', 'dogs']


In [25]:
# Use map and lambda function to return
# the number of occurences in each word. 

pluralLengths = (pluralRDD.map(lambda x: len(x))).collect()
print(pluralLengths)

[4, 9, 4, 4, 4]


In [26]:
# Create a new type of RDD called a pair RDD.
# A Pair RDD is an RDD, where each element is a (k, v) pair where,
# k is the key and v is the value

# In this cell, we will emit (word, 1) pairs

wordPairs = wordsRDD.map(lambda x : (x, 1))
wordPairs.collect()



[('cat', 1), ('elephant', 1), ('rat', 1), ('cat', 1), ('rat', 1)]

In [34]:
# Count with pair RDDs

# Brute Force Approach -> collect() all of the elements and count
# them in the driver program. But, that is really not effecient.


# groupByKey() approach
# Use groupByKey() to generate a pair RDD of type ('word', iterator)
wordsGrouped = wordPairs.groupByKey()
for key, value in wordsGrouped.collect():
    print('{0}: {1}'.format(key, list(value)))
    print(key, sum(value))
    print('######')
    

cat: [1, 1]
cat 2
######
elephant: [1]
elephant 1
######
rat: [1, 1]
rat 2
######


# reduceByKey()

In [38]:
print(wordPairs.collect())
wordCounts = wordPairs.reduceByKey(lambda x,y: x+y)
wordCounts.collect()

[('cat', 1), ('elephant', 1), ('rat', 1), ('cat', 1), ('rat', 1)]


[('cat', 2), ('elephant', 1), ('rat', 2)]

# All together

In [39]:
wordCountsCollected = wordsRDD.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
print(wordCountsCollected.collect())

[('cat', 2), ('elephant', 1), ('rat', 2)]


# Part 2: Finding unique words and a mean value

In [47]:
# Find unique words in wordsRDD

uniqueWords = wordsRDD.distinct().count()
print(uniqueWords)

3


In [50]:
# Find the mean number of words per unique word in wordCounts.

# Use a reduce() action to sum the counts in wordCounts 
# and then divide by the number of unique words
from operator import add
totalCount = (wordCounts.map(lambda x : x[1]).reduce(lambda x, y: x+y))
average = totalCount/uniqueWords
print(totalCount)
print(average)
print(round(average, 2))

5
1.6666666666666667
1.67


# Part 3: Apply word count to a file

In [108]:
# Generate (word, count) pairs for all words in the file
# Create a function which takes a RDD as an input
# and returns <word, count> pairs

def word_count(wordListRDD):
    emitWords = wordListRDD.map(lambda x: (x, 1))
    wordCountPairs = emitWords.reduceByKey(lambda x, y : x+ y)
    return wordCountPairs


wordList = ['dog', 'bat', 'bear', 'lion', 'bear', 'dog']
wordsRDD = sc.parallelize(wordList)
print(word_count(wordsRDD))

PythonRDD[143] at RDD at PythonRDD.scala:53


# Capitalization and Punctuation

In [59]:
# 1. Words should be counted independent of lower or upper case.
# 2. Punctuations should not be considered.
# 3. Any leading or trailing spaces should be removed.

import re
def removePunctuationWithoutRegex(line):
    line = line.lower().strip()
    words = []
    temp = ""
    for char in line:
        if ' ' == char:
            words.append(temp)
            temp = ""
            continue
        if char.isdigit():
            temp += char
            continue
        if char in 'abcdefghijklmnopqrstuvwxyz':
            temp += char
    if char != ' ':
        words.append(temp)
    return words


def removePunctuation(string):
    string = string.strip().lower()
    string = re.sub(r'[^0-9a-zA-Z\s]', '', string)
    return string
    

help(re.sub)
test_cases = [" ksdfY'bjksdh, YY'Y uI      ", "Hi, you!", "  No under_score!"]
for string in test_cases:
    print(removePunctuation(string))

Help on function sub in module re:

sub(pattern, repl, string, count=0, flags=0)
    Return the string obtained by replacing the leftmost
    non-overlapping occurrences of the pattern in string by the
    replacement repl.  repl can be either a string or a callable;
    if a string, backslash escapes in it are processed.  If it is
    a callable, it's passed the match object and must return
    a replacement string to be used.

ksdfybjksdh yyy ui
hi you
no underscore


# Load a text file

In [94]:
# We will use "Complete Works of William Shakespeare" from 
# Project Gutenberg -> http://www.gutenberg.org/files/100/100-0.txt
# We use SparkContext.textFile() to convert a text file into a RDD
import os.path

baseDir = os.getcwd()
# inputDir = os.path.join('General', 'Spark', 'works_of_shakespeare.txt')
# fileName = os.path.join(baseDir, inputDir)
fileName = os.path.join(baseDir, 'works_of_shakespeare.txt')

shakespeareRDD = sc.textFile(fileName, 8).map(removePunctuation)
print('\n'.join(shakespeareRDD
               .zipWithIndex() # to (line, lineNum)
               .map(lambda x: '{0}: {1}'.format(x[1], x[0])) # to 'lineNum: line'
                .take(15)))


0: 
1: project gutenbergs the complete works of william shakespeare by william
2: shakespeare
3: 
4: this ebook is for the use of anyone anywhere in the united states and
5: most other parts of the world at no cost and with almost no restrictions
6: whatsoever  you may copy it give it away or reuse it under the terms
7: of the project gutenberg license included with this ebook or online at
8: wwwgutenbergorg  if you are not located in the united states youll
9: have to check the laws of the country where you are located before using
10: this ebook
11: 
12: 
13: title the complete works of william shakespeare
14: 


### # Before we calculate wordCount, we need two address two issues

### First issue is that we need to split each line by its spaces.

In [102]:
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x: x.split(' '))
shakespeareWordCount = shakespeareWordsRDD.count()
print(shakespeareWordsRDD.top(5))
print(shakespeareWordCount)

['zwounds', 'zwounds', 'zwounds', 'zwounds', 'zwounds']
1006999


### Second issue is we need to remove empty lines.

In [104]:
# This step involves removing all entries where the word is ""
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x!="")
shakeWordCount = shakeWordsRDD.count()
print(shakeWordCount)

960941


### Count the words

In [109]:
top15WordsAndCounts = word_count(shakeWordsRDD).takeOrdered(15, lambda x:-1*x[1])
print('\n'.join(map(lambda x: '{0}: {1}'.format(x[0], x[1]), top15WordsAndCounts)))

the: 30186
and: 28388
i: 21944
to: 20912
of: 18811
a: 16150
you: 14437
my: 13182
in: 12186
that: 11778
is: 9713
not: 9066
with: 8524
me: 8263
for: 8195
