In [None]:
# Put the parent directory at the front of the import search path (only once, so
# re-running the cell does not add duplicates). Presumably this is what makes the
# project-local `_library` package importable in the next cell -- TODO confirm.
from sys import path
if '..' not in path:
    path.insert(0, '..')

In [None]:
from _library.utils import SYSTEM_NAMES, SUBFOLDERS, load_datasets
from collections import defaultdict
import _library.uc2_interpolation as interpolation_utils
import _library.som_pre_utils as pre_utils
import numpy as np
from mpl_toolkits.mplot3d import Axes3D
import pandas as pd

In [None]:
# NOTE(review): hard-coded, machine-specific absolute path. The notebook only runs
# where this directory exists; consider a configurable project root instead.
%cd /mnt/data/vieri/projects/SAMPLE/

# The photovoltaic systems

In [None]:
# Show the PV system names and the dataset sub-folders exposed by `_library.utils`.
print(SYSTEM_NAMES, "\nSUBFOLDERS: -->", SUBFOLDERS)
# Index ruler (positions 0-4), presumably meant to line up with the printed
# SYSTEM_NAMES entries to help pick an index in the next cell -- TODO confirm.
# --- 0 ---------- 1 ---------- 2 --------- 3 ---------- 4 -------

## a) Selecting the PV system

In [None]:
# Pick the PV system by position in SYSTEM_NAMES (index 2, i.e. the third entry).
system_name = SYSTEM_NAMES[2]
print(f"PV SYSTEM --> {system_name}")

## b) Selecting the dataset type
- 1-hour sampling
- 1-hour averaged sampling

In [None]:
# One of the dataset types listed above; passed to `load_datasets` as `subfolder`.
dataset_name = "1-hour averaged sampling"

## c) Loading the dataset

In [None]:
# Load the data of the selected system. Only the first three returned values are
# kept; any further return values are discarded through `*_`.
# `inv_data` is indexed by inverter name below and `inv_names` is iterated over.
system_path, inv_data, inv_names, *_ = load_datasets(system_name, subfolder = dataset_name)

# Pre-processing

In [None]:
# Columns summarised for every inverter (current, voltage, irradiance, ambient temperature).
relevant_columns = ['Cc 1 (A)', 'Vcc 1 (V)', 'Irradiance (W/mq)', 'Amb. Temp (°C)']
for inv_name in inv_names:
    print("\n" + "-" * 30, inv_name, "-" * 30)
    df = inv_data[inv_name]
    display(df.loc[:, relevant_columns].describe())

    # Nothing left to do when the timestamps are already the index
    # (this guard is what makes the cell safe to re-run).
    if 'Date/Time' not in df.columns:
        continue

    # Promote the timestamp column to the index, in place, so that the frame
    # stored in `inv_data` is the one being modified.
    df.set_index('Date/Time', inplace=True)
    print("a) The timetamps are now used as index")

In [None]:
# Toggle: when True, the next cell drops the observations that the linear-regression
# check flags as potential outliers.
clean_with_reg = True

In [None]:
if clean_with_reg:
    # Threshold (in percent) handed to the outlier detector.
    perc_error_threhsold = 3
    # Fitted linear model of each inverter, keyed by inverter name.
    inv_linear_model = {}

    for inv_name in inv_names:
        banner = "-" * 40
        print(f"{banner} {inv_name}: TRAIN {banner}")
        df = inv_data[inv_name]

        # Outliers are found with a linear regression: Pac = f(Irr, Amb. Temp, [humidity]).
        # Two features are requested here.
        num_features = 2
        idk_potential_outliers, linear_model = pre_utils.find_pac_outliers_lin_reg(
            df, num_features, perc_error_threhsold, verbose=True
        )
        inv_linear_model[inv_name] = linear_model

        # Remove, in place, the observations flagged as outliers (i.e., the AC power
        # generated is out of range considering the ambient conditions).
        df.drop(index = idk_potential_outliers, inplace=True)

# Find maximum voltages/currents for all possible ambient conditions
- Ambient temperature (°C)
- Solar irradiance (W/m²)
- Maximum voltage (V)
- Maximum current (A)

In [None]:
# Bin widths of the grid of ambient conditions: they are used both as the spacing
# of the grid values and as the width of each [value, value + step) interval.
step_ambiental_temp = 0.5
step_solar_irradiance = 15

In [None]:
# Lower edges of the bins. `np.arange` excludes the stop value, so the last bins
# start at 45 - step and 1500 - step respectively.
ambiental_temp_values = np.arange(-5, 45, step = step_ambiental_temp)
solar_irradiance_values = np.arange(0, 1500, step = step_solar_irradiance)

In [None]:
# Upper bounds for voltage and current: observed or predicted maxima above these
# values are reported and clamped to them in the cells below.
maximum_voltage = 780
maximum_current = 818

In [None]:
# When True, the binning cell prints the details of every non-empty bin.
verbose = False

In [None]:
# When True, the inverter datasets are concatenated and analysed as a single
# dataset named 'All inverter data' instead of one dataset per inverter.
merge_data = True

In [None]:
amb_col_name = 'Amb. Temp (°C)'
irr_col_name = 'Irradiance (W/mq)'
voltage_col_name = 'Vcc 1 (V)'
current_col_name = 'Cc 1 (A)'

# Strategy used to pick the "maximum" of a bin:
#   0 -> largest value
#   1 -> second largest value (the only value when the bin has one observation)
#   2 -> mean of the two largest values (the only value when the bin has one observation)
# Defined up-front so that the summary at the end never hits an undefined name.
strat_num = 0


def _bin_maximum(values, strategy):
    """Return the representative maximum of a non-empty Series for the given strategy."""
    if strategy == 0:
        return values.max()
    if strategy == 1:
        return values.nlargest(2).iloc[-1]
    if strategy == 2:
        return values.nlargest(min(2, len(values))).mean()
    raise ValueError(f"Unknown strategy for the maximum values: {strategy}")


inv_pairs = defaultdict(list)
inv_missing_pairs = defaultdict(list)

# Decide ONCE which datasets have to be processed. Rebinding `inv_names` inside a
# `for inv_name in inv_names` loop does not stop that loop, so merging inside it
# would process the merged data once per inverter and store every tuple repeatedly.
if merge_data:
    # NOTE(review): the inverter keys are hard-coded; systems with a different set
    # of inverters need this tuple to be adapted.
    all_data = [inv_data[key] for key in ('INV1', 'INV2', 'INV3', 'INV4')]
    merged_df = pd.concat(all_data).sort_index()
    obs_summary = ', '.join([name + ': ' + str(len(frame)) for name, frame in inv_data.items()])
    print(f"--> Merging the inverter data (TOTAL: {len(merged_df)} obs --> {obs_summary})", "-" * 20)

    # The following cells iterate over `inv_names`, hence the single merged entry.
    inv_names = ['All inverter data']
    datasets = [(inv_names[0], merged_df)]
else:
    datasets = [(name, inv_data[name]) for name in inv_names]

total_conditions = len(ambiental_temp_values) * len(solar_irradiance_values)

for inv_name, df in datasets:
    print("\n" + 40 * "-", inv_name, 40 * "-")

    j = 1
    num_obs_counter = []
    print("\n--> Computing the tuples (amb_cond, max_current, max_voltage)...\n")
    for ambiental_temp in ambiental_temp_values:

        # Interval (value: value + step). The temperature filter does not depend on
        # the irradiance, so it is evaluated once per temperature bin.
        interval_amb_temp = (ambiental_temp, ambiental_temp + step_ambiental_temp)
        amb_temp_condition = df[amb_col_name].between(interval_amb_temp[0], interval_amb_temp[1],
                                                      inclusive = 'left')
        amb_temp_df = df[amb_temp_condition]

        for solar_irradiance in solar_irradiance_values:
            interval_solar_irradiance = (solar_irradiance, solar_irradiance + step_solar_irradiance)
            irr_condition = amb_temp_df[irr_col_name].between(interval_solar_irradiance[0],
                                                              interval_solar_irradiance[1],
                                                              inclusive = 'left')

            # Observations falling in this (temperature, irradiance) bin
            amb_condition_df = amb_temp_df[irr_condition]
            num_obs = len(amb_condition_df)
            num_obs_counter.append(num_obs)

            if num_obs > 0:
                max_voltage = _bin_maximum(amb_condition_df[voltage_col_name], strat_num)
                max_current = _bin_maximum(amb_condition_df[current_col_name], strat_num)

                # Clamp the values exceeding their theoretical maximum
                if max_current > maximum_current:
                    print(f"[ISSUE - MAXIMUM CURRENT REACHED] Hey, What the hell are you doing? "\
                          f"({ambiental_temp}°C, {solar_irradiance} w/mq)")
                    max_current = maximum_current
                if max_voltage > maximum_voltage:
                    print(f"[ISSUE - MAXIMUM VOLTAGE REACHED] Hey, What the hell are you doing? "\
                          f"({ambiental_temp}°C, {solar_irradiance} w/mq)")
                    max_voltage = maximum_voltage

                # Save this pair
                inv_pairs[inv_name].append({
                        'amb_temp': round(ambiental_temp, 1),
                        'solar_irr': solar_irradiance,
                        'max_voltage': max_voltage,
                        'max_current': max_current
                    })

                if verbose:
                    print("\n" + "-"* 50)
                    print(f"AMBIENTAL CONDITION ({j}/{total_conditions}): "\
                          f"{round(interval_amb_temp[0], 2)}:{round(interval_amb_temp[1], 2)} °C || "\
                          f"{interval_solar_irradiance[0]}:{interval_solar_irradiance[1]} w/mq")
                    print(f"NUM OBSERVATIONS FOUND: {num_obs}")
                    print(f"[MAX] Voltage: {max_voltage} V || [MAX] Current: {max_current} A")
                    print("-"* 50)
            else:
                # No observation for this condition: it is filled in later by interpolation
                inv_missing_pairs[inv_name].append((ambiental_temp, solar_irradiance))

            # Increment the counter of the loops
            j += 1

    # Summary for this dataset
    num_available = len(inv_pairs[inv_name])
    print(f"--> Tuples available: {num_available}/{total_conditions} "\
         f"({round((num_available / total_conditions) * 100, 2)} %)")
    print(f"--> [STRAT {strat_num}] Average number of observations in each pair of ambiental conditions:", int(round(np.mean(num_obs_counter), 0)))

# Interpolation

In [None]:
from matplotlib import pyplot as plt
        
def generate_sub_graph(fig, idk_plot, inputs, target_values, predicted_values, var_name, pov_elev = 5, pov_angle = -30,
                       visualize_actual_points = False):
    """Draw one 3-D surface panel on a figure laid out as 1 row x 2 columns.

    Parameters
    ----------
    fig : figure object providing ``add_subplot``
        The panel is added at position ``idk_plot + 1``.
    idk_plot : int
        Zero-based position of the panel (0 = left, 1 = right).
    inputs : 2-D array
        Column 0 is used as the x axis and column 1 as the y axis
        (labelled temperature and solar irradiance respectively).
    target_values : array or None
        Values scattered on top of the surface; may be None when no actual
        points have to be drawn.
    predicted_values : array
        z values of the surface, one per row of ``inputs``.
    var_name : str
        Label of the z axis.
    pov_elev, pov_angle : numbers
        Elevation and azimuth of the point of view.
    visualize_actual_points : bool
        When True (and ``target_values`` is available) the targets are
        drawn as grey crosses.
    """
    main_ax = fig.add_subplot(1, 2, idk_plot + 1, projection='3d')

    x_values = inputs[:, 0]
    y_values = inputs[:, 1]

    # Graph
    surf = main_ax.plot_trisurf(x_values, y_values, predicted_values, label = "Predicted values",
                                cmap = 'cividis', shade = True, linewidth = 1, edgecolor = 'none')

    # Legend workaround: the legend needs the 2-D colour attributes of the surface.
    # They are copied from the private 3-D attributes, whose names differ between
    # matplotlib releases, so both spellings are tried and a missing one is skipped
    # instead of raising AttributeError.
    legend_attributes = (
        ('_facecolors2d', ('_facecolor3d', '_facecolors3d')),
        ('_edgecolors2d', ('_edgecolor3d', '_edgecolors3d')),
    )
    for attr_2d, attr_3d_candidates in legend_attributes:
        for attr_3d in attr_3d_candidates:
            if hasattr(surf, attr_3d):
                setattr(surf, attr_2d, getattr(surf, attr_3d))
                break

    # 3D: Point of view
    main_ax.view_init(elev=pov_elev, azim=pov_angle)

    # The actual points can only be drawn when they have been provided
    if visualize_actual_points and target_values is not None:
        main_ax.scatter(x_values, y_values, target_values, label = "Target values",
                        alpha = 0.5, marker = "x", s=20, color = "grey")

    # Graphical parameters
    main_ax.set_xlabel('Temperature (°C)', fontsize = 20, labelpad = 10)
    main_ax.set_ylabel('Solar irradiance (w/mq)', fontsize = 20, labelpad = 10)
    main_ax.set_zlabel(var_name, fontsize = 20, labelpad = 10)

In [None]:
# Fitted regression models of each dataset: {'voltage': model, 'current': model}
inv_reg_model = {}
for inv_name in inv_names:
    print("\n" + "-" * 40, inv_name, "-" * 40)

    # Tuples (ambient condition, maximum voltage, maximum current) computed above
    amb_cond_tuples = inv_pairs[inv_name]

    # Inputs: (temperature, irradiance) pairs || Targets: maximum voltage/current
    amb_conditions = [(item['amb_temp'], item['solar_irr']) for item in amb_cond_tuples]
    voltage_values = [item['max_voltage'] for item in amb_cond_tuples]
    current_values = [item['max_current'] for item in amb_cond_tuples]

    all_inputs = np.array(amb_conditions)
    outputs = [np.array(voltage_values), np.array(current_values)]
    output_labels = ['Voltage (V)', 'Current (A)']

    # One figure per dataset, with one panel per target variable
    fig = plt.figure(figsize=(20, 20))
    fig.suptitle(f"[{inv_name}] Interpolation", size = 40, y = 0.7)

    # Point of view of each panel (same elevation, different azimuth)
    pov_elev = 10
    panel_angles = (-40, -140)

    best_models = []
    for idk, (output, output_label, pov_angle) in enumerate(zip(outputs, output_labels, panel_angles)):
        separator = "-" * 80
        print("\n" + separator)
        print(f"FUNCTION ({idk + 1}): {output_label} = F(Amb. temperature, solar irradiance)")
        print(separator)

        # A.0) Hold out 20% of the tuples
        train_x_data, train_y_data, test_x_data, test_y_data = interpolation_utils.split_data(
            x_data = all_inputs, y_data = output, test_dim = 0.2
        )

        # A.1) Select and fit the polynomial model
        best_model, best_degree = interpolation_utils.fit_best_polynomial_model(
            train_x_data, train_y_data, test_x_data, test_y_data,
            force_non_negative_values = False, verbose = True
        )
        best_models.append(best_model)

        # B) Predict every available condition with the fitted model
        all_pred_values = best_model.predict(all_inputs, force_non_negative_values = True)

        # B.1) Surface of the predictions plus the actual points
        generate_sub_graph(fig, idk, all_inputs, output, all_pred_values, output_label,
                           pov_elev, pov_angle, visualize_actual_points = True)

    voltage_model, current_model = best_models[0], best_models[1]
    inv_reg_model[inv_name] = {'voltage': voltage_model, 'current': current_model}

    # Visualize the graphical panel
    fig.tight_layout(pad = 4)
    plt.legend(loc = 'best', fontsize=15, markerscale = 2, shadow=True)
    print("\n", "-" * 50, "Visualize the outcome", "-" * 50)
    plt.show()

# Fill in the missing ambient conditions

In [None]:
for inv_name in inv_names:

    # Retrieve the tuples
    amb_cond_tuples = inv_pairs[inv_name]
    print(f"[{inv_name}] CONDITIONS AVAILABLE:", len(amb_cond_tuples))

    # Missing pairs
    missing_conditions = inv_missing_pairs[inv_name]
    print("MISSING CONDITIONS:", len(missing_conditions))

    # Keep only the conditions that do not have a tuple yet. Re-running this cell
    # (or a condition listed more than once) must not append the same tuple again.
    known_conditions = {(item['amb_temp'], item['solar_irr']) for item in amb_cond_tuples}
    conditions_to_fill = []
    for ambiental_temp, solar_irradiance in missing_conditions:
        condition_key = (round(ambiental_temp, 1), solar_irradiance)
        if condition_key not in known_conditions:
            known_conditions.add(condition_key)
            conditions_to_fill.append((ambiental_temp, solar_irradiance))
    if len(conditions_to_fill) < len(missing_conditions):
        print(f"--> Skipping {len(missing_conditions) - len(conditions_to_fill)} condition(s) already filled")

    # The models are only queried when there is something to predict
    if conditions_to_fill:
        # Regression model
        voltage_reg_model = inv_reg_model[inv_name]['voltage']
        current_reg_model = inv_reg_model[inv_name]['current']

        # Predicted max values
        predicted_max_voltage = voltage_reg_model.predict(conditions_to_fill, force_non_negative_values = False)
        predicted_max_current = current_reg_model.predict(conditions_to_fill, force_non_negative_values = False)

        # Fix negative values
        predicted_max_current[predicted_max_current < 0] = 0
        predicted_max_voltage[predicted_max_voltage < 0] = 0

        for idk, (ambiental_temp, solar_irradiance) in enumerate(conditions_to_fill):
            print(f"\nAMB. CONDITION {idk + 1}/{len(conditions_to_fill)}: {ambiental_temp}°C || "\
                  f"{solar_irradiance} w/mq\n" + "-" * 40)

            # Find the respective predicted max voltage/current
            max_voltage = predicted_max_voltage[idk]
            max_current = predicted_max_current[idk]

            # Fix the potential issues (i.e., values exceeding their theoretical maximum)
            if max_current > maximum_current:
                print(f"\n\t[ISSUE - MAXIMUM CURRENT REACHED]\n\tHey, what the hell are you doing? "\
                      f"\n\t--> THEORETICAL MAX: {maximum_current} A || PREDICTED: {round(max_current, 2)} A \n")
                max_current = maximum_current
            if max_voltage > maximum_voltage:
                print(f"\n\t[ISSUE - MAXIMUM VOLTAGE REACHED]\n\tHey, what the hell are you doing? "\
                      f"\n\t--> THEORETICAL MAX: {maximum_voltage} V || PREDICTED: {round(max_voltage, 2)} V\n")
                max_voltage = maximum_voltage

            # Visualize the new values
            print(f"[PREDICTED] MAX VOLTAGE: {round(max_voltage, 2)} V")
            print(f"[PREDICTED] MAX CURRENT: {round(max_current, 2)} A")

            # Save the new tuple
            amb_cond_tuples.append({
                'amb_temp': round(ambiental_temp, 1),
                'solar_irr': solar_irradiance,
                'max_voltage': max_voltage,
                'max_current': max_current
            })

    # Save the new lists, ordered by temperature and then by irradiance
    inv_pairs[inv_name] = sorted(amb_cond_tuples, key = lambda cond: (cond['amb_temp'], cond['solar_irr']))

## Visualize the new filled space

In [None]:
for inv_name in inv_names:
    # 0) Load the conditions of THIS dataset before reporting on them; otherwise the
    #    header would show whatever `amb_cond_tuples` was left over from an earlier
    #    cell or from the previous iteration.
    amb_cond_tuples = inv_pairs[inv_name]
    print("-" * 35, f"[{inv_name}] CONDITIONS AVAILABLE: {len(amb_cond_tuples)}", "-" * 35)

    # 0.1) Retrieve list of values
    amb_conditions = []
    voltage_values = []
    current_values = []
    for item in amb_cond_tuples:
        amb_conditions.append((item['amb_temp'], item['solar_irr']))
        voltage_values.append(item['max_voltage'])
        current_values.append(item['max_current'])

    amb_conditions = np.array(amb_conditions)
    voltage_values = np.array(voltage_values)
    current_values = np.array(current_values)

    # 1) Create the visual panel
    fig = plt.figure(figsize=(20, 20))
    fig.suptitle(f"[{inv_name}] Interpolation", size = 40, y = 0.7)

    # 1.1) [VOLTAGE] Generate the 3-dimensional subplot (no actual points are drawn,
    #      hence no target values are passed)
    generate_sub_graph(fig, 0, amb_conditions, None, voltage_values, 'Voltage (V)',
                       pov_elev = 30, pov_angle = 20, visualize_actual_points = False)

    # 1.2) [CURRENT] Generate the 3-dimensional subplot
    generate_sub_graph(fig, 1, amb_conditions, None, current_values, 'Current (A)',
                       pov_elev = 30, pov_angle = 20, visualize_actual_points = False)

    # Visualize the graphical panel
    fig.tight_layout(pad = 4)
    plt.legend(loc = 'best', fontsize=15, markerscale = 2, shadow=True)
    plt.show()