In [None]:
"""
FastF1 client that uses environment configuration
"""

import sys
from pathlib import Path

current = Path.cwd()

indicators = [
    '.git', 'pyproject.toml'
]

for parent in [current] + list(current.parents):
    if any((parent / indicator).exists() for indicator in indicators):
        project_root = parent

sys.path.insert(0, str(project_root))

from typing import List, Optional

import fastf1
import pandas as pd
from fastf1 import Cache

from config.logging import get_logger
from config.settings import fastf1_config

# Module-level logger from the project's config.logging helper; used by
# FastF1Client for its info/warning/error messages.
logger = get_logger(__name__)


class FastF1Client:
    """Thin wrapper around the ``fastf1`` API, driven by ``fastf1_config``.

    The configuration decides which optional session datasets are loaded
    (telemetry, weather, race-control messages) and whether testing events
    appear in season schedules. Errors from ``fastf1`` are logged and
    re-raised, except in ``get_multiple_sessions``, which records ``None``.
    """

    def __init__(self):
        # Settings object imported from config.settings (shared, not copied).
        self.config = fastf1_config

    def get_session(self, year: int, gp: str, session: str):
        """Get a session from the FastF1 API and load it per configuration.

        Args:
            year: Championship year, e.g. ``2022``. Must be an ``int``: it is
                formatted with ``%d`` in the log calls below, and a ``str``
                would make those log records fail to format.
            gp: Grand Prix identifier understood by ``fastf1.get_session``.
            session: Session identifier understood by ``fastf1.get_session``
                (e.g. ``"Q"``, ``"R"``).

        Returns:
            The object returned by ``fastf1.get_session``, after calling its
            ``load()`` method (when it has one).

        Raises:
            Exception: any error from ``fastf1`` is logged and re-raised.
        """

        try:
            # Use FastF1's get_session function
            session_obj = fastf1.get_session(year, gp, session)

            # Configure session based on environment settings
            if hasattr(session_obj, "load"):
                # Optional datasets are toggled by configuration
                load_telemetry = self.config.enable_telemetry
                load_weather = self.config.enable_weather
                load_race_control_messages = self.config.enable_race_control_messages

                logger.info("Loading session %d %s %s", year, gp, session)
                logger.info("Telemetry: %s, Weather: %s", load_telemetry, load_weather)

                session_obj.load(
                    telemetry=load_telemetry,
                    weather=load_weather,
                    messages=load_race_control_messages,
                )

            return session_obj

        except Exception as e:
            logger.error("Error loading session %d %s %s: %s", year, gp, session, e)
            raise

    def get_season_schedule(self, year: int) -> pd.DataFrame:
        """Return the event schedule for ``year``.

        Testing events are included according to
        ``self.config.include_testing``. Errors are logged and re-raised.
        """
        try:
            schedule = fastf1.get_event_schedule(
                year=year, include_testing=self.config.include_testing
            )
            return schedule
        except Exception as e:
            logger.error("Error loading schedule for %d: %s", year, e)
            raise

    def get_multiple_sessions(
        self, year: int, events: List[str], session_types: Optional[List[str]] = None
    ) -> dict:
        """Load every ``session_types`` session for each of ``events``.

        Args:
            year: Championship year.
            events: Event identifiers passed to ``get_session``.
            session_types: Session identifiers; defaults to
                ``self.config.session_types``.

        Returns:
            Nested dict ``{event: {session_type: session_obj_or_None}}``.
            A session that fails to load is logged as a warning and stored
            as ``None`` instead of aborting the whole batch.
        """

        if session_types is None:
            session_types = self.config.session_types

        sessions = {}

        for event in events:
            sessions[event] = {}
            for session_type in session_types:
                try:
                    session_obj = self.get_session(year, event, session_type)
                    sessions[event][session_type] = session_obj
                    logger.info("Loaded %d %s %s", year, event, session_type)
                except Exception as e:
                    logger.warning(
                        "Could not load %d %s %s: %s", year, event, session_type, e
                    )
                    sessions[event][session_type] = None

        return sessions

    def cache_info(self) -> tuple:
        """Return whatever ``fastf1.Cache.get_cache_info()`` reports."""
        return Cache.get_cache_info()

    def clear_cache(self, deep: bool = False):
        """Clear the FastF1 cache via ``Cache.clear_cache`` and log it.

        Args:
            deep: Forwarded as ``Cache.clear_cache(deep=...)``.
        """
        Cache.clear_cache(deep=deep)
        logger.info("Cache cleared (deep=%s)", deep)


In [1]:
"""
Session-specific data loader for F1 sessions
"""

import sys
from pathlib import Path

current = Path.cwd()

indicators = [
    '.git', 'pyproject.toml'
]

for parent in [current] + list(current.parents):
    if any((parent / indicator).exists() for indicator in indicators):
        project_root = parent

sys.path.insert(0, str(project_root))

import json
import logging
from typing import Any, Dict, List, Optional

import pandas as pd

from config.settings import data_config, fastf1_config
from src.data_ingestion.fastf1_client import FastF1Client
from src.utils.decorators import log_operation, measure_time
from src.utils.helpers import ensure_directory, save_data
from src.utils.logger import log_data_info


class SessionLoader:
    """Loads F1 session data through ``FastF1Client`` and saves it to disk.

    A session is split into components (session info, laps, results,
    weather, telemetry summary). Each component is written under
    ``data_config.raw_data_path / year / gp / session``.
    """

    def __init__(self, client: Optional[FastF1Client] = None):
        self.client = client or FastF1Client()
        self.config = fastf1_config
        self.data_config = data_config
        self.logger = logging.getLogger("data_ingestion.session_loader")

    # ------------------------------------------------------------------ #
    # Small helpers shared by the extractors
    # ------------------------------------------------------------------ #

    @staticmethod
    def _get_loaded_frame(session_obj, attr: str) -> Optional[pd.DataFrame]:
        """Return ``session_obj.<attr>``, or ``None`` if it cannot be read.

        ``hasattr`` only suppresses ``AttributeError``. A session property
        that raises another exception type when its data was not loaded would
        otherwise escape. NOTE(review): FastF1 raises ``DataNotLoadedError``
        here (e.g. ``weather_data`` after ``load(weather=False)``); confirm
        for the installed version.
        """
        try:
            return getattr(session_obj, attr)
        except Exception:  # pylint: disable=broad-except
            return None

    @staticmethod
    def _session_metadata(session_obj) -> Dict[str, Any]:
        """Return EventName/SessionName/SessionDate for ``session_obj``.

        Each value is ``None`` when the corresponding attribute is missing.
        """
        return {
            "EventName": session_obj.event.EventName
            if hasattr(session_obj, "event")
            else None,
            "SessionName": session_obj.name if hasattr(session_obj, "name") else None,
            "SessionDate": session_obj.date if hasattr(session_obj, "date") else None,
        }

    def _add_session_metadata(self, frame: pd.DataFrame, session_obj) -> None:
        """Add EventName/SessionName/SessionDate columns to ``frame`` in place."""
        for column, value in self._session_metadata(session_obj).items():
            frame[column] = value

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #

    @log_operation
    def load_session_data(self, year: int, gp: str, session: str) -> Dict[str, Any]:
        """Load, extract and save all components of one session.

        Returns:
            Dict with keys ``session_info``, ``laps``, ``results``,
            ``weather`` and ``telemetry``. DataFrame components may be
            ``None`` when unavailable.

        Raises:
            Exception: any loading/extraction/saving error is logged and
                re-raised.
        """

        self.logger.info("Loading session data: %d %s %s", year, gp, session)

        try:
            # Get session from FastF1
            session_obj = self.client.get_session(year, gp, session)

            # Extract different types of data
            session_data = {
                "session_info": self._extract_session_info(session_obj),
                "laps": self._extract_lap_data(session_obj),
                "results": self._extract_session_results(session_obj),
                "weather": self._extract_weather_data(session_obj),
                "telemetry": self._extract_telemetry_summary(session_obj),
            }

            # Save data
            self._save_session_data(session_data, year, gp, session)

            self.logger.info(
                "Successfully loaded session data: %d %s %s", year, gp, session
            )
            return session_data

        except Exception as e:
            self.logger.error(
                "Failed to load session data %d %s %s: %s", year, gp, session, str(e)
            )
            raise

    def _extract_session_info(self, session_obj) -> Dict[str, Any]:
        """Extract basic session and event information as a flat dict."""

        self.logger.debug("Extracting session info")

        has_event = hasattr(session_obj, "event")
        metadata = self._session_metadata(session_obj)

        session_info = {
            "event_name": metadata["EventName"],
            "location": session_obj.event.Location if has_event else None,
            "country": session_obj.event.Country if has_event else None,
            "session_name": metadata["SessionName"],
            "session_date": metadata["SessionDate"],
        }

        # Add event metadata if available
        if has_event:
            event = session_obj.event
            session_info.update(
                {
                    "event_format": getattr(event, "EventFormat", None),
                    "round_number": getattr(event, "RoundNumber", None),
                    "official_event_name": getattr(event, "OfficialEventName", None),
                }
            )

        self.logger.debug("Extracted session info: %s", session_info)
        return session_info

    def _extract_lap_data(self, session_obj) -> Optional[pd.DataFrame]:
        """Return a copy of the lap data with LapTimeSeconds and session metadata.

        Returns ``None`` (with a warning) when no lap data is available.
        """

        self.logger.debug("Extracting lap data")

        laps = self._get_loaded_frame(session_obj, "laps")
        if laps is None or laps.empty:
            self.logger.warning("No lap data available")
            return None

        laps = laps.copy()

        # Numeric lap time is easier to aggregate than a timedelta
        if "LapTime" in laps.columns:
            laps["LapTimeSeconds"] = laps["LapTime"].dt.total_seconds()

        self._add_session_metadata(laps, session_obj)

        log_data_info(laps, "Lap Data", self.logger)
        return laps

    def _extract_session_results(self, session_obj) -> Optional[pd.DataFrame]:
        """Return a copy of the session results with session metadata.

        Returns ``None`` (with a warning) when no results are available.
        """

        self.logger.debug("Extracting session results")

        results = self._get_loaded_frame(session_obj, "results")
        if results is None or results.empty:
            self.logger.warning("No results data available")
            return None

        results = results.copy()
        self._add_session_metadata(results, session_obj)

        log_data_info(results, "Session Results", self.logger)
        return results

    def _extract_weather_data(self, session_obj) -> Optional[pd.DataFrame]:
        """Return a copy of the weather data with session metadata.

        Returns ``None`` when weather is disabled by configuration, or (with
        a warning) when no weather data is available.
        """

        if not self.config.enable_weather:
            self.logger.debug("Weather data disabled by configuration")
            return None

        self.logger.debug("Extracting weather data")

        weather = self._get_loaded_frame(session_obj, "weather_data")
        if weather is None or weather.empty:
            self.logger.warning("No weather data available")
            return None

        weather = weather.copy()
        self._add_session_metadata(weather, session_obj)

        log_data_info(weather, "Weather Data", self.logger)
        return weather

    def _extract_telemetry_summary(self, session_obj) -> Dict[str, Any]:
        """Summarise telemetry of the session's fastest lap.

        Returns ``{}`` when telemetry is disabled by configuration. Errors
        while reading telemetry are logged as warnings, and the default
        (``telemetry_available=False``) summary is returned.
        """

        if not self.config.enable_telemetry:
            self.logger.debug("Telemetry processing disabled by configuration")
            return {}

        self.logger.debug("Extracting telemetry summary")

        telemetry_summary = {
            "telemetry_available": False,
            "fastest_lap_telemetry": None,
            "total_telemetry_points": 0,
        }

        try:
            laps = self._get_loaded_frame(session_obj, "laps")
            if laps is not None and not laps.empty:
                fastest_lap = laps.pick_fastest()
                if fastest_lap is not None and hasattr(fastest_lap, "get_telemetry"):
                    telemetry = fastest_lap.get_telemetry()
                    if telemetry is not None and not telemetry.empty:
                        telemetry_summary["telemetry_available"] = True
                        telemetry_summary["total_telemetry_points"] = len(telemetry)
                        telemetry_summary["fastest_lap_driver"] = fastest_lap.get(
                            "Driver", "Unknown"
                        )
                        telemetry_summary["fastest_lap_time"] = fastest_lap.get(
                            "LapTime"
                        )

                        # Basic telemetry stats
                        if "Speed" in telemetry.columns:
                            telemetry_summary["max_speed"] = telemetry["Speed"].max()
                            telemetry_summary["avg_speed"] = telemetry["Speed"].mean()

                        if "RPM" in telemetry.columns:
                            telemetry_summary["max_rpm"] = telemetry["RPM"].max()

        except Exception as e:
            self.logger.warning("Error extracting telemetry summary: %s", str(e))

        self.logger.debug("Telemetry summary: %s", telemetry_summary)
        return telemetry_summary

    def _save_session_data(
        self, session_data: Dict[str, Any], year: int, gp: str, session: str
    ):
        """Write each non-``None`` component under raw_data_path/year/gp/session.

        DataFrames go through ``save_data`` in the configured format and
        compression. Dicts are written as JSON, with non-serialisable values
        converted via ``str``.
        """

        self.logger.debug("Saving session data: %d %s %s", year, gp, session)

        # Create directory structure
        base_path = self.data_config.raw_data_path / str(year) / gp / session
        ensure_directory(base_path)

        for data_type, data in session_data.items():
            if data is None:
                continue

            filepath = base_path / f"{data_type}.{self.data_config.file_format}"

            if isinstance(data, pd.DataFrame):
                save_data(
                    data,
                    filepath,
                    self.data_config.file_format,
                    self.data_config.compression,
                )
            elif isinstance(data, dict):
                with open(filepath.with_suffix(".json"), "w", encoding="utf-8") as f:
                    json.dump(data, f, indent=2, default=str)
                self.logger.debug("Saved dict data: %s", filepath.with_suffix(".json"))

    @measure_time
    def load_multiple_sessions(
        self, year: int, events: List[str], session_types: Optional[List[str]] = None
    ) -> Dict[str, Dict[str, Any]]:
        """Run ``load_session_data`` for every event/session-type pair.

        Args:
            year: Championship year.
            events: Event identifiers.
            session_types: Session identifiers; defaults to
                ``self.config.session_types``.

        Returns:
            ``{event: {session_type: session_data_or_None}}``. Failures are
            logged as warnings and stored as ``None``.
        """

        if session_types is None:
            session_types = self.config.session_types

        self.logger.info(
            "Loading multiple sessions: %d, %d events, %d session types",
            year,
            len(events),
            len(session_types),
        )

        all_data = {}

        for event in events:
            self.logger.info("Processing event: %s", event)
            all_data[event] = {}

            for session_type in session_types:
                try:
                    session_data = self.load_session_data(year, event, session_type)
                    all_data[event][session_type] = session_data

                except Exception as e:
                    self.logger.warning(
                        "Failed to load %d %s %s: %s", year, event, session_type, str(e)
                    )
                    all_data[event][session_type] = None

        self.logger.info("Completed loading multiple sessions for %d", year)
        return all_data

    @log_operation
    def get_session_summary(self, year: int, gp: str, session: str) -> Dict[str, Any]:
        """Return availability flags and counts for a session, without saving.

        Note: this still calls ``self.client.get_session``, which loads the
        session according to the configuration. It only skips the
        extraction and saving done by ``load_session_data``.

        Returns:
            Summary dict, or ``{}`` if any step fails (the error is logged).
            A dataset that was not loaded (e.g. weather disabled) is reported
            as unavailable instead of discarding the whole summary.
        """

        self.logger.debug("Getting session summary: %d %s %s", year, gp, session)

        try:
            session_obj = self.client.get_session(year, gp, session)

            metadata = self._session_metadata(session_obj)
            laps = self._get_loaded_frame(session_obj, "laps")
            results = self._get_loaded_frame(session_obj, "results")
            weather = self._get_loaded_frame(session_obj, "weather_data")

            summary = {
                "event_name": metadata["EventName"],
                "session_name": metadata["SessionName"],
                "session_date": metadata["SessionDate"],
                "has_laps": laps is not None and not laps.empty,
                "has_results": results is not None and not results.empty,
                "has_weather": weather is not None and not weather.empty,
                "lap_count": len(laps) if laps is not None else 0,
                "driver_count": len(results) if results is not None else 0,
            }

            return summary

        except Exception as e:
            self.logger.error(
                "Failed to get session summary %d %s %s: %s", year, gp, session, str(e)
            )
            return {}


In [2]:
# Standalone client for ad-hoc API calls. Note that SessionLoader() in the
# next cell builds its own FastF1Client instead of reusing this instance.
client = FastF1Client()

In [3]:
# Loader using a default FastF1Client and the module-level configuration.
session_loader = SessionLoader()

In [None]:
# Loads the session through the client and returns availability flags and
# counts (no files are written). Returns {} if anything fails.
session_summary = session_loader.get_session_summary(2022, "Monaco", "Q")

In [None]:
# Display the summary dict
session_summary

In [4]:
# Load, extract and save every component of 2022 Monaco qualifying
session_data = session_loader.load_session_data(2022, "Monaco", "Q")

core           INFO 	Loading data for Monaco Grand Prix - Qualifying [v3.6.1]
req            INFO 	Using cached data for session_info
req            INFO 	Using cached data for driver_info
req            INFO 	Using cached data for session_status_data
req            INFO 	Using cached data for track_status_data
req            INFO 	Using cached data for _extended_timing_data
req            INFO 	Using cached data for timing_app_data
core           INFO 	Processing timing data...
req            INFO 	Using cached data for car_data
req            INFO 	Using cached data for position_data
req            INFO 	Using cached data for weather_data
req            INFO 	Using cached data for race_control_messages
core           INFO 	Finished loading data for 20 drivers: ['16', '55', '11', '1', '4', '63', '14', '44', '5', '31', '22', '77', '20', '3', '47', '23', '10', '18', '6', '24']
Lap Data - Missing data: {'LapTime': 98, 'PitOutTime': 301, 'PitInTime': 303, 'Sector1Time': 7, 'Sector2Time': 

In [None]:
# Compact overview: displaying the raw dict would render every DataFrame in
# full. DataFrames show their (rows, columns) shape; other values show their type.
{
    name: value.shape if isinstance(value, pd.DataFrame) else type(value).__name__
    for name, value in session_data.items()
}

In [None]:
# load_session_data is annotated to return a Dict[str, Any]
type(session_data)

In [None]:
# Component names: session_info, laps, results, weather, telemetry
session_data.keys()

In [None]:
# Event/session metadata dict
session_data['session_info']

In [None]:
# Full lap DataFrame (None if no lap data was available)
session_data['laps']

In [None]:
# Print each component name stored in session_data, one per line.
for component_name in session_data:
    print(component_name)

In [7]:
# Lap DataFrame; None when no lap data was available, in which case .head()
# below would raise AttributeError.
laps = session_data['laps']
laps.head()

Unnamed: 0,Time,Driver,DriverNumber,LapTime,LapNumber,Stint,PitOutTime,PitInTime,Sector1Time,Sector2Time,...,TrackStatus,Position,Deleted,DeletedReason,FastF1Generated,IsAccurate,LapTimeSeconds,EventName,SessionName,SessionDate
0,0 days 00:19:23.424000,LEC,16,0 days 00:02:02.245000,1.0,1.0,0 days 00:17:21.837000,NaT,0 days 00:00:41.672000,0 days 00:00:53.110000,...,1,,False,,False,False,122.245,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00
1,0 days 00:20:38.316000,LEC,16,0 days 00:01:14.892000,2.0,1.0,NaT,NaT,0 days 00:00:20.134000,0 days 00:00:35.102000,...,1,,False,,False,True,74.892,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00
2,0 days 00:22:23.959000,LEC,16,0 days 00:01:45.643000,3.0,1.0,NaT,NaT,0 days 00:00:34.902000,0 days 00:00:46.629000,...,1,,False,,False,True,105.643,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00
3,0 days 00:23:36.898000,LEC,16,0 days 00:01:12.939000,4.0,1.0,NaT,NaT,0 days 00:00:19.351000,0 days 00:00:34.263000,...,1,,False,,False,True,72.939,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00
4,0 days 00:25:17.259000,LEC,16,0 days 00:01:40.361000,5.0,1.0,NaT,NaT,0 days 00:00:27.058000,0 days 00:00:48.237000,...,1,,False,,False,True,100.361,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00


In [None]:
# Writes a scratch parquet file into the current working directory.
# NOTE(review): looks like a one-off test export; consider removing it, or
# writing under the configured data directory, so reruns don't leave stray files.
laps.to_parquet("test_laps.parquet", compression='snappy', index=False)

In [5]:
# Session results DataFrame with session metadata columns appended
results = session_data["results"]
results.head()

Unnamed: 0,DriverNumber,BroadcastName,Abbreviation,DriverId,TeamName,TeamColor,TeamId,FirstName,LastName,FullName,...,Q1,Q2,Q3,Time,Status,Points,Laps,EventName,SessionName,SessionDate
16,16,C LECLERC,LEC,leclerc,Ferrari,ed1c24,ferrari,Charles,Leclerc,Charles Leclerc,...,0 days 00:01:12.569000,0 days 00:01:11.864000,0 days 00:01:11.376000,NaT,,,,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00
55,55,C SAINZ,SAI,sainz,Ferrari,ed1c24,ferrari,Carlos,Sainz,Carlos Sainz,...,0 days 00:01:12.616000,0 days 00:01:12.074000,0 days 00:01:11.601000,NaT,,,,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00
11,11,S PEREZ,PER,perez,Red Bull Racing,1e5bc6,red_bull,Sergio,Perez,Sergio Perez,...,0 days 00:01:13.004000,0 days 00:01:11.954000,0 days 00:01:11.629000,NaT,,,,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00
1,1,M VERSTAPPEN,VER,max_verstappen,Red Bull Racing,1e5bc6,red_bull,Max,Verstappen,Max Verstappen,...,0 days 00:01:12.993000,0 days 00:01:12.117000,0 days 00:01:11.666000,NaT,,,,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00
4,4,L NORRIS,NOR,norris,McLaren,f58020,mclaren,Lando,Norris,Lando Norris,...,0 days 00:01:12.927000,0 days 00:01:12.266000,0 days 00:01:11.849000,NaT,,,,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00


In [6]:
# Weather DataFrame; None when weather is disabled by configuration or no data
# was available, in which case .head() below would raise AttributeError.
weather = session_data['weather']
weather.head()

Unnamed: 0,Time,AirTemp,Humidity,Pressure,Rainfall,TrackTemp,WindDirection,WindSpeed,EventName,SessionName,SessionDate
0,0 days 00:00:59.917000,27.6,37.0,1004.0,False,55.3,344,0.8,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00
1,0 days 00:01:59.913000,27.6,37.0,1004.0,False,55.3,3,0.8,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00
2,0 days 00:02:59.934000,27.6,38.0,1004.0,False,54.8,328,0.6,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00
3,0 days 00:03:59.929000,27.6,39.0,1004.0,False,54.8,313,1.0,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00
4,0 days 00:04:59.925000,27.5,39.0,1004.2,False,53.0,303,1.1,Monaco Grand Prix,Qualifying,2022-05-28 14:00:00


In [8]:
# Load Q and R for two events; any session that fails is stored as None in
# the nested {event: {session_type: ...}} result.
all_data = session_loader.load_multiple_sessions(2022, ["Bahrain Grand Prix", "Monaco Grand Prix"], ["Q", "R"])

core           INFO 	Loading data for Bahrain Grand Prix - Qualifying [v3.6.1]
req            INFO 	No cached data found for session_info. Loading data...
_api           INFO 	Fetching session info data...
req            INFO 	Data has been written to cache!
req            INFO 	No cached data found for driver_info. Loading data...
_api           INFO 	Fetching driver list...
req            INFO 	Data has been written to cache!
req            INFO 	No cached data found for session_status_data. Loading data...
_api           INFO 	Fetching session status data...
req            INFO 	Data has been written to cache!
req            INFO 	No cached data found for track_status_data. Loading data...
_api           INFO 	Fetching track status data...
req            INFO 	Data has been written to cache!
req            INFO 	No cached data found for _extended_timing_data. Loading data...
_api           INFO 	Fetching timing data...
_api           INFO 	Parsing timing data...
req            INFO 