In [None]:
import requests
from bs4 import BeautifulSoup
import re 


In [None]:
import asyncio
import aiohttp
from aiohttp import ClientPayloadError, ClientResponseError, ClientConnectionError, ServerDisconnectedError
from bs4 import BeautifulSoup
import io
import requests
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession
import re

async def fetch_with_retry(session, url, retries=3, timeout=10):
    """
    GET ``url`` with ``session`` and return the response body as text,
    retrying transient failures with exponential backoff (1s, 2s, 4s, ...).

    Args:
        session: an open ``aiohttp.ClientSession``.
        url: the URL to fetch.
        retries: total number of attempts; values below 1 are treated as 1
            so the function never silently returns None.
        timeout: total per-request timeout in seconds, or an
            ``aiohttp.ClientTimeout`` instance.

    Returns:
        The decoded response body.

    Raises:
        The last aiohttp client error or ``asyncio.TimeoutError`` once all
        attempts are exhausted. Non-2xx responses surface as
        ``ClientResponseError`` via ``raise_for_status``.
    """
    if not isinstance(timeout, aiohttp.ClientTimeout):
        timeout = aiohttp.ClientTimeout(total=timeout)

    attempts = max(1, retries)
    for attempt in range(attempts):
        try:
            async with session.get(url, timeout=timeout) as response:
                response.raise_for_status()
                return await response.text()
        except (ClientPayloadError, ClientResponseError, ClientConnectionError,
                ServerDisconnectedError, asyncio.TimeoutError):
            # Timeouts are as transient as dropped connections, so retry them too.
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

async def fetch_countries(session):
    """
    Fetch the country names offered in the country filter on
    https://boardgamegeek.com/users.

    Returns the whitespace-stripped, non-empty text of every ``<option>``
    inside the element with id ``avatars-country``, in page order, or an
    empty list when that element is not on the page.
    """
    page_html = await fetch_with_retry(session, "https://boardgamegeek.com/users")
    dropdown = BeautifulSoup(page_html, 'html.parser').find(id='avatars-country')
    if not dropdown:
        return []

    names = []
    for opt in dropdown.find_all('option'):
        label = opt.get_text(strip=True)
        if label:
            names.append(label)
    return names

async def check_if_game_rating(session, user):
    """
    Return whether ``user``'s collection page reports at least one item for
    the query ``minrating=1&rating=10&rated=1`` (presumably: rated games only).

    The pager element (class ``geekpages``) is read and the number following
    "of" is taken as the item count.

    Args:
        session: an open ``aiohttp.ClientSession``.
        user: username, already URL-escaped by the caller (spaces as ``%20``).

    Returns:
        True when the parsed count is positive; False when it is zero, when
        the page has no pager element, or when no count can be parsed.
    """
    url = f"https://boardgamegeek.com/collection/user/{user}?sort=rating&sortdir=desc&minrating=1&rating=10&rated=1"

    html = await fetch_with_retry(session, url)
    soup = BeautifulSoup(html, 'html.parser')
    pager = soup.find(class_="geekpages")
    if pager is None:
        # No pager rendered (presumably an empty/private collection): treat as
        # "no ratings" instead of raising AttributeError and aborting the scrape.
        return False
    match = re.search(r'of (\d+)', pager.get_text())
    return match is not None and int(match.group(1)) > 0


async def fetch_users_on_page(session, url):
    """
    Return the users listed on the users-directory page at ``url`` who pass
    ``check_if_game_rating``.

    Each element with class ``username`` is reduced to its stripped text with
    surrounding parentheses removed and spaces replaced by ``%20``. The
    resulting names are checked one at a time, in page order.
    """
    page = BeautifulSoup(await fetch_with_retry(session, url), 'html.parser')
    candidates = [
        tag.get_text(strip=True).strip("()").replace(" ", "%20")
        for tag in page.find_all(class_="username")
    ]
    return [name for name in candidates if await check_if_game_rating(session, name)]


def build_country_url(country, page_number=1):
    """
    Build the boardgamegeek.com users-directory URL for a country and page.

    Page numbers below 2 use the bare ``/users`` path; later pages append
    ``/page/<n>``. The country name is percent-encoded so names containing
    spaces, ``&``, ``#`` or other reserved characters stay a single
    ``country`` query value.
    (No async needed here since it's just string manipulation.)
    """
    from urllib.parse import quote

    page_suffix = f"/page/{page_number}" if page_number > 1 else ""
    return f"https://boardgamegeek.com/users{page_suffix}?country={quote(country, safe='')}&state=&city="


async def find_last_page(session, url):
    """
    Return the last page number of the paginated listing at ``url``.

    Reads the text of the element whose title is "last page", strips
    whitespace and square brackets (presumably it renders like ``[12]``),
    and parses the rest as an int. Returns 1 when no such element exists.
    """
    markup = await fetch_with_retry(session, url)
    last_link = BeautifulSoup(markup, 'html.parser').find(title='last page')
    if not last_link:
        return 1  # no "last page" link found: treat the listing as one page
    label = last_link.get_text(strip=True).strip("[]")
    return int(label)


async def scrape_users(max_countries=2):
    """
    Scrape users from the boardgamegeek.com users directory, country by country,
    keeping only those accepted by ``fetch_users_on_page``.

    Args:
        max_countries: scrape only the first ``max_countries`` countries from
            the country dropdown (default 2, the previous hard-coded limit);
            pass ``None`` to scrape every country.

    Returns:
        A flat list of usernames from all scraped pages (URL-escaped as
        produced by ``fetch_users_on_page``), or an empty list if none.
    """
    async with aiohttp.ClientSession() as session:
        countries = await fetch_countries(session)
        selected = countries if max_countries is None else countries[:max_countries]

        tasks = []
        for country in selected:
            # The page count must be known before the per-page tasks can be built.
            last_page = await find_last_page(session, build_country_url(country, 1))
            tasks.extend(
                fetch_users_on_page(session, build_country_url(country, page_num))
                for page_num in range(1, last_page + 1)
            )

        # All pages of all selected countries are fetched concurrently.
        results = await asyncio.gather(*tasks)

    # Flatten the per-page lists into a single list.
    return [user for page_users in results for user in page_users]


# Base backoff delay in seconds for the BGG CSV export requests made by
# fetch_csv_as_spark_df; the delay grows exponentially across attempts.
RATE_LIMIT = 1.0
# Maximum number of HTTP attempts per user CSV export.
MAX_RETRIES = 10
import time
# Local Spark session (all local cores) used to turn each user's pandas CSV
# into a Spark DataFrame; getOrCreate returns an already-running session if any.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("TestApp") \
    .getOrCreate()
# Use Apache Arrow for pandas -> Spark conversions (spark.createDataFrame).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

def fetch_csv_as_spark_df(username, queue_csv, request_timeout=30):
    """
    Download a user's board-game collection export from BGG and return the
    ``objectid``/``rating`` columns as a Spark DataFrame.

    1) Request the CSV export from BGG, which triggers CSV generation (first 200,
       whose body contains "collection has been accepted").
    2) Keep polling until the actual CSV content is returned (second 200).
    3) Convert it into a Spark DataFrame.

    Args:
        username: BGG username, inserted verbatim into the export URL and
            stored in the ``user`` column.
        queue_csv: if True, return as soon as BGG answers 200 (only triggers
            the export; nothing is downloaded). If False, poll until the CSV
            body is returned.
        request_timeout: seconds each HTTP request may take before
            ``requests`` raises a timeout (prevents hanging forever).

    Returns:
        A Spark DataFrame with columns ``objectid``, ``rating``, ``user``;
        None in queue mode, on a non-retryable 4xx status, or when no CSV was
        obtained within ``MAX_RETRIES`` attempts.

    Raises:
        ``requests`` exceptions (connection errors, timeouts) propagate to the caller.
    """
    url = (
        "https://boardgamegeek.com/"
        "geekcollection.php?action=exportcsv&subtype=boardgame&username="
        f"{username}&all=1"
    )
    print(f"Requesting CSV for user={username}")

    response = None
    for attempt in range(MAX_RETRIES):
        # Exponential backoff: RATE_LIMIT * 2, * 4, * 8, ... (doubles once per attempt).
        delay = RATE_LIMIT * 2 ** (attempt + 1)
        response = requests.get(url, timeout=request_timeout)
        status_code = response.status_code
        print(f"Attempt={attempt+1}, HTTP={status_code}")

        if status_code == 429:
            # Hit the BGG rate limit, back off exponentially
            print(f"Rate-limited. Sleeping {delay} seconds...")
            time.sleep(delay)
            continue

        if status_code == 200:
            # In "queue csv" mode the export has now been triggered; since it
            # takes a while, the caller moves on and downloads it later.
            if queue_csv:
                return None
            if "collection has been accepted" not in response.text.lower():
                break  # the body is the CSV itself
            # The CSV was queued but is still being processed: poll again.
            print(f"BGG is still preparing the CSV. Retrying...Sleeping {delay} seconds...")
            time.sleep(delay)
            continue

        if 400 <= status_code < 500:
            # Client errors other than 429 (403, 404, ...) will not heal by retrying.
            print(f"HTTP {status_code} for {username}; not retrying.")
            return None

        # Other statuses (202, 5xx, ...) are treated as transient: wait, then retry.
        print(f"Unexpected HTTP {status_code}. Sleeping {delay} seconds...")
        time.sleep(delay)
    else:
        # Loop finished without `break`: no CSV body was ever received.
        print(f"Failed to retrieve CSV for {username} after {MAX_RETRIES} tries.")
        return None

    df_pandas = pd.read_csv(io.BytesIO(response.content))[['objectid', 'rating']]
    df_pandas['user'] = username

    return spark.createDataFrame(df_pandas)

def fetch_all_users_parallel(usernames, max_workers=4):
    """
    Download the collection CSVs for ``usernames`` on a thread pool and
    return them as a list of Spark DataFrames.

    Two passes are submitted to the same pool: first one queue-only request
    per user (``queue_csv=True``) so BGG starts generating every export,
    then one polling download per user (``queue_csv=False``).

    Args:
        usernames: iterable of BGG usernames (materialised once, because it
            is iterated twice).
        max_workers: size of the thread pool.

    Returns:
        One DataFrame per user whose CSV was retrieved. Users whose download
        returned None or raised are reported and left out, so the list may be
        shorter than ``usernames`` and may be empty. It never contains None.
    """
    usernames = list(usernames)
    dataframes = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Pass 1: trigger the exports; keep the futures so failures are reported.
        queue_futures = {
            executor.submit(fetch_csv_as_spark_df, user, True): user
            for user in usernames
        }
        # Pass 2: poll until each CSV is available. Dictionary: future -> username
        future_to_user = {
            executor.submit(fetch_csv_as_spark_df, user, False): user
            for user in usernames
        }

        for future in as_completed(queue_futures):
            exc = future.exception()
            if exc is not None:
                print(f"Queueing CSV for {queue_futures[future]} raised: {exc}")

        for future in as_completed(future_to_user):
            user = future_to_user[future]
            try:
                df_spark = future.result()
            except Exception as exc:
                print(f"User {user} generated an exception: {exc}")
                continue
            if df_spark is None:
                # Download gave up; a None here would break the later union.
                print(f"No CSV retrieved for user {user}; skipping.")
                continue
            dataframes.append(df_spark)

    return dataframes


def main(usernames):
    """
    Fetch every user's collection CSV and union them into one Spark DataFrame.

    Args:
        usernames: BGG usernames, e.g. the list returned by ``scrape_users``.

    Returns:
        The union (matched by column name) of all retrieved per-user
        DataFrames, or None when no CSV could be retrieved.
    """
    from functools import reduce
    from pyspark.sql import DataFrame

    dfs = [
        df for df in fetch_all_users_parallel(usernames, max_workers=3)
        if df is not None
    ]
    if not dfs:
        # reduce() needs at least one DataFrame; report instead of raising.
        print("No collection CSVs were retrieved; nothing to combine.")
        return None

    all_data = reduce(DataFrame.unionByName, dfs)
    print(f"Total rows in combined DataFrame: {all_data.count()}")

    # The module-level `spark` session is not stopped here: the returned
    # DataFrame is evaluated lazily against it.
    return all_data

# Top-level `await` works because IPython/Jupyter runs cells in an event loop.
# Scrape users (filtered by check_if_game_rating), then build the combined DataFrame.
# NOTE(review): `test` is reassigned by a later cell (api_func.get_collection_username);
# consider a distinct, descriptive name such as `ratings_df`.
users = await scrape_users()
test = main(users)


CancelledError: 

In [11]:
# Preview the first 4 rows of the combined ratings DataFrame.
# NOTE(review): the saved output lists every export column, but fetch_csv_as_spark_df
# keeps only objectid/rating/user, so that output looks stale - re-run to refresh.
test.show(4)

+-----------+--------+------+--------+------+---+--------+----+---------+----------+---------+----------+--------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------+-------+---------+-----+--------+----------+-------------+----------+----------+-----------+-----------+-----------+-------------+-------------+--------------+--------------+---------------------+-----------+-------+----+--------+-----+----------+-------+------------------+-----------------+---------------------+--------------------+-----------+
| objectname|objectid|rating|numplays|weight|own|fortrade|want|wanttobuy|wanttoplay|prevowned|preordered|wishlist|wishlistpriority|     wishlistcomment|             comment|       conditiontext|        haspartslist|       wantpartslist|  collid|baverage|average|avgweight| rank|numowned|objecttype| originalname|minplayers|maxplayers|playingtime|maxplaytime|minplaytime|yearpublished|bggrecplayers|

In [10]:
# Number of scraped users that passed the check_if_game_rating filter.
len(users)

27

In [3]:
import api_functions as api_func
# `api_functions` is a local module not shown here; get_collection_username presumably
# returns the user's collection via the BGG API - TODO confirm against that module.
# NOTE(review): this overwrites `test` (previously the combined Spark DataFrame).
username = "bachka"
test = api_func.get_collection_username(username)

In [2]:
# Display the current value of `test`.
# NOTE(review): this cell's execution count (2) is lower than the cell above (3), so its
# saved output predates that cell; the notebook does not survive Restart & Run All as saved.
test

[]