## Clean Hamlet script using Spark pipelines

This notebook walks through a basic cleaning procedure as a way to get accustomed to using Spark pipelines for cleaning data sets.
The data set is a text file containing script lines from Hamlet. The idea is to load the data into an RDD
and clean it with custom transformation functions.

In [1]:
import findspark
findspark.init()

# Import PySpark and initialize SparkContext object.
import pyspark
sc = pyspark.SparkContext()

Load the file into an RDD and examine the format:

In [2]:
raw_hamlet = sc.textFile("hamlet.txt")
raw_hamlet.take(10)

['hamlet@0\t\tHAMLET',
 'hamlet@8',
 'hamlet@9',
 'hamlet@10\t\tDRAMATIS PERSONAE',
 'hamlet@29',
 'hamlet@30',
 'hamlet@31\tCLAUDIUS\tking of Denmark. (KING CLAUDIUS:)',
 'hamlet@74',
 'hamlet@75\tHAMLET\tson to the late, and nephew to the present king.',
 'hamlet@131']

The output shows that the lines are tab-separated, so we need to split each line on tabs. Let's first separate each element to get a better understanding of the data structure.

In [3]:
# split data based on tab separators
split_hamlet = raw_hamlet.map(lambda line: line.split('\t'))
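
To see what the split produces without starting Spark, the same `str.split('\t')` call can be tried on a few of the raw lines shown above. This is only a plain-Python sketch; `sample_lines` and `split_sample` are hypothetical names, and the lines are copied from the `take(10)` output.

```python
# A few raw lines copied from the take(10) output above
# (sample_lines is a hypothetical name, not part of the notebook).
sample_lines = [
    'hamlet@0\t\tHAMLET',
    'hamlet@8',
    'hamlet@31\tCLAUDIUS\tking of Denmark. (KING CLAUDIUS:)',
]

# Same split that the Spark map applies to every line of the RDD.
split_sample = [line.split('\t') for line in sample_lines]

for fields in split_sample:
    print(fields)
```

Two consecutive tabs yield an empty string field, and a line without any tab yields a single-element list. Both cases matter for the filtering steps that follow.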

The first value in each element identifies the line from the play and has the following format: <br>
`'hamlet@0'`,<br>
`'hamlet@8'`,<br>
`'hamlet@9'`,<br>
`...` <br>

We don't need the `hamlet@` portion of the identifier, so we will remove it.

In [4]:
# define function to format line ids
def format_ids(line):
    x_id = line[0].split('@')[1]
    new_line = list()
    new_line.append(x_id)
    # append the remaining fields of the line unchanged
    if len(line) > 1:
        for l in line[1:]:
            new_line.append(l)
            
    return new_line

hamlet_with_ids = split_hamlet.map(lambda line: format_ids(line))
hamlet_with_ids.take(10)

[['0', '', 'HAMLET'],
 ['8'],
 ['9'],
 ['10', '', 'DRAMATIS PERSONAE'],
 ['29'],
 ['30'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['74'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['131']]
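
The same id formatting can be written more compactly with list concatenation. The sketch below is one possible alternative, not the notebook's own function; `format_ids_compact` is a hypothetical name, and it assumes every id has the form `hamlet@<number>`, as in the output above.

```python
def format_ids_compact(line):
    """Return a new list with the 'hamlet@' prefix stripped from the id field."""
    # Assumes the first field always contains an '@', as in 'hamlet@31'.
    line_id = line[0].split('@')[1]
    # Slicing past the end of a one-element list returns [], so id-only
    # lines need no special case.
    return [line_id] + line[1:]


print(format_ids_compact(['hamlet@31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)']))
print(format_ids_compact(['hamlet@8']))
```

Because the function takes a single argument, it could be passed to `map` directly instead of being wrapped in a lambda.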

Now let's remove the empty lines (entries that contain only an id):

In [5]:
real_text = hamlet_with_ids.filter(lambda line: len(line) > 1)
real_text.take(10)

[['0', '', 'HAMLET'],
 ['10', '', 'DRAMATIS PERSONAE'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['132', 'POLONIUS', 'lord chamberlain. (LORD POLONIUS:)'],
 ['177', 'HORATIO', 'friend to Hamlet.'],
 ['204', 'LAERTES', 'son to Polonius.'],
 ['230', 'LUCIANUS', 'nephew to the king.'],
 ['261', 'VOLTIMAND', '|'],
 ['273', '', '|']]

The first 10 entries show that some elements contain blank values `''` and pipe symbols `'|'`, which clutter the data set. Let's remove those:

In [6]:
# remove blank values from each line
real_text_only = real_text.map(lambda line: [l for l in line if l != ''])
real_text_only.take(10)

# remove pipe symbols with custom map function
def remove_pipes(line):
    new_line = list()
    for l in line:
        if l == '|':
            pass
        elif '|' in l:
            new_line.append(l.replace('|',''))
        else:
            new_line.append(l)
            
    return new_line

real_text_no_pipes = real_text_only.map(lambda line: remove_pipes(line))
real_text_no_pipes.take(10)

[['0', 'HAMLET'],
 ['10', 'DRAMATIS PERSONAE'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['132', 'POLONIUS', 'lord chamberlain. (LORD POLONIUS:)'],
 ['177', 'HORATIO', 'friend to Hamlet.'],
 ['204', 'LAERTES', 'son to Polonius.'],
 ['230', 'LUCIANUS', 'nephew to the king.'],
 ['261', 'VOLTIMAND'],
 ['273']]
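
The blank-value removal and the pipe removal can also be folded into one helper. The following is a sketch of that idea rather than a drop-in replacement: `remove_pipes_compact` is a hypothetical name, and unlike `remove_pipes` it also drops fields that consist of several pipes (such as `'||'`), an assumption about what is wanted.

```python
def remove_pipes_compact(line):
    """Strip '|' from every field and drop fields that end up empty."""
    # Remove all pipe characters from each field.
    stripped = (field.replace('|', '') for field in line)
    # Keep only fields that still contain text.
    return [field for field in stripped if field != '']


print(remove_pipes_compact(['261', 'VOLTIMAND', '|']))
print(remove_pipes_compact(['273', '', '|']))
```

For the rows shown above, this gives the same fields as the blank-value step followed by `remove_pipes`.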

Finally, let's remove entries that contain only a line id (no text in the line):

In [7]:
clean_hamlet_text = real_text_no_pipes.filter(lambda line: len(line) > 1)
clean_hamlet_text.take(10)

[['0', 'HAMLET'],
 ['10', 'DRAMATIS PERSONAE'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['132', 'POLONIUS', 'lord chamberlain. (LORD POLONIUS:)'],
 ['177', 'HORATIO', 'friend to Hamlet.'],
 ['204', 'LAERTES', 'son to Polonius.'],
 ['230', 'LUCIANUS', 'nephew to the king.'],
 ['261', 'VOLTIMAND'],
 ['276', 'CORNELIUS']]

Now we have a clean data set that could be used for further analysis. As a final exercise, let's implement all of these steps in a single pipeline:

In [8]:
clean_hamlet_pipeline = (
    raw_hamlet.map(lambda line: line.split('\t'))
    .map(format_ids)
    .filter(lambda line: len(line) > 1)
    .map(lambda line: [l for l in line if l != ''])
    .map(remove_pipes)
    .filter(lambda line: len(line) > 1)
)

clean_hamlet_pipeline.take(10)

[['0', 'HAMLET'],
 ['10', 'DRAMATIS PERSONAE'],
 ['31', 'CLAUDIUS', 'king of Denmark. (KING CLAUDIUS:)'],
 ['75', 'HAMLET', 'son to the late, and nephew to the present king.'],
 ['132', 'POLONIUS', 'lord chamberlain. (LORD POLONIUS:)'],
 ['177', 'HORATIO', 'friend to Hamlet.'],
 ['204', 'LAERTES', 'son to Polonius.'],
 ['230', 'LUCIANUS', 'nephew to the king.'],
 ['261', 'VOLTIMAND'],
 ['276', 'CORNELIUS']]

The single pipeline returns the same first 10 entries as the step-by-step version with its intermediate transformation-action stages.
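
To check the cleaning logic without a Spark context, all the steps can be collapsed into one per-line function and run on a handful of lines. This is a sketch under stated assumptions: `clean_line`, `sample` and `cleaned` are hypothetical names, the last two sample lines are reconstructed from the split output shown earlier, and fields made up only of pipes are dropped.

```python
def clean_line(raw_line):
    """Apply the cleaning steps to one raw line; return None if no text remains."""
    fields = raw_line.split('\t')
    # Keep only the part of the id after 'hamlet@' (assumes an '@' is present).
    fields[0] = fields[0].split('@')[1]
    # Remove pipe characters, then drop fields that are empty.
    fields = [field.replace('|', '') for field in fields]
    fields = [field for field in fields if field != '']
    # A line with only an id carries no text.
    return fields if len(fields) > 1 else None


# Hypothetical sample; the last two lines are reconstructed from the output above.
sample = [
    'hamlet@0\t\tHAMLET',
    'hamlet@8',
    'hamlet@261\tVOLTIMAND\t|',
    'hamlet@273\t\t|',
]
cleaned = [fields for fields in map(clean_line, sample) if fields is not None]
print(cleaned)
```

In Spark, a function like this could presumably be applied with `raw_hamlet.map(clean_line)` followed by a `filter` that drops the `None` values, although that variant was not run as part of this notebook.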