# Домашнее задание №7

Установите pySpark, и попробуйте подобрать параметры для ALS.

1. Выбрать по 6 предсказаний для пользователей
2. Удалить фейковый элемент
3. Оставить 5 остортированных предсказаний для каждого пользователя
4. Посчитать метрику (map@5, precision@5)

Определите сильно ли отличается качество ALS из implicit и pyspark, сравнивайте по метрикам map@5, precision@5

### Финальный проект

Мы уже прошли всю необходимую теорию для финального проекта. Проект осуществляется на данных из вебинара (данные считаны в начале ДЗ).
Рекомендуем вам **начать делать проект сразу после этого домашнего задания**
- Целевая метрика - map@5. Порог для уcпешной сдачи проекта map@5 > 20%
- НЕ обязательно, но крайне желательно использовать 2-ух уровневые рекоммендательные системы в проекте, идеальный вариант на первом уровне использовать Spark
- Вы сдаете код проекта и csv файл с рекомендациями 

In [1]:
import pandas as pd
import numpy as np
import os, sys
import warnings

warnings.filterwarnings('ignore')

module_path = os.path.abspath(os.path.join(os.pardir))
if module_path not in sys.path:
    sys.path.append(module_path)
    
# Для работы pyspark с JAVA.JDK и работы самого pyspark
os.environ["JAVA_HOME"] = "C:\Program Files\Java\jdk-19"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "\bin:" + os.environ["PATH"]
os.environ['PYSPARK_DRIVER_PYTHON_OPTS']= "notebook"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable

# Для работы с pyspark
import pyspark.sql.functions as sf
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

# Для работы с матрицами
from scipy.sparse import csr_matrix, coo_matrix

# Матричная факторизация
from implicit.als import AlternatingLeastSquares
from implicit.nearest_neighbours import bm25_weight, tfidf_weight

# Импорт MainRecommender, prefilter_items, precision_at_k, recall_at_k
from src.utils import prefilter_items
from src.metrics import precision_at_k, recall_at_k, reciprocal_rank_at_k, ndcg_at_k, ap_k_2 
from src.recommenders import MainRecommender

In [2]:
data = pd.read_csv('../Lesson_2/retail_train.csv')
item_features = pd.read_csv('../Lesson_2/product.csv')
user_features = pd.read_csv('../Lesson_2/hh_demographic.csv')

# column processing
item_features.columns = [col.lower() for col in item_features.columns]
user_features.columns = [col.lower() for col in user_features.columns]

item_features.rename(columns={'product_id': 'item_id'}, inplace=True)
user_features.rename(columns={'household_key': 'user_id'}, inplace=True)

# train test split
test_size_weeks = 3

data_train = data[data['week_no'] < data['week_no'].max() - test_size_weeks]
data_test = data[data['week_no'] >= data['week_no'].max() - test_size_weeks]

data_train.head(2)

Unnamed: 0,user_id,basket_id,day,item_id,quantity,sales_value,store_id,retail_disc,trans_time,week_no,coupon_disc,coupon_match_disc
0,2375,26984851472,1,1004906,1,1.39,364,-0.6,1631,1,0.0,0.0
1,2375,26984851472,1,1033142,1,0.82,364,0.0,1631,1,0.0,0.0


In [3]:
n_items_before = data_train['item_id'].nunique()

data_train = prefilter_items(data_train, 5000,item_features)

n_items_after = data_train['item_id'].nunique()
print('Decreased # items from {} to {}'.format(n_items_before, n_items_after))

Decreased # items from 86865 to 5001


In [4]:
popularity = data_train.groupby('item_id')['quantity'].sum().reset_index()
popularity.rename(columns={'quantity': 'n_sold'}, inplace=True)

top_5000 = popularity.sort_values('n_sold', ascending=False).head(5000).item_id.tolist()

In [5]:
data_train.loc[~data_train['item_id'].isin(top_5000), 'item_id'] = 999999

In [6]:
# Заведем фиктивный item_id (если юзер покупал товары из топ-5000, то он "купил" такой товар)
# data_train.loc[~data_train['item_id'].isin(top_5000), 'item_id'] = 999999

user_item_matrix = pd.pivot_table(data_train, 
                                  index='user_id', columns='item_id', 
                                  values='quantity', # Можно пробоват ьдругие варианты
                                  aggfunc='count', 
                                  fill_value=0
                                 )

user_item_matrix = user_item_matrix.astype(float) # необходимый тип матрицы для implicit

# переведем в формат saprse matrix
sparse_user_item = csr_matrix(user_item_matrix).tocsr()

user_item_matrix.head(2)

item_id,117847,818981,819255,819308,819400,819487,819590,819594,819840,819845,...,15926775,15926844,15926886,15972074,15972298,15972565,15972790,16100266,16729299,16729415
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [7]:
data_test = data_test[data_test['item_id'].isin(data_train['item_id'].unique())]

In [8]:
result = data_test.groupby('user_id')['item_id'].unique().reset_index()
result.columns=['user_id', 'actual']
result.head(2)

Unnamed: 0,user_id,actual
0,1,"[856942, 865456, 951954, 971585, 979707, 99065..."
1,3,[920626]


In [9]:
userids = user_item_matrix.index.values
itemids = user_item_matrix.columns.values

matrix_userids = np.arange(len(userids))
matrix_itemids = np.arange(len(itemids))

id_to_itemid = dict(zip(matrix_itemids, itemids))
id_to_userid = dict(zip(matrix_userids, userids))

itemid_to_id = dict(zip(itemids, matrix_itemids))
userid_to_id = dict(zip(userids, matrix_userids))

### SparkSession

In [10]:
session = (
        SparkSession.builder.config("spark.driver.memory", "1g")
        .config("spark.sql.shuffle.partitions", "100")
        .config("spark.driver.bindAddress", "127.0.0.1")
        .config("spark.driver.host", "localhost")
        .master("local[*]")
        .enableHiveSupport()
        .getOrCreate()
    )

In [11]:
session

In [12]:
spark_data_train=session.createDataFrame(data_train[["user_id","item_id","quantity"]])

In [13]:
spark_data_train= spark_data_train.withColumnRenamed("quantity","relevance")

In [14]:
spark_data_train.show(10)

+-------+-------+---------+
|user_id|item_id|relevance|
+-------+-------+---------+
|   2375|1085983|        1|
|   1364| 999999|        1|
|   1364| 999999|        1|
|   1364| 999999|        1|
|   1364| 937406|        1|
|   1172| 999999|        1|
|   1172| 999999|        1|
|   1172|1000493|        1|
|   1172| 999999|        1|
|   1172|1075214|        1|
+-------+-------+---------+
only showing top 10 rows



In [15]:
model = ALS(
            rank=30,
            userCol="user_id",
            itemCol="item_id",
            ratingCol="relevance",
            maxIter = 10,
            alpha = 1.0,
            regParam = 0.1,
            implicitPrefs=True,
            seed=42,
            coldStartStrategy="drop",
        ).fit(spark_data_train)

In [16]:
model.userFactors.show()

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.0830877, -0.3...|
| 20|[0.018859226, -0....|
| 30|[-0.12530617, -0....|
| 40|[-0.059800778, -0...|
| 50|[-0.104898944, -0...|
| 60|[-0.12452123, -0....|
| 70|[-0.046614755, -0...|
| 80|[-0.19489606, -0....|
| 90|[-0.06411172, -0....|
|100|[-0.118710324, -0...|
|110|[-0.12317703, -0....|
|120|[-0.10826519, -0....|
|130|[-0.15873677, -0....|
|140|[-0.2022574, -0.4...|
|150|[-0.066191465, -0...|
|160|[-0.13881871, -0....|
|170|[-0.072075315, -0...|
|180|[-0.033121612, -0...|
|190|[-0.09017785, -0....|
|200|[-0.16803524, -0....|
+---+--------------------+
only showing top 20 rows



In [17]:
recs_als = model.recommendForAllUsers(6)

In [18]:
recs_als.count()

2497

In [19]:
recs_als.show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      4|[{999999, 1.47513...|
|      7|[{999999, 1.80322...|
|      8|[{999999, 1.77219...|
|     23|[{999999, 1.78213...|
|     26|[{999999, 1.57036...|
|     27|[{963686, 1.73333...|
|     28|[{999999, 1.77851...|
|     31|[{999999, 1.74919...|
|     34|[{999999, 1.35183...|
|     39|[{999999, 1.74897...|
|     44|[{999999, 1.52943...|
|     49|[{999999, 1.77759...|
|     51|[{999999, 1.69727...|
|     53|[{999999, 1.79796...|
|     55|[{999999, 1.64983...|
|     59|[{999999, 1.61712...|
|     63|[{999999, 1.62612...|
|     65|[{999999, 1.80919...|
|     69|[{999999, 1.66811...|
|     76|[{999999, 1.36989...|
+-------+--------------------+
only showing top 20 rows



In [20]:
recs_als=(recs_als.withColumn(
                    "recommendations", sf.explode("recommendations")
                )
                .withColumn("item_id", sf.col("recommendations.item_id"))
                .withColumn(
                    "relevance",
                    sf.col("recommendations.rating").cast(DoubleType()),
                )
                .select("user_id", "item_id", "relevance")
            )

In [21]:
recs_als.show()

+-------+-------+------------------+
|user_id|item_id|         relevance|
+-------+-------+------------------+
|      4| 999999|1.4751384258270264|
|      4|1029743| 1.166101098060608|
|      4|1106523| 1.019912838935852|
|      4|5569230|0.9501171112060547|
|      4|1075368| 0.893861711025238|
|      4| 916122|0.7827973365783691|
|      7| 999999|1.8032265901565552|
|      7|1029743|1.4123412370681763|
|      7|1106523|1.2852665185928345|
|      7| 916122|1.0509498119354248|
|      7|1126899|0.9857137203216553|
|      7| 866211|0.9066903591156006|
|      8| 999999|1.7721924781799316|
|      8|1029743| 1.498128890991211|
|      8|1106523|1.3509629964828491|
|      8| 916122| 1.225055456161499|
|      8|5569230| 1.180891513824463|
|      8| 844179|1.1787959337234497|
|     23| 999999|1.7821309566497803|
|     23|1029743|1.3663578033447266|
+-------+-------+------------------+
only showing top 20 rows



In [22]:
recs_all = pd.concat([recs_als.toPandas().groupby('user_id')['relevance'].unique().reset_index(), 
                        pd.DataFrame(pd.DataFrame(recs_als.toPandas().groupby('user_id')['item_id'].
                                                  unique().reset_index()['item_id'].values.tolist()).
                                     drop(0, axis=1).apply(lambda x: list(x), axis=1), columns=['rec_item'])], 
                       axis=1).reindex(columns=['user_id', 'rec_item', 'relevance'])

In [23]:
recs_all

Unnamed: 0,user_id,rec_item,relevance
0,1,"[856942, 1029743, 940947, 1070820, 5569374]","[1.754142165184021, 1.4877140522003174, 1.3815..."
1,2,"[1029743, 1106523, 5569230, 916122, 844179]","[1.7930480241775513, 1.45193612575531, 1.34983..."
2,3,"[1106523, 1029743, 1044078, 5569230, 844179]","[1.5328633785247803, 1.3596229553222656, 1.357..."
3,4,"[1029743, 1106523, 5569230, 1075368, 916122]","[1.4751384258270264, 1.166101098060608, 1.0199..."
4,5,"[1029743, 1106523, 916122, 5569230, 1126899]","[1.7350740432739258, 1.242747187614441, 1.0607..."
...,...,...,...
2492,2496,"[1029743, 1106523, 844179, 1044078, 916122]","[1.7483608722686768, 1.5411415100097656, 1.468..."
2493,2497,"[1029743, 1106523, 5569230, 899624, 5569471]","[1.7116905450820923, 1.5871490240097046, 1.471..."
2494,2498,"[1029743, 1106523, 916122, 1070820, 5569230]","[1.7716944217681885, 1.3768523931503296, 1.147..."
2495,2499,"[1029743, 1106523, 5569230, 1044078, 916122]","[1.7756187915802002, 1.51124107837677, 1.39539..."


In [24]:
result = pd.merge(result, recs_all, on='user_id')

In [25]:
result

Unnamed: 0,user_id,actual,rec_item,relevance
0,1,"[856942, 865456, 951954, 971585, 979707, 99065...","[856942, 1029743, 940947, 1070820, 5569374]","[1.754142165184021, 1.4877140522003174, 1.3815..."
1,3,[920626],"[1106523, 1029743, 1044078, 5569230, 844179]","[1.5328633785247803, 1.3596229553222656, 1.357..."
2,6,"[1104227, 825541, 870315, 956672, 6979393, 948...","[1026118, 1029743, 854852, 878996, 1106523]","[1.7849090099334717, 1.494471549987793, 1.4691..."
3,7,"[898068, 909714, 993838, 1082185, 1106523, 110...","[1029743, 1106523, 916122, 1126899, 866211]","[1.8032265901565552, 1.4123412370681763, 1.285..."
4,8,"[835098, 872137, 5569230, 5569471, 13071586, 8...","[1029743, 1106523, 916122, 5569230, 844179]","[1.7721924781799316, 1.498128890991211, 1.3509..."
...,...,...,...,...
1910,2494,"[859075, 5568729]","[1029743, 1106523, 916122, 5569230, 1126899]","[1.6786227226257324, 1.295777678489685, 1.0958..."
1911,2497,"[1016709, 871756, 1068719, 1115187, 1134222, 1...","[1029743, 1106523, 5569230, 899624, 5569471]","[1.7116905450820923, 1.5871490240097046, 1.471..."
1912,2498,"[901776, 958382, 972437, 1070820, 1022066, 978...","[1029743, 1106523, 916122, 1070820, 5569230]","[1.7716944217681885, 1.3768523931503296, 1.147..."
1913,2499,"[902396, 951590, 1060872, 1106091, 1119993, 55...","[1029743, 1106523, 5569230, 1044078, 916122]","[1.7756187915802002, 1.51124107837677, 1.39539..."


In [26]:
print(f'precision@k = ' 
      f'{result.apply(lambda row: precision_at_k(row["rec_item"],row["actual"], k=5), axis=1).mean()}\n',
      f'recall@k = {result.apply(lambda row: recall_at_k(row["rec_item"], row["actual"], k=5), axis=1).mean()}\n', 
      f'map@k = {result.apply(lambda row: ap_k_2(row["rec_item"], row["actual"], k=5), axis=1).mean()}\n',
      f'ndcg@k = {result.apply(lambda row: ndcg_at_k(row["rec_item"], row["actual"], k=5), axis=1).mean()}')

precision@k = 0.17221932114882307
 recall@k = 0.056687276322194696
 map@k = 0.025716752017380506
 ndcg@k = 0.18903675706226686


In [27]:
recommender = MainRecommender(data_train)

  0%|          | 0/15 [00:00<?, ?it/s]

  0%|          | 0/5000 [00:00<?, ?it/s]

In [28]:
bm25_user_item_matrix = bm25_weight(user_item_matrix).tocsr()
tfidf_user_item_matrix = tfidf_weight(user_item_matrix).tocsr()

In [29]:
%%time

recommender.model = AlternatingLeastSquares(factors=350, 
                                regularization=0.05,
                                iterations=1, 
                                calculate_training_loss=True, 
                                num_threads=4,
                                random_state=42) # K - кол-во билжайших соседей

recommender.model.fit(tfidf_user_item_matrix,  # На вход item-user matrix
          show_progress=True)

result['als_tfidf'] = result['user_id'].map(lambda x: recommender.get_als_recommendations(x, N=5))

result.apply(lambda row: precision_at_k(row['als_tfidf'], row['actual']), axis=1).mean()

  0%|          | 0/1 [00:00<?, ?it/s]

CPU times: total: 40.5 s
Wall time: 6.41 s


0.22266318537858706

In [30]:
%%time

recommender.model = AlternatingLeastSquares(factors=350, 
                                regularization=0.05,
                                iterations=1, 
                                calculate_training_loss=True, 
                                num_threads=4,
                                random_state=42) # K - кол-во билжайших соседей

recommender.model.fit(csr_matrix(bm25_user_item_matrix).tocsr(), 
                      show_progress=True)

result['als_bm25'] = result['user_id'].map(lambda x: recommender.get_als_recommendations(x, N=5))

result.apply(lambda row: precision_at_k(row['als_bm25'], row['actual']), axis=1).mean()

  0%|          | 0/1 [00:00<?, ?it/s]

CPU times: total: 41.4 s
Wall time: 6.42 s


0.22997389033942284

In [31]:
print(f'pyspark_precision@k = ' 
      f'{result.apply(lambda row: precision_at_k(row["rec_item"],row["actual"], k=5), axis=1).mean()}\n',
      f'pyspark_recall@k = '
      f'{result.apply(lambda row: recall_at_k(row["rec_item"], row["actual"], k=5), axis=1).mean()}\n', 
      f'pyspark_map@k = '
      f'{result.apply(lambda row: ap_k_2(row["rec_item"], row["actual"], k=5), axis=1).mean()}\n',
      f'pyspark_ndcg@k = '
      f'{result.apply(lambda row: ndcg_at_k(row["rec_item"], row["actual"], k=5), axis=1).mean()}\n\n\n')
print(f'implicit_precision@k = ' 
      f'{result.apply(lambda row: precision_at_k(row["als_bm25"],row["actual"], k=5), axis=1).mean()}\n',
      f'implicit_recall@k = '
      f'{result.apply(lambda row: recall_at_k(row["als_bm25"], row["actual"], k=5), axis=1).mean()}\n',  
      f'implicit_map@k = '
      f'{result.apply(lambda row: ap_k_2(row["als_bm25"], row["actual"], k=5), axis=1).mean()}\n',
      f'implicit_ndcg@k = '
      f'{result.apply(lambda row: ndcg_at_k(row["als_bm25"], row["actual"], k=5), axis=1).mean()}')

pyspark_precision@k = 0.17221932114882307
 pyspark_recall@k = 0.056687276322194696
 pyspark_map@k = 0.025716752017380506
 pyspark_ndcg@k = 0.18903675706226686



implicit_precision@k = 0.22997389033942284
 implicit_recall@k = 0.08414273284902231
 implicit_map@k = 0.043079187075706744
 implicit_ndcg@k = 0.2501779670713585


### pyspark.ALS на много хуже чем implicit.als с весами bm25.

##### А с учетом того времени, которое было затрачено на то чтобы запустить _pyspark_ на jupyter notebook, 
##### и заставить работать pyspark 3.3.x c JDK 19, _implicit_ значительно выигрывает в условиях данной задачи.