In [1]:
import requests
import json
import pandas as pd
import os
from datetime import datetime
import re
import random 
import time 
import glob
import asyncio
from typing import Tuple, Optional, List, Dict, Any, Set
from pydantic import BaseModel, ValidationError, Field
import aiohttp
import sys

In [2]:
# --- Input: master events files -------------------------------------------------
MASTER_EVENTS_DIR = "../Data/Master/Events"
MASTER_EVENTS_SUFFIX = "_master_events.csv"
# Pattern = 8 digits + "_" + the suffix. Because the suffix itself starts with "_",
# the accepted file names contain a DOUBLE underscore, e.g. "20251110__master_events.csv".
MASTER_EVENTS_REGEX = rf"^\d{{8}}{re.escape('_')}{re.escape(MASTER_EVENTS_SUFFIX)}$"

# --- Input: per-event payload CSVs (eventId / documentCode pairs to request) ------
# NOTE: the directory is created on import of this cell if it does not exist yet.
SINGLES_PAYLOADS_DIR = "../Data/Processed/Singles_match_payloads"
os.makedirs(SINGLES_PAYLOADS_DIR, exist_ok=True)

# --- Output: raw match details, one "<eventId>_match_details.json" file per event --
RAW_MATCH_DETAILS_DIR = "../Data/Raw/Match_details"
os.makedirs(RAW_MATCH_DETAILS_DIR, exist_ok=True)

# Bounds (in seconds) of the random sleep performed before every API request.
MIN_PAUSE = 0.00
MAX_PAUSE = 0.02

# Maximum number of scraping passes (the initial attempt counts as the first one).
MAX_RETRIES = 4

# CSV that receives the tasks still failing after the last pass.
FAILURE_LOG_PATH = "../Data/Raw/failure_log_match_details.csv"


# only used in get_latest_master_events to return blank df if no files found and further checks are required
MINIMAL_EVENT_COLUMNS = ["eventId"]

In [3]:
# Define Pydantic models for basic validation for match details api response.

# Schema of one entry of the response's "competitiors" list
# (the key is misspelled in the API itself, so the misspelling is kept here on purpose).
class CompetitorsModel(BaseModel):
    """Validates one entry of the 'competitiors' list: it must carry a string 'competitiorId'."""
    competitiorId: str
 

class MatchDetailModel(BaseModel):
    """
    Minimal schema check for a match-details API response.

    Requires the top-level keys used for filtering and analysis:
    'eventId' and 'documentCode' (strings) and the 'competitiors' list.
    'resultOverallScores' is optional and may be null.
    """
    eventId: str
    documentCode: str
    # This field can be null (defaults to None when absent)
    resultOverallScores: Optional[str] = None 
    # Must be a list of CompetitorsModel entries
    # The misspelling "competitiors" is intentional: it matches the key used by the API response
    competitiors: List[CompetitorsModel] 

In [4]:
def get_latest_master_events(master_dir:str, master_regex) -> Tuple[pd.DataFrame,Optional[str]]:
    """
    Load the most recent master events CSV found in ``master_dir``.

    Every ``*.csv`` file of the directory whose base name matches ``master_regex``
    is a candidate; the candidate that sorts last alphabetically is read
    (with a leading yyyymmdd date in the name this is the newest file).

    Args:
        master_dir (str): Folder where the master events files are stored.
        master_regex: Regular expression (string or compiled pattern) applied with
            ``re.match`` to each file's base name.

    Returns:
        Tuple[pd.DataFrame, Optional[str]]: ``(events, path)`` on success. When the
        directory is missing, holds no matching file, or the file cannot be read,
        an empty frame with the ``MINIMAL_EVENT_COLUMNS`` columns and ``None``.
    """
    def _no_data() -> Tuple[pd.DataFrame, Optional[str]]:
        # Placeholder result shared by every failure path.
        return pd.DataFrame(columns=MINIMAL_EVENT_COLUMNS), None

    if not os.path.isdir(master_dir):
        print (f"‚ùå{master_dir} does not exist as a directory")
        return _no_data()

    csv_paths = glob.glob(f"{master_dir}/*.csv")
    if not csv_paths:
        print(f"‚ùå No existing *.csv files found in MASTER Events Directory: {master_dir} ")
        return _no_data()

    # Keep only the files whose base name follows the master naming pattern.
    candidates = [
        csv_path
        for csv_path in csv_paths
        if re.match(master_regex, os.path.basename(csv_path))
    ]
    if not candidates:
        print(f"‚ùå No existing MASTER files in format: {master_regex} in {master_dir}")
        return _no_data()

    # All candidates share the same directory, so the greatest path is the last by name.
    newest_path = max(candidates)

    try:
        events_df = pd.read_csv(newest_path)
        print(f"‚úÖ {len(events_df)} events found in latest MASTER: {newest_path} ")
        return events_df, newest_path
    except Exception as read_error:
        print (f"‚ùå Error reading lastest MASTER, {newest_path}: {read_error}")
        return _no_data()

In [5]:
def get_all_payloads(singles_payloads_dir:str) -> List[Tuple[int, str]]:
    """
    Reads all singles payload files and returns a complete list of (eventId, documentCode) tuples.

    Args:
        singles_payloads_dir (str): Folder holding the payload ``*.csv`` files. Each file
            must provide the 'eventId' and 'documentCode' columns; other columns are ignored.

    Returns:
        List[Tuple[int, str]]: Unique (eventId, documentCode) pairs found across all files
        (order is not significant). Always a list: it is empty when the folder has no CSV
        file or when no file yields a usable row.

    Files that cannot be read (missing columns, empty file, non-integer eventId, ...)
    are reported and skipped; they never abort the whole scan.
    """
    print(f"--- üü† Finding  all event ids and payloads from {singles_payloads_dir} ---")

    all_csv_files = glob.glob(f"{singles_payloads_dir}/*.csv")

    if not all_csv_files:
        print(f"--- ‚ùå ERROR: No CSV files found in {singles_payloads_dir}. ---")
        return []

    # A set removes duplicates that appear within a file or across files.
    unique_payloads = set()

    for file_path in all_csv_files:
        filename = os.path.basename(file_path)

        try:
            # ONLY get the eventId / documentCode columns
            payload_df = pd.read_csv(file_path, usecols=['eventId', 'documentCode'])
            if payload_df.empty:
                continue

            payload_df['eventId'] = payload_df['eventId'].astype(int)
            payload_df['documentCode'] = payload_df['documentCode'].astype(str)

            # Convert to (eventId, documentCode) tuples
            unique_payloads.update(
                payload_df[['eventId', 'documentCode']].itertuples(index=False, name=None)
            )
        except (pd.errors.EmptyDataError, KeyError, FileNotFoundError, ValueError) as e:
            # ValueError covers both a file lacking the requested columns (raised by
            # read_csv's usecols check) and an eventId that cannot be cast to int.
            print(f"--- ‚ùåERROR: Could not read payload file {filename}: {e}")
            continue

    all_payloads_list = list(unique_payloads)
    if all_payloads_list:
        print(f"--- ‚úÖ All desired payloads: Found {len(all_payloads_list)} total unique matches to scrape. ---")
    else:
        print(f"‚ùå No Match payloads found")

    # Return the (possibly empty) list in every case so callers can safely build a set from it.
    return all_payloads_list
                   

In [6]:
def get_obtained_match_details(raw_match_details_dir:str) -> List[Tuple[int, str]]:
    """
    Parses all already obtained match details files.

    Every ``*match_details.json`` file of the folder is expected to hold a JSON list
    of match objects. The (eventId, documentCode) pair of each match is collected so
    the caller can work out which matches no longer need to be scraped.

    Args:
        raw_match_details_dir (str): Folder holding the ``*match_details.json`` files.

    Returns:
        List[Tuple[int, str]]: Unique (eventId, documentCode) pairs (order is not
        significant). The document code is returned as a string so it compares equal
        to the payload tuples it is matched against.

    Unreadable files are reported and skipped. A malformed record (not a JSON object,
    or an eventId that cannot be converted to int) is skipped on its own: it does not
    discard the other records of the same file.
    """
    print(f"--- üü† Finding already obtained match details from {raw_match_details_dir} ---")

    all_details_list = []

    # get all file_paths
    all_details_files = glob.glob(f"{raw_match_details_dir}/*match_details.json")
    files_processed_count = 0

    for file_path in all_details_files:
        files_processed_count += 1

        # try to read the file (catch error if it fails)
        try:
            # Safely open and load the JSON file
            with open(file_path,"r") as f:
                matches_list = json.load(f)

            # Check if the file contains the expected list of matches
            if not isinstance(matches_list, list):
                 print(f" Skipping file {os.path.basename(file_path)}: Content is not a list.")
                 continue

            # get eventId and match code from each match
            for match in matches_list:
                # A stray non-object entry must not abort the rest of the file.
                if not isinstance(match, dict):
                    print(f"Skipping record in {os.path.basename(file_path)}: entry is not a JSON object.")
                    continue

                event_id_raw = match.get("eventId")
                match_code = match.get("documentCode")

                # check the data exists.
                if event_id_raw and match_code:
                    try:
                        event_id = int(event_id_raw) # Ensure ID is an integer
                    except (ValueError, TypeError):
                        # TypeError: eventId of a type int() cannot handle (e.g. a list)
                        print(f"Skipping record in {os.path.basename(file_path)} due to bad data eventId.")
                        continue
                    all_details_list.append((event_id, str(match_code)))

        except json.JSONDecodeError:
            print(f"‚ùå ERROR: Failed to read JSON in {os.path.basename(file_path)}. ")
        except Exception as e:
            print(f"‚ùå ERROR: Unexpected error reading {os.path.basename(file_path)}: {type(e).__name__}")

    # convert to set to remove duplicates
    final_unique_list = list(set(all_details_list))

    # get the number of events read
    unique_events_obtained = len(set(match_tuple[0] for match_tuple in final_unique_list))

    print(f"--- ‚úÖ Found {len(final_unique_list)} total unique match details across {files_processed_count} files. ---")
    print(f"--- ‚úÖ Unique Events with Data: {unique_events_obtained} ---")

    return final_unique_list

In [7]:
async def get_match_details(session: aiohttp.ClientSession, 
                            match_tuple: Tuple[int, str], 
                            min_pause: float, 
                            max_pause: float
                           ) -> Tuple[bool, Optional[Dict[str, Any]], Tuple[int, str], str]:
    """
    Fetch the match card details of one match and validate the response.

    Sleeps a random time between ``min_pause`` and ``max_pause`` seconds, then calls the
    GetMatchCardDetails endpoint for the (event_id, match_code) pair and validates the
    JSON body against ``MatchDetailModel``. Errors are never raised to the caller: they
    are reported through the returned status and message.

    Args:
        session: Open aiohttp session used to send the request.
        match_tuple: (event_id, match_code) identifying the match.
        min_pause: Lower bound (seconds) of the pause before the request.
        max_pause: Upper bound (seconds) of the pause before the request.

    Returns:
        status: A boolean indicating success (True) or failure (False).
        dict: The FULL RAW JSON response as a Python dictionary, or None if an error occurs.
        match_tuple: The original input tuple (event_id, match_code).
        status_msg: A string message describing the outcome (error name upon failure).
    """
    # pause before running for api politeness
    await asyncio.sleep(random.uniform(min_pause,max_pause)) 

    event_id, match_code = match_tuple
    status_msg = ""

    url = f"https://liveeventsapi.worldtabletennis.com/api/cms/GetMatchCardDetails/{event_id}/{match_code}?&use_live_match_cache=false"
    
    headers = {
        'Accept': 'application/json',
        'Accept-Language': 'en-GB,en;q=0.9,es;q=0.8',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json',
        'DNT': '1',
        'Origin': 'https://www.worldtabletennis.com',
        'Pragma': 'no-cache',
        'Referer': 'https://www.worldtabletennis.com/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
        'User-Agent': 'Mozilla/5.0 (Linux; Android 11.0; Surface Duo) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Mobile Safari/537.36',
        'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
        'sec-ch-ua-mobile': '?1',
        'sec-ch-ua-platform': '"Android"'
    }

    # Explicit timeout object (20 s for the whole request) instead of a bare number.
    request_timeout = aiohttp.ClientTimeout(total=20)

    try:
        async with session.get(url, headers=headers, timeout=request_timeout) as response:
            
            # Raise an error for bad status codes
            response.raise_for_status()       
            raw_json_response = await response.json()                

            if raw_json_response:          
               
                # validate only: raises ValidationError if the expected keys are missing;
                # the untouched raw response is what gets returned.
                MatchDetailModel.model_validate(raw_json_response)

                status_msg = f"Successful, raw JSON response for {match_tuple}."
                return True, raw_json_response, match_tuple, status_msg 
               
            else:
                status_msg = f"No data returned for {match_tuple}."
                return False, None, match_tuple, status_msg  
    except ValidationError as e: # Catches Pydantic schema errors
        try:
            # Report the location of the first offending field only.
            first_error = e.errors()[0]
            field = ".".join(map(str, first_error['loc']))
            status_msg = f"Task failed with error: Validation Error (Field '{field}' missing)"
        except Exception:
            # Fall back to a generic message if the error details cannot be read
            # (no bare except: KeyboardInterrupt / SystemExit must not be swallowed).
            status_msg = "Task failed with error: ValidationError (Unknown structure)"
        return False, None, match_tuple, status_msg      

    except Exception as e: 
        # Network errors, timeouts, bad HTTP status, non-JSON body, ...
        status_msg = f"Error: {type(e).__name__}"
        return False, None, match_tuple, status_msg

In [8]:
async def main_scraper(tuples_to_scrape: List[Tuple[int, str]]) -> Tuple[List[Dict[str, Any]], List[Tuple[int, str, str]]]:
    """
    Fetch the details of every requested match concurrently.

    One ``get_match_details`` coroutine is created per input tuple, all sharing a
    single aiohttp session; results are collected as the coroutines complete.

    Args:
        tuples_to_scrape: (event_id, match_code) pairs to request.

    Returns:
        Tuple of two lists:
          - the raw JSON dictionaries of the matches fetched and validated successfully;
          - one (event_id, match_code, failure_message) tuple per failed task.
    """
    successful_matches: List[Dict[str, Any]] = []
    # This list will hold the (eventId, matchCode, message) tuples of failed tasks
    failed_matches_log: List[Tuple[int, str, str]] = []
    total_tasks = len(tuples_to_scrape)

    # start session for concurrent requests
    async with aiohttp.ClientSession() as session:

        # create all coroutines (helper function and the input tuples to scrape (event_id, match_code))
        coroutines = [get_match_details(session, task, MIN_PAUSE, MAX_PAUSE) for task in tuples_to_scrape]

        print(f"--- üöÄ Launching {total_tasks} tasks concurrently... ---")
        start_time = time.time()

        # process results as coroutines are completed.
        for processed_count, future in enumerate(asyncio.as_completed(coroutines), start=1):

            # the helper catches its own errors (including the pydantic validation)
            # and hands back its input tuple so failures can be logged.
            status, result_dict, input_tuple, status_msg = await future

            if status:
                # on successfully getting expected api response:
                successful_matches.append(result_dict)
            else:
                # add the input tuple AND error message to the FAILED log
                event_id, match_code = input_tuple
                failed_matches_log.append((event_id, match_code, status_msg))

            # print every 20 tasks to keep track of progress
            if processed_count % 20 == 0:
                # Measured here (not in the success branch) so the value is always
                # defined and current, even when every task so far has failed.
                elapsed = time.time() - start_time
                log_line = f"--- üü† [{processed_count}/{total_tasks}] Time elapsed: {elapsed:.2f}s. üü† ---"
                print(log_line.ljust(80), end='\r')

        # Final print summary outside the loop (first wipe the progress line).
        print(" " * 80, end='\r') 
        print("\n" + "=" * 50)        
        print(f"üü¢ Successfully fetched: {len(successful_matches)} tasks üü¢")
        print(f"Failed to fetch (ready for retry): {len(failed_matches_log)} tasks")

        return successful_matches, failed_matches_log


In [9]:
def group_match_details(all_match_details: List[Dict[str, Any]]) -> Dict[int, List[Dict[str, Any]]]:
    """
    Bucket match detail records by their integer event id.

    Records without an 'eventId' are ignored silently; records whose 'eventId'
    is not a valid integer are reported with a warning and ignored.

    Args:
        all_match_details: Match detail dictionaries, each expected to carry 'eventId'.

    Returns:
        Dict[int, List[Dict[str, Any]]]: event id -> records of that event, in input order.
    """
    matches_by_event: Dict[int, List[Dict[str, Any]]] = {}

    for record in all_match_details:
        raw_id = record.get('eventId')
        if raw_id is None:
            continue

        try:
            numeric_id = int(raw_id)
        except ValueError:
            print(f"WARN: Skipping record due to non-integer Event ID: {raw_id}")
            continue

        # First record of an event opens its bucket.
        if numeric_id not in matches_by_event:
            matches_by_event[numeric_id] = []
        matches_by_event[numeric_id].append(record)

    return matches_by_event

In [10]:
def append_to_json_list(output_filename: str, new_matches: List[Dict[str, Any]]):
    """
    Reads a list from a JSON file, appends new match data, and writes the full list back.
    If the file doesn't exist, it creates a new file with the data.

    A match is identified by its 'documentCode' (the key used everywhere else in this
    notebook), falling back to 'id' for records that only carry that field. A new match
    is appended only when its key is not already present in the file or earlier in the
    same batch.

    Args:
        output_filename (str): Path of the JSON file holding a list of match dictionaries.
        new_matches (List[Dict[str, Any]]): Matches to merge into the file; entries that
            are not dictionaries are ignored.

    Returns:
        None. Read and write problems are printed, not raised. If the existing content
        cannot be decoded or is not a list, the file is rewritten from the new data only.
    """
    def _match_key(match: Dict[str, Any]) -> Any:
        # Identity of a match inside one event file.
        document_code = match.get('documentCode')
        return document_code if document_code is not None else match.get('id')

    existing_list = []

    # READ and Check if the file exists and read its contents.
    if os.path.exists(output_filename) and os.path.getsize(output_filename) > 0:
        try:
            with open(output_filename, 'r') as f:
                # Load the current list from the file
                existing_list = json.load(f)
        except json.JSONDecodeError:
            print(f"‚ùå Warning: Could not decode JSON from {output_filename}. Starting with an empty list.")
        except Exception as e:
            print(f"‚ùå Error reading {output_filename}: {e}. Starting with an empty list.")

    # Ensure existing_list is actually a list before appending new data 
    if not isinstance(existing_list, list):
        print(f"‚ùå Warning: Data in {output_filename} was not a list. Overwriting with new data.")
        existing_list = []

    seen_keys = {_match_key(match) for match in existing_list if isinstance(match, dict)}
    added_count = 0
    for match in new_matches:
        if not isinstance(match, dict):
            continue
        key = _match_key(match)
        if key in seen_keys:
            continue
        existing_list.append(match)
        added_count += 1
        # Remember real keys so a duplicate later in this batch is not appended twice.
        if key is not None:
            seen_keys.add(key)

    try:
        with open(output_filename, 'w') as f:
            # use W mode as we are writing a new, COMPLETE, updated set of data.
            json.dump(existing_list, f, indent=4)        
        
        # Use  end='\r' return to create a dynamic updating line
        print(f"‚úÖ Updated Event {os.path.basename(output_filename)}: Added {added_count} new matches (Total: {len(existing_list)}).", end='\r')
    except Exception as e:
        print(f"‚ùå Error writing to {output_filename}: {e}".ljust(80)) 
    return None 

In [11]:
if __name__ == "__main__": 
    
    # Get the latest master events file

    latest_master_df, latest_master_file = get_latest_master_events(MASTER_EVENTS_DIR, MASTER_EVENTS_REGEX)
    if latest_master_df.empty:
        print(f"‚ùå Exiting: No existing Master Events File available")
        sys.exit(1)
    
    # get all payloads that could be scraped for match details
    # compare this to matches already obtained, leaving only payloads left to be scraped!Q

    all_payloads = get_all_payloads(SINGLES_PAYLOADS_DIR)
    already_obtained_matches = get_obtained_match_details(RAW_MATCH_DETAILS_DIR)

    intitial_matches_to_scrape = list(set(all_payloads) - set(already_obtained_matches))
    intitial_matches_to_scrape_count = len(intitial_matches_to_scrape)
    intitial_events_to_scrape_count = len(set([match_tuple[0] for match_tuple in intitial_matches_to_scrape]))
    print(f"\nüèì Matches to scrape: {intitial_matches_to_scrape_count} across {intitial_events_to_scrape_count} events üèì")
    
    # Get the initial list of tasks / matches to be scraped
    
    matches_to_scrape = intitial_matches_to_scrape 
    # List to hold ALL successful ¬†and failed results from all attempts
    all_successful_data = []    
    failures = [] 
    # set retries count to 0 
    retries = 0
    # Start total timer
    global_start_time = time.time() 


    ############################ START OF MAIN ASYNC LOOP  #####################################

    # while loop - keep trying if there are still matches to get or until max retries count is reached
    while (retries < MAX_RETRIES) and bool(matches_to_scrape):
        retries += 1 # Increment attempt counter (Attempt 1, 2, ...)
        
        # print intial scraping started
        if retries == 1:
            print(f"--- üöÄ Starting initial scrape for {len(matches_to_scrape)} matches... ---")
        else:
            # log if a retry has started
            print(f"\n--- üîÑ Starting Retry {retries-1}/{MAX_RETRIES-1} for {len(matches_to_scrape)} remaining matches... ---")
        
        # start scraping and log retry start time (useful if one retry goes awry)
        attempt_start_time = time.time()
        
        # run the async fetching 
        successes, failures = await (main_scraper(tuples_to_scrape=matches_to_scrape))
        
        
        # process results as they finish
        
        # Add new successes to the successes list 
        all_successful_data.extend(successes)
        
        # after each retry - update new matches to scrape list for next loop

        matches_to_scrape = [(event_id, match_code)for event_id, match_code, error_msg in failures]
        
        # log results of the retry attempt dependent on success / fails
        attempt_duration = time.time() - attempt_start_time
        if successes: 
            print(f"--- Attempt {retries} finished in {attempt_duration:.2f}s. Got {len(successes)} new results. ---")
        if failures: 
            print(f"--- {len(failures)} tasks failed and will be retried. ---")

    ################################### END OF MAIN ASYNC LOOP #####################################

    # Final Printing summary (OUTSIDE OF MAIN LOOP) 
    print("\n" + "=" * 50)
    
    # if no matches left to scrape, then ¬†log success :) 
    if not matches_to_scrape: 
        print("--- ‚úÖüèìüü¢ Scraping Complete. All tasks finished successfully. üü¢üèì‚úÖ---")
    else:
        # 'failures' holds the leftover failures ¬†from the LAST attempt
        print(f"--- ‚ö†Ô∏è Scraping Complete. {len(failures)} tasks permanently failed after {MAX_RETRIES} attempts. ---")
        
    print(f"Total successful matches collected: {len(all_successful_data)}")
    total_duration = time.time() - global_start_time
    print(f"Total Wall Clock Time: {total_duration:.2f} seconds.")

    # group all data 
    grouped_details = group_match_details(all_successful_data) 
    # log events saved (useful if errors occur)
    events_saved_count = 0 

    print("--- üíæ Saving data to disk (Read-Modify-Write) ---")
    for event_id, new_matches_list in grouped_details.items():
        output_filename = os.path.join(RAW_MATCH_DETAILS_DIR, f"{event_id}_match_details.json")
        
        existing_matches_list = []
        
        # Read file and check if the file exists and read its contents.
        if os.path.exists(output_filename) and os.path.getsize(output_filename) > 0:
            try:
                with open(output_filename, 'r') as f:
                    existing_matches_list = json.load(f)
                if not isinstance(existing_matches_list, list):
                    print(f"‚ö†Ô∏è Warning: Data in {output_filename} was not a list. Overwriting.")
                    existing_matches_list = []
            except (json.JSONDecodeError, Exception) as e:
                print(f"‚ö†Ô∏è Warning: Could not read/decode {output_filename} ({e}). Overwriting.")
                existing_matches_list = []

        # log all unique matches processed 
        existing_doc_codes = {match.get('documentCode') for match in existing_matches_list if isinstance(match, dict)}
        added_count = 0
        
        for match in new_matches_list:
            if isinstance(match, dict) and match.get('documentCode') not in existing_doc_codes:
                existing_matches_list.append(match)
                added_count += 1
        
       
        try:
            with open(output_filename, 'w') as f:
                json.dump(existing_matches_list, f, indent=4)
            
            events_saved_count += 1
            print(f"‚úÖ Updated Event {event_id}: Added {added_count} new matches (Total: {len(existing_matches_list)}).", end='\r')
            
        except Exception as e:
            print(f"‚ùå ERROR saving Event {event_id}: {type(e).__name__}".ljust(80))
    

    print(f"\n--- ‚úÖ All successful events written to disk. ({events_saved_count} files) ---")

    # if failures exists, print and save for future reference
    if failures: 
        failures_df = pd.DataFrame(failures, columns=["eventId", "matchCode","failureReason"])
        try:
            # Save the failures to a csv for reference
            failures_df.to_csv(FAILURE_LOG_PATH, index=False)
            
            print(f"--- ‚úÖ Failure log saved for {len(failures_df)} tasks to {FAILURE_LOG_PATH} ---")
            
        except Exception as e:
            print(f"--- ‚ùå FAILED to save failure log: {e} ---")

‚úÖ 187 events found in latest MASTER: ../Data/Master/Events/20251110__master_events.csv 
--- üü† Finding  all event ids and payloads from ../Data/Processed/Singles_match_payloads ---
--- ‚úÖ All desired payloads: Found 24665 total unique matches to scrape. ---
--- üü† Finding already obtained match details from ../Data/Raw/Match_details ---
--- ‚úÖ Found 24665 total unique match details across 187 files. ---
--- ‚úÖ Unique Events with Data: 187 ---

üèì Matches to scrape: 0 across 0 events üèì

--- ‚úÖüèìüü¢ Scraping Complete. All tasks finished successfully. üü¢üèì‚úÖ---
Total successful matches collected: 0
Total Wall Clock Time: 0.00 seconds.
--- üíæ Saving data to disk (Read-Modify-Write) ---

--- ‚úÖ All successful events written to disk. (0 files) ---


In [None]:
cle