In [None]:
import requests
from bs4 import BeautifulSoup
import time
import pandas as pd
import ast
import re
from tqdm import tqdm
import numpy as np

# Read the IsThereAnyDeal API key from a local file so it is never hardcoded in the notebook
with open("IsThereAnyDeal_API_KEY", "r") as key_file:
    API_KEY = key_file.read().strip()

#### Get list of top 5000 Steam best-selling priced games in the US from web scraping

In [None]:
# Web-scraping code for gathering the ~top 5000 Steam best-selling priced games in the US.

def fetch_batch(start, count):
    """Download one page of Steam "topsellers" search results as raw HTML.

    Parameters
    ----------
    start : int
        Offset of the first search result to request.
    count : int
        Number of search results to request.

    Returns
    -------
    str
        The response body text.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status (via ``raise_for_status``).
    """
    search_url = "https://store.steampowered.com/search/results/"
    browser_headers = {"User-Agent": "Mozilla/5.0"}

    # Query-string arguments; only the paging values vary between calls.
    query_args = dict(
        query="",
        start=start,
        count=count,
        category1="998",
        supportedlang="english",
        hidef2p="1",
        filter="topsellers",
        ndl="1",
    )

    response = requests.get(search_url, params=query_args, headers=browser_headers)
    response.raise_for_status()
    return response.text

def parse_batch(html, min_reviews=3000):
    """Extract well-reviewed single-app entries from a Steam search results page.

    Parameters
    ----------
    html : str
        Raw HTML of one search results page (as returned by ``fetch_batch``).
    min_reviews : int, optional
        An entry is kept only when its review count is strictly greater than
        this value. Defaults to 3000.

    Returns
    -------
    list of tuple
        ``(appid, href, title)`` for every kept entry; an empty list when the
        ``search_resultsRows`` container is absent or nothing qualifies.

    Notes
    -----
    Links carrying a ``data-ds-packageid`` or ``data-ds-subid`` attribute, or
    lacking ``data-ds-appid``, are skipped. An entry whose review count cannot
    be determined (no review span, no tooltip attribute, or a tooltip that does
    not match the expected text) is treated as having 0 reviews instead of
    raising ``TypeError`` on the threshold comparison.
    """
    soup = BeautifulSoup(html, "html.parser")
    container = soup.find("div", id="search_resultsRows")
    if container is None:
        return []

    review_pattern = re.compile(r'([\d,]+) user reviews')
    results = []

    for a in container.find_all("a", href=True):
        # Skip packages and subs; only plain app entries are wanted.
        if a.get("data-ds-packageid") or a.get("data-ds-subid"):
            continue

        appid = a.get("data-ds-appid")
        if not appid:
            continue

        # The review count lives in the tooltip text of the review summary span.
        review_span = a.find("span", class_="search_review_summary")
        tooltip = review_span.get("data-tooltip-html") if review_span else None
        match = review_pattern.search(tooltip) if tooltip else None
        review_number = int(match.group(1).replace(',', '')) if match else 0

        if review_number > min_reviews:
            title_span = a.find("span", class_="title")
            title = title_span.text.strip() if title_span else "Unknown"
            results.append((appid, a["href"], title))

    return results

def get_all_appids(limit=5000, batch_size=50, delay=0.02):
    """Page through the Steam search results and gather every kept entry.

    Parameters
    ----------
    limit : int
        Upper bound (exclusive) on the result offsets that are requested.
    batch_size : int
        Number of results requested per page; also the offset step.
    delay : float
        Seconds to sleep after each page that produced entries.

    Returns
    -------
    list
        Concatenation of the lists returned by ``parse_batch`` for each page.
    """
    collected = []
    for offset in range(0, limit, batch_size):
        page_end = offset + batch_size
        print(f"Fetching results {offset}–{page_end}...")

        page_entries = parse_batch(fetch_batch(offset, batch_size))

        # NOTE(review): an empty page ends the crawl. parse_batch also filters
        # entries, so a page whose entries were all filtered out looks the same
        # as "no more results" here — confirm this early stop is intended.
        if not page_entries:
            print("No more results.")
            break

        collected += page_entries
        time.sleep(delay)
    return collected


# Run the scraper, list every kept game, then report how many there are
results = get_all_appids(limit=5000)
for entry in results:
    appid, href, title = entry
    print(f"{title} (AppID: {appid}) - {href}")

total_kept = len(results)
print(f"\nTotal results: {total_kept}")

In [None]:
# Write the scraped (appid, link, title) tuples to a CSV file

scraped_columns = ["steam_id", "link", "game"]
df = pd.DataFrame(results, columns=scraped_columns)
df.to_csv("steam_top_games.csv", index=False)
print("\nCSV saved as steam_top_games.csv")

#### Get game discount history in past 10 years using IsThereAnyDeal API

In [None]:
# Resolve each scraped Steam app to its IsThereAnyDeal (ITAD) game ID,
# in preparation for gathering pricing history

# Step 1: Load the scraped games
df = pd.read_csv("steam_top_games.csv")

# Step 2: Build the "app/<steam_id>" strings once; they are both the request
# payload and the keys used to read the response mapping
shop_ids = "app/" + df["steam_id"].astype(str)
appids = shop_ids.tolist()

# Step 3: Send request to ITAD API

url = "https://api.isthereanydeal.com/lookup/id/shop/61/v1"

response = requests.post(url, json=appids)
# Fail loudly on an HTTP error: otherwise an error payload would be treated as
# the mapping and every game would silently end up with an empty itad_id
response.raise_for_status()
mapping = response.json()

# Step 4: Attach the ITAD game ID to the dataframe
# (IDs missing from the mapping become None)
df["itad_id"] = shop_ids.map(mapping.get)

# Step 5: Save the updated dataframe
df.to_csv("steam_top_games_with_itad_ids.csv", index=False)
print("Saved updated CSV as steam_top_games_with_itad_ids.csv")

In [None]:
# Download the price history of every game that has an ITAD ID

# Input: the CSV written by the ITAD ID lookup step
df = pd.read_csv("steam_top_games_with_itad_ids.csv")

PRICE_HIST_URL = "https://api.isthereanydeal.com/games/history/v2"
SINCE_DATE = "2015-06-01T00:00:00Z"

# Rows that are skipped or fail keep this None placeholder
df["price_logs"] = None

progress = tqdm(df.iterrows(), total=len(df), desc="Fetching price log info")
for idx, row in progress:
    itad_id = row.get("itad_id")

    # Only games with a resolved ITAD ID can be queried
    if not pd.isna(itad_id):
        # shop 61 is the same shop ID used for the Steam app lookup — presumably Steam; TODO confirm
        params = dict(key=API_KEY, id=itad_id, shops=61, since=SINCE_DATE)

        try:
            resp = requests.get(PRICE_HIST_URL, params=params)
            resp.raise_for_status()
            price_data = resp.json()

            # Keep the raw payload as its string form so it fits in a CSV cell
            df.at[idx, "price_logs"] = str(price_data)

            time.sleep(0.05)

        except Exception as e:
            # Best effort: report the failure and move on to the next game
            print(f"Failed for {row.get('game')} ({itad_id}): {e}")

# Save to CSV
df.to_csv("steam_top_games_with_price_logs.csv", index=False)
print("Saved raw price logs to steam_top_games_with_price_logs.csv")

In [None]:
# First clean-up pass: drop games for which no price history was collected

# Load the full price log dataset
df = pd.read_csv("steam_top_games_with_price_logs.csv")
print(f"Totaling {len(df)} games before clean-up.\n")

# Turn each stored string back into a Python object; missing cells become []
df["parsed_logs"] = df["price_logs"].map(
    lambda raw: [] if pd.isna(raw) else ast.literal_eval(raw)
)

# One boolean mask drives both the report and the filter
has_logs = df["parsed_logs"].map(len) > 0
empty_log_df = df[~has_logs]

# Report the games that are about to be dropped
print("Games with NO price history on IsThereAnyDeal:\n")
for _, missing in empty_log_df.iterrows():
    print(f"- {missing['game']} (Steam ID: {missing['steam_id']})")

# Keep only games with at least one log entry; the helper column is not saved
df_cleaned = df[has_logs].drop(columns=["parsed_logs"])

print(f"\nClean-up complete, {len(df_cleaned)} games left after clean-up.")

# Save cleaned dataset
df_cleaned.to_csv("steam_top_games_with_price_logs_cleaned.csv", index=False)
print("Saved cleaned dataset to steam_top_games_with_price_logs_cleaned.csv")

#### Get game basic information using Steam's own API

In [None]:
from datetime import datetime
import time

def _as_list(value):
    """Return ``value`` unchanged if it is a list, wrap a truthy scalar, else ``[]``."""
    if isinstance(value, list):
        return value
    return [value] if value else []


def _parse_release_date(date_str):
    """Convert a store release-date string to ``str(datetime)``, or ``None``.

    Tries the month-first form ("Aug 21, 2012") and then the day-first form
    ("21 Aug, 2012"). Anything else (empty, non-string, or free text) yields
    ``None`` rather than an exception, so one odd date cannot abort a long scrape.
    """
    if not isinstance(date_str, str) or not date_str.strip():
        return None
    for fmt in ("%b %d, %Y", "%d %b, %Y"):
        try:
            return str(datetime.strptime(date_str.strip(), fmt))
        except ValueError:
            continue
    return None


def get_game_info(steam_id):
    """Fetch store metadata and review totals for one Steam app.

    Two requests are made: the store ``appdetails`` endpoint (``cc=us``,
    ``l=en``) for descriptive fields, and the ``appreviews`` endpoint
    (``language=all``) for the review summary. The function then sleeps
    1.5 seconds before returning.

    Parameters
    ----------
    steam_id : int or str
        Steam app ID; it is interpolated into both URLs and used as the key
        into the ``appdetails`` response.

    Returns
    -------
    tuple
        ``(publisher, developer, category, genre, header_image, release_date,
        score, rec_pos, rec_tot, rec_desc, is_windows, is_mac, is_linux)``.
        ``publisher`` and ``developer`` are lists; ``category`` and ``genre``
        are the string form of the raw values; the platform flags are 0/1
        ints; fields absent from the responses are ``None``.
    """
    HEADERS = {
        "User-Agent": "Mozilla/5.0"
    }

    r = requests.get(f"https://store.steampowered.com/api/appdetails?appids={steam_id}&cc=us&l=en", headers=HEADERS)
    info = r.json().get(f'{steam_id}', {}).get("data", {})

    publisher = _as_list(info.get("publishers"))
    developer = _as_list(info.get("developers"))

    category = str(info.get("categories")) if "categories" in info else None
    genre = str(info.get("genres")) if "genres" in info else None

    # Guard on the field's own key (the previous check tested "categories").
    header_image = str(info.get("header_image")) if "header_image" in info else None

    release_date = _parse_release_date(info.get("release_date", {}).get('date'))

    score = info.get("metacritic", {}).get("score")

    platforms = info.get("platforms", {})
    is_windows = int(platforms.get("windows", False))
    is_mac = int(platforms.get("mac", False))
    is_linux = int(platforms.get("linux", False))

    r_rec = requests.get(f"https://store.steampowered.com/appreviews/{steam_id}?json=1&language=all", headers=HEADERS)
    query_summary = r_rec.json().get("query_summary", {})
    rec_tot = int(query_summary["total_reviews"]) if "total_reviews" in query_summary else None
    rec_pos = int(query_summary["total_positive"]) if "total_positive" in query_summary else None
    rec_desc = str(query_summary["review_score_desc"]) if "review_score_desc" in query_summary else None

    # Pause between apps to keep the request rate low.
    time.sleep(1.5)
    return (publisher,developer,category,genre,header_image,release_date,score,rec_pos,rec_tot,rec_desc,is_windows,is_mac,is_linux)

In [None]:
from tqdm import tqdm
import json
import pickle

# Collect basic game information for every cleaned game, checkpointing to a
# pickle file so a failure part-way through does not lose finished work
df = pd.read_csv("steam_top_games_with_price_logs_cleaned.csv")


def _save_game_info(collected, path="game_info.pkl"):
    """Pickle the tuples gathered so far to ``path``, overwriting the file."""
    with open(path, "wb") as f:
        pickle.dump(collected, f)


df_new = df.copy()
game_info = []

try:
    for idx, row in tqdm(df_new.iterrows(), total=len(df_new), desc="Fetching game info"):
        current_game_info = get_game_info(row["steam_id"])
        game_info.append(current_game_info)

        # Periodic checkpoint every 100 index values (index 0 is skipped)
        if idx % 100 == 0 and idx != 0:
            _save_game_info(game_info)

except Exception as e:
    # Single quotes inside the replacement field: reusing the outer double
    # quotes is a SyntaxError on Python versions before 3.12
    print(f"Error occured at index {idx} for {row['steam_id']}: {e}")
    _save_game_info(game_info)
    print("Partial data saved to 'game_info.pkl'")
    raise  # re-raise the exception to stop the script

_save_game_info(game_info)
print("Data saved to 'game_info.pkl'")

In [None]:
# Reload the pickled per-game tuples and attach them to the cleaned dataset.
# Only unpickle files this notebook wrote itself; pickle.load can run
# arbitrary code from an untrusted file.
with open("game_info.pkl", "rb") as pkl_file:
    game_info = pickle.load(pkl_file)

df = pd.read_csv("steam_top_games_with_price_logs_cleaned.csv")
df_new = df.copy()

# One column per tuple position, in the order the tuples were built
info_columns = [
    "publishers", "developers", "categories", "genres", "header_image",
    "release_dates", "metacritic_scores", "positive_review", "total_review",
    "review_desc", "is_windows", "is_mac", "is_linux",
]
df_new[info_columns] = pd.DataFrame(game_info, index=df_new.index)

# Save dataset
df_new.to_csv("steam_top_games_with_price_logs_and_game_info.csv", index=False)
print("Saved cleaned dataset to steam_top_games_with_price_logs_and_game_info.csv")

#### Removing duplicate and erroneous games

In [None]:
# Quick inspection of the merged dataset
df = pd.read_csv("steam_top_games_with_price_logs_and_game_info.csv")
# DataFrame.info() prints its report itself and returns None, so wrapping it
# in print() only adds a stray "None" line
df.info()
print(f"\nunique steam_id {df.steam_id.nunique()}\n")
print(f"\nunique publishers {df.publishers.nunique()}\n")
# Preview the first rows instead of dumping the whole frame
df.head()

In [None]:
# Find steam_ids that appear on more than one row

# Rows per steam_id, restricted to ids that occur more than once
id_counts = df.groupby('steam_id').size().reset_index(name='count')
duplicates = id_counts.loc[id_counts['count'] > 1]

# All rows of the original frame whose steam_id is one of the repeated ids
is_repeated = df['steam_id'].isin(duplicates['steam_id'])
non_unique_df = df.loc[is_repeated]

# List id and title side by side, ordered by id
repeated_listing = non_unique_df[['steam_id', 'game']].sort_values(by='steam_id')
print(repeated_listing)

In [None]:
# Post-processing of the merged dataset:
#   * drop steam_ids 22330, 40960 and 1794960 completely
#   * drop the "It Takes Two Friend's Pass" entry
#   * for any steam_id that is still repeated, keep only its first row

df = pd.read_csv("steam_top_games_with_price_logs_and_game_info.csv")

# Rows to discard, by id and by title
excluded_ids = [22330, 40960, 1794960]
is_excluded_id = df['steam_id'].isin(excluded_ids)
is_friends_pass = df['game'] == "It Takes Two Friend's Pass"
df = df[~is_excluded_id & ~is_friends_pass]

# Collapse the remaining repeated steam_ids to their first occurrence
df = df.drop_duplicates(subset='steam_id', keep='first')

df.to_csv("steam_top_games_with_price_logs_and_game_info.csv", index=False)
print(f"Saved cleaned dataset to steam_top_games_with_price_logs_and_game_info.csv, length = {len(df)}")

#### Using RAWG API to supplement meta-score

In [None]:
# Reload the de-duplicated dataset
df = pd.read_csv("steam_top_games_with_price_logs_and_game_info.csv")

# Read the RAWG API key from a local file so it is never hardcoded in the notebook
with open("RAWG_API_Key", "r") as key_file:
    RAWG_API_Key = key_file.read().strip()

In [None]:
from tqdm import tqdm

# First pass over every game title: look it up on RAWG and record its
# Metacritic scores. Titles that raise are remembered for a later retry.
records = []
failed_titles = []

search_url = 'https://api.rawg.io/api/games'

for title in tqdm(df['game'],desc="Fetching meta score info"):
    try:
        # Step 1: Search for the game on RAWG (platform 4 = PC), top hit only
        search_resp = requests.get(
            search_url,
            params={
                'search': title,
                'key': RAWG_API_Key,
                'page_size': 1,
                'platforms': 4,
            },
        ).json()

        hits = search_resp['results']
        if not hits:
            # No match on RAWG: nothing to record for this title
            continue

        game_id = hits[0]['id']
        game_name = hits[0]['name']

        # Step 2: Fetch full game details using the ID
        game_resp = requests.get(
            f"{search_url}/{game_id}",
            params={'key': RAWG_API_Key},
        ).json()

        # PC-specific Metacritic score: first platform entry named "pc", if any
        pc_score = next(
            (
                entry['metascore']
                for entry in game_resp.get('metacritic_platforms', [])
                if entry['platform']['name'].lower() == 'pc'
            ),
            None,
        )

        records.append(dict(
            search_name=title,
            resolved_name=game_name,
            overall_metacritic_score=game_resp.get('metacritic'),
            pc_metacritic_score=pc_score,
        ))

    except Exception as e:
        print(f"Error processing {title}: {e}")
        failed_titles.append(title)

# Convert to DataFrame
RAWG_meta_df = pd.DataFrame(records)
RAWG_meta_df

In [None]:

# Retry the titles whose first RAWG lookup raised an exception.
# Titles that fail again are collected and replace `failed_titles` afterwards,
# so re-running this cell only retries what is still missing and never appends
# a second record for a title that has already succeeded.
still_failed = []

for title in tqdm(failed_titles,desc="Retrying fetching meta score info for failed cases"):
    try:
        # Step 1: Search for the game on RAWG (platform 4 = PC)
        search_url = 'https://api.rawg.io/api/games'
        search_params = {
            'search': title,
            'key': RAWG_API_Key,
            'page_size': 1,
            'platforms': 4
        }
        search_resp = requests.get(search_url, params=search_params).json()

        if not search_resp['results']:
            # No match on RAWG: nothing to record, and no point retrying again
            continue

        game_id = search_resp['results'][0]['id']
        game_name = search_resp['results'][0]['name']

        # Step 2: Fetch full game details using the ID
        game_resp = requests.get(
            f'https://api.rawg.io/api/games/{game_id}',
            params={'key': RAWG_API_Key}
        ).json()

        # Extract PC-specific Metacritic score if available
        pc_score = None
        for p in game_resp.get('metacritic_platforms', []):
            if p['platform']['name'].lower() == 'pc':
                pc_score = p['metascore']
                break

        records.append({
            'search_name': title,
            'resolved_name': game_name,
            'overall_metacritic_score': game_resp.get('metacritic'),
            'pc_metacritic_score': pc_score
        })

    except Exception as e:
        print(f"Error processing {title}: {e}")
        still_failed.append(title)

# Only the titles that failed again remain eligible for another retry
failed_titles = still_failed

In [None]:
# Build the RAWG score table from every collected record and save it
rawg_scores_path = 'RAWG_metacritic_scores.csv'
RAWG_meta_df = pd.DataFrame(records)
RAWG_meta_df.to_csv(rawg_scores_path, index=False)
RAWG_meta_df

In [None]:
# Combine the RAWG scores with Steam's own Metacritic score, matching
# 'search_name' (RAWG table) against 'game' (Steam table)
df = pd.read_csv("steam_top_games_with_price_logs_and_game_info.csv")
RAWG_meta_df = pd.read_csv("RAWG_metacritic_scores.csv")

# Titles are the join key, so both sides are reduced to one row per title
# first; a repeated title would otherwise multiply rows in the merge.
steam_scores = (
    df[['game', 'metacritic_scores']]
    .drop_duplicates(subset='game', keep='first')
    .rename(columns={'metacritic_scores': 'steam_metacritic_score'})
)
merged_df = (
    RAWG_meta_df
    .drop_duplicates(subset='search_name', keep='first')
    .merge(steam_scores, left_on='search_name', right_on='game', how='left')
    .drop(columns='game')  # Drop redundant 'game' column after merge
)

# Preference order: RAWG PC score, then Steam's score, then RAWG overall score
merged_df['metacritic_score'] = (
    merged_df['pc_metacritic_score']
    .combine_first(merged_df['steam_metacritic_score'])
    .combine_first(merged_df['overall_metacritic_score'])
)

In [None]:
# Fill the scores that are still missing with the mean of the known ones
mean_score = merged_df['metacritic_score'].mean()
merged_df['metacritic_score'] = merged_df['metacritic_score'].fillna(mean_score)

In [None]:
# Attach the combined score to the main table as 'metacritic_scores_augmented'.
# The lookup is reduced to one row per search_name first: a repeated title on
# the right side of a left join would duplicate the matching games in df.
score_lookup = (
    merged_df[['search_name', 'metacritic_score']]
    .drop_duplicates(subset='search_name', keep='first')
)

df = df.merge(
    score_lookup,
    how='left',
    left_on='game',
    right_on='search_name',
    validate='many_to_one'  # each game may match at most one lookup row
).rename(columns={'metacritic_score': 'metacritic_scores_augmented'}).drop(columns=['search_name'])

df

In [None]:
# Overwrite the game-info CSV with the current contents of df
augmented_csv_path = 'steam_top_games_with_price_logs_and_game_info.csv'
df.to_csv(augmented_csv_path, index=False)

#### Final filtering to remove games with 5000 or fewer reviews

In [None]:
# Keep only games with more than 5000 total reviews and write the result back
final_filter = pd.read_csv('steam_top_games_with_price_logs_and_game_info.csv')
has_enough_reviews = final_filter['total_review'].gt(5000)
final_filter = final_filter.loc[has_enough_reviews]
final_filter.to_csv('steam_top_games_with_price_logs_and_game_info.csv', index=False)
final_filter