# Data collection

In this notebook we collect all the data and format the initial version of the JSON files.
We then create a mock user database with some user-game interactions.

In [None]:
import json
import math
import os
import random
import time
from pathlib import Path

import pandas as pd
import requests

### Creating the API authentication helper

In [None]:
# Twitch API credentials (the resulting headers are used for the IGDB requests below).
# They are read from the environment so real secrets never have to be committed
# with the notebook; the original placeholder strings remain as fallbacks.
CLIENT_ID = os.environ.get("TWITCH_CLIENT_ID", "client-id")
CLIENT_SECRET = os.environ.get("TWITCH_CLIENT_SECRET", "client-secret")

# Twitch OAuth2 token endpoint queried by get_auth_header().
TOKEN_URL = "https://id.twitch.tv/oauth2/token"

def get_auth_header(timeout=30):
    """Request an app access token from Twitch and build the API request headers.

    Performs an OAuth2 client-credentials request against ``TOKEN_URL`` using
    the module-level ``CLIENT_ID`` / ``CLIENT_SECRET``.

    Args:
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        dict with the ``Client-ID`` and ``Authorization`` (bearer token) headers.

    Raises:
        requests.HTTPError: if the token endpoint answers with an error status.
        KeyError: if the response JSON carries no ``access_token``.
    """
    print("Requesting OAuth token from Twitch...")
    token_resp = requests.post(
        TOKEN_URL,
        # Send the credentials in the form-encoded body instead of the query
        # string so the client secret does not end up in URLs / access logs.
        data={
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "grant_type": "client_credentials",
        },
        # Without a timeout a stalled connection would hang the notebook forever.
        timeout=timeout,
    )
    token_resp.raise_for_status()
    access_token = token_resp.json()["access_token"]
    print("Got access token.")

    return {
        "Client-ID": CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
    }

### Fetching the games in JSON format

In [None]:
IGDB_GAMES_URL = "https://api.igdb.com/v4/games"

# Folder for raw data
RAW_DIR = Path("./data/raw")
RAW_DIR.mkdir(parents=True, exist_ok=True)

headers = get_auth_header()

all_games = []
batch_size = 500
offset = 0

# Optional cap on how many games to request (handy for quick test runs).
# None means "collect everything".
max_games_to_fetch = None

# Page through the games endpoint until an empty batch comes back.
while True:
    print(f"Requesting games: offset={offset}, limit={batch_size}...")
    # `sort id asc` gives the offset pagination a stable order, so pages cannot
    # overlap or skip records between requests.
    query = f"""
    fields id, name, summary, first_release_date, genres, platforms, keywords, involved_companies;
    where first_release_date != null;
    sort id asc;
    limit {batch_size};
    offset {offset};
    """

    # The timeout keeps a stalled connection from hanging the whole collection run.
    resp = requests.post(IGDB_GAMES_URL, headers=headers, data=query, timeout=60)
    resp.raise_for_status()
    batch = resp.json()
    print(f"Got {len(batch)} games in this batch")

    if not batch:
        print("No more games, stopping.")
        break

    all_games.extend(batch)

    # move to next page
    offset += batch_size

    if max_games_to_fetch is not None and offset >= max_games_to_fetch:
        print(f"Reached the configured cap of {max_games_to_fetch} games, stopping.")
        break

    # Throttle between pages, like the other IGDB fetchers in this notebook do.
    time.sleep(0.25)

print(f"Total games collected: {len(all_games)}")

out_file = RAW_DIR / "games_raw.json"
with out_file.open("w", encoding="utf-8") as f:
    json.dump(all_games, f, ensure_ascii=False, indent=2)

print(f"Saved all games to {out_file}")

### Format JSON files and add Steam IDs 

In [None]:
# --- IGDB endpoints used while formatting the data -------------------------
IGDB_GENRES_URL = "https://api.igdb.com/v4/genres"
IGDB_PLATFORMS_URL = "https://api.igdb.com/v4/platforms"
IGDB_KEYWORDS_URL = "https://api.igdb.com/v4/keywords"
IGDB_INVOLVED_URL = "https://api.igdb.com/v4/involved_companies"
IGDB_COMPANIES_URL = "https://api.igdb.com/v4/companies"
IGDB_EXTERNAL_GAMES_URL = "https://api.igdb.com/v4/external_games"

# Category value used to filter external_games rows down to Steam entries
# (IGDB enum value for Steam).
STEAM_CATEGORY_ID = 1

# --- Folders: raw dump (input) and formatted JSON (output) -----------------
RAW_DIR = Path("./data/raw")
JSON_DIR = Path("./data/json")
JSON_DIR.mkdir(exist_ok=True, parents=True)

# --- Auth headers shared by the IGDB requests in the cells below -----------
headers = get_auth_header()

In [None]:
def fetch_steam_ids_for_games(game_ids, chunk_size=500, page_size=500, timeout=60):
    """
    Return a dict: { game_id (IGDB) -> steam_appid (string) }
    using IGDB's external_games endpoint, filtering category=Steam.

    Args:
        game_ids: Iterable of IGDB game ids. Empty or None yields ``{}``.
        chunk_size: How many game ids are put into one ``where`` clause.
        page_size: ``limit`` used for each request (IGDB limit per request: 500).
        timeout: Seconds to wait for each HTTP response.

    Returns:
        dict mapping IGDB game id -> Steam appid as a string. When a game has
        several Steam entries, the first one encountered is kept.

    Raises:
        requests.HTTPError: if IGDB answers with an error status.
    """
    if not game_ids:
        return {}

    game_ids_list = sorted(game_ids)
    mapping = {}

    for i in range(0, len(game_ids_list), chunk_size):
        chunk = game_ids_list[i:i + chunk_size]
        ids_str = ",".join(str(gid) for gid in chunk)

        # A game can own more than one Steam entry, so a chunk of N games may
        # produce more than `page_size` rows. Page through the results instead
        # of silently dropping whatever does not fit into the first response.
        offset = 0
        while True:
            body = f"""
            fields game, uid, category;
            where game = ({ids_str}) & category = {STEAM_CATEGORY_ID};
            sort id asc;
            limit {page_size};
            offset {offset};
            """

            resp = requests.post(
                IGDB_EXTERNAL_GAMES_URL, headers=headers, data=body, timeout=timeout
            )
            resp.raise_for_status()
            items = resp.json()

            for item in items:
                game_id = item.get("game")
                steam_uid = item.get("uid")  # Steam appid as string/number
                if game_id is None or steam_uid is None:
                    # Incomplete row: nothing usable to map.
                    continue
                # In case of multiple external entries per game, keep the first
                mapping.setdefault(game_id, str(steam_uid))

            # Throttle like the other IGDB fetchers in this notebook.
            time.sleep(0.2)

            if len(items) < page_size:
                break
            offset += page_size

    return mapping

We load the raw games data collected earlier to extract Steam IDs.

In [None]:
# Read back the raw games dump stored under RAW_DIR.
games_file = RAW_DIR / "games_raw.json"
games = json.loads(games_file.read_text(encoding="utf-8"))

print(f"Loaded {len(games)} games from {games_file}")

# Every IGDB id found in the dump is a candidate for the Steam lookup.
game_ids_set = set(entry["id"] for entry in games)

# Resolve the candidates to Steam appids through IGDB's external_games data.
print("Fetching Steam appids from IGDB external_games...")
steam_ids_map = fetch_steam_ids_for_games(game_ids_set)
print(f"Found Steam appids for {len(steam_ids_map)} games")

Now we enrich the games with their `steam_appid` where available and start shaping the records for the final JSON files.

In [None]:
# Row accumulators: one list for the game entities, one per link table.
rows_games = []
rows_game_genres, rows_game_platforms = [], []
rows_game_keywords, rows_game_companies = [], []

# Distinct ids referenced by the games; used later to fetch the lookup tables.
genre_ids_set, platform_ids_set = set(), set()
keyword_ids_set, involved_company_ids_set = set(), set()
company_ids_set = set()

# Each link table is filled the same way, so describe them as data:
# (field on the raw game, id column in the link row, target row list, id set).
# For involved_companies only the involved_company id is known at this point.
_LINK_SPECS = (
    ("genres", "genre_id", rows_game_genres, genre_ids_set),
    ("platforms", "platform_id", rows_game_platforms, platform_ids_set),
    ("keywords", "keyword_id", rows_game_keywords, keyword_ids_set),
    ("involved_companies", "involved_company_id", rows_game_companies, involved_company_ids_set),
)

for raw_game in games:
    # The IGDB id doubles as our own game_id.
    game_id = raw_game["id"]

    game_row = {
        "game_id": game_id,
        "igdb_id": raw_game["id"],
        "name": raw_game.get("name"),
        "summary": raw_game.get("summary"),
        "first_release_date": raw_game.get("first_release_date"),
        # None when no Steam appid was found for this game.
        "steam_appid": steam_ids_map.get(game_id),
    }
    rows_games.append(game_row)

    # Emit one link row per referenced id and remember the id for the lookups.
    for source_field, id_column, link_rows, seen_ids in _LINK_SPECS:
        for ref_id in raw_game.get(source_field, []):
            link_rows.append({"game_id": game_id, id_column: ref_id})
            seen_ids.add(ref_id)

In [None]:
# Turn every accumulated row list into its DataFrame in one pass:
# the game entities first, followed by the four game -> attribute link tables.
(
    df_games,
    df_game_genres,
    df_game_platforms,
    df_game_keywords,
    df_game_companies,
) = (
    pd.DataFrame(row_list)
    for row_list in (
        rows_games,
        rows_game_genres,
        rows_game_platforms,
        rows_game_keywords,
        rows_game_companies,
    )
)

Next we define the helpers that resolve the names of the genres, platforms, keywords, and companies.

In [None]:
def fetch_lookup_table(url, ids, id_field="id", name_field="name", chunk_size=500, timeout=60):
    """Fetch id->name mapping (genres, platforms, keywords, companies) from IGDB in chunks.

    Args:
        url: IGDB endpoint to query.
        ids: Iterable of ids to resolve. Empty or None yields an empty frame.
        id_field: Name of the id field to request and return.
        name_field: Name of the label field to request and return.
        chunk_size: Number of ids sent per request.
        timeout: Seconds to wait for each HTTP response.

    Returns:
        DataFrame with exactly the columns ``[id_field, name_field]`` and one
        row per distinct ``id_field`` value.

    Raises:
        requests.HTTPError: if IGDB answers with an error status.
    """
    columns = [id_field, name_field]
    if not ids:
        return pd.DataFrame(columns=columns)

    ids_list = sorted(ids)
    frames = []

    for start in range(0, len(ids_list), chunk_size):
        chunk = ids_list[start:start + chunk_size]
        ids_str = ",".join(str(i) for i in chunk)

        body = f"""
        fields {id_field}, {name_field};
        where id = ({ids_str});
        limit {len(chunk)};
        """

        print(f"Fetching lookup from {url}: {start}–{start + len(chunk)} of {len(ids_list)}")
        resp = requests.post(url, headers=headers, data=body, timeout=timeout)

        if resp.status_code != 200:
            # Show the start of the error body before raising; it usually
            # explains what was wrong with the query.
            print("Error response:", resp.status_code, resp.text[:300])
            resp.raise_for_status()

        items = resp.json()
        if items:
            # Records may lack a requested field (presumably when it is unset
            # on IGDB's side). reindex() fills such a column with NaN instead
            # of raising KeyError the way plain column selection would.
            frames.append(pd.DataFrame(items).reindex(columns=columns))

        # short delay to avoid hammering the API
        time.sleep(0.2)

    if not frames:
        return pd.DataFrame(columns=columns)

    return pd.concat(frames, ignore_index=True).drop_duplicates(subset=[id_field])

In [None]:

def fetch_involved_companies(ids, chunk_size=200, timeout=60):
    """
    Fetch involved_companies rows from IGDB in chunks to avoid 413 / payload too large.

    Args:
        ids: Iterable of involved_company ids. Empty or None yields an empty frame.
        chunk_size: Number of ids sent per request.
        timeout: Seconds to wait for each HTTP response.

    Returns a DataFrame with columns:
    id, company, game, developer, publisher, porting, supporting
    (always present, in this order; fields missing from the responses are NaN).

    Raises:
        requests.HTTPError: if IGDB answers with an error status.
    """
    columns = ["id", "company", "game", "developer", "publisher", "porting", "supporting"]
    if not ids:
        return pd.DataFrame(columns=columns)

    ids_list = sorted(ids)
    frames = []

    for start in range(0, len(ids_list), chunk_size):
        chunk = ids_list[start:start + chunk_size]
        ids_str = ",".join(str(i) for i in chunk)

        body = f"""
        fields {", ".join(columns)};
        where id = ({ids_str});
        limit {len(chunk)};
        """

        print(f"Fetching involved_companies: {start}–{start + len(chunk)} of {len(ids_list)}")
        resp = requests.post(IGDB_INVOLVED_URL, headers=headers, data=body, timeout=timeout)

        if resp.status_code != 200:
            # Show the start of the error body before raising.
            print("Error response from involved_companies:", resp.status_code, resp.text[:300])
            resp.raise_for_status()

        items = resp.json()
        if items:
            frames.append(pd.DataFrame(items))

        # be nice to IGDB
        time.sleep(0.2)

    if not frames:
        return pd.DataFrame(columns=columns)

    df = pd.concat(frames, ignore_index=True).drop_duplicates(subset=["id"])
    # Guarantee the documented schema even when no returned record carried one
    # of the requested fields; callers index columns such as "company" directly.
    return df.reindex(columns=columns)

In [None]:
# genre lookup
df_genres = fetch_lookup_table(IGDB_GENRES_URL, genre_ids_set, id_field="id", name_field="name")
df_genres = df_genres.rename(columns={"id": "genre_id"})

# platform lookup
df_platforms = fetch_lookup_table(IGDB_PLATFORMS_URL, platform_ids_set, id_field="id", name_field="name")
df_platforms = df_platforms.rename(columns={"id": "platform_id"})

# keyword lookup
df_keywords = fetch_lookup_table(IGDB_KEYWORDS_URL, keyword_ids_set, id_field="id", name_field="name")
df_keywords = df_keywords.rename(columns={"id": "keyword_id"})

# involved_companies rows (link between a game and a company, with role flags)
df_involved = fetch_involved_companies(involved_company_ids_set)

# Collect the company IDs referenced by the involved_companies rows.
# Rows without a company would turn the column into floats/NaN, which would
# end up as "123.0" / "nan" inside the IGDB query, so drop the gaps and cast
# the remaining ids back to plain ints.
if not df_involved.empty and "company" in df_involved.columns:
    company_ids_set = set(df_involved["company"].dropna().astype(int).tolist())
else:
    company_ids_set = set()

# company lookup
df_companies = fetch_lookup_table(IGDB_COMPANIES_URL, company_ids_set, id_field="id", name_field="name")
df_companies = df_companies.rename(columns={"id": "company_id"})

We save all the JSON files.

In [None]:
def save_df_as_json(df, path):
    """Write a DataFrame to ``path`` as a JSON array of row objects.

    Missing values (NaN / None / NaT) are written as JSON ``null``. Left as
    NaN they would be emitted as the bare token ``NaN``, which is not valid
    JSON and is rejected by strict parsers.

    Args:
        df: DataFrame to serialize; each row becomes one JSON object.
        path: ``pathlib.Path`` of the output file (overwritten if it exists).
    """
    # Casting to object first lets the columns hold None instead of
    # having it coerced back to NaN by a numeric dtype.
    cleaned = df.astype(object).where(df.notna(), None)
    records = cleaned.to_dict(orient="records")
    with path.open("w", encoding="utf-8") as f:
        json.dump(records, f, ensure_ascii=False, indent=2)

# Every table paired with its output file name, in the order they are written.
_JSON_EXPORTS = (
    # game entities plus genre/platform links and lookups
    (df_games, "games.json"),
    (df_game_genres, "game_genres.json"),
    (df_game_platforms, "game_platforms.json"),
    (df_genres, "genres.json"),
    (df_platforms, "platforms.json"),
    # keyword and company tables
    (df_game_keywords, "game_keywords.json"),
    (df_keywords, "keywords.json"),
    (df_game_companies, "game_involved_companies.json"),
    (df_involved, "involved_companies_raw.json"),
    (df_companies, "companies.json"),
)

for export_frame, export_name in _JSON_EXPORTS:
    save_df_as_json(export_frame, JSON_DIR / export_name)

print("Saved JSON files to", JSON_DIR)

### Fetching the Steam review statistics

In [None]:
# Steam store endpoint template; `{appid}` is filled in per game with str.format().
# Only the `query_summary` part of each response is read by the fetching loop.
STEAM_REVIEW_URL = "https://store.steampowered.com/appreviews/{appid}?json=1&language=all&purchase_type=all&num_per_page=0"

# Input: the formatted games table. Output: the per-game review counters.
GAMES_FILE = JSON_DIR / "games.json"
OUT_FILE = JSON_DIR / "steam_reviews.json"

Load the games from the file.

In [None]:
# Re-read the formatted games (including steam_appid) from disk so this section
# can run from the saved JSON without the in-memory frames built earlier.
games = json.loads(GAMES_FILE.read_text(encoding="utf-8"))

For each game that has a Steam ID, fetch its review summary (positive, negative, and total review counts).

In [None]:
rows_reviews = []

for g in games:
    appid = g.get("steam_appid")
    game_id = g["game_id"]

    if not appid:
        continue  # skip games without steam mapping

    url = STEAM_REVIEW_URL.format(appid=appid)
    print(f"Fetching reviews for game_id={game_id}, appid={appid} ...")

    try:
        # The timeout keeps one stalled request from blocking the whole loop.
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        data = resp.json()

        # `or {}` also covers a present-but-null query_summary.
        summary = data.get("query_summary") or {}
        rows_reviews.append({
            "game_id": game_id,
            "steam_appid": appid,
            "total_positive": summary.get("total_positive", 0),
            "total_negative": summary.get("total_negative", 0),
            "total_reviews": summary.get("total_reviews", 0),
        })

    except Exception as e:
        # Best-effort collection: report the failure and move on to the next game.
        print(f"Error for appid={appid}: {e}")

    finally:
        # small delay to be polite; applied after failures as well, so a run of
        # errors (e.g. rate limiting) does not turn into a tight retry loop
        time.sleep(0.2)

Save into JSON.

In [None]:
# Serialize the collected review counters and write them out in one go.
serialized_reviews = json.dumps(rows_reviews, ensure_ascii=False, indent=2)
OUT_FILE.write_text(serialized_reviews, encoding="utf-8")

print(f"Saved Steam review stats to {OUT_FILE}")

### Generation of the mock users

In [None]:
# Output files for the mock data.
USERS_OUT_FILE = JSON_DIR / "users.json"
USER_GAMES_OUT_FILE = JSON_DIR / "user_games.json"

# Size of the mock population and bounds on how many games each user gets.
N_USERS = 10000
MIN_GAMES_PER_USER = 1
MAX_GAMES_PER_USER = 50

# some random countries to add more stuff
COUNTRIES = ["US", "GB", "DE", "FR", "ES", "IT", "BR", "CA", "AU", "JP"]

# Fixed seed so "Restart & Run All" regenerates exactly the same mock users and
# interactions; change the value to obtain a different data set.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

Load the games file and keep only the games that have a Steam ID (falling back to all games if none do).

In [None]:
# Read the formatted games table back from disk.
games = json.loads(GAMES_FILE.read_text(encoding="utf-8"))

print(f"Loaded {len(games)} games from {GAMES_FILE}")

# Prefer the games that carry a Steam appid; when none do, keep the full list.
games_with_appid = list(filter(lambda entry: entry.get("steam_appid"), games))
if games_with_appid:
    games = games_with_appid
    print(f"Using {len(games)} games that have a Steam appid")

# Without any games there is nothing to sample interactions from.
if not games:
    raise RuntimeError("No games available to generate interactions!")

# Pool of ids the mock interactions are drawn from.
game_ids = [entry["game_id"] for entry in games]

Generate the users.

In [None]:
def generate_user(user_id: int) -> dict:
    """Build one mock user record.

    The username is derived from ``user_id``; the country is drawn from
    ``COUNTRIES`` and the age uniformly from 16 to 45 (an arbitrary range).

    Returns:
        dict with the keys ``user_id``, ``username``, ``country`` and ``age``.
    """
    profile = {
        "user_id": user_id,
        "username": f"user_{user_id}",
    }
    # Draw the country first, then the age.
    profile["country"] = random.choice(COUNTRIES)
    profile["age"] = random.randint(16, 45)
    return profile

In [None]:
# User ids run from 1 to N_USERS inclusive.
users = list(map(generate_user, range(1, N_USERS + 1)))
print(f"Generated {len(users)} mock users")

Generate user-game interactions.

In [None]:
def generate_user_games(user_id: int, game_pool=None, min_games=None, max_games=None) -> list[dict]:
    """
    Generate a random list of interactions for a single user.
    Each interaction has: user_id, game_id, rating, playtime_hours.

    Args:
        user_id: Id stored on every generated interaction.
        game_pool: Sequence of game ids to sample from; defaults to the
            module-level ``game_ids``.
        min_games: Lower bound for the number of games; defaults to
            ``MIN_GAMES_PER_USER``.
        max_games: Upper bound for the number of games; defaults to
            ``MAX_GAMES_PER_USER``.

    Returns:
        List of interaction dicts. It never holds more entries than the pool
        has games and is empty when the pool is empty.
    """
    pool = game_ids if game_pool is None else game_pool
    lower = MIN_GAMES_PER_USER if min_games is None else min_games
    upper = MAX_GAMES_PER_USER if max_games is None else max_games

    n_games = random.randint(lower, upper)
    # sample without replacement so each user doesn't repeat the same game
    sampled_game_ids = random.sample(pool, k=min(n_games, len(pool)))

    interactions = []
    for gid in sampled_game_ids:
        # rating: normal draw around 3.8, clamped to the 1.0-5.0 scale
        # (skewed towards higher ratings; gamers rarely rate 1/5)
        rating = round(min(max(random.gauss(3.8, 0.8), 1.0), 5.0), 1)

        # playtime: heavy-tailed – many low-play games, some very high.
        # Log-normal: exp of a normal draw, using math.exp rather than a
        # truncated approximation of e; clamped at 0.1 hours.
        playtime_hours = max(0.1, round(math.exp(random.gauss(2.0, 1.0)), 1))

        interactions.append({
            "user_id": user_id,
            "game_id": gid,
            "rating": rating,
            "playtime_hours": playtime_hours,
        })

    return interactions

In [None]:
# Flatten the per-user interaction lists into one list, keeping user order.
user_games = [
    interaction
    for user in users
    for interaction in generate_user_games(user["user_id"])
]

print(f"Generated {len(user_games)} user-game interactions")

Save to JSON.

In [None]:
# Write both mock tables with the same JSON settings: users first, then interactions.
for target_file, payload in ((USERS_OUT_FILE, users), (USER_GAMES_OUT_FILE, user_games)):
    with target_file.open("w", encoding="utf-8") as sink:
        json.dump(payload, sink, ensure_ascii=False, indent=2)

print(f"Saved users to {USERS_OUT_FILE}")
print(f"Saved user-game interactions to {USER_GAMES_OUT_FILE}")