In [1]:
# Find path to PySpark.
import findspark
#findspark.init() 


# even though SPARK_HOME is defined in .zshrc, had to put it here
findspark.init('/Users/otto/spark-2.4.4-bin-hadoop2.6/')

In [2]:
# Import PySpark and initialize SparkContext object.
import pyspark
sc = pyspark.SparkContext()

In [4]:
# Read `recent-grads.csv` in to an RDD.
f = sc.textFile('recent-grads.csv')
data = f.map(lambda line: line.split('\n'))
data.take(3)

[['Rank,Major_code,Major,Total,Men,Women,Major_category,ShareWomen,Sample_size,Employed,Full_time,Part_time,Full_time_year_round,Unemployed,Unemployment_rate,Median,P25th,P75th,College_jobs,Non_college_jobs,Low_wage_jobs'],
 ['1,2419,PETROLEUM ENGINEERING,2339,2057,282,Engineering,0.120564344,36,1976,1849,270,1207,37,0.018380527,110000,95000,125000,1534,364,193'],
 ['2,2416,MINING AND MINERAL ENGINEERING,756,679,77,Engineering,0.101851852,7,640,556,170,388,85,0.117241379,75000,55000,90000,350,257,50']]

In [5]:
raw_hamlet = sc.textFile("hamlet.txt")
raw_hamlet.take(5)

['hamlet@0\t\tHAMLET',
 'hamlet@8',
 'hamlet@9',
 'hamlet@10\t\tDRAMATIS PERSONAE',
 'hamlet@29']

In [6]:
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))
split_hamlet.take(5)

[['hamlet@0', '', 'HAMLET'],
 ['hamlet@8'],
 ['hamlet@9'],
 ['hamlet@10', '', 'DRAMATIS PERSONAE'],
 ['hamlet@29']]

we'll use the flatMap() method with the named function hamlet_speaks to check whether a line in the play contains the text HAMLET in all caps (indicating that Hamlet spoke). flatMap() is different than map() because it doesn't require an output for every element in the RDD. The flatMap() method is useful whenever we want to generate a sequence of values from an RDD.

In this case, we want an RDD object that contains tuples of the unique line IDs and the text "hamlet speaketh!," but only for the elements in the RDD that have "HAMLET" in one of the values. We can't use the map() method for this because it requires a return value for every element in the RDD.

We want each element in the resulting RDD to have the following format:

The first value should be the unique line ID (e.g.'hamlet@0') , which is the first value in each of the elements in the split_hamlet RDD.
The second value should be the string "hamlet speaketh!"

In [7]:
def hamlet_speaks(line):
    id = line[0]
    speaketh = False
    
    if "HAMLET" in line:
        speaketh = True
    
    if speaketh:
        yield id,"hamlet speaketh!"

hamlet_spoken = split_hamlet.flatMap(lambda x: hamlet_speaks(x))
# hamlet_spoken.take(10)

hamlet_spoken now contains the line numbers for the lines where Hamlet spoke. While this is handy, we don't have the full line anymore. Instead, let's use a filter() with a named function to extract the original lines where Hamlet spoke. The functions we pass into filter() must return values, which will be either True or False.

In [8]:
def filter_hamlet_speaks(line):
    if "HAMLET" in line:
        return True
    else:
        return False

hamlet_spoken_lines = split_hamlet.filter(lambda line: filter_hamlet_speaks(line))
hamlet_spoken_lines.take(5)

[['hamlet@0', '', 'HAMLET'],
 ['hamlet@75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['hamlet@1004', '', 'HAMLET'],
 ['hamlet@9144', '', 'HAMLET'],
 ['hamlet@12313',
  'HAMLET',
  '[Aside]  A little more than kin, and less than kind.']]

In [9]:
spoken_count = 0
spoken_101 = list()

spoken_count = hamlet_spoken_lines.count()
spoken_collect = hamlet_spoken_lines.collect()
spoken_101 = spoken_collect[100]

print(spoken_count)

381


In [10]:
raw_hamlet = sc.textFile("hamlet.txt")
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))
split_hamlet.take(5)

# remove @ from id
def format_id(x):
    id = x[0].split('@')[1]
    results = list()
    results.append(id)
    if len(x) > 1:
        for y in x[1:]:
            results.append(y)
    return results

hamlet_with_ids = split_hamlet.map(lambda line: format_id(line))
hamlet_with_ids.take(10)

[['0', '', 'HAMLET'],
 ['8'],
 ['9'],
 ['10', '', 'DRAMATIS PERSONAE'],
 ['29'],
 ['30'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['74'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['131']]

In [11]:
#hamlet_with_ids.take(5)

# Remove blank values
real_text = hamlet_with_ids.filter(lambda line: len(line) > 1)
hamlet_text_only = real_text.map(lambda line: [l for l in line if l != ''])
hamlet_text_only.take(10)

[['0', 'HAMLET'],
 ['10', 'DRAMATIS PERSONAE'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['132', 'POLONIUS', 'lord chamberlain. (LORD POLONIUS:)'],
 ['177', 'HORATIO', 'friend to Hamlet.'],
 ['204', 'LAERTES', 'son to Polonius.'],
 ['230', 'LUCIANUS', 'nephew to the king.'],
 ['261', 'VOLTIMAND', '|'],
 ['273', '|']]

In [12]:
# Remove pipe chars
def fix_pipe(line):
    results = list()
    for l in line:
        if l == "|":
            pass
        elif "|" in l:
            fmtd = l.replace("|", "")
            results.append(fmtd)
        else:
            results.append(l)
    return results

clean_hamlet = hamlet_text_only.map(lambda line: fix_pipe(line))