In [2]:
import os
os.environ["PATH"] = "/opt/homebrew/bin:" + os.environ.get("PATH", "")
import re
import pandas as pd
import time
import zipfile
import rarfile
import subprocess
import gzip
import bz2
import shutil
import pathlib
from pathlib import Path
from io import BytesIO
from urllib.parse import urljoin

from demoparser2 import DemoParser

from bs4 import BeautifulSoup
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

from shapely.geometry import Point, Polygon
import matplotlib.pyplot as plt
import random

import concurrent.futures

# Functions

## Webscraping

In [3]:
def setup_undetected_chrome(download_dir: pathlib.Path, headless: bool = False) -> uc.Chrome:
    """Build an undetected-chromedriver Chrome session that saves downloads to ``download_dir``.

    Args:
        download_dir: Directory Chrome should use as its default download target.
            It is resolved to an absolute path before being handed to Chrome.
        headless: Value assigned to the options object's ``headless`` attribute.

    Returns:
        The started ``uc.Chrome`` driver.
    """
    chrome_options = uc.ChromeOptions()
    chrome_options.add_experimental_option(
        "prefs",
        {
            # Save straight into the target directory without a "Save as" dialog.
            "download.default_directory": str(download_dir.resolve()),
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
            "safebrowsing.enabled": True,
        },
    )
    chrome_options.headless = headless
    return uc.Chrome(options=chrome_options)

def get_soup(driver, url: str, delay: int = 5) -> BeautifulSoup:
    """Load ``url`` in ``driver`` and return the rendered page parsed with lxml.

    The function waits up to ``delay`` seconds for at least one anchor whose
    href starts with ``/matches/``. If that wait fails for any reason it sleeps
    for ``delay`` seconds instead and parses whatever the page contains.
    """
    driver.get(url)
    try:
        match_link_locator = (By.CSS_SELECTOR, "a[href^='/matches/']")
        waiter = WebDriverWait(driver, delay)
        waiter.until(EC.presence_of_element_located(match_link_locator))
    except Exception:
        # Best effort: give the page a fixed grace period and carry on.
        time.sleep(delay)
    return BeautifulSoup(driver.page_source, "lxml")

def find_match_urls_from_results(driver, max_matches: int = 5, delay: int = 5) -> list[str]:
    """Collect absolute match-page URLs from the results page at ``RESULTS_URL``.

    Args:
        driver: Selenium driver used to load the page.
        max_matches: Maximum number of URLs to return.
        delay: Seconds passed to ``get_soup`` as its wait budget.

    Returns:
        De-duplicated match URLs in page order, truncated to ``max_matches``.
    """
    match_selector = "a[href^='/matches/']"
    soup = get_soup(driver, RESULTS_URL, delay=delay)
    anchors = soup.select(match_selector)

    if not anchors:
        # Previously a fixed 15 s sleep ran *after* the page had already been
        # snapshotted, so it could never change the result. Wait only when the
        # first snapshot is empty, then re-read the live page.
        time.sleep(15)
        soup = BeautifulSoup(driver.page_source, "lxml")
        anchors = soup.select(match_selector)

    # dict.fromkeys keeps first-seen order while dropping duplicates.
    urls = list(dict.fromkeys(urljoin(HLTV_BASE, a.get("href", "")) for a in anchors))
    print(f"Found {len(urls)} match links on results page.")
    return urls[:max_matches]
    

def find_demo_download_url(driver, max_wait=6) -> str:
    """Poll the page currently loaded in ``driver`` until a demo download link appears.

    The match page must already be open in ``driver``; this function does not
    navigate. Each attempt first looks for the dedicated demo button and then
    falls back to any anchor whose href contains ``/download/demo/``.

    Args:
        driver: Selenium driver positioned on a match page.
        max_wait: Number of polling attempts; each failed attempt sleeps 10 s.

    Returns:
        Absolute demo download URL.

    Raises:
        RuntimeError: If no link is found after ``max_wait`` attempts.
    """
    print("Waiting for demo link to become available…")
    for attempt in range(max_wait):
        try:
            # Try primary robust selector
            link = driver.find_element(By.CSS_SELECTOR, "a.stream-box[data-demo-link-button][data-demo-link]")
            demo_path = link.get_attribute("data-demo-link")
            if demo_path:
                demo_url = urljoin(HLTV_BASE, demo_path)
                print(f"Demo found via CSS selector: {demo_url}")
                return demo_url
        except Exception:
            # The button is simply absent on many pages; fall through to the fallback.
            pass

        # Fallback: try finding any anchor with /download/demo/ in href
        for anchor in driver.find_elements(By.TAG_NAME, "a"):
            href = anchor.get_attribute("href")
            if href and "/download/demo/" in href:
                print(f"Demo found via fallback href: {href}")
                return href

        time.sleep(10)
        print(f"… still waiting ({(attempt+1)*10}s)")

    # This function has no match_url argument; the original message referenced
    # an undefined name and raised NameError instead of this RuntimeError.
    raise RuntimeError(f"No demo link found on match page after {max_wait*10}s: {driver.current_url}")

def _parse_team_name_and_score(team_div):
    """Return ``(name, score)`` read from one ``div.team`` element of a match page."""
    name_elem = team_div.find('div', class_='teamName')
    name = name_elem.text.strip() if name_elem else None

    # The score lives in a div classed 'won' or 'lost'; 'won' takes precedence.
    won_elem = team_div.find('div', class_='won')
    score_elem = won_elem if won_elem else team_div.find('div', class_='lost')

    score = 0
    if score_elem:
        try:
            score = int(score_elem.text.strip())
        except ValueError:
            # Non-numeric score text: keep the default of 0.
            pass
    return name, score


def _played_map_names(soup):
    """Return the names of all map holders on the page that contain a 'played' marker."""
    names = []
    for mapholder in soup.find_all('div', class_='mapholder'):
        if not mapholder.find('div', class_='played'):
            continue
        mapname_elem = mapholder.find('div', class_='mapname')
        if mapname_elem:
            names.append(mapname_elem.text.strip())
    return names


def extract_match_data(driver, match_url: str, delay: int = 5) -> dict:
    """
    Extract detailed match data from a single HLTV match page.

    Args:
        driver: Selenium driver used to load the page.
        match_url: URL of the match page to load.
        delay: Seconds passed to ``get_soup`` as its wait budget.

    Returns a dictionary containing:
    - match_url: The URL of the match
    - team1_name: Name of team 1
    - team2_name: Name of team 2
    - team1_score: Number of maps won by team 1
    - team2_score: Number of maps won by team 2
    - map_1 through map_5: Individual map names (None if not played)
    - total_maps: Total number of maps played
    - date: Match date
    - event: Event name
    - demo_url: Demo download URL (if available)

    Fields that cannot be found keep their defaults (None, or 0 for scores and
    total_maps). A failure to find the demo link is printed, not raised.
    """
    soup = get_soup(driver, match_url, delay=delay)

    match_data = {
        'match_url': match_url,
        'team1_name': None,
        'team2_name': None,
        'team1_score': 0,
        'team2_score': 0,
        'map_1': None,
        'map_2': None,
        'map_3': None,
        'map_4': None,
        'map_5': None,
        'total_maps': 0,
        'date': None,
        'event': None,
        'demo_url': None
    }

    # Extract team names and scores (first two 'team' divs: left side, right side)
    teams = soup.find_all('div', class_='team')
    if len(teams) >= 2:
        for slot, team_div in (('team1', teams[0]), ('team2', teams[1])):
            name, score = _parse_team_name_and_score(team_div)
            match_data[f'{slot}_name'] = name
            match_data[f'{slot}_score'] = score

    # Extract maps that were played; only the first five get their own column,
    # but total_maps counts all of them.
    maps_played = _played_map_names(soup)
    for i, map_name in enumerate(maps_played[:5], 1):
        match_data[f'map_{i}'] = map_name
    match_data['total_maps'] = len(maps_played)

    # Extract date and event
    time_and_event = soup.find('div', class_='timeAndEvent')
    if time_and_event:
        date_elem = time_and_event.find('div', class_='date')
        if date_elem:
            match_data['date'] = date_elem.text.strip()

        event_elem = time_and_event.find('div', class_='event text-ellipsis')
        if event_elem:
            match_data['event'] = event_elem.text.strip()

    # Extract demo download URL from the page that get_soup left open in the driver
    try:
        match_data['demo_url'] = find_demo_download_url(driver, max_wait=6)
    except Exception as e:
        print(f"Error getting demo URL: {e}")
        match_data['demo_url'] = None

    return match_data

def scrape_multiple_matches(driver, match_urls, delay: int = 5) -> list[dict]:
    """
    Scrape data from multiple match pages.

    Args:
        driver: Selenium driver passed through to ``extract_match_data``.
        match_urls: Sized sequence of match page URLs.
        delay: Wait budget passed through to ``extract_match_data``.

    Returns a list of dictionaries, each containing match data. Matches whose
    scraping raises are reported with a print and left out of the list.
    """
    all_match_data = []
    total = len(match_urls)

    for i, match_url in enumerate(match_urls, 1):
        print(f"\nScraping match {i}/{total}: {match_url}")
        try:
            all_match_data.append(extract_match_data(driver, match_url, delay=delay))
        except Exception as e:
            print(f"Error scraping {match_url}: {e}")

        # Add delay between requests to avoid rate limiting (not after the last one)
        if i < total:
            time.sleep(REQUEST_DELAY_SEC)

    return all_match_data

def save_match_data_to_csv(match_data_list: list[dict], output_file: str = "hltv_matches.csv"):
    """
    Save scraped match data to a CSV file.

    Args:
        match_data_list: List of match dictionaries as produced by ``extract_match_data``.
        output_file: Path of the CSV file to write.

    Returns:
        The DataFrame that was written, with columns in a fixed readable order.
        Columns missing from the input (including the empty-list case) are
        created and filled with NaN instead of raising KeyError.
    """
    # Reorder columns for better readability
    column_order = [
        'match_url', 'demo_url', 'date', 'event',
        'team1_name', 'team2_name',
        'team1_score', 'team2_score',
        'total_maps',
        'map_1', 'map_2', 'map_3', 'map_4', 'map_5'
    ]

    # reindex selects and orders like df[column_order] but tolerates absent columns.
    df = pd.DataFrame(match_data_list).reindex(columns=column_order)
    df.to_csv(output_file, index=False)
    print(f"\nData saved to {output_file}")
    return df

## Downloading

In [4]:
def human_size(bytes_):
    """Format a byte count as a short string such as ``"1.5 KB"``.

    The value is divided by 1024 until it drops below 1024 or the GB unit has
    been passed, in which case it is reported in TB.
    """
    units = ('B', 'KB', 'MB', 'GB')
    value = bytes_
    position = 0
    while position < len(units):
        if value < 1024:
            return f"{value:3.1f} {units[position]}"
        value = value / 1024
        position += 1
    return f"{value:3.1f} TB"

def get_demo_number_from_url(demo_url: str) -> str:
    """Return the digits that follow ``/demo/`` in ``demo_url``.

    Example: ``'https://www.hltv.org/download/demo/102077'`` gives ``'102077'``.

    Raises:
        ValueError: If the URL contains no ``/demo/<digits>`` segment.
    """
    found = re.search(r'/demo/(\d+)', demo_url)
    if found is None:
        raise ValueError(f"Could not find demo number in URL: {demo_url}")
    return found.group(1)

def download_demo_with_selenium(driver, demo_url: str, out_dir: pathlib.Path, timeout_sec: float = 3600) -> pathlib.Path:
    """Open ``demo_url`` in the browser and wait for the downloaded file to appear in ``out_dir``.

    A file with a zip/rar/gz/bz2/dem extension counts as finished once its size
    is non-zero and unchanged across a 2 s interval. While only ``.crdownload``
    partials exist, their size is printed as progress.

    Args:
        driver: Selenium driver whose download directory is ``out_dir``.
        demo_url: Demo download URL to open.
        out_dir: Directory to watch for the downloaded file.
        timeout_sec: Overall time budget in seconds.

    Returns:
        Path of the finished download (the most recently created candidate).

    Raises:
        RuntimeError: If no finished file is detected within ``timeout_sec``.
    """
    driver.get(demo_url)
    size_prev = 0
    start = time.time()
    final_file = None
    print("▶ Starting download…")

    while True:
        candidates = [
            path
            for ext in ('zip', 'rar', 'gz', 'bz2', 'dem')
            for path in out_dir.glob(f"*.{ext}")
        ]
        if candidates:
            newest = max(candidates, key=lambda f: f.stat().st_ctime)
            # Wait for file size to stabilize
            time.sleep(3)
            stable_size = newest.stat().st_size
            time.sleep(2)
            if newest.stat().st_size == stable_size and stable_size > 0:
                print(f"\n✅ Download complete: {newest.name} ({human_size(stable_size)})")
                final_file = newest
                break
            print(f"\r📦 Finalizing... {human_size(stable_size)}", end="")
        else:
            partials = list(out_dir.glob("*.crdownload"))
            if partials:
                size = partials[0].stat().st_size
                if size != size_prev:
                    print(f"\r📦 Downloading... {human_size(size)}", end="")
                    size_prev = size
            time.sleep(1)

        # Checked on every path: previously the "finalizing" branch skipped this
        # check, so an empty or ever-changing candidate file looped forever.
        if time.time() - start > timeout_sec:
            print("\n⚠ Timed out waiting for download to finish.")
            break

    if not final_file:
        raise RuntimeError("No finished demo file detected after timeout.")

    # Additional wait to ensure Chrome finishes all file operations
    print("Waiting for file system sync...")
    time.sleep(5)
    return final_file

def extract_dem_from_file(archive_path: pathlib.Path, out_dir: pathlib.Path, name_filter: str = None) -> pathlib.Path:
    """Extract ``.dem`` file(s) from a downloaded demo file into ``out_dir``.

    Supported inputs, chosen by file suffix:
    - ``.dem``: copied as-is.
    - ``.zip``: every matching ``.dem`` member is extracted.
    - ``.gz`` / ``.bz2``: single-file compression of a ``*.dem`` (e.g. ``x.dem.gz``).
    - ``.rar``: members are listed with ``rarfile`` and extracted with an
      external WinRAR/UnRAR executable.

    Args:
        archive_path: Downloaded file.
        out_dir: Destination directory (created if missing).
        name_filter: Optional case-insensitive substring a ``.dem`` name must contain.

    Returns:
        Path of the first matching extracted ``.dem`` file.

    Raises:
        RuntimeError: If nothing matches the filter, the RAR tool is missing or
            fails, or the suffix is not supported.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    suffix = archive_path.suffix.lower()

    def matches_filter(filename):
        return name_filter.lower() in filename.lower() if name_filter else True

    # Diagnostic: print header bytes
    with open(archive_path, 'rb') as f:
        header = f.read(16)
    print(f"   Header (hex): {header[:8].hex()}")

    if suffix == ".dem":
        if not matches_filter(archive_path.name):
            raise RuntimeError(f".dem file '{archive_path.name}' does not match filter '{name_filter}'. Skipping.")
        target = out_dir / archive_path.name
        shutil.copy2(archive_path, target)
        return target

    if suffix == ".zip":
        with zipfile.ZipFile(archive_path, "r") as z:
            dem_members = [m for m in z.namelist() if m.lower().endswith(".dem") and matches_filter(m)]
            if not dem_members:
                raise RuntimeError(f"No .dem matching filter '{name_filter}' found inside zip: {archive_path.name}")
            # Extract all filtered dem files and return first one
            for dem_file in dem_members:
                z.extract(dem_file, out_dir)
            return out_dir / dem_members[0]

    if suffix in (".gz", ".bz2"):
        # Single-file compression: "match.dem.gz" decompresses to "match.dem".
        inner_name = archive_path.stem
        if not inner_name.lower().endswith(".dem"):
            raise RuntimeError(f"Unsupported compressed file (expected *.dem{suffix}): {archive_path.name}")
        if not matches_filter(inner_name):
            raise RuntimeError(f".dem file '{inner_name}' does not match filter '{name_filter}'. Skipping.")
        opener = gzip.open if suffix == ".gz" else bz2.open
        target = out_dir / inner_name
        with opener(archive_path, "rb") as src, open(target, "wb") as dst:
            shutil.copyfileobj(src, dst)
        return target

    if suffix == ".rar":
        # Known Windows install locations first (original behaviour), then PATH,
        # so the "add UnRAR to PATH" advice in the error below actually works.
        winrar_paths = [
            r"C:\Program Files\WinRAR\WinRAR.exe",
            r"C:\Program Files (x86)\WinRAR\WinRAR.exe",
            r"C:\Program Files\WinRAR\UnRAR.exe",
            r"C:\Program Files (x86)\WinRAR\UnRAR.exe",
        ]
        unrar_path = next((p for p in winrar_paths if os.path.exists(p)), None)
        if not unrar_path:
            unrar_path = shutil.which("unrar") or shutil.which("UnRAR")

        if not unrar_path:
            raise RuntimeError("WinRAR/UnRAR not found! Please install WinRAR or add UnRAR to PATH.")

        print(f"[rar] Using: {unrar_path}")

        # List archive contents using rarfile
        with rarfile.RarFile(str(archive_path), 'r') as rf:
            matching_files = [f for f in rf.namelist() if f.lower().endswith('.dem') and matches_filter(f)]

        if not matching_files:
            if name_filter:
                raise RuntimeError(f"No .dem files matching filter '{name_filter}' found inside archive.")
            raise RuntimeError("No .dem files found inside archive.")

        # NOTE(review): a trailing separator marks the last argument as the
        # destination directory for unrar — confirm against the installed tool.
        destination = str(out_dir) + os.sep

        # Extract only matching files using subprocess with WinRAR/UnRAR
        for file_to_extract in matching_files:
            print(f"[rar] Extracting file: {file_to_extract}")
            extract_result = subprocess.run(
                [unrar_path, "x", "-y", str(archive_path), file_to_extract, destination],
                capture_output=True,
                text=True,
                timeout=600
            )
            print(f"[rar] Extract stdout: {extract_result.stdout[:200]}")
            print(f"[rar] Extract stderr: {extract_result.stderr[:200]}")
            if extract_result.returncode != 0:
                raise RuntimeError(
                    f"RAR extraction of '{file_to_extract}' failed with exit code {extract_result.returncode}."
                )

        # "x" keeps archive-internal folders, so look at the member's full
        # relative path first and at its bare file name second.
        for member in matching_files:
            for candidate in (out_dir / member, out_dir / pathlib.PurePosixPath(member).name):
                if candidate.is_file():
                    return candidate
        raise RuntimeError("Extraction completed but .dem file not found in output.")

    # Anything else used to fall through and return None silently.
    raise RuntimeError(f"Unsupported archive type '{suffix}': {archive_path.name}")

## Parsing

In [5]:
def add_next_tick_position(df):
    """Return ``df`` sorted by player and tick with each player's next-row values attached.

    Adds ``X_next``, ``Y_next`` and ``tick_next``: the ``X``, ``Y`` and ``tick``
    of the following row for the same ``steamid`` (NaN on a player's last row).
    The input frame is not modified.
    """
    # Order rows so that "next row within a player" means "next tick".
    ordered = df.sort_values(['steamid', 'tick'])
    per_player = ordered.groupby('steamid')

    return ordered.assign(
        X_next=per_player['X'].shift(-1),
        Y_next=per_player['Y'].shift(-1),
        tick_next=per_player['tick'].shift(-1),
    )

def parse_demo(dem_path: pathlib.Path, base_csv_dir: pathlib.Path, match_name: str, demo_id: str):
    """Parse one ``.dem`` file and export its events and per-tick data as CSV files.

    Writes ``<base_csv_dir>/<event>/<demo>_<event>.csv`` for every configured
    event type and ``<base_csv_dir>/ticks/<demo>_ticks.csv`` for tick data.
    Every exported frame gets a ``demo_id`` column. Export failures are printed
    and do not raise.

    Args:
        dem_path: Path of the demo file.
        base_csv_dir: Root directory for the CSV output (created if missing).
        match_name: Match label stored in the returned metadata.
        demo_id: Identifier written to the ``demo_id`` column.

    Returns:
        Metadata dictionary (match, demo name/path, date, map and round counts).
        It used to be built and then discarded.
    """
    dem_name = dem_path.stem

    print(f"Parsing demo: {dem_name}")
    parser = DemoParser(str(dem_path))

    KNOWN_MAPS = ["nuke", "mirage", "dust2", "inferno", "overpass", "train", "vertigo", "ancient", "anubis"]

    def extract_map_name(dem_name):
        # First known map whose name occurs anywhere in the demo file name.
        dem_name_lower = dem_name.lower()
        for map_name in KNOWN_MAPS:
            if map_name in dem_name_lower:
                return map_name
        return None

    meta = {
        "match": match_name,
        "demo_name": dem_name,
        "demo_path": str(dem_path),
        "date": None,
        "map": extract_map_name(dem_name),
        "total_rounds": None,
        "rounds_started": 0,
        "rounds_ended": 0,}

    try:
        # Only populated when the parser object exposes match.date; otherwise stays None.
        meta["date"] = parser.match.date.strftime("%Y-%m-%d") if hasattr(parser, "match") and parser.match and parser.match.date else None
    except Exception:
        pass

    event_types = {
        "player_death": ["X", "Y", "attacker", "victim", "weapon", "total_rounds_played"],
        # Add other event types if needed
    }

    base_csv_dir.mkdir(parents=True, exist_ok=True)

    for event, fields in event_types.items():
        folder = base_csv_dir / event
        folder.mkdir(parents=True, exist_ok=True)

        try:
            result = parser.parse_event(event, player=fields)
            df = pd.DataFrame(result) if isinstance(result, list) else result
            df['demo_id'] = demo_id
            csv_path = folder / f"{dem_name}_{event}.csv"
            df.to_csv(csv_path, index=False)
            print(f"✅ Exported {event} events to {csv_path} (rows={len(df)})")

            # Round counters only fill in when these events are added to event_types.
            if event == "round_start":
                meta["rounds_started"] = len(df)
                meta["total_rounds"] = len(df)
            elif event == "round_end":
                meta["rounds_ended"] = len(df)

        except Exception as e:
            print(f"⚠ Failed to export {event} events: {e}")

    try:
        tick_folder = base_csv_dir / "ticks"
        tick_folder.mkdir(parents=True, exist_ok=True)
        ticks_df = parser.parse_ticks([
            "X","Y","Z","health","is_alive",
            "active_weapon", "active_weapon_name",
            "inventory", "total_cash_spent","team_num"])
        ticks_df = add_next_tick_position(ticks_df)
        ticks_df['demo_id'] = demo_id
        tick_csv_path = tick_folder / f"{dem_name}_ticks.csv"
        ticks_df.to_csv(tick_csv_path, index=False)
        print(f"✅ Exported ticks to {tick_csv_path} (rows={len(ticks_df)})")
    except Exception as e:
        print(f"⚠ Failed to export ticks: {e}")

    return meta

def parse_demo_with_timeout(dem_path: pathlib.Path, base_csv_dir: pathlib.Path, master_csv_path: pathlib.Path, match_name: str, timeout_sec=300, demo_id: str = ""):
    """
    Parses a demo in a worker process with a timeout. If parsing exceeds
    `timeout_sec` seconds, the worker is terminated and the demo is skipped.

    Args:
        dem_path: Path of the demo file.
        base_csv_dir: Root directory for CSV output.
        master_csv_path: Unused; kept so existing positional calls still work.
        match_name: Match label forwarded to ``parse_demo``.
        timeout_sec: Maximum parsing time in seconds.
        demo_id: Identifier forwarded to ``parse_demo``.

    Returns:
        Whatever ``parse_demo`` returns, or None when the timeout was hit.
    """
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=1)
    timed_out = False
    try:
        # Argument order must follow parse_demo(dem_path, base_csv_dir, match_name, demo_id);
        # master_csv_path used to be passed in the match_name slot.
        future = executor.submit(parse_demo, dem_path, base_csv_dir, match_name, demo_id)
        return future.result(timeout=timeout_sec)
    except concurrent.futures.TimeoutError:
        timed_out = True
        print(f"⚠ Parsing of {dem_path} took longer than {timeout_sec} seconds and was skipped.")
        # The executor offers no public way to stop a running task, and a
        # `with` block would wait for it on exit. Terminate the worker through
        # the executor's private process table instead.
        for worker in list((getattr(executor, "_processes", None) or {}).values()):
            worker.terminate()
        return None
    finally:
        executor.shutdown(wait=not timed_out, cancel_futures=True)

def parse_demos(structured_root):
    """Parse every not-yet-parsed demo listed in ``hltv_matches.csv``.

    Rows are skipped when ``dem_path`` is empty or ``parsed_status`` is
    "parsed"/"skip" (case-insensitive). After each ``parse_demo`` call that does
    not raise, the row is marked "parsed" and the CSV is rewritten, so an
    interrupted run can resume.

    Args:
        structured_root: Root output directory handed to ``parse_demo``.
    """
    matches_csv_path = "hltv_matches.csv"

    df = pd.read_csv(matches_csv_path)

    # An all-empty status column is read back as float NaN; make it object so
    # string statuses can be assigned without a dtype clash.
    if 'parsed_status' not in df.columns:
        df['parsed_status'] = ""
    df['parsed_status'] = df['parsed_status'].astype(object)

    for idx, info in df.iterrows():
        dem_path_str = info.get('dem_path', "")

        # Check dem_path_str is a valid, non-empty string
        if not isinstance(dem_path_str, str) or dem_path_str.strip() == "":
            print(f"Skipping demo at row {idx} because dem_path is empty or invalid")
            continue

        status = str(info.get('parsed_status', "")).strip().lower()
        if status in {"parsed", "skip"}:
            print(f"Skipping demo at row {idx} due to status: '{status}'")
            continue

        dem_path = pathlib.Path(dem_path_str)

        match_name = info.get('match_name', "")
        if not isinstance(match_name, str):
            match_name = ""  # missing cells are read back as NaN

        # demo_id is the last path segment of the demo URL; a missing URL
        # (NaN) used to crash on .rstrip before the try block was reached.
        demo_url = info.get('demo_url', '')
        demo_id = demo_url.rstrip('/').split('/')[-1] if isinstance(demo_url, str) else ""

        try:
            parse_demo(dem_path, structured_root, match_name, demo_id)

            df.at[idx, 'parsed_status'] = "parsed"
            df.to_csv(matches_csv_path, index=False)
            print(f"✅ Parsed demo {dem_path}")
        except Exception as e:
            print(f"⚠ Failed to parse demo {dem_path}: {e}")

## Transformation

### Coordinate Labeling

In [6]:
def load_map_data(csv_file='map_bounds-in.csv', map_name=None):
    """Load named area polygons from a CSV of boundary points.

    The CSV must provide the columns ``map``, ``name``, ``x_axis`` and
    ``y_axis``; each (map, name) group becomes one polygon whose vertices
    follow the row order in the file.

    Args:
        csv_file: Path of the CSV file.
        map_name: If given, only rows whose ``map`` equals this value are used.
            With the default (None) all maps are loaded; because the result is
            keyed by area name only, same-named areas on different maps then
            overwrite each other, which is reported with a warning.

    Returns:
        Dict mapping area name to shapely ``Polygon``. Groups with fewer than
        three points are left out.
    """
    df = pd.read_csv(csv_file)
    if map_name is not None:
        df = df[df['map'] == map_name]

    # Group coordinates by map and area name
    areas = {}
    for (group_map, area_name), group in df.groupby(['map', 'name']):
        coords = list(zip(group['x_axis'], group['y_axis']))

        # Only create polygon if we have at least 3 points
        if len(coords) < 3:
            continue
        if area_name in areas:
            print(f"Warning: area '{area_name}' from map '{group_map}' replaces a same-named area from another map.")
        areas[area_name] = Polygon(coords)

    return areas

def label_coordinate(x, y, areas):
    """
    Return the name of the area that contains the point (x, y).

    If no area contains the point, the area whose centroid is nearest to the
    point is returned instead; with no areas at all the result is "Unknown".
    """
    point = Point(x, y)

    # Pass 1: the first area (in dict order) that contains the point wins.
    for name, shape in areas.items():
        if shape.contains(point):
            return name

    # Pass 2: fall back to the nearest centroid; ties keep the earliest area.
    nearest_name = "Unknown"
    nearest_dist = float('inf')
    for name, shape in areas.items():
        centroid_dist = point.distance(shape.centroid)
        if centroid_dist < nearest_dist:
            nearest_name, nearest_dist = name, centroid_dist
    return nearest_name

def label_coordinates_batch(coordinates, areas):
    """
    Label a sequence of coordinates, one ``label_coordinate`` call per point.

    Parameters:
    coordinates (list): List of (x, y) tuples
    areas (dict): Dictionary of area names mapped to Polygon objects

    Returns:
    list: Area names in the same order as ``coordinates``
    """
    return [label_coordinate(x, y, areas) for x, y in coordinates]


def plot_areas(areas):
    """Draw every area polygon in a random colour, labelled at its centroid.

    Args:
        areas: Dict mapping area name to shapely Polygon.
    """
    plt.figure(figsize=(12, 12))
    ax = plt.gca()

    for area_name, polygon in areas.items():
        xs, ys = polygon.exterior.xy
        # One fresh random RGB colour per area.
        fill_color = tuple(random.random() for _ in range(3))
        ax.fill(xs, ys, alpha=0.5, fc=fill_color, ec='black', linewidth=1, label=area_name)

        # Label the polygon at its centroid
        center = polygon.centroid
        ax.text(center.x, center.y, area_name, fontsize=10, ha='center', va='center')

    ax.set_xlabel('X Coordinate')
    ax.set_ylabel('Y Coordinate')
    ax.set_title('Map Areas Visualization')
    ax.legend(loc='upper right', bbox_to_anchor=(1.25, 1))
    ax.axis('equal')
    ax.grid(True)
    plt.show()

In [19]:
def add_teammates_alive(df):
    """
    Attach a ``teammates_alive`` column: living players on the same team at the
    same tick, not counting the row's own player.

    Note that ``is_alive_bool`` is written onto the frame that is passed in.

    Args:
        df: DataFrame with 'tick', 'team_num', and 'is_alive' columns

    Returns:
        New DataFrame with 'is_alive_bool' and 'teammates_alive' columns added
    """
    df['is_alive_bool'] = df['is_alive'].astype(bool)

    # Living head-count per (tick, team).
    living_rows = df.loc[df['is_alive_bool']]
    team_alive = (
        living_rows
        .groupby(['tick', 'team_num'])
        .size()
        .reset_index(name='alive_count_tm')
    )

    merged = df.merge(team_alive, how='left', on=['tick', 'team_num'])

    # Teams with nobody alive have no match in the merge, hence the fillna(0).
    team_total = merged['alive_count_tm'].fillna(0).astype(int)
    own_contribution = merged['is_alive'].astype(int)
    merged['teammates_alive'] = team_total - own_contribution

    return merged.drop(columns=['alive_count_tm'], errors='ignore')


def add_enemies_alive(df):
    """
    Attach an ``enemies_alive`` column: living players at the same tick that
    belong to any other team.

    Rows whose (tick, team) has no living player get NaN.

    Args:
        df: DataFrame with 'tick', 'team_num', and 'is_alive_bool' columns

    Returns:
        New DataFrame with the 'enemies_alive' column added
    """
    # Living head-count per (tick, team).
    team_alive = (
        df[df['is_alive_bool']]
        .groupby(['tick', 'team_num'])
        .size()
        .reset_index(name='alive_count_any_team')
    )

    # Living head-count per tick across all teams.
    tick_totals = (
        team_alive
        .groupby('tick', as_index=False)['alive_count_any_team']
        .sum()
        .rename(columns={'alive_count_any_team': 'total_alive'})
    )

    # Enemies = everyone alive at the tick minus the team's own living players.
    enemy_lookup = (
        team_alive
        .merge(tick_totals, on='tick')
        .assign(enemies_alive=lambda t: t['total_alive'] - t['alive_count_any_team'])
        [['tick', 'team_num', 'enemies_alive']]
    )

    return df.merge(enemy_lookup, how='left', on=['tick', 'team_num'])


def add_teammate_zones(df, label_col='current_zone', max_teammates=4):
    """
    Add teammate zone columns to dataframe.

    For every row, the zones of the living players on the same team at the same
    tick are listed, excluding the row's own player when that player is alive.
    A dead player's own zone is not among the living zones, so nothing is
    removed for dead players (removing it used to drop a living teammate who
    happened to stand in the same zone).

    Args:
        df: DataFrame with 'tick', 'team_num', 'is_alive_bool', and label_col columns
        label_col: Name of the column containing zone labels
        max_teammates: Maximum number of teammates to track (default 4 for CS:GO)

    Returns:
        DataFrame with added 'teammate_1_zone', 'teammate_2_zone', etc. columns;
        unused slots hold an empty string.
    """
    # Aggregate all zones for each team at each tick
    teammate_zones = (
        df[df['is_alive_bool']]
        .groupby(['tick', 'team_num'])[label_col]
        .agg(lambda x: list(x))
        .reset_index()
        .rename(columns={label_col: 'teammate_zones_all'})
    )

    df = df.merge(teammate_zones, how='left', on=['tick', 'team_num'])

    zone_columns = [f'teammate_{i+1}_zone' for i in range(max_teammates)]
    rows = []
    for zones_all, own_zone, alive in zip(df['teammate_zones_all'], df[label_col], df['is_alive_bool']):
        # Teams with nobody alive have NaN here instead of a list.
        zones = list(zones_all) if isinstance(zones_all, list) else []

        if alive:
            try:
                zones.remove(own_zone)  # Remove one occurrence: the player's own zone
            except ValueError:
                pass

        # Pad or truncate to max_teammates
        zones = zones[:max_teammates]
        zones += [''] * (max_teammates - len(zones))
        rows.append(zones)

    # Build all teammate columns in one go instead of a row-wise apply.
    teammate_cols = pd.DataFrame(rows, columns=zone_columns, index=df.index)
    df = pd.concat([df, teammate_cols], axis=1)

    # Clean up temporary column
    df = df.drop(columns=['teammate_zones_all'], errors='ignore')

    return df


# Execution

## Scraping Master List of Demos

In [None]:
# Site root; relative links scraped from pages are joined onto this.
HLTV_BASE = "https://www.hltv.org"
# Query string appended to the results URL below.
# NOTE(review): presumably date range, minimum star rating and map — confirm against the site's filter UI.
Filters = "?startDate=2024-11-15&endDate=2025-11-15&stars=2&map=de_mirage"
RESULTS_URL = f"{HLTV_BASE}/results{Filters}"
# Pause in seconds between match-page requests in scrape_multiple_matches.
REQUEST_DELAY_SEC = 5
print(RESULTS_URL)

# Scrape the results page, then every listed match page, and store the
# collected rows in hltv_matches.csv.
if __name__ == "__main__":
    # Setup
    download_dir = pathlib.Path("./downloads")
    download_dir.mkdir(exist_ok=True)
    
    driver = setup_undetected_chrome(download_dir)
    
    try:
        # You may want to pass a less filtered results URL to see if this works!
        match_urls = find_match_urls_from_results(driver, max_matches=100, delay=5)
        if not match_urls:
            print("No matches found with the given parameters and filters.")
        else:
            matches = scrape_multiple_matches(driver, match_urls)
            if not matches:
                print("No match data extracted (possibly due to page change or filter).")
            else:
                df = save_match_data_to_csv(matches, "hltv_matches.csv")
                print("\n=== Summary ===")
                print(df)
    finally:
        # Always close the browser, even when scraping raised.
        driver.quit()

## Download, Extract, Parse

### Downloading

In [None]:
def download_and_extract(max_downloads=None, name_filter: str = None):
    """Download and extract the demos listed in ``hltv_matches.csv``.

    For every row that has a ``demo_url`` and whose ``parsed_status`` is not
    "parsed"/"skip" (case-insensitive), the demo is downloaded into
    ``hltv_demos/downloads/<demo number>/`` and its ``.dem`` extracted into
    ``hltv_demos/extracted/``. The CSV gains/updates the columns ``dem_path``,
    ``match_name``, ``parsed_status`` and ``Status`` and is rewritten after
    every demo, so an interrupted run keeps its progress.

    Args:
        max_downloads: Optional cap on the number of demos handled in this call.
        name_filter: Optional substring passed to ``extract_dem_from_file``.

    Returns:
        The updated matches DataFrame (also written back to the CSV).
    """
    base_dir = pathlib.Path("hltv_demos")
    download_root = base_dir / "downloads"
    extract_root = base_dir / "extracted"
    download_root.mkdir(parents=True, exist_ok=True)
    extract_root.mkdir(parents=True, exist_ok=True)

    REQUEST_DELAY_SEC = 5
    csv_path = "hltv_matches.csv"

    df = pd.read_csv(csv_path)

    # Add or initialize columns if missing. Empty columns are read back from CSV
    # as float NaN, so force object dtype before strings are assigned.
    for col in ('dem_path', 'match_name', 'parsed_status'):
        if col not in df.columns:
            df[col] = ""
        df[col] = df[col].astype(object)

    has_demo = df["demo_url"].notnull()
    print(f"Found {int(has_demo.sum())} demo links in CSV.")

    # Skip rows already parsed or marked to skip. Case-insensitive because
    # earlier runs wrote "Skip" with a capital S.
    status = df['parsed_status'].fillna("").astype(str).str.strip().str.lower()
    pending = df[has_demo & ~status.isin(["parsed", "skip"])]

    # Limit number of downloads if specified
    if max_downloads is not None:
        pending = pending.iloc[:max_downloads]

    total = len(pending)
    for position, (idx, row) in enumerate(pending.iterrows(), 1):
        match_url = row["match_url"]
        demo_url = row["demo_url"]
        print(f"\n === [{position}/{total}] {match_url} ===")
        print(f"Demo URL: {demo_url}")
        time.sleep(10)

        driver = None
        try:
            demo_number = get_demo_number_from_url(demo_url)
            demo_download_dir = download_root / demo_number
            demo_download_dir.mkdir(parents=True, exist_ok=True)

            driver = setup_undetected_chrome(download_dir=demo_download_dir, headless=False)
            archive_path = download_demo_with_selenium(driver, demo_url, demo_download_dir)
            match_name = archive_path.stem

            while any(demo_download_dir.glob("*.crdownload")):
                print("Waiting for partial downloads to finish...")
                time.sleep(30)

            before_extraction = set(extract_root.glob("*.dem"))
            dem_path = extract_dem_from_file(archive_path, extract_root, name_filter)
            new_dem_files = set(extract_root.glob("*.dem")) - before_extraction

            # Prefer the path reported by the extractor; it also covers files in
            # sub-folders and re-extractions over an existing file, which the
            # before/after comparison of the top-level folder cannot see.
            recorded_dem = dem_path if dem_path is not None else next(iter(sorted(new_dem_files)), None)
            if recorded_dem is None:
                raise RuntimeError(f"No .dem file was produced from {archive_path.name}")

            existing_dem_path = df.at[idx, 'dem_path']
            if pd.isna(existing_dem_path) or existing_dem_path == "":
                df.at[idx, 'dem_path'] = str(recorded_dem)
            else:
                print(f"⚠ dem_path already exists for row {idx}, skipping update.")

            existing_match_name = df.at[idx, 'match_name']
            if pd.isna(existing_match_name) or existing_match_name == "":
                df.at[idx, 'match_name'] = match_name
            else:
                print(f"⚠ match_name already exists for row {idx}, skipping update.")

            # Mark this row as 'downloaded' and ready for parsing
            df.at[idx, 'Status'] = "downloaded"
        except Exception as e:
            print(f"⚠ Skipping this demo due to error: {e}")
            # Lower case so the status checks here and in parse_demos match it.
            df.at[idx, 'parsed_status'] = "skip"
        finally:
            # Close the browser on success and on failure; it used to leak on errors.
            if driver is not None:
                try:
                    driver.quit()
                except Exception as quit_error:
                    print(f"⚠ Could not close browser cleanly: {quit_error}")

        # Persist progress after every demo.
        df.to_csv(csv_path, index=False)
        time.sleep(REQUEST_DELAY_SEC)

    # Overwrite the original CSV
    df.to_csv(csv_path, index=False)
    print("Updated hltv_matches.csv with enriched demo info and parsing status.")
    return df

In [None]:
# Output root for parsed CSVs; created here so the parsing cell can rely on it.
base_dir = pathlib.Path("hltv_demos")
structured_root = base_dir / "structured"
structured_root.mkdir(parents=True, exist_ok=True)

# Download and extract at most 20 demos, keeping only .dem files whose name
# contains "mirage". Progress is recorded in hltv_matches.csv by the function.
demo_infos = download_and_extract(max_downloads=20, name_filter = "mirage")


### Parsing

In [None]:
# Output root for parsed CSVs (same location as in the download cell).
base_dir = pathlib.Path("hltv_demos")
structured_root = base_dir / "structured"
structured_root.mkdir(parents=True, exist_ok=True)

# Load enriched hltv_matches.csv containing all demo metadata and parsing status
matches_csv_path = "hltv_matches.csv"
df_matches = pd.read_csv(matches_csv_path)

# Convert 'dem_path' strings to pathlib.Path objects (None for missing/empty values).
# Note: df_matches is only available for inspection in this cell; it is not passed on below.
df_matches['dem_path'] = df_matches['dem_path'].apply(lambda p: pathlib.Path(p) if pd.notna(p) and p != "" else None)

# parse_demos takes only the output root and reads hltv_matches.csv itself
parse_demos(structured_root)

## Transformation

In [17]:
def label_all_ticks_in_dir(
    ticks_dir,
    areas,
    x_col="X",
    y_col="Y",
    label_col="current_zone",
    tick_stride=16,
):
    """
    Label all tick CSV files in a directory with zone and teammate information.

    For every ``*.csv`` in ``ticks_dir`` (processed in sorted order) the
    function keeps every ``tick_stride``-th tick, drops rows without
    coordinates, assigns a zone label per row via ``label_coordinate``, runs
    ``add_teammates_alive``, ``add_enemies_alive`` and ``add_teammate_zones``,
    and writes the result under the same file name to a ``labeled`` directory
    next to ``ticks_dir``.

    Progress is tracked in the ``labeled_status`` column of
    ``hltv_matches.csv`` (resolved relative to the current working directory).
    A tick file is matched to a row by comparing its stem, minus the trailing
    ``_ticks``, with the stem of the row's ``dem_path``. Files whose matched
    status is ``labeled`` or ``skip`` are not processed again.

    Args:
        ticks_dir: Path to directory containing tick CSV files
        areas: Dict of area_name -> shapely Polygon for zone labeling
        x_col: Name of X coordinate column
        y_col: Name of Y coordinate column
        label_col: Name of the zone label column
        tick_stride: Keep only rows whose ``tick`` is a multiple of this value

    Returns:
        None. Results are written to disk.

    Raises:
        Any exception raised while reading or labeling a file propagates to
        the caller; the tracking CSV is still written first, so files that
        were already labeled in this run keep their status.
    """
    ticks_dir = Path(ticks_dir)
    labeled_dir = ticks_dir.parent / "labeled"
    labeled_dir.mkdir(parents=True, exist_ok=True)

    # Load and prepare matches tracking CSV
    matches_csv_path = "hltv_matches.csv"
    df_matches = pd.read_csv(matches_csv_path)

    if 'labeled_status' not in df_matches.columns:
        df_matches['labeled_status'] = pd.Series(dtype='string')
    else:
        df_matches['labeled_status'] = df_matches['labeled_status'].astype('string')

    # Demo stem -> first tracking row with that stem. Built once because it
    # does not change while the tick files are being processed.
    first_row_by_stem = {}
    for row_label, dem_path in df_matches['dem_path'].items():
        stem = Path(dem_path).stem if pd.notna(dem_path) else ''
        first_row_by_stem.setdefault(stem, row_label)

    try:
        # Sorted so that runs are deterministic regardless of filesystem order.
        for file in sorted(ticks_dir.glob("*.csv")):
            # Strip only the trailing marker; a demo name may itself contain "_ticks".
            dem_name_from_tick = file.stem.removesuffix('_ticks')
            row_index = first_row_by_stem.get(dem_name_from_tick)

            # Check if already labeled or should skip
            if row_index is not None:
                val = df_matches.at[row_index, 'labeled_status']
                status = '' if pd.isna(val) else str(val).lower()

                if status in {"labeled", "skip"}:
                    print(f"Skipping labeling for {file.name} due to status: '{status}'")
                    continue

            # Process the file
            print(f"Processing {file.name} ...")
            df = pd.read_csv(file, low_memory=False)

            # Down-sample ticks and remove rows without coordinates. The copy
            # makes the column assignment below act on an independent frame.
            on_stride = df["tick"] % tick_stride == 0
            df = df.loc[on_stride].dropna(subset=[x_col, y_col]).copy()

            # Label zones point by point. A plain list (instead of a row-wise
            # DataFrame.apply) also works when no rows survive the filter.
            df[label_col] = [
                label_coordinate(x, y, areas)
                for x, y in zip(df[x_col], df[y_col])
            ]

            # Add all features
            df = add_teammates_alive(df)
            df = add_enemies_alive(df)
            df = add_teammate_zones(df, label_col=label_col)

            # Clean up temporary columns
            df = df.drop(columns=['is_alive_bool'], errors='ignore')

            # Save labeled file
            output_path = labeled_dir / file.name
            df.to_csv(output_path, index=False)
            print(f"Saved labeled CSV to {output_path}")

            # Update status in tracking CSV
            if row_index is not None:
                df_matches.at[row_index, 'labeled_status'] = 'labeled'
            else:
                print(f"Warning: No matching row found in hltv_matches.csv for {file.name}")
    finally:
        # Persist progress even when a file fails, so completed work is not redone.
        df_matches.to_csv(matches_csv_path, index=False)

    print(f"Processing complete. Updated {matches_csv_path}")



In [18]:
# Load the area definitions passed as `areas` to label_all_ticks_in_dir, then
# label every tick CSV under hltv_demos/structured/ticks.
areas = load_map_data('map_bounds(in).csv')
# NOTE(review): label_all_ticks_in_dir has no return statement, so `labeled` is None.
labeled = label_all_ticks_in_dir("hltv_demos/structured/ticks", areas) 

Skipping labeling for astralis-vs-falcons-m2-mirage_ticks.csv due to status: 'labeled'
Skipping labeling for astralis-vs-natus-vincere-m2-mirage_ticks.csv due to status: 'labeled'
Skipping labeling for astralis-vs-vitality-m3-mirage_ticks.csv due to status: 'labeled'
Skipping labeling for aurora-vs-heroic-m1-mirage_ticks.csv due to status: 'labeled'
Skipping labeling for aurora-vs-legacy-m2-mirage_ticks.csv due to status: 'labeled'
Skipping labeling for aurora-vs-the-mongolz-m2-mirage_ticks.csv due to status: 'labeled'
Skipping labeling for falcons-vs-furia-m2-mirage_ticks.csv due to status: 'labeled'
Skipping labeling for furia-vs-natus-vincere-m1-mirage_ticks.csv due to status: 'labeled'
Skipping labeling for furia-vs-pain-m3-mirage_ticks.csv due to status: 'labeled'
Skipping labeling for furia-vs-the-mongolz-m1-mirage_ticks.csv due to status: 'labeled'
Skipping labeling for legacy-vs-liquid-m1-mirage_ticks.csv due to status: 'labeled'
Skipping labeling for mouz-vs-falcons-m3-mirage_

## Main

In [None]:
def main():
    """
    Run the whole pipeline: collect match links, then download, extract and
    parse each demo in turn.

    Working folders are created under ``hltv_demos`` (``downloads``,
    ``extracted``, ``structured``). A failure while handling one match, or
    while parsing one demo, is printed and does not stop the run; the browser
    is always closed at the end.

    Raises:
        RuntimeError: if no match links are found on the results page.
    """
    base_dir = pathlib.Path("hltv_demos")
    download_root = base_dir.joinpath("downloads")
    extract_root = base_dir.joinpath("extracted")
    structured_root = base_dir.joinpath("structured")
    for folder in (download_root, extract_root, structured_root):
        folder.mkdir(parents=True, exist_ok=True)

    # Path handed to parse_demo for every demo.
    master_csv_path = structured_root / "master.csv"

    driver = setup_undetected_chrome(download_dir=download_root)

    try:
        match_urls = find_match_urls_from_results(driver, max_matches=20, delay=5)
        if not match_urls:
            raise RuntimeError("No match links found on results page.")

        # Handle one match at a time: download, extract, parse.
        for position, match_url in enumerate(match_urls, start=1):
            print(f"\n === [{position}/{len(match_urls)}] {match_url} ===")
            try:
                demo_url = find_demo_download_url(driver, match_url)
                print(f"Demo URL: {demo_url}")

                archive_path = download_demo_with_selenium(driver, demo_url, download_root)

                # The archive's file stem doubles as the match name.
                match_name = archive_path.stem

                # Snapshot the .dem files around the extraction; the set
                # difference is what this archive added.
                dems_before = set(extract_root.glob("*.dem"))
                extract_dem_from_file(archive_path, extract_root)
                fresh_dems = set(extract_root.glob("*.dem")) - dems_before

                # Parse each new demo right away; one bad demo must not
                # prevent the others from being parsed.
                for fresh_dem in fresh_dems:
                    try:
                        parse_demo(fresh_dem, structured_root, master_csv_path, match_name)
                    except Exception as parse_error:
                        print(f"âš  Failed to parse demo {fresh_dem}: {parse_error}")

            except Exception as match_error:
                print(f"âš  Skipping this match due to error: {match_error}")

            # Pause between matches.
            time.sleep(REQUEST_DELAY_SEC)

    finally:
        print("Closing browser.")
        driver.quit()

In [None]:
# Entry point: run the scrape -> download -> extract -> parse pipeline in main().
if __name__ == "__main__":
    main()

# Coordinate Labeling

In [None]:
# Example usage
if __name__ == "__main__":
    # Area definitions come from the map-bounds CSV.
    areas = load_map_data('map_bounds(in).csv')
    print(f"Loaded {len(areas)} areas from the map\n")

    # (x, y) sample points, each annotated with the zone it is expected to hit.
    test_coordinates = [
        (-1, -600),   # Should be in t_spawn area
        (700, 2500),    # Should be in bombsite_A area
        (-2000, 2000),  # Should be in bombsite_B area
        (0, 0),         # Unknown area
        (600, 1000),    # Should be in long_A area
    ]

    # Look the points up one at a time.
    print("Testing coordinates:")
    print(50 * "-")
    for x, y in test_coordinates:
        label = label_coordinate(x, y, areas)
        print(f"Coordinate ({x:6}, {y:6}) -> {label}")

    # Look the same points up in a single batch call.
    print("\n" + 50 * "=")
    print("\nBatch labeling example:")
    labels = label_coordinates_batch(test_coordinates, areas)
    for (x, y), label in zip(test_coordinates, labels):
        print(f"({x:6}, {y:6}): {label}")


In [None]:
# Reload the area definitions and hand them to plot_areas
# (presumably renders the zones for a visual check -- confirm against its definition).
areas = load_map_data('map_bounds(in).csv')
plot_areas(areas)