In [1]:
import numpy as np
import pandas as pd
from tqdm import tqdm
from scipy.sparse import coo_matrix
from pyspark.ml.recommendation import ALS

import sklearn
import random 

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator



In [2]:
from pyspark.sql import SparkSession


MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("recommender")\
.config("spark.executor.memory", MAX_MEMORY)\
.config("spark.driver.memory", MAX_MEMORY)\
.getOrCreate()

# spark = SparkSession\
#         .builder\
#         .appName('recommender_system')\
#         .getOrCreate()

22/04/27 17:05:56 WARN Utils: Your hostname, RedStone-MacBookAir.local resolves to a loopback address: 127.0.0.1; using 172.30.1.23 instead (on interface en0)
22/04/27 17:05:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/27 17:05:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [49]:
# 데이터 불러오기
customers = pd.read_csv("data/customers.csv")
tran = pd.read_csv("data/transactions_train.csv")
item = pd.read_csv("data/articles.csv")

In [22]:
customers.head(3)

Unnamed: 0,customer_id,FN,Active,club_member_status,fashion_news_frequency,age,postal_code
0,00000dbacae5abe5e23885899a1fa44253a17956c6d1c3...,,,ACTIVE,NONE,49.0,52043ee2162cf5aa7ee79974281641c6f11a68d276429a...
1,0000423b00ade91418cceaf3b26c6af3dd342b51fd051e...,,,ACTIVE,NONE,25.0,2973abc54daa8a5f8ccfe9362140c63247c5eee03f1d93...
2,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,,,ACTIVE,NONE,24.0,64f17e6a330a85798e4998f62d0930d14db8db1c054af6...


In [5]:
tran['count'] = '1'


In [50]:
ALL_USERS = customers['customer_id'].unique().tolist()
ALL_ITEMS = item['article_id'].unique().tolist()

user_ids = dict(list(enumerate(ALL_USERS)))
item_ids = dict(list(enumerate(ALL_ITEMS)))

user_map = {u: uidx for uidx, u in user_ids.items()}
item_map = {i: iidx for iidx, i in item_ids.items()}

tran['user_id'] = tran['customer_id'].map(user_map)
tran['item_id'] = tran['article_id'].map(item_map)

# del customers, customers


In [39]:
tran.head()

Unnamed: 0,t_dat,customer_id,article_id,price,sales_channel_id,count,user_id,item_id
0,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,663713001,0.050831,2,1,2,40179
1,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,541518023,0.030492,2,1,2,10520
2,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,505221004,0.015237,2,1,7,6387
3,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687003,0.016932,2,1,7,46304
4,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687004,0.016932,2,1,7,46305


In [51]:
tran = tran.drop(['t_dat','price', 'sales_channel_id'], axis=1)
tran.head(20)

Unnamed: 0,customer_id,article_id,user_id,item_id
0,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,663713001,2,40179
1,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,541518023,2,10520
2,00007d2de826758b65a93dd24ce629ed66842531df6699...,505221004,7,6387
3,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687003,7,46304
4,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687004,7,46305
5,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687001,7,46302
6,00007d2de826758b65a93dd24ce629ed66842531df6699...,505221001,7,6386
7,00083cda041544b2fbb0e0d2905ad17da7cf1007526fb4...,688873012,198,47416
8,00083cda041544b2fbb0e0d2905ad17da7cf1007526fb4...,501323011,198,5944
9,00083cda041544b2fbb0e0d2905ad17da7cf1007526fb4...,598859003,198,22540


In [29]:
rfm = pd.read_csv("data/result.csv")

In [30]:
rfm = rfm[['customer_id', 'article_id', 't_dat_R', 'count_F', 'total_M']]
sum_factor = rfm['t_dat_R'] + rfm['count_F'] + rfm['total_M']
rfm['rfm'] = sum_factor

In [31]:
rfm = rfm.drop(['t_dat_R', 'count_F', 'total_M'], axis=1)

In [53]:
rfm.head()

Unnamed: 0,customer_id,article_id,rfm,user_id,item_id
0,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,663713001.0,20,2,40179
1,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,541518023.0,8,2,10520
2,00007d2de826758b65a93dd24ce629ed66842531df6699...,505221004.0,5,7,6387
3,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687003.0,5,7,46304
4,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687004.0,5,7,46305


In [44]:
tran.head()

Unnamed: 0,t_dat,customer_id,article_id,price,sales_channel_id,count,user_id,item_id
0,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,663713001,0.050831,2,1,2,40179
1,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,541518023,0.030492,2,1,2,10520
2,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,505221004,0.015237,2,1,7,6387
3,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687003,0.016932,2,1,7,46304
4,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687004,0.016932,2,1,7,46305


In [52]:
rfm = rfm.merge(tran, on=['customer_id', 'article_id'], how='right')

In [54]:
# rfm['customer_id'] = rfm['customer_id'].astype(int)
# rfm['article_id'] = rfm['article_id'].astype(int)
rfm.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 31788324 entries, 0 to 31788323
Data columns (total 5 columns):
 #   Column       Dtype  
---  ------       -----  
 0   customer_id  object 
 1   article_id   float64
 2   rfm          int64  
 3   user_id      int64  
 4   item_id      int64  
dtypes: float64(1), int64(3), object(1)
memory usage: 1.4+ GB


In [55]:
rfm.to_csv('data/df2.csv', index=False)

In [74]:
rfm.head()

Unnamed: 0,customer_id,article_id,rfm,user_id,item_id
0,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,663713001.0,20,2,40179
1,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,541518023.0,8,2,10520
2,00007d2de826758b65a93dd24ce629ed66842531df6699...,505221004.0,5,7,6387
3,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687003.0,5,7,46304
4,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687004.0,5,7,46305


In [81]:
rfm.loc[rfm['article_id'] == 810792006]

Unnamed: 0,customer_id,article_id,rfm,user_id,item_id
29314265,fb52989c4dc4f66d7d737bd1887cf44acd3f243c3d1bc9...,810792006.0,29,1347002,82979
29314266,fb52989c4dc4f66d7d737bd1887cf44acd3f243c3d1bc9...,810792006.0,29,1347002,82979
29365527,176c85b9d5a82cd58ee9a1d9ac82bb402f3d8a3cf00b99...,810792006.0,18,125630,82979


22/04/27 21:36:16 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 921974 ms exceeds timeout 120000 ms
22/04/27 21:36:16 WARN SparkContext: Killing executors is not supported by current scheduler.
22/04/28 14:31:15 WARN TransportChannelHandler: Exception in connection from /172.30.1.23:49328
java.io.IOException: Operation timed out
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.n

In [7]:
# coo 변환
row = df['user_id'].values
col = df['item_id'].values
data = np.ones(tran.shape[0])
coo_train = coo_matrix((data, (row, col)), shape=(len(ALL_USERS), len(ALL_ITEMS)))
coo_train

<1371980x105542 sparse matrix of type '<class 'numpy.float64'>'
	with 31788324 stored elements in COOrdinate format>

In [56]:
df_spark = spark.read.csv("data/df2.csv", inferSchema=True, header=True)
# coo_train = spark.createDataFrame(coo_train)

                                                                                

In [57]:
train, test = df_spark.randomSplit([0.75, 0.25])

rec = ALS(maxIter=10,
        regParam=0.01,
        userCol='user_id',
        itemCol='article_id',
        ratingCol='rfm', # label -> predict할 때는 필요 없음!
        nonnegative=True,
        coldStartStrategy='drop')
# ALS모델 학습 -> dataframe을 넣어주기
rec_model = rec.fit(train)

# transform을 이용해 예측 -> dataframe을 넣어주기
pred_ratings = rec_model.transform(test)
pred_ratings.limit(5).toPandas()

                                                                                

Unnamed: 0,customer_id,article_id,rfm,user_id,item_id,prediction
0,0001177027259b455f979d85a278e4b280205d4de5cce4...,820032004.0,13,26,85498,19.041037
1,0001177027259b455f979d85a278e4b280205d4de5cce4...,851936003.0,16,26,91960,13.116589
2,0001177027259b455f979d85a278e4b280205d4de5cce4...,198714001.0,12,26,222,4.660093
3,0001177027259b455f979d85a278e4b280205d4de5cce4...,811715001.0,27,26,83291,25.944149
4,0001177027259b455f979d85a278e4b280205d4de5cce4...,831429001.0,27,26,88074,22.967142


In [58]:
pred_ratings.limit(30).toPandas()

                                                                                

Unnamed: 0,customer_id,article_id,rfm,user_id,item_id,prediction
0,0001177027259b455f979d85a278e4b280205d4de5cce4...,820032004.0,13,26,85498,19.041037
1,0001177027259b455f979d85a278e4b280205d4de5cce4...,851936003.0,16,26,91960,13.116589
2,0001177027259b455f979d85a278e4b280205d4de5cce4...,198714001.0,12,26,222,4.660093
3,0001177027259b455f979d85a278e4b280205d4de5cce4...,811715001.0,27,26,83291,25.944149
4,0001177027259b455f979d85a278e4b280205d4de5cce4...,831429001.0,27,26,88074,22.967142
5,0001177027259b455f979d85a278e4b280205d4de5cce4...,831429001.0,27,26,88074,22.967142
6,0001177027259b455f979d85a278e4b280205d4de5cce4...,834063003.0,17,26,88639,29.697178
7,0001177027259b455f979d85a278e4b280205d4de5cce4...,851450001.0,15,26,91859,19.557447
8,0001177027259b455f979d85a278e4b280205d4de5cce4...,857725002.0,26,26,93343,23.667292
9,0001177027259b455f979d85a278e4b280205d4de5cce4...,857725002.0,26,26,93343,23.667292


In [60]:
# Get metric for training
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol='rfm',
                              predictionCol='prediction',
                              metricName='rmse')
# evaluate 메소드에 예측값 담겨있는 dataframe 넣어주기
rmse = evaluator.evaluate(pred_ratings)

mae_eval = RegressionEvaluator(labelCol='rfm',
                              predictionCol='prediction',
                              metricName='mae')
mae = mae_eval.evaluate(pred_ratings)

print("RMSE:", rmse)
print("MAE:", mae)



RMSE: 4.953672069828236
MAE: 3.6449042083623993


                                                                                

In [71]:
# Pyspark Library #
# SQL
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
# ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.recommendation import ALS

df = spark.read.csv('data/df2.csv',
                    inferSchema=True, header=True)

# stringIndexer = StringIndexer(inputCol='article_id',
#                             outputCol='article_new')
# print(stringIndexer)
# model = stringIndexer.bfit(df)
# indexed = model.transform(df)

# 숫자로 바꾼 영화제목들 중 Unique한 값들만 담아 추출하기 -> Dataframe 반환
unique_movies = df.select("article_id").distinct()


                                                                                

In [73]:
def top_movies(user_id, n):
    """
    특정 user_id가 좋아할 만한 n개의 영화 추천해주는 함수
    """
    # unique_movies 데이터프레임을 'a'라는 데이터프레임으로 alias시키기
    a = unique_movies.alias('a')

    # 특정 user_id가 본 영화들만 담은 새로운 데이터프레임 생성
    watched_movies = df.filter(df['user_id'] == user_id)\
                            .select('article_id')

    # 특정 user_id가 본 영화들을 'b'라는 데이터프레임으로 alias시키기
    b = watched_movies.alias('b')

    # unique_movies를 기준으로 watched_movies를 조인시켜서 user_id가 보지 못한 영화들 파악 가능
    total_movies = a.join(b, a['article_id'] == b['article_id'],
                        how='left')

    # b 데이터프레임의 title_new값이 결측치를 갖고 있는 행의 a.title_new를 뽑아냄으로써 user_id가 아직 못본 영화들 추출
    # col('b.title_new') => b 데이터프레임의 title_new칼럼 의미(SQL처럼 가능!)
    remaining_movies = total_movies\
                    .where(col('b.article_id').isNull())\
                    .select('a.article_id').distinct()
    # remaining_movies 데이터프레임에 특정 user_id값을 동일하게 새로운 변수로 추가해주기
    remaining_movies = remaining_movies.withColumn('user_id',
                                                lit(int(user_id)))
    # 위에서 만든 ALS 모델을 사용하여 추천 평점 예측 후 n개 만큼 view -> 
    recommender = rec_model.transform(remaining_movies)\
                        .orderBy('prediction', ascending=False)\
                        .limit(n)
    # StringIndexer로 만든 것을 역으로 바꾸기 위해 IndexToString사용(영화제목을 숫자->한글제목)
    # movie_title = IndexToString(inputCol='article_new',
    #                         outputCol='article_id',
    #                         labels=model.labels) #여기서 model.labels는 StringIndexer에서 fit시켰을 때 생긴 레이블. 즉, 영화 제목들
    # # transform해서 영화제목을 숫자->한글로 변환! => dataframe으로 반환
    # final_recommendations = movie_title.transform(recommender)

    return recommender.show(n, truncate=False)

# userid가 1817번인 유저가 볼만한 영화 상위 5개 추천해주기
top_movies(26, 5)



+------------+-------+----------+
|article_id  |user_id|prediction|
+------------+-------+----------+
|8.10792006E8|26     |174.1048  |
|8.23724002E8|26     |134.23601 |
|7.31889008E8|26     |125.745186|
|8.12918001E8|26     |121.816605|
|7.68410001E8|26     |106.491554|
+------------+-------+----------+



                                                                                

In [None]:
df.head(26)