# 0. 設定 Spark

In [1]:
spark

In [2]:
import pandas as pd
import pyspark.pandas as ps
import numpy as np
import gc
import pyspark.sql.functions as sf
from pyspark.context import SparkContext
from pyspark.sql.functions import array_max
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.pandas.config import set_option, reset_option
from dateutil.relativedelta import relativedelta

In [3]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
ps.set_option("compute.default_index_type", "distributed")
set_option("compute.ops_on_diff_frames", True)

In [4]:
spark.conf.get("spark.kryoserializer.buffer.max")

'512m'

In [5]:
spark.conf.get("spark.yarn.appMasterEnv.SURPRISE_DATA_FOLDER")

'/home/hadoop'

In [6]:
spark.conf.get("spark.executorEnv.SURPRISE_DATA_FOLDER")

'/home/hadoop'

In [7]:
TRAIN_PERIOD = 30

# 1. 讀取檔案 => tran_ps

In [None]:
# 讀取檔案
# customers = ps.read_parquet('/user/HM_parquet/customers.parquet')
# articles = ps.read_parquet('/user/HM_parquet/articles.parquet')
tran_ps = ps.read_parquet('/user/HM_parquet/transactions_train.parquet').drop(['price', 'sales_channel_id'], axis=1)

In [None]:
tran_ps.set_index('t_dat',inplace=True)
tran_ps['start_test'] = ''
tran_ps['split_id'] = ''
tran_ps.head(5)

# 2. 作時間分割 tran_ps => split_data, split_id

In [None]:
def split(data,train_period=30, test_period=7, stride=30,show_progress=False):
    
    split_data = ps.DataFrame(columns = ['t_dat','customer_id', 'article_id', 'split_id', 'start_test']).set_index('t_dat',inplace=True)

    end_test = data.index.max()
    start_test = end_test - relativedelta(days=test_period)
    start_train = start_test - relativedelta(days=train_period)
    split_id=0

    while start_train >= data.index.min():

        df = data.loc[start_train:end_test]
        df['start_test']=start_test
        df['split_id'] = split_id
        split_data = ps.concat([split_data,df])

        if(show_progress):
            print("Split_id:",split_id,", Train period:",start_train,"-" , start_test, ", test period", start_test, "-", end_test)

        # update dates:
        end_test = end_test - relativedelta(days=stride)
        start_test = end_test - relativedelta(days=test_period)
        start_train = start_test - relativedelta(days=train_period)
        split_id += 1
    
    return split_data, split_id

In [None]:
split_data, split_id = split(tran_ps,TRAIN_PERIOD,7,30,True)

In [None]:
split_data.reset_index(inplace=True)

In [None]:
split_data.count()

In [None]:
split_data.head(5)

# 3. 製作參數表 split_id => para_cross_split

In [None]:
# 製作參數表 paras_grid
from itertools import product

paras = list(
    product(
        [25,50,100,150,200],
        [20,30,40,50],
        [0.01]
    )
)
paras_grid = ps.DataFrame(paras,columns= ['n_factors','n_epochs','reg_all'])
paras_grid.count()

In [None]:
# 製作 split_id 表
split_id_ps = ps.DataFrame({'split_id': range(split_id)})
split_id_ps.count()

In [None]:
# 將 paras_grid 與 split_id 做 cross join
paras_grid['key'] = 1
split_id_ps['key'] = 1

para_cross_split = ps.merge(paras_grid, split_id_ps, on ='key').drop('key')
del split_id_ps
len(para_cross_split)

In [None]:
# 將 cross join 後的表新增遞增的 group_id 欄位，之後要用來做 pandas_udf 的 groupby
para_cross_split['group_id'] = 0
para_cross_split['group_id'] = np.arange(len(para_cross_split)).tolist()
para_cross_split

In [None]:
# para_cross_split.to_parquet('/user/HM_parquet/SVD_model/para_cross_split.parquet')

# 4. join 參數表和資料表 split_data, para_cross_split => join_data

In [None]:
join_data = split_data.join(para_cross_split.set_index('split_id'), on='split_id')
join_data.set_index('t_dat',inplace=True)

In [None]:
join_data.head(5)

In [None]:
39919883*20

In [None]:
join_data.count()

In [None]:
# join_data.to_parquet('/user/HM_parquet/SVD_model/join_data30.parquet')

In [None]:
# 刪除用不到的資料表
del split_data, para_cross_split
gc.collect()

# -----------------------------

# 讀取資料表

In [8]:
df = spark.read.option('header','true').parquet('/user/HM_parquet/SVD_model/join_data30.parquet')
# join_data.set_index('t_dat',inplace=True)
df.count()

                                                                                

798397660

In [None]:
# df.head(5)

# surpriseSVD

In [9]:
import pandas as pd
# from surprise import NormalPredictor
from surprise import Dataset
from surprise import Reader
from surprise import SVDpp,SVD
from surprise import accuracy
from surprise.model_selection import train_test_split
from collections import defaultdict
import numpy as np
import average_precision as metrics

class surpriseSVD():
    def __init__(self):
        self = self

    def get_top_n(self, predictions, n=12):
        """Return the top-N recommendation for each user from a set of predictions.
        Args:
            predictions(list of Prediction objects): The list of predictions, as
                returned by the test method of an algorithm.
            n(int): The number of recommendation to output for each user. Default
                is 10.
        Returns:
        A dict where keys are user (raw) ids and values are lists of tuples:
            [(raw item id, rating estimation), ...] of size n.
        """

        # First map the predictions to each user.
        top_n = defaultdict(list)
        for uid, iid, true_r, est, _ in predictions:
            top_n[uid].append((iid, est))

        # Then sort the predictions for each user and retrieve the k highest ones.
        for uid, user_ratings in top_n.items():
            user_ratings.sort(key=lambda x: x[1], reverse=True)
            top_n[uid] = user_ratings[:n]

        return top_n

    def get_set(self,df):
        reader = Reader(rating_scale=(1, 500))
        data_set = Dataset.load_from_df(df[['customer_id','article_id','rating']], reader)
        return data_set

    def get_rating_set(self,df):
        rating = df[['customer_id','article_id','split_id']].groupby(['customer_id','article_id']).count().reset_index()
        rating.columns = ['customer_id','article_id','rating']
        rating_set = self.get_set(rating)
        return rating_set


    def train_SVD(self, train_data, test_data, paras={}):
        
        ## 讀取評分資料為surprise可以訓練的格式
        trainset = self.get_rating_set(train_data)
        testset = self.get_rating_set(test_data)

        ## rmse 需要的資料
        testset2 = [testset.df.loc[i].to_list() for i in range(len(testset.df))]

        ## map@k testing 需要產的資料
        test_data.loc[:,'rating']=0
        test_processed = self.get_set(test_data)
        NA, test2 = train_test_split(test_processed, test_size=1.0)

        # ======= 消費者的實際購買清單 =======
        test_data['article_id'] = test_data['article_id'].astype('str')
        test_uni = test_data.drop_duplicates(subset=['customer_id', 'article_id'], keep='first')
        buy_n = test_uni[['customer_id','article_id']].groupby('customer_id')['article_id'].apply(list).to_dict()

        cust_actual_list = []
        for uid, user_ratings in buy_n.items():
            cust_pred_tuple = (uid, [iid for iid in user_ratings])
            cust_actual_list.append(cust_pred_tuple)

        # ======= 訓練 SVD 模型 =======
        algo = SVD(random_state=42,**paras)

        # 訓練模型
        algo.fit(trainset.build_full_trainset())

        ##### rmse #####
        predictions = algo.test(testset2)
        rmse = accuracy.rmse(predictions)

        ##### map@k #####
        predictions_map = algo.test(test2)
        # est = [i.est for i in predictions_map] 

        ##  消費者的預測清單 
        top_n = self.get_top_n(predictions=predictions_map, n=12)

        cust_pred_list = []
        for uid, user_ratings in top_n.items():
            cust_pred_tuple = (uid, [str(iid) for (iid, _) in user_ratings])
            cust_pred_list.append(cust_pred_tuple)

        final_list = list(zip(cust_actual_list, cust_pred_list))

        # map@k計算 
        mapk_list = []
        for i in range(len(final_list)):
            map_k = metrics.mapk([final_list[i][0][1]],[final_list[i][1][1]],12)
            mapk_list.append(map_k)

        map_k = sum(mapk_list)/len(mapk_list)

        return rmse, map_k

# pandas_udf

In [None]:
# pdf = df.where("group_id == 43").toPandas()

In [16]:
def time_split_hyperparameter_search(data):

    paras = {
        'n_factors':data.n_factors.values[0], 
        'n_epochs':data.n_epochs.values[0], 
        'reg_all':data.reg_all.values[0]
    }
    
    test_index = data['t_dat'] > data['start_test'][0]
    train_data = data.loc[~test_index]
    test_data = data.loc[test_index]
    
    model = surpriseSVD()
    rmse, map12 = model.train_SVD(train_data, test_data, paras)
    
    paras.update({
        'stride': TRAIN_PERIOD,
        'start_test' : data.start_test.values[0],
        'rmse' : rmse,
        'map12' : map12
    })
    
    results = pd.DataFrame([paras])
    
    return results


In [17]:
from pyspark.sql.types import (
    DateType, DoubleType, FloatType, IntegerType, StringType, StructField, StructType
)
# pandas_udf

schema = StructType(
    [
        StructField("stride", IntegerType(), True),
        StructField('start_test', DateType(),True),
        StructField("n_factors", IntegerType(), True),
        StructField("n_epochs", IntegerType(), True),
        StructField('reg_all', FloatType(),True),
        StructField('rmse', FloatType(),True),
        StructField('map12', FloatType(),True),
     ]
)

results = df.groupby('group_id').applyInPandas(time_split_hyperparameter_search, schema)

In [18]:
results.count()

2022-04-01 15:34:10,181 WARN scheduler.TaskSetManager: Lost task 94.0 in stage 12.0 (TID 306) (bdse42.example.com executor 4): FetchFailed(BlockManagerId(9, bdse91.example.com, 36595, None), shuffleId=5, mapIndex=11, mapId=198, reduceId=100, message=
org.apache.spark.shuffle.FetchFailedException
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1165)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.sp

480



In [25]:
results.groupBy(['n_factors','n_epochs','reg_all']).mean('rmse','map12').sort("avg(rmse)").limit(5).show()

2022-04-01 19:43:22,068 WARN scheduler.TaskSetManager: Lost task 157.0 in stage 27.0 (TID 980) (bdse89.example.com executor 16): FetchFailed(BlockManagerId(4, bdse42.example.com, 44135, None), shuffleId=11, mapIndex=26, mapId=825, reduceId=174, message=
org.apache.spark.shuffle.FetchFailedException
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1165)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache

+---------+--------+-------+------------------+--------------------+
|n_factors|n_epochs|reg_all|         avg(rmse)|          avg(map12)|
+---------+--------+-------+------------------+--------------------+
|       25|      20|   0.01| 21.12461559350292|5.767838883912191E-4|
|       50|      20|   0.01|21.125548214962084|5.791609328298364E-4|
|      100|      20|   0.01| 21.12736424803734| 5.80218322284054E-4|
|       25|      30|   0.01| 21.12763550753395|5.803498206660151E-4|
|       50|      30|   0.01|21.128597273180883|5.836315916288489E-4|
+---------+--------+-------+------------------+--------------------+





# 存檔

In [21]:
results.write.parquet(f'/user/HM_parquet/SVD_model/params/para{TRAIN_PERIOD}.parquet',mode='overwrite',partitionBy='rmse')

2022-04-01 17:04:35,033 WARN scheduler.TaskSetManager: Lost task 36.0 in stage 19.0 (TID 541) (bdse106.example.com executor 2): TaskKilled (Stage cancelled)
2022-04-01 17:04:35,033 WARN scheduler.TaskSetManager: Lost task 42.0 in stage 19.0 (TID 543) (bdse106.example.com executor 2): TaskKilled (Stage cancelled)
2022-04-01 17:07:31,391 WARN scheduler.TaskSetManager: Lost task 13.0 in stage 19.0 (TID 503) (bdse91.example.com executor 12): TaskKilled (Stage cancelled)
2022-04-01 17:45:15,407 WARN scheduler.TaskSetManager: Lost task 127.0 in stage 24.0 (TID 690) (bdse106.example.com executor 2): FetchFailed(BlockManagerId(13, bdse89.example.com, 41377, None), shuffleId=10, mapIndex=22, mapId=563, reduceId=137, message=
org.apache.spark.shuffle.FetchFailedException
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1165)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
	at 

In [None]:
results.write.parquet('file:///home/hadoop/git_workspace/BDSE23_HM_recommendation/spark/params/para_30.parquet')

In [None]:
para30_ps['stride']=30
para30_ps.head(5)

In [None]:
para30_ps.to_parquet('/user/HM_parquet/SVD_model/params/para30.parquet',mode = 'overwrite',partition_cols='start_test')

In [None]:
para30 = ps.read_parquet('/user/HM_parquet/SVD_model/para_30.parquet',columns=[])

# (DataFrame)

## 1. 讀取檔案 /DataFrame

In [None]:
# # 讀取檔案
# customers = spark.read.option('header','true').parquet('/user/HM_parquet/customers.parquet')
# articles = spark.read.option('header','true').parquet('/user/HM_parquet/articles.parquet')
# transactions = spark.read.option('header','true').parquet('/user/HM_parquet/transactions_train.parquet')

In [None]:
# transactions.show()

## 2. 將customer_id(字串)轉為customer_index(整數) /DataFrame

In [None]:
# # 將customers的customer_id轉為數字(buffer要增加到512m)
# toIndex = StringIndexer(inputCol="customer_id", outputCol="customer_index").fit(customers)
# customers = toIndex.transform(customers)
# # customers.head(5)

In [None]:
# # 將transactions的customer_id轉為數字
# transactions = toIndex.transform(transactions)
# # transactions.head(5)

In [None]:
# transactions.describe()