In [2]:
"""
PV Lighthouse Uncertainty Tool
Helper file
Contains many functions used for connecting to the uncertainty API and plotting and analysing its output
"""

# Define constants for plotting
FIGURE_FONT_SIZE = 16           # Font size for figure axes, titles will be (FIGURE_FONT_SIZE + 2).
TRANSPARENT_PLOTS = False        # False = white background, True = transparent (best for adding into presentations)
HISTOGRAM_COLOUR = "#d2226a"    # Hex code for the colour used in column plots
GRID_COLOUR = "#c0c0c0"       # Colour of gridlines in plots
GRID_WIDTH = 0.1                # Thickness of gridlines in plots

# Import P90Client and related tools
from pvl_p90_client.client.p90_client import AuthenticationError, ServiceError, P90Client, P90ConnectionError
from pvl_p90_client.grpcclient import uncertaintyMessages_pb2, weatherData_pb2
from pvl_p90_client.grpcclient.uncertaintyMessages_pb2 import DistributionInput, DistributionFunction
from pvl_p90_client.helpers import pvl_login
from pvl_p90_client.helpers.exception import TimeStepDataError
from pvl_p90_client.helpers.request_helpers import (
    build_request,
    build_gaussian_distribution,
    build_skewed_gaussian_distribution,
    build_weibull_distribution,
    build_arbitrary_distribution,
    build_distribution,
    build_module_info,
    build_result_options,
    build_simulation_options,
    build_system_info,
    build_electrical_settings,
    build_optical_settings,
    build_thermal_settings,
    build_operational_settings,
    load_weather_data_from_pvw_file,
    load_time_step_data_from_csv,
    write_timestep_data_to_csv,
    celsius_to_kelvin,
    kelvin_to_celsius
)


# Import other Python libraries
import logging
import datetime
import re
import time
import math
import base64
import numpy as np
import pandas as pd
from scipy.stats import norm, skewnorm
from ipywidgets import interact, interactive, fixed, interact_manual, IntSlider
import ipywidgets as widgets
from IPython.display import display, Markdown, Latex, clear_output
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.io as pio
import plotly.express as px
from plotly.subplots import make_subplots

try:
    # Enable the custom widget manager for Colab for interactive plots
    from google.colab import output
    # pio.renderers.default = 'colab'
    output.enable_custom_widget_manager()       # Needed to make plotly interactive plots work correctly in Colab
except:
    # Not using Google Colab
    None
try:
    from openpyxl.workbook import Workbook
except:
    # Install openpyxl
    %pip install openpyxl

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Define dictionary of limits for each DistributionInput type
DISTRIBUTION_LIMITS =  {
    DistributionInput.GHI: (0, float('inf')),                               # Global Horizontal Irradiance (W/m²)
    DistributionInput.DiffuseFraction: (0, float('inf')),
    DistributionInput.WindSpeed: (0, float('inf')),                         # Wind speed (m/s)
    DistributionInput.Temperature: (0, float('inf')),                       # Ambient temperature
    DistributionInput.ModulePower: (0, 1),
    DistributionInput.SpectralCorrection: (0, float('inf')),                # Spectral correction factor
    DistributionInput.SoilingFront: (0, 1),                                 # Front soiling fraction
    DistributionInput.SoilingRear: (0, 1),                                  # Rear soiling fraction
    DistributionInput.Uc: (0, float('inf')),                                # Thermal coefficient Uc (W/m²K)
    DistributionInput.Uv: (0, float('inf')),                                # Thermal coefficient Uv (W/m³Ks)
    DistributionInput.Alpha: (0, float('inf')),                             # Thermal absorptance of module
    DistributionInput.AnnualDegradationRate: (0, 1),
    DistributionInput.Availability: (0, 1),                                 # System Availability
    DistributionInput.YieldModifier: (0, float('inf')),                     # Yield modifier
    DistributionInput.CircumsolarFraction: (0, float('inf')),               # Circumsolar fraction of incident light
    DistributionInput.UndulatingGround: (0, float('inf')),                  # Undulating ground modifier
    DistributionInput.ExtraIrradiance: (0, float('inf')),                   # Extra irradiance modifier
    DistributionInput.Curtailment: (0, 1),                                  # System curtailment
    DistributionInput.DCHealth: (0, 1),
    DistributionInput.InverterToInverterMismatch: (0, 1),
    DistributionInput.StringToStringMismatch: (0, 1),
    DistributionInput.ModuleToModuleMismatch: (0, 1),
    DistributionInput.CellToCellMismatch: (0, 1),
    DistributionInput.InverterEfficiency: (0, 1),
    DistributionInput.ModuleEfficiencyTemperatureCoefficient: (0, float('inf')), # Module efficiency temperature coefficient (/K)
    DistributionInput.Albedo: (0, 1),
    DistributionInput.RearTransmissionFactor: (0, float('inf')),          # Rear transmission factor (f_T, 'Shed transparent fraction')
    DistributionInput.RearStructuralShadingFactor: (0, 1)                 # Rear structural shading factor (f_S)
}

#### Define helper functions ####
# Define function to load weather data from either a .pvw file or a CSV file
def load_weather_data(file_path: str) -> list[uncertaintyMessages_pb2.TimeStepData] | None:
    """Load weather data from a .pvw file or a CSV file."""
    if file_path.lower().endswith('.pvw'):
        weather_data = load_weather_data_from_pvw_file(file_path)
    elif file_path.lower().endswith('.csv'):
        weather_data = load_time_step_data_from_csv(file_path)
    else:
        raise ValueError("Unsupported file format. Please provide a .pvw or .csv file.")
    return weather_data

# Define weibull_cdf function for checking limits are compatible with inversion
def weibull_cdf(x, x0, lambda_term, k, p):
                if p * (x - x0) > 0:
                    return 0.5 + p * (0.5 - math.exp(-1 * (p * (x - x0) / lambda_term) ** k))
                else:
                    return 0 if p > 0 else 1

def create_distribution(input, simToSim=None, yearToYear=None, stepToStep=None) -> uncertaintyMessages_pb2.Distribution:
    """Creates an input distribution with user-input values.
        input: the integer corresponding to the system variable which is being modified (0-16)
            GHI: 0, DiffuseFraction: 1, WindSpeed: 2, Temperature: 3, ModulePower: 4, SpectralCorrection: 5, SoilingFront: 6, SoilingRear: 7,
            Uc: 8, Uv: 9, Alpha: 10, AnnualDegradationRate: 11, Availability: 12, YieldModifier: 13, CircumsolarFraction: 14, UndulatingGround: 15,
            ExtraIrradiance: 16, Curtailment: 17, DCHealth: 18, InverterToInverterMismatch: 19, StringToStringMismatch: 20, ModuleToModuleMismatch: 21,
            CellToCellMismatch: 22, InverterEfficiency: 23, ModuleEfficiencyTemperatureCoefficient: 24, Albedo: 25, RearStructuralShadingFactor: 26, RearTransmissionFactor: 27
        {X}_pdf: the probability distribution function (pdf) for {X} uncertainty (simulation, yearly, or time_step)
                    pdf format: ['FunctionName', *parameters], pdfs: 'Gaussian', 'SkewedGaussian', 'Weibull', or 'Arbitrary'
        simToSim: the pdf for simulation error (e.g. ['Gaussian', 0.01, 0.1] ; x0 = 0.01, σ = 0.1)
        yearToYear: the pdf for yearly error (e.g. ['Weibull', 0.0, 0.025, 1.7] ; x0 = 0, λ = 0.025, k = 1.7)
        time_pdf: the pdf for time step error (e.g. ['SkewedGaussian', 1.0, 3.2, -2.0] ; ɑ = -2, ζ = 1, ⍵ = 3.2)
        If any pdf's are None then they are not used.
    """
    if not input in range(27):
        raise ValueError(f"Invalid simulation input: {input}")
    
    # Check if any distributions are Weibull and ensure that the limits of the input don't give the same CDF value (to avoid error)
    distributions_to_check = [dist for dist in [simToSim, yearToYear, stepToStep] if dist is not None]
    weibull_distributions = [dist for dist in distributions_to_check if dist[0] == "Weibull"]
    if weibull_distributions:
        lower_limit, upper_limit = DISTRIBUTION_LIMITS[input]
        for weibull_dist in weibull_distributions:
            # Extract Weibull parameters: x0, lambda_term, k
            x0, lambda_term, k = weibull_dist[1:4]
            positive_polarity = weibull_dist[4] if len(weibull_dist) > 4 else True  # Default is True, if unprovided
            p = 1 if positive_polarity else -1
            # Check if the CDF values at the limits would be the same
            cdf_lower = weibull_cdf(lower_limit, x0, lambda_term, k, p)
            cdf_upper = weibull_cdf(upper_limit, x0, lambda_term, k, p)

            if abs(cdf_upper - cdf_lower) < 1e-8:
                raise ValueError(f"""Weibull distribution parameters for '{DistributionInput.Name(input)}' result in identical CDF values at limits:
    [{lower_limit}, {upper_limit}]->[{cdf_lower}, {cdf_upper}].
Adjust your Weibull distribution to have a variation between [{lower_limit}, {upper_limit}], probably by changing the x0 or polarity values.""")

    input_distribution = build_distribution(
        input=input,
        sim_to_sim_distribution=simToSim and build_pdf_from_params(simToSim[0], *simToSim[1:]),   # 'and' ensures {X}_pdf=None is passed as None
        yr_to_yr_distribution=yearToYear and build_pdf_from_params(yearToYear[0], *yearToYear[1:]),
        step_to_step_distribution=stepToStep and build_pdf_from_params(stepToStep[0], *stepToStep[1:])
    )
    return input_distribution

def build_constant_distribution(value: float) -> DistributionFunction:
    """Builds a constant distribution."""
    constant_dist = DistributionFunction()
    constant_dist.Type = DistributionType.Constant
    constant_dist.Constant.Value = value
    return constant_dist

def build_pdf_from_params(pdf_name, *params):
    # Simple method of generically sending pdf build requests
    if pdf_name == "Gaussian":
        # build_gaussian_distribution( x0: float, sigma: float, fmax: float | None = None, num_points_in_probability_function: int | None = None, lower_limit: float | None = None, upper_limit: float | None = None, )
        return build_gaussian_distribution(*params)
    elif pdf_name == "SkewedGaussian":
        # build_skewed_gaussian_distribution( alpha: float, zeta: float, omega: float, fmax: float | None = None, num_points_in_probability_function: int | None = None, )
        return build_skewed_gaussian_distribution(params[-1], *params[0:-1])   # TODO: change to (*params) once backend reorders arguments so skewness is last
    elif pdf_name == "Weibull":
        # build_weibull_distribution(x0: float, lambda_term: float, k: float, has_positive_polarity: bool | None = None)
        return build_weibull_distribution(*params)
    elif pdf_name == "Arbitrary":
        # build_arbitrary_distribution(x: list[float], y: list[float])
        return build_arbitrary_distribution(*params)
    elif pdf_name == "Constant":
        # build_constant_distribution(value: float)
        return build_constant_distribution(*params)
    else:
        raise ValueError(f"Invalid pdf name: {pdf_name}")

def perform_analysis(uncertainty_client, uncertainty_request, call_credentials, show_errors=True):
    """Perform the uncertainty analysis and display results."""
    try:
        print("Starting uncertainty analysis...")
        t_start = time.time()    # get the start time
        summary: UncertaintySummary | None = uncertainty_client.send_request( #uncertainty_client,
            uncertainty_request, call_credentials, timeout=60*14
        )  # timeout in seconds
        t_request = time.time()-t_start    # get the request time in seconds
        n_total = uncertainty_request.SimulationOptions.NumberOfSimulations*uncertainty_request.SimulationOptions.NumberOfYears
        print(f"Request took {round(t_request)} s, running {n_total} simulations in total ({round(n_total/t_request)} sims/s)")
        # Display results
        if summary:
            return summary
        else:
            print("⚠️  No results received from analysis")
    except AuthenticationError as e:
        if show_errors:
            print(f"❌ Authentication Error: {e}")
    except P90ConnectionError as e:
        if show_errors:
            print(f"❌ Connection Error: {e}")
    except ServiceError as e:
        if show_errors:
            print(f"❌ Service Error: {e}")
    except TimeoutError as e:
        if show_errors:
            print(f"❌ Timeout Error: {e}")
    except ValueError as e:
        if show_errors:
            print(f"❌ Invalid Request: {e}")
    except (RuntimeError, ConnectionError) as e:
        if show_errors:
            print(f"❌ Unexpected Error: {e}")

def request_analysis(p90_request):
    with P90Client(version="latest-dev") as client:
        # User will be prompted to enter credentials
        credentials = pvl_login.login()
        # Send request and get results
        summary = perform_analysis(client, p90_request, credentials)
        return summary

# Converts the summary data into three pandas DataFrames for easier analysis
def summary_to_dataframes(summary_data, inputs_data):
    bin_width = inputs_data.ResultOptions.PDelta
    # yearly_hist_df: years x bin frequencies
    n_bins = len(summary_data.YearlyHistogram[0].bins)
    bin_min = 1 - n_bins * bin_width / 2
    bin_edges = [float(round(x, 5)) for x in np.linspace(bin_min, bin_min+bin_width*(n_bins-1), n_bins)]
    yearly_hist_df = pd.DataFrame([(hist.Year, *hist.bins) for hist in summary_data.YearlyHistogram], 
                columns=['Year', *bin_edges]).set_index('Year')
    
    
    # yearly_p_df: years x P-values
    yearly_p_df = pd.DataFrame([(pval.Year, f'P{pval.P}', pval.P50Deviation) for pval in summary_data.YearlyPValue],
                columns=['Year', 'P-value', 'Value']).pivot(index='Year', columns='P-value', values='Value')

    # Sort columns by P-value in descending order (e.g. P95, P90, P10, P5)
    yearly_p_df = yearly_p_df.reindex(sorted(yearly_p_df.columns, key=lambda x: int(x[1:]), reverse=True), axis=1)

    # yearly_p50_df: simple year and P50 deviation
    yearly_p50_df = pd.DataFrame([(p50.Year, p50.Value) for p50 in summary_data.YearlyP50Deviation], 
                columns=['Year', 'P50_Deviation']).set_index('Year')
    
    return yearly_hist_df, yearly_p_df, yearly_p50_df

In [3]:
# Plotting functions

# Apply default styles to a plotly figure (Note: FIGURE_FONT_SIZE and TRANSPARENT_PLOTS are defined at the top of this notebook)
def apply_styles(fig, image=True):
    fig.update_layout(
        font=dict(size=FIGURE_FONT_SIZE),
        title_font=dict(size=FIGURE_FONT_SIZE+2),
        paper_bgcolor = "rgba(0,0,0,0)" if TRANSPARENT_PLOTS else "white",  # Fully transparent background
        plot_bgcolor = "rgba(0,0,0,0)" if TRANSPARENT_PLOTS else "white"
    )
    if image:
        # Load the SunSolve Logo and convert to base64
        image_path = "Images/SunSolveLogo.svg"
        with open(image_path, "rb") as image_file:
            encoded_image = base64.b64encode(image_file.read()).decode()
        fig.update_layout(images=[dict(
            source=f"data:image/svg+xml;base64,{encoded_image}",    # Add SunSolve Logo to plot
            xref="paper", yref="paper",
            x=0.4, y=1.18,
            sizex=0.2, sizey=0.2,
            xanchor="left", yanchor="top"
        )])
    fig.update_xaxes(linewidth=1, linecolor="black", mirror=True, ticks="outside")
    fig.update_yaxes(linewidth=1, linecolor="black", mirror=True, ticks="outside", showgrid=True, gridwidth=GRID_WIDTH, gridcolor=GRID_COLOUR)

def plot_yearly_hist_plotly(data, year_index, y_limit=None, fig_width=900, bin_min=0.75, bin_width=0.01,
                            yearly_yield=1, yield_units=None, x_range=None, title=None):
    fig = go.Figure()
    bin_max = 2-bin_min-bin_width
    n_bins = len(data.YearlyHistogram[0].bins)
    x_axis = np.linspace(bin_min, bin_max, n_bins) * yearly_yield
    tot_freq = sum(data.YearlyHistogram[year_index].bins)
    probs = [freq_i/tot_freq*100 for freq_i in data.YearlyHistogram[year_index].bins]   # probabilities are normalised frequencies
    prob_limit = math.ceil(y_limit/tot_freq*100+0.5) if y_limit is not None else None

    fig.add_trace(go.Bar(x=x_axis, y=probs, marker_color=HISTOGRAM_COLOUR))
    
    fig.update_layout(
        title=f"Year {data.YearlyHistogram[year_index].Year} Histogram {'- '+title if title else ''}",
        xaxis_title="Relative yield" if yield_units is None else f"Yield ({yield_units})",
        yaxis_title="Probability (%)",
        yaxis_range=[0, prob_limit] if y_limit is not None else None,
        width=fig_width,
    )
    apply_styles(fig)
    fig.update_xaxes(minor_ticks="outside")
    if x_range is not None:
        fig.update_xaxes(range=x_range)     # Set a fixed x-axis range if provided, otherwise auto-scale
    fig.show()

def plot_interactive_histogram(summary_data, inputs_data, year_one_yield=1, yield_units="MWh", each_year_independent=False, title=None):
    bin_min = inputs_data.ResultOptions.PMin
    bin_width = inputs_data.ResultOptions.PDelta
    max_freq = max(max(summary_data.YearlyHistogram[i].bins) for i in range(len(summary_data.YearlyHistogram)))
    year_slider = IntSlider(min=1, max=len(summary_data.YearlyHistogram), step=1, description="Year:")
    if year_one_yield == 1:
        yield_units = None      # Don't show units if yields are relative to year one

    def update_plot(year_index):
        yearly_yield = summary_data.YearlyP50Deviation[year_index-1].Value * year_one_yield
        xmin = min(dev.Value for dev in summary_data.YearlyP50Deviation) * year_one_yield * bin_min        # Define minimum x-value across all years, to keep plots on same scale as you change year
        xmax = max(dev.Value for dev in summary_data.YearlyP50Deviation) * year_one_yield * (2-bin_min)    # Define maximum x-value across all years, as above
        x_range = [xmin, xmax]
        if each_year_independent:
            # Override to make all histograms relative to their own year's P50
            x_range = None
            yearly_yield = 1
        plot_yearly_hist_plotly(summary_data, year_index-1, y_limit=math.ceil(max_freq/10+0.1)*10, bin_min=bin_min, bin_width=bin_width,
                                yearly_yield=yearly_yield, yield_units=yield_units, x_range=x_range, title=title)

    interactive_plot = interactive(update_plot, year_index=year_slider)
    display(interactive_plot)

def plot_yearly_P50_Values(data, fig_width=900, year_one_yield=1, yield_units="MWh"):
    years, P50Deviations = zip(*[[i.Year, i.Value*year_one_yield] for i in data.YearlyP50Deviation])

    fig = go.Figure()
    fig.add_trace(go.Bar(x=list(years), y=list(P50Deviations), marker_color=HISTOGRAM_COLOUR))

    y_range = math.ceil(1000*(max(P50Deviations)-min(P50Deviations)))/1000
    y0 = round(1000*(max(P50Deviations)+min(P50Deviations))/2)/1000

    fig.update_layout(
        title="P50 Yield Relative to Year 1" if year_one_yield == 1 else "P50 Yield by Year",
        xaxis_title="Year",
        yaxis_title="Relative P50 yield" if year_one_yield == 1 else f"P50 yield ({yield_units})",
        yaxis_range=[y0 - y_range, y0 + y_range],
        width=fig_width,
    )
    apply_styles(fig)
    fig.update_xaxes(dtick=1, minor_ticks=None)
    fig.show()

def plot_pvalues(data, year_one_yield=1, yield_units="MWh"):
    # Get unique P values excluding 50
    unique_p_values = sorted(list(set([item.P for item in data.YearlyPValue if item.P != 50])))

    # Separate P values into < 50 and > 50
    p_values_less_than_50 = [p for p in unique_p_values if p < 50]
    p_values_greater_than_50 = [p for p in unique_p_values if p > 50]

    # Collect the P50 yields relative to year one, for rescaling the deviations
    p50_yields = {}
    for i in data.YearlyP50Deviation:
        p50_yields[i.Year] = i.Value * year_one_yield

    # Create subplots and add traces for P-values < 50
    if p_values_less_than_50 != []:
        fig = make_subplots(rows=1, cols=2)
        # Add traces for P-values < 50
        for p_value in p_values_less_than_50:
            filtered_data = sorted([item for item in data.YearlyPValue if item.P == p_value], key=lambda x: x.Year)
            years = [item.Year for item in filtered_data]
            p50_deviations = [item.P50Deviation * p50_yields[item.Year] for item in filtered_data]
            fig.add_trace(go.Scatter(x=years, y=p50_deviations, mode="lines+markers", name=f"P{p_value}"), row=1, col=1)

    else:
        fig = make_subplots(rows=1, cols=1)

    # Add traces for P-values > 50
    for p_value in p_values_greater_than_50:
        filtered_data = sorted([item for item in data.YearlyPValue if item.P == p_value], key=lambda x: x.Year)
        years = [item.Year for item in filtered_data]
        p50_deviations = [item.P50Deviation * p50_yields[item.Year] for item in filtered_data]

        if p_values_less_than_50 != []:
            fig.add_trace(go.Scatter(x=years, y=p50_deviations, mode="lines+markers", name=f"P{p_value}"), row=1, col=2)
        else:
            fig.add_trace(go.Scatter(x=years, y=p50_deviations, mode="lines+markers", name=f"P{p_value}"), row=1, col=1)

    # Update layout
    fig.update_layout(
        title=f"P-values{' < 50 and > 50,' if p_values_less_than_50 != [] else ''} {'Relative to Year 1 P50 Yield' if year_one_yield == 1 else ''}",
        width=900,
        xaxis={"type": "category"},
        xaxis2={"type": "category"},
        showlegend=True,
    )
    apply_styles(fig)
    fig.update_xaxes(title_text="Year", row=1, col=1)
    fig.update_yaxes(title_text="Relative PXX yield" if year_one_yield == 1 else f"PXX yield ({yield_units})", row=1, col=1)
    fig.update_xaxes(title_text="Year", row=1, col=2)

    fig.show()

# Define interactive weather plot function
def interactive_weather_plot(weather_path, weather_data):
    # Get timezone
    if weather_path.lower().endswith(".pvw"):
        with open(weather_path, "rb") as f:
            weather_file_text = f.read()
            weather_setting = weatherData_pb2.WeatherSetting()
            weather_setting.ParseFromString(weather_file_text)
        timezone = datetime.timezone(datetime.timedelta(hours=weather_setting.LocalStandardTimeOffset_Minutes.data/60))
    else:
        timezone = datetime.timezone(datetime.timedelta(hours=0))       # Don't adjust timezone for csv, etc.

    # Extract date and time information and irradiance data
    dates, hours, ghis, dhis, dnis = [], [], [], [], []

    for data_point in weather_data:
        # Convert EndOfPeriodUTC to datetime
        timestamp_seconds = data_point.EndOfPeriodUTC.seconds
        dt_object = datetime.datetime.fromtimestamp(timestamp_seconds, tz=timezone)

        dates.append(dt_object.date())
        hours.append(dt_object.hour)

        # Extract irradiance values, handle cases where they might be missing
        ghis.append(getattr(data_point.Weather, "GHI", 0))
        dhis.append(getattr(data_point.Weather, "DHI", 0))
        dnis.append(getattr(data_point.Weather, "DNI", 0))

    # Prepare max_irradiance variable for keeping plot limits consistent across all days
    max_irradiance = math.ceil(max(ghis + dhis + dnis)/10)*10

    # Create a DataFrame for easier filtering
    weather_df = pd.DataFrame({"Date": dates, "Hour": hours, "GHI": ghis, "DHI": dhis, "DNI": dnis})

    # Get unique dates
    unique_dates = sorted(weather_df["Date"].unique())

    # Set up interactive plot
    doy_slider = IntSlider(min=1, max=len(unique_dates), step=1, description="Day of year:")

    def update_plot(day_index):
        selected_date = unique_dates[day_index-1]

        # Filter weather_data for the target date and hours 4-20
        daily_data = weather_df[(weather_df["Date"] == selected_date) & (weather_df["Hour"] >= 4) & (weather_df["Hour"] <= 20)]

        # Extract data for plotting
        hours = daily_data["Hour"]

        # Create the plotly plot
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=hours, y=daily_data["GHI"], mode="lines+markers", name="GHI", marker_color="#d2226a", marker_size=4))
        fig.add_trace(go.Scatter(x=hours, y=daily_data["DHI"], mode="lines+markers", name="DHI", marker_color="#27cfff", marker_size=4))
        fig.add_trace(go.Scatter(x=hours, y=daily_data["DNI"], mode="lines+markers", name="DNI", marker_color="#febf24", marker_size=4))

        fig.update_layout(
            title=f"Irradiance on {selected_date}",
            xaxis_title="Hour of Day",
            yaxis_title="Irradiance (W/m²)",
            xaxis_range=[4, 20],
            yaxis_range=[0, max_irradiance],
            title_x=0.0,
            height=300,
            width=500,
            margin=dict(l=20, r=20, t=35, b=20),
        )
        apply_styles(fig, image=False)
        fig.update_xaxes(dtick=2)
        fig.show()

    interactive_plot = interactive(update_plot, day_index=doy_slider)
    display(interactive_plot)

In [4]:
# Define distribution functions for plotting interactively
def gaussian(x, x0, std_dev):
    """Calculates the probability density function of a Gaussian distribution."""
    # Ensure x is a numpy array for correct operation with scipy.stats
    x = np.asarray(x)
    return norm.pdf(x, loc=x0, scale=std_dev)

def weibull(x, x0, lamb, k, positive_polarity=True):
    """Calculates the probability density function of a Weibull distribution."""
    p = 1 if positive_polarity else -1
    # Set to 0 if x is on the wrong side of x0 or exactly equal to x0 (to avoid division by zero)
    return 0 if p*(x-x0)<0 or x == x0 else (k/lamb)*(p*(x-x0)/lamb)**(k-1)*math.exp(-1*(p*(x-x0)/lamb)**k)

def skewed_gaussian(x, x0, std_dev, skewness):
    """Calculates the probability density function of a Skewed Gaussian distribution."""
    # ζ = location (x0), ω = scale (std_dev), α = skewness
    x = np.asarray(x)    # Ensure x is a numpy array for correct operation with scipy.stats
    return skewnorm.pdf(x, skewness, loc=x0, scale=std_dev)

def constant(x, value):
    """Represents a Constant distribution."""
    return value

# Define parameters for each distribution type
distribution_params = {
    "Gaussian": ["x0", "std_dev"],
    "Skewed Gaussian": ["zeta", "omega", "alpha"],
    "Constant": ["value"],
    "Weibull": ["x0", "k", "lamb", "polarity"]
}

# Define interactive distribution plotter function, to visualise possible modifiers
def interactive_distribution_plot():
    # Create specific input widgets for each distribution type (initially hidden), also setting default/starting values
    std_dev_input = widgets.FloatText(value=0.05, description="σ:", layout=widgets.Layout(width="70%", display="none"), step=0.01)
    x0_input = widgets.FloatText(value=0.9, description="x0:", layout=widgets.Layout(width="70%", display="none"), step=0.01)
    k_input = widgets.FloatText(value=2.0, description="k:", layout=widgets.Layout(width="70%", display="none"), step=0.1)
    lamb_input = widgets.FloatText(value=0.05, description="λ:", layout=widgets.Layout(width="70%", display="none"), step=0.01)
    polarity_input = widgets.Dropdown(options=[('-1', False), ('+1', True)], value=True, description="polarity:", layout=widgets.Layout(width="70%", display="none"))
    zeta_input = widgets.FloatText(value=0.9, description="ξ:", layout=widgets.Layout(width="70%", display="none"), step=0.01)
    omega_input = widgets.FloatText(value=0.05, description="ω:", layout=widgets.Layout(width="70%", display="none"), step=0.01)
    alpha_input = widgets.FloatText(value=2.0, description="α:", layout=widgets.Layout(width="70%", display="none"), step=0.1)
    height_input = widgets.FloatText(value=1.0, description="height:", layout=widgets.Layout(width="70%", display="none"), step=0.1)
    value_input = widgets.FloatText(value=1.0, description="value:", layout=widgets.Layout(width="70%", display="none"), step=0.1)
    x1_input = widgets.FloatText(value=1.2, description="x1:", layout=widgets.Layout(width="70%", display="none"), step=0.1)

    # Store all parameter widgets in a dictionary for easy access
    param_widgets = {
        "std_dev": std_dev_input,
        "x0": x0_input,
        "x1": x1_input,
        "k": k_input,
        "lamb": lamb_input,
        "polarity": polarity_input,
        "zeta": zeta_input,
        "omega": omega_input,
        "alpha": alpha_input,
        "height": height_input,
        "value": value_input,
        "x_min": None, # These will be handled by the main xmin_input and xmax_input
        "x_max": None
    }

    # Create main control widgets
    distribution_dropdown = widgets.Dropdown(
        # options=["Gaussian", "Skewed Gaussian", "Weibull", "Delta", "Tophat", "Constant"],
        options=["Gaussian", "Skewed Gaussian", "Weibull"],
        value="Gaussian",
        description="Distribution:",
        layout=widgets.Layout(width="30%")
    )

    plot_header = widgets.HTML(
        value="<b>Plot controls:</b>",
        description="",
    )
    xmin_input = widgets.FloatText(value=0.0, description="x_min:", layout=widgets.Layout(width="17%"), step=0.1)
    xmax_input = widgets.FloatText(value=2.0, description="x_max:", layout=widgets.Layout(width="17%"), step=0.1)
    n_input = widgets.IntText(value=200, description="n_points:", layout=widgets.Layout(width="17%"), step=50)

    # Create an Output widget to specifically hold the plot output
    plot_output = widgets.Output()

    def update_param_visibility(distribution_type):
        """Updates the visibility of parameter widgets based on the selected distribution type."""
        # Hide all parameter widgets first
        for param_widget in [std_dev_input, x0_input, x1_input, k_input, lamb_input, polarity_input, zeta_input, omega_input, alpha_input, height_input, value_input]:
            param_widget.layout.display = "none"

        # Show relevant widgets based on the selected distribution type
        if distribution_type in distribution_params:
            for param_name in distribution_params[distribution_type]:
                if param_name in param_widgets and param_widgets[param_name] is not None:
                    param_widgets[param_name].layout.display = ""


    def plot_distribution(distribution_type, xmin, xmax, n, **params):
        """Plots the selected distribution function."""
        distributions = {
            "Gaussian": gaussian,
            "Weibull": weibull,
            "Skewed Gaussian": skewed_gaussian,
            "Constant": constant
        }

        if distribution_type not in distributions:
            print("Invalid distribution type selected.")
            return

        x = np.linspace(xmin, xmax, n+1)
        y = []
        arguments = []  # parameters for the user to use to produce this distribution

        # Use the Output widget as the context for clearing and displaying the plot
        with plot_output:
            clear_output(wait=True)
            x0 = params.get("x0", (xmin + xmax) / 2)    # Default to (xmin+xmax)/2 if missing (shouldn't be)

            # Handle Delta function separately for plotting
            if distribution_type == "Delta":
                height = params.get("height", 1.0)
                # For delta, we represent it as a single point at x0 with a stem plot
                print(f"\t\t['Delta', {', '.join(f'{i:g}' for i in arguments)}]")     # Show user the command to define this error function
                fig = go.Figure()
                fig.add_trace(go.Scatter(x=[x0], y=[height], mode="markers", name="Relative frequency", marker_color=HISTOGRAM_COLOUR))
                fig.update_layout(xaxis_title="x", yaxis_title="Relative frequency", width=400, height=350, margin=dict(l=20, r=20, t=15, b=20))
                apply_styles(fig, image=False)
                fig.update_yaxes(showgrid=True, gridwidth=GRID_WIDTH, gridcolor=GRID_COLOUR)
                fig.show()
                arguments = [x0, height]
            else:
                dist_func = distributions[distribution_type]
                if distribution_type == "Weibull":
                    # Ensure k and lamb are present in params, use defaults if not
                    k_val = params.get("k", 5)
                    lamb_val = params.get("lamb", 3)
                    polarity_val = params.get("polarity", -1)
                    y = [dist_func(xi, x0, lamb_val, k_val, polarity_val) for xi in x]
                    arguments = [x0, lamb_val, k_val, polarity_val]
                elif distribution_type == "Constant":
                    # Ensure value is present in params, use default if not
                    value_val = params.get("value", 0.0)
                    y = [dist_func(xi, value_val) for xi in x]
                    arguments = [value_val]
                elif distribution_type == "Tophat":
                    # Ensure height is present in params, use default if not
                    height_val = params.get("height", 1.0)
                    x1 = params.get('x1', 3.0)
                    y = [dist_func(xi, x0, x1, height_val) for xi in x]
                    arguments = [x0, x1, height_val]
                elif distribution_type == "Skewed Gaussian":
                    x0 = params.get("zeta", (xmin + xmax) / 2)    # Default to (xmin+xmax)/2 if missing (shouldn't be)
                    omega = params.get("omega", 0.1)    # Omega, default 0.1
                    alpha = params.get("alpha", 1.0)  # Alpha, default 1
                    # Limit the range of values to 4 standard deviations from the mean
                    x = np.linspace(x0 - 4*omega, x0 + 4*omega, n+1)
                    y = [dist_func(xi, x0, omega, alpha) for xi in x]
                    arguments = [x0, omega, alpha]
                else: # Gaussian
                    std_dev = params.get("std_dev", 0.1)
                    # Limit the range of values to 4 standard deviations from the mean
                    x = np.linspace(x0 - 4*std_dev, x0 + 4*std_dev, n+1)
                    y = [dist_func(xi, x0, std_dev) for xi in x]
                    arguments = [x0, std_dev]

                # Show user the command to define this distribution (e.g. "['Gaussian', 1, 0.05]")
                if distribution_type != "Weibull":
                    print(f"\t\t['{distribution_type.replace(' ', '')}', {', '.join(f'{i:g}' for i in arguments)}]")
                else:
                    print(f"\t\t['{distribution_type.replace(' ', '')}', {', '.join(f'{i:g}' for i in arguments[0:-1])}, {arguments[-1]}]")
                # Normalise the y-values to have 1 as the maximum
                y = [yi/max(y) for yi in y]
                # Plot the distribution
                fig = go.Figure()
                fig.add_trace(go.Scatter(x=x, y=y, mode="lines", name="Relative frequency", marker_color=HISTOGRAM_COLOUR))
                fig.update_layout(xaxis_title="x", yaxis_title="Relative frequency", width=400, height=350, margin=dict(l=20, r=20, t=15, b=20))
                fig.update_xaxes(range=[xmin, xmax])
                fig.update_yaxes(range=[0, math.ceil(max(y)*10)/10+0.1])
                apply_styles(fig, image=False)
                fig.show()


    # Link the widgets to the interactive wrapper function
    def interactive_plot_wrapper(distribution_type, xmin, xmax, n, **param_values):
        """Wrapper function to handle interactive updates and call plot_distribution."""
        update_param_visibility(distribution_type)

        # Prepare parameters for plot_distribution based on the selected type
        params_to_pass = {}
        if distribution_type in distribution_params:
            for param_name in distribution_params[distribution_type]:
                # Check if the parameter is one of the specific distribution parameters and exists in param_values
                if param_name in ["mean", "std_dev", "x0", "x1", "k", "lamb", "polarity", "zeta", "omega", "alpha", "height", "value"] and param_name in param_values:
                    params_to_pass[param_name] = param_values[param_name]
                # x_min and x_max for Tophat will be taken from the main xmin and xmax inputs in plot_distribution

        plot_distribution(distribution_type, xmin, xmax, n, **params_to_pass)


    interactive_plot = widgets.interactive(
        interactive_plot_wrapper,
        distribution_type=distribution_dropdown,
        xmin=xmin_input,
        xmax=xmax_input,
        n=n_input,
        # Pass all potential parameter widgets to interactive, their values will be available in param_values
        std_dev=param_widgets["std_dev"],
        x0=param_widgets["x0"],
        x1=param_widgets["x1"],
        k=param_widgets["k"],
        lamb=param_widgets["lamb"],
        polarity=param_widgets["polarity"],
        zeta=param_widgets["zeta"],
        omega=param_widgets["omega"],
        alpha=param_widgets["alpha"],
        height=param_widgets["height"],
        value=param_widgets["value"]
    )

    # Group the parameter specific widgets in two columns
    param_widget_list = [x0_input, x1_input, zeta_input, std_dev_input, alpha_input, omega_input, k_input, lamb_input, height_input, polarity_input, value_input]
    # Split the widgets into two columns (adjust as needed based on the number of parameters)
    col1_widgets = param_widget_list[::2] # Every other widget starting from the first
    col2_widgets = param_widget_list[1::2] # Every other widget starting from the second

    # Group the main control widgets
    control_widgets = widgets.VBox([
        distribution_dropdown,
        widgets.HBox([widgets.VBox(col1_widgets), widgets.VBox(col2_widgets)])
    ])
    # Group the plot control widgets
    plot_widgets = widgets.VBox([
        plot_header,
        widgets.HBox([xmin_input, xmax_input, n_input], layout=widgets.Layout(width='95%')), # Arrange xmin and xmax horizontally
    ])


    # Combine control widgets, parameter widgets, and the interactive plot output
    interactive_display = widgets.VBox([
        # widgets.Label("Interactive Distribution Modifier"), # Add a title
        control_widgets,
        plot_output, # Display the dedicated Output widget for the plot
        plot_widgets,
    ])

    # Display the organized interactive plot
    display(interactive_display)

    # Initially update parameter visibility based on the default dropdown value
    update_param_visibility(distribution_dropdown.value)

    # Trigger an initial plot display
    interactive_plot_wrapper(distribution_dropdown.value, xmin_input.value, xmax_input.value, n_input.value,
                            std_dev=std_dev_input.value, x0=x0_input.value, x1=x1_input.value, k=k_input.value,
                            lamb=lamb_input.value, polarity=polarity_input.value, alpha=alpha_input.value,
                            zeta=zeta_input.value, omega=omega_input.value, height=height_input.value, value=value_input.value)

In [5]:
# Define export function
def export_to_excel(data, filename, used_inputs=None, dist_list=None):
    # Bins go: var numBins = (int)((1 - request.MinP) * 2 / request.DeltaP + 0.5);
    bin_min = used_inputs.ResultOptions.PMin
    bin_width = used_inputs.ResultOptions.PDelta
    # Process YearlyPValue data
    pvalues_data = {}
    unique_p_values = sorted(list(set([item.P for item in data.YearlyPValue])))
    unique_years = sorted(list(set([item.Year for item in data.YearlyPValue])))

    pvalues_data["PValue"] = unique_p_values
    for year in unique_years:
        year_data = [item.P50Deviation for item in data.YearlyPValue if item.Year == year]
        pvalues_data[f"Year {year}"] = year_data

    pvalues_df = pd.DataFrame(pvalues_data)

    # Process YearlyP50Deviation data
    yearly_p50_data = {}
    p50_values = [item.Value for item in data.YearlyP50Deviation]
    yearly_p50_data["P50Deviation"] = ["P50"]
    for year, p50 in zip(unique_years, p50_values):
        yearly_p50_data[f"Year {year}"] = p50

    p50_df = pd.DataFrame(yearly_p50_data)

    # Process YearlyHistogram data
    histograms_data = {}
    unique_years_hist = sorted(list(set([item.Year for item in data.YearlyHistogram])))

    # Define histogram bins based on bin_min and bin_width
    num_bins = len(data.YearlyHistogram[0].bins)
    bin_max = 1 + (1-bin_min) - bin_width
    bin_labels = [round(i, 2) for i in np.linspace(bin_min, bin_max, num_bins)]

    # # Add custom labels for the ends as requested, assuming they correspond to the first and last bins
    # bin_labels[0] = f'<{bin_min:.2f}'     # The first bin is less than bin_min
    # bin_labels[-1] = f'>{bin_max:.2f}'    # The last bin is greater than bin_max+bin_width

    histograms_data["Bin"] = bin_labels

    for year in unique_years_hist:
        year_histogram = [list(item.bins) for item in data.YearlyHistogram if item.Year == year][0]
        histograms_data[f"Year {year}"] = year_histogram

    histograms_df = pd.DataFrame(histograms_data)

    # Add a dataframe with the used inputs, if supplied
    if used_inputs is not None:
        # Convert used_inputs to a structured DataFrame for CSV export

        # Extract all the key parameters from used_inputs
        data = {
            "Parameter": [
                "Module Length (m)", "Module Width (m)", "Module Power (W)", "Module Efficiency Temp Coeff",
                "Module Height Above Ground (m)", "Bifaciality", "Cell to Cell Mismatch",
                "Number of Inverters", "Strings per Inverter", "Modules per String", "Row Pitch (m)",
                "Module Azimuth (degrees)", "Module Tilt (degrees)",
                "Inverter Efficiency", "Inverter to Inverter Mismatch", "String to String Mismatch",
                "Module to Module Mismatch", "Inverter Wiring Loss", "String Wiring Loss", "Max Power Tracking Loss",
                "Beam Multiplier Front", "Isotropic Multiplier Front", "Beam Multiplier Rear", "Isotropic Multiplier Rear",
                "Fallback Albedo", "Fallback Soiling Front", "Fallback Soiling Rear", "Fallback Spectral Correction",
                "Extra Irradiance", "Thermal Uc", "Thermal Uv", "Thermal Alpha",
                "Annual Degradation Rate", "DC Health", "Availability", "Curtailment", "Yield Modifier", "Undulating Ground",
                "Number of Simulations", "Number of Years", "P Min", "P Delta"
            ],
            "Value": [
                used_inputs.Module.LengthInM, used_inputs.Module.WidthInM, used_inputs.Module.PowerRatingAtSTCInW,
                used_inputs.Module.ModuleEfficiencyTemperatureCoefficient, used_inputs.Module.HeightAboveGroundInM,
                used_inputs.Module.Bifaciality, used_inputs.Module.CellToCellMismatch,
                used_inputs.System.NumberOfInverters, used_inputs.System.StringsPerInverter, used_inputs.System.ModulesPerString,
                used_inputs.System.RowPitchInM, used_inputs.System.ModuleAzimuthInDegrees, used_inputs.System.FallbackModuleTiltInDegrees,
                used_inputs.Electrical.InverterEfficiency, used_inputs.Electrical.InverterToInverterMismatch,
                used_inputs.Electrical.StringToStringMismatch, used_inputs.Electrical.ModuleToModuleMismatch,
                used_inputs.Electrical.InverterWiringLoss, used_inputs.Electrical.StringWiringLoss, used_inputs.Electrical.MaxPowerTrackingLoss,
                used_inputs.Optical.BeamMultiplierFront, used_inputs.Optical.IsotropicMultiplierFront,
                used_inputs.Optical.BeamMultiplierRear, used_inputs.Optical.IsotropicMultiplierRear,
                used_inputs.Optical.FallbackAlbedo, used_inputs.Optical.FallbackSoilingFront, used_inputs.Optical.FallbackSoilingRear,
                used_inputs.Optical.FallbackSpectralCorrection, used_inputs.Optical.ExtraIrradiance,
                used_inputs.Thermal.Uc, used_inputs.Thermal.Uv, used_inputs.Thermal.Alpha,
                used_inputs.Operation.AnnualDegradationRate, used_inputs.Operation.DCHealth, used_inputs.Operation.Availability,
                used_inputs.Operation.Curtailment, used_inputs.Operation.YieldModifier, used_inputs.Operation.UndulatingGround,
                used_inputs.SimulationOptions.NumberOfSimulations, used_inputs.SimulationOptions.NumberOfYears,
                used_inputs.ResultOptions.PMin, used_inputs.ResultOptions.PDelta
            ],
            "Units": [
                "m", "m", "W", "/degC", "m", "-", "-",
                "-", "-", "-", "m", "degrees", "degrees",
                "-", "-", "-", "-", "-", "-", "-",
                "-", "-", "-", "-", "-", "-", "-", "-", "-",
                "W/m2K", "W/m3sK", "-",
                "/year", "-", "-", "-", "-", "-",
                "-", "years", "-", "-"
            ]
        }

        # Add P Values as separate rows
        for p_val in used_inputs.ResultOptions.PValues:
            data["Parameter"].append(f'P Value {p_val}')
            data["Value"].append(p_val)
            data["Units"].append('-')

        used_inputs_df = pd.DataFrame(data)

    # Save to Excel
    if filename.endswith(".xlsx"):
        filename = filename[:-5]
    with pd.ExcelWriter(f"{filename}.xlsx") as writer:
        pvalues_df.to_excel(writer, sheet_name="PValues", index=False)
        p50_df.to_excel(writer, sheet_name="P50Deviations", index=False)
        histograms_df.to_excel(writer, sheet_name="Histograms", index=False)
        if used_inputs is not None:
            used_inputs_df.to_excel(writer, sheet_name="UsedInputs", index=False)
        # Add a dataframe with the distribution list, if supplied
        if dist_list is not None:
            dist_df = pd.DataFrame(parse_distribution_list(dist_list)).T    # .T for transpose, to have each input as a row, instead of column
            dist_df.to_excel(writer, sheet_name="Distributions", index=True)    # index=True, as the input names are in the index after .T

    print(f"Summary data saved to '{filename}.xlsx'")

# Define notebook export function
def export_notebook(current_notebook, result_notebook):
    try:
        with open(current_notebook, 'r', encoding='utf-8') as src:
            notebook_content = src.read()
        
        with open(result_notebook, 'w', encoding='utf-8') as dst:
            dst.write(notebook_content)
        
        print(f"Notebook copy created: {result_notebook}")
    except FileNotFoundError:
        print(f"Could not find {current_notebook} to copy. You may need to manually save your notebook.")
    except Exception as e:
        print(f"Error creating notebook copy: {e}")

# Parse distribution list to extract uncertainties
def parse_distribution_list(distribution_list):
    uncertainties = {}
    for dist in distribution_list:
        # Get input name
        dist_str = str(dist)
        if hasattr(dist, "Input") and dist.Input != 0:
            input_name = re.search(r"(?<=Input: )\S+", dist_str).group()
        else:
            input_name = 'GHI'  # Default for first distribution
        
        # Initialize distribution parameters
        sim_to_sim = None
        yr_to_yr = None
        step_to_step = None
        
        # Parse SimToSim Distribution
        if hasattr(dist, "SimToSimDistribution"):
            sim_dist = dist.SimToSimDistribution
            enum_type = sim_dist.DESCRIPTOR.fields_by_name["Type"].enum_type
            dist_name = enum_type.values[sim_dist.Type].name
            match dist_name:
                case "Gaussian":
                    x0 = sim_dist.Gaussian.x0
                    sigma = sim_dist.Gaussian.sigma
                    sim_to_sim = ["Gaussian", x0, sigma]
                case "SkewedGaussian":
                    alpha = sim_dist.SkewedGaussian.alpha
                    zeta = sim_dist.SkewedGaussian.zeta
                    omega = sim_dist.SkewedGaussian.omega
                    sim_to_sim = ["SkewedGaussian", zeta, omega, alpha]
                case "Weibull":
                    x0 = sim_dist.Weibull.x0
                    lambda_val = getattr(sim_dist.Weibull, "lambda")
                    k = sim_dist.Weibull.k
                    positive_polarity = sim_dist.Weibull.hasPositivePolarity
                    sim_to_sim = ["Weibull", x0, lambda_val, k, positive_polarity]
                case "Constant":
                    # No distribution applied
                    pass
                case _:
                    raise ValueError(f"Unknown distribution type: {dist_name}")
        
        # Parse YrToYr Distribution
        if hasattr(dist, "YrToYrDistribution"):
            year_dist = dist.YrToYrDistribution
            enum_type = year_dist.DESCRIPTOR.fields_by_name["Type"].enum_type
            dist_name = enum_type.values[year_dist.Type].name
            match dist_name:
                case "Gaussian":
                    x0 = year_dist.Gaussian.x0
                    sigma = year_dist.Gaussian.sigma
                    yr_to_yr = ["Gaussian", x0, sigma]
                case "SkewedGaussian":
                    alpha = year_dist.SkewedGaussian.alpha
                    zeta = year_dist.SkewedGaussian.zeta
                    omega = year_dist.SkewedGaussian.omega
                    yr_to_yr = ["SkewedGaussian", zeta, omega, alpha]
                case "Weibull":
                    x0 = year_dist.Weibull.x0
                    lambda_val = getattr(year_dist.Weibull, "lambda")
                    k = year_dist.Weibull.k
                    positive_polarity = year_dist.Weibull.hasPositivePolarity
                    yr_to_yr = ["Weibull", lambda_val, k, positive_polarity]
                case "Constant":
                    # No distribution applied
                    pass
                case _:
                    raise ValueError(f"Unknown distribution type: {dist_name}")
                
        # Parse StepToStep Distribution
        if hasattr(dist, "StepToStepDistribution"):
            step_dist = dist.StepToStepDistribution
            enum_type = step_dist.DESCRIPTOR.fields_by_name["Type"].enum_type
            dist_name = enum_type.values[step_dist.Type].name
            match dist_name:
                case "Gaussian":
                    x0 = step_dist.Gaussian.x0
                    sigma = step_dist.Gaussian.sigma
                    step_to_step = ["Gaussian", x0, sigma]
                case "SkewedGaussian":
                    alpha = step_dist.SkewedGaussian.alpha
                    zeta = step_dist.SkewedGaussian.zeta
                    omega = step_dist.SkewedGaussian.omega
                    step_to_step = ["SkewedGaussian", zeta, omega, alpha]
                case "Weibull":
                    x0 = step_dist.Weibull.x0
                    lambda_val = getattr(step_dist.Weibull, "lambda")
                    k = step_dist.Weibull.k
                    positive_polarity = step_dist.Weibull.hasPositivePolarity
                    step_to_step = ["Weibull", lambda_val, k, positive_polarity]
                case "Constant":
                    # No distribution applied
                    pass
                case _:
                    raise ValueError(f"Unknown distribution type: {dist_name}")
        
        uncertainties[input_name] = {
            "sim_to_sim": sim_to_sim,
            "yr_to_yr": yr_to_yr,
            "step_to_step": step_to_step
        }
    return uncertainties

In [6]:
# pvlib comparison functions - for the pvlib comparison notebook

# Helper function to generate random values from distribution specifications
def generate_random_value(dist_spec):
    """Generate random value from distribution specification"""
    if dist_spec is None:
        return 1.0  # No variation
    
    dist_type = dist_spec[0]
    if dist_type == "Gaussian":
        mean, sigma = dist_spec[1], dist_spec[2]
        return np.random.normal(mean, sigma)
    elif dist_type == "SkewedGaussian":
        zeta, omega, alpha = dist_spec[1], dist_spec[2], dist_spec[3]
        return skewnorm.rvs(a=alpha, loc=zeta, scale=omega)
    elif dist_type == "Weibull":
        x0, lambda_param, k = dist_spec[1:4]
        positive_polarity = dist_spec[4] if len(dist_spec) > 4 else False
        p = 1 if positive_polarity else -1
        u = np.random.uniform(0, 1)
        # Inverse CDF calculation
        return x0 + lambda_param * p * (-np.log(1 - u)) ** (1 / k)
    else:
        return 1.0

# Helper function to pass distribution specs for generating random values
def random_val(distributions_dict, input, variation_type):
    """Get random value for a specific input and variation type"""
    dist_spec = distributions_dict.get(input, {}).get(variation_type, None)
    if dist_spec is None:
        if variation_type not in ["sim_to_sim", "yr_to_yr", "step_to_step"]:
            raise ValueError(f"Invalid variation type: {variation_type}")
        return 1.0  # No variation specified
    
    # Retrieve relevant bounds for the input
    enum_value = getattr(DistributionInput, input, None)
    if enum_value is None:
        raise ValueError(f"Invalid input name: {input}")
    lower_bound, upper_bound = DISTRIBUTION_LIMITS[enum_value]

    # Generate values until one falls within the specified bounds, safely
    attempts = 0
    while attempts < 1000:
        value = generate_random_value(dist_spec)
        if lower_bound <= value <= upper_bound:
            return value
        attempts += 1
    raise ValueError(f"Failed to generate valid value for {input} and variation type {variation_type} after {attempts} attempts")


# Convert pvlib data into an UncertaintySummary to match the SunSolve P90 data
def create_pvlib_summary(annual_yield_list, num_years, yield_bins):

    bin_min, bin_width = yield_bins[0], yield_bins[1]-yield_bins[0]
    # aligning with structure of pvl_p90_client.grpcclientuncertaintyMessages_pb2.UncertaintySummary
    class PVLibSummary:
        def __init__(self):
            self.YearlyHistogram = []
            self.YearlyP50Deviation = []
    class YearHistogram:
        def __init__(self):
            self.bins = []
            self.Year = None
    class YearlyP50Deviation:
        def __init__(self, year=None, value=None):
            self.Year = year
            self.Value = value

    pvlib_summary = PVLibSummary()

    for i in range(num_years):
        year_values = YearHistogram()
        year_values.Year = i+1
        yields = annual_yield_list[:, i]
        p50 = np.median(yields)
        if i == 0:
            year_one_p50 = p50
        relative_yields = yields / (p50 if p50 != 0 else 1)   # Avoid division by zero
        year_values.bins = [sum(y < bin_min+bin_width for y in relative_yields)]
        year_values.bins += [sum(bin < y < bin+bin_width for y in relative_yields) for bin in yield_bins[1:-1]]
        year_values.bins += [sum(y > yield_bins[-1] for y in relative_yields) ]
        pvlib_summary.YearlyHistogram.append(year_values)
        pvlib_summary.YearlyP50Deviation.append(YearlyP50Deviation(year=i, value=p50/(year_one_p50 if year_one_p50 != 0 else 1)))
    
    return pvlib_summary