In [18]:
# --------------------------------------------------------------------------

# ----------------- dashboard sample selection analysis 

# -------------------------------------------------------------------------


# ¡¡¡ --- !!! # ---> modules and data cases

# --- system modules

import sys
import datetime
import os


# project root = parent folder of the directory the notebook is executed from
notebook_dir = os.path.abspath(".")
base_dir = os.path.dirname(notebook_dir)

# --- data handling modules

import numpy as np
import pandas as pd
import scipy.io as sp_io

# --- visualization modules

import matplotlib.pyplot as plt
import pandas as pd
from matplotlib import rcParams
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# --- my modules

methods_dir = base_dir + '/methodology/python'
# each folder is pushed to the front of sys.path in turn, so the last one
# listed here ends up first in the search order
for module_folder in ('/model_building', '/read_data', '/sample_selection'):
    sys.path.insert(0, methods_dir + module_folder)
from class_chemometrics_data import chemometrics_data
from class_mcw_pls import mcw_pls, mcw_pls_sklearn, min_rmsecv_lv_simpls, cv_lv_simpls
from class_sample_selection import sample_selection
from class_pcr import pcr, pcr_sklearn


# ¡¡¡ --- !!! # ---> base working directory and available data cases


# ************************************ init --- user 
# case id -> [case folder under /data, prepared-data file name without extension]
cases_dict = {"d0001": ["d0001_corn", "d0001_data_prepared_02"]}
# ************************************ end --- user 


print("--------- imports loaded ----------")


# ¡¡¡ --- !!! # ---> data


# ************************************ init --- user 
caseID_key = "d0001"
# ************************************ end --- user 

# each entry of cases_dict holds exactly two items: folder and file name
case_dir, dname = cases_dict[caseID_key]
data_dir = '/data/' + case_dir + '/data_prepared/'

# full path of the prepared .mat file for the chosen case
mat_file = base_dir + data_dir + dname + '.mat'


# ************************************ init --- user
data_class = chemometrics_data(
    mat_file,
    data_identifier=data_dir + dname,
    include_val=False,
    include_test=False,
    include_unlabeled=False,
    y_all_range=False,
    y_range=np.array([0]),
    obs_all_cal=True,
    shuffle=False,
)
# ************************************ end --- user


# attributes ncal and K of the loaded data object, printed as a quick sanity check
print(data_class.ncal, data_class.K)

print("--------- data loaded for " + data_class.data_identifier + "----------")


print("--------- functions loaded ----------")


--------- imports loaded ----------
80 700
--------- data loaded for /data/d0001_corn/data_prepared/d0001_data_prepared_02!*moisture*!----------
--------- functions loaded ----------


In [36]:
# ¡¡¡ --- !!! dashboard

# folder where the dashboard images are written
output_dir = base_dir + "/figures/"

def dashboard_sample_selection(selected_rows, a_comp, ss_method, cv_reps=10, max_lv=25):
    """Build a diagnostic dashboard for a selected calibration subset and save it as a PNG.

    The function reads the calibration data from the module-level ``data_class``
    and writes the figure into the module-level ``output_dir``.

    Panels:
      * row 1, col 1: heatmap of the absolute cross-products between the
        column-normalised PC scores of ALL samples, where the PCs come from
        the selected samples only;
      * row 1, col 2: RMSECV curves against the number of latent variables,
        with the chosen number marked in red;
      * row 2, col 1: observed vs predicted for the selected samples
        (cross-validated, "cv") and for the remaining samples ("val").

    Parameters
    ----------
    selected_rows : array-like
        One entry per calibration row: 1 marks a selected sample (used to
        calibrate), 0 marks a sample used for validation. Rows holding any
        other value are used in neither set.
    a_comp : int
        Number of principal components kept for the score heatmap and for the
        determinant criterion shown in the title.
    ss_method : str
        Label of the sample-selection method; used in the title and file name.
    cv_reps : int, optional
        Passed to ``cv_lv_simpls`` as ``total_repetitions``.
    max_lv : int, optional
        Largest number of latent variables evaluated in cross-validation
        (passed to ``cv_lv_simpls`` as ``total_ncp``). Defaults to 25.

    Returns
    -------
    None
        The figure is written to ``output_dir``; the folder is created when
        it does not exist yet.
    """

    # a single call keeps X and Y consistent with each other
    cal_data = data_class.get_cal()
    X0 = cal_data["xcal"].copy()
    Y0 = cal_data["ycal"].copy()


    # ¡¡¡ --- !!! get samples

    cal_samples = np.array(selected_rows)
    n_ss = cal_samples.sum()
    is_selected = cal_samples == 1
    is_left_out = cal_samples == 0


    # --- pc reconstruction

    # PCs are estimated from the selected samples only; every sample is then
    # centred with the mean of the selected samples and projected onto them
    X_samples_selected = X0[is_selected, :]
    selected_mean = X_samples_selected.mean(axis=0)
    Xc_with_samples_selected = X0 - selected_mean

    _, _, v = np.linalg.svd(X_samples_selected - selected_mean)

    # keep the requested number of components (previously fixed at 30
    # regardless of a_comp)
    V_pc = v[0:a_comp, :].T
    T_pc = Xc_with_samples_selected.dot(V_pc)
    # scale every score column to unit length
    T_pc_norm = T_pc / np.linalg.norm(T_pc, axis=0)

    A = T_pc_norm.T.dot(T_pc_norm)
    X_cov_pc = np.abs(A)
    criterion = np.linalg.det(A)


    # ---- cross validation pls

    Xcal = X_samples_selected
    Ycal = Y0[is_selected, :]
    Xval = X0[is_left_out, :]
    Yval = Y0[is_left_out, :]

    lv = max_lv

    # column 0 of rmsecv drives the choice of latent variables; the remaining
    # columns are drawn as gray traces (presumably the individual repetitions;
    # confirm against cv_lv_simpls)
    rmsecv = cv_lv_simpls(Xcal, Ycal, total_ncp = lv, total_repetitions=cv_reps)

    final_lv = np.argmin(rmsecv[:,0])+1

    # ---- final calibration

    my_simpls = mcw_pls(Xcal, Ycal, final_lv)
    my_simpls_trained = my_simpls.train(iters = 0, current_R0 = None)
    my_simpls_cv_pred = my_simpls.crossval_KFold(train_object = my_simpls_trained["train_object"],number_splits=10)


    # --- val

    my_simpls_yval_pred = my_simpls.predict(X = Xval,mcw_pls_output = my_simpls_trained)


    # --- performance (cv and val together)

    all_predicted = np.concatenate((my_simpls_cv_pred["cv_predicted"], my_simpls_yval_pred), axis = 0)
    all_y = np.concatenate((Ycal,Yval), axis=0)

    # the total sum of squares is taken around the mean of the calibration responses
    residual_ss = np.power(all_y-all_predicted,2).sum(axis=0)
    total_ss = np.power(all_y-Ycal.mean(axis=0),2).sum(axis=0)
    r2_cv_val = 1-(residual_ss/total_ss)

    performance_cv_val = "r2 cv+val: " + str(np.round(r2_cv_val[0],4))


    # ---- dashboard

    fig = make_subplots(rows = 2, cols=2)

    pc_index = np.arange(1,X_cov_pc.shape[0]+1,1)
    fig.add_trace(go.Heatmap(x = pc_index,
                             y = pc_index,
                             z=X_cov_pc, colorbar={'x':-0.15, 'y':0.8,'len':0.5},
                             zmin = 0,zmax = 0.3), row=1, col = 1)


    lv_index = np.arange(1, lv+1)
    for rep in range(1,rmsecv.shape[1]):
        fig.add_trace(go.Scatter(x = lv_index, y = rmsecv[:,rep], showlegend = False, marker = {'color':'gray'}, opacity= 0.3), row=1, col = 2)
    fig.add_trace(go.Scatter(x = lv_index, y = rmsecv[:,0], showlegend = False, marker = {'color':'blue'}), row=1, col = 2)
    fig.add_trace(go.Scatter(x = [final_lv], y = [rmsecv[final_lv-1,0]],mode = "markers", marker = {'color':'red', 'size': 10}, showlegend=False), row=1, col = 2)


    y_low, y_high = np.amin(Ycal), np.amax(Ycal)
    fig.add_trace(go.Scatter(x = Ycal[:,0], y = my_simpls_cv_pred['cv_predicted'][:,0], mode = "markers", marker = {'color':'purple', 'opacity': 0.6}, name = "cv"), row=2, col = 1)
    fig.add_trace(go.Scatter(x = Yval[:,0], y = my_simpls_yval_pred[:,0], mode = "markers", marker = {'color':'orange', 'opacity': 0.6}, name = "val"), row=2, col = 1)
    # identity line spanning the observed calibration range
    fig.add_trace(go.Scatter(x = [y_low, y_high], y = [y_low, y_high], showlegend = False, marker = {'color':'black'}), row=2, col = 1)


    # the annotation is anchored to the observed range of the current response
    # (top centre of the identity line) instead of fixed data coordinates, so
    # it stays inside the panel for any response variable
    fig['layout']['annotations'] = [{'xref':'x3', 'yref':'y3',
                                     'x':float((y_low + y_high) / 2), 'y':float(y_high),
                                     'text':performance_cv_val,'showarrow':False, 'font':{'size':18}}]


    fig['layout']['yaxis']['autorange'] = "reversed"
    fig['layout']['yaxis']['title'] = "pc"
    fig['layout']['xaxis']['title'] = "pc"

    fig['layout']['yaxis2']['range'] = [np.amin(rmsecv), np.amax(rmsecv)]
    fig['layout']['yaxis2']['title'] = "rmsecv"

    fig['layout']['xaxis2']['title'] = "number of lv"
    fig['layout']['xaxis2']['tickvals'] = lv_index

    fig['layout']['yaxis3']['title'] = "predicted"
    fig['layout']['xaxis3']['title'] = "observed"

    # the rmsecv and observed-vs-predicted panels share the same boxed look
    boxed_axis_style = {'ticks': "inside",
                        'showgrid': True,
                        'mirror': "ticks",
                        'gridcolor': "#F3F2F2",
                        'linecolor': "black"}
    for axis_name in ('yaxis2', 'xaxis2', 'yaxis3', 'xaxis3'):
        for style_key, style_value in boxed_axis_style.items():
            fig['layout'][axis_name][style_key] = style_value


    fig.update_layout(width = 1100, height = 800,legend_orientation="h" , plot_bgcolor= "white",
                      title = data_class.y_names[0] + " - "+ss_method + " - " + str(n_ss)+" samples  -  criterion: " + str(np.round(criterion,4))
                     )


    figure_name = data_class.y_names[0].replace(" ","") + "_"+ss_method + "_" + str(n_ss)+" samples"

    # make sure the target folder exists before exporting the image
    os.makedirs(output_dir, exist_ok=True)
    fig.write_image(output_dir+figure_name+".png")

In [37]:
# --- get dashboard

# the sample-selection object works on the calibration spectra
xcal_for_selection = data_class.get_cal()["xcal"]
my_ss = sample_selection(xcal_for_selection, ncp = 30)
my_ss.get_xcal_pca_scores()

# first column of "sample_id" is passed on as the selected-rows vector
clustering_result = my_ss.clustering(Nout=50)
current_samples = clustering_result["sample_id"][:,0]

dashboard_sample_selection(
    selected_rows=current_samples,
    a_comp=30,
    ss_method="example_method",
    cv_reps=10,
)
print("finished. Check your figure in figures folder")


finished
