In [8]:
import asyncio
import os
import sqlite3
import time
from datetime import datetime
from time import perf_counter

import aiohttp
import aiosqlite
import numpy as np
import pandas as pd
import pytz
import requests

class StopExecution(Exception):
    """Exception whose traceback-rendering hook produces no output.

    NOTE(review): presumably raised to halt a notebook cell without IPython
    printing a traceback -- confirm; nothing in the visible cells raises it.
    """

    def _render_traceback_(self):
        # Returning None (rather than a list of lines) leaves nothing to display.
        return None

### IDs and Names

**Scrapes the unique ids and names of all Steam video games from Steam's API**

The API URL is https://api.steampowered.com/IStoreService/GetAppList/v1/, where the ids and names are stored in the nested dictionary hierarchy response['response']['apps']. Only a maximum of 50,000 results can be retrieved per response. The API has a parameter called 'last_appid' which will resume data retrieval from the last recorded game (app) id; I replaced appid with game_id or id for a better naming convention.

In [9]:
# Pages through Steam's GetAppList endpoint and collects every app id and name
# into 'video_game_data_df' (columns: id, name, release_date, price).

# NOTE(review): the fallback key below is committed with the notebook and should be
# treated as leaked -- rotate it and provide the new one via the STEAM_API_KEY env var.
steam_api_key = os.environ.get('STEAM_API_KEY', '1674C7309B00CA08D73A8CC100CA24C7')
url = 'https://api.steampowered.com/IStoreService/GetAppList/v1/'

app_pages = []  # one data frame per API page; concatenated once after the loop
last_appid = 0

while True:

    params = {
        'key': steam_api_key,
        'max_results': '50000', # maximum of 50,000 retrievals per response
        'last_appid': last_appid} # will resume data retrieval from this id

    response = requests.get(url, params=params, timeout=60)
    response.raise_for_status() # fail loudly instead of parsing an error page
    apps = response.json().get('response', {}).get('apps', []) # location of ids and names

    # An empty page means every app has been retrieved
    if not apps:
        break

    page_df = pd.DataFrame(apps)
    app_pages.append(page_df)

    # Guard against an endless loop if the API ever returns a page that does not advance
    page_max_appid = int(page_df['appid'].max())
    if page_max_appid <= last_appid:
        break
    last_appid = page_max_appid

# Concatenating once avoids re-copying the growing data frame on every page
if app_pages:
    video_game_data_df = pd.concat(app_pages, ignore_index=True)
else:
    video_game_data_df = pd.DataFrame(columns=['appid', 'name'])

video_game_data_df = (
    video_game_data_df
    .rename(columns={'appid': 'id'})  # Renaming appid to id
    .drop(columns=['last_modified', 'price_change_number'], errors='ignore')  # Dropping unnecessary columns
    .assign(release_date=np.nan, price=np.nan)  # Adding columns for future data insertion
)
video_game_data_df.head()

Unnamed: 0,id,name,release_date,price
0,10,Counter-Strike,,
1,20,Team Fortress Classic,,
2,30,Day of Defeat,,
3,40,Deathmatch Classic,,
4,50,Half-Life: Opposing Force,,


**Converts the 'video_game_data_df' data frame into a table called 'game' in a SQL file called 'video_game_data_db.sqlite' for permanent storage**

If the SQL database already exists, this will compare the list of ids from the dataframe (just pulled from the API) to the ids from the SQL database to check if there are any new ids. The new ids, which arrive almost every day, will be added to the database.

In [10]:
# Creates the TABLE 'game' from 'video_game_data_df' on the first run; on later runs
# only the ids that are not in the database yet are inserted.
insert_columns = ['id', 'name', 'release_date', 'price']

with sqlite3.connect('video_game_data_db.sqlite') as conn:
    cur = conn.cursor()

    # Ask SQLite whether the table exists instead of treating *any* to_sql failure
    # as "table already exists" (a bare except would also hide real errors)
    table_exists = cur.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'game'"
    ).fetchone() is not None

    # The UNIQUE index created below rejects repeated ids, so keep one row per id
    games_to_store = video_game_data_df.drop_duplicates(subset='id')

    if not table_exists:
        # Converts the 'video_game_data_df' to a TABLE called 'game' in the video_game_data_db.sqlite file
        games_to_store.to_sql(name='game', con=conn, index=False)

        # Create an index on the COLUMN 'id' for quicker data retrieval
        cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_game_id ON game(id)")
    else:
        print("Table 'game' already exists")

        # Create an index on the COLUMN 'id' for quicker data retrieval
        cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_game_id ON game(id)")

        # Checking to see if there are differences in the ids from the data frame and database
        known_ids = {row[0] for row in cur.execute('SELECT id FROM game')}
        new_game_df = games_to_store[~games_to_store['id'].isin(known_ids)]
        print(f"Number of new games: {len(new_game_df)}")

        # Convert to Python objects and turn missing values into None so they are stored as NULL
        new_rows = new_game_df[insert_columns].astype(object)
        new_rows = new_rows.where(new_rows.notna(), None)

        # Insert the id, name, date, and price of the new games; naming the columns keeps
        # the statement correct even if the table's column order differs
        cur.executemany(
            "INSERT INTO game (id, name, release_date, price) VALUES (?,?,?,?)",
            new_rows.itertuples(index=False, name=None),
        )

    conn.commit()

Table 'game' already exists
Number of new games: 5


### Release Dates and Prices

**Scrapes the release dates and prices of video games from Steam's API**

The API URL with release dates and prices is https://store.steampowered.com/api/appdetails/. This code chunk will update the rows of the database with missing values in the release date and price columns. This process will be slow because of the API's rate limit: after about 200 requests the API stops accepting requests, and a pause of roughly 5 minutes is needed before it accepts them again. If data from all games were to be scraped, it would take nearly a day because of the rate limit and the vast number of games.

In [11]:
# Fills in 'release_date' and 'price' for rows of the TABLE 'game' that still contain NULLs.
# Fields the API cannot provide are stored as -1 so the row is not retried on the next run.
with sqlite3.connect('video_game_data_db.sqlite') as conn:
    cur = conn.cursor()

    # Converts the SQL TABLE 'game' into a dataframe indexed by the game id
    game_df = pd.read_sql('SELECT * FROM game', conn, index_col='id')

    # Grab a subset of the dataframe that has missing data in any of the features/columns
    incomplete_game_df = game_df[game_df.isnull().any(axis=1)]
    new_ids = incomplete_game_df.index

    print(f"Number of rows with any NULL values: {len(incomplete_game_df)}")
    entry_count = int(input("How many rows would you like to update?"))

    url = 'https://store.steampowered.com/api/appdetails/'
    retry_pause_seconds = {429: 30, 403: 10}  # status code -> pause before retrying

    entries = 1
    for game_id in new_ids:

        if entries > entry_count:
            break

        # Plain Python int: keeps the SQL parameter and the JSON key lookup unambiguous
        game_id = int(game_id)
        params = {'appids': game_id}
        response = requests.get(url, params=params, timeout=30)

        # Pauses the script due to too many requests to the API
        while response.status_code in retry_pause_seconds:
            pause = retry_pause_seconds[response.status_code]
            print('Status Code', response.status_code, f'-> Pausing for {pause} seconds')
            time.sleep(pause)
            response = requests.get(url, params=params, timeout=30)

        # Any other failure is treated as temporary: leave the row NULL so a later run retries it
        if not response.ok:
            print(f"App ID: {game_id}, unexpected status code {response.status_code} -> skipping")
            continue

        ### The JSON file from the API will occasionally encounter key/value errors

        # Parse the body once; a missing or malformed entry leaves 'details' empty
        try:
            details = response.json()[str(game_id)]['data']
        except (ValueError, KeyError, TypeError):
            details = {}

        # Error handling for the release date
        try:
            release_date = details['release_date']['date'] # nested dictionary hierarchy for the release date
        except (KeyError, TypeError):
            release_date = -1

        # Error handling for the price
        try:
            price = details['price_overview']['final_formatted'] # nested dictionary hierarchy for the price
        except (KeyError, TypeError):
            price = -1

        print(f"Entries: {entries}, App ID: {game_id}, Release Date: {release_date}, Price: {price}")

        # Updating SQL table 'game' with new 'release_date' and 'price' data
        cur.execute("UPDATE game SET release_date = ?, price = ? WHERE id = ?", (release_date, price, game_id))

        entries += 1

        # Commit per row so an interruption loses at most the current request
        conn.commit()


print("---Entry Complete---")

incomplete_game_df.head()

Number of rows with any NULL values: 413
---Entry Complete---


Unnamed: 0_level_0,name,release_date,price
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
391150,Red Tie Runner,,
539130,Keywords Test Application,,
642390,Blocksworld,,
1135440,Little Green Girl,,
1145630,BEAST,,


### Tags

**Scrapes the tags associated with each game from a third-party API. The list of tags will be stored in the SQL table 'tag' in the same file**


The third-party API with the tags is https://store.steampowered.com/tagdata/populartags/english. This code chunk will scrape the most relevant tags (genres which players think the game is a part of) from each game. Not all of the tags are included in this API. As a result, the size of the merged table of the 'game' and 'tag' table will be the smaller of the two tables.

In [16]:
# Downloads the popular tags first and only then rebuilds the TABLE 'tag', so a failed
# request can no longer leave the database with an empty (or missing) table.
url = 'https://store.steampowered.com/tagdata/populartags/english'
response = requests.get(url, timeout=30)

print(response.url)
print('Status Code', response.status_code)
response.raise_for_status()

# The response is a list of dictionaries; keep (id, name) per tag
popular_tags = [(tag['tagid'], tag['name']) for tag in response.json()]

with sqlite3.connect('video_game_data_db.sqlite') as conn:
    cur = conn.cursor()

    # Creates the TABLE 'tag' in the video_game_data_db.sqlite file
    cur.executescript('''
    DROP TABLE IF EXISTS tag;

    CREATE TABLE tag (
        id INTEGER PRIMARY KEY,
        name TEXT
    );

    CREATE INDEX IF NOT EXISTS idx_tag_id ON tag(id);

    ''')

    cur.executemany('INSERT INTO tag (id, name) VALUES (?, ?)', popular_tags)

    conn.commit()

    tag_db = pd.read_sql("SELECT * FROM tag", conn)

tag_db.head()

https://store.steampowered.com/tagdata/populartags/english
Status Code 200


Unnamed: 0,id,name
0,9,Strategy
1,19,Action
2,21,Adventure
3,84,Design & Illustration
4,87,Utilities


### Game_Tag Pairs Table

**Stores a list of dictionaries into a variable where its key-value pair is the game_id and the tags associated with that game**

The third-party API with the game_tag pairs data is https://steamspy.com/api.php. The final table is organized in a long format, meaning there will be rows with the same game id but different tag ids. Concurrent programming in this code block reduces the run time significantly; however, the total execution time still ranges from 10 to 40 minutes depending on internet speeds and the API's status. The libraries asyncio, aiosqlite, and aiohttp are used to achieve concurrency. In addition, a semaphore with a connection limit of 50 (`connection_limit` in the code) is initialized to control the number of active coroutines.

In [1]:
async def get_tag(game_id, semaphore, session: "aiohttp.ClientSession", url) -> dict:
    """Fetch the tags of one game and return them as ``{game_id: tags}``.

    Parameters:
        game_id: id used as the key of the returned dictionary.
        semaphore: asyncio semaphore bounding the number of in-flight requests.
        session: shared aiohttp client session used to send the request.
        url: URL whose JSON body holds the tags under the 'tags' key.

    Returns:
        ``{game_id: tags}``; ``{game_id: {}}`` when the request fails or the body
        has no usable 'tags' entry, so one bad game cannot abort the whole gather.
    """
    # Acquire the semaphore BEFORE calling session.get(); entering the request first
    # would send it before the limit is checked
    async with semaphore:
        try:
            async with session.get(url) as response:
                payload = await response.json()
            return {game_id: payload['tags']}
        except (aiohttp.ClientError, asyncio.TimeoutError, KeyError, TypeError, ValueError) as error:
            print(f"Could not retrieve tags for game ID {game_id}: {error!r}")
            return {game_id: {}}


async def main():
    start_time = perf_counter()

    async with (
        aiosqlite.connect('video_game_data_db.sqlite') as conn,
        conn.execute("SELECT id FROM game") as cur
    ):
        game_ids = [game_id[0] for game_id in await cur.fetchall()] # all game_ids in video_game_data_db.sqlite file

    # Creates a semaphore to control number of active coroutines
    connection_limit = 50
    semaphore = asyncio.Semaphore(connection_limit)

    # Asychronous session for concurrent scraping from the same API
    async with aiohttp.ClientSession() as session:
        tasks = []
        for game_id in game_ids:
            url = f"https://steamspy.com/api.php?request=appdetails&appid={game_id}"
            tasks.append(asyncio.create_task(get_tag(game_id, semaphore, session, url)))

        # Gathers and stores all the games_tags into a list
        games_tags = await asyncio.gather(*tasks)

        print(f"Time Elapsed: {perf_counter() - start_time}")
        print(f"First game_tag pairing: {games_tags[0]}")
        return games_tags




games_tags = await main()


Time Elapsed: 134.56701469999825
First game_tag pairing: {10: {'Action': 5451, 'FPS': 4869, 'Multiplayer': 3420, 'Shooter': 3374, 'Classic': 2807, 'Team-Based': 1882, 'First-Person': 1718, 'Competitive': 1619, 'Tactical': 1359, "1990's": 1215, 'e-sports': 1203, 'PvP': 895, 'Old School': 785, 'Military': 639, 'Strategy': 623, 'Survival': 307, 'Score Attack': 293, '1980s': 272, 'Assassin': 231, 'Nostalgia': 154}}


**Stores the game_tag pairs into the game_tag table**

In [18]:
# Rebuilds the TABLE 'game_tag' (long format: one row per game_id/tag_id pair) from 'games_tags'.
with sqlite3.connect("video_game_data_db.sqlite") as conn:
  cur = conn.cursor()

  cur.executescript('''
  DROP TABLE IF EXISTS game_tag;

  CREATE TABLE IF NOT EXISTS game_tag (
      game_id INTEGER,
      tag_id INTEGER,
      PRIMARY KEY(game_id, tag_id),
      FOREIGN KEY(game_id) REFERENCES game(id),
      FOREIGN KEY(tag_id) REFERENCES tag(id)
  );
  ''')

  # Load the name -> id lookup once instead of running one SELECT per (game, tag) pair.
  # ORDER BY + setdefault keeps the lowest id if two tags ever share a name.
  tag_ids_by_name = {}
  for tag_id, tag_name in cur.execute("SELECT id, name FROM tag ORDER BY id"):
    tag_ids_by_name.setdefault(tag_name, tag_id)

  # Only considering the popular tags (the ones that exist in the TABLE tag)
  game_tag_pairs = (
    (game_id, tag_ids_by_name[tag])
    for game_tags in games_tags              # [{10: {'Action': 5451, 'Multiplayer': 3419}}, {20: ...}]
    for game_id, tags in game_tags.items()   # {10: {'Action': 5451, 'Multiplayer': 3419}}
    for tag in (tags or ())                  # {'Action': 5451, 'Multiplayer': 3419}
    if tag in tag_ids_by_name
  )

  # OR IGNORE: a repeated (game_id, tag_id) pair is skipped instead of aborting the load
  cur.executemany("INSERT OR IGNORE INTO game_tag (game_id, tag_id) VALUES (?,?)", game_tag_pairs)

  conn.commit()
  game_tag_db = pd.read_sql("SELECT * FROM game_tag", conn)

game_tag_db.head()


Unnamed: 0,game_id,tag_id
0,10,19
1,10,1663
2,10,3859
3,10,1774
4,10,1693


### Player Count Table

**Stores the number of active players for each game on an hourly basis. The data is stored in the TABLE 'player_count'**

The games that are not released yet, which are labelled as 'Coming soon', 'Coming Soon', or 'To be announced' in the release_date column of the game table, will not be tracked in this script.

The asyncio library, along with two additional asynchronous libraries, aiohttp and aiosqlite, greatly sped up the scraping of Steam's API because it enabled concurrent scraping of multiple URLs. Additionally, an asyncio semaphore with a connection limit of 100 was incorporated in the script to control the number of active coroutines. Without the semaphore, the script would frequently encounter TimeoutErrors.

In [None]:
import asyncio
import aiohttp
import aiosqlite
from time import perf_counter
from datetime import datetime
import pytz




async def get_game_ids() -> list:
  """Return the ids of games whose release_date is not one of the 'not released yet' labels.

  Reads the TABLE 'game' in video_game_data_db.sqlite and keeps the rows whose
  release_date is not 'Coming soon', 'Coming Soon' or 'To be announced'.
  """
  released_games_query = """
        SELECT id
        FROM game
        WHERE release_date NOT IN
          ('Coming soon', 'Coming Soon', 'To be announced')
        """

  async with aiosqlite.connect('video_game_data_db.sqlite') as conn:
    async with conn.execute(released_games_query) as cur:
      id_rows = await cur.fetchall()
      # Each row is a one-element tuple holding the id
      return [id_row[0] for id_row in id_rows]


async def get_players(game_id, session: "aiohttp.ClientSession", semaphore: asyncio.Semaphore,
                      max_retries: int = 4, retry_delay: float = 1) -> tuple:
  """Return ``(game_id, player_count)`` for one game, retrying on temporary failures.

  Parameters:
      game_id: app id sent to GetNumberOfCurrentPlayers.
      session: shared aiohttp client session used to send the request.
      semaphore: asyncio semaphore bounding the number of in-flight requests.
      max_retries: number of additional attempts after the first one.
      retry_delay: seconds to wait after a failed attempt.

  Returns:
      ``(game_id, player_count)`` on success, otherwise ``(game_id, -1)`` -- for a 404,
      for an unparsable body, and when every attempt failed. A tuple is always returned.
  """
  url = "https://api.steampowered.com/ISteamUserStats/GetNumberOfCurrentPlayers/v1"
  # NOTE(review): the fallback key is committed with the notebook and should be treated
  # as leaked -- rotate it and provide the new one via the STEAM_API_KEY env var.
  params = {'appid': game_id, 'key': os.environ.get('STEAM_API_KEY', '1674C7309B00CA08D73A8CC100CA24C7')}

  for attempt in range(max_retries + 1):
    # Acquire the semaphore BEFORE calling session.get(); entering the request first
    # would send it before the limit is checked
    async with semaphore:
      status = None
      try:
        async with session.get(url, params=params) as r:
          status = r.status
          if status == 200:  # Success status code
            try:
              payload = await r.json()
              return (game_id, payload['response']['player_count'])
            except Exception as e:
              print(f"Error parsing JSON response: {e}")
              return (game_id, -1)
      except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # Connection problems are retried like 403/429 instead of aborting the gather
        print(f"Attempt {attempt + 1}: Request for game ID {game_id} failed ({e!r}). Retrying...")

      if status == 404:
        print(f"Received Error 404 for game ID {game_id}. Faulty URL")
        return (game_id, -1)
      elif status == 403:
        print(f"Attempt {attempt + 1}: Received Error 403 for game ID {game_id}. Retrying...")
      elif status == 429:
        print(f"Attempt {attempt + 1}: Received Error 429 for game ID {game_id}. Too many requests. Retrying...")
      elif status is not None:
        print(f"Attempt {attempt + 1}: Received unexpected status code {status} for game ID {game_id}. Retrying...")

      # Sleep briefly before retrying to avoid overwhelming the server. The pause is
      # taken while still holding the semaphore slot so failures slow the whole pool down.
      if attempt < max_retries:
        await asyncio.sleep(retry_delay)

  print(f"Giving up on game ID {game_id} after {max_retries + 1} attempts")
  return (game_id, -1)


async def insert_players(game_ids: list, players: list, current_time: datetime) -> None:
  """Store one snapshot of player counts in the TABLE 'player_count'.

  Parameters:
      game_ids: ids that must have a row in 'player_count'.
      players: (game_id, player_count) pairs; entries that are not such pairs
          (e.g. None) are printed and skipped.
      current_time: datetime of the snapshot; formatted as mm_dd_HHMM it becomes
          the name of the column that receives the counts.
  """
  async with aiosqlite.connect('video_game_data_db.sqlite') as conn:
    column = current_time.strftime('%m_%d_%H%M')
    # Double quotes are SQL's identifier quoting; single quotes denote string literals
    quoted_column = f'"{column}"'

    # Preparing the SQL database TABLE 'player_count'. No extra index is created:
    # the PRIMARY KEY already provides one, and the name 'idx_game_id' is used for
    # the index on game(id) (index names are shared across tables in SQLite).
    await conn.execute("CREATE TABLE IF NOT EXISTS player_count (id INT PRIMARY KEY)")

    # Only add the snapshot column if it is not there yet, so a re-run within the
    # same minute does not fail with a duplicate-column error
    async with conn.execute("PRAGMA table_info(player_count)") as cur:
      existing_columns = {row[1] for row in await cur.fetchall()}
    if column not in existing_columns:
      await conn.execute(f"ALTER TABLE player_count ADD COLUMN {quoted_column} INT")

    # First insert the game ids; OR IGNORE skips ids that already have a row, so
    # newly released games are still added on later runs
    id_rows = [(game_id,) for game_id in game_ids]
    await conn.executemany("INSERT OR IGNORE INTO player_count (id) VALUES (?)", id_rows)

    # Then update the player count column for each game id
    updates = []
    for id_player in players:
      try:
        updates.append((id_player[1], id_player[0]))
      except (TypeError, IndexError):
        print(id_player)
    await conn.executemany(f"UPDATE player_count SET {quoted_column} = (?) WHERE id = (?)", updates)

    await conn.commit()


async def main():
  desired_entry = int(input("How many hourly intervals would you like to record? "))

  entry_count = 0
  is_sleeping = False
  while True:
    if entry_count == desired_entry:
      break

    # Scrapes player count data every hour
    desired_timezone = 'US/Central'
    now = datetime.now(pytz.timezone(desired_timezone))
    is_top_of_hour = now.minute == 0

    if is_top_of_hour:
      pass
    else:
      if is_sleeping is False:
        print("Sleeping until next hour")
        is_sleeping = True
      await asyncio.sleep(60)
      continue

    start_time = perf_counter()
    game_ids = await get_game_ids() # game ids that are already released

    # Semaphore to control the number of active coroutines
    connection_limit = 100
    semaphore = asyncio.Semaphore(connection_limit)

    async with aiohttp.ClientSession() as session:
      tasks = [asyncio.create_task(get_players(game_id, session, semaphore)) for game_id in game_ids]

      # Creates a list of tuples with game_id-player_count pairs
      player_counts = await asyncio.gather(*tasks)
      
      # Inserts player count data into a SQL table
      await insert_players(game_ids, player_counts, now)

      print(f"First 10 game_id-player_count tuples: {player_counts[:10]}")
      print(f"Execution Time: {perf_counter() - start_time}")

      entry_count += 1
      is_sleeping = False


await main()

Sleeping until next hour
Received Error 404 for game ID 8780. Faulty URL
Received Error 404 for game ID 39391. Faulty URL
Received Error 404 for game ID 39392. Faulty URL
Received Error 404 for game ID 63970. Faulty URL
Received Error 404 for game ID 224880. Faulty URL
Received Error 404 for game ID 243240. Faulty URL
Received Error 404 for game ID 252190. Faulty URL
Attempt 1: Received Error 429 for game ID 269710. Too many requests. Retrying...
Attempt 1: Received Error 429 for game ID 270760. Too many requests. Retrying...
Attempt 1: Received Error 429 for game ID 270950. Too many requests. Retrying...
Attempt 1: Received Error 429 for game ID 270880. Too many requests. Retrying...
Attempt 1: Received Error 429 for game ID 270270. Too many requests. Retrying...
Attempt 1: Received Error 429 for game ID 270850. Too many requests. Retrying...
Attempt 1: Received Error 429 for game ID 270190. Too many requests. Retrying...
Attempt 1: Received Error 429 for game ID 271550. Too many requ

**Preview the player_count table to check that it is functional**

In [19]:
# Loads the whole TABLE 'player_count' into a data frame for a quick visual check
# (one row per game id, one column per recorded snapshot). The unused cursor was removed.
with sqlite3.connect('video_game_data_db.sqlite') as conn:
  player_count_db = pd.read_sql("SELECT * FROM player_count", conn)

player_count_db.head()

Unnamed: 0,id,09_06_0216,09_06_0415,09_06_0000,09_06_1300,09_06_1700,09_06_1900,09_06_2000
0,10,5257,4320,3970,11302,7111,5487,5217
1,20,71,57,59,45,70,65,91
2,30,112,65,62,90,86,93,108
3,40,3,1,1,4,3,2,1
4,50,75,60,48,110,76,82,86


In [None]:
# sleep 2 sec 1009
# sleep 1 sec

# sleep 1 no insert semaphore = 50 errors = 403 | 260
# 20000 games sleep 1 no insert semaphore = 20 errors = 403,429| 500
# 20000 games sleep 1 no insert semaphore = 50 errors = 403,429| 80
# 20000 games sleep 1 no insert semaphore = 100 errors = 403,429| 20, 32, 27

# 20000 games sleep 1 WITH insert semaphore = 50 errors = 403,429| 28

# ALL games sleep 1 WITH insert semaphore = 100 errors = 403,429| 630, 565
# ALL games sleep 1 WITH insert semaphore = 100 errors = 403,429| 1115
