In [None]:
%matplotlib inline
import os

import matplotlib.pyplot as plt
import numpy as np
import pyspark.sql.functions as sql_func
import seaborn as sns
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import SparseVector
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, ArrayType
from sklearn.linear_model import ElasticNet

In [None]:
# Build (or reuse) a Spark session that runs locally on every available core
# with the driver memory setting requested below.
session_builder = SparkSession.builder.master("local[*]")
session_builder = session_builder.config("spark.driver.memory", "4g")
spark = session_builder.getOrCreate()

In [None]:
# Root directory for the input datasets and the prediction outputs written
# further down. The notebook used DATA_DIR without ever defining it, which
# raises NameError on a fresh kernel.
# NOTE(review): the "data" fallback is an assumption — confirm the real
# location or export DATA_DIR before running.
DATA_DIR = os.environ.get("DATA_DIR", "data")

# TF-IDF features per movie, cached because they are joined against every rating.
tf_idf = spark.read.parquet(os.path.join(DATA_DIR, "tf_idf.parquet")).cache()

In [None]:
# Load the ratings

In [None]:
# Read the raw ratings CSV (header row, inferred column types) and keep only
# the three columns the model needs.
ratings_path = os.path.join(DATA_DIR, "ratings.csv")
ratings = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(ratings_path)
    .select("movieId", "userId", "rating")
)

In [None]:
# Average rating by user

In [None]:
# One row per user with the mean of all ratings that user gave.
user_avg = (
    ratings
    .groupBy("userId")
    .agg(sql_func.avg("rating").alias("avg_rating_user"))
)

In [None]:
# Average rating by movie

In [None]:
# One row per movie with the mean of all ratings that movie received.
# The grouping key is spelled "movieId", matching the column selected from the
# ratings file and the join condition further down; the previous "movieID"
# spelling only resolved because Spark SQL is case-insensitive by default.
movie_avg = (
    ratings
    .groupBy("movieId")
    .agg(sql_func.avg("rating").alias("avg_rating_movie"))
)

In [None]:
#Joining data

In [None]:
# Attach to every rating the movie's TF-IDF vector, the user's average rating
# and the movie's average rating. All three joins are inner joins, so a rating
# is kept only when it has a match in each of the other tables.
join_all_data = (
    ratings.alias("r")
    .join(
        tf_idf.alias("tf-idf"),
        sql_func.col("tf-idf.movieId") == sql_func.col("r.movieId"),
    )
    .join(
        user_avg.alias("user"),
        sql_func.col("user.userId") == sql_func.col("r.userId"),
    )
    .join(
        movie_avg.alias("movie"),
        sql_func.col("movie.movieId") == sql_func.col("r.movieId"),
    )
    .select(
        "r.userId",
        "r.movieId",
        "r.rating",
        "user.avg_rating_user",
        "movie.avg_rating_movie",
        "tf-idf.tf_idf",
    )
)

In [None]:
# Per-user regression UDF

In [None]:
def sklearn_lr(spark_x: list, spark_y: list) -> list:
    """Fit an ElasticNet (default hyper-parameters) on one group of samples.

    Args:
        spark_x: feature vectors; each element must expose ``toArray()``.
        spark_y: target values, one per element of ``spark_x``.

    Returns:
        A two-element list ``[coefficients, intercept]``: the first entry is
        the list of fitted coefficients, the second is ``intercept_.tolist()``.
    """
    features = np.array([row.toArray() for row in spark_x])
    # The target is passed as a column vector. lr_apply reads the intercept as
    # lr_coef[1][0], i.e. it expects intercept_.tolist() to be a list.
    targets = np.array(spark_y).reshape(-1, 1)

    model = ElasticNet()
    model.fit(features, targets)

    coefficients = model.sparse_coef_.toarray().tolist()[0]
    intercept = model.intercept_.tolist()
    return [coefficients, intercept]


# Spark wrapper: the nested list above maps onto array<array<float>>.
reg_udf = sql_func.udf(sklearn_lr, returnType=ArrayType(ArrayType(FloatType())))

In [None]:
# Train/test split

In [None]:
# Append the user's and the movie's average rating to the TF-IDF vector so the
# per-user regression sees them as two extra features.
# This replaces a call to `list_concat`, which is not defined anywhere in the
# notebook, and removes the stray `\` before `.randomSplit` that made the cell
# a syntax error.
# Assumes the tf_idf column holds Spark ML vectors (sklearn_lr calls
# `.toArray()` on its elements) — TODO confirm against the parquet schema.
feature_assembler = VectorAssembler(
    inputCols=["tf_idf", "avg_rating_user", "avg_rating_movie"],
    outputCol="features",
)

model_input = feature_assembler.transform(join_all_data).select(
    "userId",
    "movieId",
    "rating",
    # Downstream cells read the combined vector under the name "tf_idf".
    sql_func.col("features").alias("tf_idf"),
)

# 80/20 split with a fixed seed.
train_data, test_data = model_input.randomSplit([0.8, 0.2], seed=42)

train_data.cache()
test_data.cache()

In [None]:
#regression

In [None]:
# Gather each user's training feature vectors ("x") and ratings ("y") into
# lists, then fit one regression per user through reg_udf.
user_training_sets = train_data.groupBy("userId").agg(
    sql_func.collect_list("tf_idf").alias("x"),
    sql_func.collect_list("rating").alias("y"),
)
model_coef = user_training_sets.withColumn(
    "model_coeff", reg_udf("x", "y")
).cache()

In [None]:

def lr_apply(x: SparseVector, lr_coef: list) -> float:
    """Evaluate a fitted linear model on one feature vector.

    Args:
        x: feature vector of a single sample.
        lr_coef: ``[coefficients, intercept]`` as returned by ``sklearn_lr``;
            the intercept is read from ``lr_coef[1][0]``.

    Returns:
        ``dot(x, coefficients) + intercept`` as a plain Python float.
    """
    # Convert with toArray(), the same conversion sklearn_lr uses at training
    # time, instead of relying on np.array() to coerce the Spark vector.
    features = x.toArray()
    weights = np.array(lr_coef[0])
    return float(features.dot(weights) + lr_coef[1][0])


lr_apply_udf = sql_func.udf(lr_apply, returnType=FloatType())

In [None]:
#make prediction func

In [None]:

def get_prediction(data: DataFrame) -> DataFrame:
    """Score every row of ``data`` with its user's fitted model.

    ``data`` is inner-joined to the notebook-level ``model_coef`` frame on
    ``userId``, so rows of users without a model are dropped.

    Returns:
        A cached DataFrame with ``userId``, ``rating``, ``movieId``,
        ``tf_idf`` and the new ``prediction`` column.
    """
    with_model = data.join(model_coef, "userId")
    predicted = lr_apply_udf("tf_idf", "model_coeff").alias("prediction")
    scored = with_model.select("userId", "rating", "movieId", "tf_idf", predicted)
    return scored.cache()

In [None]:
# Score the training split and persist the result, replacing any previous run.
train_prediction = get_prediction(train_data)
train_prediction_path = os.path.join(DATA_DIR, "train_prediction.parquet")
train_prediction.write.mode("overwrite").parquet(train_prediction_path)

In [None]:
# Evaluate predictions (RMSE)

In [None]:
def evaluate_prediction(prediction: DataFrame) -> float:
    """Root-mean-squared error between ``rating`` and the clipped ``prediction``.

    Predictions above 5 are replaced by 5 and predictions below 0.5 by 0.5
    before the error is computed.

    Args:
        prediction: DataFrame with ``prediction`` and ``rating`` columns.

    Returns:
        The square root of the average squared error over all rows.
    """
    clipped = prediction.selectExpr("""
            CASE
                WHEN prediction > 5 THEN 5
                WHEN prediction < 0.5 THEN 0.5
                ELSE prediction
            END AS prediction
        """, "rating")

    residual = sql_func.col("rating") - sql_func.col("prediction")
    squared_errors = clipped.select(sql_func.pow(residual, 2).alias("squared_error"))

    # agg() yields a single row; its only field is the mean squared error.
    mean_squared_error = squared_errors.agg(sql_func.avg("squared_error")).first()[0]
    return np.sqrt(mean_squared_error)

In [None]:
# RMSE on the training split, shown as the cell output.
train_rmse = evaluate_prediction(train_prediction)
train_rmse

In [None]:
# get test predictions

In [None]:
# Score the held-out split and persist the result, replacing any previous run.
test_prediction = get_prediction(test_data)
test_prediction_path = os.path.join(DATA_DIR, "test_prediction.parquet")
test_prediction.write.mode("overwrite").parquet(test_prediction_path)

In [None]:
# RMSE on the held-out split, shown as the cell output.
test_rmse = evaluate_prediction(test_prediction)
test_rmse