---
format:
  html:
    toc: true
    embed-resources: true
    code-fold: false
execute:
  echo: true
---

In [2]:
# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.4.0

# install spark-nlp
%pip install spark-nlp==5.1.3

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

Retrieving notices: ...working... done
Collecting package metadata (current_repodata.json): done
Solving environment: done


  current version: 23.3.1
  latest version: 23.10.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=23.10.0



## Package Plan ##

  environment location: /opt/conda

  added / updated specs:
    - openjdk


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2023.08.22 |       h06a4308_0         123 KB
    certifi-2023.11.17         |  py310h06a4308_0         158 KB
    openjdk-11.0.13            |       h87a67e3_0       341.0 MB
    ------------------------------------------------------------
                                           Total:       341.3 MB

The following NEW packages will be INSTALLED:

  openjdk            pk

In [3]:
import json
import sparknlp
import numpy as np
import pandas as pd
from sparknlp.base import *
from pyspark.ml import Pipeline
from sparknlp.annotator import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from sparknlp.pretrained import PretrainedPipeline

In [4]:
# Build (or reuse) the local Spark session shared by the rest of the notebook.
# - spark-nlp and hadoop-aws are resolved as Maven packages at session start.
# - Hadoop/S3A options are passed with the `spark.hadoop.` prefix, the form
#   Spark documents for forwarding settings into the Hadoop configuration.
# NOTE(review): hadoop-aws is usually matched to the Hadoop version bundled
# with the installed Spark build - confirm that 3.2.2 is intended.
spark = (
    SparkSession.builder
    .appName("Spark NLP and PySparkApp")
    .master("local[*]")
    .config("spark.driver.memory", "16G")
    # "0" means no limit on the size of results collected to the driver
    .config("spark.driver.maxResultSize", "0")
    .config("spark.kryoserializer.buffer.max", "2000M")
    .config(
        "spark.jars.packages",
        "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2",
    )
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)




:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f6607d9c-497e-4ab5-a3d4-7f5a7a92302a;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;5.1.3 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.828 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#lombok;1.16.8 in central
	found com.google.cloud#google-cloud-storage;2.20.1 in central
	found com.google.guava#guava;31.1-jre in c

In [5]:
%%time
import sagemaker
session = sagemaker.Session()
bucket = session.default_bucket()
output_prefix_data_submissions = f"project/submissions/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_submissions}"
print(f"reading submissions from {s3_path}")
submissions = spark.read.parquet(s3_path, header=True)
print(f"shape of the submissions dataframe is {submissions.count():,}x{len(submissions.columns)}")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
reading submissions from s3a://sagemaker-us-east-1-861795727138/project/submissions/yyyy=*


23/11/30 06:56:22 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/11/30 06:56:30 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

shape of the submissions dataframe is 95,932x68
CPU times: user 3.39 s, sys: 211 ms, total: 3.6 s
Wall time: 6min 40s


                                                                                

In [6]:
# Load the top-creator table and reduce it to a list of unique author names.
# The raw frame gets its own name so the author list is derived from data
# that this cell actually loads (no dependency on leftover kernel state).
topcreators_df = pd.read_csv('../../data/csv/Top100Creators.csv')
topcreators = list(set(topcreators_df['author'].tolist()))

# The feature-engineering cell filters authors with `sampled_topcreators`,
# so bind that name to the same list of authors.
sampled_topcreators = topcreators

In [7]:
from pyspark.sql.functions import col, explode, regexp_replace, expr, sum as sum_

In [8]:
import sparknlp
from pyspark.sql.functions import col, lower, regexp_replace, trim, when, count, desc, sum as sum_
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
import plotly.express as px
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import Tokenizer, Normalizer
from sparknlp.base import Finisher
from pyspark.ml import Pipeline

In [9]:
from pyspark.sql.functions import length

In [10]:
# Engineered submission features, added one column at a time so that later
# expressions can refer to earlier ones (`is_peak_hour` reads `created_hour`,
# `is_long_text` reads `text_length`).
created_ts = F.to_timestamp(F.col("created_utc").cast('int'))
media_present = F.col("media").isNotNull() | F.col("media_embed").getField('content').isNotNull()

base_features = [
    ("created_hour", F.hour(created_ts)),
    ("text_length", F.length(F.col('selftext'))),
    ("title_length", F.length(F.col("title"))),
    ("is_top_100_creator", F.col("author").isin(sampled_topcreators)),
    ("is_peak_hour", F.col("created_hour").isin([16, 17, 18, 19, 20, 21, 22, 23, 0])),
    ("has_media", media_present),
    ("is_long_text", (F.col('text_length') > 200)),
]

submissions_var = submissions
for feature_name, feature_expr in base_features:
    submissions_var = submissions_var.withColumn(feature_name, feature_expr)


# Dummy submissions variables: one boolean flag per topic, true when the
# pattern is found anywhere in `selftext` (rlike is a search, not a full match).
keyword_patterns = {
    "skincare": """(?i)body|(?i)hair|(?i)facial|(?i)nails|(?i)lip|(?i)sunscreen|(?i)SPF|(?i)acne|(?i)pimples|(?i)scar|(?i)aging""",
    "skincare_product": """(?i)moisturizer|(?i)cleanser|(?i)serum|(?i)toner|(?i)lotion""",
    "skincare_product_brand": """(?i)Clinique|(?i)Neutrogena|(?i)Cetaphil|(?i)Kiehl's|(?i)Olay""",
    "makeup": """(?i)beauty|(?i)bodypaint|(?i)cosmetics|(?i)style|(?i)artist|(?i)cosplay|(?i)fashion|(?i)celebrity|(?i)party|(?i)wedding|(?i)palette""",
    "makeup_product": """(?i)eyeliner|(?i)contour|(?i)foundation|(?i)blush|(?i)lipstick|(?i)concealer""",
    "makeup_product_brand": """(?i)MAC|(?i)NARS|(?i)Sephora|(?i)Fenty|(?i)Revlon|(?i)NYX|(?i)L'Oreal|(?i)Maybelline""",
}

for flag_name, flag_pattern in keyword_patterns.items():
    submissions_var = submissions_var.withColumn(flag_name, F.col("selftext").rlike(flag_pattern))


In [11]:
# Columns carried into the modelling frame. Besides the predictors, the list
# also holds `score` and `num_comments`, which the model cells use as labels.
independent_vars = [
    "is_top_100_creator",
    "text_length",
    "title_length",
    "archived",
    "is_peak_hour",
    "gilded",
    "hidden",
    "hide_score",
    "is_crosspostable",
    "is_reddit_media_domain",
    "is_self",
    "is_video",
    "num_crossposts",
    "over_18",
    "has_media",
    "pinned",
    "score",
    "spoiler",
    "stickied",
    "is_long_text",
    "skincare",
    "skincare_product",
    "skincare_product_brand",
    "makeup",
    "makeup_product",
    "makeup_product_brand",
    "num_comments",
]

# Project the feature frame down to exactly those columns, in that order
selected_df = submissions_var.select(*independent_vars)

In [12]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

## Load Models

### Number of Comments Random Forest Model

In [49]:
from pyspark.ml import PipelineModel

# Path where the model is saved
model_path = "num_comments_rf_model"  # Use the same path as before

# Load the saved model
loaded_rf_model = PipelineModel.load(model_path)

# Hold-out rows to score. The split is created here so the cell does not
# depend on a `test_data` variable left over from another cell.
# NOTE(review): assumes the model was trained on the same 80/20, seed=42 split
# of `selected_df` that the score model uses - confirm against the training run.
num_comments_train, num_comments_test = selected_df.randomSplit([0.8, 0.2], seed=42)

# Make predictions using the loaded model
loaded_rf_predictions = loaded_rf_model.transform(num_comments_test)


def _num_comments_evaluator(metric_name):
    """Return a RegressionEvaluator for `num_comments` using `metric_name`."""
    return RegressionEvaluator(
        labelCol="num_comments", predictionCol="prediction", metricName=metric_name
    )


# Evaluate the predictions
rmse_evaluator = _num_comments_evaluator("rmse")
r2_evaluator = _num_comments_evaluator("r2")
mae_evaluator = _num_comments_evaluator("mae")

# Calculate evaluation metrics
loaded_rf_rmse = rmse_evaluator.evaluate(loaded_rf_predictions)
loaded_rf_r2 = r2_evaluator.evaluate(loaded_rf_predictions)
loaded_rf_mae = mae_evaluator.evaluate(loaded_rf_predictions)

# Print the evaluation metrics
print(f"Loaded Random Forest Model - RMSE: {loaded_rf_rmse}, R2: {loaded_rf_r2}, MAE: {loaded_rf_mae}")




Loaded Random Forest Model - RMSE: 39.37996771881387, R2: 0.3972837164739184, MAE: 7.12384739487361


                                                                                

### Controversiality SVM Model

In [92]:
import joblib

# Location of the serialized SVM model on the notebook host
svm_model_path = '/root/fall-2023-reddit-project-team-33/code/ML/best_svm_model.joblib'

# joblib.load unpickles the file, so only load artifacts from a trusted source
best_svm_model = joblib.load(svm_model_path)



In [70]:

# Import necessary libraries
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
# NOTE: this rebinds `Pipeline`, which earlier cells imported from pyspark.ml
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix

# NOTE(review): `test_data` is indexed here with `clean_body` and
# `controversiality`, but no visible cell creates a frame with those columns -
# confirm where the comments hold-out set is loaded.
y_true = test_data['controversiality']
predictions_svm = best_svm_model.predict(test_data['clean_body'])

accuracy_svm = accuracy_score(y_true, predictions_svm)
print(f"SVM Accuracy: {accuracy_svm:.2f}")
print(classification_report(y_true, predictions_svm))


# Take the class labels from the data and pass them to confusion_matrix, so the
# row/column names always match the matrix shape, even if a class is missing
# from this fold. For a 0/1 target this yields 'Actual 0/1' and 'Predicted 0/1'.
class_labels = sorted(set(y_true) | set(predictions_svm))
confusion_mat_svm = confusion_matrix(y_true, predictions_svm, labels=class_labels)

# Convert the confusion matrix to a DataFrame
cm_df_svm = pd.DataFrame(
    confusion_mat_svm,
    columns=[f'Predicted {label}' for label in class_labels],
    index=[f'Actual {label}' for label in class_labels],
)

# Print the confusion matrix DataFrame
print("Confusion Matrix:")
print(cm_df_svm)


SVM Accuracy: 0.68
              precision    recall  f1-score   support

           0       0.71      0.55      0.62      2955
           1       0.66      0.79      0.72      3178

    accuracy                           0.68      6133
   macro avg       0.68      0.67      0.67      6133
weighted avg       0.68      0.68      0.67      6133

Confusion Matrix:
          Predicted 0  Predicted 1
Actual 0         1638         1317
Actual 1          675         2503


### Title Classification Logistic Regression Model

In [7]:
from pyspark.ml.classification import LogisticRegressionModel

# Restore the saved (best) logistic regression model from the models directory
lr_model_path = "../../data/models/best_logistic_regression_model"
lrModel = LogisticRegressionModel.load(lr_model_path)

                                                                                

In [8]:
# Score `testing_data` with the restored model, then preview the true label
# next to the predicted class and the class-probability vector.
predictions = lrModel.transform(testing_data)
preview_cols = ["label", "prediction", "probability"]
predictions.select(*preview_cols).show()

23/12/07 09:15:15 WARN DAGScheduler: Broadcasting large task binary with size 41.2 MiB
[Stage 20:>                                                         (0 + 1) / 1]

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[1.0,2.2574461237...|
|  0.0|       0.0|[0.99999429112962...|
|  1.0|       1.0|[1.56182057744478...|
|  0.0|       0.0|[1.0,3.2186355964...|
|  1.0|       1.0|[2.81837635406791...|
|  4.0|       4.0|[9.27858568667229...|
|  0.0|       0.0|[1.0,3.9805376240...|
|  1.0|       1.0|[2.35222368714639...|
|  0.0|       0.0|[1.0,1.2301904949...|
|  0.0|       0.0|[1.0,1.0298742724...|
|  1.0|       0.0|[1.0,1.7401249905...|
|  1.0|       1.0|[1.53576186600649...|
|  2.0|       2.0|[4.55599402455953...|
|  1.0|       1.0|[7.96642016175923...|
|  0.0|       0.0|[1.0,8.2526283755...|
|  1.0|       1.0|[2.29613276177369...|
|  1.0|       1.0|[1.96106700064173...|
|  0.0|       0.0|[1.0,5.8391558668...|
|  7.0|       7.0|[3.81266501720795...|
|  0.0|       0.0|[1.0,1.5822241615...|
+-----+----------+--------------------+
only showing top 20 rows



23/12/07 09:15:26 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

### Score Model

In [17]:
# Target column, and every other selected column as a feature
label_col = "score"
feature_cols = [name for name in selected_df.columns if name != 'score']

# 80/20 train/test split with a fixed seed
train_data, test_data = selected_df.randomSplit([0.8, 0.2], seed=42)

# Restore the saved pipeline and score the hold-out rows
model_path = "../../data/models/score_pred_model_rf"
loaded_model = PipelineModel.load(model_path)
loaded_predictions = loaded_model.transform(test_data)

# Compute each regression metric with its own evaluator, rounded to 3 decimals
score_metrics = {}
for metric_name in ("mae", "rmse", "r2"):
    evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName=metric_name)
    score_metrics[metric_name] = round(evaluator.evaluate(loaded_predictions), 3)

loaded_mae = score_metrics["mae"]
loaded_rmse = score_metrics["rmse"]
loaded_r2 = score_metrics["r2"]

# Report the metrics
print(f"Loaded Random Forest Model - RMSE: {loaded_rmse}, R2: {loaded_r2}, MAE: {loaded_mae}")




Loaded Random Forest Model - RMSE: 359.219, R2: 0.464, MAE: 78.528


                                                                                