#### ДЗ. 7

In [215]:
import pandas as pd
import numpy as np

from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as sf

# для работы с матрицами
from scipy.sparse import csr_matrix, coo_matrix

# матричная факторизация
from implicit.als import AlternatingLeastSquares
from implicit.nearest_neighbours import bm25_weight, tfidf_weight

#
import os, sys
module_path = os.path.abspath(os.path.join(os.pardir))
if module_path not in sys.path:
    sys.path.append(module_path)

# Написанные нами функции
from src.metrics import precision_at_k, recall_at_k
from src.utils import prefilter_items

import warnings
warnings.filterwarnings('ignore')

In [216]:
data = pd.read_csv('../webinar_3/raw_data/retail_train.csv')
item_features = pd.read_csv('../webinar_3/raw_data/product.csv')
user_features = pd.read_csv('../webinar_3/raw_data/hh_demographic.csv')

# column processing
item_features.columns = [col.lower() for col in item_features.columns]
user_features.columns = [col.lower() for col in user_features.columns]

item_features.rename(columns={'product_id': 'item_id'}, inplace=True)
user_features.rename(columns={'household_key': 'user_id'}, inplace=True)

test_size_weeks = 3

data_train = data[data['week_no'] < data['week_no'].max() - test_size_weeks]
data_test = data[data['week_no'] >= data['week_no'].max() - test_size_weeks]

data_train.head(2)

Unnamed: 0,user_id,basket_id,day,item_id,quantity,sales_value,store_id,retail_disc,trans_time,week_no,coupon_disc,coupon_match_disc
0,2375,26984851472,1,1004906,1,1.39,364,-0.6,1631,1,0.0,0.0
1,2375,26984851472,1,1033142,1,0.82,364,0.0,1631,1,0.0,0.0


In [217]:
n_items_before = data_train['item_id'].nunique()

# data_train = prefilter_items(data_train, item_features, take_n_popular=5000)

n_items_after = data_train['item_id'].nunique()
print('Decreased # items from {} to {}'.format(n_items_before, n_items_after))

Decreased # items from 86865 to 86865


In [218]:
popularity = data.groupby('item_id')['quantity'].sum().reset_index()
popularity.rename(columns={'quantity': 'n_sold'}, inplace=True)
                   
top_5000 = popularity.sort_values('n_sold', ascending=False).head(5000).item_id.tolist()

In [219]:
# Заведем фиктивный item_id (если юзер покупал товары из топ-5000, то он "купил" такой товар)
data_train.loc[~data_train['item_id'].isin(top_5000), 'item_id'] = 999999

user_item_matrix = pd.pivot_table(data_train, 
                                  index='user_id', columns='item_id', 
                                  values='quantity', # Можно пробоват ьдругие варианты
                                  aggfunc='count', 
                                  fill_value=0
                                 )

user_item_matrix = user_item_matrix.astype(float) # необходимый тип матрицы для implicit

# переведем в формат saprse matrix
sparse_user_item = csr_matrix(user_item_matrix).tocsr()

user_item_matrix.head(2)

item_id,202291,397896,420647,480014,545926,707683,731106,818980,819063,819227,...,15926775,15926844,15926885,15926886,15926927,15927403,15927661,15927850,16809471,17105257
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,1.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [220]:
data_test = data_test[data_test['item_id'].isin(data_train['item_id'].unique())]
data_test.head(2)

Unnamed: 0,user_id,basket_id,day,item_id,quantity,sales_value,store_id,retail_disc,trans_time,week_no,coupon_disc,coupon_match_disc
2277417,338,41260573635,636,1037348,1,0.89,369,-0.3,112,92,0.0,0.0
2278236,2324,41297185696,636,1027569,1,1.99,334,0.0,1159,92,0.0,0.0


In [221]:
result = data_test.groupby('user_id')['item_id'].unique().reset_index()
result.columns=['user_id', 'actual']
result.head(3)

Unnamed: 0,user_id,actual
0,1,"[821867, 834484, 856942, 865456, 914190, 95804..."
1,3,"[851057, 872021, 878302, 879948, 909638, 91320..."
2,6,"[920308, 926804, 1017061, 1078346, 1120741, 82..."


In [222]:
userids = user_item_matrix.index.values
itemids = user_item_matrix.columns.values

matrix_userids = np.arange(len(userids))
matrix_itemids = np.arange(len(itemids))

id_to_itemid = dict(zip(matrix_itemids, itemids))
id_to_userid = dict(zip(matrix_userids, userids))

itemid_to_id = dict(zip(itemids, matrix_itemids))
userid_to_id = dict(zip(userids, matrix_userids))

### Spark

In [223]:
session = (
    SparkSession.builder
    .config("spark.driver.memory", "1g")
    .config("spark.sql.shuffle.partitions", "100")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "localhost")
    .config("spark.sql.execution.arrow.enabled", "true")
    .config("sparkContext.setLogLevel", "OFF")
    .master("local*]")
    .enableHiveSupport()
    .getOrCreate()
)

session.sparkContext.setLogLevel('ERROR')

In [224]:
session

In [225]:
data_train

Unnamed: 0,user_id,basket_id,day,item_id,quantity,sales_value,store_id,retail_disc,trans_time,week_no,coupon_disc,coupon_match_disc
0,2375,26984851472,1,1004906,1,1.39,364,-0.60,1631,1,0.0,0.0
1,2375,26984851472,1,1033142,1,0.82,364,0.00,1631,1,0.0,0.0
2,2375,26984851472,1,1036325,1,0.99,364,-0.30,1631,1,0.0,0.0
3,2375,26984851472,1,1082185,1,1.21,364,0.00,1631,1,0.0,0.0
4,2375,26984851472,1,8160430,1,1.50,364,-0.39,1631,1,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...
2282320,222,41297772783,635,1120741,1,0.59,304,0.00,1716,91,0.0,0.0
2282321,462,41297773713,635,999999,1,1.99,304,0.00,2040,91,0.0,0.0
2282322,462,41297773713,635,995242,1,1.00,304,-0.89,2040,91,0.0,0.0
2282323,462,41297773713,635,10180324,1,3.00,304,-0.29,2040,91,0.0,0.0


In [226]:
# data_train['item_idx'] = data_train['item_id'].map(lambda x: itemid_to_id[x])
# data_train['user_idx'] = data_train['user_id'].map(lambda x: userid_to_id[x])

In [227]:
spark_data_train = session.createDataFrame(data_train[['user_id', 'item_id', 'quantity']])

In [228]:
spark_data_train = spark_data_train.withColumnRenamed('quantity', 'relevance')

In [229]:
spark_data_train.show(10)

+-------+-------+---------+
|user_id|item_id|relevance|
+-------+-------+---------+
|   2375|1004906|        1|
|   2375|1033142|        1|
|   2375|1036325|        1|
|   2375|1082185|        1|
|   2375|8160430|        1|
|   2375| 826249|        2|
|   2375| 999999|        1|
|   2375|1085983|        1|
|   2375| 999999|        1|
|   2375| 999999|        1|
+-------+-------+---------+
only showing top 10 rows



In [230]:
model = ALS(
    rank=30,
    userCol='user_id',
    itemCol='item_id',
    ratingCol='relevance',
    maxIter=10,
    alpha=1.0,
    regParam=0.1,
    implicitPrefs=True,
    seed=42,
    coldStartStrategy='drop',
    ).fit(spark_data_train)

                                                                                

In [231]:
recs_als = model.recommendForAllUsers(6)

In [232]:
recs_als.show()



+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      4|[{999999, 1.56638...|
|      7|[{999999, 1.74371...|
|      8|[{999999, 1.79822...|
|     10|[{999999, 1.09234...|
|     23|[{999999, 1.77283...|
|     26|[{999999, 1.64601...|
|     27|[{999999, 1.73402...|
|     28|[{999999, 1.73532...|
|     31|[{999999, 1.70242...|
|     34|[{999999, 1.35568...|
|     39|[{999999, 1.77345...|
|     44|[{999999, 1.59723...|
|     49|[{6544236, 1.8885...|
|     51|[{999999, 1.67057...|
|     53|[{999999, 1.71598...|
|     55|[{999999, 1.75768...|
|     59|[{999999, 1.55759...|
|     63|[{999999, 1.51729...|
|     65|[{999999, 1.75027...|
|     69|[{999999, 1.67838...|
+-------+--------------------+
only showing top 20 rows



                                                                                

In [233]:
recs_als = (recs_als.withColumn(
                        'recommendations', sf.explode('recommendations')
                                )
            .withColumn('item_id', sf.col('recommendations.item_id'))
            .withColumn(
                        'relevance', sf.col('recommendations.rating').cast(DoubleType()),
            )
            .select('user_id', 'item_id', 'relevance')
           )

In [234]:
recs_als.show()



+-------+-------+------------------+
|user_id|item_id|         relevance|
+-------+-------+------------------+
|      4| 999999|1.5663890838623047|
|      4|1082185|1.3677072525024414|
|      4|1029743|1.2358711957931519|
|      4| 995242|1.1148192882537842|
|      4|1106523|1.1012330055236816|
|      4| 981760|1.0215811729431152|
|      7| 999999| 1.743717908859253|
|      7|1082185|1.6265090703964233|
|      7|1029743|1.4299474954605103|
|      7| 995242|1.3118711709976196|
|      7|1106523|1.2830193042755127|
|      7| 981760|1.2775311470031738|
|      8| 999999|1.7982242107391357|
|      8|1082185|1.7066829204559326|
|      8|1029743|1.6206679344177246|
|      8|1106523| 1.494848608970642|
|      8| 981760|1.4861304759979248|
|      8| 883404|1.4012715816497803|
|     10| 999999|1.0923452377319336|
|     10|1082185|0.9284350275993347|
+-------+-------+------------------+
only showing top 20 rows



                                                                                

#### Задание
1. Выбрать по 6 предсказаний для пользователей
2. Удалить фейковый элемент
3. Оставить 5 отсортированных предсказаний для каждого пользователя
4. Посчитать метрику (map@5, precision@5)

In [235]:
user_idx = pd.DataFrame(data_train['user_id'].unique())

In [236]:
result_spark = recs_als.toPandas()

                                                                                

In [237]:
result_spark.head(5)

Unnamed: 0,user_id,item_id,relevance
0,4,999999,1.566389
1,4,1082185,1.367707
2,4,1029743,1.235871
3,4,995242,1.114819
4,4,1106523,1.101233


In [238]:
result_spark.shape

(14994, 3)

In [239]:
result_spark.drop(result_spark.loc[result_spark['item_id']==999999].index, inplace=True)

In [240]:
def spark_recomend(data, usr_id):
    return data.loc[data['user_id']==usr_id].head(5).\
                            sort_values('relevance', ascending=False)['item_id'].tolist()

In [241]:
spark_recomend(result_spark, 1), spark_recomend(result_spark, 10)

([1082185, 1029743, 981760, 995242, 862349],
 [1082185, 1029743, 6534178, 995242, 1106523])

In [242]:
result['spark_rec'] = result.apply(lambda x: spark_recomend(result_spark, x['user_id']), axis=1)

In [243]:
result

Unnamed: 0,user_id,actual,spark_rec
0,1,"[821867, 834484, 856942, 865456, 914190, 95804...","[1082185, 1029743, 981760, 995242, 862349]"
1,3,"[851057, 872021, 878302, 879948, 909638, 91320...","[1029743, 1106523, 1082185, 951590, 883404]"
2,6,"[920308, 926804, 1017061, 1078346, 1120741, 82...","[1082185, 1026118, 1051516, 854852, 878996]"
3,7,"[840386, 889774, 898068, 909714, 953476, 97699...","[1082185, 1029743, 995242, 1106523, 981760]"
4,8,"[835098, 872137, 910439, 924610, 1041259, 5569...","[1082185, 1029743, 1106523, 981760, 883404]"
...,...,...,...
1970,2496,[6534178],"[1082185, 1029743, 1106523, 981760, 904360]"
1971,2497,"[1016709, 845294, 871756, 873654, 1023226, 106...","[1082185, 1029743, 904360, 981760, 1106523]"
1972,2498,"[834484, 901776, 914190, 1070820, 903567, 8502...","[480014, 1082185, 1404121, 1029743, 1126899]"
1973,2499,"[867188, 902396, 914190, 951590, 958137, 97699...","[1082185, 1029743, 1106523, 883404, 826249]"


In [244]:
prec = result['user_id'].apply(lambda x: 
                        precision_at_k(result.loc[result['user_id']==x, 
                                                              'spark_rec'].squeeze(), 
                                       result.loc[result['user_id']==x, 
                                                              'actual'].squeeze(), 
                                       k=5)).mean()
map_k = result['user_id'].apply(lambda x: 
                                ap_k(result.loc[result['user_id']==x, 
                                                              'spark_rec'].squeeze(), 
                                       result.loc[result['user_id']==x, 
                                                              'actual'].squeeze(), 
                                       k=5)).mean()

print(f'precision@5:{prec:10.5f}\nmap@5:{map_k:10.5f}')

precision@5:   0.22734
map@5:   0.43719
