In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns

DATA_DIR = "/data/ml-latest"

In [2]:
# мы будем использовать данные, сохраннённые в предыдущем ноутбуке
!ls {DATA_DIR}/tf_idf.parquet

_SUCCESS
part-00000-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00001-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00002-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00003-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00004-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00005-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00006-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00007-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00008-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00009-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00010-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00011-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00012-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00013-b69c4d7d-be7d-4701-9ce7-f5d64ebf4fc9-c000.snappy.parquet
part-00014-b69c4d7d-be7d

In [3]:
# создаём сессию Spark
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [4]:
import os
import pyspark.sql.functions as sql_func

# поскольку в Parquet схема данных хранится внутри самого файла, читать их очень просто
tf_idf = spark.read.parquet(os.path.join(DATA_DIR, "tf_idf.parquet")).cache()
tf_idf.show()

+-------+--------------------+--------------------+
|movieId|               title|              tf_idf|
+-------+--------------------+--------------------+
|     35|   Carrington (1995)|(1024,[8,74,189,2...|
|    503| New Age, The (1994)|(1024,[434,769,82...|
|    583|Dear Diary (Caro ...|(1024,[434,741,84...|
|    594|Snow White and th...|(1024,[29,52,60,8...|
|    610|  Heavy Metal (1981)|(1024,[32,93,112,...|
|    614|       Loaded (1994)|(1024,[263,434],[...|
|    761| Phantom, The (1996)|(1024,[43,169,196...|
|    880|Island of Dr. Mor...|(1024,[44,81,219,...|
|   1369|I Can't Sleep (J'...|(1024,[263,434],[...|
|   1519|Broken English (1...|(1024,[434,829],[...|
|   1589|     Cop Land (1997)|(1024,[37,61,164,...|
|   1815|         Eden (1997)|(1024,[434,829],[...|
|   1881|Quest for Camelot...|(1024,[57,165,337...|
|   2080|Lady and the Tram...|(1024,[29,37,83,1...|
|   2324|Life Is Beautiful...|(1024,[3,31,32,45...|
|   2444|24 7: Twenty Four...|(1024,[122,221,43...|
|   2445|At 

In [5]:
# считаем данные об оценках
ratings = (
    spark
    .read
    .csv(
        os.path.join(DATA_DIR, "ratings.csv"),
        header=True,
        inferSchema=True
    )
    # только о миллионе. потому что даже на таком объёме обсчёт модели на четырёх ядрах
    # занимает пару часов
    .limit(1000000)
    .select("movieId", "userId", "rating")
)

In [6]:
from sklearn.linear_model import ElasticNet
import numpy as np
from pyspark.sql.types import FloatType, ArrayType

def sklearn_lr(spark_x: list, spark_y: list) -> list:
    """
        spark_x: список pyspark.ml.linalg.SparseVector - фичи для регрессии
        spark_y: список значений целевой переменной регрессии
        return: список коэффициентов регресии
    """
    # перевеодим данные из формата Spark в удобный для sklearn
    numpy_x = np.array([vector.toArray() for vector in spark_x])
    numpy_y = np.array(spark_y).reshape(-1, 1)
    # применяем обычную модель из sklearn
    lr = ElasticNet().fit(numpy_x, numpy_y)
    # возвращаем в ответе плотный вектор коэффициентов регрессии
    return [lr.sparse_coef_.todense().tolist()[0], lr.intercept_.tolist()]

# определяем Spark UDF, которая обучает регрессию на своих аргументах
reg_udf = sql_func.udf(
    sklearn_lr,
    returnType=ArrayType(ArrayType(FloatType()))
)

In [7]:
# разбиваем полученные данные на обучающую и тестовую выборки
train_data, test_data = ratings.join(tf_idf, "movieId").randomSplit([0.8, 0.2], seed=42)
train_data.cache()
test_data.cache()
# строим для каждого пользователя свою модель регрессии
model_coef = (
    train_data
    .groupBy("userId")
    .agg(
        sql_func.collect_list("tf_idf").alias("x"),
        sql_func.collect_list("rating").alias("y")
    )
    .withColumn("model_coeff", reg_udf("x", "y"))
    .cache()
)

In [8]:
# на всякий случай сохраняем полученные коэффициенты регрессий на диск
model_coef.write.mode("overwrite").parquet(os.path.join(DATA_DIR, "model_coef.parquet"))

In [9]:
model_coef.count()

10142

In [10]:
# поскольку мы применяли L1-регуляризацию, среди коэффициентов много нулей
model_coef.show()

+------+--------------------+--------------------+--------------------+
|userId|                   x|                   y|         model_coeff|
+------+--------------------+--------------------+--------------------+
|     1|[(1024,[8,29,39,4...|[4.5, 5.0, 5.0, 4...|[[0.0, 0.0, 0.0, ...|
|     2|[(1024,[113,115,2...|[3.0, 3.0, 2.0, 3...|[[0.0, 0.0, 0.0, ...|
|     3|[(1024,[7,37,63,9...|[2.0, 4.0, 3.0, 3...|[[0.0, 0.0, 0.0, ...|
|     4|[(1024,[49,51,54,...|[4.0, 4.0, 5.0, 4...|[[0.0, 0.0, 0.0, ...|
|     5|[(1024,[123,145,1...|[2.0, 4.0, 5.0, 5...|[[0.0, 0.0, 0.0, ...|
|     6|[(1024,[51,55,70,...|     [3.0, 3.0, 5.0]|[[0.0, 0.0, 0.0, ...|
|     7|[(1024,[3,17,22,2...|[5.0, 4.0, 4.0, 5...|[[0.0, 0.0, 0.0, ...|
|     8|[(1024,[16,103,14...|[2.0, 3.0, 4.0, 3...|[[0.0, 0.0, 0.0, ...|
|     9|[(1024,[26,37,41,...|[4.0, 5.0, 4.0, 4...|[[0.0040322472, 0...|
|    10|[(1024,[4,27,87,8...|[4.0, 5.0, 4.0, 5...|[[0.0, 0.0, 0.0, ...|
|    11|[(1024,[23,24,29,...|[3.5, 4.0, 3.5, 3...|[[0.0, 0.0, 0.

In [11]:
from pyspark.ml.linalg import SparseVector

def lr_apply(x: SparseVector, lr_coef: list) -> float:
    """
        param x: вектор фич для регрессии
        param lr_coef: 
        return: предсказанное моделью регрессии значение
    """
    return float(np.array(x).dot(np.array(lr_coef[0])) + lr_coef[1][0])

lr_apply_udf = sql_func.udf(lr_apply, returnType=FloatType())

In [12]:
from pyspark.sql import DataFrame

def get_prediction(data: DataFrame) -> DataFrame:
    return (
        data
        .join(model_coef, "userId")
        .select(
            "userId",
            "rating",
            "movieId",
            "tf_idf", 
            lr_apply_udf("tf_idf", "model_coeff").alias("prediction"))
        .cache()
    )

In [13]:
train_prediction = get_prediction(train_data)
(
    train_prediction.write.mode("overwrite")
    .parquet(os.path.join(DATA_DIR, "train_prediction.parquet"))
)

In [14]:
train_prediction.show()

+------+------+-------+--------------------+----------+
|userId|rating|movieId|              tf_idf|prediction|
+------+------+-------+--------------------+----------+
|   148|   4.0|    260|(1024,[1,2,5,7,8,...|  3.995837|
|   148|   4.0|    589|(1024,[3,9,29,36,...| 4.0452337|
|   148|   3.5|   1196|(1024,[9,10,23,29...|  3.517842|
|   148|   3.5|   1210|(1024,[4,9,23,29,...| 3.5223696|
|   148|   2.5|   1291|(1024,[23,29,51,5...| 2.5620809|
|   148|   4.0|   2028|(1024,[3,4,29,52,...|  3.994406|
|   148|   5.0|   3300|(1024,[27,29,37,4...|  4.906541|
|   148|   1.5|   4246|(1024,[13,44,50,5...| 1.6575054|
|   148|   5.0|   4993|(1024,[4,9,10,13,...|  5.053576|
|   148|   4.5|   5445|(1024,[8,15,44,46...|  4.464959|
|   148|   4.5|   5574|(1024,[17,49,54,5...| 4.4506645|
|   148|   5.0|   5952|(1024,[9,10,13,17...| 4.9269485|
|   148|   3.5|   6373|(1024,[5,29,40,51...|  3.593023|
|   148|   4.5|   7143|(1024,[1,10,13,32...|  4.452602|
|   148|   5.0|   7153|(1024,[3,6,9,17,2...|  4.

In [15]:
def evaluate_prediction(prediction: DataFrame) -> float:
    return np.sqrt(
        prediction
        .selectExpr("""
            CASE
                WHEN prediction > 5 THEN 5
                WHEN prediction < 0.5 THEN 0.5
                ELSE prediction
            END AS prediction
        """, "rating")
        .select(
            sql_func.pow(sql_func.col("rating") - sql_func.col("prediction"), 2)
            .alias("squared_error")
        )
        .agg(sql_func.avg("squared_error"))
        .first()[0]
    )

In [16]:
evaluate_prediction(train_prediction)

0.6416910375297744

In [17]:
test_prediction = get_prediction(test_data)
(
    test_prediction.write.mode("overwrite")
    .parquet(os.path.join(DATA_DIR, "test_prediction.parquet"))
)

In [18]:
evaluate_prediction(test_prediction)

0.9859142865193938