In [1]:
#Imports
## 3 years worth of data
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import warnings
warnings.filterwarnings('ignore')

#Create a spark session
spark = SparkSession.builder.appName("CompanyFinancialComplaints").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/22 21:29:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Loading and Pre-processing

In [2]:
# Load training data into df
complaints_df = spark.read.json('./input/complaints.json')

# Preview and verify data is structured in usable way
complaints_df.printSchema()
complaints_df.show(5)

                                                                                

root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: string (nullable = true)
 |-- _source: struct (nullable = true)
 |    |-- company: string (nullable = true)
 |    |-- company_public_response: string (nullable = true)
 |    |-- company_response: string (nullable = true)
 |    |-- complaint_id: string (nullable = true)
 |    |-- complaint_what_happened: string (nullable = true)
 |    |-- consumer_consent_provided: string (nullable = true)
 |    |-- consumer_disputed: string (nullable = true)
 |    |-- date_received: string (nullable = true)
 |    |-- date_sent_to_company: string (nullable = true)
 |    |-- issue: string (nullable = true)
 |    |-- product: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- sub_issue: string (nullable = true)
 |    |-- sub_product: string (nullable = true)
 |    |-- submitted_via: string (nullable = true)
 |    |-- tags: string (nullable = true)
 |    |-- timely: string (nullable = true)


                                                                                

In [3]:
# Unnest _source and keep all fields related to complaints for now
from pyspark.sql.functions import col

columns_to_keep = [
    "_source.company",
    "_source.company_public_response",
    "_source.company_response",
    "_source.complaint_id",
    "_source.complaint_what_happened",
    "_source.consumer_consent_provided",
    "_source.consumer_disputed",
    "_source.date_received",
    "_source.date_sent_to_company",
    "_source.issue",
    "_source.product",
    "_source.state",
    "_source.sub_issue",
    "_source.sub_product",
    "_source.submitted_via",
    "_source.tags",
    "_source.timely",
    "_source.zip_code",
]

# Select and rename columns -- removing '_source.'
unnested_df = complaints_df.select([col(column).alias(column.split(".")[1]) for column in columns_to_keep])

# Show the result of data
unnested_df.show(truncate=False)

+----------------------------------+-----------------------------------------------------------------------------------------------+-------------------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [4]:
# Grabbing only rows with complaints
clean_df = unnested_df.filter((col("complaint_what_happened").isNotNull()) & (col("complaint_what_happened") != ""))

# Selecting only relevant columns
final_df = clean_df.select(["issue", "complaint_what_happened"])

# Getting total len of df
print(f"df length: {final_df.count()}")
final_df.show()

                                                                                

df length: 38423
+--------------------+-----------------------+
|               issue|complaint_what_happened|
+--------------------+-----------------------+
|Struggling to pay...|   Wells Fargo engag...|
|Trouble during pa...|   Hi, On XX/XX/XXXX...|
|Trouble during pa...|   In XXXX, XXXX, I ...|
|Trouble during pa...|   it started with X...|
|Closing on a mort...|   XXXX who funds th...|
|Trouble during pa...|   My loan was trans...|
|Trouble during pa...|   Dear CFPB, Refere...|
|Struggling to pay...|   Wells Fargo bank ...|
|Trouble during pa...|   Received notice f...|
|Applying for a mo...|   XXXX  first appli...|
|Struggling to pay...|   I requested a for...|
|Struggling to pay...|   We asked for a Fo...|
|Applying for a mo...|   On XX/XX/2023, I ...|
|Trouble during pa...|   So I am not sure ...|
|Trouble during pa...|   XXXX  ENCLOSED EV...|
|Trouble during pa...|   IN RE : UNITED ST...|
|Applying for a mo...|   In 2018 I was man...|
|Closing on a mort...|   I, XXXX XXXX, am .

In [5]:
# Distinct count of target variables values
unique_count_ap = final_df.select("issue").distinct().count()

# Printing number of unique issues; need to see how many classes in data
print(f"Number of unique issues: {unique_count_ap}")

Number of unique issues: 11


                                                                                

## Data preparation, Text Preprocessing & Models

In [6]:
from pyspark.sql.functions import regexp_replace

# Removing special characters (backslashes, dollar signs) and digits
cleaned_final_df = final_df.withColumn("cleaned_text", regexp_replace(col("complaint_what_happened"), r"[\$0-9\n]+", ""))

# Removing newline, tabs, and whitespace characters
cleaned_words_df = cleaned_final_df.withColumn("cleaned_complaints", regexp_replace(col("cleaned_text"), r"[^\w\s]", "").alias("cleaned_complaints"))

cleaned_words_df.show(5)

+--------------------+-----------------------+--------------------+--------------------+
|               issue|complaint_what_happened|        cleaned_text|  cleaned_complaints|
+--------------------+-----------------------+--------------------+--------------------+
|Struggling to pay...|   Wells Fargo engag...|Wells Fargo engag...|Wells Fargo engag...|
|Trouble during pa...|   Hi, On XX/XX/XXXX...|Hi, On XX/XX/XXXX...|Hi On XXXXXXXX I ...|
|Trouble during pa...|   In XXXX, XXXX, I ...|In XXXX, XXXX, I ...|In XXXX XXXX I ob...|
|Trouble during pa...|   it started with X...|it started with X...|it started with X...|
|Closing on a mort...|   XXXX who funds th...|XXXX who funds th...|XXXX who funds th...|
+--------------------+-----------------------+--------------------+--------------------+
only showing top 5 rows



### Modifying Classes

In [7]:
# Selecting only complaints and class name --> issue category from cleaned table
preprocessed_final = cleaned_words_df.select("issue", "cleaned_complaints")

preprocessed_final.show(5)

+--------------------+--------------------+
|               issue|  cleaned_complaints|
+--------------------+--------------------+
|Struggling to pay...|Wells Fargo engag...|
|Trouble during pa...|Hi On XXXXXXXX I ...|
|Trouble during pa...|In XXXX XXXX I ob...|
|Trouble during pa...|it started with X...|
|Closing on a mort...|XXXX who funds th...|
+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [8]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="issue", outputCol="issue_index")
encoded_df = indexer.fit(preprocessed_final).transform(preprocessed_final)

# label encoding the classes / 'issues'
ml_df = encoded_df.filter(F.col("issue_index").isNotNull())

# Getting count of classes
issue_counts = ml_df.groupBy("issue_index").count()
issue_counts.show(12)

+-----------+-----+
|issue_index|count|
+-----------+-----+
|        8.0|   39|
|        0.0|20075|
|        7.0|  170|
|        1.0| 9060|
|        4.0|  754|
|        3.0| 3206|
|        2.0| 4670|
|       10.0|   18|
|        6.0|  199|
|        5.0|  205|
|        9.0|   27|
+-----------+-----+



                                                                                

In [9]:
# Filtering for issues with enough records to train on
# After seeing counts, decided it was best to restrict classes with records > 1,000
fltrd_df = ml_df.filter(F.col('issue_index').isin([0.0, 1.0, 2.0, 3.0]))

final_process_df = fltrd_df.select(['issue', 'cleaned_complaints'])

final_process_df.show(12)

+--------------------+--------------------+
|               issue|  cleaned_complaints|
+--------------------+--------------------+
|Struggling to pay...|Wells Fargo engag...|
|Trouble during pa...|Hi On XXXXXXXX I ...|
|Trouble during pa...|In XXXX XXXX I ob...|
|Trouble during pa...|it started with X...|
|Closing on a mort...|XXXX who funds th...|
|Trouble during pa...|My loan was trans...|
|Trouble during pa...|Dear CFPB Referen...|
|Struggling to pay...|Wells Fargo bank ...|
|Trouble during pa...|Received notice f...|
|Applying for a mo...|XXXX  first appli...|
|Struggling to pay...|I requested a for...|
|Struggling to pay...|We asked for a Fo...|
+--------------------+--------------------+
only showing top 12 rows



### Logistic Regression

In [10]:
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer

# Use tokenizer to tokenize the text
# Simple tokenizer to split words
tokenizer = Tokenizer(inputCol="cleaned_complaints", outputCol="tokens")

# Removing stop words by using the default stop words ie. 'The', 'We', 'and', etc.
default_stopwords = StopWordsRemover.loadDefaultStopWords("english")
# Creating custom list of stopwords; such as classified data in the complaint
# Concatenating to create one list of stop words
custom_stopwords = default_stopwords + ["xxxx", "xx", "xx/xx/xxxx", "XXX", 
                                        "XXXX", "XX", "XX/XX/XXXX", "xxx",
                                        "xxxxxxxxxx", "XXXXXXXXXX",
                                        "XXXXXXXX", "xxxxxxxx"]

remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")

# Term Frequency-Inverse Document Frequency
# Adjusting numFeatures hyperparameter due to high dimensionality
hashingTF = HashingTF(inputCol="filtered_tokens", outputCol="rawFeatures", numFeatures=1000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Re-encoding data since we are using original structure to train
issue_indexer = StringIndexer(inputCol="issue", outputCol="indexed_issue")

# Initializing logistic regression model
lr_classifier = LogisticRegression(featuresCol="features", labelCol="indexed_issue")

# Creating pipeline to create model with different actions
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, issue_indexer, lr_classifier])


train_df, test_df = final_process_df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_df)
predictions = model.transform(test_df)

# Evaluating logistic regression on test data
evaluator = MulticlassClassificationEvaluator(labelCol="indexed_issue", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
# Viewing Accuracy score
print(f"Accuracy: {accuracy:.2f}")

25/01/22 21:41:14 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


CodeCache: size=131072Kb used=41627Kb max_used=41637Kb free=89444Kb
 bounds [0x00000001091f8000, 0x000000010bad8000, 0x00000001111f8000]
 total_blobs=15534 nmethods=14326 adapters=1118
 compilation: disabled (not enough contiguous free space left)




Accuracy: 0.74


                                                                                

In [16]:
# Manually creating confusion matrix to view correct predictions
from pyspark.sql.functions import count

# Setting up the confusion matrix
confusion_matrix = predictions.groupBy("indexed_issue", "prediction").count().orderBy("indexed_issue", "prediction")

# total number of predictions
total_predictions = predictions.count()

# Total issue predictions
index_predictions = predictions.groupBy("indexed_issue").agg(count("prediction").alias("issue_total"))

confusion_matrix = confusion_matrix.withColumn(
    "percentage",
    (F.col("count") / total_predictions) * 100
)

# Joining to get issue total for percentage within each issue
# Will be able to see what predictions were correct within each issue
confusion_matrix_final = confusion_matrix.join(index_predictions, "indexed_issue").withColumn(
                        "issue_percentage", F.col("count")  / col("issue_total") * 100
                        )

confusion_matrix_final = confusion_matrix_final.orderBy("indexed_issue", "prediction")

#Display the confusion matrix
confusion_matrix_final.show()



+-------------+----------+-----+-------------------+-----------+------------------+
|indexed_issue|prediction|count|         percentage|issue_total|  issue_percentage|
+-------------+----------+-----+-------------------+-----------+------------------+
|          0.0|       0.0| 3620| 48.958615093318905|       4077| 88.79077753249939|
|          0.0|       1.0|  292|  3.949147957803625|       4077| 7.162129016433652|
|          0.0|       2.0|   84| 1.1360562618339194|       4077| 2.060338484179544|
|          0.0|       3.0|   81| 1.0954828239112793|       4077|1.9867549668874174|
|          1.0|       0.0|  545|  7.370841222612929|       1812| 30.07726269315673|
|          1.0|       1.0| 1177|  15.91831214498242|       1812| 64.95584988962473|
|          1.0|       2.0|   68| 0.9196645929131728|       1812|3.7527593818984544|
|          1.0|       3.0|   22|0.29753854476602654|       1812|1.2141280353200883|
|          2.0|       0.0|  251| 3.3946443061942118|        891|28.170594837

                                                                                

### Naive Bayes

In [18]:
from pyspark.ml.classification import NaiveBayes
# Running same as above
# Adjusting threshold due to imbalance in data; lowered the threshold for classes 2 and 3 due to its rarity
# Decreasing smoothing since features are high and complaints contain many words and vary in length
nb = NaiveBayes(featuresCol="features", labelCol="indexed_issue", thresholds=[0.25, 0.25, 0.2, 0.2], smoothing=0.1)

pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, issue_indexer, nb])
train_df, test_df = final_process_df.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train_df)
predictions = model.transform(test_df)

evaluator = MulticlassClassificationEvaluator(labelCol="indexed_issue", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy:.2f}")



Accuracy: 0.72


                                                                                

In [19]:
confusion_matrix = predictions.groupBy("indexed_issue", "prediction").count().orderBy("indexed_issue", "prediction")

# total number of predictions
total_predictions = predictions.count()

# Total issue predictions
index_predictions = predictions.groupBy("indexed_issue").agg(count("prediction").alias("issue_total"))

confusion_matrix = confusion_matrix.withColumn(
    "percentage",
    (F.col("count") / total_predictions) * 100
)

# Joining to get issue total for percentage within each issue
# Will be able to see what predictions were correct within each issue
confusion_matrix_final = confusion_matrix.join(index_predictions, "indexed_issue").withColumn(
                        "issue_percentage", F.col("count")  / col("issue_total") * 100
                        )

confusion_matrix_final = confusion_matrix_final.orderBy("indexed_issue", "prediction")

#Display the confusion matrix
confusion_matrix_final.show()



+-------------+----------+-----+------------------+-----------+------------------+
|indexed_issue|prediction|count|        percentage|issue_total|  issue_percentage|
+-------------+----------+-----+------------------+-----------+------------------+
|          0.0|       0.0| 3062| 41.41195563970787|       4077| 75.10424331616385|
|          0.0|       1.0|  512| 6.924533405463889|       4077|12.558253617856266|
|          0.0|       2.0|  215| 2.907763051122532|       4077|5.2734854059357374|
|          0.0|       3.0|  288|3.8950500405734383|       4077|  7.06401766004415|
|          1.0|       0.0|  228|3.0835812821206385|       1812|12.582781456953644|
|          1.0|       1.0| 1398| 18.90722207195023|       1812| 77.15231788079471|
|          1.0|       2.0|  105|1.4200703272923993|       1812|5.7947019867549665|
|          1.0|       3.0|   81|1.0954828239112793|       1812| 4.470198675496689|
|          2.0|       0.0|   93|1.2577765756018393|        891|10.437710437710438|
|   

                                                                                