# Victorian Author Dataset Analysis for AIML Big Data

## Data ingestion and preprocessing pipeline setup

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# When re-running the notebook from the top, the previous session can be
# stopped first by uncommenting the next line.
# spark.stop()
spark = SparkSession.builder.appName("AuthorPrediction").getOrCreate()

# Read the CSV (header row, latin1 decoding, inferred schema) and make sure the
# "author" label column is an integer.
data = (
    spark.read
    .options(header="true", encoding="latin1", inferSchema="true")
    .csv("data/dataset/Gungor_2018_VictorianAuthorAttribution_data.csv")
    .withColumn("author", col("author").cast(IntegerType()))
)


# For local testing: work on a small seeded random subset of the rows.
# Use the commented alternative below to run on the full dataset instead.
sample = data.sample(fraction=0.002, withReplacement=False, seed=100)
# sample = data

# 80/20 train/test split of the sample, seeded for repeatability
training_data, test_data = sample.randomSplit(weights=[0.8, 0.2], seed=100)

# print(sample.printSchema())
# sample.show(5)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/06 19:05:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [2]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, NGram

# Quirk of this dataset: "â" shows up as the most common "word" (it looks like
# a stray stop character), so it is filtered together with the default English
# stop words.
stop_words = StopWordsRemover.loadDefaultStopWords("english")
custom_stop_words = [*stop_words, "â"]

# text -> words -> filtered (stop words removed)
tokenizer = Tokenizer(outputCol="words", inputCol="text")
remover = StopWordsRemover(
    stopWords=custom_stop_words,
    inputCol="words",
    outputCol="filtered",
)

# filtered -> bigrams / trigrams
bigram = NGram(inputCol="filtered", outputCol="bigrams", n=2)
trigram = NGram(inputCol="filtered", outputCol="trigrams", n=3)

# Each vectorizer writes to the same "features" column, so only one of them
# should be placed in any given pipeline.
unigram_vectorizer = CountVectorizer(outputCol="features", inputCol="filtered")
bigram_vectorizer = CountVectorizer(outputCol="features", inputCol="bigrams")
trigram_vectorizer = CountVectorizer(outputCol="features", inputCol="trigrams")

24/06/06 19:06:01 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


## Logistic regression implementation

In [50]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="author")

# The evaluator is the same for every pipeline, so it is built once up front
# instead of on each loop iteration.
lr_evaluator = MulticlassClassificationEvaluator(labelCol="author", predictionCol="prediction", metricName="accuracy")

# One pipeline per n-gram size, all sharing the tokenizer, stop-word remover and
# classifier. Listing them explicitly replaces the index-based if/elif dispatch
# (and its unreachable error branch).
lr_pipelines = [
    ("Unigram", Pipeline(stages=[tokenizer, remover, unigram_vectorizer, lr])),
    ("Bigram", Pipeline(stages=[tokenizer, remover, bigram, bigram_vectorizer, lr])),
    ("Trigram", Pipeline(stages=[tokenizer, remover, trigram, trigram_vectorizer, lr])),
]

# NOTE(review): the saved output of this cell shows a java.lang.OutOfMemoryError
# after the first accuracy line, i.e. while fitting a later pipeline. Bounding
# the n-gram vocabulary or giving Spark more memory may be needed — confirm.
#
# After the loop, lr_model / lr_predictions / lr_accuracy hold the values from
# the last pipeline that was fitted.
for lr_pipeline_name, lr_pipeline in lr_pipelines:
    lr_model = lr_pipeline.fit(training_data)
    lr_predictions = lr_model.transform(test_data)

    lr_accuracy = lr_evaluator.evaluate(lr_predictions)
    print(f"Test Accuracy for {lr_pipeline_name}: {lr_accuracy:.5f}")


24/06/06 16:46:16 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
                                                                                

Test Accuracy: 0.79


24/06/06 16:46:23 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
24/06/06 16:46:25 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
24/06/06 16:46:26 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
24/06/06 16:46:30 ERROR Executor: Exception in task 4.0 in stage 1040.0 (TID 5293)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.reflect.Array.newInstance(Array.java:76)
	at scala.collection.mutable.ArrayOps.slice(ArrayOps.scala:53)
	at scala.collection.mutable.ArrayOps.slice$(ArrayOps.scala:48)
	at scala.collection.mutable.ArrayOps$ofDouble.slice(ArrayOps.scala:282)
	at scala.collection.IndexedSeqOptimized.take(IndexedSeqOptimized.scala:141)
	at scala.collection.IndexedSeqOptimized.take$(IndexedSeqOptimized.scala:141)
	at scala.collection.mutable.ArrayOps$ofDouble.take(ArrayOps.scala:282)
	at org.apache.spark.ml.optim.aggregator.MultinomialLogisticBlockAggregator.linear$lzycompute(MultinomialLogisticBlockAggre

ConnectionRefusedError: [Errno 111] Connection refused

In [46]:
# Preview a few test rows: input text, true author label and predicted label.
# lr_predictions comes from the most recently fitted logistic regression pipeline.
lr_predictions.select(["text", "author", "prediction"]).show(n=5)

24/06/06 16:44:29 WARN DAGScheduler: Broadcasting large task binary with size 14.1 MiB
24/06/06 16:44:29 WARN DAGScheduler: Broadcasting large task binary with size 14.1 MiB


+--------------------+------+----------+
|                text|author|prediction|
+--------------------+------+----------+
|asked hurriedly w...|     8|       8.0|
|it is i wouldn t ...|     8|       8.0|
|own and kin bette...|     4|       8.0|
|sham and a had ju...|     9|       8.0|
|and i trust swall...|    18|       8.0|
+--------------------+------+----------+
only showing top 5 rows



## Code to analyze top words per author

In [47]:
from pyspark.sql.functions import col, explode, lower
from pyspark.sql.functions import explode, col

# Tokenisation and stop-word removal do not depend on the n-gram size, so they
# are computed once here rather than on every loop iteration.
word_count_words_data = tokenizer.transform(sample)
word_count_filtered_data = remover.transform(word_count_words_data)

# One entry per n-gram size:
#   (n-gram stage, or None to use the filtered tokens directly,
#    matching count vectorizer,
#    name of the column holding the tokens / n-grams)
ngram_configs = [
    (None, unigram_vectorizer, "filtered"),
    (bigram, bigram_vectorizer, "bigrams"),
    (trigram, trigram_vectorizer, "trigrams"),
]

for ngram_stage, vectorizer, token_col in ngram_configs:
    # Unigrams use the filtered tokens as-is; larger sizes add an n-gram column
    if ngram_stage is None:
        ngram_data = word_count_filtered_data
    else:
        ngram_data = ngram_stage.transform(word_count_filtered_data)

    # Fit the vectorizer so the fitted model and its vocabulary remain
    # available for inspection after the loop (values from the last iteration)
    word_count_model = vectorizer.fit(ngram_data)
    vectorized_data = word_count_model.transform(ngram_data)
    word_count_vocab = word_count_model.vocabulary

    # One row per (author, n-gram) occurrence, lower-cased
    exploded_data = vectorized_data.select("author", explode(col(token_col)).alias("ngram"))
    exploded_data = exploded_data.withColumn("ngram", lower(col("ngram")))

    # Count occurrences of each n-gram per author
    ngram_author_counts = exploded_data.groupBy("author", "ngram").count()

    # Print the most popular n-grams per author. The `break` deliberately stops
    # after the first author returned, to keep the notebook output short.
    for author in ngram_author_counts.select("author").distinct().collect():
        print(f"Top n-grams for author ID {author['author']}:")
        ngram_author_counts.filter(col("author") == author['author']).orderBy(col("count").desc()).show(10)
        break


                                                                                

Top n-grams for author ID 1:
+------+-----+-----+
|author|ngram|count|
+------+-----+-----+
|     1|  one|   17|
|     1|   mr|   16|
|     1| said|   12|
|     1|place|   10|
|     1| talk|    9|
|     1| time|    8|
|     1| know|    8|
|     1|  way|    8|
|     1| like|    7|
|     1|  man|    7|
+------+-----+-----+
only showing top 10 rows



                                                                                

Top n-grams for author ID 1:
+------+-------------+-----+
|author|        ngram|count|
+------+-------------+-----+
|     1|marriage bond|    4|
|     1|    took hand|    3|
|     1|     sit said|    2|
|     1|   mr strange|    2|
|     1|   first time|    2|
|     1|     know one|    2|
|     1|lawyer office|    2|
|     1|  another man|    2|
|     1|   shook head|    2|
|     1|    hand held|    2|
+------+-------------+-----+
only showing top 10 rows



                                                                                

Top n-grams for author ID 1:
+------+--------------------+-----+
|author|               ngram|count|
+------+--------------------+-----+
|     1|    get chance world|    1|
|     1|   far content total|    1|
|     1|stand different a...|    1|
|     1|    twice mr begging|    1|
|     1|   know school first|    1|
|     1|      dear boy silly|    1|
|     1|      sky heads blue|    1|
|     1|accomplished quit...|    1|
|     1|  knew entire secret|    1|
|     1|corrected matter ...|    1|
+------+--------------------+-----+
only showing top 10 rows



                                                                                

## Decision tree implementation

In [4]:
from pyspark.ml.classification import DecisionTreeClassifier
# Decision tree over the unigram count features, predicting the author label
dt = DecisionTreeClassifier(featuresCol="features", labelCol="author")
dt_pipeline = Pipeline(stages=[tokenizer, remover, unigram_vectorizer, dt])

# Fit on the training split, then predict on the held-out test split
dt_model = dt_pipeline.fit(training_data)
dt_predictions = dt_model.transform(test_data)

# Accuracy of the predicted label against the true author label
dt_evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="author",
    predictionCol="prediction",
)
dt_accuracy = dt_evaluator.evaluate(dt_predictions)
print(f"Decision tree test accuracy: {dt_accuracy:.2f}")



Decision tree test accuracy: 0.09


                                                                                

In [5]:
from collections import Counter
from pyspark.sql import SparkSession

# Pull the fitted stages out of the trained decision tree pipeline:
# stage index 2 is the count vectorizer, the last stage is the classifier.
dt_vectorizer = dt_model.stages[2]
dt_classifier_model = dt_model.stages[-1]

# Vocabulary for index -> word lookup, and the tree's textual dump
dt_vocabulary = dt_vectorizer.vocabulary
tree_string = dt_classifier_model.toDebugString

import re

def replace_feature_indices(tree_string, vocabulary):
    """Annotate feature references in a tree dump with their vocabulary words.

    Every occurrence of ``feature <N>`` in ``tree_string`` is rewritten as
    ``feature (<N>) [<vocabulary[N]>]``; all other text is left untouched.

    Args:
        tree_string: Text containing ``feature <N>`` references.
        vocabulary: Sequence mapping a feature index to its word.

    Returns:
        The annotated copy of ``tree_string``.

    Raises:
        IndexError: If a referenced index is outside ``vocabulary``.
    """
    def annotate(hit):
        # Captured digits -> integer index -> vocabulary word
        index = int(hit.group(1))
        word = vocabulary[index]
        return f"feature ({index}) [{word}]"

    feature_pattern = re.compile(r'feature (\d+)')
    return feature_pattern.sub(annotate, tree_string)

# Annotate the tree dump with vocabulary words and print it
tree_string_with_names = replace_feature_indices(
    tree_string=tree_string,
    vocabulary=dt_vocabulary,
)
print(tree_string_with_names)


# For comparison, print the raw dump with bare feature indices
print(tree_string)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_ca56567b4e34, depth=5, numNodes=25, numClasses=51, numFeatures=6406
  If (feature (510) [tail] <= 0.5)
   If (feature (594) [chair] <= 0.5)
    If (feature (2704) [condemned] <= 0.5)
     If (feature (2366) [acceptable] <= 0.5)
      If (feature (365) [government] <= 0.5)
       Predict: 8.0
      Else (feature (365) [government] > 0.5)
       Predict: 33.0
     Else (feature (2366) [acceptable] > 0.5)
      Predict: 26.0
    Else (feature (2704) [condemned] > 0.5)
     Predict: 38.0
   Else (feature (594) [chair] > 0.5)
    If (feature (71) [love] <= 1.5)
     Predict: 8.0
    Else (feature (71) [love] > 1.5)
     If (feature (0) [said] <= 0.5)
      Predict: 42.0
     Else (feature (0) [said] > 0.5)
      If (feature (1) [one] <= 0.5)
       Predict: 26.0
      Else (feature (1) [one] > 0.5)
       Predict: 21.0
  Else (feature (510) [tail] > 0.5)
   If (feature (128) [perhaps] <= 0.5)
    If (feature (1) [one] <= 1.5)
     

In [6]:
# Pull the fitted stages out of the trained decision tree pipeline:
# stage index 2 is the count vectorizer, the last stage is the classifier.
dt_vectorizer = dt_model.stages[2]
dt_classifier_model = dt_model.stages[-1]

# Vocabulary for index -> word lookup, and the tree's textual dump
dt_vocabulary = dt_vectorizer.vocabulary
tree_string = dt_classifier_model.toDebugString

import re
from collections import Counter
from pyspark.sql import SparkSession

def replace_and_collect_feature_indices(tree_string, vocabulary):
    """Annotate feature references with vocabulary words and record the indices.

    Every occurrence of ``feature <N>`` in ``tree_string`` is rewritten as
    ``feature (<N>) [<vocabulary[N]>]``. Each matched index is also appended,
    in order of appearance and including repeats, to the returned list.

    Args:
        tree_string: Text containing ``feature <N>`` references.
        vocabulary: Sequence mapping a feature index to its word.

    Returns:
        A ``(annotated_string, feature_indices)`` tuple.

    Raises:
        IndexError: If a referenced index is outside ``vocabulary``.
    """
    feature_indices = []
    pieces = []
    cursor = 0

    # Walk the matches left to right, stitching the untouched text between
    # them together with the annotated replacements.
    for hit in re.finditer(r'feature (\d+)', tree_string):
        index = int(hit.group(1))
        word = vocabulary[index]
        feature_indices.append(index)
        pieces.append(tree_string[cursor:hit.start()])
        pieces.append(f"feature ({index}) [{word}]")
        cursor = hit.end()

    pieces.append(tree_string[cursor:])
    return "".join(pieces), feature_indices

# Replace feature indices in the decision tree string and collect the indices
tree_string_with_names, feature_indices = replace_and_collect_feature_indices(tree_string, dt_vocabulary)

# Print the decision tree with feature names
print(tree_string_with_names)

# Count how many times each feature index appears in the tree dump.
# NOTE(review): in the printed tree each split appears on both its "If" and its
# "Else" line, so these counts look like twice the number of split nodes — confirm.
feature_counts = Counter(feature_indices)

# Create a list of tuples (feature_name, count)
feature_usage_data = [(dt_vocabulary[idx], count) for idx, count in feature_counts.items()]

# Build the DataFrame on the notebook's existing `spark` session. Calling
# SparkSession.builder...getOrCreate() again only returned that same session and
# logged a "Using an existing Spark session" warning. The schema is given
# explicitly so that an empty list (a tree without splits) does not fail
# schema inference.
feature_usage_df = spark.createDataFrame(feature_usage_data, "feature_name string, count bigint")

# Show the DataFrame, most frequently referenced features first
feature_usage_df.orderBy(col("count").desc()).show()


DecisionTreeClassificationModel: uid=DecisionTreeClassifier_ca56567b4e34, depth=5, numNodes=25, numClasses=51, numFeatures=6406
  If (feature (510) [tail] <= 0.5)
   If (feature (594) [chair] <= 0.5)
    If (feature (2704) [condemned] <= 0.5)
     If (feature (2366) [acceptable] <= 0.5)
      If (feature (365) [government] <= 0.5)
       Predict: 8.0
      Else (feature (365) [government] > 0.5)
       Predict: 33.0
     Else (feature (2366) [acceptable] > 0.5)
      Predict: 26.0
    Else (feature (2704) [condemned] > 0.5)
     Predict: 38.0
   Else (feature (594) [chair] > 0.5)
    If (feature (71) [love] <= 1.5)
     Predict: 8.0
    Else (feature (71) [love] > 1.5)
     If (feature (0) [said] <= 0.5)
      Predict: 42.0
     Else (feature (0) [said] > 0.5)
      If (feature (1) [one] <= 0.5)
       Predict: 26.0
      Else (feature (1) [one] > 0.5)
       Predict: 21.0
  Else (feature (510) [tail] > 0.5)
   If (feature (128) [perhaps] <= 0.5)
    If (feature (1) [one] <= 1.5)
     

24/06/06 19:07:45 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+------------+-----+
|feature_name|count|
+------------+-----+
|        said|    6|
|         one|    4|
|       chair|    2|
|        love|    2|
|     perhaps|    2|
|  acceptable|    2|
|   condemned|    2|
|  government|    2|
|        tail|    2|
+------------+-----+



                                                                                

In [27]:
# datapoint = dt_predictions.select("text", "features").take(1)[0]
# print(f"Single datapoint text: {datapoint['text']}")
# print(f"Single datapoint features: {datapoint['features']}")

# # Function to trace the decisions
# def follow_tree(node, features):
#     if node.nodeType == 'leaf':
#         return node.prediction
#     else:
#         split = node.split
#         feature_index = split.feature
#         threshold = split.threshold
        
#         if features[feature_index] <= threshold:
#             return follow_tree(node.leftChild, features)
#         else:
#             return follow_tree(node.rightChild, features)

# # Get the root node of the tree
# root_node = dt_classifier_model._call_java('rootNode')

# # Follow the tree for the datapoint
# prediction = follow_tree(root_node, datapoint['features'])
# print(f"Prediction for the datapoint: {prediction}")


In [28]:
# root_node = dt_classifier_model._call_java('rootNode')
# root_node.leftChild().impurity()

In [7]:
# Printing the DataFrame object shows only its column names and types, not its
# rows; use feature_usage_df.show() to see the contents.
print(feature_usage_df)

DataFrame[feature_name: string, count: bigint]
