- Add your name to a code block to show that you are still working on it.
- Test code in a separate Colab notebook first, then copy the working code into this main file.
- The data is downloaded through the Kaggle API. Use your own Kaggle API token to run the code.

Stage 0 - Import: Completed (Shaon)
Stage 1 - Data Prep: Shaon (will work from Friday night) / Jacob
Stage 2 - Feature Extraction (Jacob)
Stage 3 - Machine Learning (Alex)
Stage 4 - Evaluation (Nelson)

In [None]:
# Upload Kaggle API file
from google.colab import files
print("Upload your Kaggle API .JSON File")
uploaded = files.upload()  # assign the result so the token is not echoed in the cell output

Upload your Kaggle API .JSON File


Saving kaggle.json to kaggle (1).json

In [None]:
# Move API to appropriate folder
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
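
The upload step can save the token under a different name (for example `kaggle (1).json`), in which case moving `kaggle.json` picks up an older copy or fails. A minimal sketch of an alternative, assuming the dict returned by `files.upload()` (file name mapped to raw bytes) is passed in: it writes the payload straight to the Kaggle config folder. The helper name `install_kaggle_token` and its `kaggle_dir` parameter are hypothetical, not part of Colab or the Kaggle client.

```python
import json
import os
from pathlib import Path


def install_kaggle_token(uploaded, kaggle_dir=None):
    """Write the first uploaded payload to <kaggle_dir>/kaggle.json.

    `uploaded` maps file names to raw bytes (assumed shape of the
    dict returned by files.upload()). Hypothetical helper.
    """
    if not uploaded:
        raise ValueError("no file was uploaded")
    target_dir = Path(kaggle_dir) if kaggle_dir else Path.home() / ".kaggle"
    target_dir.mkdir(parents=True, exist_ok=True)

    # Use the payload regardless of the name it was saved under.
    payload = next(iter(uploaded.values()))
    token = json.loads(payload)  # fails early if the file is not valid JSON
    if not isinstance(token, dict) or not {"username", "key"} <= set(token):
        raise ValueError("expected a JSON object with 'username' and 'key'")

    target = target_dir / "kaggle.json"
    target.write_bytes(payload)
    os.chmod(target, 0o600)  # same permissions as `chmod 600`
    return target
```

In the notebook this would be called as `install_kaggle_token(files.upload())`, replacing the `mkdir`, `mv` and `chmod` commands.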

In [None]:
# Download target data
!kaggle competitions download -c spooky-author-identification

spooky-author-identification.zip: Skipping, found more recently modified local copy (use --force to force download)


In [None]:
# Unzip the main competition archive; -o overwrites existing files without prompting
!unzip -o -q spooky-author-identification.zip

# Unzip the nested archives extracted from the main archive
!unzip -o -q train.zip
!unzip -o -q test.zip
!unzip -o -q sample_submission.zip


In [None]:
!ls

'kaggle (1).json'	 sample_submission.zip		    test.zip
 sample_data		 spooky-author-identification.zip   train.csv
 sample_submission.csv	 test.csv			    train.zip
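
The extraction step can also be done in Python, which avoids interactive overwrite prompts when the notebook is re-run. This is a rough sketch using only the standard library, assuming the competition archive holds inner `.zip` files that in turn hold the CSVs; `extract_nested` is a hypothetical helper name.

```python
import zipfile
from pathlib import Path


def extract_nested(archive, dest="."):
    """Extract `archive` into `dest`, then any .zip files it contained.

    Existing files are overwritten, so re-running never prompts.
    Returns the sorted names of the non-zip files that were extracted.
    """
    dest = Path(dest)
    extracted = []
    with zipfile.ZipFile(archive) as outer:
        outer.extractall(dest)
        names = outer.namelist()
    for name in names:
        path = dest / name
        if path.suffix == ".zip":
            # Inner archive: unpack it next to the outer contents.
            with zipfile.ZipFile(path) as inner:
                inner.extractall(dest)
                extracted.extend(inner.namelist())
        else:
            extracted.append(name)
    return sorted(extracted)
```

Usage would be `extract_nested("spooky-author-identification.zip")`, after which `train.csv` and `test.csv` should be in the working directory.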


In [None]:
# Install PySpark and FindSpark
!pip install pyspark findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
# Initialize FindSpark
import findspark
findspark.init()

# Import SparkSession
from pyspark.sql import SparkSession

In [None]:
# Create a SparkSession
spark = SparkSession.builder.appName("SpookyAuthorship").getOrCreate()

In [None]:
# Load the training data
train_df = spark.read.csv("train.csv", header=True, inferSchema=True)

# Load the test data
test_df = spark.read.csv("test.csv", header=True, inferSchema=True)

# Display schema and show a few rows of the training data
print("Train Data Schema:")
train_df.printSchema()
print("\nTrain Data Sample:")
train_df.show(5, truncate=False)

# Display schema and show a few rows of the test data
print("\nTest Data Schema:")
test_df.printSchema()
print("\nTest Data Sample:")
test_df.show(5, truncate=False)

# Display the size of the dataframes
print(f"\nTrain Data Size: {train_df.count()} rows")
print(f"Test Data Size: {test_df.count()} rows")

# Check distribution of authors in training data
print("\nAuthor Distribution in Training Data:")
train_df.groupBy("author").count().show()

Train Data Schema:
root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- author: string (nullable = true)


Train Data Sample:
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|id     |text                                                                                                                                                                                                                                   |author|
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|id26305|This process, however, afforded me no means of ascertaining the dimensions of my dungeon; a

In [None]:
# Import necessary PySpark functions and libraries
from pyspark.sql.functions import lower, regexp_replace, udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover
import re

# --- Data Preprocessing ---

# 1. Convert text to lowercase and remove special characters
# Define a UDF for cleaning text (removing special characters and extra spaces)
def clean_text_udf(text):
    if text is None:
        return None
    # Remove everything except ASCII letters and whitespace (digits and punctuation are dropped)
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    # Replace multiple spaces with a single space
    text = re.sub(r'\s+', ' ', text).strip()
    return text

# Register the UDF
clean_text = udf(clean_text_udf, StringType())

# Apply lowercase and cleaning to the 'text' column for both train and test DataFrames
# Ensure the 'text' column is converted to string type before applying lower()
train_df = train_df.withColumn("cleaned_text", clean_text(lower(train_df["text"].cast(StringType()))))
test_df = test_df.withColumn("cleaned_text", clean_text(lower(test_df["text"].cast(StringType()))))

print("Data after lowercasing and special character removal (Train):")
train_df.select("text", "cleaned_text").show(5, truncate=False)
print("\nData after lowercasing and special character removal (Test):")
test_df.select("text", "cleaned_text").show(5, truncate=False)




Data after lowercasing and special character removal (Train):
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                                                                                                                   |cleaned_text                                                                                                                                                                                                                    |
+-------------------
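
Because the table output is cut off, here is a small plain-Python sketch of what the cleaning step does to a single string. It mirrors the lowercase-then-regex logic of the cell above without Spark; `clean_text_plain` is a hypothetical name used only for this illustration, and the first sample is the visible start of the first training row.

```python
import re


def clean_text_plain(text):
    """Lowercase, keep only ASCII letters and whitespace, collapse spaces."""
    if text is None:
        return None
    text = text.lower()
    text = re.sub(r"[^a-z\s]", "", text)      # drops digits and punctuation
    return re.sub(r"\s+", " ", text).strip()  # collapses runs of whitespace


sample = ("This process, however, afforded me no means of "
          "ascertaining the dimensions of my dungeon;")
print(clean_text_plain(sample))

# Apostrophes and digits vanish too, so contractions are merged:
print(clean_text_plain("It's 3 o'clock!"))  # its oclock
```

One consequence worth keeping in mind: accented letters are also removed by this pattern, so a word such as "déjà" would lose characters rather than be transliterated.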

In [None]:
# 2. Tokenization
# Initialize the Tokenizer to split the cleaned text into individual words (tokens)
tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="tokens")

# Apply tokenization to both DataFrames
train_df = tokenizer.transform(train_df)
test_df = tokenizer.transform(test_df)

print("\nData after Tokenization (Train):")
train_df.select("cleaned_text", "tokens").show(5, truncate=False)
print("\nData after Tokenization (Test):")
test_df.select("cleaned_text", "tokens").show(5, truncate=False)


# 3. Stop Words Removal
# Initialize the StopWordsRemover
# With no stopWords argument, StopWordsRemover uses Spark's default English stop-word list.
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")

# Apply stop words removal to both DataFrames
train_df = stopwords_remover.transform(train_df)
test_df = stopwords_remover.transform(test_df)

print("\nData after Stop Words Removal (Train):")
train_df.select("tokens", "filtered_tokens").show(5, truncate=False)
print("\nData after Stop Words Removal (Test):")
test_df.select("tokens", "filtered_tokens").show(5, truncate=False)


# 4. Lemmatization (NLTK WordNet lemmatizer wrapped in a UDF)
# PySpark does not have a built-in lemmatizer, so each token is lemmatized
# with NLTK's WordNetLemmatizer, using a per-word POS tag to pick the lemma.
# Tagging words one at a time ignores sentence context, so treat the tags as approximate.

from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet
import nltk

# Download the NLTK data used below; nltk.download() skips packages that are already up to date.
# The POS tagger package is named 'averaged_perceptron_tagger' in older NLTK releases
# and 'averaged_perceptron_tagger_eng' in newer ones, so both are requested.
for resource in ("wordnet", "omw-1.4",
                 "averaged_perceptron_tagger", "averaged_perceptron_tagger_eng"):
    nltk.download(resource, quiet=True)


lemmatizer = WordNetLemmatizer()

def get_wordnet_pos(word):
    """Map POS tag to first character used by WordNetLemmatizer"""
    tag = nltk.pos_tag([word])[0][1][0].upper()
    tag_dict = {"J": wordnet.ADJ,
                "N": wordnet.NOUN,
                "V": wordnet.VERB,
                "R": wordnet.ADV}
    return tag_dict.get(tag, wordnet.NOUN)

def lemmatize_tokens(tokens):
    if tokens is None:
        return None
    return [lemmatizer.lemmatize(token, get_wordnet_pos(token)) for token in tokens]

# Register the UDF for lemmatization
lemmatize_udf = udf(lemmatize_tokens, ArrayType(StringType()))

# Apply lemmatization to the 'filtered_tokens' column
train_df = train_df.withColumn("lemmas", lemmatize_udf(train_df["filtered_tokens"]))
test_df = test_df.withColumn("lemmas", lemmatize_udf(test_df["filtered_tokens"]))

print("\nData after Lemmatization (Train):")
train_df.select("filtered_tokens", "lemmas").show(5, truncate=False)
print("\nData after Lemmatization (Test):")
test_df.select("filtered_tokens", "lemmas").show(5, truncate=False)


Data after Tokenization (Train):
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|cleaned_text                                                                                                                                                                                                                    |tokens                                                                                                                                                                                                                                                

In [None]:
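from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer

# Term counts -> TF-IDF weights -> L2-normalized feature vectors
vectorizer = CountVectorizer(inputCol="lemmas", outputCol="vectorized_tokens")
idf = IDF(inputCol="vectorized_tokens", outputCol="tfidf")
normalizer = Normalizer(inputCol="tfidf", outputCol="normalized_features")

# train_df already contains tokens, filtered_tokens and lemmas from the cells above,
# so the pipeline holds only the feature stages; re-running the tokenizer or
# stop-word remover would fail because their output columns already exist.
pipeline = Pipeline(stages=[vectorizer, idf, normalizer])
processed_data = pipeline.fit(train_df).transform(train_df)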
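
To make the three feature stages concrete, the following is a minimal plain-Python sketch of the arithmetic they perform on a toy corpus: raw term counts, the smoothed IDF that Spark ML documents as `log((m + 1) / (df + 1))` with `m` documents, and scaling to unit L2 norm. It is an illustration only, not Spark's implementation; `tfidf_l2` and the toy documents are made up for this example.

```python
import math
from collections import Counter


def tfidf_l2(docs):
    """Return one {term: weight} dict per tokenized document.

    weight = count(term, doc) * ln((n_docs + 1) / (doc_freq(term) + 1)),
    after which each document vector is scaled to unit L2 norm.
    """
    n_docs = len(docs)
    # Document frequency: in how many documents each term appears.
    doc_freq = Counter(term for doc in docs for term in set(doc))
    idf = {t: math.log((n_docs + 1) / (df + 1)) for t, df in doc_freq.items()}

    vectors = []
    for doc in docs:
        counts = Counter(doc)
        vec = {t: c * idf[t] for t, c in counts.items()}
        norm = math.sqrt(sum(w * w for w in vec.values()))
        if norm > 0:  # an all-zero vector is left unchanged
            vec = {t: w / norm for t, w in vec.items()}
        vectors.append(vec)
    return vectors


docs = [
    ["raven", "night", "raven"],
    ["night", "sea"],
    ["sea", "monster", "night"],
]
for vec in tfidf_l2(docs):
    print({t: round(w, 3) for t, w in vec.items()})
```

Note that "night" appears in every toy document, so its IDF is `ln(4/4) = 0` and it contributes nothing to any vector. Terms shared by all authors are down-weighted in the same way in the real features.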