#### `a.` classification model to predict product category

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
conf = SparkConf()
conf.set('spark.executor.memory', '30g')
conf.set('spark.driver.memory', '30g')
conf.set("spark.executor.instances", 6)
sc = SparkContext(conf=conf)
spark = SparkSession.builder.master("local").appName('Classification').config(conf=conf).getOrCreate()
spark


In [None]:
file_location = './data/train_test/dataset_en_train.json'
df = spark.read.json(file_location)
df.show()
df.printSchema()
df.describe().show()

In [None]:
# preprocess data.
df = df.select("product_category", "review_body")
df = df.na.drop()


In [None]:
# from pyspark.sql.functions import concat_ws
# df = df.withColumn("text", concat_ws(" ", "review_body", "review_title", "product_id"))
train_data, test_data = df.randomSplit([0.7, 0.3], seed = 42)

played around with few iterations. considered logistic regression model where i combine product id, review title, and review body in singular column called 'words'. considered an idf with stop words. decided against that as perhaps some of the stop words would be useful. 

the highest accuracy i got was combining review body, review title, product id into a words column, used stopwords remover and then attempted to get product_category. only got 40% accuracy however....y

with current model, only got 30% accuracy but the model is signficantly leaner. 

In [None]:
# build logistic regression model
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, StringIndexer, HashingTF, IDF, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
indexer = StringIndexer(inputCol="product_category", outputCol="label")
tokenizer = Tokenizer(inputCol="review_body", outputCol="words")
hashing_tf = HashingTF(numFeatures=1000, inputCol="words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
assembler = VectorAssembler(inputCols=["features"], outputCol="assembled_features")

lr = LogisticRegression(featuresCol="assembled_features", labelCol="label", maxIter=100, regParam=0.01, elasticNetParam=0.01)
pipeline = Pipeline(stages=[indexer, tokenizer, hashing_tf, idf, assembler, lr])

In [None]:
model = pipeline.fit(train_data)

In [None]:
predictions = model.transform(test_data)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print("Accuracy: ", accuracy)


#### `b.` predict customer ratings

In [47]:
from pyspark.sql.functions import col
data = spark.read.json(file_location)
data = data.select(col("review_id"), col("product_id"), col("reviewer_id"), col("stars").cast("double"), col("review_body"))
(training_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)


In [48]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# Define the stages for the pipeline
tokenizer = Tokenizer(inputCol="review_body", outputCol="words")
stop_words_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered_words")
hashing_tf = HashingTF(numFeatures=2**16, inputCol=stop_words_remover.getOutputCol(), outputCol="tf_features")
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="features")

# Create the pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[tokenizer, stop_words_remover, hashing_tf, idf])

# Fit the pipeline on the training data
pipeline_model = pipeline.fit(training_data)

# Transform the training and test data
training_data = pipeline_model.transform(training_data)
test_data = pipeline_model.transform(test_data)

In [49]:
from pyspark.ml.regression import LinearRegression

# Define the linear regression model
lr = LinearRegression(featuresCol="features", labelCol="stars", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Train the model on the training data
lr_model = lr.fit(training_data)

# Make predictions on the test data
predictions = lr_model.transform(test_data)

In [52]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(labelCol="stars", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

evaluator_r2 = RegressionEvaluator(predictionCol='prediction', labelCol='stars', metricName='r2')
r2 = evaluator_r2.evaluate(predictions)

print('R-Squared (R2):', r2)
print("Root Mean Squared Error (RMSE) = %g" % rmse)

R-Squared (R2): 0.016053469001464205
Root Mean Squared Error (RMSE) = 1.39857


ALA model 

In [59]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer


# Load the data into a Spark DataFrame
training_reviews = spark.read.json(file_location)
test_reviews = spark.read.json('./data/train_test/dataset_en_test.json')

training_reviews = training_reviews.withColumn('stars', training_reviews['stars'].cast('float'))
test_reviews = test_reviews.withColumn('stars', test_reviews['stars'].cast('float'))

indexer = StringIndexer(inputCol='reviewer_id', outputCol='reviewer_id_index')
training_reviews = indexer.fit(training_reviews).transform(training_reviews)
test_reviews = indexer.fit(test_reviews).transform(test_reviews)

indexer_2 = StringIndexer(inputCol='product_id', outputCol='product_id_index')
training_reviews = indexer_2.fit(training_reviews).transform(training_reviews)
test_reviews = indexer_2.fit(test_reviews).transform(test_reviews)


# Train a collaborative filtering model using ALS algorithm
als = ALS(rank=10, maxIter=5, regParam=0.01, userCol="reviewer_id_index", itemCol="product_id_index", ratingCol="stars")
model = als.fit(training_reviews)

# Evaluate the model on the test data
predictions = model.transform(test_reviews)

evaluator = RegressionEvaluator(metricName='rmse', labelCol='stars', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print('Root Mean Squared Error =', rmse)

# Generate top 5 product recommendations for each customer
userRecs = model.recommendForAllUsers(5)

userRecs.show()

Root Mean Squared Error = 3.65522486051963
+-----------------+--------------------+
|reviewer_id_index|     recommendations|
+-----------------+--------------------+
|               28|[{9111, 6.524316}...|
|               31|[{11573, 7.129544...|
|               34|[{2155, 7.886073}...|
|               53|[{1557, 7.18665},...|
|               65|[{11140, 7.310829...|
|               78|[{12164, 7.566792...|
|               81|[{10760, 6.494387...|
|               85|[{820, 8.648042},...|
|              101|[{741, 7.243065},...|
|              108|[{12350, 6.405946...|
|              115|[{11140, 6.918545...|
|              126|[{7546, 7.0045276...|
|              133|[{1242, 6.8541527...|
|              137|[{948, 7.6337466}...|
|              148|[{12148, 6.402157...|
|              155|[{1360, 6.8862157...|
|              183|[{507, 6.322838},...|
|              193|[{3315, 6.109494}...|
|              210|[{76, 7.9763365},...|
|              211|[{12788, 7.136730...|
+-------------

In [112]:
# 1. Identify the most popular products based on the number of reviews and ratings
most_popular_products = test_reviews.groupBy(["product_id", 'product_category']).agg({"stars": "mean", "review_id": "count"}).withColumnRenamed("avg(stars)", "avg_rating").withColumnRenamed("count(review_id)", "num_reviews").orderBy(col("num_reviews").desc()).show(25)

+------------------+--------------------+-----------+----------+
|        product_id|    product_category|num_reviews|avg_rating|
+------------------+--------------------+-----------+----------+
|product_en_0710283|            wireless|          2|       3.0|
|product_en_0546607|     lawn_and_garden|          2|       4.0|
|product_en_0926827|                 toy|          2|       3.5|
|product_en_0828472|           drugstore|          2|       1.5|
|product_en_0948054|              beauty|          2|       5.0|
|product_en_0448239|               other|          2|       3.0|
|product_en_0296897|                 toy|          2|       1.0|
|product_en_0496200|             luggage|          2|       3.0|
|product_en_0257027|        pet_products|          2|       3.0|
|product_en_0719005|    home_improvement|          1|       1.0|
|product_en_0575047|    home_improvement|          1|       1.0|
|product_en_0478313|digital_ebook_pur...|          1|       2.0|
|product_en_0878772|     

In [61]:
# 2. Explore the performance of the model on different subsets of the data
perf_by_language = predictions.groupBy("language").agg({"stars": "mean", "prediction": "mean", "review_id": "count"}).withColumnRenamed("avg(stars)", "avg_actual_rating").withColumnRenamed("avg(prediction)", "avg_predicted_rating").withColumnRenamed("count(review_id)", "num_reviews").orderBy(col("num_reviews").desc())
perf_by_language.show()

+--------+-----------+--------------------+-----------------+
|language|num_reviews|avg_predicted_rating|avg_actual_rating|
+--------+-----------+--------------------+-----------------+
|      en|       5000|-0.01669013895327...|              3.0|
+--------+-----------+--------------------+-----------------+

