In [0]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, OneHotEncoder, IDF, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import when, col
from pyspark.mllib.evaluation import MulticlassMetrics
import seaborn as sns
import matplotlib.pyplot as plt

In [0]:
# Start the Spark session used by this notebook (getOrCreate reuses an
# already-running session instead of starting a second one).
spark = (
    SparkSession.builder
    .appName("FeatureEngineering")
    .getOrCreate()
)

In [0]:
# Load the cleaned Apparel reviews back from the bucket's 'raw' prefix.
# NOTE(review): `sdf` is reassigned from the sampled multi-file union further
# down before anything reads it, so this DataFrame is currently unused.
parquet_file_path = "s3://amazon-reviews-ma/raw/cleaned_amazon_reviews_us_Apparel_v1_00.parquet"
sdf = spark.read.format("parquet").load(parquet_file_path)

In [0]:
from functools import reduce
from pyspark.sql import DataFrame
# Reuse the active Spark session. getOrCreate() returns the session started
# earlier, so the appName given here presumably does not rename the running app.
spark = (
    SparkSession.builder
    .appName("SimplifiedClassificationModeling")
    .getOrCreate()
)
# Source parquet files to sample from; uncomment entries to add categories.
file_paths = [
    # "s3://amazon-reviews-ma/raw/cleaned_amazon_reviews_us_Apparel_v1_00.parquet",
    "s3://amazon-reviews-ma/raw/cleaned_amazon_reviews_us_Digital_Music_Purchase_v1_00.parquet",
    # "s3://amazon-reviews-ma/raw/cleaned_amazon_reviews_us_Baby_v1_00.parquet",
    "s3://amazon-reviews-ma/raw/cleaned_amazon_reviews_us_Digital_Software_v1_00.parquet",
]


In [0]:
def read_and_sample(file_path, fraction=0.1, seed=42):
    """Load one parquet file and return a seeded random subset of its rows.

    Args:
        file_path: Location of the parquet data (e.g. an ``s3://`` URI).
        fraction: Expected fraction of rows to keep. Spark's sampling is
            approximate, so the exact row count is not guaranteed.
        seed: Sampler seed, so re-runs over the same input pick the same rows.

    Returns:
        A lazily evaluated Spark DataFrame holding the sampled rows
        (sampled without replacement).
    """
    return spark.read.parquet(file_path).sample(
        withReplacement=False,
        fraction=fraction,
        seed=seed,
    )

In [0]:
# Draw a seeded sample (read_and_sample defaults: ~10%) from every source file.
sampled_dfs = list(map(read_and_sample, file_paths))

In [0]:
def union_dfs(df1, df2):
    """Stack the rows of two DataFrames, matching columns by name.

    ``DataFrame.union`` resolves columns by *position*. If two source files
    store the same columns in a different order, their values would be
    silently shifted into the wrong columns. ``unionByName`` aligns columns by
    name instead, and raises if the column names do not match. Like
    ``union``, it keeps duplicate rows.

    Args:
        df1: First DataFrame.
        df2: DataFrame whose rows are appended after those of ``df1``.

    Returns:
        A DataFrame containing the rows of both inputs.
    """
    return df1.unionByName(df2)

In [0]:
# Fold the per-file samples into one DataFrame, left to right:
# union_dfs(union_dfs(d0, d1), d2), ...
sdf = reduce(
    union_dfs,
    sampled_dfs,
)

In [0]:
# Binary target: star_rating above 3 -> 1 (positive), anything else -> 0.
# NOTE(review): a null star_rating also falls through to otherwise(0), i.e. it
# is labelled negative; confirm the cleaned data has no null ratings.
sdf = sdf.withColumn(
    "label",
    when(col("star_rating") > 3, 1).otherwise(0),
)

In [0]:
# Text features for the 'clean_review_body' column:
# tokenize -> remove stop words -> hashed term frequencies -> IDF weighting.
# Each stage reads the previous stage's output column.
bodyTokenizer = Tokenizer(
    inputCol="clean_review_body",
    outputCol="tokenized_body",
)
bodyRemover = StopWordsRemover(
    inputCol=bodyTokenizer.getOutputCol(),
    outputCol="filtered_body",
)
bodyHashingTF = HashingTF(
    inputCol=bodyRemover.getOutputCol(),
    outputCol="body_features",
)
bodyIdf = IDF(
    inputCol=bodyHashingTF.getOutputCol(),
    outputCol="final_body_features",
)

In [0]:
# Text features for the 'clean_review_headline' column, using the same
# tokenize -> stop words -> term frequency -> IDF sequence as the body.
headlineTokenizer = Tokenizer(
    inputCol="clean_review_headline",
    outputCol="tokenized_headline",
)
headlineRemover = StopWordsRemover(
    inputCol=headlineTokenizer.getOutputCol(),
    outputCol="filtered_headline",
)
headlineHashingTF = HashingTF(
    inputCol=headlineRemover.getOutputCol(),
    outputCol="headline_features",
)
headlineIdf = IDF(
    inputCol=headlineHashingTF.getOutputCol(),
    outputCol="final_headline_features",
)

In [0]:
# Encode 'product_category' as a one-hot vector. StringIndexer maps each
# category string to an index (learned when the pipeline is fit on the
# training split), and OneHotEncoder turns that index into a sparse vector.
# handleInvalid="keep" sends categories not seen during fitting (and nulls)
# to an extra "unknown" index instead of raising at transform time. Without
# it, the test split or data scored later by the saved model fails whenever
# it contains a category missing from the training sample.
categoryIndexer = StringIndexer(
    inputCol="product_category",
    outputCol="categoryIndex",
    handleInvalid="keep",
)
categoryEncoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")

In [0]:
# Concatenate the two TF-IDF vectors and the one-hot category vector into the
# single "features" column that the classifier reads.
assembler = VectorAssembler(
    inputCols=[
        "final_body_features",
        "final_headline_features",
        "categoryVec",
    ],
    outputCol="features",
)

In [0]:
# Logistic regression on the assembled "features" vector, predicting the binary
# "label" column; every other hyperparameter is left at Spark's default.
classifier = LogisticRegression(
    featuresCol="features",
    labelCol="label",
)

In [0]:
# Chain every stage into one Pipeline, in execution order.
pipeline = Pipeline(
    stages=[
        # review body: tokenize -> stop words -> TF -> IDF
        bodyTokenizer,
        bodyRemover,
        bodyHashingTF,
        bodyIdf,
        # review headline: same sequence
        headlineTokenizer,
        headlineRemover,
        headlineHashingTF,
        headlineIdf,
        # product category: string index -> one-hot vector
        categoryIndexer,
        categoryEncoder,
        # merge feature columns, then the estimator
        assembler,
        classifier,
    ]
)

In [0]:
# Split the combined sample 70/30 into training and test sets; the seed keeps
# the split repeatable for the same input data.
train_data, test_data = sdf.randomSplit(weights=[0.7, 0.3], seed=42)

In [0]:
# Fitting the model: every pipeline stage (IDF weights, category index,
# logistic regression) is fit on the training split only.
model = pipeline.fit(train_data)

In [0]:
# Score the held-out split with the fitted pipeline; the evaluators below read
# its "rawPrediction" and "prediction" output columns.
predictions = model.transform(test_data)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Threshold-free ranking quality: area under the ROC curve, computed from the
# classifier's rawPrediction scores against the evaluator's default label column.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
roc_auc = evaluator.evaluate(
    predictions,
    {evaluator.metricName: "areaUnderROC"},
)
print(f"Test Area Under ROC: {roc_auc}")

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
# Threshold-based metrics computed from the hard 0/1 "prediction" column.
# The "weighted*" metrics and "f1" (a weighted F-measure in Spark) average the
# per-class scores weighted by class frequency. They are therefore dominated
# by the majority class, and weighted recall is always equal to accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
)
accuracy, precision, recall, f1 = (
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)
print("F1 Score: ", f1)


In [0]:
# Confusion matrix via the RDD-based MulticlassMetrics API.
# MulticlassMetrics turns the RDD into a DataFrame whose (prediction, label)
# fields are DoubleType. `label` was built as an integer column, and PySpark's
# schema verification rejects Python ints for DoubleType fields, so both
# columns are cast to double first.
predictionAndLabels = predictions.select(
    col("prediction").cast("double").alias("prediction"),
    col("label").cast("double").alias("label"),
).rdd
metrics = MulticlassMetrics(predictionAndLabels)
# Rows are actual labels and columns are predicted labels, both in ascending
# label order (0.0 then 1.0).
confusion_matrix = metrics.confusionMatrix().toArray()
print("Confusion Matrix:\n", confusion_matrix)

In [0]:
# Unpack the 2x2 matrix (rows = actual, columns = predicted, label 0 first);
# the unpacking also fails fast if the matrix is not 2x2.
TN, FP, FN, TP = confusion_matrix.ravel()
import pandas as pd
conf_matrix_df = pd.DataFrame(
    confusion_matrix,
    index=["Actual Negative", "Actual Positive"],
    columns=["Predicted Negative", "Predicted Positive"],
)
# Annotated heatmap of the counts; fmt='g' prints the float counts without
# trailing decimals.
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix_df, annot=True, fmt='g', cmap='Blues').set(
    title='Confusion Matrix',
    xlabel='Predicted labels',
    ylabel='True labels',
)
plt.show()

In [0]:
# Persist the fitted PipelineModel (all stages) to S3, replacing any copy
# already stored at that path.
(
    model.write()
    .overwrite()
    .save("s3://amazon-reviews-ma/models/myAmazonLogisticRegressionModel2")
)