# MPPT Analysis
## Select Batches

In [11]:
%matplotlib ipympl
%load_ext autoreload
%autoreload 2
import ipywidgets as widgets
from IPython.display import display, Markdown, HTML, Latex
import os
import sys
import pandas as pd
import numpy as np
import itertools
import plotly.io as pio
import plotly.graph_objects as go
import plotly.express as px

sys.path.append(os.path.dirname(os.getcwd()))
from api_calls import get_ids_in_batch, get_sample_description, get_all_mppt
import batch_selection
import plotting_utils
import access_token

# Address of the server and of its "nomad-oasis" v1 API, used by all API helpers.
url_base = "https://nomad-hzb-se.de"
url = url_base + "/nomad-oasis/api/v1"
# Token handed to every API call in this notebook.
token = access_token.get_token(url)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [12]:
# Unicode warning sign printed in front of the "Selection not confirmed" notice.
warning_sign = "\u26A0"

out = widgets.Output()  # status messages while data is loaded
out2 = widgets.Output()  # NOTE(review): not referenced in the visible cells
read = widgets.Output()  # shows whether the name selection has been confirmed
dynamic_content = widgets.Output()  # For dynamically updated content
# The layout dict is turned into a Layout object, so its keys have to be the
# Layout trait names (underscored). The CSS spelling 'max-height' is not a
# trait and therefore never limited the height.
results_content = widgets.Output(layout={
    # 'border': '1px solid black',  # Optional: adds a border to the widget
    'max_height': '1000px',  # Limit the height
    'overflow': 'scroll',  # Adds a scrollbar if content overflows
    })
# Container that receives the per-measurement widgets of the sample being edited.
cell_edit = widgets.VBox()

# Preset deciding how the plot name of every sample is pre-filled.
default_variables = widgets.Dropdown(
    options=['sample name', 'batch', "sample description", 'custom'],
    index=0,
    description='name preset:',
    disabled=False,
    tooltip="Presets for how the samples will be named in the plot",
)

# Global dictionary to hold data (filled when a batch is loaded).
data = dict()

# Used in the fitting section; must be defined here so that the bounds can be
# set properly once data has been loaded.
fit_interval = widgets.FloatRangeSlider(
    min=0,
    max=1000,
    value=(0, 1000),
    step=1,
    description="interval",
    tooltip="time interval to use for model fitting",
)

#widget group for selecting samples and naming them
class cellSelector(widgets.widget_box.VBox):
    """Widget group for one sample: plot name plus measurement selection.

    Shows the sample id, a counter of the selected measurements, a text field
    for the name used in plots, and buttons to select all / none of the
    sample's measurements or to open the per-measurement options.

    Reads the module-level ``data`` dictionary: ``data["entries"]`` for the
    measurements of the sample and, for the "sample description" preset,
    ``data["properties"]``.
    """

    def __init__(self, sample_id, default, cell_box):
        """Build the widgets for ``sample_id``.

        sample_id: id of the sample; used as key into ``data["entries"]``.
        default: naming preset ("batch", "sample name", "sample description");
            any other value leaves the name field empty.
        cell_box: container whose children are replaced by the
            per-measurement widgets when "expand options" is clicked.
        """
        self.sample_id = sample_id
        self.sample_id_text = widgets.Label(value=sample_id, layout={'width': '200px'})
        self.count_text = widgets.Label(layout={'width': '100px'})

        # Ids of the form "<batch>&<variable>" are split at the first "&".
        if "&" in sample_id:
            batch, _, variable = sample_id.partition("&")
        else:
            batch, variable = "", sample_id

        if default == "batch":
            # Without "&" fall back to everything before the last "_".
            default_value = batch if batch else "_".join(sample_id.split("_")[:-1])
        elif default == "sample name":
            default_value = variable
        elif default == "sample description":
            default_value = data["properties"].loc[sample_id, "description"]
        else:
            default_value = ""
        self.text_input = widgets.Text(value=default_value, placeholder='sample name in plot', layout={'width': '300px'})

        self.display_all_button = widgets.Button(description="use all", layout={'width': '100px'}, tooltip="use every measurement of this sample")
        # Keyword was misspelled "tolltip", so this button never got its tooltip.
        self.display_none_button = widgets.Button(description="use none", layout={'width': '100px'}, tooltip="use no measurements of this sample")
        self.edit_curves_button = widgets.Button(description="expand options", layout={'width': '100px'}, tooltip="select which measurements of this sample to use")
        self.display_all_button.on_click(self.select_all)
        self.display_none_button.on_click(self.disselect_all)
        self.edit_curves_button.on_click(self.expand_options)

        super().__init__([widgets.HBox([self.sample_id_text, self.count_text]),
                          self.text_input,
                          widgets.HBox([self.display_all_button, self.display_none_button, self.edit_curves_button])])

        # One checkbox (use / don't use) and one name field per measurement.
        self.select_individual_cells = []
        self.name_defaults = []
        self.name_individual_cells = []
        for i in data["entries"].loc[sample_id].index:
            entry_name = data["entries"].loc[(sample_id, i), "entry_names"]
            current_select_box = widgets.Checkbox(description=entry_name, value=True)
            current_select_box.observe(self.update_count, "value")
            self.select_individual_cells.append(current_select_box)
            self.name_individual_cells.append(widgets.Text(placeholder="measurement name"))
            # Default measurement name: the entry name without the sample id in front.
            self.name_defaults.append(entry_name.removeprefix(sample_id))

        self.individual_widget_list = [
            widgets.HBox([select_box, name_field])
            for select_box, name_field in zip(self.select_individual_cells, self.name_individual_cells)
        ]
        # box for containing the widgets for editing individual curve names and visibility
        self.edit_box = cell_box

        # initialize value for the counter text
        self.update_count(None)

    def get_name(self):
        """Return the entered plot name, or the sample id if the field is blank."""
        if not self.text_input.value.strip():
            return self.sample_id
        return self.text_input.value

    def get_cell_selection(self):
        """Return one bool per measurement: True if its checkbox is ticked."""
        return [cell.value for cell in self.select_individual_cells]

    def get_curve_names(self):
        """Return one name per measurement, using the default where the field is blank."""
        return [
            text_field.value if text_field.value.strip() else default_name
            for text_field, default_name in zip(self.name_individual_cells, self.name_defaults)
        ]

    def select_all(self, b):
        """Button callback: tick every measurement checkbox."""
        for button in self.select_individual_cells:
            button.value = True

    def disselect_all(self, b):
        """Button callback: untick every measurement checkbox."""
        for button in self.select_individual_cells:
            button.value = False

    def expand_options(self, b):
        """Button callback: show this sample's per-measurement widgets in the edit box."""
        self.edit_box.children = self.individual_widget_list

    def update_count(self, change):
        """Refresh the "selected/total shown" counter label."""
        self.count_text.value = f"{self.get_cell_selection().count(True)}/{len(self.select_individual_cells)} shown"

def create_widgets_table(elements_list):
    """Create one cellSelector per sample id.

    Returns a VBox stacking the selectors in input order and a dictionary
    mapping each sample id to its selector.
    """
    sample_ids = list(elements_list)
    selectors = [
        cellSelector(current_id, default_variables.value, cell_edit)
        for current_id in sample_ids
    ]
    return widgets.VBox(selectors), dict(zip(sample_ids, selectors))

#take list of sample ids and return mppt data as data frames
def get_mppt_data(try_sample_ids):
    """Fetch the MPPT measurements of the given samples as data frames.

    try_sample_ids: sample ids passed on to ``get_all_mppt``.

    Returns ``(curves, sample_ids, entries)``:
      * curves: all measurement curves (columns time, power_density, voltage,
        current_density), indexed by sample id and measurement number,
      * sample_ids: Series with the ids of the samples that have at least one
        MPPT measurement,
      * entries: entry name and description of every measurement, keyed the
        same way as ``curves``.
    Returns ``(None, None, None)`` if no sample has MPPT data.
    """
    all_mppt = get_all_mppt(url, token, try_sample_ids)

    # Only samples that really contribute a frame are recorded here, so the
    # keys handed to pd.concat always line up with the frames. Previously a
    # sample without entries stayed in the keys and shifted the labels of all
    # following samples.
    sample_ids_with_data = []
    mppt_curves_list = []
    description_list = []
    for sample_id, mppt_entries in all_mppt.items():
        if not mppt_entries:
            continue

        sample_curves_list = []
        entry_names_list = []
        entry_description_list = []
        for mppt_entry in mppt_entries:
            # The first element of an entry is read as a mapping that holds the
            # curve columns together with "name" and an optional "description".
            measurement = mppt_entry[0]
            sample_curves_list.append(pd.DataFrame(measurement, columns=["time", "power_density", "voltage", "current_density"]))
            entry_names_list.append(measurement["name"])
            entry_description_list.append(measurement.get("description", ""))

        sample_ids_with_data.append(sample_id)
        mppt_curves_list.append(pd.concat(sample_curves_list, keys=np.arange(len(sample_curves_list))))
        description_list.append(pd.DataFrame({"entry_names": entry_names_list, "entry_description": entry_description_list}))

    if not mppt_curves_list:
        return None, None, None  # no MPPT data at all

    existing_sample_ids = pd.Series(sample_ids_with_data)
    return (pd.concat(mppt_curves_list, keys=existing_sample_ids),
            existing_sample_ids,
            pd.concat(description_list, keys=existing_sample_ids))

def on_load_data_clicked(batch_ids_selector):
    """Load the MPPT data of the selected batches and build the naming menu.

    batch_ids_selector: widget whose ``value`` holds the selected batch ids.

    Fills the module-level ``data`` dictionary ("curves", "sample_ids",
    "entries", "properties"), adapts the bounds of ``fit_interval`` to the
    loaded time range, writes the three tables to CSV files in the working
    directory and shows the naming menu. Status messages go to ``out``.
    """
    dynamic_content.clear_output()
    with out:
        out.clear_output()
        print("Loading Data")

        try_sample_ids = get_ids_in_batch(url, token, batch_ids_selector.value)

        #extract MPPT here
        data["curves"], data["sample_ids"], data["entries"] = get_mppt_data(try_sample_ids)

        # Check if MPPT data was found
        if data["curves"] is None:
            out.clear_output()
            print("The batches selected don't contain any MPPT measurements")
            return

        #make current and power positive
        data["curves"].loc[:,"power_density"] *=-1
        data["curves"].loc[:,"current_density"] *=-1

        #convert seconds to hours
        data["curves"].loc[:,"time"] *= 1/3600

        identifiers = get_sample_description(url, token, list(data["sample_ids"]))
        # The "name" column is filled with strings once the names are confirmed,
        # so the empty placeholder gets an explicit object dtype.
        data["properties"]=pd.DataFrame({"description":pd.Series(identifiers),"name":pd.Series(dtype=object)})

        data["entries"].loc[:,"plot"]=False
        data["entries"].loc[:,"name"]=""

        out.clear_output()
        print("Data Loaded")

    #set minimum and maximum bounds for the fitting range selection
    minimum_time = data["curves"].loc[:,"time"].min()
    maximum_time = data["curves"].loc[:,"time"].max()
    # The slider rejects min > max, so widen it in an order that keeps
    # min <= max at every step (matters when data is loaded a second time and
    # the new range lies completely above the old one).
    if minimum_time > fit_interval.max:
        fit_interval.max=maximum_time
        fit_interval.min=minimum_time
    else:
        fit_interval.min=minimum_time
        fit_interval.max=maximum_time
    fit_interval.value=(minimum_time,maximum_time)
    time_span = maximum_time-minimum_time
    if time_span > 0:
        # keep the previous step if all samples share one time value
        fit_interval.step=time_span/1000

    data["curves"].to_csv("mppt_curve.csv")
    data["entries"].to_csv("mppt_entries.csv")
    data["properties"].to_csv("mppt_properties.csv")
    make_variables_menu(data["sample_ids"])
    return

def make_variables_menu(sample_ids):
    """Show the naming menu for the given samples inside ``dynamic_content``.

    Displays the results area, a short heading, the preset dropdown, one
    selector per sample next to the edit box and the confirm button; then
    marks the selection as not confirmed in ``read``.
    """
    sample_count = len(sample_ids)
    variables_markdown = f"""
# 2 Dataset names
{sample_count} samples have been found.  
Enter the name of the samples that should be used in the plot.
"""
    with dynamic_content:
        display(results_content)
        display(Markdown(variables_markdown))
        display(default_variables)

        table, selectors_dict = create_widgets_table(sample_ids)

        def confirm_names(_button):
            # forward the selectors of this menu to the confirm handler
            on_confirm_clicked(selectors_dict)

        confirm_button = widgets.Button(description="Confirm Names", button_style='primary')
        confirm_button.on_click(confirm_names)

        display(widgets.HBox([table, cell_edit]))
        display(widgets.HBox([confirm_button, read]))

    with read:
        read.clear_output()
        print(f"{warning_sign} Selection not confirmed")

def on_change_default_variables(b):
    """Observer of the preset dropdown: rebuild the naming menu from scratch.

    The change notification ``b`` itself is not used.
    """
    dynamic_content.clear_output()
    loaded_ids = data["sample_ids"]
    make_variables_menu(loaded_ids)

def on_confirm_clicked(selectors_dict):
    """Store the names and measurement selection entered in the menu.

    selectors_dict: mapping of sample id to its selector widget.

    Writes the per-measurement "plot" flags and names into
    ``data["entries"]`` and the sample names into ``data["properties"]``,
    then reports the confirmation in ``read``.
    """
    read.clear_output()
    chosen_names = {}
    for sample_id, selector in selectors_dict.items():
        chosen_names[sample_id] = selector.get_name()
        data["entries"].loc[sample_id, "plot"] = selector.get_cell_selection()
        data["entries"].loc[sample_id, "name"] = selector.get_curve_names()
    data["properties"].loc[:, "name"] = pd.Series(chosen_names)
    with read:
        print("Selection confirmed")

# Rebuild the naming menu whenever another name preset is chosen.
default_variables.observe(on_change_default_variables, names=['value'])

# Manual first, then the batch picker.
display(plotting_utils.create_manual("mppt_manual.md"))
display(batch_selection.create_batch_selection(url, token, on_load_data_clicked))

# Status line, followed by the area that is updated dynamically with the
# variables menu.
display(out, dynamic_content)

VBox(children=(ToggleButton(value=False, description='Manual'), Output()))

VBox(children=(Text(value='', description='Search Batch'), SelectMultiple(description='Batches', layout=Layout…

Output()

Output()

In [7]:
#contains some models and utility functions
from fitting_tools import *
#import lmfit #easy to use library for curve fitting, like scipy.optimize but also calculates R^2 and other things for you

# constructor signature: name, function for parameters, name for plotting, column names, Latex description (optional)
# one column should be named "$R^2$" to compare the fit quality of different models 
# NOTE: every backslash in the LaTeX labels is escaped. The column labels used
# to contain "$\tau$" in a normal string, where "\t" is a TAB character, so the
# tables showed a broken label instead of tau.
available_fit_model_list = [
    fit_model("linear", linear_params, "linear ",
              ["a", "b", "$R^2$", "t80", "LE"],
              description="$P(t)=at+b$"),
    fit_model("exponential", exponential_params, "exponential ",
              ["$A$", "$\\tau$", "$R^2$", "t80", "LE"],
              description="$P(t)=A\\exp{(-\\frac{t}{\\tau})}$"),
    fit_model("biexponential", biexponential_params, "biexponential ",
              ["$A_1$", "$\\tau_1$", "$A_2$", "$\\tau_2$", "$R^2$", "tS", "Ts80", "LE"],
              description="$P(t)=A_1\\exp{(-\\frac{t}{\\tau_1})}+A_2\\exp{(-\\frac{t}{\\tau_2})}$"),
    fit_model("logexp", logistic_params, "logistic + exp ",
              ["A", "$\\tau$", "L", "k", "t0", "$R^2$", "tS", "Ts80", "LE"],
              description="$P(t)=A\\exp{(-\\frac{t}{\\tau})}+\\frac{L}{1+\\exp{(-k(t-t_0)})}$"),
    fit_model("stretched_exponential", stretched_exponential_params, "stretched exp ",
              ["A", "$\\tau$", "$\\beta$", "$R^2$", "T80", "LE"],
              description="$P(t)=A\\exp{(-(\\frac{t}{\\tau})^{\\beta})}$"),
    fit_model("errorfunctionXlinear", erfc_params, "errorfunction x linear ",
              ["$P_0$", "k", "$t_0$", "b", "$R^2$", "tS", "Ts80", "LE"],
              description="$P(t)=0.5(1-\\text{erf}(\\frac{t-t_0}{b}))(P_0-kt)$"),
]

#will be filled with the selected models
fit_model_list = list()

# Widget to select which fit to plot, used one block down.
plot_fit_selector = widgets.RadioButtons(
    options=[("None", False), ("Best (highest R²)", "best")]
    + [(model.name, model) for model in fit_model_list],
    index=1,
)

# Multi-selection of the models that should be fitted.
fit_model_selector = widgets.SelectMultiple(
    options=[(candidate.name, candidate) for candidate in available_fit_model_list],
    description='Select models',
    layout=widgets.Layout(width='400px', height='300px'),
    tooltip="models that will be fitted to the measurements",
)
fit_button = widgets.Button(description="perform fit", button_style='primary')
# For the table containing the fitting parameters.
fit_output = widgets.Output()

def execute_fitting(b):
    """Fit every selected model to every measurement and show the parameters.

    Button callback; ``b`` is not used. For each measurement in
    ``data["entries"]`` the power/time samples inside ``fit_interval`` are
    passed to each model's ``parfunc``. The fitted curves are stored in
    ``data["fitted_curves"]``, the parameters in each model's ``data`` frame;
    the combined parameter table is displayed in ``fit_output`` and written to
    "mppt_fitting_results.csv".
    """
    # Without this declaration the assignment below only created a local list
    # and the module-level fit_model_list stayed empty.
    global fit_model_list

    selected_models = list(fit_model_selector.value)
    if not selected_models or data.get("entries") is None:
        # nothing to fit: avoid the pd.concat error on an empty list
        with fit_output:
            fit_output.clear_output()
            print("Load data and select at least one model before fitting")
        return

    fit_model_list = selected_models
    plot_fit_selector.options=[("None",False), ("Best (highest R²)", "best")]+[(model.name, model) for model in fit_model_list]

    for model in fit_model_list:
        model.data = pd.DataFrame(index=data["entries"].index, columns=model.columns)

    #function to get the proper interval of the time and power values
    def get_interval_data(index):
        power = data["curves"].loc[index, "power_density"].to_numpy()
        time = data["curves"].loc[index, "time"].to_numpy()

        # samples closest to the chosen interval limits
        boundry_left = np.argmin(abs(time - fit_interval.value[0]))
        boundry_right = np.argmin(abs(time - fit_interval.value[1]))

        # +1 so that the sample at the right limit is part of the fit
        interval = slice(boundry_left, boundry_right + 1)
        return power[interval], time[interval]

    fitted_curves_list = []
    for index in data["entries"].index:
        power, time = get_interval_data(index)
        current_fitted_curves = pd.DataFrame({"time":time})
        for model in fit_model_list:
            # parfunc returns the fit parameters and the fitted curve
            model.data.loc[index], current_fitted_curves.loc[:,model.name] = model.parfunc(power, time)

        fitted_curves_list.append(current_fitted_curves)
    data["fitted_curves"]=pd.concat(fitted_curves_list, keys=data["entries"].index)

    all_fit_results=pd.concat([model.data for model in fit_model_list], axis=1, keys=[model.name for model in fit_model_list])

    with fit_output, pd.option_context('display.float_format', '{:,.2e}'.format):
        fit_output.clear_output()
        display(Markdown("fitting models for Power over time P(t)"))
        for model in fit_model_list:
            display(Latex(model.name + ": " + model.description))
        display(HTML(all_fit_results.to_html()))
        all_fit_results.to_csv("mppt_fitting_results.csv")
    return

# Run the fit when the button is pressed.
fit_button.on_click(execute_fitting)

# Interval slider, model selection, button and the results table, top to bottom.
display(
    fit_interval,
    fit_model_selector,
    fit_button,
    fit_output,
)

FloatRangeSlider(value=(7.866666666666667e-06, 0.050913888888888884), description='interval', max=0.0509138888…

SelectMultiple(description='Select models', layout=Layout(height='300px', width='400px'), options=(('linear', …

Button(button_style='primary', description='perform fit', style=ButtonStyle())

Output()

In [8]:
# Figure template used by all plots: "plotly_white" with the Vivid colours as
# default line colours of scatter traces.
template = pio.templates["plotly_white"]
template.data.scatter = [
    go.Scatter(line_color=vivid_color)
    for vivid_color in px.colors.qualitative.Vivid
]

# Plot MPPT Curves

In [9]:
curve_out = widgets.Output()

def update_curve_plot(b):
    """Redraw the selected MPPT curves, optionally with fitted curves.

    Button callback; ``b`` is not used. Every measurement whose "plot" flag
    is set is drawn into ``curve_out``. If a fit is selected in
    ``plot_fit_selector`` the y axis is fixed to power density and the fitted
    curve is drawn in the same colour as its measurement; "best" picks, per
    measurement, the fitted model with the highest R².

    If a fit is selected but no usable fit results exist (the selector
    defaults to "best", so this is the state right after loading), only the
    measurements are drawn and a note is printed instead of failing.
    """
    #create looping iterator for setting colors
    color_iterator = itertools.cycle(px.colors.qualitative.Vivid)

    with curve_out:
        curve_out.clear_output()

        fit_choice = plot_fit_selector.value
        fitted_curves = data.get("fitted_curves")
        best_candidates = []
        if fit_choice and fitted_curves is None:
            print("No fit results available yet, plotting the measurements only")
            fit_choice = False
        elif fit_choice == "best":
            # execute_fitting keeps its model list local, so the models are
            # read from the selection widget again; only models that have a
            # column in the current fit results can be compared.
            best_candidates = [model for model in fit_model_selector.value
                               if model.name in fitted_curves.columns]
            if not best_candidates:
                print("None of the selected models has been fitted, plotting the measurements only")
                fit_choice = False

        #plot power on y axis if fitted curves should be plotted as well
        if fit_choice:
            axis_title, column_name = ("power density","power_density")
        else:
            axis_title, column_name = unit_selector.value

        layout = go.Layout(
            width=curve_options.width.value,
            height=curve_options.height.value,
            xaxis={"title":{"text":"time"}},
            yaxis={"title":{"text":axis_title}},
            template=template)
        figure = go.Figure(layout=layout)

        #iterate over every sample and cell with parameter plot set to true
        for sample_id in data["sample_ids"]:
            sample_name = data["properties"].loc[sample_id,"name"]
            sample_entries = data["entries"].loc[sample_id]
            samples_filtered = sample_entries.loc[sample_entries["plot"]]
            for i in samples_filtered.index:
                color = next(color_iterator)
                name = curve_options.name.value(sample_name,samples_filtered.loc[i,"name"])

                figure.add_scatter(x=data["curves"].loc[(sample_id, i),"time"],
                                   y=data["curves"].loc[(sample_id, i),column_name],
                                   name=name,
                                   line_color=color,
                                  )

                if not fit_choice:
                    continue
                if fit_choice == "best":
                    model = max(best_candidates, key=lambda model: model.data.loc[(sample_id, i), "$R^2$"])
                else:
                    model = fit_choice

                figure.add_scatter(x=fitted_curves.loc[(sample_id, i),"time"],
                                   y=fitted_curves.loc[(sample_id, i),model.name],
                                   name=model.abbreviated_name + name,
                                   line_color=color
                                  )
        figure.show()

# options contains a list of tuples (description, value); here every value is
# itself a tuple (axis title, column name)
unit_selector = widgets.ToggleButtons(
    options=[
        ("power density", ("power density", "power_density")),
        ("voltage", ("voltage", "voltage")),
        ("current density", ("current density", "current_density")),
    ],
    index=0,
    tooltip="only relevant when no fit is plotted",
)

curve_button = widgets.Button(description="refresh plot", button_style='primary')
curve_button.on_click(update_curve_plot)

curve_options = plotting_utils.plot_options(default_name=0)

display(
    plot_fit_selector,
    unit_selector,
    curve_options,
    curve_button,
    curve_out,
)

RadioButtons(options=(('None', False), ('Best (highest R²)', 'best'), ('biexponential', <fitting_tools.fit_mod…

ToggleButtons(options=(('power density', ('power density', 'power_density')), ('voltage', ('voltage', 'voltage…

plot_options(children=(ToggleButtons(description='select how the datasets will be named', options=(('sample + …

Button(button_style='primary', description='refresh plot', style=ButtonStyle())

Output()

# Area Plot
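Measurements with the same name are grouped together and drawn as one curve with a shaded band (median with quartiles, or mean with standard deviation).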

In [10]:
def update_area_plot(b):
    """Redraw the grouped area plot in ``area_out``.

    Button callback; ``b`` is not used. All selected measurements that end up
    with the same plot name are interpolated onto a common time grid and
    shown as one centre line with a shaded band: mean ± standard deviation or
    median with lower/upper quartile, depending on
    ``standart_deviation_area``.
    """
    #create looping iterator for setting colors
    color_iterator = itertools.cycle(px.colors.qualitative.Vivid)

    axis_title, column_name = area_unit_selector.value

    with area_out:
        # Size comes from this plot's own options; it used to be read from
        # the options widget of the curve plot.
        layout = go.Layout(
            width=area_options.width.value,
            height=area_options.height.value,
            xaxis={"title":{"text":"time"}},
            yaxis={"title":{"text":axis_title}},
            template=template)
        area_out.clear_output()
        figure = go.Figure(layout=layout)

        #Dictionary with every unique given name as index, contains list of all curves that have this given name
        data_organized_by_given_name = {}
        for sample_id in data["sample_ids"]:
            sample_name = data["properties"].loc[sample_id,"name"]
            sample_entries = data["entries"].loc[sample_id]
            samples_filtered = sample_entries.loc[sample_entries["plot"]]
            for i in samples_filtered.index:
                name = area_options.name.value(sample_name, samples_filtered.loc[i,"name"])
                data_organized_by_given_name.setdefault(name, []).append(data["curves"].loc[(sample_id, i),:])

        for name, curve_list in data_organized_by_given_name.items():
            # Common time range of the group: from the latest first sample to
            # the earliest last sample (same range as before, now traversed in
            # ascending order).
            overlap_start = max(curve["time"].iloc[0] for curve in curve_list)
            overlap_end = min(curve["time"].iloc[-1] for curve in curve_list)

            xcoords = np.linspace(overlap_start, overlap_end, 500)

            # one row per curve, one column per grid point; NaN outside a curve's range
            interpolated_curves = pd.DataFrame([np.interp(xcoords,
                                                          curve.loc[:,"time"],
                                                          curve.loc[:,column_name],
                                                          left=np.nan, right=np.nan)
                                                for curve in curve_list])

            curve_stats = pd.DataFrame([interpolated_curves.mean(),
                                        interpolated_curves.std(),
                                        interpolated_curves.median(),
                                        interpolated_curves.quantile(q=0.25,interpolation='linear'),
                                        interpolated_curves.quantile(q=0.75,interpolation='linear')
                                       ],
                                       index = ["mean","std","median","lower_quartile","upper_quartile"]
                                      )

            # Pick centre line and the two band edges for the chosen group type.
            if standart_deviation_area.value:
                center = curve_stats.loc["mean",:]
                band_forward = center + curve_stats.loc["std",:]
                band_backward = center - curve_stats.loc["std",:]
            else:
                center = curve_stats.loc["median",:]
                band_forward = curve_stats.loc["lower_quartile",:]
                band_backward = curve_stats.loc["upper_quartile",:]

            #Plot the results
            color = next(color_iterator)
            # Closed polygon: one edge left to right, the other edge back.
            figure.add_scatter(x=np.concatenate([xcoords,xcoords[::-1]]),
                               y=pd.concat([band_forward, band_backward.iloc[::-1]]),
                               line_color='rgba(255,255,255,0)', #make outline of area invisible
                               fillcolor=f"rgba({color[4:-1]},0.2)", #manipulate color string to add transparency
                               fill="toself",
                               legendgroup=name,
                               showlegend=False,
                               name=name)
            figure.add_scatter(x=xcoords,
                               y=center,
                               name=name,
                               line_color=color,
                               legendgroup=name)
        figure.show()

# Output area that receives the area plot.
area_out = widgets.Output()
# False: median with quartiles, True: mean with standard deviation.
standart_deviation_area = widgets.ToggleButtons(
    description="group type",
    options=[("median, quartiles", False), ("mean, std", True)],
    index=0,
)
area_options = plotting_utils.plot_options(default_name=1)
# Every value is a tuple (axis title, column name).
area_unit_selector = widgets.ToggleButtons(
    options=[
        ("power density", ("power density", "power_density")),
        ("voltage", ("voltage", "voltage")),
        ("current density", ("current density", "current_density")),
    ],
    index=0,
)
area_button = widgets.Button(description="refresh plot", button_style='primary')
area_button.on_click(update_area_plot)

display(
    standart_deviation_area,
    area_unit_selector,
    area_options,
    area_button,
    area_out,
)

ToggleButtons(description='group type', options=(('median, quartiles', False), ('mean, std', True)), value=Fal…

ToggleButtons(options=(('power density', ('power density', 'power_density')), ('voltage', ('voltage', 'voltage…

plot_options(children=(ToggleButtons(description='select how the datasets will be named', index=1, options=(('…

Button(button_style='primary', description='refresh plot', style=ButtonStyle())

Output()