ASSIGNMENT 1: SPARK

# Kaggle: Spooky Author Identification
Developed to run in a Databricks environment

- https://www.kaggle.com/competitions/spooky-author-identification

Competition Task: Predict the author of excerpts from horror stories by Edgar Allan Poe, Mary Shelley, and HP Lovecraft.

Competition Files:
- train.csv (training dataset 3.14 MB)
- test.csv (test dataset 1.28 MB)

Data Fields:
- The training dataset has 3 fields:
  - id (String): unique identifier
  - text (String): text excerpt by one of the named authors
  - author (String): author of the given text excerpt (EAP, MWS, HPL)

## 1. Project Setup

1.1 Inspect Spooky Files

Preview train.csv and test.csv before loading and defining schemas

The train.csv file reveals 3 columns:
- id
- text
- author

In [None]:
%fs head /FileStore/tables/train.csv

The test.csv file has only 2 columns (there is no author):

- id
- text



In [0]:

%fs head /FileStore/tables/test.csv

1.2 Project Imports

In [0]:
from pyspark.sql.types import StructType, StructField, StringType # Schema definition
from pyspark.sql.functions import col, length, avg, stddev, min, max, explode, split, lower, trim  # note: min/max shadow Python's built-ins
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.functions import vector_to_array

1.3 Define Schemas

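We could use Spark's inferSchema option here, but it's best practice to define the schema manually: it guarantees the column types and avoids the extra pass over the data that schema inference requires.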

In [0]:
# Train.csv
trainSchema = StructType([
    StructField("id", StringType(), True),
    StructField("text", StringType(), True),
    StructField("author", StringType(), True)
])

# Test.csv (no author)
testSchema = StructType([
    StructField("id", StringType(), True),
    StructField("text", StringType(), True)
])

1.4 Create Dataframes

Load & Show Data

In [0]:

# File paths (kaggle files uploaded to Databricks)
trainPath = "/FileStore/tables/train.csv"
testPath = "/FileStore/tables/test.csv"
# File type being read
fileType = "csv"
# Load kaggle train.csv into Dataframe
k_trainDF = (
    spark.read.format(fileType)
        .option('header', 'true')
        .option('escape', '"') # a doubled quote ("") inside a quoted field is read as one literal quote
        .option('quote', '"')
        .option('multiLine', 'true')
        .schema(trainSchema)
        .load(trainPath)
)

# Load kaggle test.csv into Dataframe
k_testDF = (
    spark.read.format(fileType)
        .option('header', 'true')
        .option('escape', '"')
        .option('quote', '"')          # same CSV options as the train loader
        .option('multiLine', 'true')
        .schema(testSchema)
        .load(testPath)
)

# Quick check Dataframes before caching
k_trainDF.show(5, truncate=70) # show max 70 chars per field
k_testDF.show(5, truncate=70)

k_trainDF.printSchema()
k_testDF.printSchema()

+-------+----------------------------------------------------------------------+------+
|     id|                                                                  text|author|
+-------+----------------------------------------------------------------------+------+
|id26305|This process, however, afforded me no means of ascertaining the dim...|   EAP|
|id17569|It never once occurred to me that the fumbling might be a mere mist...|   HPL|
|id11008|In his left hand was a gold snuff box, from which, as he capered do...|   EAP|
|id27763|How lovely is spring As we looked from Windsor Terrace on the sixte...|   MWS|
|id12958|Finding nothing else, not even gold, the Superintendent abandoned h...|   HPL|
+-------+----------------------------------------------------------------------+------+
only showing top 5 rows

+-------+----------------------------------------------------------------------+
|     id|                                                                  text|
+-------+------------
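By default, Spark's CSV reader treats a backslash as the escape character. Setting escape='"' for train.csv makes a doubled quote inside a quoted field mean one literal quote, which is the RFC 4180 convention. multiLine lets a quoted field span line breaks. I am assuming the Kaggle files rely on both. The snippet below is a plain-Python illustration of that convention using the standard csv module, not Spark itself. The two rows are made up:

```python
import csv
import io

# Two made-up rows in the same layout as train.csv: the first excerpt contains
# doubled quotes (""), the second spans two physical lines inside its quotes.
raw = (
    'id,text,author\n'
    'id00001,"He cried ""Nevermore"" and fled.",EAP\n'
    'id00002,"A line that\ncontinues here.",MWS\n'
)

# The csv module follows RFC 4180 by default (quotechar='"', doublequote=True)
# and keeps newlines that appear inside quoted fields.
rows = list(csv.DictReader(io.StringIO(raw)))

for row in rows:
    print(row["id"], repr(row["text"]), row["author"])
```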

1.5 Verify Data Integrity

Basic integrity check before deeper data analysis and normalisation

In [0]:

# check for null or empty values
trainNullCheck = k_trainDF.filter(
    (col("text").isNull()) | (col("text") == "") | (col("author").isNull())
)
testNullCheck = k_testDF.filter(
    (col("text").isNull()) | (col("text") == "")
)

print("Null check train dataframe:", trainNullCheck.count())
print("Null check test dataframe:", testNullCheck.count())

# Check for duplicate (text, author) pairs in the training dataset
dupeCheckDF = (
    k_trainDF
        .groupBy("text", "author")
        .count()
        .where(col("count") > 1)
)

print("Check for duplicate values: ")
dupeCheckDF.show()

# Row count
print("Train dataframe row count: ", k_trainDF.count())
print("Test dataframe row count: ", k_testDF.count())

# No null, empty or duplicate values were found

# Check how many partitions each dataframe was read into
print("Number of partitions in Kaggle trainDF:", k_trainDF.rdd.getNumPartitions())
print("Number of partitions in Kaggle testDF:", k_testDF.rdd.getNumPartitions())

""" 
Spark sizes input partitions from the file size (spark.sql.files.maxPartitionBytes is 128 MB by default),
and a CSV read with multiLine enabled cannot be split at all. As a result, the 3.14 MB train dataset
(extremely small for Spark!) and the even smaller test dataset were each read into a single partition.
I have forcibly repartitioned the train dataframe into 8 partitions.
However, for such a small dataset this won't provide any performance benefit (repartition() also adds a shuffle) and is just for demonstration purposes.
"""
k_trainDF = k_trainDF.repartition(8)
print("Number of partitions in Kaggle trainDF:", k_trainDF.rdd.getNumPartitions())

Null check train dataframe: 0
Null check test dataframe: 0
Check for duplicate values: 
+----+------+-----+
|text|author|count|
+----+------+-----+
+----+------+-----+

Train dataframe row count:  19579
Test dataframe row count:  8392
Number of partitions in Kaggle trainDF: 1
Number of partitions in Kaggle testDF: 1
Number of partitions in Kaggle trainDF: 8
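Why one partition? Here is a simplified plain-Python model of the split-size rule in Spark 3.x. It follows my reading of FilePartition.maxSplitBytes and uses the default maxPartitionBytes (128 MB) and openCostInBytes (4 MB). The default parallelism of 8 is hypothetical. Under these assumptions small inputs get a split size of 4 MB, so a 3.14 MB file becomes a single split. The model ignores how Spark packs several small splits into one partition:

```python
import math

MB = 1024 * 1024

# Simplified model of Spark 3.x's split-size rule (FilePartition.maxSplitBytes).
# Assumptions: default configs and a hypothetical default parallelism of 8;
# the real planner also packs several small splits into one partition.
def max_split_bytes(file_sizes, parallelism=8,
                    max_partition_bytes=128 * MB,  # spark.sql.files.maxPartitionBytes
                    open_cost_bytes=4 * MB):       # spark.sql.files.openCostInBytes
    total = sum(size + open_cost_bytes for size in file_sizes)
    bytes_per_core = total / parallelism
    return min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))

def num_splits(file_size, split_bytes, splittable=True):
    # A non-splittable file (e.g. CSV read with multiLine=true) is always one split.
    if not splittable:
        return 1
    return max(1, math.ceil(file_size / split_bytes))

train_size = 3.14 * MB
split = max_split_bytes([train_size])
print(split / MB)                     # 4.0: the open cost dominates for tiny inputs
print(num_splits(train_size, split))  # 1
```

Under the same assumptions, a 300 MB splittable file would be cut into 8 splits, roughly one per core.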


1.6 Caching

Cache the training dataframe, as it is reused by many of the actions below. cache() is lazy, so the data is only stored once the first action runs.

In [0]:

k_trainDF.cache()
k_trainDF.count()  # cache() is lazy; this action materialises the cached data
print("k_trainDF dataframe has been cached!")

k_trainDF dataframe has been cached!


## 2. Data Exploration

2.1 Author Exploration

In [0]:

print("====== Distinct Authors ======")
k_trainDF.select("author").distinct().show()

print("====== Distribution of Entries by Author ======")
authorDistributionDF = (
    k_trainDF
        .groupBy("author")
        .count()
        .orderBy(col("count").desc())
)
authorDistributionDF.show()

+------+
|author|
+------+
|   MWS|
|   HPL|
|   EAP|
+------+

+------+-----+
|author|count|
+------+-----+
|   EAP| 7900|
|   MWS| 6044|
|   HPL| 5635|
+------+-----+



2.2 Text Exploration

In [0]:
# Derive a new dataframe with a textLen column (characters per excerpt)
textExplorationDF = (
    k_trainDF
        .withColumn("textLen", length(col("text")))
        .select("id", "author", "text", "textLen")
)

print("====== Longest Excerpts -> descending ======")
textExplorationDF.orderBy(col("textLen").desc()).show(100)

print("====== Shortest Excerpts -> ascending ======")
textExplorationDF.orderBy(col("textLen").asc()).show(10)

print("====== TextLen Column Stats ======")
textExplorationDF.select("textLen").describe().show()

"""
Findings:
- 19,579 text excerpts
- Longest text excerpt is 4,663 characters
- Shortest text excerpt is 21 characters
- Average text excerpt length is approx 149 characters
- Standard deviation in length is approx 107 characters
- The top 5 longest excerpts all belong to Mary Shelley (MWS)
- The very long text excerpts appear to be outliers: only 8 excerpts
  exceed 1,000 characters
"""

+-------+------+--------------------+-------+
|     id|author|                text|textLen|
+-------+------+--------------------+-------+
|id27184|   MWS|Diotima approache...|   4663|
|id17485|   MWS|Oh no I will beco...|   3048|
|id13677|   MWS|They are gone for...|   2275|
|id20439|   MWS|To chambers of pa...|   2200|
|id20549|   MWS|As for those who ...|   1849|
|id12818|   EAP|Burning with the ...|   1533|
|id20687|   EAP|I have much to sa...|   1523|
|id24207|   MWS|At first indeed t...|   1032|
|id19267|   MWS|This also was my ...|    995|
|id03319|   MWS|I invited him to ...|    993|
|id02143|   MWS|Oh God help me Le...|    983|
|id23272|   EAP|awful calamity wa...|    964|
|id06581|   MWS|They were certain...|    948|
|id05745|   MWS|It was in Rome th...|    924|
|id25287|   EAP|It should be reme...|    910|
|id04831|   EAP|To muse for long ...|    908|
|id14957|   HPL|A weak, filtered ...|    900|
|id24339|   MWS|He seemed incapab...|    857|
|id14117|   EAP|End of Text Notes.

2.3 Text Length Distribution by Author

In [0]:
authorStatsDF = (
    textExplorationDF
        .groupBy("author")
        .agg(
            avg("textLen").alias("avgTxtLen"),
            stddev("textLen").alias("stdTxtLen"),
            min("textLen").alias("minTxtLen"),
            max("textLen").alias("maxTxtLen"))
        .orderBy(col("avgTxtLen").desc())
)

print("====== Distribution of Text Length by Author ======")
authorStatsDF.show()

"""
Findings:
- Average text length is approximately the same across authors, ranging from 142 to 156
  characters
- Standard deviation is considerably higher for MWS (126) than for EAP (106) and HPL (82)
- Minimum text length is the same for all authors (21 characters)
"""

+------+------------------+------------------+---------+---------+
|author|         avgTxtLen|         stdTxtLen|minTxtLen|maxTxtLen|
+------+------------------+------------------+---------+---------+
|   HPL|155.84347826086957| 82.02064723110335|       21|      900|
|   MWS|151.65982792852415|126.30500751783727|       21|     4663|
|   EAP|142.22594936708862|105.75133420774067|       21|     1533|
+------+------------------+------------------+---------+---------+


2.4 Word Frequency

In [0]:
wordsDF =(
    textExplorationDF
    # explode text excerpts into single words
    .select("author", explode(split(col("text"), " ")).alias("rawWord"))
    # Convert to lowercase, trim whitespace
    .select("author", lower(trim(col("rawWord"))).alias("word"))
    # Ignore empty strings
    .filter(col("word")!="")
)

topWordsDF = (
    wordsDF
        .groupBy("word")
        .count()
        .orderBy(col("count").desc())
)
print("====== Top 20 Most Common Words ======")
topWordsDF.show(20)

topWordsByAuthorDF =(
    wordsDF
        .groupBy("author", "word")
        .count()
        .orderBy(col("count").desc())
)
print("====== Top 40 Most Common Words by Author ======")
topWordsByAuthorDF.show(40)

+-----+-----+
| word|count|
+-----+-----+
|  the|35425|
|   of|20931|
|  and|17507|
|   to|12778|
|    a|10672|
|    i|10383|
|   in| 9383|
|  was| 6471|
| that| 6142|
|   my| 5367|
|  had| 4369|
| with| 4334|
|   he| 4286|
|   it| 4129|
|  his| 4073|
|   as| 3802|
|  for| 3437|
|   at| 3207|
|which| 3206|
|  but| 3144|
+-----+-----+
only showing top 20 rows

+------+----+-----+
|author|word|count|
+------+----+-----+
|   EAP| the|14887|
|   HPL| the|10923|
|   MWS| the| 9615|
|   EAP|  of| 8961|
|   MWS|  of| 6134|
|   HPL| and| 6090|
|   MWS| and| 6002|
|   HPL|  of| 5836|
|   EAP| and| 5415|
|   MWS|  to| 4812|
|   EAP|  to| 4722|
|   EAP|   a| 4674|
|   MWS|   i| 4155|
|   EAP|  in| 4086|
|   EAP|   i| 3599|
|   HPL|   a| 3293|
|   HPL|  to| 3244|
|   HPL|  in| 2713|
|   MWS|   a| 2705|
|   MWS|  my| 2632|
|   HPL|   i| 2629|
|   MWS|  in| 2584|
|   MWS| was| 2211|
|   EAP|that| 2139|
|   EAP| was| 2132|
|   HPL| was| 2128|
|   MWS|that| 2020|
|   HPL|that| 1983|
|   EAP|  it| 1963
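The word-frequency cell can be mirrored in plain Python to expose one side effect. Splitting on a single space keeps punctuation attached to words, so 'dark,' and 'dark' are counted as different words. The two excerpts are made up:

```python
from collections import Counter

# Mirrors the DataFrame steps: split on a single space, lowercase + trim,
# drop empty strings, then count occurrences.
excerpts = [
    "The night was dark, and the wind howled.",
    "It was night  when the Raven came.",   # double space -> empty string after split
]

words = [
    w.strip().lower()
    for text in excerpts
    for w in text.split(" ")
    if w.strip() != ""
]
counts = Counter(words)
print(counts.most_common(3))
```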

Top Words
- Top 100 words overall

In [0]:
display(topWordsDF.limit(100))

word,count
the,35425
of,20931
and,17507
to,12778
a,10672
i,10383
in,9383
was,6471
that,6142
my,5367



## 3. Data Preparation & Feature Engineering
- Demonstration of a basic pipeline
- Sequential creation and testing of each feature
- Logistic Regression is fitted manually as the final modelling step, for testing purposes

3.1 Split Training Dataset

Train/Validate Split 70/30


In [0]:
trainDF, valDF = k_trainDF.randomSplit([0.7, 0.3], seed=42)
trainDF.cache()
valDF.cache()

totalCount = k_trainDF.count()
trainCount = trainDF.count()
valCount = valDF.count()

print("====== k_trainDF Split ======")
print("The original kaggle dataframe:", totalCount, "rows")
print("trainDF:", trainCount, "rows -", round(100 * trainCount / totalCount, 2), "%")
print("valDF:", valCount, "rows -", round(100 * valCount / totalCount, 2), "%")

# Work on demoDF for the step-by-step demo; trainDF stays intact for the final pipeline
# (DataFrames are immutable, so transformations never modify trainDF anyway)
demoDF = trainDF.select("*")

The original kaggle dataframe: 19579 rows
trainDF: 13830 rows - 70.64 %
valDF: 5749 rows - 29.36 %
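randomSplit does not cut the data at exactly 70/30. Each row is assigned independently, which is why the split comes out at about 70.6/29.4. Below is a plain-Python sketch of the idea, at the level of a model only. Spark samples within each partition using its own RNG, so this does not reproduce Spark's actual rows:

```python
import random

# Each row draws a uniform number and lands in the split whose cumulative
# weight range contains it, so split sizes are only approximately 70/30.
def random_split(rows, weights, seed):
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # the last split also catches any float rounding at the top end
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(19579))   # same row count as train.csv
train, val = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(val), round(len(train) / len(rows), 4))
```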


3.2 Tokenization
- Plain tokenizer: lowercases the text, splits it on whitespace and returns an array of tokens
- Punctuation is preserved (e.g. 'fear,' stays a single token)

In [0]:
# Plain tokenizer - punctuation is preserved
tokenizer = Tokenizer(inputCol="text", outputCol="plainTokens")
tokenizedDF = tokenizer.transform(demoDF)

print("====== Plain Tokenizer Output ======")
tokenizedDF.select("author", "text", "plainTokens").show(10, truncate=50)

+------+--------------------------------------------------+--------------------------------------------------+
|author|                                              text|                                       plainTokens|
+------+--------------------------------------------------+--------------------------------------------------+
|   EAP|His warehouse was the resort of all the princip...|[his, warehouse, was, the, resort, of, all, the...|
|   HPL|The moribund hermit's rage and fear, swelling t...|[the, moribund, hermit's, rage, and, fear,, swe...|
|   MWS|Raymond was weak and exhausted, yet the interes...|[raymond, was, weak, and, exhausted,, yet, the,...|
|   HPL|As the weeks passed, I observed with regret tha...|[as, the, weeks, passed,, i, observed, with, re...|
|   MWS|Death, cruel and relentless, had entered these ...|[death,, cruel, and, relentless,, had, entered,...|
|   HPL|An' the smell was awful, like what it is araoun...|[an', the, smell, was, awful,, like, what, it, ...|
|

3.3 Regex Tokenization
- I ended up using this instead of the plain tokenizer
- Custom regex pattern allows you to define how you want to split the data
- More precision and control

In [0]:

# Use custom regex pattern - punctuation is removed
regexTokenizer =(
    RegexTokenizer(
        inputCol="text",
        outputCol="tokens",
        pattern=r"[^\w']+"  # raw string; splits on anything that is not a word char or apostrophe
    )
)

regexTokenizedDF = regexTokenizer.transform(demoDF)
print("====== Regex Tokenizer Output ======")
regexTokenizedDF.select("author", "text", "tokens").show(10, truncate=50)
display(regexTokenizedDF)

+------+--------------------------------------------------+--------------------------------------------------+
|author|                                              text|                                            tokens|
+------+--------------------------------------------------+--------------------------------------------------+
|   EAP|His warehouse was the resort of all the princip...|[his, warehouse, was, the, resort, of, all, the...|
|   HPL|The moribund hermit's rage and fear, swelling t...|[the, moribund, hermit's, rage, and, fear, swel...|
|   MWS|Raymond was weak and exhausted, yet the interes...|[raymond, was, weak, and, exhausted, yet, the, ...|
|   HPL|As the weeks passed, I observed with regret tha...|[as, the, weeks, passed, i, observed, with, reg...|
|   MWS|Death, cruel and relentless, had entered these ...|[death, cruel, and, relentless, had, entered, t...|
|   HPL|An' the smell was awful, like what it is araoun...|[an', the, smell, was, awful, like, what, it, i...|
|

id,text,author,tokens
id00034,"His warehouse was the resort of all the principal people of the place, and especially of the editorial corps a body which inspires all about it with profound veneration and awe.",EAP,"List(his, warehouse, was, the, resort, of, all, the, principal, people, of, the, place, and, especially, of, the, editorial, corps, a, body, which, inspires, all, about, it, with, profound, veneration, and, awe)"
id00049,"The moribund hermit's rage and fear, swelling to grotesque proportions, seemed likely to shatter what remained of his failing physique; and once a spasm caused him to clap his hands to his eyes and rush into the bathroom.",HPL,"List(the, moribund, hermit's, rage, and, fear, swelling, to, grotesque, proportions, seemed, likely, to, shatter, what, remained, of, his, failing, physique, and, once, a, spasm, caused, him, to, clap, his, hands, to, his, eyes, and, rush, into, the, bathroom)"
id00079,"Raymond was weak and exhausted, yet the interest he perceived to be excited on his account, filled him with proud pleasure.",MWS,"List(raymond, was, weak, and, exhausted, yet, the, interest, he, perceived, to, be, excited, on, his, account, filled, him, with, proud, pleasure)"
id00107,"As the weeks passed, I observed with regret that my new friend was indeed slowly but unmistakably losing ground physically, as Mrs. Herrero had suggested.",HPL,"List(as, the, weeks, passed, i, observed, with, regret, that, my, new, friend, was, indeed, slowly, but, unmistakably, losing, ground, physically, as, mrs, herrero, had, suggested)"
id00108,"Death, cruel and relentless, had entered these beloved walls.",MWS,"List(death, cruel, and, relentless, had, entered, these, beloved, walls)"
id00128,"An' the smell was awful, like what it is araound Wizard Whateley's ol' haouse. . .",HPL,"List(an', the, smell, was, awful, like, what, it, is, araound, wizard, whateley's, ol', haouse)"
id00159,Where if anywhere had he been on those nights of daemoniac alienage?,HPL,"List(where, if, anywhere, had, he, been, on, those, nights, of, daemoniac, alienage)"
id00161,These responses from aesthetes told a disturbing tale.,HPL,"List(these, responses, from, aesthetes, told, a, disturbing, tale)"
id00198,I do not believe I need to wait for the full change as most have waited.,HPL,"List(i, do, not, believe, i, need, to, wait, for, the, full, change, as, most, have, waited)"
id00239,"The author who was much thought of in his day was one Miller, or Mill; and we find it recorded of him, as a point of some importance, that he had a mill horse called Bentham.",EAP,"List(the, author, who, was, much, thought, of, in, his, day, was, one, miller, or, mill, and, we, find, it, recorded, of, him, as, a, point, of, some, importance, that, he, had, a, mill, horse, called, bentham)"
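The regex tokenizer's behaviour can be approximated in plain Python. This sketch assumes Spark's RegexTokenizer defaults: gaps=True, so the pattern describes separators; toLowercase=True; and minTokenLength=1, so empty strings are dropped. It applies the same pattern to the id00128 excerpt shown above:

```python
import re

# Plain-Python approximation of RegexTokenizer(pattern=r"[^\w']+") with Spark's
# defaults: gaps=True (the pattern matches separators), toLowercase=True, and
# minTokenLength=1 (empty strings are dropped).
# Caveat: Python's \w is Unicode-aware, whereas Java's \w is ASCII-only by default.
SEPARATOR = re.compile(r"[^\w']+")

def regex_tokenize(text, min_token_length=1):
    return [t for t in SEPARATOR.split(text.lower()) if len(t) >= min_token_length]

tokens = regex_tokenize(
    "An' the smell was awful, like what it is araound Wizard Whateley's ol' haouse. . ."
)
print(tokens)
```

The output matches the tokens column for id00128: apostrophes survive ("an'", "whateley's"), and the trailing ". . ." produces only an empty string, which is filtered out.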


3.4 Stopword Remover
- The default list of English stopwords is removed (e.g. 'the', 'and', 'a', 'as')
- A custom stopwords list can be created by appending words to the default list

In [0]:

customStopWords = StopWordsRemover.loadDefaultStopWords("english")  # a Python list; to customise, append another list: + ["word to remove", ...]
print("====== Default & Custom Stop Words ======")
print(customStopWords)

remover = StopWordsRemover(inputCol="tokens", outputCol="filteredTokens", stopWords=customStopWords)
stopWordsRemovedDF = remover.transform(regexTokenizedDF)

print("====== Tokens Before/After Stopword Removal ======")
stopWordsRemovedDF.select("tokens", "filteredTokens").show(5, truncate=50)

['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', 'her', 'hers', 'herself', 'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', 'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', 'should', 'no
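At its core, StopWordsRemover is a case-insensitive filter (caseSensitive=False by default). The plain-Python sketch below uses a small hand-picked subset of the English list rather than the full default, plus one hypothetical custom word appended as a list:

```python
# Sketch of StopWordsRemover's default matching (caseSensitive=False): drop every
# token whose lowercase form is in the stop-word list. STOP_WORDS is a small
# hand-picked subset of Spark's English list, not the full default.
STOP_WORDS = ["and", "had", "these", "the", "a", "of", "was"]
CUSTOM_STOP_WORDS = STOP_WORDS + ["upon"]  # hypothetical custom addition

def remove_stop_words(tokens, stop_words):
    lowered = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in lowered]

tokens = ["death", "cruel", "and", "relentless", "had", "entered", "these", "beloved", "walls"]
print(remove_stop_words(tokens, STOP_WORDS))
print(remove_stop_words(["Upon", "the", "raven"], CUSTOM_STOP_WORDS))
```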

3.5 Quick Exploration of Filtered Data
- Re-examine the token counts, this time with stopwords removed

In [0]:
# Top tokens
tokensDF = (
    stopWordsRemovedDF
        .select(explode(col("filteredTokens")).alias("token"))
        .groupBy("token")
        .count()
        .orderBy(col("count").desc())
)
print("====== Top 20 Filtered Words (tokens) ======")
tokensDF.show(20)

# Top tokens by authors
tokensByAuthorDF = (
    stopWordsRemovedDF
        .select("author", explode(col("filteredTokens")).alias("token"))
        .groupBy("author","token")
        .count()
        .orderBy(col("count").desc())
)
print("====== Top 20 Filtered Words (tokens) by Author ======")
tokensByAuthorDF.show(20)

# Top tokens HPL
tokensByHPL =(
    tokensByAuthorDF
        .filter(col("author") == "HPL")
        .orderBy(col("count").desc())
)
# tokensByHPL.show(50)

# Top tokens EAP
tokensByEAP =(
    tokensByAuthorDF
        .filter(col("author") == "EAP")
        .orderBy(col("count").desc())
)
# tokensByEAP.show(50)

# Top tokens MWS
tokensByMWS =(
    tokensByAuthorDF
        .filter(col("author") == "MWS")
        .orderBy(col("count").desc())
)
# tokensByMWS.show(50)

+------+-----+
| token|count|
+------+-----+
|   one| 1078|
|  upon|  984|
|   yet|  512|
|  time|  507|
|   man|  491|
|  even|  485|
|  said|  477|
|  like|  440|
| might|  432|
|    us|  428|
|   old|  425|
|  must|  422|
| first|  421|
| night|  404|
| never|  404|
|  made|  390|
|seemed|  388|
|  life|  386|
|  eyes|  381|
|little|  379|
+------+-----+
only showing top 20 rows

+------+------+-----+
|author| token|count|
+------+------+-----+
|   EAP|  upon|  701|
|   EAP|   one|  427|
|   MWS|   one|  330|
|   HPL|   one|  321|
|   HPL|   old|  279|
|   EAP|  said|  233|
|   MWS|  life|  229|
|   MWS|   yet|  225|
|   EAP|little|  214|
|   MWS|    us|  198|
|   MWS|  love|  196|
|   HPL|seemed|  194|
|   EAP|   say|  193|
|   MWS| heart|  188|
|   HPL|  like|  185|
|   MWS|  eyes|  184|
|   EAP|  even|  183|
|   EAP| first|  182|
|   MWS| might|  181|
|   EAP|  time|  181|
+------+------+-----+
only showing top 20 rows



HP Lovecraft (HPL) - Top 100 words

In [0]:

display(tokensByHPL.limit(100))

author,token,count
HPL,one,321
HPL,old,279
HPL,seemed,194
HPL,like,185
HPL,night,178
HPL,man,169
HPL,things,169
HPL,time,165
HPL,saw,165
HPL,though,160



Edgar Allan Poe (EAP) - Top 100 words

In [0]:

display(tokensByEAP.limit(100))

author,token,count
EAP,upon,701
EAP,one,427
EAP,said,233
EAP,little,214
EAP,say,193
EAP,even,183
EAP,first,182
EAP,time,181
EAP,well,180
EAP,however,176



Mary Shelley (MWS) - Top 100 words

In [0]:

display(tokensByMWS.limit(100))

author,token,count
MWS,one,330
MWS,life,229
MWS,yet,225
MWS,us,198
MWS,love,196
MWS,heart,188
MWS,eyes,184
MWS,might,181
MWS,raymond,179
MWS,even,171



3.6 TF-IDF (Term Frequency - Inverse Document Frequency)
- A two-step process involving CountVectorizer and IDF

Count vectorizer
- Convert filtered tokens into term frequency vectors

In [0]:

# Convert filtered tokens into term frequency vectors
countVec = CountVectorizer(inputCol="filteredTokens", outputCol="termFrequency")
countVec_model = countVec.fit(stopWordsRemovedDF)               # Fit countVec model
vectorizedDF = countVec_model.transform(stopWordsRemovedDF)     # Transform dataframe

print("====== Term Frequency ======")
vectorizedDF.select("filteredTokens", "termFrequency").show(20, truncate=50)

+--------------------------------------------------+--------------------------------------------------+
|                                    filteredTokens|                                     termFrequency|
+--------------------------------------------------+--------------------------------------------------+
|[warehouse, resort, principal, people, place, e...|(22384,[52,130,185,422,1035,1897,2069,5599,6869...|
|[moribund, hermit's, rage, fear, swelling, grot...|(22384,[16,18,97,183,230,517,1191,1303,1863,224...|
|[raymond, weak, exhausted, yet, interest, perce...|(22384,[2,94,272,298,306,455,458,528,1165,2419,...|
|[weeks, passed, observed, regret, new, friend, ...|(22384,[68,89,99,122,279,385,707,863,922,1390,2...|
|[death, cruel, relentless, entered, beloved, wa...|(22384,[41,225,335,542,2660,4444],[1.0,1.0,1.0,...|
|[an', smell, awful, like, araound, wizard, what...|(22384,[7,144,1117,2459,2483,3950,5043,7842,896...|
|           [anywhere, nights, daemoniac, alienage]|  (22384,[26
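CountVectorizer's fit and transform steps can be sketched in plain Python, assuming the default minDF=1. The fitted vocabulary keeps the vocabSize terms with the highest total count across the corpus, with index 0 the most frequent. Each excerpt then becomes a sparse vector of counts over that vocabulary, which Spark prints as (size, [indices], [values]). The alphabetical tie-break and the toy corpus are my own assumptions, added for determinism:

```python
from collections import Counter

# Vocabulary: the vocab_size terms with the highest total count (index 0 = most
# frequent). Ties are broken alphabetically here; Spark does not guarantee a tie order.
def fit_vocabulary(docs, vocab_size):
    totals = Counter(t for doc in docs for t in doc)
    ranked = sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))
    return [term for term, _ in ranked[:vocab_size]]

def transform(doc, vocabulary):
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[t] for t in doc if t in index)  # out-of-vocabulary terms are dropped
    indices = sorted(counts)
    # (size, sorted indices, values): the layout Spark prints for SparseVectors
    return len(vocabulary), indices, [float(counts[i]) for i in indices]

docs = [["night", "raven", "night"], ["raven", "heart"], ["night", "love"]]
vocab = fit_vocabulary(docs, vocab_size=3)
print(vocab)
print(transform(["night", "night", "love", "raven"], vocab))
```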

IDF (Inverse Document Frequency)
- Upweight rare terms (those appearing in few excerpts)
- Downweight common terms (those appearing in many excerpts)

In [0]:

idf = IDF(inputCol="termFrequency", outputCol="tfidfFeatures")
idf_model = idf.fit(vectorizedDF) # fit idf_model
tfidfDF = idf_model.transform(vectorizedDF) # Transform dataframe

print("====== TF-IDF Features ======")
tfidfDF.select("termFrequency", "tfidfFeatures").show(20, truncate=50)

+--------------------------------------------------+--------------------------------------------------+
|                                     termFrequency|                                     tfidfFeatures|
+--------------------------------------------------+--------------------------------------------------+
|(22384,[52,130,185,422,1035,1897,2069,5599,6869...|(22384,[52,130,185,422,1035,1897,2069,5599,6869...|
|(22384,[16,18,97,183,230,517,1191,1303,1863,224...|(22384,[16,18,97,183,230,517,1191,1303,1863,224...|
|(22384,[2,94,272,298,306,455,458,528,1165,2419,...|(22384,[2,94,272,298,306,455,458,528,1165,2419,...|
|(22384,[68,89,99,122,279,385,707,863,922,1390,2...|(22384,[68,89,99,122,279,385,707,863,922,1390,2...|
|(22384,[41,225,335,542,2660,4444],[1.0,1.0,1.0,...|(22384,[41,225,335,542,2660,4444],[3.9817081437...|
|(22384,[7,144,1117,2459,2483,3950,5043,7842,896...|(22384,[7,144,1117,2459,2483,3950,5043,7842,896...|
|  (22384,[2652,3080,6252,10675],[1.0,1.0,1.0,1.0])|(22384,[2652
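Spark ML documents its IDF as idf(t) = ln((m + 1) / (df(t) + 1)). Here m is the number of documents and df(t) is the number of documents containing term t. The TF-IDF value is the term count multiplied by idf(t). Below is a small numeric check with a hypothetical four-document corpus:

```python
import math

# Spark ML's IDF formula: idf(t) = ln((m + 1) / (df(t) + 1)), with m documents and
# df(t) documents containing term t. TF-IDF = term count * idf(t).
def idf(num_docs, doc_freq):
    return math.log((num_docs + 1) / (doc_freq + 1))

m = 4                      # hypothetical corpus of four excerpts
rare = idf(m, 1)           # term in one excerpt: ln(5/2)
everywhere = idf(m, 4)     # term in every excerpt: ln(5/5) = 0
print(round(rare, 3), everywhere)
print(2.0 * rare)          # TF-IDF for a term counted twice in one excerpt
```

A term that appears in every excerpt gets a weight of 0, which is the "downweight common terms" behaviour in its most extreme form.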

3.7 String Indexer
- Assign numeric index values to each unique author
- Authors are labelled from 0 to 2, 0 being the most frequent and 2 the least

In [0]:

si = StringIndexer(inputCol="author", outputCol="authorIndex")
si_model = si.fit(tfidfDF) # fit string indexer model
indexedDF = si_model.transform(tfidfDF) # Transform dataframe

print("====== Author String Indexing ======")
indexedDF.select("author", "authorIndex").show(10)

print("====== Label Order ======")
print(si_model.labels)

"""
String Indexing Label Ordering: ['EAP', 'MWS', 'HPL']
EAP -> 0
MWS -> 1
HPL -> 2
"""

+------+-----------+
|author|authorIndex|
+------+-----------+
|   EAP|        0.0|
|   HPL|        2.0|
|   MWS|        1.0|
|   HPL|        2.0|
|   MWS|        1.0|
|   HPL|        2.0|
|   HPL|        2.0|
|   HPL|        2.0|
|   HPL|        2.0|
|   EAP|        0.0|
+------+-----------+
only showing top 10 rows

['EAP', 'MWS', 'HPL']
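StringIndexer's default ordering (stringOrderType='frequencyDesc') can be checked in plain Python: labels are sorted by descending frequency, and ties are broken alphabetically. The sketch uses the author counts from section 2.1, which cover the full training file. The demo was fitted on the 70% split, which produced the same order here:

```python
from collections import Counter

# Sketch of StringIndexer with stringOrderType="frequencyDesc": the most frequent
# label gets index 0.0; equal frequencies are ordered alphabetically.
def fit_labels(values):
    counts = Counter(values)
    return [label for label, _ in sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))]

# Author counts from the full training file (section 2.1)
authors = ["EAP"] * 7900 + ["MWS"] * 6044 + ["HPL"] * 5635
labels = fit_labels(authors)
label_to_index = {label: float(i) for i, label in enumerate(labels)}
print(labels)           # ['EAP', 'MWS', 'HPL']
print(label_to_index)
```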

3.8 Logistic Regression Manual
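- Testing the basic pipeline with logistic regression (predictions are made on the same data the model was fitted on, so they are a sanity check rather than an accuracy estimate)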

In [0]:

lr_demo = LogisticRegression(featuresCol="tfidfFeatures", labelCol="authorIndex", regParam=0.1)
lr_model_demo = lr_demo.fit(indexedDF)              # Fit logistic regression model
lr_preds_demo = lr_model_demo.transform(indexedDF)

print("====== Logistic Regression Predictions ======")
lr_preds_demo.select("id", "author", "authorIndex", "prediction", "probability").show()

+-------+------+-----------+----------+--------------------+
|     id|author|authorIndex|prediction|         probability|
+-------+------+-----------+----------+--------------------+
|id00034|   EAP|        0.0|       0.0|[0.96006240178304...|
|id00049|   HPL|        2.0|       2.0|[0.02736386616541...|
|id00079|   MWS|        1.0|       1.0|[0.06961296061569...|
|id00107|   HPL|        2.0|       2.0|[0.08191137265469...|
|id00108|   MWS|        1.0|       1.0|[0.18320025462655...|
|id00128|   HPL|        2.0|       2.0|[0.03193269043645...|
|id00159|   HPL|        2.0|       2.0|[0.06774607763422...|
|id00161|   HPL|        2.0|       2.0|[0.03396396634197...|
|id00198|   HPL|        2.0|       2.0|[0.24457367474160...|
|id00239|   EAP|        0.0|       0.0|[0.95532536550354...|
|id00260|   MWS|        1.0|       1.0|[0.10815025211117...|
|id00276|   HPL|        2.0|       2.0|[0.06904639982673...|
|id00292|   MWS|        1.0|       1.0|[0.36896286981304...|
|id00314|   HPL|        

## 4. Model Creation & Evaluation

4.1 Pipeline Builder Function and MCE Evaluator
- buildPipeline takes a classifier (LR, NB, RF) as a parameter and applies it as the final pipeline stage.
- MCE (MulticlassClassificationEvaluator) evaluates the model's performance on log loss, the competition's evaluation metric; it was also used to test accuracy.
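For reference, here is a plain-Python sketch of the log loss metric. It is the mean of -ln(p) over rows, where p is the probability assigned to the true author, clipped below at a small eps. As I understand it, MulticlassClassificationEvaluator's eps defaults to 1e-15. Lower is better, and a confident wrong answer is penalised heavily. The probability vectors are made up:

```python
import math

# Multiclass log loss: mean of -ln(p_true), with p_true clipped below at eps
# so that a zero probability gives a large but finite penalty.
def log_loss(labels, probabilities, eps=1e-15):
    total = 0.0
    for label, probs in zip(labels, probabilities):
        total += -math.log(max(probs[int(label)], eps))
    return total / len(labels)

labels = [0.0, 1.0]                           # authorIndex values (EAP, MWS)
probs = [[0.9, 0.05, 0.05], [0.2, 0.7, 0.1]]  # made-up probability vectors
print(log_loss(labels, probs))                # ~0.231
```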

In [0]:

# Combines previous steps to build the pipeline
def buildPipeline(classifier):
    return Pipeline(
        stages=[
            regexTokenizer,     # 3.3 Regex Tokenization
            remover,            # 3.4 Stopword Remover
            countVec,           # 3.6 CountVectorizer
            idf,                # 3.6 IDF
            si,                 # 3.7 String Indexer
            classifier          # final stage: LR, NB or RF
        ]
    )

# evaluate logLoss
mce = MulticlassClassificationEvaluator(
    labelCol="authorIndex",
    predictionCol="prediction",
    probabilityCol="probability",   # need for logLoss evaluation
    metricName="logLoss"            # also tested accuracy here
)

4.2 Model Tuning Function
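- modelTuning uses CrossValidator to evaluate each parameter combination with k-fold cross-validation
- Returns the fitted CrossValidatorModel (its bestModel is refitted on the full training split), the predictions on the held-out data and the held-out log loss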


In [0]:

def modelTuning(trainData, testData, pipeline, paramgrid, evaluator=mce, folds=3, parallelism=5):
    crossVal = CrossValidator(
        estimator = pipeline,               # untrained pipeline (estimator) to tune
        estimatorParamMaps = paramgrid,     # hyperparameter tuning values
        evaluator = evaluator,              # multiclass classification evaluator for log loss
        numFolds = folds,                   # number of cross-validation folds
        parallelism = parallelism           # number of models fitted in parallel
    )
    crossVal_model = crossVal.fit(trainData)
    predictions = crossVal_model.transform(testData)
    logloss = evaluator.evaluate(predictions)

    return crossVal_model, predictions, logloss
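To estimate the cost of modelTuning: CrossValidator fits one pipeline per (fold, parameter combination) pair, then refits the best combination on the whole training split. Below is that arithmetic for the grids in sections 4.3 and 4.4, enumerated in plain Python:

```python
from itertools import product

# Grid copied from the LR cell in section 4.3.
lr_grid = {
    "regParam": [0.08, 0.09],
    "elasticNetParam": [0.0],
    "vocabSize": [11500, 12000],
}
# Every combination of values, as ParamGridBuilder.build() enumerates them
param_maps = [dict(zip(lr_grid, values)) for values in product(*lr_grid.values())]

folds = 3
lr_fits = folds * len(param_maps) + 1      # +1: final refit of the best combination
nb_fits = folds * (3 * 2) + 1              # NB grid: 3 smoothing x 2 vocabSize values
print(len(param_maps), lr_fits, nb_fits)   # 4 13 19
```

Each "fit" here is a full pipeline fit (tokenizer through classifier), so wider grids grow the runtime multiplicatively.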

4.3 Logistic Regression
- Multinomial logistic regression computes a weighted sum of the TF-IDF features for each author and applies the softmax function to turn these scores into a probability for each author
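With family='multinomial', the model produces one raw score per author (the rawPrediction column). It converts these scores to probabilities with softmax rather than a per-class sigmoid. Here is a plain-Python sketch with made-up scores:

```python
import math

# Softmax turns one raw score per class, z_k = w_k . x + b_k, into probabilities.
def softmax(scores):
    m = max(scores)                       # subtract the max for numerical stability
    exps = [math.exp(z - m) for z in scores]
    total = sum(exps)
    return [e / total for e in exps]

raw = [2.0, 0.5, -1.0]                    # made-up scores for EAP, MWS, HPL
probs = softmax(raw)
print([round(p, 3) for p in probs])
prediction = max(range(len(probs)), key=probs.__getitem__)
print(prediction)                         # 0 -> EAP
```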

In [0]:

lr = LogisticRegression(featuresCol="tfidfFeatures", labelCol="authorIndex", family="multinomial")

lr_paramGrid = (
    ParamGridBuilder()
        .addGrid(lr.regParam,[0.08, 0.09])
        .addGrid(lr.elasticNetParam, [0.0])
        .addGrid(countVec.vocabSize,[11500, 12000])
        .build()
)

# Build and fine tune Logistic Regression pipeline
lr_pipeline = buildPipeline(lr) 
lr_model, lr_preds, lr_logloss = modelTuning(trainDF, valDF, lr_pipeline, lr_paramGrid)

# Best model parameters
lr_bestModel = lr_model.bestModel           # Extract best model
lr_best = lr_bestModel.stages[-1]           # LR last step in pipeline
lr_countVec_best = lr_bestModel.stages[2]   # CountVec 3rd step in pipeline


print("\n=================== LR Predictions ===================")
lr_preds.show(5)

print("\n=================== LR LogLoss ===================")
print("LogLoss on 30% split       :", lr_logloss)

print("\n=================== LR Best Model ===================")
print("Best regParam              :", lr_best.getRegParam())
print("Best elasticNetParam       :", lr_best.getElasticNetParam())
print("Best vocabSize             :", lr_countVec_best.getVocabSize())


+-------+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+----------+
|     id|                text|author|              tokens|      filteredTokens|       termFrequency|       tfidfFeatures|authorIndex|       rawPrediction|         probability|prediction|
+-------+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+----------+
|id00070|Yet, even then, I...|   MWS|[yet, even, then,...|[yet, even, check...|(12000,[0,2,5,9,1...|(12000,[0,2,5,9,1...|        1.0|[-1.8000968476202...|[0.01105778700637...|       1.0|
|id00123|The writer spoke ...|   EAP|[the, writer, spo...|[writer, spoke, a...|(12000,[39,69,99,...|(12000,[39,69,99,...|        0.0|[3.39036782274577...|[0.98646584468935...|       0.0|
|id00133|Found in a Bottle...|   EAP|[found, in, a, bo...|[found

4.4 Naive Bayes
- Applies Bayes' theorem under the assumption that all features (terms) are conditionally independent given the author
- Returns a probability for each author
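The sketch below shows the textbook multinomial Naive Bayes with additive smoothing in plain Python; it is an illustration of the technique, not Spark's code. It assumes the `alpha` argument plays the role of the `smoothing` parameter tuned in the grid below, and it also smooths the class priors. The toy corpus is hypothetical. Raising `alpha` pulls every per-term probability towards uniform, so posteriors move closer to equal.

```python
import math


def train_multinomial_nb(docs, labels, n_features, alpha=1.0):
    """docs: sparse {feature_index: nonnegative value} (counts or TF-IDF); alpha: additive smoothing."""
    classes = sorted(set(labels))
    log_prior, log_theta = {}, {}
    for c in classes:
        class_docs = [d for d, y in zip(docs, labels) if y == c]
        log_prior[c] = math.log((len(class_docs) + alpha) / (len(docs) + len(classes) * alpha))
        totals = [0.0] * n_features  # summed feature values per term for this class
        for d in class_docs:
            for j, x in d.items():
                totals[j] += x
        denom = sum(totals) + n_features * alpha
        log_theta[c] = [math.log((t + alpha) / denom) for t in totals]  # smoothed log P(term | class)
    return log_prior, log_theta


def nb_posteriors(doc, log_prior, log_theta):
    """Log prior + sum of value * log P(term | class), then normalise into probabilities."""
    scores = {c: log_prior[c] + sum(x * log_theta[c][j] for j, x in doc.items()) for c in log_prior}
    m = max(scores.values())
    exps = {c: math.exp(s - m) for c, s in scores.items()}
    z = sum(exps.values())
    return {c: e / z for c, e in exps.items()}


# Hypothetical toy corpus: 4-term vocabulary, two authors
docs = [{0: 2.0, 1: 1.0}, {0: 3.0}, {2: 2.0, 3: 1.0}, {3: 2.0}]
labels = ["EAP", "EAP", "MWS", "MWS"]

for alpha in (1.0, 1000.0):
    prior, theta = train_multinomial_nb(docs, labels, n_features=4, alpha=alpha)
    print(alpha, nb_posteriors({0: 1.0}, prior, theta))
```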

In [0]:

nb = NaiveBayes(featuresCol="tfidfFeatures", labelCol="authorIndex", modelType="multinomial")

# Define hyperparameters for NB classifier
nb_paramGrid = (
    ParamGridBuilder()
        .addGrid(nb.smoothing, [560, 570, 580])
        .addGrid(countVec.vocabSize, [22000, 22500])
        .build()
)

# Build and fine-tune NB pipeline
nb_pipeline = buildPipeline(nb) 
nb_model, nb_preds, nb_logloss = modelTuning(trainDF, valDF, nb_pipeline, nb_paramGrid)

# Best model parameters
nb_bestModel = nb_model.bestModel           # Extract best model
nb_best = nb_bestModel.stages[-1]           # NB last step in pipeline
nb_countVec_best = nb_bestModel.stages[2]   # CountVec 3rd step in pipeline


print("\n=================== NB LogLoss ===================")
print("LogLoss on 30% split       :", nb_logloss)
print("\n=================== NB Best Model ===================")
print("Best smoothing             :", nb_best.getSmoothing())
print("Best vocabSize             :", nb_countVec_best.getVocabSize())
print("\n=================== NB Predictions ===================")
nb_preds.show(5)


LogLoss on 30% split       : 0.5912778480258282

Best smoothing             : 570.0
Best vocabSize             : 22500

+-------+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+----------+
|     id|                text|author|              tokens|      filteredTokens|       termFrequency|       tfidfFeatures|authorIndex|       rawPrediction|         probability|prediction|
+-------+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+----------+
|id00070|Yet, even then, I...|   MWS|[yet, even, then,...|[yet, even, check...|(22384,[0,2,5,9,1...|(22384,[0,2,5,9,1...|        1.0|[-723.54346479882...|[0.00224869235772...|       1.0|
|id00123|The writer spoke ...|   EAP|[the, writer, spo...|[writer, spoke, a...|(22384,[39,68,99,...|(22384,[39,68,99,...|        0.

4.5 Random Forest
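- Ensemble of decision trees: trains many trees instead of one (each on a bootstrap sample of the rows, considering a random subset of features at each split) and combines their class-probability votes, which reduces the risk of overfitting compared with a single tree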
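A small plain-Python sketch of how the trees' votes can be combined into one prediction: each tree contributes the class proportions of the leaf the row lands in, the contributions are summed into a raw score, and the sum is normalised. The leaf counts below are hypothetical, and the sketch does not cover how the trees themselves are grown.

```python
def forest_predict(leaf_class_counts):
    """leaf_class_counts: for each tree, the class counts in the leaf this row falls into."""
    n_classes = len(leaf_class_counts[0])
    raw = [0.0] * n_classes
    for counts in leaf_class_counts:
        total = sum(counts)
        if total:
            for i, c in enumerate(counts):
                raw[i] += c / total  # each tree contributes its leaf's class proportions
    s = sum(raw)
    probability = [r / s for r in raw]  # normalise the summed votes
    return raw, probability


# Hypothetical leaves reached by one row in a 3-tree forest (classes in label order: EAP, MWS, HPL)
raw, prob = forest_predict([[8, 2, 0], [5, 5, 0], [0, 1, 3]])
print(raw, prob)
```

This appears consistent with the RF output below, where each `probability` entry is the matching `rawPrediction` entry divided by 100 (for example 36.576... and 0.36576...), i.e. the summed votes of 100 trees.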

In [0]:
rf = RandomForestClassifier(featuresCol="tfidfFeatures", labelCol="authorIndex")

rf_paramGrid = (
    ParamGridBuilder()
        .addGrid(rf.maxDepth, [10, 15])
        .addGrid(rf.numTrees, [60, 100])
        .addGrid(countVec.vocabSize, [2000, 4000])
        .build()
)

rf_pipeline = buildPipeline(rf) # Random Forest pipeline
rf_model, rf_preds, rf_logLoss = modelTuning(trainDF, valDF, rf_pipeline, rf_paramGrid) # Model fine-tuning

rf_bestModel = rf_model.bestModel # RF best model
rf_best = rf_bestModel.stages[-1] # RF -> last step in pipeline stages
countVec_best = rf_bestModel.stages[2] # CountVec -> 3rd step in pipeline stages

print("====== RF Predictions ======")
rf_preds.show(5)

print("====== RF LogLoss ======")
print("LogLoss on 30% split       :", rf_logLoss)

print("====== RF Best Model ======")
# On the fitted PySpark model, getNumTrees is a property rather than a method, so no parentheses
print("Best numTrees              :", rf_best.getNumTrees)
print("Best maxDepth              :", rf_best.getMaxDepth())
print("Best vocabSize             :", countVec_best.getVocabSize())

+-------+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+----------+
|     id|                text|author|              tokens|      filteredTokens|       termFrequency|       tfidfFeatures|authorIndex|       rawPrediction|         probability|prediction|
+-------+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+----------+
|id00070|Yet, even then, I...|   MWS|[yet, even, then,...|[yet, even, check...|(2000,[0,2,5,9,11...|(2000,[0,2,5,9,11...|        1.0|[36.5762993546024...|[0.36576299354602...|       0.0|
|id00123|The writer spoke ...|   EAP|[the, writer, spo...|[writer, spoke, a...|(2000,[40,70,98,1...|(2000,[40,70,98,1...|        0.0|[42.0379316519967...|[0.42037931651996...|       0.0|
|id00133|Found in a Bottle...|   EAP|[found, in, a, bo...|[found,

## 5. Kaggle Submission

5.1 Final Pipeline - Retraining & Generating Predictions
- Logistic regression was the top-performing classifier (judged by log loss on the 30% validation split)
- Hard-code LR and its best hyperparameter values into the final pipeline
- Retrain the pipeline on the full Kaggle training file (k_trainDF)
- Transform k_testDF (the unlabelled test set) with the trained pipeline to generate probability predictions

In [0]:

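# Final Logistic Regression implementation with best values
logisticRegression = LogisticRegression(
    featuresCol="tfidfFeatures",
    labelCol="authorIndex",
    family="multinomial",   # same family as the tuned model (the "auto" default also picks multinomial for 3 classes)
    regParam=0.09,
    elasticNetParam=0.0
)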

# Update final vocab size
countVec.setVocabSize(11500)

# Build final pipeline with tuned LR
pipeline = buildPipeline(logisticRegression)

# Train the model on the full train file
trainedPL = pipeline.fit(k_trainDF)

# Final predictions based on full test file
final_preds = trainedPL.transform(k_testDF)
final_preds.select("id", "text", "prediction", "probability").show(5)

+-------+--------------------+----------+--------------------+
|     id|                text|prediction|         probability|
+-------+--------------------+----------+--------------------+
|id02310|Still, as I urged...|       1.0|[0.13479444020730...|
|id24541|If a fire wanted ...|       0.0|[0.83705385665936...|
|id00134|And when they had...|       2.0|[0.22179307587592...|
|id27757|While I was think...|       2.0|[0.47027982913499...|
|id04081|I am not sure to ...|       0.0|[0.84319002900923...|
+-------+--------------------+----------+--------------------+
only showing top 5 rows



5.2 Create Kaggle Submission File
- Check the StringIndexer label ordering so each probability is mapped to the correct author column required by Kaggle's submission format
- Organise columns
- Create submission CSV file
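Why the label order matters can be sketched in plain Python: Kaggle scores this competition with multi-class log loss, which only looks at the probability given to the true author, so writing a probability under the wrong column can badly inflate the score. The rescaling of each row and the 1e-15 clipping follow the usual Kaggle formulation and are assumptions here; the probability vectors and true authors are hypothetical.

```python
import math


def to_submission_row(row_id, probability, label_order):
    """Map a probability vector (indexed by StringIndexer label order) to named author columns."""
    return {"id": row_id, **{a: probability[label_order.index(a)] for a in ("EAP", "MWS", "HPL")}}


def multiclass_log_loss(true_authors, rows, eps=1e-15):
    """Mean of -log(p_true); each row is rescaled to sum to 1 and p is clipped to [eps, 1 - eps]."""
    total = 0.0
    for author, row in zip(true_authors, rows):
        z = row["EAP"] + row["MWS"] + row["HPL"]
        p = min(max(row[author] / z, eps), 1 - eps)
        total -= math.log(p)
    return total / len(rows)


label_order = ["EAP", "MWS", "HPL"]          # order reported by string_indexer.labels in this notebook
probs = [[0.8, 0.1, 0.1], [0.2, 0.6, 0.2]]   # hypothetical probability vectors
truth = ["EAP", "MWS"]                       # hypothetical true authors

correct = [to_submission_row(f"id{i}", p, label_order) for i, p in enumerate(probs)]
swapped = [to_submission_row(f"id{i}", p, ["MWS", "EAP", "HPL"]) for i, p in enumerate(probs)]
print(multiclass_log_loss(truth, correct), multiclass_log_loss(truth, swapped))
```

Looking each author up with `label_order.index(...)`, as the cell below does, avoids hard-coding positions that could change if the StringIndexer order changes.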

In [0]:

# Label ordering - ensure authors are in correct order for competition submission
string_indexer = trainedPL.stages[4] # si (string indexer) at index 4 of pipeline
labelOrder = string_indexer.labels
print("Label Order:", labelOrder)

kaggleDF = (
    final_preds
        .withColumn("probArray", vector_to_array(col("probability")))
        .withColumn("EAP", col("probArray")[labelOrder.index("EAP")])
        .withColumn("MWS", col("probArray")[labelOrder.index("MWS")])
        .withColumn("HPL", col("probArray")[labelOrder.index("HPL")])
        .select("id", "EAP", "MWS", "HPL")
)

# Review final submission DF
kaggleDF.show(10, truncate=False)
display(kaggleDF)

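# Uncomment to create the submission file. coalesce(1) merges the DataFrame into a single partition so Spark writes one part file;
# note that write.csv creates a directory at this path containing a part-*.csv file, not a single CSV file.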
# kaggleDF.coalesce(1).write.csv("/FileStore/tables/spookySub.csv", header=True)

Label Order: ['EAP', 'MWS', 'HPL']
+-------+-------------------+---------------------+--------------------+
|id     |EAP                |MWS                  |HPL                 |
+-------+-------------------+---------------------+--------------------+
|id02310|0.1347944402073078 |0.8113291676238609   |0.05387639216883136 |
|id24541|0.8370538566593693 |0.023099708194767844 |0.1398464351458628  |
|id00134|0.22179307587592462|0.015052794829501033 |0.7631541292945744  |
|id27757|0.4702798291349928 |0.019735858358769672 |0.5099843125062377  |
|id04081|0.8431900290092367 |0.09246901786496164  |0.06434095312580175 |
|id27337|0.4325554062991552 |0.04040306750728614  |0.5270415261935586  |
|id24265|0.5852345200583124 |0.1229289615772901   |0.2918365183643974  |
|id25917|0.1906415278919855 |0.5468666302606607   |0.2624918418473538  |
|id04951|0.9779549044564125 |0.0017186458575208913|0.020326449686066474|
|id14549|0.6921052207483169 |0.1741563980094436   |0.13373838124223938 |
+-------+-------

id,EAP,MWS,HPL
id02310,0.1347944402073078,0.8113291676238609,0.0538763921688313
id24541,0.8370538566593693,0.0230997081947678,0.1398464351458628
id00134,0.2217930758759246,0.015052794829501,0.7631541292945744
id27757,0.4702798291349928,0.0197358583587696,0.5099843125062377
id04081,0.8431900290092367,0.0924690178649616,0.0643409531258017
id27337,0.4325554062991552,0.0404030675072861,0.5270415261935586
id24265,0.5852345200583124,0.1229289615772901,0.2918365183643974
id25917,0.1906415278919855,0.5468666302606607,0.2624918418473538
id04951,0.9779549044564124,0.0017186458575208,0.0203264496860664
id14549,0.6921052207483169,0.1741563980094436,0.1337383812422393


5.3 Download Spooky CSV

In [0]:

%fs ls /FileStore/tables/