In [1]:
!pip install --upgrade  pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('steam_recommender').getOrCreate()

Collecting pyspark
Installing collected packages: pyspark
  Found existing installation: pyspark 2.3.4
    Can't uninstall 'pyspark'. No files were found to uninstall.
Successfully installed pyspark-2.4.5


In [2]:
# Set the path and load the Games_1 file for the recommender system
path = "gs://dataproc-d5da8056-80df-436a-8ab5-db077106cb06-europe-west6/notebooks/"
games_df = spark.read.csv(path + "Games_1.csv", header=True)

In [3]:
games_df.show(10)

+-----------------+------+---------------+----------------+--------------------+
|          steamid| appid|playtime_2weeks|playtime_forever|       dateretrieved|
+-----------------+------+---------------+----------------+--------------------+
|76561198001291264|  8870|           null|            1392|2013-06-09 01:01:...|
|76561198001291264|   400|           null|             239|2013-06-09 01:01:...|
|76561198001291264|212910|           null|             130|2013-06-09 01:01:...|
|76561198001291264|   550|           null|           17547|2013-06-09 01:01:...|
|76561198001291264|   420|           null|             534|2013-06-09 01:01:...|
|76561198001291264|   380|           null|             290|2013-06-09 01:01:...|
|76561198001291264|   620|           null|             966|2013-06-09 01:01:...|
|76561198001291264|202990|              8|            1766|2013-06-09 01:01:...|
|76561198001291264|202970|           null|             807|2013-06-09 01:01:...|
|76561198001291264|   340|  

In [4]:
# Drop unnecessary columns
games_df = games_df.drop(*['playtime_2weeks', 'dateretrieved'])

In [5]:
# Remove records with null values
from pyspark.sql.functions import col
games_df = games_df.where(col('playtime_forever').isNotNull())
games_df = games_df.where(col('steamid').isNotNull())
games_df = games_df.where(col('appid').isNotNull())

In [6]:
games_df.show(10)

+-----------------+------+----------------+
|          steamid| appid|playtime_forever|
+-----------------+------+----------------+
|76561198001291264|  8870|            1392|
|76561198001291264|   400|             239|
|76561198001291264|212910|             130|
|76561198001291264|   550|           17547|
|76561198001291264|   420|             534|
|76561198001291264|   380|             290|
|76561198001291264|   620|             966|
|76561198001291264|202990|            1766|
|76561198001291264|202970|             807|
|76561198001291264|   340|              45|
+-----------------+------+----------------+
only showing top 10 rows



In [7]:
# Restructure the dataframe so that it is suitable for my recommender system

from pyspark.sql.types import IntegerType
games_df = games_df.withColumn('playtime_forever', games_df['playtime_forever'].cast(IntegerType()))
reshaped_games_df = games_df.groupby('steamid').pivot('appid').max('playtime_forever').fillna(0)

In [8]:
reshaped_games_df.show()

+-----------------+----+---+-----+----+-----+------+------+-----+-----+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+------+-----+------+-----+------+------+------+------+-----+------+------+-----+------+-----+------+-----+------+-----+-----+------+-----+------+-----+-----+------+-----+------+-----+------+-----+-----+------+-----+------+------+------+-----+------+------+------+-----+------+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+-----+------+-----+------+------+------+------+------+------+------+-----+-----+------+-----+------+-----+------+-----+-----+------+-----+------+-----+------+-----+-----+-----+-----+-----+------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+-----+------+------+------+------+-----+-----+-----+------+-----+-----+----+-----+-----+-----+-----+--

In [14]:
# Write the new dataframe to csv
reshaped_games_df.repartition(1).write.csv(path + 'reshaped_games_df2.csv', header='true')