In [1]:
# --------------------------------------------------------------------------

# ----------------- design to establish all conditions for unsupervised sample selection

# -------------------------------------------------------------------------


# ¡¡¡ --- !!! # ---> modules and data cases

# --- system modules

import sys
import datetime
import os


# project root: the parent of the directory the notebook is run from
base_dir = os.path.abspath("{}/..".format(os.getcwd()))

# --- data handling modules

import numpy as np
import pandas as pd
import scipy.io as sp_io
import scipy as sp

# --- visualization modules

import matplotlib.pyplot as plt
import pandas as pd
from matplotlib import rcParams
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# --- my modules

# folder with the project's own modules; it is put first on the import path
# so that the two imports below resolve to the files in <base_dir>/methods/
methods_dir = "{}/methods/".format(base_dir)
sys.path.insert(0, methods_dir)
from class_sample_selection import sample_selection
import simpls_module



# ¡¡¡ --- !!! # ---> user settings: data case and target column

# # ************************************ init --- user 
# data case: base name of the .mat file read from <base_dir>/data/ (also used as prefix of the output files)
caseID_key = "d01_milk"
# column index of the response to model, applied to ycal, ytest and y_labels
y_id = 2 # lactose
# # ************************************ end --- user 



# # ¡¡¡ --- !!! # ---> data

# MATLAB file of the selected data case
mat_filename = "{}/data/{}.mat".format(base_dir, caseID_key)
data_mat = sp_io.loadmat(mat_filename, struct_as_record = False)

# x blocks are copied whole; from the y blocks only column `y_id` is kept,
# selected with a list so that the result stays two-dimensional (one column)
target_col = [y_id]

xcal = data_mat["xcal"].copy()
ycal = data_mat["ycal"].copy()[:, target_col]
xtest = data_mat["xtest"].copy()
ytest = data_mat["ytest"].copy()[:, target_col]

# label entry stored at the same position as the chosen response
chemical_comp_name = data_mat["y_labels"][y_id]

print("--- data ready ---")


--- data ready ---


In [2]:
# --- factors

# rows (calibration samples) and columns of xcal
ncal, K = xcal.shape[:2]

# numbers of components to explore: 1..25 plus K (the column count of xcal)
npc_range = [*range(1, 26), K]

# sample selection methods to compare
method_name = [
    "random",
    "ks",
    "duplex",
    "puchwein",
    "clustering",
    "optfederov",
]

# subset sizes: 30, 40, ... strictly below the number of calibration samples
sample_sizes = [*range(30, ncal, 10)]



In [3]:
# --- making design


def _run_allowed(method, pc, ss):
    """Tell whether the (method, number of components, sample size) combination enters the design."""
    if method == "optfederov":
        # at most 30 components, and at least 5 more samples than components
        return pc <= 30 and (ss - pc) >= 5
    if method == "puchwein":
        # between 2 and 30 components
        return 1 < pc <= 30
    # every other method is run for all combinations
    return True


# full factorial (components x methods x sample sizes) minus the excluded combinations;
# the nesting order fixes the position of each run in the list
design = [
    {"npc": pc, "method_name": method, "sample_size": ss}
    for pc in npc_range
    for method in method_name
    for ss in sample_sizes
    if _run_allowed(method, pc, ss)
]

print("done")
design[-1]  # not displayed: only the last expression of a cell is shown
len(design)

done


4437

In [4]:
# design as a table; display the optfederov runs with fewer than 40 samples
df = pd.DataFrame(design)
is_optfederov = df["method_name"].eq("optfederov")
is_small = df["sample_size"].lt(40)
df[is_optfederov & is_small]

Unnamed: 0,method_name,npc,sample_size
116,optfederov,1,30
290,optfederov,2,30
464,optfederov,3,30
638,optfederov,4,30
812,optfederov,5,30
986,optfederov,6,30
1160,optfederov,7,30
1334,optfederov,8,30
1508,optfederov,9,30
1682,optfederov,10,30


In [5]:
# design as a table; display every puchwein run
df = pd.DataFrame(design)
df.loc[df["method_name"].eq("puchwein")]

Unnamed: 0,method_name,npc,sample_size
232,puchwein,2,30
233,puchwein,2,40
234,puchwein,2,50
235,puchwein,2,60
236,puchwein,2,70
237,puchwein,2,80
238,puchwein,2,90
239,puchwein,2,100
240,puchwein,2,110
241,puchwein,2,120


In [6]:
# --- all iterations for sample selection
#
# For every run in `design`: build the sample_selection object for the requested
# number of components, select `sample_size` samples with the requested method and
# store the run settings together with the selection. The collected results are
# then written as .mat and .pkl files to <base_dir>/scripts_output/.

# NOTE(review): this rebinds the name `datetime` (imported as a module in the first
# cell) to the datetime class. Left in place in case other cells call `datetime.now()`.
from datetime import datetime

# above this number of components the methods are called with dim_reduction=False
# and euclidean distance; optfederov is only run up to this number of components
MAX_NPC_DIM_RED = 30

# puchwein: accepted gap between the requested and the obtained number of samples
PUCHWEIN_TOL = 2

# puchwein: cap on bisection steps. Halving the float interval [1e-7, 1] stops changing
# the midpoint well before 100 steps, so a search that has not met the tolerance by
# then would otherwise loop forever (assuming puchwein returns the same selection
# for the same factor_k).
PUCHWEIN_MAX_ITER = 100

print('started: ',datetime.now())

selected_samples_dict = {}

for ii, run in enumerate(design):

    if ii%100==0:
        print(ii)

    # work on a copy so the entries of `design` are not modified; cells that
    # display `design` keep giving the same result when re-run
    design_setting = dict(run)
    method = design_setting["method_name"]

    # - class sample selection

    my_sample_selection = sample_selection(xcal, ncp = design_setting["npc"])
    my_sample_selection.get_xcal_pca_scores(first_ncp = 0)

    # - sample size

    n_sel = design_setting["sample_size"]

    # - input matrix type for sample selection function arguments

    if design_setting["npc"] > MAX_NPC_DIM_RED:
        dim_red = False
        dist_measure = "euclidean"
    else:
        dim_red = True
        dist_measure = "mahalanobis"

    # - select samples based on all settings depending on method
    #   (exactly one branch runs; an unhandled combination raises instead of
    #   silently reusing the selection of the previous run)
    #
    # NOTE(review): no random seed is set in this notebook, so the "random" runs
    # presumably differ between executions - confirm how random_sample draws.

    if method == "random":

        current_samples = my_sample_selection.random_sample(Nout = n_sel)["sample_id"]

    elif method == "ks":

        current_samples = my_sample_selection.kennard_stone(Nout = n_sel, fixed_samples=None, dim_reduction=dim_red, distance_measure=dist_measure)["sample_id"]

    elif method == "duplex":

        current_samples = my_sample_selection.duplex(Nout = n_sel, dim_reduction=dim_red, distance_measure=dist_measure)["sample_id"]

    elif method == "puchwein":

        # Bisection on factor_k until the number of selected samples (the sum of
        # "sample_id", which is therefore treated as an indicator vector) is within
        # PUCHWEIN_TOL of n_sel. The update rule assumes that a larger factor_k
        # yields fewer samples.
        ff_low = 0.0000001
        ff_upp = 1
        ff = (ff_low+ff_upp)/2

        best_samples = None
        best_gap = None

        for _ in range(PUCHWEIN_MAX_ITER):

            candidate = my_sample_selection.puchwein(Nout = n_sel, factor_k=ff, dim_reduction=dim_red, distance_measure=dist_measure)["sample_id"]
            n_found = candidate.sum()
            gap = np.abs(n_found - n_sel)

            # remember the selection closest to the requested size
            if best_gap is None or gap < best_gap:
                best_samples = candidate
                best_gap = gap

            if gap <= PUCHWEIN_TOL:
                break

            if n_found > n_sel:
                ff_low = ff
            else:
                ff_upp = ff
            ff = (ff_low+ff_upp)/2

        else:
            # tolerance never met: keep the closest selection and report it
            print("puchwein did not reach the requested size for run {} ({}); closest gap: {}".format(ii, run, best_gap))

        current_samples = best_samples

    elif method == "clustering":

        current_samples = my_sample_selection.clustering(Nout = n_sel , dim_reduction=dim_red, distance_measure=dist_measure)["sample_id"]

    elif method == "optfederov" and design_setting["npc"] <= MAX_NPC_DIM_RED:

        current_samples = my_sample_selection.optfederov_r(Nout = n_sel, fixed_samples=None, optimality_criterion='D')["sample_id"]

    else:

        raise ValueError("no sample selection available for design run {}: {}".format(ii, run))

    # - save result

    design_setting["selected_samples"] = current_samples.flatten()

    selected_samples_dict[str(ii)] = design_setting


print('finished: ',datetime.now())

# ¡¡¡ --- !!! ---> save output

# create the output folder if needed, so a missing folder cannot discard the long computation
output_dir = base_dir + "/scripts_output/"
os.makedirs(output_dir, exist_ok=True)

# NOTE(review): the top-level keys are digit strings ("0", "1", ...), which are not
# valid MATLAB variable names - confirm that the consumer of the .mat file copes with that.
sp_io.savemat(output_dir + caseID_key + "_01_design_selected_samples.mat", selected_samples_dict)
df_output = pd.DataFrame.from_dict(selected_samples_dict, orient="index")
df_output.to_pickle(output_dir + caseID_key + "_01_design_selected_samples.pkl")

finished:  2020-12-17 16:31:10.043815
0
100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000
2100
2200
2300
2400
2500
2600
2700
2800
2900
3000
3100
3200
3300
3400
3500
3600
3700
3800
3900
4000
4100
4200
4300
4400
finished:  2020-12-17 16:40:37.063134


In [9]:
# --- check cases where sample size was not met
#
# For every puchwein run, compare the requested sample size with the number of
# selected samples (sum of the stored selection vector) and print the runs where
# they differ by more than 5.


df_puch = df_output[df_output["method_name"]=="puchwein"]

checks = []
for row_pos in range(df_puch.shape[0]):
    # df_output is indexed by string labels ("0", "1", ...), so rows are fetched
    # explicitly by position; an integer key on the label-indexed Series only
    # works through pandas' deprecated positional fallback
    row = df_puch.iloc[row_pos]
    diff = row["sample_size"] - row["selected_samples"].sum()
    checks.append(np.abs(diff))
    if np.abs(diff)>5:
        print(row, diff)

