In [None]:
import pandas as pd
import pyspark.sql.functions as F
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession

# Load the data
# -------------
# Read the ratings CSV with pandas, then hand the frame to Spark.
# NOTE: this cell needs `import pandas as pd` in the import block above;
# without it `pd.read_csv` raises NameError on a fresh kernel.

# Despite the name, `csv_dir` is the full path of the CSV *file*, not a
# directory. The adjacent string literals are concatenated by the parser.
csv_dir = (
    "/Users/pepijnschouten/Desktop/Python_Scripts/"
    "Python_Scripts_Books/Distributed_ML_with_PySpark/"
    "Python_Own_Files/Chapter 13/data/amazon_product_ratings.csv"
)

# Column names are supplied explicitly via `names=`, so pandas treats every
# line of the file (including the first) as a data row.
column_names = ["user_id", "product_id", "rating", "timestamp"]

pandas_df = pd.read_csv(csv_dir, names=column_names)

# Reuse an already-running session if there is one, otherwise start one.
spark = SparkSession.builder.getOrCreate()

# The whole CSV passes through driver memory (pandas) before becoming a
# Spark DataFrame; column types are inferred from the pandas dtypes.
spark_df = spark.createDataFrame(pandas_df)

In [None]:
# EDA
# ---
# Quick look at the size, columns, schema and value distribution of spark_df.

# Dataset shape as a (row_count, column_count) tuple. Both values must sit
# inside the same print() call; with the closing parenthesis placed after
# count() only the row count is printed and the column count is discarded.
print((spark_df.count(), len(spark_df.columns)))
print(spark_df.columns)
spark_df.show(5)

# Distinct values of the rating column, in ascending order.
unique_sorted_ratings = (
    spark_df
    .select("rating")
    .distinct()
    .orderBy(F.col("rating"))
)
unique_sorted_ratings.show()

spark_df.printSchema()

# Row counts per key, five rows each, in this order:
#   1. user_id    with the most rows
#   2. user_id    with the fewest rows
#   3. product_id with the most rows
count_views = [
    ("user_id", False),
    ("user_id", True),
    ("product_id", False),
]
for group_col, sort_ascending in count_views:
    (spark_df
     .groupBy(group_col)
     .count()
     .orderBy("count", ascending=sort_ascending)
     .show(5))

In [None]:
# Recommender system
# ------------------
# Index the string ids, split into train/test, fit an ALS model and report
# RMSE on the held-out split.

# One seed for every stochastic step so that a Restart & Run All gives the
# same split and the same fitted model.
SEED = 42

# ALS is given numeric index columns, so map the raw user_id / product_id
# values to indices with StringIndexer first.
user_indexer = StringIndexer(
    inputCol="user_id",
    outputCol="user_index")
product_indexer = StringIndexer(
    inputCol="product_id",
    outputCol="product_index")

# The indexers are fitted on the full dataset (before the split) so that
# train and test share the same id -> index mapping.
user_indexed = user_indexer.fit(spark_df).transform(spark_df)
indexed_data = product_indexer.fit(user_indexed).transform(user_indexed)

train, test = indexed_data.randomSplit([0.8, 0.2],
                                       seed=SEED)

als = ALS(userCol="user_index",
          itemCol="product_index",
          ratingCol="rating",
          # "drop" removes rows whose prediction is NaN before evaluation;
          # per the Spark docs this happens for users/items unseen in training.
          coldStartStrategy="drop",
          nonnegative=True,
          # Previously unset: only the split was seeded, so the fitted model
          # and the reported RMSE could change from run to run.
          seed=SEED)

model = als.fit(train)

predictions = model.transform(test)

# RegressionEvaluator compares labelCol against its default prediction
# column, "prediction", which is what model.transform() adds.
evaluator = RegressionEvaluator(
    labelCol="rating", metricName="rmse")
rmse_score = evaluator.evaluate(predictions)
print(f"RMSE score: {rmse_score}")

predictions.select("user_id", "product_id",
                   "rating", "prediction").show(5)
