# Steam Review Summarization

Generates neutral game descriptions from each game's Steam user reviews (columns `review_1` … `review_100`) using the `distilbart-cnn-12-6` summarization model.

**Run the cells in order from top to bottom (the cache-cleanup cell is optional).**

In [1]:
# Imports
# Shared imports for the notebook. TOKENIZERS_PARALLELISM is set before transformers is imported below.
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"  # avoid tokenizer threading/fork issues

import pandas as pd
import glob
import re
import string
import pickle
import time
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, as_completed
import warnings
# NOTE(review): this silences every warning in the session (including pandas ones); consider narrowing it.
warnings.filterwarnings('ignore')

# Install transformers if needed
# NOTE(review): the fallback installs unpinned package versions into the kernel's interpreter;
# pin versions if reproducibility matters.
try:
    from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
    from tqdm import tqdm
    import torch
except ImportError:
    print("Installing required packages...")
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "transformers", "sentencepiece", "torch", "tqdm"])
    from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
    from tqdm import tqdm
    import torch

print("✓ All imports loaded (TOKENIZERS_PARALLELISM=false)")

✓ All imports loaded (TOKENIZERS_PARALLELISM=false)


In [4]:
# Configuration – adaptive root detection (never inside /python)
import os
from pathlib import Path

# --- Model path setup (unchanged) ---
default_local_cache = Path.home() / "hf_cache" / "distilbart"
hf_default_cache = Path.home() / ".cache" / "huggingface" / "hub" / "models--sshleifer--distilbart-cnn-12-6"

if default_local_cache.exists():
    MODEL_PATH = str(default_local_cache)
elif hf_default_cache.exists():
    MODEL_PATH = str(hf_default_cache)
else:
    MODEL_PATH = "sshleifer/distilbart-cnn-12-6"  # fallback (downloads if missing)

# --- Project root detection ---
# Priority:
# 1) STEAM_GAMES_ROOT env var, if provided
# 2) Walk up from CWD to the first folder that has a "data" subfolder and is NOT named "python"
# 3) If still not found, prefer parent of a trailing "/python" folder; else use CWD
env_root = os.environ.get("STEAM_GAMES_ROOT")
if env_root:
    project_root = Path(env_root).expanduser().resolve()
else:
    here = Path.cwd().resolve()
    project_root = None
    for cand in [here, *here.parents]:
        if cand.name == "python":
            continue
        if (cand / "data").exists():
            project_root = cand
            break
    if project_root is None:
        project_root = here.parent if here.name == "python" else here

# Final safety: never allow .../python/data
if project_root.name == "python":
    project_root = project_root.parent

# --- Data directories ---
DATA_DIR = (project_root / "data").resolve()
PROCESSED_DIR = (DATA_DIR / "processed").resolve()

# --- Processing range ---
START_APP_ID = 7000   # None = start from beginning
END_APP_ID   = 12000  # None = process all

# --- Processing settings ---
NUM_WORKERS = (os.cpu_count() or 4) - 1
CHECKPOINT_EVERY = 25

# --- Display configuration ---
print("Configuration:")
print(f"  Model: {MODEL_PATH}")
print(f"  App ID range: {START_APP_ID or 'start'} to {END_APP_ID or 'end'}")
print(f"  Workers: {NUM_WORKERS}, Checkpoint every: {CHECKPOINT_EVERY}")
print(f"  Project root: {project_root}")
print(f"  Data dir:     {DATA_DIR}")
print(f"  Processed dir:{PROCESSED_DIR}")

Configuration:
  Model: /Users/radimsoukal/.cache/huggingface/hub/models--sshleifer--distilbart-cnn-12-6
  App ID range: 7000 to 12000
  Workers: 9, Checkpoint every: 25
  Project root: /Users/radimsoukal/Library/Mobile Documents/com~apple~CloudDocs/VŠE/05. SEMESTR/Text Analytics/R/Steam_Games 2
  Data dir:     /Users/radimsoukal/Library/Mobile Documents/com~apple~CloudDocs/VŠE/05. SEMESTR/Text Analytics/R/Steam_Games 2/data
  Processed dir:/Users/radimsoukal/Library/Mobile Documents/com~apple~CloudDocs/VŠE/05. SEMESTR/Text Analytics/R/Steam_Games 2/data/processed


# Optional: delete the combined-reviews cache so the next cell rebuilds it from the raw CSVs.
# Run this if you get KeyError about 'combined_reviews'.
cache_file = os.path.join(PROCESSED_DIR, 'combined_reviews_cache.pkl')
cache_present = os.path.exists(cache_file)
if cache_present:
    os.remove(cache_file)
print("✓ Cache deleted - will rebuild on next cell" if cache_present else "✓ No cache to delete")

In [5]:
# Load and combine CSV files (robust cache handling)
cache_file = os.path.join(PROCESSED_DIR, 'combined_reviews_cache.pkl')
os.makedirs(PROCESSED_DIR, exist_ok=True)

RAW_DIR = os.path.join(DATA_DIR, 'raw')
MAX_REVIEWS_PER_GAME = 100  # columns review_1 … review_100 are combined per game

# Compiled once: clean_text runs for every review of every game.
_HTML_TAG_RE = re.compile(r'<[^>]+>')
_URL_RE = re.compile(r'http[s]?://\S+|www\.\S+')
_WHITESPACE_RE = re.compile(r'\s+')
# Only these characters survive cleaning. This allow-list also drops emoji and every other
# non-ASCII symbol, so no separate emoji regex is needed.
_ALLOWED_CHARS = frozenset(string.ascii_letters + string.digits + " .,!?'-")


def _load_raw_reviews():
    """Read every ``app_reviews_*.csv`` in RAW_DIR (sorted by filename) into one DataFrame.

    Returns:
        (DataFrame, number_of_files_read)
    Raises:
        FileNotFoundError: if no matching CSV exists.
    """
    csv_files = sorted(glob.glob(os.path.join(RAW_DIR, 'app_reviews_*.csv')))
    if not csv_files:
        raise FileNotFoundError(f"No input files found in {RAW_DIR} (expected app_reviews_*.csv)")
    frames = [pd.read_csv(f) for f in csv_files]
    return pd.concat(frames, ignore_index=True), len(csv_files)


def clean_text(text):
    """Remove HTML tags, URLs and non-allowed characters; return None if nothing is left."""
    if not isinstance(text, str) or not text.strip():
        return None
    text = _HTML_TAG_RE.sub('', text)
    text = _URL_RE.sub('', text)
    # Turn line breaks/tabs into spaces first so the allow-list filter doesn't glue words together.
    text = text.replace('\r', ' ').replace('\n', ' ').replace('\t', ' ')
    text = ''.join(ch for ch in text if ch in _ALLOWED_CHARS)
    text = _WHITESPACE_RE.sub(' ', text).strip()
    return text or None


def combine_reviews(row):
    """Clean review_1 … review_N of one row, drop duplicates (keeping first order) and join with ' [SEP] '.

    Returns '' when the row has no usable review. '[' and ']' are not allowed characters,
    so a cleaned review can never contain the separator itself.
    """
    reviews = []
    for i in range(1, MAX_REVIEWS_PER_GAME + 1):
        review = row.get(f'review_{i}')
        if isinstance(review, str):
            cleaned = clean_text(review)
            if cleaned:
                reviews.append(cleaned)
    return ' [SEP] '.join(dict.fromkeys(reviews))


combined_df = None  # ensure we rebuild a proper DataFrame in this cell

# 1) Try to load a valid cache (a local file written by this cell; never point this at untrusted pickles)
if os.path.exists(cache_file):
    try:
        obj = pd.read_pickle(cache_file)
        if isinstance(obj, pd.DataFrame) and 'combined_reviews' in obj.columns:
            combined_df = obj
            print(f"✓ Loaded cache: {len(combined_df)} games")
        elif isinstance(obj, pd.Series):
            print("Cache contains a Series -> rebuilding DataFrame from raw CSVs and reusing combined text…")
            # Load raw CSVs and attach the series as the combined_reviews column
            raw_df = _load_raw_reviews()[0]
            series_vals = obj.reset_index(drop=True)
            if len(raw_df) == len(series_vals):
                raw_df['combined_reviews'] = series_vals
                combined_df = raw_df
                combined_df.to_pickle(cache_file)  # overwrite cache with correct format
                print(f"✓ Rewrote cache as DataFrame: {len(combined_df)} games")
            else:
                print("Series length mismatch with raw CSVs -> discarding cache")
                os.remove(cache_file)
        else:
            print("Cache invalid type -> discarding cache")
            os.remove(cache_file)
    except Exception as e:
        print(f"Cache load error: {e} -> discarding cache")
        try:
            os.remove(cache_file)
        except OSError:
            pass  # best effort: the rebuild below does not depend on the old file being gone

# 2) Build from raw if needed
if combined_df is None:
    print("Loading raw CSV files and building combined reviews…")
    combined_df, n_files = _load_raw_reviews()
    print(f"✓ Loaded {n_files} files, {len(combined_df)} games")
    combined_df['combined_reviews'] = combined_df.apply(combine_reviews, axis=1)
    combined_df.to_pickle(cache_file)
    print("✓ Cleaned and cached reviews")

# 3) Final sanity check and stats
if not isinstance(combined_df, pd.DataFrame) or 'combined_reviews' not in combined_df.columns:
    raise RuntimeError("combined_df is not a DataFrame with a 'combined_reviews' column. Please re-run this cell.")

print(f"Avg length: {combined_df['combined_reviews'].str.len().mean():.0f} chars")

✓ Loaded cache: 2622 games
Avg length: 80789 chars


In [7]:
# BLOCK 4: Load model from local cache or Hugging Face if missing
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
import os

# Hub repo downloaded from when MODEL_PATH is a local folder that holds no loadable model.
DEFAULT_HUB_MODEL_ID = "sshleifer/distilbart-cnn-12-6"

# A loadable model folder needs config.json plus at least one of these weight files.
_WEIGHT_FILES = (
    "pytorch_model.bin",
    "pytorch_model.bin.index.json",
    "model.safetensors",
    "model.safetensors.index.json",
)


def _resolve_hub_snapshot_dir(root_dir: str) -> str | None:
    """If MODEL_PATH is a Hugging Face hub cache root (models--org--model),
    resolve it to an actual snapshot dir containing the model files.

    Prefers the commit named in refs/main, else the most recently modified
    snapshot. Returns None if neither exists.
    """
    snapshots_dir = os.path.join(root_dir, "snapshots")
    refs_main = os.path.join(root_dir, "refs", "main")

    # Prefer the commit hash in refs/main if present
    if os.path.isfile(refs_main):
        try:
            with open(refs_main, "r", encoding="utf-8") as f:
                commit = f.read().strip()
            cand = os.path.join(snapshots_dir, commit)
            if os.path.isdir(cand):
                return cand
        except OSError:
            pass  # unreadable ref: fall through to the newest snapshot

    # Else pick the newest snapshot
    try:
        candidates = [d for d in glob.glob(os.path.join(snapshots_dir, "*")) if os.path.isdir(d)]
        if candidates:
            return max(candidates, key=os.path.getmtime)
    except OSError:
        pass
    return None


def _looks_like_model_dir(path: str) -> bool:
    """True if ``path`` is a directory containing config.json and a weights file.

    Requiring both keeps a partially downloaded snapshot from being treated as loadable
    (it would then fail under local_files_only instead of falling back to the Hub).
    """
    try:
        files = set(os.listdir(path))
    except OSError:
        return False
    return "config.json" in files and any(w in files for w in _WEIGHT_FILES)


def _hub_repo_id_for(path: str) -> str:
    """Return the Hub repo id to download from when no usable local model folder was found.

    A hub cache root ``.../models--org--name`` maps back to ``org/name``. Such a root has no
    config.json, so passing it to from_pretrained directly would fail. Any other existing
    local path falls back to DEFAULT_HUB_MODEL_ID. Anything else is assumed to already be
    a repo id.
    """
    base = os.path.basename(os.path.normpath(path))
    if base.startswith("models--"):
        return base[len("models--"):].replace("--", "/")
    if os.path.exists(path):
        return DEFAULT_HUB_MODEL_ID
    return path


print(f"Loading model from: {MODEL_PATH}")
is_local_path = os.path.exists(MODEL_PATH)

try:
    load_dir = None
    if is_local_path:
        # If it's a hub cache root, resolve to a snapshot subdir
        if os.path.basename(MODEL_PATH).startswith("models--"):
            candidate = _resolve_hub_snapshot_dir(MODEL_PATH)
            if candidate and _looks_like_model_dir(candidate):
                load_dir = candidate
        # If it's already a real model folder with files, use it directly
        if load_dir is None and _looks_like_model_dir(MODEL_PATH):
            load_dir = MODEL_PATH

    if load_dir:
        # Local-only load
        tokenizer = AutoTokenizer.from_pretrained(load_dir, local_files_only=True)
        model = AutoModelForSeq2SeqLM.from_pretrained(load_dir, local_files_only=True)
        print(f"✓ Loaded model from local cache: {load_dir}")
    else:
        # Use a Hub model ID (downloads if missing), never a local folder without model files
        hub_id = _hub_repo_id_for(MODEL_PATH)
        tokenizer = AutoTokenizer.from_pretrained(hub_id)
        model = AutoModelForSeq2SeqLM.from_pretrained(hub_id)
        print("✓ Downloaded or loaded from Hugging Face Hub")

    # Define summarizer (CPU by default; you can move model to MPS later if desired).
    # This cell runs at notebook top level, so the name is already global.
    summarizer = pipeline("summarization", model=model, tokenizer=tokenizer, device=-1)

    print("✓ Model loaded successfully")
    print(f"✓ Summarizer defined: {type(summarizer)}")

except Exception as e:
    raise RuntimeError(f"❌ Failed to load model from {MODEL_PATH}: {e}") from e

Loading model from: /Users/radimsoukal/.cache/huggingface/hub/models--sshleifer--distilbart-cnn-12-6


Device set to use cpu


✓ Loaded model from local cache: /Users/radimsoukal/.cache/huggingface/hub/models--sshleifer--distilbart-cnn-12-6/snapshots/a4f8f3ea906ed274767e9906dbaede7531d660ff
✓ Model loaded successfully
✓ Summarizer defined: <class 'transformers.pipelines.text2text_generation.SummarizationPipeline'>


In [8]:
# BLOCK 5: Summarization functions with chunking and guidance

# Instruction text placed before every piece of review text sent to the summarizer.
# NOTE(review): distilbart-cnn is a summarization model rather than an instruction-following
# one; confirm the instruction text does not leak into generated summaries.
GUIDANCE_PROMPT = (
    "Summarize the following user reviews into a concise, neutral "
    "third-person description of the game. "
    "Focus on: genre, gameplay mechanics, core features or modes, "
    "campaign or multiplayer aspects, and overall atmosphere or difficulty. "
    "Avoid opinions, memes, and repetition."
    "\n\n"
    "Reviews:"
)


def split_into_chunks(text, max_chunk_size=3500, overlap=200):
    if not text or len(text) <= max_chunk_size:
        return [text] if text else []
    chunks = []
    start = 0
    while start < len(text):
        end = start + max_chunk_size
        if end < len(text):
            for sep in ['. ', '! ', '? ', ' [SEP] ']:
                last_sep = text[start:end].rfind(sep)
                if last_sep != -1:
                    end = start + last_sep + len(sep)
                    break
        chunks.append(text[start:end].strip())
        start = end - overlap if end < len(text) else end
    return chunks


def summarize_reviews(text, summarizer):
    if not text or len(text.strip()) == 0:
        return ""
    try:
        # Short text - direct summarization
        if len(text) <= 1000:
            prompt = f"{GUIDANCE_PROMPT}\n\n{text}"
            result = summarizer(
                prompt,
                max_length=130,
                min_length=30,
                num_beams=2,
                no_repeat_ngram_size=3,
                do_sample=False,
                truncation=True,
            )
            return result[0]['summary_text']

        # Long text - chunked approach
        chunks = split_into_chunks(text)
        chunk_summaries = []
        for chunk in chunks:
            if len(chunk) < 50:
                continue
            prompt = f"{GUIDANCE_PROMPT}\n\n{chunk}"
            result = summarizer(
                prompt,
                max_length=80,
                min_length=30,
                num_beams=2,
                no_repeat_ngram_size=3,
                do_sample=False,
                truncation=True,
            )
            chunk_summaries.append(result[0]['summary_text'])

        if not chunk_summaries:
            return ""

        # Final pass
        combined = ' '.join(chunk_summaries)
        if len(combined) > 200:
            prompt = f"{GUIDANCE_PROMPT}\n\n{combined}"
            result = summarizer(
                prompt,
                max_length=130,
                min_length=40,
                num_beams=2,
                no_repeat_ngram_size=3,
                do_sample=False,
                truncation=True,
            )
            return result[0]['summary_text']
        return combined
    except Exception as e:
        print(f"Error: {e}")
        return ""

print("✓ Summarization functions ready")

✓ Summarization functions ready


In [None]:
# BLOCK 6: Parallel processing with checkpoints (robust + ordered checkpoints, resumable)
from threading import Lock

# Outputs live under the detected project data dir, not relative to the CWD (which may be .../python).
SUMMARY_DIR = os.path.join(DATA_DIR, "reviews_summary")
os.makedirs(SUMMARY_DIR, exist_ok=True)

# Reuse non-empty summaries from an existing checkpoint whose app_ids match the batch exactly,
# so an interrupted run resumes instead of starting over. Set False to recompute everything.
RESUME_FROM_CHECKPOINTS = True

# Ensure tokenizer doesn't parallelize internally when we use threads
os.environ["TOKENIZERS_PARALLELISM"] = "false"


def _offline_model_source(path):
    """Map a hub cache root (``.../models--org--name``) to ``org/name``; return other values unchanged.

    A hub cache root has no config.json, so from_pretrained cannot load it as a folder.
    The repo id is then resolved from the local Hub cache when local_files_only=True.
    """
    base = os.path.basename(os.path.normpath(path))
    if base.startswith("models--"):
        return base[len("models--"):].replace("--", "/")
    return path


def _checkpoint_summaries(path, expected_ids):
    """Return the summaries stored in checkpoint ``path`` if resuming is enabled, the file
    exists, and it lists exactly ``expected_ids`` in order. Otherwise return None."""
    if not RESUME_FROM_CHECKPOINTS or not os.path.exists(path):
        return None
    try:
        # keep_default_na=False keeps empty summaries as '' instead of NaN
        prev = pd.read_csv(path, keep_default_na=False)
        if [int(a) for a in prev['app_id']] != expected_ids:
            return None
        return [str(s) for s in prev['reviews_summary']]
    except Exception as e:
        print(f"Ignoring unreadable checkpoint {os.path.basename(path)}: {e}")
        return None


# Fallback: ensure summarizer exists (in case Block 4 wasn't run in this session)
if 'summarizer' not in globals() or summarizer is None:
    print("Summarizer not found in session. Loading from local cache (fallback)...")
    model_source = _offline_model_source(MODEL_PATH)
    tokenizer = AutoTokenizer.from_pretrained(model_source, local_files_only=True)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_source, local_files_only=True)
    summarizer = pipeline("summarization", model=model, tokenizer=tokenizer, device=-1)
    print("✓ Summarizer loaded (fallback)")

# Filter by app_id range and sort
df_sorted = combined_df.sort_values('app_id').reset_index(drop=True)
if START_APP_ID is not None:
    df_sorted = df_sorted[df_sorted['app_id'] >= START_APP_ID]
if END_APP_ID is not None:
    df_sorted = df_sorted[df_sorted['app_id'] <= END_APP_ID]
df_sorted = df_sorted.reset_index(drop=True)

# ThreadPoolExecutor rejects max_workers < 1
n_workers = max(1, NUM_WORKERS)
n_rows = len(df_sorted)
print(f"\nProcessing {n_rows} games (workers: {n_workers})")
print("-" * 60)

if n_rows == 0:
    print("No rows to process for the selected range.")
else:
    if 'reviews_summary' not in df_sorted.columns:
        df_sorted['reviews_summary'] = ''

    # Lock summarizer calls to avoid thread-safety issues. Since summarize_reviews is nearly
    # all of process_game's work and runs under this lock, model calls happen one at a time.
    summarize_lock = Lock()

    def process_game(idx):
        """Summarize row ``idx`` (position in df_sorted).

        Returns (idx, app_id, summary, error). error is None on success, 'empty' for rows
        without review text, otherwise a short message.
        """
        row_label = df_sorted.index[idx]
        app_id = int(df_sorted.loc[row_label, 'app_id'])
        try:
            text = df_sorted.loc[row_label, 'combined_reviews']
            if not isinstance(text, str) or not text.strip():
                return (idx, app_id, '', 'empty')
            # Guard model call with a lock
            with summarize_lock:
                summary = summarize_reviews(text, summarizer)
            if not summary:
                # summarize_reviews prints its own errors and returns "" instead of raising
                return (idx, app_id, '', 'no summary produced')
            return (idx, app_id, summary, None)
        except Exception as e:
            return (idx, app_id, '', str(e)[:200])

    successful = failed = skipped = resumed = 0

    # Process in ordered batches so checkpoint filenames reflect contiguous app_id ranges
    batch_size = CHECKPOINT_EVERY if CHECKPOINT_EVERY and CHECKPOINT_EVERY > 0 else 100

    for start in range(0, n_rows, batch_size):
        end = min(start + batch_size, n_rows)
        batch_start_id = int(df_sorted.iloc[start]['app_id'])
        batch_end_id = int(df_sorted.iloc[end - 1]['app_id'])
        checkpoint_path = os.path.join(
            SUMMARY_DIR,
            f"checkpoint_{batch_start_id:06d}_to_{batch_end_id:06d}.csv",
        )

        batch_indices = list(range(start, end))

        # Resume: keep summaries already written for this exact batch, recompute only missing ones
        expected_ids = [int(a) for a in df_sorted['app_id'].iloc[start:end]]
        previous = _checkpoint_summaries(checkpoint_path, expected_ids)
        if previous is not None:
            pending = []
            for idx, prev_summary in zip(batch_indices, previous):
                if prev_summary.strip():
                    df_sorted.loc[df_sorted.index[idx], 'reviews_summary'] = prev_summary
                    resumed += 1
                else:
                    pending.append(idx)
            batch_indices = pending

        if batch_indices:
            with ThreadPoolExecutor(max_workers=n_workers) as executor:
                futures = [executor.submit(process_game, idx) for idx in batch_indices]
                with tqdm(total=len(batch_indices), desc=f"Summarizing {batch_start_id}-{batch_end_id}", unit="game") as pbar:
                    for future in as_completed(futures):
                        idx, app_id, summary, error = future.result()

                        if error == 'empty':
                            skipped += 1
                        elif error:
                            failed += 1
                            pbar.write(f"Error @ app_id {app_id}: {error}")
                        else:
                            df_sorted.loc[df_sorted.index[idx], 'reviews_summary'] = summary
                            successful += 1

                        pbar.update(1)
        else:
            print(f"↩ {os.path.basename(checkpoint_path)} already complete - reused")

        # Write ordered checkpoint for this contiguous batch
        df_sorted.iloc[start:end][['app_id', 'reviews_summary']].to_csv(checkpoint_path, index=False)
        print(f"💾 {os.path.basename(checkpoint_path)} | batch done: {successful} ok, {failed} failed, {skipped} skipped (cumulative)")

    # Save final with full app_id range in filename
    overall_start_id = int(df_sorted.iloc[0]['app_id'])
    overall_end_id = int(df_sorted.iloc[-1]['app_id'])
    final_filename = f"review_summaries_COMPLETE_{overall_start_id:06d}_to_{overall_end_id:06d}.csv"
    final_path = os.path.join(SUMMARY_DIR, final_filename)

    df_sorted[['app_id', 'combined_reviews', 'reviews_summary']].to_csv(final_path, index=False)
    print(f"\n✓ Complete: {successful} successful, {failed} failed, {skipped} skipped")
    if resumed:
        print(f"✓ Reused {resumed} summaries from existing checkpoints")
    print(f"✓ Saved: {final_path}")


Processing 66 games (workers: 9)
------------------------------------------------------------


Summarizing 7000-8600: 100%|██████████| 25/25 [26:07<00:00, 62.71s/game]


💾 checkpoint_007000_to_008600.csv | batch done: 25 ok, 0 failed, 0 skipped (cumulative)


Summarizing 8800-10180:  60%|██████    | 15/25 [18:10<11:20, 68.01s/game]