In [None]:
import requests
import xml.etree.ElementTree as ET
import pandas as pd
import time
import numpy as np
from typing import Dict, List, Any
import os
from datetime import datetime

class CompleteBGGScraper:
    """
    Client for the BoardGameGeek XML API2 endpoint configured in ``base_url``.

    Downloads per-game details, validates them against configurable
    null-checking rules and assembles the results into a cleaned
    pandas DataFrame (optionally persisting per-batch CSV files).
    """

    # Seconds to wait for a single HTTP response before the request is aborted.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Create the shared HTTP session and set the retry defaults."""
        self.base_url = "https://boardgamegeek.com/xmlapi2"
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'BGG Complete Data Scraper 1.0'
        })
        self.max_retries = 3
        self.retry_delay = 5

    def get_game_details(self, game_id: int) -> Dict[str, Any]:
        """
        Fetch detailed information for a specific game ID with retry logic

        Network and XML-parse failures are retried up to ``max_retries``
        times, sleeping ``retry_delay`` seconds between attempts.

        Args:
            game_id: BGG ID of the game to fetch

        Returns:
            Dictionary of game fields, or None when the response contains no
            ``item`` element or every attempt failed.
        """
        url = f"{self.base_url}/thing"
        params = {
            'id': game_id,
            'stats': 1,
            'type': 'boardgame'
        }

        for attempt in range(self.max_retries):
            # Only the network call and the XML parse are retried; turning
            # the parsed element into a dict happens outside the try block.
            try:
                response = self.session.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
                root = ET.fromstring(response.content)
            except (requests.RequestException, ET.ParseError) as e:
                print(f"Attempt {attempt + 1} failed for game {game_id}: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
                    continue
                print(f"Failed to fetch game {game_id} after {self.max_retries} attempts")
                return None

            item = root.find('item')
            if item is None:
                return None
            return self._parse_game_item(item)

        # Only reached when max_retries is 0.
        return None

    def _parse_game_item(self, item) -> Dict[str, Any]:
        """
        Convert one ``<item>`` element into a flat dictionary of game fields

        Missing or malformed values become None instead of raising, so a
        single odd record cannot abort a long scraping run. The statistics
        keys are only added when the item has a ``statistics/ratings`` node.
        """
        raw_id = item.get('id')

        # Extract comprehensive game data
        game_data = {
            'id': int(raw_id) if raw_id and raw_id.isdigit() else None,
            'name': self._get_primary_name(item),
            'year_published': self._get_year_published(item),
            'min_players': self._get_int_value(item, 'minplayers'),
            'max_players': self._get_int_value(item, 'maxplayers'),
            'playing_time': self._get_int_value(item, 'playingtime'),
            'min_play_time': self._get_int_value(item, 'minplaytime'),
            'max_play_time': self._get_int_value(item, 'maxplaytime'),
            'min_age': self._get_int_value(item, 'minage'),
            'description': self._clean_description(self._get_text_value(item, 'description')),
            'categories': self._get_linked_items(item, 'boardgamecategory'),
            'mechanisms': self._get_linked_items(item, 'boardgamemechanic'),
            'designers': self._get_linked_items(item, 'boardgamedesigner'),
            'artists': self._get_linked_items(item, 'boardgameartist'),
            'publishers': self._get_linked_items(item, 'boardgamepublisher'),
        }

        # Extract comprehensive statistics
        stats = item.find('statistics/ratings')
        if stats is not None:
            game_data.update({
                'users_rated': self._get_int_value(stats, 'usersrated'),
                'average_rating': self._get_float_value(stats, 'average'),
                'bayes_average': self._get_float_value(stats, 'bayesaverage'),
                'num_comments': self._get_int_value(stats, 'numcomments'),
                'num_weights': self._get_int_value(stats, 'numweights'),
                'average_weight': self._get_float_value(stats, 'averageweight'),
                'bgg_rank': self._get_bgg_rank(stats),
                'category_ranks': self._get_category_ranks(stats),
            })

        return game_data

    def get_game_ids_by_year_range(self, start_year=1950, end_year=2024):
        """
        Get game IDs by searching year ranges (more comprehensive approach)

        Runs one search per year (inclusive on both ends) with a 2 second
        pause between searches and returns the de-duplicated IDs in
        ascending order.
        """
        all_game_ids = set()

        print(f"Searching for games from {start_year} to {end_year}...")

        # Search by year ranges to get comprehensive coverage
        for year in range(start_year, end_year + 1):
            print(f"Searching games from {year}...")
            year_ids = self._search_games_by_year(year)
            all_game_ids.update(year_ids)
            time.sleep(2)  # Rate limiting

        # Sorted so repeated runs process the IDs in the same order.
        return sorted(all_game_ids)

    def _search_games_by_year(self, year):
        """
        Search games by specific year using BGG search

        Returns a list of integer game IDs, or an empty list when the request
        or the response parsing fails.

        NOTE(review): the request sends an empty ``query`` together with a
        ``yearpublished`` parameter; whether the search endpoint honors that
        combination cannot be confirmed from this file - verify against the
        BGG XML API2 documentation.
        """
        url = f"{self.base_url}/search"
        params = {
            'query': '',
            'type': 'boardgame',
            'yearpublished': year
        }

        try:
            response = self.session.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()

            root = ET.fromstring(response.content)
            game_ids = [int(item.get('id')) for item in root.findall('item')]

            print(f"Found {len(game_ids)} games for year {year}")
            return game_ids

        # ValueError/TypeError cover a missing or non-numeric id attribute.
        except (requests.RequestException, ET.ParseError, ValueError, TypeError) as e:
            print(f"Error searching games for year {year}: {e}")
            return []

    def get_all_ranked_games(self):
        """
        Get all games that have rankings (comprehensive but more focused dataset)

        Currently this only returns the sequential ID range 1-50000; no
        ranking information is consulted.
        """
        print("This method will take a very long time and may not be complete.")
        print("BGG doesn't provide a direct way to get ALL games.")
        print("Consider using ID ranges or specific collections instead.")

        # You can try sequential ID approach (not recommended for all games)
        return self.get_games_by_id_range(1, 50000)  # Adjust range as needed

    def get_games_by_id_range(self, start_id=1, end_id=1000):
        """
        Get games by ID range - useful for comprehensive scraping

        Returns every integer from start_id to end_id (inclusive); the IDs
        are not checked against the API.
        """
        print(f"Getting games from ID {start_id} to {end_id}")
        return list(range(start_id, end_id + 1))

    def get_games_from_collections(self):
        """
        Get games from various BGG collections and rankings

        Returns the de-duplicated union of all collection IDs in ascending
        order.
        """
        collections = {
            'hot': self._get_hot_games(),
            'top_rated': self._get_top_rated_games(),
            # Add more collection methods as needed
        }

        all_ids = set()
        for collection_name, ids in collections.items():
            print(f"Found {len(ids)} games in {collection_name}")
            all_ids.update(ids)

        # Sorted so repeated runs process the IDs in the same order.
        return sorted(all_ids)

    def _get_hot_games(self):
        """
        Get current hot games

        Returns a list of integer game IDs, or an empty list when the request
        or the response parsing fails.
        """
        url = f"{self.base_url}/hot"
        params = {'type': 'boardgame'}

        try:
            response = self.session.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            root = ET.fromstring(response.content)
            return [int(item.get('id')) for item in root.findall('item')]
        # Narrow handler: a bare except would also swallow KeyboardInterrupt.
        except (requests.RequestException, ET.ParseError, ValueError, TypeError) as e:
            print(f"Error fetching hot games: {e}")
            return []

    def _get_top_rated_games(self):
        """Get top rated games - you'd need to implement BGG browse functionality"""
        # This is a placeholder - BGG's browse functionality is complex
        # You might need to scrape the web interface or use other methods
        return []

    def scrape_games_batch(self, game_ids: List[int], batch_size=50, delay=1.0,
                          save_progress=True, output_dir='bgg_data',
                          null_check_mode="important") -> pd.DataFrame:
        """
        Scrape games in batches with progress saving and comprehensive null checking

        Args:
            game_ids: List of BGG game IDs to scrape
            batch_size: Number of games to process per batch (must be >= 1)
            delay: Delay between API requests in seconds
            save_progress: Whether to save progress after each batch
            output_dir: Directory to save progress files
            null_check_mode: Level of null checking
                - "critical": Only check essential fields (name, year, id)
                - "important": Check critical + important fields (players, rating, etc.)
                - "all": Check all fields - strictest filtering

        Returns:
            Cleaned DataFrame of all valid games, or an empty DataFrame when
            no game passed validation.

        Raises:
            ValueError: If batch_size is smaller than 1.
        """
        # A negative step would make the batch loop silently process nothing.
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")

        if save_progress:
            os.makedirs(output_dir, exist_ok=True)

        all_games_data = []
        total_games = len(game_ids)

        print(f"Starting to scrape {total_games} games in batches of {batch_size}...")

        for batch_start in range(0, total_games, batch_size):
            batch_end = min(batch_start + batch_size, total_games)
            batch_ids = game_ids[batch_start:batch_end]
            batch_number = batch_start // batch_size + 1

            print(f"\nProcessing batch {batch_number}: games {batch_start+1}-{batch_end}")

            batch_data = []
            for i, game_id in enumerate(batch_ids):
                print(f"  Scraping game {batch_start + i + 1}/{total_games}: ID {game_id}")

                game_data = self.get_game_details(game_id)

                if game_data and self._is_valid_game(game_data, null_check_mode):
                    batch_data.append(game_data)
                    print(f"    ✓ Valid game added: {game_data.get('name', 'Unknown')}")
                else:
                    print(f"    ✗ Skipping game {game_id} - failed validation")

                time.sleep(delay)

            all_games_data.extend(batch_data)

            # Save progress after each batch
            if save_progress and batch_data:
                batch_df = pd.DataFrame(batch_data)
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                batch_file = f"{output_dir}/batch_{batch_number}_{timestamp}.csv"
                batch_df.to_csv(batch_file, index=False, encoding='utf-8')
                print(f"  Saved batch to {batch_file}")

        print(f"\nCompleted scraping. Total valid games: {len(all_games_data)}")

        if not all_games_data:
            return pd.DataFrame()

        # Create final DataFrame
        df = pd.DataFrame(all_games_data)
        df = self._clean_dataframe(df)

        # Save final dataset
        if save_progress:
            final_file = f"{output_dir}/complete_bgg_dataset_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            df.to_csv(final_file, index=False, encoding='utf-8')
            print(f"Final dataset saved to {final_file}")

        return df

    def _is_valid_game(self, game_data, strict_mode=True):
        """
        Check if game data meets quality criteria with configurable null checking

        Args:
            game_data: Dictionary containing game data
            strict_mode: Which fields must be non-null.
                - "critical" (or any unrecognized value, including False):
                  only name, year_published and id
                - "important": critical fields plus players, playing time,
                  minimum age, average rating and number of ratings
                - "all" or True: every scraped field

        Returns:
            True when all checked fields are present and the year, player
            counts and rating are plausible; False otherwise.
        """
        if not game_data:
            return False

        # Critical fields that must never be null
        critical_fields = [
            'name',
            'year_published',
            'id'
        ]

        # Important fields that should preferably not be null
        important_fields = [
            'min_players',
            'max_players',
            'playing_time',
            'min_age',
            'average_rating',
            'users_rated'
        ]

        # All possible fields for strictest checking
        all_fields = critical_fields + important_fields + [
            'min_play_time',
            'max_play_time',
            'description',
            'categories',
            'mechanisms',
            'designers',
            'artists',
            'publishers',
            'bayes_average',
            'num_comments',
            'num_weights',
            'average_weight',
            'bgg_rank',
            'category_ranks'
        ]

        if strict_mode == "important":
            # Check critical + important fields
            fields_to_check = critical_fields + important_fields
        elif strict_mode == "all" or strict_mode is True:
            # Check all fields
            fields_to_check = all_fields
        else:
            # "critical" and every unrecognized mode: critical fields only
            fields_to_check = critical_fields

        game_id = game_data.get('id', 'Unknown')

        # Check for null/empty values
        for field in fields_to_check:
            value = game_data.get(field)
            if value is None or value == '' or value == 'None':
                print(f"    Rejecting game {game_id} - null/empty field: {field}")
                return False

        # Additional validation checks
        year = game_data.get('year_published')
        # Compare against None rather than truthiness so that a year of 0 is
        # range-checked (and rejected) instead of slipping through.
        if year is not None and (year < 1800 or year > 2030):
            print(f"    Rejecting game {game_id} - invalid year: {year}")
            return False

        min_players = game_data.get('min_players')
        max_players = game_data.get('max_players')
        # Truthiness is intentional here: a count of 0 is skipped rather than
        # compared (presumably 0 means "unspecified" - confirm against BGG data).
        if min_players and max_players and min_players > max_players:
            print(f"    Rejecting game {game_id} - min_players > max_players")
            return False

        rating = game_data.get('average_rating')
        if rating is not None and (rating < 0 or rating > 10):
            print(f"    Rejecting game {game_id} - invalid rating: {rating}")
            return False

        return True

    def _clean_dataframe(self, df, final_null_check=True):
        """
        Clean the final dataframe with additional null checking

        Steps: drop rows with null critical fields, drop duplicate IDs,
        optionally drop rows that are more than 50% null, then sort by
        ``bgg_rank``. Every step that needs a specific column is skipped when
        that column is absent, so a dataset scraped without statistics does
        not fail here.

        Args:
            df: DataFrame to clean
            final_null_check: Whether to do a final comprehensive null check

        Returns:
            The cleaned DataFrame.
        """
        print(f"Cleaning dataframe - initial size: {len(df)} games")

        # Remove rows with null critical values
        critical_cols = [col for col in ('name', 'year_published', 'id') if col in df.columns]
        initial_size = len(df)
        if critical_cols:
            df = df.dropna(subset=critical_cols)
        print(f"After critical field null check: {len(df)} games ({initial_size - len(df)} removed)")

        # Remove duplicates
        initial_size = len(df)
        if 'id' in df.columns:
            df = df.drop_duplicates(subset=['id'])
        print(f"After duplicate removal: {len(df)} games ({initial_size - len(df)} removed)")

        if final_null_check:
            # Calculate null percentage for each row
            null_threshold = 0.5  # Remove rows with more than 50% null values
            initial_size = len(df)

            null_counts = df.isnull().sum(axis=1)
            total_cols = len(df.columns)
            null_percentages = null_counts / total_cols

            df = df[null_percentages <= null_threshold]
            print(f"After removing rows with >{null_threshold*100}% nulls: {len(df)} games ({initial_size - len(df)} removed)")

        # Sort by BGG rank (nulls last). The column only exists when at least
        # one scraped game carried a statistics/ratings node.
        if 'bgg_rank' in df.columns:
            df = df.sort_values('bgg_rank', na_position='last')

        # Print final null statistics
        print(f"\nFinal dataset null statistics:")
        null_stats = df.isnull().sum()
        for col, null_count in null_stats.items():
            if null_count > 0:
                percentage = (null_count / len(df)) * 100
                print(f"  {col}: {null_count} nulls ({percentage:.1f}%)")

        return df

    def _clean_description(self, description):
        """
        Clean HTML from description

        Strips anything that looks like an HTML tag and trims surrounding
        whitespace. Falsy input (None or empty string) is returned unchanged.
        """
        if description:
            # Basic HTML tag removal
            import re
            clean_desc = re.sub(r'<[^>]+>', '', description)
            return clean_desc.strip()
        return description

    def _get_primary_name(self, item):
        """
        Get the primary name of the game

        Falls back to the first ``name`` element when none is marked as
        primary; returns None when the item has no names at all.
        """
        names = item.findall('name')
        for name in names:
            if name.get('type') == 'primary':
                return name.get('value')
        return names[0].get('value') if names else None

    def _get_alternative_names(self, item):
        """Get alternative names as a comma-separated string (None if there are none)"""
        alt_names = [
            name.get('value')
            for name in item.findall('name')
            if name.get('type') == 'alternate' and name.get('value')
        ]
        return ', '.join(alt_names) if alt_names else None

    def _get_year_published(self, item):
        """
        Get year published

        Returns None when the element is missing or its value is empty or
        not an integer.
        """
        return self._get_int_value(item, 'yearpublished')

    def _get_int_value(self, parent, tag):
        """
        Get integer value from XML element

        Reads the ``value`` attribute of the first child named ``tag`` and
        returns None when the child is missing or the value is empty or not
        an integer.
        """
        elem = parent.find(tag)
        if elem is not None:
            value = elem.get('value')
            try:
                return int(value) if value else None
            except ValueError:
                return None
        return None

    def _get_float_value(self, parent, tag):
        """
        Get float value from XML element

        Reads the ``value`` attribute of the first child named ``tag`` and
        returns None when the child is missing or the value is empty or not
        a number.
        """
        elem = parent.find(tag)
        if elem is not None:
            value = elem.get('value')
            try:
                return float(value) if value else None
            except ValueError:
                return None
        return None

    def _get_text_value(self, parent, tag):
        """Get text value from XML element (None when the element is missing)"""
        elem = parent.find(tag)
        return elem.text if elem is not None else None

    def _get_linked_items(self, parent, link_type):
        """
        Get linked items as comma-separated string

        Links without a ``value`` attribute are skipped so the join cannot
        fail; returns None when no usable link of ``link_type`` exists.
        """
        values = [
            link.get('value')
            for link in parent.findall(f'link[@type="{link_type}"]')
            if link.get('value')
        ]
        return ', '.join(values) if values else None

    def _get_bgg_rank(self, stats):
        """
        Get BGG overall rank

        Returns the numeric value of the rank named ``boardgame``, or None
        when it is missing or not made of digits.
        """
        ranks = stats.findall('ranks/rank')
        for rank in ranks:
            if rank.get('name') == 'boardgame':
                rank_value = rank.get('value')
                if rank_value and rank_value.isdigit():
                    return int(rank_value)
        return None

    def _get_category_ranks(self, stats):
        """
        Get category-specific ranks

        Returns the ``str()`` of a {rank name: rank} dict covering every
        numeric rank other than ``boardgame``, or None when there is none.
        """
        ranks = stats.findall('ranks/rank')
        category_ranks = {}
        for rank in ranks:
            name = rank.get('name')
            if name and name != 'boardgame':
                rank_value = rank.get('value')
                if rank_value and rank_value.isdigit():
                    category_ranks[name] = int(rank_value)
        return str(category_ranks) if category_ranks else None

# Different scraping strategies
def scrape_comprehensive_dataset():
    """
    Main function with multiple strategies for comprehensive scraping

    Prints the strategy menu, builds the list of game IDs for the selected
    strategy, prompts for the null-check level, batch size and request delay,
    runs the scrape and prints a short summary of the resulting dataset.

    The strategy is currently fixed in code (see below) instead of being
    read from the user, so the fallback ID range is always used.
    """
    # ID range scraped when the selected strategy matches no menu entry.
    default_start_id = 408950
    default_end_id = 415000

    scraper = CompleteBGGScraper()

    print("=== BGG Comprehensive Data Scraper ===")
    print("\nChoose your scraping strategy:")
    print("1. ID Range (recommended for testing): Scrape games by ID range")
    print("2. Year Range: Scrape games by publication years")
    print("3. Collections: Scrape from various BGG collections")
    print("4. Custom list: Provide your own game IDs")

    # Interactive selection is disabled: "5" matches no menu entry, so the
    # fallback branch runs. To choose interactively, replace the assignment
    # with: strategy = input("\nEnter strategy number (1-4): ").strip()
    strategy = "5"

    if strategy == "1":
        start_id = int(input("Enter start ID (e.g., 1): "))
        end_id = int(input("Enter end ID (e.g., 10000): "))
        game_ids = scraper.get_games_by_id_range(start_id, end_id)

    elif strategy == "2":
        start_year = int(input("Enter start year (e.g., 2000): "))
        end_year = int(input("Enter end year (e.g., 2024): "))
        game_ids = scraper.get_game_ids_by_year_range(start_year, end_year)

    elif strategy == "3":
        game_ids = scraper.get_games_from_collections()

    elif strategy == "4":
        ids_input = input("Enter game IDs separated by commas: ")
        game_ids = [int(x.strip()) for x in ids_input.split(',')]

    else:
        # The message is built from the same constants as the call so the
        # announced range can no longer drift from the range actually used.
        print(f"Invalid choice. Using default ID range {default_start_id}-{default_end_id}")
        game_ids = scraper.get_games_by_id_range(default_start_id, default_end_id)

    if not game_ids:
        print("No game IDs found. Exiting.")
        return

    print(f"\nFound {len(game_ids)} games to scrape")

    # Null checking options
    print("\nNull checking strictness:")
    print("1. Critical only: name, year_published, id")
    print("2. Important: critical + players, rating, etc.")
    print("3. All fields: strictest filtering (recommended)")

    null_check = input("Enter null check level (1-3, default=2): ").strip()
    null_modes = {"1": "critical", "2": "important", "3": "all"}
    null_check_mode = null_modes.get(null_check, "important")

    # Scraping parameters (an empty answer selects the default)
    batch_size = int(input("Enter batch size (recommended: 50-100): ") or "50")
    delay = float(input("Enter delay between requests in seconds (recommended: 1.0-2.0): ") or "1.0")

    # Start scraping
    df = scraper.scrape_games_batch(
        game_ids=game_ids,
        batch_size=batch_size,
        delay=delay,
        save_progress=True,
        null_check_mode=null_check_mode
    )

    if df.empty:
        print("No data was scraped successfully.")
        return

    print(f"\n=== SCRAPING COMPLETE ===")
    print(f"Total games scraped: {len(df)}")
    print(f"Columns: {len(df.columns)}")

    # The statistics columns are absent when none of the scraped games had
    # ratings data, so each summary line is guarded by a column check.
    if 'year_published' in df.columns:
        print(f"Date range: {df['year_published'].min()} - {df['year_published'].max()}")
    if 'average_rating' in df.columns:
        print(f"Average rating range: {df['average_rating'].min():.2f} - {df['average_rating'].max():.2f}")

    # Show sample
    sample_cols = [
        col for col in ('name', 'year_published', 'average_rating', 'bgg_rank')
        if col in df.columns
    ]
    print(f"\nSample data:")
    print(df[sample_cols].head(10))

# Quick start functions for specific use cases
def scrape_top_games(num_games=1000, null_check_mode="important"):
    """
    Quick function to scrape the games with IDs 1 through num_games

    Despite the name, no ranking is consulted: the IDs are simply the
    sequential range starting at 1.

    Args:
        num_games: Highest game ID to include
        null_check_mode: "critical", "important", or "all"
    """
    bgg = CompleteBGGScraper()
    id_list = bgg.get_games_by_id_range(1, num_games)
    frame = bgg.scrape_games_batch(
        id_list,
        batch_size=50,
        delay=1.0,
        null_check_mode=null_check_mode,
    )
    return frame

def scrape_recent_games(start_year=2020, null_check_mode="important", end_year=2025):
    """
    Quick function to scrape recent games with null checking

    Args:
        start_year: First publication year to search (inclusive)
        null_check_mode: "critical", "important", or "all"
        end_year: Last publication year to search (inclusive). Defaults to
            2025, the value that was previously hard-coded, so existing
            callers behave as before.
    """
    scraper = CompleteBGGScraper()
    game_ids = scraper.get_game_ids_by_year_range(start_year, end_year)
    return scraper.scrape_games_batch(game_ids, batch_size=100, delay=1.0, null_check_mode=null_check_mode)

def scrape_clean_dataset(game_ids, null_check_mode="all"):
    """
    Scrape the given games, defaulting to the strictest null checking

    Uses batches of 50 games and a 1.5 second pause between requests.

    Args:
        game_ids: List of game IDs to scrape
        null_check_mode: "critical", "important", or "all"
    """
    bgg = CompleteBGGScraper()
    total = len(game_ids)
    print(f"Scraping {total} games with '{null_check_mode}' null checking...")
    settings = {
        'batch_size': 50,
        'delay': 1.5,
        'null_check_mode': null_check_mode,
    }
    return bgg.scrape_games_batch(game_ids, **settings)

# Start the interactive scraper only when this code is executed directly
# (script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    scrape_comprehensive_dataset()

=== BGG Comprehensive Data Scraper ===

Choose your scraping strategy:
1. ID Range (recommended for testing): Scrape games by ID range
2. Year Range: Scrape games by publication years
3. Collections: Scrape from various BGG collections
4. Custom list: Provide your own game IDs
Invalid choice. Using default ID range 1-1000
Getting games from ID 408950 to 415000

Found 6051 games to scrape

Null checking strictness:
1. Critical only: name, year_published, id
2. Important: critical + players, rating, etc.
3. All fields: strictest filtering (recommended)
Starting to scrape 6051 games in batches of 100...

Processing batch 1: games 1-100
  Scraping game 1/6051: ID 408950
    ✗ Skipping game 408950 - failed validation
  Scraping game 2/6051: ID 408951
    ✗ Skipping game 408951 - failed validation
  Scraping game 3/6051: ID 408952
    ✗ Skipping game 408952 - failed validation
  Scraping game 4/6051: ID 408953
    ✗ Skipping game 408953 - failed validation
  Scraping game 5/6051: ID 408954
 

In [None]:
import os
import pandas as pd

input_folder = r'C:\Users\suyil\OneDrive\Desktop\BIL476\project\bgg_complete'     # Replace with your actual folder path
output_file = 'complete_dataset.csv'      # Name of the resulting merged file

# === COMBINE CSV FILES ===
# Get all CSV file names in the folder that start with 'complete'.
# The merged output is written into this same folder and also matches that
# pattern, so it is excluded explicitly; otherwise re-running this cell would
# merge the previous result back in and duplicate every row.
# Sorted because os.listdir returns entries in arbitrary order.
csv_files = sorted(
    f for f in os.listdir(input_folder)
    if f.endswith('.csv') and f.startswith('complete') and f != output_file
)

# Read each CSV; a file that cannot be read is reported and skipped
df_list = []
for file in csv_files:
    file_path = os.path.join(input_folder, file)
    try:
        df = pd.read_csv(file_path)
        df_list.append(df)
        print(f"Loaded: {file}")
    except Exception as e:
        print(f"Error reading {file}: {e}")

# Concatenate all dataframes
if df_list:
    combined_df = pd.concat(df_list, ignore_index=True)
    combined_df.to_csv(os.path.join(input_folder, output_file), index=False)
    print(f"\n✅ All CSVs combined into {output_file}")
else:
    print("❌ No CSV files were successfully loaded.")


Loaded: complete_bgg_dataset_20250714_182120.csv
Loaded: complete_bgg_dataset_20250714_202226.csv
Loaded: complete_bgg_dataset_20250714_233432.csv
Loaded: complete_bgg_dataset_20250715_055536.csv
Loaded: complete_bgg_dataset_20250715_162016.csv
Loaded: complete_bgg_dataset_20250716_170918.csv
Loaded: complete_bgg_dataset_20250716_200224.csv
Loaded: complete_bgg_dataset_20250716_223731.csv
Loaded: complete_bgg_dataset_20250717_061747.csv
Loaded: complete_bgg_dataset_20250717_133159.csv
Loaded: complete_bgg_dataset_20250717_162545.csv
Loaded: complete_bgg_dataset_20250719_005512.csv
Loaded: complete_bgg_dataset_20250720_111435.csv
Loaded: complete_bgg_dataset_20250721_123857.csv
Loaded: complete_bgg_dataset_20250723_022516.csv
Loaded: complete_bgg_dataset_20250723_071440.csv

✅ All CSVs combined into complete_dataset.csv
