# Steam Data Gathering


Games are now sold mainly through digital distribution platforms. 
Steam is the most prolific platform for anything linked to games, from software to DLCs and in-game items. 


## Our Goals: 
We want to understand which key characteristics make a game popular.


We will use the Steam and SteamSpy API to gather data on the games sold on Steam. We will use the data to predict the success of a game based on its features.

# Import libraries

First, we import the libraries we will use.


In [55]:
# for data manipulation and array operations
import numpy as np
import pandas as pd 

# for api calls 
import requests

# to create pauses between api calls and get time metrics 
import time 
import datetime as dt

# read and write csv files
import csv

# progress bars libraries and display
from tqdm import tqdm
from IPython.display import clear_output


# Generating a list of AppIDs

We will now gather the app IDs of the games on Steam. We will use the SteamSpy API to get the list of games, because the Steam API provides a list with too many irrelevant entries. 
We therefore use the SteamSpy genres to gather a list of games that are relevant to us.

In [None]:

# genre names taken from steamspy's website; App_List() sends one request per entry
genres = [
    'Action',
    'Strategy',
    'RPG',
    'Indie',
    'Adventure',
    'Sports',
    'Simulation',
    'Early Access',
    'Ex Early Access',
    'Massively',
    'Free',
]

def App_List():
    '''APP LIST CREATION

    Sends one "genre" request to the SteamSpy API for every entry of the
    module-level ``genres`` list and merges the answers into a single table.

    Returns:
    pandas.DataFrame: one row per unique appid with the columns 'appid' and
    'name', sorted by appid and re-indexed from 0. The frame is empty (but
    still has both columns) when no genre returned any game.

    Raises:
    requests.exceptions.RequestException: if a request fails, times out or
    is answered with an HTTP error status.
    '''
    url = "https://steamspy.com/api.php"
    wanted_columns = ['appid', 'name']

    ##############iterating through genres###############
    frames = []
    for genre in genres:
        parameters = {"request": "genre", "genre": genre}
        # a timeout keeps one stalled connection from blocking the whole run
        response = requests.get(url, params=parameters, timeout=30)
        # fail loudly on 4xx/5xx instead of trying to parse an error page
        response.raise_for_status()
        # orient='index': every top-level key of the payload becomes one row
        frames.append(pd.DataFrame.from_dict(response.json(), orient='index'))

    # DataFrame.append was removed in pandas 2.0; one concat at the end
    # replaces it and avoids re-copying the accumulated frame on every genre
    non_empty = [frame for frame in frames if not frame.empty]
    if not non_empty:
        return pd.DataFrame(columns=wanted_columns)

    steam_spy_all = pd.concat(non_empty)

    # the same game can be listed under several genres: keep it only once
    app_list = (
        steam_spy_all[wanted_columns]
        .drop_duplicates(subset=['appid'])
        .sort_values('appid')
        .reset_index(drop=True)
    )
    return app_list

# build the table of unique appids / names by querying SteamSpy
app_list = App_List()

# save the table to a csv file so it can be reloaded later
app_list.to_csv('../data/gathering/app_list.csv', index=False)


# uncomment the line below (and comment out everything above) to read the
# stored csv instead, which keeps the list consistent between runs

# app_list = pd.read_csv('../data/gathering/app_list.csv')

# the API request Logic 
**Api Calls handler** <br />
<br />
The API calls require different parsing and handling depending on the API. We will create a function to handle the different API calls.

**handling requests** <br />
<br />
We will be making multiple requests. When calling an API many times, we might get errors in the reply or be temporarily blocked by the server.
To avoid this, we will wrap the request function to catch errors. 

In [68]:
from enum import StrEnum 

# allows for futur modifications and more robust checks of the source of the api call
class API(StrEnum):
    STEAM = 'Steam'
    STEAMSPY = 'Steamspy'

def call_api(source: API,appId):
    '''API CALL
    handles the api calls for each source.

    Input:
    source(API or str): the source of the api call ('Steam' or 'Steamspy')
    appId(int or str) : game id to be used in the api call

    Returns:
    data(dict): decoded json object containing the data of the game.
        For Steam, when the reply holds no usable data for this id, a
        minimal record with only 'name' (looked up in app_list, None when
        the id is not listed there) and 'steam_appid' is returned, so the
        game still gets a row.
    None: when the source is not valid (a message is printed).
    '''

    if source == API.STEAM:
        url = "https://store.steampowered.com/api/appdetails/"
        parameters = {"appids": appId}

        steam_json = request_handler(url, parameters)

        # the body may decode to null or lack the requested id; both mean
        # "no data" and must not crash a long-running download
        app_json = steam_json.get(str(appId)) if isinstance(steam_json, dict) else None

        if isinstance(app_json, dict) and app_json.get('success') and 'data' in app_json:
            return app_json['data']

        # fall back to the name already known from app_list
        known_names = app_list.loc[app_list['appid'] == appId, 'name'].values
        name = known_names[0] if len(known_names) else None
        return {'name': name, 'steam_appid': appId}

    elif source == API.STEAMSPY:
        url = "https://steamspy.com/api.php"
        parameters = {"request": "appdetails", "appid": appId}

        # the SteamSpy reply is returned as decoded, without unwrapping
        return request_handler(url, parameters)

    else:
        print('the source provided is not valid',end='\r')
        return None

def request_handler(url, parameters, max_retries=None, wait_seconds=5, timeout=30):

    '''
    function to handle possible shortcomings of making many requests to the api

    Sends a GET request and decodes the json body. On any failure
    (connection error, timeout, HTTP error status, body that is not valid
    json) the error is printed, a countdown is shown and the request is
    sent again.

    Parameters:
    url(str): address of the api endpoint
    parameters(dict): query-string parameters of the request
    max_retries(int or None): number of retries after the first attempt;
        None (default) retries until the request succeeds
    wait_seconds(int): length of the countdown before each retry
    timeout(float): seconds to wait for the server before treating the
        attempt as failed

    Returns:
    data(json): the decoded json body of the first successful reply

    Raises:
    requests.exceptions.RequestException or ValueError: the last error,
        only once max_retries (when given) has been used up
    '''

    failures = 0

    # a loop instead of a recursive call: a long outage can no longer end
    # in a RecursionError
    while True:
        try:
            response = requests.get(url, params=parameters, timeout=timeout)
            response.raise_for_status()
            # decoded inside the try so that a non-json body is retried too
            return response.json()

        except (requests.exceptions.RequestException, ValueError) as e:
            # RequestException covers the network and HTTP errors;
            # ValueError covers json decoding failures
            failures += 1
            print(f'Error occurred: {e}')

            if max_retries is not None and failures > max_retries:
                raise

            # count down before retrying
            for i in range(wait_seconds, 0, -1):
                print(f"Waiting... ({i})", end='\r')
                time.sleep(1)

            print("Retrying...", end='\r')




The next cell defines the variables we will use to make the requests. They are specific to each source. 

In [60]:
# everything source-specific that is needed to store the api responses:
# output paths, progress-index paths and csv column names

# ---- Steam ----
# csv file receiving the API response data
steam_csv = '../data/gathering/steam_data.csv'
# text file holding the saved index
steam_index = '../data/gathering/steam_index.txt'
# column names of the csv file (order = column order in the file)
steam_columns = [
    'type', 'name', 'steam_appid', 'required_age', 'is_free',
    'controller_support', 'dlc',
    'detailed_description', 'about_the_game', 'short_description',
    'fullgame', 'supported_languages', 'header_image', 'website',
    'pc_requirements', 'mac_requirements', 'linux_requirements',
    'legal_notice', 'drm_notice', 'ext_user_account_notice',
    'developers', 'publishers', 'demos',
    'price_overview', 'packages', 'package_groups',
    'platforms', 'metacritic', 'reviews', 'categories', 'genres',
    'screenshots', 'movies', 'recommendations', 'achievements',
    'release_date', 'support_info', 'background', 'content_descriptors',
]

# ---- SteamSpy ----
# csv file receiving the API response data
steamspy_csv = '../data/gathering/steamspy_data.csv'
# text file holding the saved index
steamspy_index = '../data/gathering/steamspy_index.txt'
# column names of the csv file (order = column order in the file)
steamspy_columns = [
    'appid', 'name', 'developer', 'publisher', 'score_rank',
    'positive', 'negative', 'userscore', 'owners',
    'average_forever', 'average_2weeks',
    'median_forever', 'median_2weeks',
    'price', 'initialprice', 'discount',
    'languages', 'genre', 'ccu', 'tags',
]


# Downloading the data
Here we create a function to download the data and save it to a csv file while keeping track of the progress. It will handle errors and retry a request if it fails, and it can also restart from where it left off if it was interrupted.<br />
<br />
One of the only requirements is that the file structure is in place. You need the data/gathering folders to already exist. The different csv and txt files will be created automatically.

In [71]:
def create_csv(source: API, stop=-1, container_size=100, pause=1):

    '''DATA COLLECTION, DATA WRITING

    function that iterates through the app_list and calls the api for each game

    The games are processed in containers (batches). After each container
    the rows are appended to the csv file and the position reached is
    saved to the index file, so an interrupted run resumes from the last
    container that was saved.

    Input:
    source(API or str): the source of the api call ('Steam' or 'Steamspy')
    stop(int): position in app_list at which to stop (exclusive);
        -1 or a value beyond the end of app_list means "all the games"
    container_size(int): the number of games to be called before writing to file
    pause(int or float): the number of seconds to wait between each api call

    Returns:
    None

    Raises:
    ValueError: if container_size is smaller than 1

    Creates:
    csv file containing the data of the games
    index file containing the index of the last game called

    '''

    if container_size < 1:
        raise ValueError('container_size must be at least 1')

    # get the specific informations for each source
    if source == API.STEAM:
        Index_path = steam_index
        csv_path = steam_csv
        columns = steam_columns

    elif source == API.STEAMSPY:
        Index_path = steamspy_index
        csv_path = steamspy_csv
        columns = steamspy_columns

    else:
        print('the source provided is not valid',end='\r')
        return

    # -1 means "all the games"; a larger stop is clamped so that the saved
    # index can never point past the end of app_list
    total_games = len(app_list)
    if stop == -1 or stop > total_games:
        stop = total_games

    # returns the index of the last container saved
    # this is helpful if the program is interrupted
    try:
        with open(Index_path, 'r') as f:
            index = int(f.readline())

    except FileNotFoundError:
        index = 0

    # creating columns to write on file if it is the first time the function is called
    # utf-8 is explicit: the platform default encoding cannot always
    # represent the text returned by the apis
    if index == 0:
        with open(csv_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=columns)
            writer.writeheader()

    # boundaries of the containers: index, index + size, ..., stop
    bounds = np.append(np.arange(index, stop, container_size), stop)
    n_containers = len(bounds) - 1

    # list the times it takes to process each container
    container_times = []

    ############################## for each container ###############################

    for i, (begin, end) in enumerate(zip(bounds[:-1], bounds[1:])):

        container_start_time = time.time()

        data = []

        # iterate through the appids of this container and call the api
        # tqdm is used to show the progress of the loop
        progress = tqdm(
            app_list[begin:end]['appid'],
            desc=f'processing container {i+1} out of {n_containers}',
            leave=False,
        )
        for app_id in progress:
            data.append(call_api(source, app_id))
            time.sleep(pause)

        print('                               ',end='\r')
        for j in range(3,0,-1):
            print("About to write data ({})".format(j), end='\r')
            time.sleep(0.5)

        # write data to csv file; keys that are not in columns are ignored
        # and missing keys are left empty
        with open(csv_path, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=columns, extrasaction='ignore')
            writer.writerows(data)

        # writing index of last container saved to file
        # (only after the rows are on disk, so a resumed run skips nothing)
        index = int(end)
        with open(Index_path, 'w') as f:
            print(index, file=f)

        # time elapsed for this container
        container_times.append(time.time() - container_start_time)

        # remaining time = mean time per container * containers left
        mean_time = np.mean(container_times)
        est_remaining = (n_containers - i - 1) * mean_time
        remaining_td = dt.timedelta(seconds=round(est_remaining))

        clear_output(wait=True)

        # print estimated time until program finish
        print(f"Time estimated until program finish: {remaining_td}", end='\r')

    clear_output(wait=True)
    print( "program Finished !                         ", end='\r')
    return

# Gathering the data by calling our function
Below are examples of gathering the data with our function. <br />
SteamSpy accepts more API calls per minute than Steam.


In [81]:
# demo run on SteamSpy: stops at position 20 of app_list, containers of 3 games
# remove the stop and container_size parameters if you want to call all the games

create_csv("Steamspy",stop = 20, container_size = 3, pause = 0.5)


program Finished !                         

In [80]:
# demo run on Steam: stops at position 20 of app_list, containers of 3 games
# remove the stop and container_size parameters if you want to call all the games
create_csv("Steam",stop = 20, container_size = 3, pause = 1)

program Finished !                         