# PySpark 를 이용한 Steam 게임 추천 알고리즘

project reference

https://towardsdatascience.com/build-recommendation-system-with-pyspark-using-alternating-least-squares-als-matrix-factorisation-ebe1ad2e7679

### 임포팅

In [1]:
# written in python3
# pyspark version 3.0.1

!pip install pyspark
import os
import pyspark as spark
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, HiveContext, Row
from pyspark.sql.functions import desc
from pyspark.mllib.recommendation import ALS
import json



In [2]:
# 데이터 경로 임포팅
# 자기가 돌릴 때는 데이터 셋과 결과물 세트의 경로를 조정하세요
# 그대로 사용하고 싶으면 colab을 googledrive 와 연동하고, drive 내에 Colab Notebooks/ 안에 ipynb파일을 저장하고, Colab Notebooks/dataset/에  데이터셋을 집어넣으면 작동합니다.
# 하기 폴더가 존재하지 않으면 에러 생성함

game_detail = '/content/drive/MyDrive/Colab Notebooks/dataset/game_detail_v2.json'
user_owned_games = '/content/drive/MyDrive/Colab Notebooks/dataset/user_owned_games.json'
user_friend_list = '/content/drive/MyDrive/Colab Notebooks/dataset/user_friend_list.json'
user_recent_games = '/content/drive/MyDrive/Colab Notebooks/dataset/user_recently_played_games.json'
user_idx = '/content/drive/MyDrive/Colab Notebooks/dataset/user_idx.json'
recommended_by_playtimeforever = '/content/drive/MyDrive/Colab Notebooks/dataset/recommended_by_playtimeforever.json'
final_recommended_by_playtimeforever = '/content/drive/MyDrive/Colab Notebooks/dataset/final_recommended_by_playtimeforever'
recommended_by_playtime2weeks = '/content/drive/MyDrive/Colab Notebooks/dataset/recommended_by_playtime2weeks.json'
final_recommended_by_playtime2weeks = '/content/drive/MyDrive/Colab Notebooks/dataset/final_recommended_by_playtime2weeks'

## PySpark

PySpark 는 Spark를 python 에서 사용할 수 있게 해주는 라이브러리입니다.

Spark는 빅데이터 분석을 위한 병렬 분산 처리 플랫폼입니다.



### PySpark 세션 초기화

In [3]:
# pyspark 세션 빌딩
# Hive Support = Enable

sc = SparkSession.builder.appName("spark-recommender").enableHiveSupport().getOrCreate()

### 게임 디테일 처리

In [4]:
# 게임 디테일 df 변경

df_game = sc.read.json(game_detail)
df_game.printSchema()

root
 |-- about_the_game: string (nullable = true)
 |-- achievements: struct (nullable = true)
 |    |-- highlighted: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- path: string (nullable = true)
 |    |-- total: long (nullable = true)
 |-- background: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- content_descriptors: struct (nullable = true)
 |    |-- ids: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- notes: string (nullable = true)
 |-- controller_support: string (nullable = true)
 |-- demos: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appid: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |-- detailed_descript

In [5]:
# 게임 디테일 유효성 검사

df_game.registerTempTable("temp_game_detail")
df_valid_game = sc.sql("SELECT * FROM temp_game_detail")
df_valid_game.registerTempTable("game_detail")
df_valid_game.show(1)

+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+------------+--------------------+--------------------+----+----------+-----------------------+--------------------+--------------------+-------+--------------------+------------------+----------------+----------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+----------+---------------+--------------------+------------+-------+--------------------+--------------------+-----------+--------------------+--------------------+----+--------------------+
|      about_the_game|        achievements|          background|          categories|content_descriptors|controller_support|       demos|detailed_description|          developers| dlc|drm_notice|ext_user_account_notice|              genres|        header_image|is_free|        legal_notice|linux_requirements|mac_requirements|meta

### 유저 소유 게임 처리

In [6]:
# 유저 소유 게임 입력

df_user_owned_games = sc.read.json(user_owned_games)
df_user_owned_games.printSchema()
df_user_owned_games.registerTempTable("user_owned_games")

root
 |-- game_count: string (nullable = true)
 |-- games: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appid: long (nullable = true)
 |    |    |-- playtime_2weeks: long (nullable = true)
 |    |    |-- playtime_forever: long (nullable = true)
 |    |    |-- playtime_linux_forever: long (nullable = true)
 |    |    |-- playtime_mac_forever: long (nullable = true)
 |    |    |-- playtime_windows_forever: long (nullable = true)
 |-- steamid: string (nullable = true)



In [7]:
# 사람들이 가장 많이 플레이 한 10개의 게임 추천(콜드 스타트 문제 해결)
df_global_popular_games = sc.sql("SELECT b.game_id, SUM(b.playtime_forever) AS play_time FROM (SELECT played_games['appid'] AS game_id, played_games['playtime_forever'] AS playtime_forever FROM (SELECT EXPLODE(games) AS played_games FROM user_owned_games) a) b GROUP BY game_id ORDER BY play_time DESC LIMIT 10")
df_global_popular_games.registerTempTable('popular_games')

# rank = 유저 전부 합친 플레이타임
df_global_popular_games = sc.sql("SELECT b.name AS name, a.play_time AS rank, b.steam_appid, b.genres FROM popular_games a, game_detail b WHERE a.game_id = b.steam_appid ORDER BY rank DESC")
df_global_popular_games.show()

+--------------------+---------+-----------+--------------------+
|                name|     rank|steam_appid|              genres|
+--------------------+---------+-----------+--------------------+
|Counter-Strike: G...|136557735|        730|[[Action, 1], [Fr...|
|         Garry's Mod| 68785425|       4000|[[Indie, 23], [Si...|
|  Grand Theft Auto V| 35155163|     271590|[[Action, 1], [Ad...|
|Counter-Strike: S...| 29459482|        240|       [[Action, 1]]|
|            Warframe| 28659453|     230410|[[Action, 1], [Fr...|
|       Left 4 Dead 2| 21343026|        550|       [[Action, 1]]|
|            PAYDAY 2| 21032415|     218620|[[Action, 1], [RP...|
|      Counter-Strike| 19126275|         10|       [[Action, 1]]|
|            Terraria| 18232159|     105600|[[Action, 1], [Ad...|
|The Elder Scrolls...| 18182393|      72850|          [[RPG, 3]]|
+--------------------+---------+-----------+--------------------+



### 친구 리스트 처리

In [8]:
# 친구 리스트 데이터 등록

sample_user = '76561197960434622'

df_user_friend_list = sc.read.json(user_friend_list)
df_user_friend_list.printSchema()
df_user_friend_list.registerTempTable('friend_list')


df_friend_list = sc.sql("SELECT friends['steamid'] AS steamid FROM (SELECT EXPLODE(friends) AS friends FROM friend_list WHERE steamid = %s) a"%sample_user)
df_friend_list.show(10)
df_friend_list.registerTempTable('user_friend_list')

root
 |-- friends: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- friend_since: long (nullable = true)
 |    |    |-- relationship: string (nullable = true)
 |    |    |-- steamid: string (nullable = true)
 |-- steamid: string (nullable = true)

+-----------------+
|          steamid|
+-----------------+
|76561197960265738|
|76561197960265740|
|76561197960265743|
|76561197960265754|
|76561197960265838|
|76561197960269198|
|76561197960275076|
|76561197960276281|
|76561197960327485|
|76561197960381818|
+-----------------+
only showing top 10 rows



In [9]:
# 친구의 게임 플레이 시간을 등록
sc.sql("SELECT game_id, SUM(playtime_forever) AS play_time FROM (SELECT games['appid'] AS game_id, games['playtime_forever'] AS playtime_forever FROM (SELECT a.steamid, EXPLODE(b.games) AS games FROM user_friend_list a, user_owned_games b WHERE a.steamid = b.steamid) c) d GROUP BY game_id ORDER BY play_time DESC LIMIT 10").registerTempTable('temp_local_popular_games')

df_global_popular_games = sc.sql("SELECT DISTINCT b.name AS game_name, a.play_time FROM temp_local_popular_games a, game_detail b WHERE a.game_id = b.steam_appid")
df_global_popular_games.show()

+--------------------+---------+
|           game_name|play_time|
+--------------------+---------+
|Counter-Strike: G...|  1011736|
|     Elite Dangerous|   158316|
|         Left 4 Dead|   113271|
|       Left 4 Dead 2|   148037|
|       Borderlands 2|   120164|
|Counter-Strike: S...|   243005|
|        Just Cause 2|   121389|
|      Clicker Heroes|   805415|
|            Warframe|   344961|
+--------------------+---------+



## Collaboratie Filtering

CF implementation using the tequenics that aims to fill empty entries in the matrix

Spark mllib uses Alternating Least Square Algorithms to learn these factors

reference : https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html

### 유저의 최근 플레이 게임을 기반으로 데이터 처리

In [10]:
df_user_recent_games = sc.read.json(user_recent_games)
df_user_recent_games.printSchema()
df_user_recent_games.registerTempTable("user_recent_games")
df_valid_user_recent_games = sc.sql("SELECT * FROM user_recent_games where total_count != 0")
df_valid_user_recent_games.show(1)

root
 |-- games: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appid: long (nullable = true)
 |    |    |-- img_icon_url: string (nullable = true)
 |    |    |-- img_logo_url: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- playtime_2weeks: long (nullable = true)
 |    |    |-- playtime_forever: long (nullable = true)
 |    |    |-- playtime_linux_forever: long (nullable = true)
 |    |    |-- playtime_mac_forever: long (nullable = true)
 |    |    |-- playtime_windows_forever: long (nullable = true)
 |-- steamid: string (nullable = true)
 |-- total_count: string (nullable = true)

+--------------------+-----------------+-----------+
|               games|          steamid|total_count|
+--------------------+-----------------+-----------+
|[[427520, 267f5a8...|76561197960434622|          7|
+--------------------+-----------------+-----------+
only showing top 1 row



In [11]:
df_user_idx = sc.read.json(user_idx)
df_user_idx.printSchema()
df_user_idx.registerTempTable('user_idx')
df_valid_user_recent_games = sc.sql("SELECT b.user_idx, a.games FROM user_recent_games a JOIN user_idx b ON b.user_id = a.steamid WHERE a.total_count != 0")
df_valid_user_recent_games.printSchema()
df_valid_user_recent_games.show(10)

root
 |-- user_id: string (nullable = true)
 |-- user_idx: long (nullable = true)

root
 |-- user_idx: long (nullable = true)
 |-- games: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appid: long (nullable = true)
 |    |    |-- img_icon_url: string (nullable = true)
 |    |    |-- img_logo_url: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- playtime_2weeks: long (nullable = true)
 |    |    |-- playtime_forever: long (nullable = true)
 |    |    |-- playtime_linux_forever: long (nullable = true)
 |    |    |-- playtime_mac_forever: long (nullable = true)
 |    |    |-- playtime_windows_forever: long (nullable = true)

+--------+--------------------+
|user_idx|               games|
+--------+--------------------+
|       0|[[427520, 267f5a8...|
|       6|[[255710, 6cf7b10...|
|       7|[[1151640, 08a1b6...|
|      13|[[300570, 7566ab0...|
|      15|[[550, 7d5a243f95...|
|      20|[[440, e3f595a925...|
|    

### 최근 플레이 타임을 기반으로 Matrix를 생성, 이후 Spark의 mllib ALS 를 사용하여 트레이닝

In [12]:
# spark rdd
# 플레이 타임을 flat으로 매핑
# 플레이 타임이 0인 게임들은 제외함
training_rdd_forever = df_valid_user_recent_games.rdd.flatMapValues(lambda x : x).map(lambda x_y : (x_y[0], x_y[1].appid, x_y[1].playtime_forever)).filter(lambda x_y_z : x_y_z[2] > 0)
training_rdd_forever.collect()
training_rdd_2week = df_valid_user_recent_games.rdd.flatMapValues(lambda x : x).map(lambda x_y : (x_y[0], x_y[1].appid, x_y[1].playtime_2weeks)).filter(lambda x_y_z : x_y_z[2] > 0)
training_rdd_2week.collect()

[(0, 427520, 973),
 (0, 296490, 300),
 (0, 322330, 161),
 (0, 1295510, 64),
 (0, 1127400, 64),
 (0, 271590, 37),
 (0, 12100, 8),
 (6, 255710, 59),
 (7, 1151640, 125),
 (7, 1262580, 62),
 (13, 300570, 996),
 (13, 965200, 514),
 (13, 418950, 125),
 (13, 376520, 95),
 (13, 372210, 90),
 (13, 311850, 90),
 (13, 746140, 88),
 (15, 550, 713),
 (15, 386360, 432),
 (15, 228380, 406),
 (15, 563560, 65),
 (15, 252950, 48),
 (15, 312530, 46),
 (20, 440, 2),
 (23, 108800, 462),
 (23, 714120, 335),
 (23, 320240, 159),
 (23, 960090, 45),
 (23, 550, 3),
 (23, 1150950, 1),
 (2571, 359550, 1356),
 (2571, 730, 1006),
 (2571, 945360, 258),
 (2571, 674940, 114),
 (2571, 24790, 32),
 (26, 359550, 1356),
 (26, 730, 1006),
 (26, 945360, 258),
 (26, 674940, 114),
 (26, 24790, 32),
 (27, 945360, 406),
 (27, 620, 401),
 (27, 728880, 302),
 (27, 400, 118),
 (27, 252950, 40),
 (27, 437920, 15),
 (27, 1016920, 1),
 (28, 730, 3889),
 (28, 406290, 1059),
 (28, 884660, 958),
 (28, 739630, 292),
 (28, 384180, 183),
 (

In [13]:
als_model_forever = ALS.trainImplicit(training_rdd_forever, 10)
als_model_2week = ALS.trainImplicit(training_rdd_2week, 10)

In [14]:
result_rating_forever = als_model_forever.recommendProducts(0,10)
print(result_rating_forever)
try_df_result_forever=sc.createDataFrame(result_rating_forever)
print(try_df_result_forever.sort(desc("rating")).show())

[Rating(user=0, product=427520, rating=0.9397116685102327), Rating(user=0, product=381210, rating=0.9103427270796072), Rating(user=0, product=252950, rating=0.7664733921530291), Rating(user=0, product=227300, rating=0.7360603952576324), Rating(user=0, product=582010, rating=0.7009055450262944), Rating(user=0, product=730, rating=0.6746829146791318), Rating(user=0, product=306130, rating=0.6724613051000867), Rating(user=0, product=240, rating=0.6499993995309961), Rating(user=0, product=359550, rating=0.6497742961256631), Rating(user=0, product=945360, rating=0.5815707128476543)]
+----+-------+------------------+
|user|product|            rating|
+----+-------+------------------+
|   0| 427520|0.9397116685102327|
|   0| 381210|0.9103427270796072|
|   0| 252950|0.7664733921530291|
|   0| 227300|0.7360603952576324|
|   0| 582010|0.7009055450262944|
|   0|    730|0.6746829146791318|
|   0| 306130|0.6724613051000867|
|   0|    240|0.6499993995309961|
|   0| 359550|0.6497742961256631|
|   0| 

In [15]:
result_rating_2week = als_model_2week.recommendProducts(0,10)
print(result_rating_2week)
try_df_result_2week=sc.createDataFrame(result_rating_2week)
print(try_df_result_2week.sort(desc("rating")).show())

[Rating(user=0, product=730, rating=0.5224145324185524), Rating(user=0, product=440, rating=0.4359672101705221), Rating(user=0, product=427520, rating=0.4337000172547246), Rating(user=0, product=39210, rating=0.31437491842285264), Rating(user=0, product=976730, rating=0.2554020053399536), Rating(user=0, product=4000, rating=0.23659414199739282), Rating(user=0, product=377160, rating=0.23501508702383472), Rating(user=0, product=292030, rating=0.22418963447246298), Rating(user=0, product=374320, rating=0.2147829766990773), Rating(user=0, product=356190, rating=0.21202888955808513)]
+----+-------+-------------------+
|user|product|             rating|
+----+-------+-------------------+
|   0|    730| 0.5224145324185524|
|   0|    440| 0.4359672101705221|
|   0| 427520| 0.4337000172547246|
|   0|  39210|0.31437491842285264|
|   0| 976730| 0.2554020053399536|
|   0|   4000|0.23659414199739282|
|   0| 377160|0.23501508702383472|
|   0| 292030|0.22418963447246298|
|   0| 374320| 0.21478297669

## 결과물 저장

### 추천 게임 목록을 json형식으로 저장

In [16]:
# 유저가 플레이한 게임이 없다면 결과값 누락이 있을 수 있음. (매트릭스 기반이기 때문에)
# 따라서 이런 콜드 스타트의 경우 글로벌 인기 게임을 추천

with open(recommended_by_playtimeforever, 'w') as output_file:
    for user_idx in range(0, df_user_idx.count()):
        try:
            lst_recommended = [i.product for i in als_model_forever.recommendProducts(user_idx, 10)]
            rank = 1
            for app_id in lst_recommended:
                dict_recommended = {'user_idx': user_idx, 'game_id': app_id, 'rank': rank}
                json.dump(dict_recommended, output_file)
                output_file.write('\n')
                rank += 1
        except:
            pass
with open(recommended_by_playtime2weeks, 'w') as output_file:
    for user_idx in range(0, df_user_idx.count()):
        try:
            lst_recommended = [i.product for i in als_model_2week.recommendProducts(user_idx, 10)]
            rank = 1
            for app_id in lst_recommended:
                dict_recommended = {'user_idx': user_idx, 'game_id': app_id, 'rank': rank}
                json.dump(dict_recommended, output_file)
                output_file.write('\n')
                rank += 1
        except:
            pass

In [18]:
df_recommend_result_forever = sc.read.json(recommended_by_playtimeforever)
df_recommend_result_forever.show(20)

+-------+----+--------+
|game_id|rank|user_idx|
+-------+----+--------+
| 427520|   1|       0|
| 381210|   2|       0|
| 252950|   3|       0|
| 227300|   4|       0|
| 582010|   5|       0|
|    730|   6|       0|
| 306130|   7|       0|
|    240|   8|       0|
| 359550|   9|       0|
| 945360|  10|       0|
| 255710|   1|       6|
| 306130|   2|       6|
| 582010|   3|       6|
| 238960|   4|       6|
|    220|   5|       6|
| 291550|   6|       6|
| 388080|   7|       6|
| 457140|   8|       6|
|     70|   9|       6|
| 304390|  10|       6|
+-------+----+--------+
only showing top 20 rows



In [19]:
df_recommend_result_2week = sc.read.json(recommended_by_playtime2weeks)
df_recommend_result_2week.show(20)

+-------+----+--------+
|game_id|rank|user_idx|
+-------+----+--------+
|    730|   1|       0|
|    440|   2|       0|
| 427520|   3|       0|
|  39210|   4|       0|
| 976730|   5|       0|
|   4000|   6|       0|
| 377160|   7|       0|
| 292030|   8|       0|
| 374320|   9|       0|
| 356190|  10|       0|
|    570|   1|       6|
| 255710|   2|       6|
|    730|   3|       6|
|1235140|   4|       6|
| 976730|   5|       6|
| 346110|   6|       6|
| 431960|   7|       6|
| 594570|   8|       6|
| 292030|   9|       6|
| 646910|  10|       6|
+-------+----+--------+
only showing top 20 rows



### 최종 결과물 저장

In [20]:
df_recommend_result_forever.registerTempTable('recommend_result_forever')
df_final_recommend_result_forever = sc.sql("SELECT DISTINCT b.user_id, a.rank, c.name, c.genres, c.short_description, c.steam_appid FROM recommend_result_forever a, user_idx b, game_detail c WHERE a.user_idx = b.user_idx AND a.game_id = c.steam_appid ORDER BY b.user_id, a.rank") 
df_final_recommend_result_forever.show(20)

+-----------------+----+--------------------+--------------------+--------------------+-----------+
|          user_id|rank|                name|              genres|   short_description|steam_appid|
+-----------------+----+--------------------+--------------------+--------------------+-----------+
|76561197960265754|   1|           Destiny 2|[[Action, 1], [Ad...|Destiny 2 is an a...|    1085660|
|76561197960265754|   2|         Garry's Mod|[[Indie, 23], [Si...|Garry's Mod is a ...|       4000|
|76561197960265754|   3|  Grand Theft Auto V|[[Action, 1], [Ad...|Grand Theft Auto ...|     271590|
|76561197960265754|   4|            Soundpad|[[Audio Productio...|Play sounds in vo...|     629520|
|76561197960265754|   5|            RimWorld|[[Indie, 23], [Si...|A sci-fi colony s...|     294100|
|76561197960265754|   6|     Killing Floor 2|       [[Action, 1]]|6-player co-op Ze...|     232090|
|76561197960265754|   7|              Arma 3|[[Action, 1], [Si...|Experience true c...|     107410|


In [21]:
df_recommend_result_2week.registerTempTable('recommend_result_2week')
df_final_recommend_result_2week = sc.sql("SELECT DISTINCT b.user_id, a.rank, c.name, c.genres, c.short_description, c.steam_appid FROM recommend_result_2week a, user_idx b, game_detail c WHERE a.user_idx = b.user_idx AND a.game_id = c.steam_appid ORDER BY b.user_id, a.rank") 
df_final_recommend_result_2week.show(20)

+-----------------+----+--------------------+--------------------+--------------------+-----------+
|          user_id|rank|                name|              genres|   short_description|steam_appid|
+-----------------+----+--------------------+--------------------+--------------------+-----------+
|76561197960265754|   1|           Destiny 2|[[Action, 1], [Ad...|Destiny 2 is an a...|    1085660|
|76561197960265754|   2|Halo: The Master ...|       [[Action, 1]]|The Master Chief’...|     976730|
|76561197960265754|   3|            Terraria|[[Action, 1], [Ad...|Dig, fight, explo...|     105600|
|76561197960265754|   4|            Warframe|[[Action, 1], [Fr...|Warframe is a coo...|     230410|
|76561197960265754|   5|       Apex Legends™|[[Action, 1], [Ad...|Apex Legends is t...|    1172470|
|76561197960265754|   6|      Sea of Thieves|[[Action, 1], [Ad...|Sea of Thieves of...|    1172620|
|76561197960265754|   7|     Team Fortress 2|[[Action, 1], [Fr...|Nine distinct cla...|        440|


In [22]:
# 최종 추천값 전달
df_final_recommend_result_forever.write.save(final_recommended_by_playtimeforever, format="json")

In [25]:
df_final_recommend_result_2week.write.save(final_recommended_by_playtime2weeks, format="json")