## Dependencies

In [1]:
import os
import pickle

import implicit
import numpy as np
import pandas as pd
import scipy
import scipy.sparse
from pyspark import sql, SparkConf, SparkContext
from tqdm import tqdm_notebook
from tqdm.auto import tqdm

## Tools

In [2]:
def add_to_k(lst, k):
    """Return a copy of ``lst`` right-padded with zeros up to length ``k``.

    Lists that already hold ``k`` or more items keep all of their items
    (nothing is truncated). The input list itself is never modified.
    """
    shortfall = k - len(lst)
    padding = [0] * shortfall if shortfall > 0 else []
    return lst + padding

def precision_at_k(r_true_arr, k):
    """Precision@k: the share of the first ``k`` ranked positions that are hits.

    ``r_true_arr`` holds relevance flags (1 = hit, 0 = miss) in ranking order.
    The hit count is always divided by ``k``, so positions past the end of a
    shorter list count as misses.
    """
    hits_in_top_k = np.sum(r_true_arr[:k])
    return hits_in_top_k / k


def average_precision_at_k(r_true_arr, k):
    """Average precision at ``k`` for one ranked list of 0/1 relevance flags.

    Sums precision@i over every hit position i <= k and divides the total by
    ``k`` (by ``k`` itself, not by the number of relevant items).

    If ``r_true_arr`` is shorter than ``k``, the missing positions count as
    non-relevant. This gives the same result as padding the list with zeros
    (see ``add_to_k``), but avoids the IndexError the padding used to guard
    against.

    Args:
        r_true_arr: sequence of relevance flags in ranking order.
        k: cut-off rank.

    Returns:
        The score, or 0 when there are no hits in the top ``k`` (or ``k <= 0``).
    """
    relevance = np.asarray(r_true_arr, dtype=float)[:max(k, 0)]
    if relevance.sum() == 0:
        return 0
    # The running hit count divided by rank gives precision@i for every rank i
    # in a single O(k) pass, instead of re-summing the prefix at each rank.
    precisions = np.cumsum(relevance) / np.arange(1, len(relevance) + 1)
    return float(np.sum(precisions * relevance)) / k


def average_normed_precision_at_k(r_true_arr, k, n_true):
    """Average precision at ``k`` divided by the best score achievable.

    The raw score is computed as in ``average_precision_at_k``: the sum of
    precision@i over hit positions i <= k, divided by ``k``. A perfect ranking
    puts min(n_true, k) hits at the top, which scores min(n_true, k) / k. Dividing
    by that ideal makes a perfect ranking score exactly 1.0, even when the user
    has more than ``k`` relevant items.

    Args:
        r_true_arr: sequence of 0/1 relevance flags in ranking order. Positions
            past its end count as non-relevant.
        k: cut-off rank.
        n_true: number of relevant items for this user.

    Returns:
        The normalised score, or 0 when there are no hits in the top ``k`` or
        ``n_true <= 0``.
    """
    relevance = np.asarray(r_true_arr, dtype=float)[:max(k, 0)]
    if relevance.sum() == 0 or n_true <= 0:
        return 0
    precisions = np.cumsum(relevance) / np.arange(1, len(relevance) + 1)
    apk = float(np.sum(precisions * relevance)) / k
    # Cap at k: with more than k relevant items, only k of them fit in the top k.
    apk_ideal = min(n_true, k) / k
    return apk / apk_ideal

In [3]:
def enumerated_dict(values):
    """Assign consecutive integer ids (0, 1, 2, ...) to ``values`` in order.

    Returns ``(enum_dict, reverse_dict)``: ``enum_dict`` maps value -> id and
    ``reverse_dict`` maps id -> value. If a value occurs more than once,
    ``enum_dict`` keeps the id of its last occurrence, while ``reverse_dict``
    keeps every id.
    """
    indexed = list(enumerate(values))
    reverse_dict = dict(indexed)
    enum_dict = {value: idx for idx, value in indexed}
    return enum_dict, reverse_dict


def predict_user(model, user_id, products, product_dict, reverse_product_dict, matrix_shape,
                 top_n=30):
    """Recommend products for one user, based on the products they bought.

    The user is encoded as row 0 of a sparse user x item matrix, with a 1 per
    purchase (repeated products are summed). The model is asked to recompute
    that user's representation (``recalculate_user=True``) rather than look up
    a trained row. Already-bought products are not filtered out.

    Args:
        model: fitted recommender whose ``recommend(userid, user_items, N=...,
            recalculate_user=..., filter_already_liked_items=...)`` returns
            ``(item_index, score)``-like entries. This is presumably the older,
            pre-0.5 ``implicit`` interface; confirm against the installed version.
        user_id: identifier copied into every output row.
        products: iterable of product ids bought by the user. Each must be a key
            of ``product_dict``, otherwise a ``KeyError`` is raised.
        product_dict: product id -> column index.
        reverse_product_dict: column index -> product id.
        matrix_shape: shape of the single-user matrix, normally ``(1, n_products)``.
        top_n: number of recommendations to request (default 30).

    Returns:
        A list of ``[user_id, product_id]`` pairs in the model's ranking order.
    """
    # Integer index arrays: float indices are not valid sparse-matrix indices.
    item_cols = np.array([product_dict[product] for product in products], dtype=np.int64)
    # Every purchase goes into row 0, because the matrix describes only this user.
    user_rows = np.zeros(len(item_cols), dtype=np.int64)
    user_items = scipy.sparse.csr_matrix(
        (np.ones(len(item_cols)), (user_rows, item_cols)), shape=matrix_shape
    )

    rec = model.recommend(0, user_items, N=top_n, recalculate_user=True,
                          filter_already_liked_items=False)

    return [[user_id, reverse_product_dict[r[0]]] for r in rec]

## Spark preparation

In [4]:
# Python interpreter that PySpark workers should use.
os.environ.update(PYSPARK_PYTHON='/usr/bin/python3')

In [5]:
# Executor/driver resources for the Spark application.
# getOrCreate() makes this cell safe to re-run: calling SparkContext(...) a
# second time raises, because only one SparkContext may be active per process.
# If a context already exists it is reused, and `conf` is NOT re-applied.
conf = SparkConf().setAppName("Read_CSV").setAll([
    ('spark.executor.memory', '6g'),
    ('spark.executor.cores', '7'),
    ('spark.cores.max', '7'),
    ('spark.driver.memory', '16g'),
    ('spark.driver.maxResultSize', '2g'),
])
sc = SparkContext.getOrCreate(conf=conf)
# NOTE(review): SQLContext is deprecated in Spark 3.x in favour of SparkSession.
sqlContext = sql.SQLContext(sc)

## Data preparation

In [6]:
# Load the raw purchase log. The first CSV line is the header; no schema is
# inferred, so every column is read as a string.
purchases = sqlContext.read.csv("data/purchases.csv", header=True)

In [7]:
# Recommend only "contemporary" products: keep the products purchased at least
# once before 2019-02-15, and drop every purchase of any other product.
# NOTE(review): transaction_datetime is compared as a string (the CSV is read
# without a schema). This is only correct for zero-padded
# "YYYY-MM-DD HH:MM:SS" values; confirm the data format.
contemporary_rows = (
    purchases
    .select(purchases.product_id)
    .where(purchases.transaction_datetime < "2019-02-15 00:00:00")
    .distinct()
    .collect()
)
contemporary_items = [r['product_id'] for r in contemporary_rows]

purchases = purchases.filter(purchases.product_id.isin(contemporary_items))

In [8]:
# Keep only clients with more than one DISTINCT transaction. The validation
# split holds out each test client's last transaction, so a client needs at
# least one other transaction to build recommendations from.
# A transaction can span several rows of `purchases` (the validation split
# selects every row of a transaction_id). Counting raw rows would therefore
# also keep clients who made only one multi-product transaction.
transactions_cnt = (
    purchases
    .select("client_id", "transaction_id")
    .distinct()
    .groupby("client_id")
    .count()
)

transactions_cnt = transactions_cnt.filter(transactions_cnt["count"] > 1)
multi_trans_users = [row['client_id'] for row in transactions_cnt.collect()]

In [9]:
# Train/Test split: hold out a random sample of multi-transaction clients.
# The fixed seed makes the split, and therefore every metric below, reproducible.
RANDOM_SEED = 42
N_TEST_USERS = 1  # NOTE(review): a single test user gives a very noisy metric

rng = np.random.default_rng(RANDOM_SEED)
test_users = rng.choice(multi_trans_users, size=N_TEST_USERS, replace=False).tolist()

test = purchases.filter(purchases.client_id.isin(test_users))

train = purchases.filter(~purchases.client_id.isin(test_users))



In [10]:
# Every distinct client id left after filtering (train and test clients together).
client_rows = purchases.select(purchases.client_id).distinct().collect()
client_dic = [r['client_id'] for r in client_rows]

In [11]:
# Every distinct product id left after filtering (train and test together).
product_rows = purchases.select(purchases.product_id).distinct().collect()
product_dic = [r['product_id'] for r in product_rows]

In [12]:
# Give every client and every product an integer index. Both id lists were
# collected from the whole filtered `purchases` set, not from train only, so
# held-out test clients also get a row index.
(client_dict, reverse_client_dict), (product_dict, reverse_product_dict) = (
    enumerated_dict(client_dic),
    enumerated_dict(product_dic),
)

In [13]:
# Save the id <-> index dictionaries so the interaction matrix can be rebuilt
# later (e.g. after reloading them in a fresh kernel).
vocabularies = (client_dict, reverse_client_dict, product_dict, reverse_product_dict)
with open("x5_dic.pkl", "wb") as dict_file:
    pickle.dump(vocabularies, dict_file)

In [14]:
# Client id column of the training purchases.
enum = train.select("client_id")

In [15]:
# Product id column of the training purchases.
enum_pr = train.select("product_id")

In [16]:
# Map each training row's client_id to its integer row index, building one
# numpy array per Spark partition and concatenating them on the driver.
client_index_chunks = (
    enum.select("client_id")
    .rdd
    .glom()
    .map(lambda rows: np.array([client_dict[row[0]] for row in rows]))
    .collect()
)
enum_clients = np.concatenate(client_index_chunks)

In [17]:
# Encode client_id and product_id from the SAME pass over `train`, so that
# enum_clients[i] and enum_products[i] always describe the same purchase row.
# Collecting the two columns in separate Spark jobs relies on both jobs
# returning rows in the same order, which Spark does not guarantee.
def _encode_rows(rows):
    """Encode one partition as an (n, 2) int64 array of [client_index, product_index]."""
    # reshape(-1, 2) also keeps empty partitions 2-D, so concatenation works.
    return np.array(
        [(client_dict[row[0]], product_dict[row[1]]) for row in rows],
        dtype=np.int64,
    ).reshape(-1, 2)

encoded_pairs = np.concatenate(
    train.select("client_id", "product_id").rdd.glom().map(_encode_rows).collect()
)
enum_clients = encoded_pairs[:, 0]
enum_products = encoded_pairs[:, 1]

In [18]:
# Matrix size: one row per indexed client and one column per indexed product
# (indices are 0-based, so each size is the largest index + 1).
n_client_rows = max(reverse_client_dict) + 1
n_product_cols = max(reverse_product_dict) + 1
matrix_shape = (n_client_rows, n_product_cols)

In [19]:
# Client x product interaction matrix with one entry of 1.0 per training
# purchase row. scipy sums duplicate (client, product) entries when a COO
# matrix is converted to CSR/CSC.
interaction_values = np.ones(len(enum_clients))
sparse_matrix = scipy.sparse.coo_matrix(
    (interaction_values, (enum_clients, enum_products)),
    shape=matrix_shape,
)


## Training

In [20]:
# Item-item TF-IDF nearest-neighbour recommender (K=1: keep a single neighbour
# per item, per implicit's K parameter).
model = implicit.nearest_neighbours.TFIDFRecommender(K=1)

# fit() receives the transposed matrix (products x clients).
# NOTE(review): this matches the item-user input of older implicit versions; confirm.
item_user_matrix = sparse_matrix.T
model.fit(item_user_matrix)

HBox(children=(FloatProgress(value=0.0, max=40389.0), HTML(value='')))




In [21]:
out_dir = "./tmp/implicit/"
os.makedirs(out_dir, exist_ok=True)
print("Dump model to " + out_dir)
# `with` closes (and flushes) the file even if pickling fails. os.path.join
# avoids the doubled slash that string concatenation produced.
with open(os.path.join(out_dir, "model.pkl"), "wb") as model_file:
    pickle.dump(model, model_file)


Dump model to ./tmp/implicit/


## Validation

In [22]:
# Reload the trained model; `with` closes the file handle after loading.
# pickle can execute arbitrary code on load, so only load files this notebook wrote.
with open("./tmp/implicit/model.pkl", "rb") as model_file:
    model = pickle.load(model_file)

In [23]:
# Reload the id <-> index dictionaries saved during data preparation.
with open("x5_dic.pkl", "rb") as dict_file:
    vocabularies = pickle.load(dict_file)
client_dict, reverse_client_dict, product_dict, reverse_product_dict = vocabularies

In [24]:
# Collect the held-out test clients' purchases to the driver as a pandas DataFrame.
test_data = test.toPandas()

In [25]:
# Hold out each test client's chronologically last transaction for validation.
# All earlier transactions are the input the recommender sees.
# Sort first: toPandas() keeps whatever row order Spark returned, so
# keep="last" on its own would not reliably pick the latest transaction.
# NOTE(review): transaction_datetime is a string here. Lexicographic order equals
# chronological order only for zero-padded "YYYY-MM-DD HH:MM:SS" values; confirm.
last_transactions = (
    test_data
    .sort_values("transaction_datetime", kind="mergesort")
    .drop_duplicates(subset="client_id", keep="last")["transaction_id"]
)
test_validation = test_data[test_data["transaction_id"].isin(last_transactions)]
test_data = test_data[~test_data["transaction_id"].isin(last_transactions)]

In [26]:
# Matrix size from the (possibly reloaded) dictionaries.
matrix_shape = (max(reverse_client_dict.keys()) + 1, max(reverse_product_dict.keys()) + 1)

# Recommendations for the held-out test clients, who are absent from train.
# Each client's recommendations are built from their earlier (non-held-out) purchases.
recommendations = []

# A single groupby pass instead of re-filtering test_data for every client.
# sort=False keeps clients in order of first appearance, like .unique().
# tqdm.auto replaces the deprecated tqdm_notebook.
client_groups = test_data.groupby("client_id", sort=False)["product_id"]
for test_client, products in tqdm(client_groups, total=client_groups.ngroups):
    rec = predict_user(model, test_client, products, product_dict, reverse_product_dict,
                       (1, matrix_shape[1]))
    recommendations.extend(rec)

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  import sys


HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))




In [27]:
# Ground truth: the products each test client actually bought in the held-out
# transaction, flagged with is_buyed = 1.
# drop_duplicates: a product listed twice in one transaction must not duplicate
# recommendation rows in a left merge, or be counted twice per client.
reality = (
    test_validation[["client_id", "product_id"]]
    .drop_duplicates()
    .assign(is_buyed=1)
)

In [28]:
# Flag every recommendation: is_buyed = 1 if the client bought it in the
# held-out transaction; unmatched rows (NaN after the left merge) become 0.
recommended = pd.DataFrame(recommendations, columns=["client_id", "product_id"])
rec_df = recommended.merge(
    reality,
    on=["client_id", "product_id"],
    how="left",
    sort=False,
).fillna(0)

In [29]:
# Number of products each test client actually bought in the validation
# transaction (client_id -> count). Used to normalise average precision.
purchases_per_client = reality.groupby("client_id")["is_buyed"].sum()
real_dict = purchases_per_client.to_dict()

In [30]:
# Mean precision@30 over test clients. Each client's list holds the is_buyed
# flags of their recommendations in ranking order (groupby keeps row order
# within each group).
hits_by_client = rec_df.groupby(by="client_id", sort=False)["is_buyed"].apply(list)
np.mean([precision_at_k(hits, 30) for hits in hits_by_client])

0.0

In [31]:
# Mean average precision@30 over test clients; short lists are zero-padded to 30.
ap_hits_by_client = rec_df.groupby(by="client_id")["is_buyed"].apply(list)
np.mean([average_precision_at_k(add_to_k(hits, 30), 30) for hits in ap_hits_by_client])

0.0

In [32]:
# Mean normalised average precision@30. Each client's score is divided by the
# best score achievable given how many products they actually bought.
normed_hits_by_client = rec_df.groupby(by="client_id")["is_buyed"].apply(list)
np.mean([
    average_normed_precision_at_k(add_to_k(hits, 30), 30, real_dict.get(client, 0))
    for client, hits in normed_hits_by_client.items()
])

0.0