In [1]:
import findspark
findspark.init()  # locate the Spark installation and make pyspark importable

import pyspark

# Reuse the already-running SparkContext when this cell is executed again:
# constructing a second SparkContext in the same kernel raises a ValueError.
# Note that an existing context keeps its own app name.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("wordcount"))

#### Basic operations

In [3]:
# Small sample corpus, distributed over four partitions.
wordlist = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordrdd = sc.parallelize(wordlist, numSlices=4)

# Confirm that parallelize() hands back an RDD object.
type(wordrdd)

pyspark.rdd.RDD

In [5]:
#Pluralize word
def makeplural(text):
    """Return ``text`` with a trailing 's' appended (a naive English plural)."""
    suffix = 's'
    return text + suffix

# Quick check on a single word.
makeplural('cat')

'cats'

In [15]:
# map() accepts a named function ...
# (despite the names, both variables hold plain lists once collect() has run)
pluralrdd = (wordrdd
             .map(makeplural)
             .collect())

# ... or an equivalent inline lambda
plurallambdardd = (wordrdd
                   .map(lambda word: word + 's')
                   .collect())

pluralrdd, plurallambdardd

(['cats', 'elephants', 'rats', 'rats', 'cats'],
 ['cats', 'elephants', 'rats', 'rats', 'cats'])

In [10]:
# length of word
def lenword(text):
    """Return ``len(text)``, i.e. the number of characters of a string."""
    length = len(text)
    return length

# Quick check on a single word.
lenword('cat')

3

In [16]:
# map() with the named helper lenword
lenwordrdd = (wordrdd
              .map(lenword)
              .collect())

# the same computation with an inline lambda
lenlambdaword = (wordrdd
                 .map(lambda word: len(word))
                 .collect())

lenwordrdd, lenlambdaword

([3, 8, 3, 3, 3], [3, 8, 3, 3, 3])

In [20]:
# build a pair RDD: every word becomes a (word, 1) tuple
wordpairsrdd = wordrdd.map(lambda word: (word, 1))

wordpairsrdd.collect()

[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]

#### Counting with pair RDDs

In [31]:
# groupByKey() gathers all values that share a key into one iterable per key
wordgroupbykeyrdd = wordpairsrdd.groupByKey()

# show each key together with its materialised list of values
for key, value in wordgroupbykeyrdd.collect():
    print('{0},{1}'.format(key, list(value)))

# sum every group's values to obtain the count per word
# (pair[0] is the key, pair[1] is the iterable of values)
wordgroupbykeycountrdd = wordgroupbykeyrdd.map(
    lambda pair: (pair[0], sum(pair[1]))
)

wordgroupbykeycountrdd.collect()

cat,[1, 1]
elephant,[1]
rat,[1, 1]


[('cat', 2), ('elephant', 1), ('rat', 2)]

In [32]:
# reduceByKey() sums the values per key directly, with no separate grouping step
wordreducebykeyrdd = wordpairsrdd.reduceByKey(lambda left, right: left + right)

wordreducebykeyrdd.collect()

[('cat', 2), ('elephant', 1), ('rat', 2)]

In [51]:
# the whole pipeline in one chain: pair every word with 1, then sum per word
wordcountrdd = (
    wordrdd
    .map(lambda word: (word, 1))
    .reduceByKey(lambda left, right: left + right)
)

wordcountrdd.collect()

[('cat', 2), ('elephant', 1), ('rat', 2)]

In [64]:
# countByValue() is a shortcut: it returns a dictionary that maps each distinct
# element of the RDD to its number of occurrences
wordrdd.countByValue()

defaultdict(int, {'cat': 2, 'elephant': 1, 'rat': 2})

#### Finding unique words and average value

In [52]:
# wordcountrdd holds one (word, count) pair per distinct word,
# so counting its elements gives the number of unique words
uniquewords = wordcountrdd.count()

uniquewords

3

In [59]:
# mean occurrences per distinct word = total occurrences / number of distinct words
totalcount = (
    wordcountrdd
    .map(lambda pair: pair[1])                   # keep only the counts
    .reduce(lambda left, right: left + right)    # add them all up
)
average = totalcount / float(uniquewords)
print('Total count of words is {} and the average value is {:.2f}'
      .format(totalcount, average))

Total count of words is 5 and the average value is 1.67


#### Apply word count to Shakespeare's plays

In [60]:
# create a defined function to count words
def countwords(wordrdd):
    """Count how often each element occurs in an RDD of words.

    Args:
        wordrdd: an RDD whose elements are the words to count.

    Returns:
        A pair RDD of (word, count) tuples, one per distinct word.
    """
    # Tag every word with an initial count of one ...
    tagged = wordrdd.map(lambda word: (word, 1))
    # ... then add the ones up per distinct word.
    return tagged.reduceByKey(lambda left, right: left + right)

In [61]:
# sanity check: the helper gives the same counts as the step-by-step pipeline
countwords(wordrdd).collect()

[('cat', 2), ('elephant', 1), ('rat', 2)]

In [65]:
# deal with Capitalization and punctuation 
import re
import string

In [340]:
#function to remove punctuation
def removepunctuation(text):
    """Normalise a line of text for word counting.

    The text is lower-cased, every character in ``string.punctuation`` is
    deleted (not replaced by a space), every non-ASCII character is dropped,
    and finally leading and trailing whitespace is stripped.

    Args:
        text (str): the raw line.

    Returns:
        str: the cleaned line; may be empty.
    """
    # re.escape() is required here: string.punctuation contains characters that
    # are special inside a character class. Unescaped, its backslash merely
    # escaped the following ']' and was itself never removed from the text.
    regex = re.compile('[%s]' % re.escape(string.punctuation))
    txt = regex.sub('', text.lower())
    # be sure that we remove non ascii characters
    asciionly = ''.join(ch for ch in txt if ord(ch) < 128)
    # Strip last, so whitespace exposed by dropping a non-ASCII character
    # (e.g. the space after a leading curly quote) is trimmed as well.
    return asciionly.strip()

In [341]:
# test function: punctuation is deleted (not replaced by a space), case is lowered,
# surrounding whitespace is trimmed and the non-ASCII curly quotes are dropped
removepunctuation('Hi, you!'), removepunctuation(' No under_score!'), removepunctuation('“’tis')

('hi you', 'no underscore', 'tis')

In [220]:
# access the Shakespeare text file from http://www.gutenberg.org
import requests
from bs4 import BeautifulSoup

In [221]:
# Download the text file from Project Gutenberg.
# The timeout keeps the cell from hanging forever on a stalled connection, and
# raise_for_status() stops here on an HTTP error instead of parsing an error page.
page = requests.get('http://www.gutenberg.org/files/100/100-0.txt', timeout=60)
page.raise_for_status()
soup = BeautifulSoup(page.content, 'html.parser')

In [271]:
# first child node of the parsed document; the following cells slice and split it
# as a string (presumably it holds the whole plain-text file — verify)
text = soup.contents[0]

In [272]:
# preview the leading part of the file that should not be included in the analysis;
# after trial and error, 2970 characters seemed a good choice for that header
text[:2970].splitlines()

['',
 'Project Gutenberg’s The Complete Works of William Shakespeare, by William',
 'Shakespeare',
 '',
 'This eBook is for the use of anyone anywhere in the United States and',
 'most other parts of the world at no cost and with almost no restrictions',
 'whatsoever.  You may copy it, give it away or re-use it under the terms',
 'of the Project Gutenberg License included with this eBook or online at',
 'www.gutenberg.org.  If you are not located in the United States, you’ll',
 'have to check the laws of the country where you are located before using',
 'this ebook.',
 '',
 'See at the end of this file: * CONTENT NOTE (added in 2017) *',
 '',
 '',
 'Title: The Complete Works of William Shakespeare',
 '',
 'Author: William Shakespeare',
 '',
 'Release Date: January 1994 [EBook #100]',
 'Last Updated: February 19, 2018',
 '',
 'Language: English',
 '',
 'Character set encoding: UTF-8',
 '',
 '*** START OF THIS PROJECT GUTENBERG EBOOK THE COMPLETE WORKS OF WILLIAM SHAKESPEARE ***',
 '',
 

In [273]:
# number of leading characters to discard (the header previewed above)
header = 2970

In [274]:
# Slice off the header and split the remainder into lines. The slice is taken
# from the parsed document rather than from `text` itself so that re-running this
# cell gives the same result: `text = text[header:].splitlines()` turns `text`
# from a string into a list, and a second run would fail on list.splitlines().
text = soup.contents[0][header:].splitlines()

In [342]:
# one RDD element per line of the text, cleaned by removepunctuation, in 8 partitions
shakespeareRDD = sc.parallelize(text, numSlices=8).map(removepunctuation)


In [343]:
# first 10 cleaned lines
shakespeareRDD.take(10)

['',
 '',
 'the sonnets',
 '',
 '1',
 '',
 'from fairest creatures we desire increase',
 'that thereby beautys rose might never die',
 'but as the riper should by time decease',
 'his tender heir might bear his memory']

In [344]:
# print the first 15 cleaned lines, each prefixed with its line number
print('\n'.join(
    shakespeareRDD
    .zipWithIndex()                                           # (line, lineNum) pairs
    .map(lambda pair: '{0}: {1}'.format(pair[1], pair[0]))    # 'lineNum: line'
    .take(15)
))

0: 
1: 
2: the sonnets
3: 
4: 1
5: 
6: from fairest creatures we desire increase
7: that thereby beautys rose might never die
8: but as the riper should by time decease
9: his tender heir might bear his memory
10: but thou contracted to thine own bright eyes
11: feedst thy lights flame with selfsubstantial fuel
12: making a famine where abundance lies
13: thy self thy foe to thy sweet self too cruel
14: thou that art now the worlds fresh ornament


In [345]:
# split every cleaned line on whitespace and flatten the pieces into one RDD of words
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda line: line.split())
shakespeareWordCount = shakespeareWordsRDD.count()

# top(20) returns the 20 largest elements (the alphabetically last words),
# shown next to the total number of words
shakespeareWordsRDD.top(20), shakespeareWordCount

(['zwounds',
  'zwounds',
  'zwounds',
  'zwounds',
  'zwounds',
  'zwounds',
  'zwounds',
  'zwaggered',
  'zounds',
  'zounds',
  'zounds',
  'zounds',
  'zounds',
  'zounds',
  'zounds',
  'zounds',
  'zounds',
  'zounds',
  'zounds',
  'zounds'],
 958999)

In [360]:
# the 15 most frequent words, most frequent first (ordered by negated count)
top15wordscount = countwords(shakespeareWordsRDD).takeOrdered(15, key=lambda pair: -pair[1])
print('\n'.join('{0}: {1}'.format(word, count) for word, count in top15wordscount))

the: 29943
and: 28346
i: 21860
to: 20884
of: 18774
a: 15990
you: 14434
my: 13191
in: 12024
that: 11781
is: 9710
not: 9067
with: 8519
me: 8271
for: 8182
