# Steam Analysis 

Pavel Petruneac

**Description:**

This is an analysis of the Steam Dataset. Data was provided in a small and larger version via GCS and more info about this dataset can be found [here](https://steam.internet.byu.edu/). 


---



# Exercise 1: Data Engineering

This exercise should be completed using PySpark (although feel free to use any of the APIs). Here is a guide on how to install PySpark on your local machine.

1. Install and run PySpark.
- Load .csv for Player_Summaries, Game_Publishers, Game_Genres, Game_Developers, Games_1 into PySpark dataframes.
- Join all `Games_` tables into one dataframe.
- Count the number of games per `publisher` and per `genre`.
- Find day and hour when most new accounts were created (based on Player_Summaries table) e.g. 8pm on 14th August 2005.


---

#### **TASK 1: Install and run PySpark**

Check if `SparkSession` is loaded in the environment. `SparkSession` is already loaded in the environment via PySpark kernel, hence no need to import it like: 

```
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .appName("steamAnalysis")\
        .getOrCreate()
spark   

```

In [1]:
# Check SparkSession
spark

In [2]:
%%bash

# Check Python version
python --version

Python 3.6.5 :: Anaconda, Inc.



---


#### **TASK 2: Load .csv files into PySpark dataframes.**

- Player_Summaries, 
- Game_Publishers, 
- Game_Genres,
- Game_Developers, 
- Games_1 

Data is saved in Google Cloud Storage bucket and it will be read straight into RAM. Before you run, make sure that you are authenticated with GCP. You can do it in a couple of ways: 
1. run `gcloud init` on your laptop terminal and follow instructions
- if you run on GCP compute, allow the compute id read access to GCS bucket or 
- use a service account to authenticate on the terminal; can run something like 

    `gcloud auth activate-service-account *service_account_name* --key-file=credentials_file_path`, followed by 
    
    `gcloud config set account *service_account_name*`
    
More info on authentication [here](https://cloud.google.com/sdk/gcloud/reference/auth/).   


In [None]:
# # Authenticate to GCP with the service account + set the default account IF you run locally
# # No need to do this if you run on GCP and have given GCS and BQ access to the compute ID. 

# import os 


# command = 'gcloud auth activate-service-account steam-analysis@north-star-213610.iam.gserviceaccount.com --key-file=../credentials/gcp_service_account.json'
# with open('command.sh', 'w') as the_file:
#   the_file.write(command)
# # Copy files to GCS    
# bashCommand = "bash command.sh"
# os.system(bashCommand)
 
# # Set default account 
# command = 'gcloud config set account steam-analysis@north-star-213610.iam.gserviceaccount.com'
# with open('command.sh', 'w') as the_file:
#   the_file.write(command)
# # Copy files to GCS    
# bashCommand = "bash command.sh"
# os.system(bashCommand)

# # Remove the command files
# bashCommand = "rm command.sh"
# os.system(bashCommand)

In [3]:
%%bash

# List all the files in the bucket. 
# Comment dataset env var to list the small or large dataset. 

# Define env variables
export gcs_bucket="pp_steam_analyssis"
# export dataset_type="steam_gaming_small"
export dataset="steam_gaming_large"

gsutil ls gs://$gcs_bucket/data/sample/$dataset_type/


gs://pp_steam_analyssis/data/sample/steam_gaming_large/


In [4]:
# Define a few global parameters 
gcs_bucket="pp_steam_analyssis"

# Define what dataset to read. True for small; False for large
steam_gaming_small = False

if steam_gaming_small:
    dataset_type = "steam_gaming_small"
else:
    dataset_type = "steam_gaming_large"

print("It will read {} dataset from GCS bucket: {}.".format(dataset_type, gcs_bucket))

It will read steam_gaming_large dataset from GCS bucket: pp_steam_analyssis.


> **Note:** 


1. Before reading the data, check if there is a header, otherwise when using `header=True`, it will read the header as 1st row!
2. `samplingRatio` is used as the ratio of the # of rows to use for inferring the schema. 

In [5]:
! gsutil cat -r 0-100 gs://$gcs_bucket/data/sample/$dataset_type/Player_Summaries*.csv


steamid,personaname,profileurl,avatar,avatarmedium,avatarfull,personastate,communityvisibilitystate,psteamid,personaname,profileurl,avatar,avatarmedium,avatarfull,personastate,communityvisibilitystate,psteamid,personaname,profileurl,avatar,avatarmedium,avatarfull,personastate,communityvisibilitystate,psteamid,personaname,profileurl,avatar,avatarmedium,avatarfull,personastate,communityvisibilitystate,psteamid,personaname,profileurl,avatar,avatarmedium,avatarfull,personastate,communityvisibilitystate,psteamid,personaname,profileurl,avatar,avatarmedium,avatarfull,personastate,communityvisibilitystate,p

In [6]:
csv_file_path = 'gs://{}/data/sample/{}/Player_Summaries*.csv'.format(gcs_bucket, dataset_type)
player_summaries = spark.read.csv(csv_file_path, header=True, inferSchema=True, samplingRatio = 0.05)

print("player_summaries type = {}".format(type(player_summaries)))
player_summaries.printSchema()


player_summaries type = <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- steamid: long (nullable = true)
 |-- personaname: string (nullable = true)
 |-- profileurl: string (nullable = true)
 |-- avatar: string (nullable = true)
 |-- avatarmedium: string (nullable = true)
 |-- avatarfull: string (nullable = true)
 |-- personastate: string (nullable = true)
 |-- communityvisibilitystate: integer (nullable = true)
 |-- profilestate: integer (nullable = true)
 |-- lastlogoff: string (nullable = true)
 |-- commentpermission: string (nullable = true)
 |-- realname: string (nullable = true)
 |-- primaryclanid: string (nullable = true)
 |-- timecreated: string (nullable = true)
 |-- gameid: string (nullable = true)
 |-- gameserverip: string (nullable = true)
 |-- gameextrainfo: string (nullable = true)
 |-- cityid: string (nullable = true)
 |-- loccountrycode: string (nullable = true)
 |-- locstatecode: string (nullable = true)
 |-- loccityid: integer (nullable = true)
 |-- dateretrieved: t

In [7]:
! gsutil cat -r 0-50 gs://$gcs_bucket/data/sample/$dataset_type/Games_Publishers*.csv
    

appid,Publisher
207990,""
215220,""
220824,""
24160

In [8]:
csv_file_path = 'gs://{}/data/sample/{}/Games_Publishers*.csv'.format(gcs_bucket, dataset_type)
games_publishers = spark.read.csv(csv_file_path, header=True, inferSchema=True, samplingRatio = 0.05)

print("games_publishers type = {}".format(type(games_publishers)))
games_publishers.printSchema()


games_publishers type = <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- appid: integer (nullable = true)
 |-- Publisher: string (nullable = true)



In [9]:
! gsutil cat -r 0-50 gs://$gcs_bucket/data/sample/$dataset_type/Games_Genres*.csv


appid,Genre
7290,RPG
8980,RPG
18010,RPG
18040,RPG
2

In [10]:
csv_file_path = 'gs://{}/data/sample/{}/Games_Genres*.csv'.format(gcs_bucket, dataset_type)
games_genres = spark.read.csv(csv_file_path, header=True, inferSchema=True, samplingRatio = 0.05)

print("games_genres type = {}".format(type(games_genres)))
games_genres.printSchema()


games_genres type = <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- appid: integer (nullable = true)
 |-- Genre: string (nullable = true)



In [11]:
! gsutil cat -r 0-50 gs://$gcs_bucket/data/sample/$dataset_type/Games_Developers*.csv


appid,Developer
462530,8i
452420,M2
466530,M2
36696

In [12]:
csv_file_path = 'gs://{}/data/sample/{}/Games_Developers*.csv'.format(gcs_bucket, dataset_type)
games_developers = spark.read.csv(csv_file_path, header=True, inferSchema=True, samplingRatio = 0.05)

print("games_developers type = {}".format(type(games_developers)))
games_developers.printSchema()


games_developers type = <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- appid: integer (nullable = true)
 |-- Developer: string (nullable = true)



In [13]:
! gsutil cat -r 0-150 gs://$gcs_bucket/data/sample/$dataset_type/Games_1*.csv


steamid,appid,playtime_2weeks,playtime_forever,dateretrieved
76561197972368092,55230,,2919,2013-05-13 06:11:48 UTC
76561197972228702,42910,,18,2013-05-steamid,appid,playtime_2weeks,playtime_forever,dateretrieved
76561198005175274,42690,,1312,2013-06-14 08:10:50 UTC
76561198005175274,65730,84,108,2013-

In [14]:
csv_file_path = 'gs://{}/data/sample/{}/Games_1*.csv'.format(gcs_bucket, dataset_type)
games_1 = spark.read.csv(csv_file_path, header=True, inferSchema=True, samplingRatio = 0.05)

print("games_1 type = {}".format(type(games_1)))
games_1.printSchema()


games_1 type = <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- steamid: long (nullable = true)
 |-- appid: integer (nullable = true)
 |-- playtime_2weeks: integer (nullable = true)
 |-- playtime_forever: integer (nullable = true)
 |-- dateretrieved: string (nullable = true)



In [15]:
# ! gsutil cat -r 0-150 gs://$gcs_bucket/data/sample/$dataset_type/Games_2*.csv

In [16]:

# csv_file_path = 'gs://{}/data/sample/{}/Games_2*.csv'.format(gcs_bucket, dataset_type)
# games_2 = spark.read.csv(csv_file_path, header=True, inferSchema=True, samplingRatio = 0.05)

# print("games_2 type = {}".format(type(games_2)))
# games_2.printSchema()



---

#### **TASK 3: Join all `Games_` tables into one dataframe.**

> Note: 
- the joining will be done on `appid` with join type = 'outer' to capture all data available. 
- the documentation states that `games_1` refers to initial crawl and `games_2` refers to follow-up crawl. In this join only initial crawl is considered. If needed, one can join the dataframes with `games_1.union(games_2)`

**Steps:**
1. join `games_1` (initial crawl) with 
- join `games_developers`
- join `games_genres`
- join `games_publishers`

In [17]:
games = games_1.\
    join(games_developers, on='appid', how = 'outer').\
    join(games_genres, on='appid', how = 'outer').\
    join(games_publishers, on='appid', how = 'outer')


print("games type = {}".format(type(games)))
games.printSchema()

games type = <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- appid: integer (nullable = true)
 |-- steamid: long (nullable = true)
 |-- playtime_2weeks: integer (nullable = true)
 |-- playtime_forever: integer (nullable = true)
 |-- dateretrieved: string (nullable = true)
 |-- Developer: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)



In [18]:
games.show()

+-----+-----------------+---------------+----------------+--------------------+--------------+------+---------+
|appid|          steamid|playtime_2weeks|playtime_forever|       dateretrieved|     Developer| Genre|Publisher|
+-----+-----------------+---------------+----------------+--------------------+--------------+------+---------+
| 4900|76561197970498542|           null|            3494|2013-05-11 23:09:...|Unknown Worlds| Indie|     null|
| 4900|76561197970498542|           null|            3494|2013-05-11 23:09:...|Unknown Worlds|Casual|     null|
| 4900|76561197972086370|           null|             225|2013-05-12 12:59:...|Unknown Worlds| Indie|     null|
| 4900|76561197972086370|           null|             225|2013-05-12 12:59:...|Unknown Worlds|Casual|     null|
| 4900|76561197971631392|           null|             204|2013-05-12 20:03:...|Unknown Worlds| Indie|     null|
| 4900|76561197971631392|           null|             204|2013-05-12 20:03:...|Unknown Worlds|Casual|   

#### **TASK 4: Count the number of games per `publisher` and per `genre`.**

> Note:
[Documentation](https://steam.internet.byu.edu/#) for *APP_ID_INFO* states that 
- *appid* is *The ID of the "app" in question, which is not necessarily a game* and 
- *type* The type of the "app". Possible values include: "demo," "dlc," "game," "hardware," "mod," and "video." Game is the most common.

To get the count of games we need to merge the APP_ID_INFO dataframe with the games dataframe and then do a count, after filtering for the right app type (i.e. game). 


In [19]:
! gsutil cat -r 0-150 gs://$gcs_bucket/data/sample/$dataset_type/App_ID_Info*.csv


appid,Title,Type,Price,Release_Date,Rating,Required_Age,Is_Multiplayer
392230,Littlstar VR Cinema,game,0,1970-01-01 00:00:00 UTC,-1,0,0
440000,Portal 2

In [20]:
csv_file_path = 'gs://{}/data/sample/{}/App_ID_Info*.csv'.format(gcs_bucket, dataset_type)
app_ID_Info = spark.read.csv(csv_file_path, header=True, inferSchema=True, samplingRatio = 0.05)

print("app_ID_Info type = {}".format(type(app_ID_Info)))
app_ID_Info.printSchema()


app_ID_Info type = <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- appid: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Release_Date: string (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Required_Age: integer (nullable = true)
 |-- Is_Multiplayer: integer (nullable = true)



In [21]:
# Do a left join with games dataframe --> Only interested in the app types for which there is publisher and genre data. 

games_new = games.join(app_ID_Info, on ='appid', how='left')

print("games_new type = {}".format(type(games_new)))
games_new.printSchema()


games_new type = <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- appid: integer (nullable = true)
 |-- steamid: long (nullable = true)
 |-- playtime_2weeks: integer (nullable = true)
 |-- playtime_forever: integer (nullable = true)
 |-- dateretrieved: string (nullable = true)
 |-- Developer: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Release_Date: string (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Required_Age: integer (nullable = true)
 |-- Is_Multiplayer: integer (nullable = true)



In [23]:
# Get the appid type distribution
# games_new.select('Type').toPandas()['Type'].value_counts()

In [24]:
# filter for the 'game' appid type
counts = games_new.filter(games_new.Type == 'game').\
groupBy('Publisher', 'Genre').\
agg({'appid':'count'})

counts.orderBy(counts['count(appid)'].desc()).show(30) # order desc by the counts and show top 30 results. 

+--------------------+------------+------------+
|           Publisher|       Genre|count(appid)|
+--------------------+------------+------------+
|               Valve|      Action|     4750004|
|          Activision|      Action|     1341693|
|         Aspyr (Mac)|      Action|     1150421|
|                SEGA|    Strategy|      840062|
|Feral Interactive...|    Strategy|      591212|
|            2K Games|      Action|      553922|
|            2K Games|    Strategy|      511185|
|               Valve|Free to Play|      416418|
|         Square Enix|      Action|      401992|
|Feral Interactive...|    Strategy|      377856|
|Feral Interactive...|      Action|      356213|
|      Rockstar Games|      Action|      322508|
|         Deep Silver|      Action|      296274|
|     Electronic Arts|      Action|      289271|
|             Ubisoft|      Action|      282748|
|               Valve|    Strategy|      276377|
|            2K Games|         RPG|      272382|
|  Bethesda Softwork

In [25]:
# P.S. You can also ge the results into a pandas dataframe: counts.toPandas()

#### **TASK 5: Find day and hour when most new accounts were created**

> (based on Player_Summaries table) e.g. 8pm on 14th August 2005.

> Note:
To get the the day and hour when most accounts were created, we need to use the `timecreated` field. Will extract day and hour and do a groupby, followed by count. 
 


In [26]:
# A refresher on the schema available 
player_summaries.printSchema()

root
 |-- steamid: long (nullable = true)
 |-- personaname: string (nullable = true)
 |-- profileurl: string (nullable = true)
 |-- avatar: string (nullable = true)
 |-- avatarmedium: string (nullable = true)
 |-- avatarfull: string (nullable = true)
 |-- personastate: string (nullable = true)
 |-- communityvisibilitystate: integer (nullable = true)
 |-- profilestate: integer (nullable = true)
 |-- lastlogoff: string (nullable = true)
 |-- commentpermission: string (nullable = true)
 |-- realname: string (nullable = true)
 |-- primaryclanid: string (nullable = true)
 |-- timecreated: string (nullable = true)
 |-- gameid: string (nullable = true)
 |-- gameserverip: string (nullable = true)
 |-- gameextrainfo: string (nullable = true)
 |-- cityid: string (nullable = true)
 |-- loccountrycode: string (nullable = true)
 |-- locstatecode: string (nullable = true)
 |-- loccityid: integer (nullable = true)
 |-- dateretrieved: timestamp (nullable = true)



In [27]:
# See a few values in timecreated
player_summaries.select('timecreated').show(10)

+-------------------+
|        timecreated|
+-------------------+
|2012-02-23 08:11:02|
|2005-01-29 12:11:10|
|2007-08-27 13:38:33|
|2012-04-03 10:51:59|
|2004-11-17 07:57:04|
|2004-12-26 06:35:19|
|2012-01-05 20:55:33|
|2011-10-26 10:28:57|
|2004-12-31 16:45:39|
|2011-06-08 00:52:18|
+-------------------+
only showing top 10 rows



In [28]:
# Extract month, day of the month and hour ++ Drop null values. 
from pyspark.sql.functions import month, dayofmonth, hour

month_day_hour = player_summaries.select(
    month("timecreated").alias('month'), 
    dayofmonth("timecreated").alias('day'),
    hour("timecreated").alias('hour'), 
).dropna() 

month_day_hour.show(10)

+-----+---+----+
|month|day|hour|
+-----+---+----+
|    2| 23|   8|
|    1| 29|  12|
|    8| 27|  13|
|    4|  3|  10|
|   11| 17|   7|
|   12| 26|   6|
|    1|  5|  20|
|   10| 26|  10|
|   12| 31|  16|
|    6|  8|   0|
+-----+---+----+
only showing top 10 rows



In [29]:
counts = month_day_hour.groupBy('month', 'day', 'hour').\
    agg({'day':'count'})

counts.orderBy(counts['count(day)'].desc()).show(10) # order desc by the counts and show top 10 results. 

+-----+---+----+----------+
|month|day|hour|count(day)|
+-----+---+----+----------+
|   12| 25|  10|      2933|
|   12| 25|   8|      2811|
|   12| 25|  11|      2620|
|   12| 25|   7|      2438|
|    3|  2|  10|      2390|
|   12| 25|  12|      2363|
|   12| 25|   3|      2268|
|   12| 25|   6|      2261|
|   12| 25|   9|      2196|
|   12| 25|   5|      2153|
+-----+---+----+----------+
only showing top 10 rows



> Note: The results above show that most of the accounts were created on the 25th of December at 10am - surprinsingly on Christmas Day :) 