# Solar Irradiance From AC Export

A Jupyter Notebook that does its best to model and construct a historical solar irradiance time series from a solar park's historical AC export data.

## 1. Project Setup

### 1.1 Imports

In [None]:
# --- Imports ---

# Standard Library Imports
from pathlib import Path
import os

# Third-Party Library Imports
import yaml
import pandas as pd
import plotly.io as pio
from dotenv import load_dotenv

print("✅ Libraries loaded successfully.")

***

### 1.2 Configuration

This project uses a two-step configuration process:

1.  **Path Definition (`.env`):** This file defines the project's physical location (`PROJECT_ROOT`) and the name of the configuration file. This separation ensures the notebook is portable across different machines and environments.
2.  **Parameter Definition (`config.yml`):** This file contains the physical and electrical parameters of your solar park(s), including sensitive information like GPS coordinates and detailed system specifications.

**To get started:**

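1.  **Configure Paths:** Copy the template file `.env.example` to a new file named `.env`. Open `.env`, set the absolute path for the `PROJECT_ROOT` variable, and set `TARGET_PARK_NAME` to the park you want to analyze (it must match one of the keys under `parks` in `config.yml`).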
2.  **Configure Parks:** Copy the example configuration file `config.example.yml` to `config.yml`. Open `config.yml` and replace the placeholder values with the details of your solar installation.

The cell below loads the environment variables, resolves the final configuration path, and sets up the plotting environment.

In [None]:
# --- Configuration ---

# Load environment variables from .env file
load_dotenv()

# Define paths using environment variables
PROJECT_ROOT_STR = os.getenv("PROJECT_ROOT")
CONFIG_FILENAME = os.getenv("CONFIG_FILENAME", "config.yml")  # Fallback to config.yml
PRODUCTION_AND_PRICE_FILE_PATH = os.getenv(
    "PRODUCTION_AND_PRICE_FILE_PATH",
    "/home/user/solar-irradiance-from-ac-export/production.csv",
)
WEATHER_FILE_PATH = os.getenv(
    "WEATHER_FILE_PATH", "/home/user/solar-irradiance-from-ac-export/weather.csv"
)

if not PROJECT_ROOT_STR:
    # If PROJECT_ROOT is not set in .env, assume the current working directory
    PROJECT_ROOT_STR = os.getcwd()
    print(
        f"⚠️ WARNING: PROJECT_ROOT not set in .env. Using current directory: {PROJECT_ROOT_STR}"
    )

PROJECT_ROOT = Path(PROJECT_ROOT_STR)
CONFIG_PATH = PROJECT_ROOT / CONFIG_FILENAME

print(f"Project Root defined as: {PROJECT_ROOT}")
print(f"Configuration file path: {CONFIG_PATH}")

# Pre-define every name the later cells read, so that a configuration failure
# below ends in the printed error (and "skipped" cells) instead of a NameError.
PARK_CONFIGS: dict = {}
PARK_NAMES: list = []
TARGET_PARK_NAME = os.getenv("TARGET_PARK_NAME")

try:
    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # safe_load yields None for an empty file and may yield a list or scalar;
    # only a mapping can carry the 'parks' key.
    if not isinstance(config, dict):
        raise ValueError(
            f"'{CONFIG_PATH}' is empty or its top level is not a key/value mapping."
        )

    # Extract park configurations
    parks = config.get("parks")

    if not parks:
        raise ValueError(
            "No parks defined under the 'parks' key in the configuration file."
        )
    if not isinstance(parks, dict):
        raise ValueError(
            "The 'parks' key must map each park name to that park's settings."
        )

    PARK_CONFIGS = parks

    # Create a list of park names for easy iteration later
    PARK_NAMES = list(PARK_CONFIGS)

    # --- Validate Target Park for Analysis ---
    if not TARGET_PARK_NAME:
        raise ValueError("TARGET_PARK_NAME is not set in the .env file. Please specify which park to analyze.")

    if TARGET_PARK_NAME not in PARK_NAMES:
        raise ValueError(
            f"The target park '{TARGET_PARK_NAME}' defined in .env is not found in '{CONFIG_FILENAME}'.\n"
            f"Available parks in config: {PARK_NAMES}"
        )

    print(f"🎯 Analysis will be performed for target park: '{TARGET_PARK_NAME}'")

    # map(str, ...) keeps the summary printable even if YAML parsed a park key
    # as a non-string (e.g. a number).
    print(
        f"✅ Configuration loaded successfully from '{CONFIG_PATH}' for {len(PARK_NAMES)} park(s): {', '.join(map(str, PARK_NAMES))}."
    )

except FileNotFoundError:
    print(f"❌ CONFIGURATION ERROR: The '{CONFIG_PATH}' file was not found.")
    print(
        f"Please check your .env file's PROJECT_ROOT setting, and ensure '{CONFIG_FILENAME}' exists at that location."
    )
    print(
        f"If '{CONFIG_FILENAME}' is missing, copy 'config.example.yml' to '{CONFIG_FILENAME}' and fill in your park's details."
    )
except yaml.YAMLError as e:
    # Genuine syntax problems in the YAML document.
    print(
        f"❌ CONFIGURATION ERROR: Could not parse '{CONFIG_PATH}'. Please check its format. Details: {e}"
    )
except ValueError as e:
    # Validation failures raised above; the message already says what is wrong,
    # so it is not wrapped in a misleading "could not parse" text.
    print(f"❌ CONFIGURATION ERROR: {e}")


# --- Plotting and Display Configuration ---
pio.templates.default = "plotly_dark"

# Set display options for better viewing in Jupyter
pd.set_option("display.max_columns", None)
pd.set_option("display.width", 1000)

print("Plotting and display options set.")

***
***

## 2. Data Loading

### Helper Functions

In [None]:
# --- Data Loading Helper Function ---


def load_park_specific_data(
    file_path: str,
    timestamp_col: str,
    park_name_col: str,
    required_data_cols: list[str],
    target_park_name: str,
    data_name: str,
) -> pd.DataFrame:
    """
    Loads, validates, and filters data for a single specified park from a long-format CSV.

    The timestamp column is parsed, converted to UTC and used as the index.
    Rows without a park name are dropped, the remaining rows are filtered to
    ``target_park_name`` and sorted chronologically. Progress and errors are
    reported with ``print``; no exception escapes this function.

    Args:
        file_path (str): Path to the CSV file.
        timestamp_col (str): Name of the timestamp column.
        park_name_col (str): Name of the park identifier column.
        required_data_cols (list): List of required data column names.
        target_park_name (str): The specific park to extract data for.
        data_name (str): A descriptive name for the data (e.g., "Production"),
            used only in the printed messages.

    Returns:
        pandas.DataFrame: A DataFrame containing only the data for the target park,
                          with the park_name column removed. On any failure an
                          empty DataFrame is returned; it carries an empty UTC
                          DatetimeIndex so callers can treat the index uniformly.
    """
    print(f"--- Loading {data_name} Data for '{target_park_name}' ---")
    print(f"Attempting to load from: {file_path}")

    def _empty_result() -> pd.DataFrame:
        # Failure value: no rows, but the same index type as a successful load.
        return pd.DataFrame(index=pd.DatetimeIndex([], tz="UTC", name=timestamp_col))

    try:
        # 1. Load the full CSV
        df = pd.read_csv(
            file_path, parse_dates=[timestamp_col], index_col=timestamp_col
        )

        # 2. Basic Column Check
        all_required_cols = [*required_data_cols, park_name_col]
        missing = [col for col in all_required_cols if col not in df.columns]
        if missing:
            raise ValueError(f"Missing required columns in {data_name} CSV: {missing}")

        # 3. Data Cleaning and Validation
        df.index = pd.to_datetime(df.index, utc=True)
        df = df.dropna(subset=[park_name_col])
        df[park_name_col] = df[park_name_col].astype(str)

        # 4. Check if Target Park Exists in Data
        is_target_park = df[park_name_col] == target_park_name
        if not is_target_park.any():
            raise ValueError(
                f"Target park '{target_park_name}' not found in the {data_name} file."
            )

        # 5. Filter for Target Park, drop the now-redundant park name column, sort
        df_park = df.loc[is_target_park].drop(columns=[park_name_col]).sort_index()

        print(f"✅ {data_name} data for '{target_park_name}' loaded successfully.")
        print(f"   Shape of final DataFrame: {df_park.shape}")
        print(f"   Time range: {df_park.index.min()} to {df_park.index.max()}")
        print("Sample:")
        # sample() raises when asked for more rows than exist; cap the request so
        # a short (but valid) file is not reported as a load failure.
        print(df_park.sample(n=min(5, len(df_park))))
        return df_park

    except FileNotFoundError:
        print(f"❌ DATA ERROR: The {data_name} file was not found at: {file_path}")
        return _empty_result()
    except Exception as e:
        # Deliberately broad: the notebook prefers a printed error plus an empty
        # result over an exception that stops the cell.
        print(f"❌ AN UNEXPECTED ERROR OCCURRED during {data_name} data loading: {e}")
        return _empty_result()

print("✅ Helper function load_park_specific_data defined.")

***

### 2.1 Hourly Production And Spot Price Data

In [None]:
# --- Load Production and Price Data ---

# Column names expected in the production CSV. COL_TIMESTAMP and COL_PARK_NAME
# are reused when the weather CSV is loaded.
COL_TIMESTAMP = "timestamp_utc"
COL_PARK_NAME = "park_name"
PRODUCTION_DATA_COLS = ["ac_export_kwh", "spot_price_eur_mwh"]

# Load the data for the target park using the helper function
df_production = load_park_specific_data(
    file_path=PRODUCTION_AND_PRICE_FILE_PATH,
    timestamp_col=COL_TIMESTAMP,
    park_name_col=COL_PARK_NAME,
    required_data_cols=PRODUCTION_DATA_COLS,
    target_park_name=TARGET_PARK_NAME,  # type: ignore
    data_name="Production & Price",
)

# The helper signals failure by returning an empty DataFrame, which may carry a
# non-datetime default index. Normalise that case so the assertion below (kept
# for type narrowing) does not abort the cell and the later
# `not df_production.empty` checks can skip gracefully.
if df_production.empty and not isinstance(df_production.index, pd.DatetimeIndex):
    df_production = pd.DataFrame(index=pd.DatetimeIndex([], tz="UTC"))
assert isinstance(df_production.index, pd.DatetimeIndex)

***

### 2.2 Load Hourly Weather Data

In [None]:
# --- Load and Crop Weather Data ---

# Define required column names for weather data
WEATHER_DATA_COLS = ["temp_air_c", "wind_speed_m_s", "pressure_hpa", "ghi_w_m2"]

# Load the weather data for the target park using the helper function
df_weather = load_park_specific_data(
    file_path=WEATHER_FILE_PATH,
    timestamp_col=COL_TIMESTAMP,
    park_name_col=COL_PARK_NAME,
    required_data_cols=WEATHER_DATA_COLS,
    target_park_name=TARGET_PARK_NAME,  # type: ignore
    data_name="Weather",
)

# The helper signals failure by returning an empty DataFrame, which may carry a
# non-datetime default index. Normalise that case so the assertion below (kept
# for type narrowing) does not abort the cell when loading failed.
if df_weather.empty and not isinstance(df_weather.index, pd.DatetimeIndex):
    df_weather = pd.DataFrame(index=pd.DatetimeIndex([], tz="UTC"))
assert isinstance(df_weather.index, pd.DatetimeIndex)

# Post-processing: Crop the weather data to the production time range
if not df_production.empty and not df_weather.empty:
    start_time = df_production.index.min()
    end_time = df_production.index.max()

    original_rows = len(df_weather)
    # Label-based slice on the time index; both end points are included.
    df_weather = df_weather.loc[start_time:end_time].copy()

    print("\nWeather data cropped to production time range.")
    print(f"   Original rows: {original_rows}, Cropped rows: {len(df_weather)}")
    print(f"   New time range: {df_weather.index.min()} to {df_weather.index.max()}")

***
***

## 3. Data Upsampling and Feature Engineering 

### Helper Functions 

In [None]:
# --- Interpolation Helper Function ---

from typing import Literal
import numpy as np


def interpolate_by_gap_size(
    data: pd.Series | pd.DataFrame,
    max_gap_size: int = 1,
    method: Literal[
        "linear",
        "time",
        "index",
        "values",
        "pad",
        "nearest",
        "zero",
        "slinear",
        "quadratic",
        "cubic",
        "barycentric",
        "polynomial",
        "spline",
        "krogh",
        "piecewise_polynomial",
        "pchip",
        "akima",
        "cubicspline",
        "from_derivatives",
    ] = "linear",
    **kwargs
) -> pd.Series | pd.DataFrame:
    """
    Interpolates NaN values in a Series or DataFrame, but only for gaps
    that are less than or equal to a specified maximum size.

    For a DataFrame, the operation is applied column-wise.

    Args:
        data (pd.Series | pd.DataFrame): Input data with potential NaNs.
        max_gap_size (int): Max consecutive NaNs to interpolate. Gaps larger
                            than this are ignored. Defaults to 1.
        method (str): Interpolation technique (see pandas.Series.interpolate).
                      Defaults to "linear".
        **kwargs: Additional keyword arguments for the interpolate() method.

    Returns:
        pd.Series | pd.DataFrame: New object with specified gaps filled.
    """
    if not isinstance(data, (pd.Series, pd.DataFrame)):
        raise TypeError("Input `data` must be a pandas Series or DataFrame.")
    if not isinstance(max_gap_size, int) or max_gap_size <= 0:
        raise ValueError("`max_gap_size` must be a positive integer.")

    if isinstance(data, pd.Series):
        if data.empty or data.isna().sum() == 0:
            return data.copy()

        # Identify gaps by creating groups based on non-NaN values
        grouper = data.notna().cumsum()
        # Calculate the size of each NaN gap
        gap_sizes = data.isna().groupby(grouper).transform("sum")
        # Create a boolean mask for NaNs that are part of small-enough gaps
        mask_to_fill = data.isna() & (gap_sizes <= max_gap_size)

        # Interpolate the entire series to get the fill values
        fully_interpolated = data.interpolate(method=method, **kwargs) # type: ignore

        # Apply the fill values only where the mask is True
        return data.where(~mask_to_fill, fully_interpolated)

    # If it's a DataFrame, apply this function to each column
    return data.apply(
        lambda col: interpolate_by_gap_size(
            col, max_gap_size=max_gap_size, method=method, **kwargs
        )
    )


print("✅ Helper function interpolate_by_gap_size defined.")

***

### 3.1 Upsample Production Data 

In [None]:
# --- Upsample Production and Price Data to 30-Minute Frequency ---

# Check if the production dataframe exists and is not empty before proceeding
if "df_production" in locals() and not df_production.empty:
    print(f"--- Upsampling data for park: '{TARGET_PARK_NAME}' ---")

    # 1. Determine the actual production period to avoid creating a massive index.
    #    The positive-export rows are selected once and reused for both bounds.
    producing_rows = df_production[df_production["ac_export_kwh"] > 0]
    first_prod_time: pd.Timestamp = producing_rows.index.min()
    last_prod_time: pd.Timestamp = producing_rows.index.max()

    if pd.isna(first_prod_time) or pd.isna(last_prod_time):
        print("⚠️ No valid production data (> 0 kWh) found. Cannot proceed.")
        df_production_30min = pd.DataFrame()
    else:
        print(f"   - Production data range: {first_prod_time} to {last_prod_time}")

        # Crop the hourly data to the actual production period
        df_prod_cropped = df_production.loc[first_prod_time:last_prod_time]

        # Create a full 30-minute time range for this period
        full_30min_range = pd.date_range(
            start=first_prod_time, end=last_prod_time, freq="30min", tz="UTC"
        )

        # 2. Upsample spot price using forward-fill
        # The price at HH:30 is the same as the price at HH:00
        price_30min = (
            df_prod_cropped["spot_price_eur_mwh"]
            .reindex(full_30min_range)
            .ffill()
            .to_frame()
        )

        # 3. Upsample production data from hourly kWh to 30-min average Power (W)
        # Convert kWh (energy) to W (power) for the hourly interval:
        # kWh delivered in one hour * 1000 = average W over that hour.
        power_w_hourly = df_prod_cropped["ac_export_kwh"] * 1000

        # Shift the index 30 mins forward. The power calculated from the interval
        # [HH:00, HH+1:00] is best represented at the midpoint, HH:30.
        power_w_hourly.index = power_w_hourly.index + pd.Timedelta(minutes=30)

        # Reindex to the full 30-min range. This introduces NaNs at HH:00.
        # Use interpolate_by_gap_size(max_gap_size=1) to linearly fill only the
        # points at HH:00, which are surrounded by two HH:30 points.
        power_w_30min = interpolate_by_gap_size(
            power_w_hourly.reindex(full_30min_range), max_gap_size=1, method="linear"
        ).to_frame("ac_export_w") # type: ignore

        # 4. Combine the upsampled series into a single DataFrame
        df_production_30min = price_30min.join(power_w_30min)

        print("\n✅ Production and price data upsampled to 30-minute frequency.")
        print(f"   - Shape of new DataFrame: {df_production_30min.shape}")
        print(
            f"   - New time range: {df_production_30min.index.min()} to {df_production_30min.index.max()}"
        )
        print("   - Sample data:")
        # sample() raises when asked for more rows than exist; cap the request
        # so a production period shorter than five half-hours still works.
        print(df_production_30min.sample(n=min(5, len(df_production_30min))))

else:
    print("⚠️ Skipping upsampling: `df_production` is not defined or is empty.")
    df_production_30min = pd.DataFrame()

***

### 3.2 Upsample Weather Data

In [None]:
# --- Upsample Weather Data to 30-Minute Frequency ---

# Check if both required dataframes are available before proceeding
if (
    "df_weather" in locals()
    and not df_weather.empty
    and "df_production_30min" in locals()
    and not df_production_30min.empty
):

    print("--- Upsampling weather data ---")

    # 1. Use the index from the upsampled production data for perfect alignment
    target_30min_range = df_production_30min.index
    print(
        f"   - Aligning to target time range: {target_30min_range.min()} to {target_30min_range.max()}"
    )

    # 2. Handle standard weather variables with linear interpolation.
    #    Reindexing leaves NaN at every timestamp the hourly data lacks;
    #    max_gap_size=1 fills single missing points only, so longer outages in
    #    the source data stay NaN.
    standard_weather_cols = ["temp_air_c", "wind_speed_m_s", "pressure_hpa"]
    df_weather_standard_30min = interpolate_by_gap_size(
        df_weather[standard_weather_cols].reindex(target_30min_range),
        max_gap_size=1,
        method="linear",
    )

    # 3. Handle GHI with a time shift before interpolation
    # The hourly GHI value at HH:00 represents the average over the interval [HH-1:00, HH:00].
    # We shift the timestamp back 30 mins to place this value at the interval's midpoint (HH-1:30).
    ghi_hourly = df_weather[["ghi_w_m2"]].copy()
    assert isinstance(ghi_hourly.index, pd.DatetimeIndex)
    ghi_hourly.index = ghi_hourly.index - pd.Timedelta(minutes=30)

    # Reindex and interpolate. This will correctly fill the values at the top of the hour (HH:00).
    df_ghi_30min = interpolate_by_gap_size(
        ghi_hourly.reindex(target_30min_range), max_gap_size=1, method="linear"
    )

    # 4. Combine all upsampled weather series
    df_weather_30min = df_weather_standard_30min.join(df_ghi_30min)

    print("\n✅ Weather data upsampled to 30-minute frequency.")
    print(f"   - Shape of new DataFrame: {df_weather_30min.shape}")
    print("   - Sample data:")
    # sample() raises when asked for more rows than exist; cap the request so
    # a very short time range does not crash the cell.
    print(df_weather_30min.sample(n=min(5, len(df_weather_30min))))

else:
    print(
        "⚠️ Skipping weather upsampling: `df_weather` or `df_production_30min` is not available."
    )
    df_weather_30min = pd.DataFrame()

***

### 3.3 Model PVLIB Features

In [None]:
# --- Model Solar Geometry and Clear-Sky Irradiance with PVLIB ---

import pvlib

# Check if pre-requisite dataframes are available
if (
    "df_weather_30min" in locals()
    and not df_weather_30min.empty
    and "PARK_CONFIGS" in locals()
    and TARGET_PARK_NAME in PARK_CONFIGS
):

    print(f"--- Modeling PVLIB features for '{TARGET_PARK_NAME}' ---")

    # 1. Extract location parameters from the main config.
    #    The time index is UTC, so the location is declared in UTC as well.
    park_params = PARK_CONFIGS[TARGET_PARK_NAME]
    location = pvlib.location.Location(
        latitude=park_params["location"]["latitude"],
        longitude=park_params["location"]["longitude"],
        tz="UTC",
    )

    # 2. Prepare inputs for pvlib calculations
    times = df_weather_30min.index
    pressure_pa = df_weather_30min["pressure_hpa"] * 100  # Convert hPa to Pa
    temperature_c = df_weather_30min["temp_air_c"]

    # 3. Perform pvlib calculations
    print("   - Calculating solar position...")
    solar_position = location.get_solarposition(
        times, pressure=pressure_pa, temperature=temperature_c  # type: ignore
    )
    assert isinstance(solar_position, pd.DataFrame)

    print("   - Calculating airmass...")
    airmass_relative = pvlib.atmosphere.get_relative_airmass(
        zenith=solar_position["apparent_zenith"]
    )
    airmass_absolute = pvlib.atmosphere.get_absolute_airmass(
        airmass_relative=airmass_relative, pressure=pressure_pa  # type: ignore
    )

    print("   - Calculating extraterrestrial radiation...")
    dni_extra = pvlib.irradiance.get_extra_radiation(times)

    print("   - Calculating clear-sky irradiance (Ineichen model)...")
    # The quantities computed above are passed in explicitly so the clear-sky
    # model uses the same solar position, airmass and extraterrestrial
    # radiation that end up in the output columns.
    clearsky_irrad = location.get_clearsky(
        times,
        model="ineichen",
        solar_position=solar_position,
        airmass_absolute=airmass_absolute,
        dni_extra=dni_extra,
    )
    assert isinstance(clearsky_irrad, pd.DataFrame)
    clearsky_irrad = clearsky_irrad.rename(
        columns={
            "ghi": "ghi_clearsky_w_m2",
            "dni": "dni_clearsky_w_m2",
            "dhi": "dhi_clearsky_w_m2",
        }
    )

    print("   - Adding 'is_day' flag...")
    # Daytime = apparent zenith below 90 degrees. A NaN zenith compares as
    # False and is therefore flagged as not-day.
    is_day = solar_position["apparent_zenith"] < 90.0

    # 4. Assemble the final pvlib DataFrame
    df_pvlib_30min = pd.concat(
        [
            solar_position[["apparent_zenith", "zenith", "azimuth"]],
            pd.Series(airmass_relative, name="airmass_relative"),
            pd.Series(airmass_absolute, name="airmass_absolute"),
            pd.Series(dni_extra, name="dni_extra_w_m2"),
            clearsky_irrad,
            pd.Series(is_day, name="is_day"),
        ],
        axis=1,
    )

    print("\n✅ PVLIB features modeled successfully.")
    print(f"   - Shape of new DataFrame: {df_pvlib_30min.shape}")
    print("   - Sample data:")
    # sample() raises when asked for more rows than exist; cap the request so
    # a very short time range does not crash the cell.
    print(df_pvlib_30min.sample(n=min(5, len(df_pvlib_30min))))

else:
    print("⚠️ Skipping PVLIB modeling: Prerequisite data is not available.")
    df_pvlib_30min = pd.DataFrame()

***

### 3.4 Merge DataFrames 

In [None]:
# --- Merge All 30-Minute DataFrames ---

# Check if all three dataframes to be merged are available
if (
    "df_production_30min" in locals()
    and not df_production_30min.empty
    and "df_weather_30min" in locals()
    and not df_weather_30min.empty
    and "df_pvlib_30min" in locals()
    and not df_pvlib_30min.empty
):

    print("--- Merging all 30-minute data sources ---")

    # Join the three dataframes on their common DatetimeIndex.
    # join() keeps every row of the left frame (df_production_30min) and puts
    # NaN wherever one of the other sources has no value for that timestamp.
    df_30min = df_production_30min.join([df_weather_30min, df_pvlib_30min])

    print("\n✅ All data sources successfully merged into a single DataFrame.")
    print(f"   - Final shape: {df_30min.shape}")
    print(f"   - Final time range: {df_30min.index.min()} to {df_30min.index.max()}")
    print("   - Sample of merged data:")
    # sample() raises when asked for more rows than exist; cap the request so
    # a very short time range does not crash the cell.
    print(df_30min.sample(n=min(5, len(df_30min))))

else:
    print("⚠️ Skipping merge operation: One or more source DataFrames are missing.")
    df_30min = pd.DataFrame()