# 1. 환경설정

In [4]:
cd /content/drive/MyDrive/Colab/ssafy_second_pjt/recsys/restaurant_data

/content/drive/MyDrive/Colab/ssafy_second_pjt/recsys/restaurant_data


In [5]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 43 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 47.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=5456d4f817666311ae9b3107e94d730bd7d44fc10f0f8111d0684f3e5dcc484a
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [6]:
from pyspark.sql import SparkSession

In [7]:
# Create (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName('Recsys2')
    .getOrCreate()
)

# 2. 추천시스템 구현

## [1] data load

+ 지역별로 restaurant, user, score를 컬럼으로 하는 df 가져오기

### (1) 원본 데이터 로드

In [176]:
# Load the raw restaurant data. The '_c0' column is dropped because the
# data already carries its own id column.
sdf = (
    spark.read
    .csv('restaurant.csv', header=True, inferSchema=True)
    .drop('_c0')
)

In [None]:
# Keep only the restaurants that have at least one review.
has_review = sdf['review_cnt'] >= 1
review_sdf = sdf.where(has_review)
review_sdf.show()

In [178]:
# Report the size of the reviewed-restaurant table.
n_rows = review_sdf.count()
n_cols = len(review_sdf.columns)
print('row 개수 : ', n_rows, 'column 개수 : ', n_cols)

row 개수 :  45667 column 개수 :  13


In [None]:
# Keep only the restaurants whose address starts with "서울" (Seoul).
from pyspark.sql.functions import col

address_in_seoul = col('address').like("서울%")
seoul_sdf = review_sdf.where(address_in_seoul)
seoul_sdf.show()

### (2) 서울의 restaurant, user, score로 구성된 데이터 로드

In [175]:
# Load the Seoul ratings table (columns: restaurants, users, scores).
seoul_rec_sdf = spark.read.csv(
    'seoul_rec.csv',
    header=True,
    inferSchema=True,
)
seoul_rec_sdf.show()

+-----------+------+------+
|restaurants| users|scores|
+-----------+------+------+
|         38|115682|     3|
|         49|388883|     5|
|         49|180541|     4|
|         50|249051|     5|
|         92|872014|     4|
|         92|391017|     5|
|        110| 64301|     5|
|        129|148720|     5|
|        131|385612|     3|
|        134| 53368|     4|
|        140|298886|     5|
|        149|872014|     3|
|        149|368333|     3|
|        149|  8272|     4|
|        149|264572|     3|
|        149|651198|     3|
|        149|328775|     4|
|        149|237668|     4|
|        149| 75782|     5|
|        149|385612|     4|
+-----------+------+------+
only showing top 20 rows



## [2] 추천모델적합

In [None]:
# Build the recommendation model: fit ALS on a train split and score the held-out split.
from pyspark.ml.recommendation import ALS

# Fixed seed keeps the 75/25 split repeatable.
train, test = seoul_rec_sdf.randomSplit([0.75, 0.25], seed=1)

seoul_rec = ALS(maxIter=10,
         regParam=0.01,
         userCol='users',
         itemCol='restaurants',
         ratingCol='scores',
         nonnegative=True,
         # 'drop' removes rows whose prediction is NaN (users/items unseen during fit),
         # so the evaluation metrics are not poisoned by NaN values.
         coldStartStrategy='drop',
         # Explicit seed so the factor initialisation (and therefore the fitted
         # model and its metrics) is repeatable from run to run.
         seed=1)
# Fit the ALS model; fit() takes a DataFrame.
seoul_rec_model = seoul_rec.fit(train)

# Predict scores for the held-out rows; transform() takes a DataFrame.
seoul_pred_ratings = seoul_rec_model.transform(test)

In [None]:
# Preview the first 50 test-split predictions as a pandas DataFrame.
(
    seoul_pred_ratings
    .limit(50)
    .toPandas()
)

In [118]:
# Evaluate the recommender: RMSE and MAE of the test-split predictions.
from pyspark.ml.evaluation import RegressionEvaluator

# Both evaluators compare the true 'scores' column with the model's 'prediction'.
eval_cols = {'labelCol': 'scores', 'predictionCol': 'prediction'}

evaluator = RegressionEvaluator(metricName='rmse', **eval_cols)
mae_eval = RegressionEvaluator(metricName='mae', **eval_cols)

# evaluate() takes the DataFrame that holds the predictions.
rmse = evaluator.evaluate(seoul_pred_ratings)
mae = mae_eval.evaluate(seoul_pred_ratings)

print("RMSE:", rmse)
print("MAE:", mae)

RMSE: 1.8678592493760358
MAE: 1.5006922598826642


## [3] 개인화 추천

### (1) 개인화 추천 함수 정의

In [194]:
# Pyspark Library #
# SQL
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
# ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.recommendation import ALS


def top_restaurants(user_id, n):
    """
    Print the top-n restaurant recommendations for a single user.

    Restaurants the user has already reviewed are excluded. The remaining
    restaurants are scored with the fitted ALS model, and the n highest
    predictions are printed together with restaurant metadata.

    Relies on notebook globals defined in earlier cells: ``seoul_rec_sdf``
    (ratings with restaurants/users/scores columns), ``seoul_rec_model``
    (the fitted ALS model) and ``seoul_sdf`` (restaurant metadata).

    Args:
        user_id: User id as stored in the ``users`` column; must be
            convertible with ``int()``.
        n: Maximum number of recommendations to print.

    Returns:
        None. The result table is printed via ``DataFrame.show``. Fewer than
        n rows may be printed, because the final inner join keeps only
        restaurants that are present in ``seoul_sdf``.
    """
    # Normalise once so the filter and the literal column use the same value.
    user_id = int(user_id)

    # Restaurants this user has already reviewed. The column is renamed so the
    # join condition below is unambiguous by name.
    visited = (
        seoul_rec_sdf
        .filter(seoul_rec_sdf['users'] == user_id)
        .select('restaurants')
        .withColumnRenamed('restaurants', 'tmp_restaurants')
    )

    # Every rated restaurant minus the visited ones. A left-anti join keeps
    # exactly the left rows that have no match on the right side.
    all_restaurants = seoul_rec_sdf.select('restaurants').distinct()
    seoul_remaining_restaurants = all_restaurants.join(
        visited,
        col('restaurants') == col('tmp_restaurants'),
        how='left_anti',
    )

    # The model scores (user, item) pairs, so attach the user id to every candidate.
    seoul_remaining_restaurants = seoul_remaining_restaurants.withColumn('users', lit(user_id))

    # Predict a score for each unvisited restaurant and keep the n best.
    recommender = (
        seoul_rec_model.transform(seoul_remaining_restaurants)
        .orderBy('prediction', ascending=False)
        .limit(n)
    )

    # Attach restaurant metadata (name, area, categories) to the predictions.
    final_recommendations = seoul_sdf.join(
        recommender,
        seoul_sdf['id'] == recommender['restaurants'],
        how='inner',
    )

    # The join does not preserve order, so sort again before printing.
    final_recommendations \
        .select(['id', 'name', 'area', 'category_list', 'prediction']) \
        .orderBy('prediction', ascending=False) \
        .show(n, truncate=False)



### (2) 개인화 추천 함수 실행

In [None]:
# Show the top 5 restaurant recommendations for user 385612.
top_restaurants(user_id=385612, n=5)

In [None]:
# Show the top 5 restaurant recommendations for user 724982.
top_restaurants(user_id=724982, n=5)

### (3) 원래 간 곳들 확인

In [197]:
# Ratings left by user 724982, kept so they can be joined with the
# restaurant metadata and compared against the recommendations.
is_target_user = seoul_rec_sdf['users'] == 724982
user_reviewed_restaurant = seoul_rec_sdf.where(is_target_user)
# user_reviewed_restaurant.show()

In [None]:
# Restaurants the user has already reviewed, joined with their metadata.
on_restaurant_id = seoul_sdf['id'] == user_reviewed_restaurant['restaurants']
user_reviewd_seoul_sdf = seoul_sdf.join(user_reviewed_restaurant, on_restaurant_id, how="inner")

# Print up to 50 visited restaurants, highest score first.
visited_cols = ['id', 'name', 'area', 'category_list', 'scores']
(
    user_reviewd_seoul_sdf
    .select(visited_cols)
    .orderBy('scores', ascending=False)
    .show(n=50, truncate=False)
)