# Perform property prediction using multiple machine learning techniques

The machine learning techniques are principal component regression (PCR), partial least squares regression (PLSR), least absolute shrinkage and selection operator (LASSO), linear regression (LR), and random forest (RF). They are applied to 39 polyolefin samples: 19 non-blended polymers, 11 HDPE/PP blends, and 9 HDPE/LDPE blends.

## Dependencies

In [None]:
# plotting
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.markers as pltmarkers
from matplotlib.lines import Line2D

# data handling
import glob
import os
import pandas as pd

# machine learning
import sklearn
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cross_decomposition import PLSRegression
from sklearn.linear_model import Lasso
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import KFold

# regression
from sklearn.linear_model import LinearRegression

# model performance evaluation
from collections import Counter, defaultdict
from sklearn.metrics import r2_score, mean_squared_error

## Read spectra data and separate intensity and wavenumber into two dataframes

In [None]:
# get file list
folderpath = "../Data/NIR/"
joined_files = os.path.join(folderpath, "*.csv")

# glob returns paths in arbitrary, filesystem-dependent order; sort them so the
# column order of dfq/dfI (and of everything exported from them) is reproducible
joined_list = sorted(glob.glob(joined_files))
if not joined_list:
    raise FileNotFoundError(f"No spectra files match {joined_files}")

# read every file once: column 0 holds the wavenumbers, column 1 the intensities.
# Each spectrum is keyed by its file name without directory and extension.
spectra = {
    os.path.splitext(os.path.basename(file))[0]: pd.read_csv(
        file, header=None, usecols=[0, 1]
    )
    for file in joined_list
}

# combine wavenumbers in one dataframe (one column per file)
dfq = pd.concat(
    [frame[0].rename(name) for name, frame in spectra.items()],
    axis=1,
)

# combine intensities in one dataframe (one column per file)
dfI = pd.concat(
    [frame[1].rename(name) for name, frame in spectra.items()],
    axis=1,
)

### Plot spectra

In [None]:
# raw spectra: one line per file, intensity against that file's wavenumbers
plt.figure(figsize=(5, 4))

for name, intensity in dfI.items():
    plt.plot(dfq[name], intensity, label=name)

plt.xlabel("Wavenumber [cm$^{-1}$]", fontsize=14)
plt.ylabel("Intensity", fontsize=14)

plt.show()

### Check to make sure all wavenumbers are the same across all samples

In [None]:
# collapse identical wavenumber columns: transpose so every sample's axis is a
# row, drop the repeated rows, then transpose back
dfq = dfq.T.drop_duplicates().T
if dfq.shape[1] != 1:
    print("Wavenumbers vary across samples. Code needs to be modified.")
else:
    # a single surviving column means all samples share one wavenumber axis
    q = dfq.to_numpy()

## RNV preprocessing

In [None]:
def RNVpreprocessing(I):
    """Scale one spectrum using statistics of its interquartile range only.

    The mean and standard deviation are taken over the intensities lying
    between the 25th and 75th percentile (both inclusive); the whole spectrum
    is then shifted by that mean and divided by that standard deviation.

    The statistics come from the input's own ``.mean()`` / ``.std()`` methods,
    so the ``std`` convention follows the input type (population std for a
    NumPy array, sample std for a pandas Series).

    Parameters
    ----------
    I : array-like
        Intensities of a single spectrum (NumPy array or pandas Series).

    Returns
    -------
    Same type as ``I``
        The rescaled intensities.
    """
    lower, upper = np.percentile(I, [25, 75])

    # keep only the central half of the intensity distribution
    central = I[(lower <= I) & (I <= upper)]

    centre = central.mean()
    spread = central.std()

    return (I - centre) / spread

In [None]:
# apply the RNV scaling to every spectrum (one column per spectrum)
dfRNV = dfI.apply(RNVpreprocessing, axis=0)

# plot the results
# DataFrame.plot opens a figure of its own unless it is given an Axes, so the
# 5x4 figure is created here and passed in; otherwise an empty figure is left
# behind and the requested size is not applied to the actual plot.
fig, ax = plt.subplots(figsize=(5, 4))
dfRNV.plot(ax=ax, legend=False)

# NOTE(review): the x values drawn here are dfRNV's row index, not the
# wavenumbers stored in dfq -- confirm the axis label is what is intended.
plt.xlabel("Wavenumber [cm$^{-1}$]", fontsize=14)
plt.ylabel("Intensity", fontsize=14)

plt.show()

# one row per spectrum, one column per wavenumber
RNVI = dfRNV.to_numpy().T
nspectra = RNVI.shape[0]

# export wavenumbers
np.savetxt("../ModelData/RNV_wavenumbers.csv", dfq, delimiter=",", header="Wavenumbers")

# export RNV intensities
np.savetxt("../ModelData/RNV_intensities.csv", RNVI, delimiter=",")

# export the spectrum names, one per line, in the same order as the rows of RNVI
column_names = dfI.columns.tolist()
with open("../ModelData/RNV_column_names.txt", "w") as f:
    for col_name in column_names:
        f.write(f"{col_name}\n")

## Read property data

In [None]:
# property table: only the first four columns of the CSV are loaded
file_path = "../Data/PropertyMeasurements.csv"
dfprop = pd.read_csv(file_path, usecols=[0, 1, 2, 3])

# remove units for ease of use: keep just the first whitespace-separated word
# of every column title
header = [title.split()[0] for title in dfprop.columns.tolist()]
dfprop.columns = header

dfprop.head()

## Labeling samples by polymer type based on file name

In [None]:
# get list of samples by removing replicas
samples = sorted(list(set([file[:-2] for file in list(dfI.columns)])))

labels = (
    samples.copy()
)  # labels is a copy of the samples list, which will be modified to hold the corresponding labels for each sample
indices = np.arange(len(samples))
labeldict = {"HDPE": 0, "LDPE": 1, "LLDPE": 2, "PP": 3, "PE/PP blend": 4, "PE blend": 5}

# assign labels
for i, single in enumerate(samples):
    if "L_" in single:
        labels[i] = "PE blend"
    elif "HDPE_" in single:
        labels[i] = "PE/PP blend"
    elif "MDPE" in single:
        labels[i] = "LDPE"
    elif "LLDPE" in single:
        labels[i] = "LLDPE"
    elif "LDPE" in single:
        labels[i] = "LDPE"
    elif "HDPE" in single:
        labels[i] = "HDPE"
    elif "PP" in single:
        labels[i] = "PP"
    else:
        print("Error.")

for i in range(len(samples)):
    print(samples[i], "\t", labels[i], "\t", labeldict[labels[i]])
numlabels = np.array(
    [labeldict[entry] for entry in labels]
)  # convert a list of labels into an array of numerical labels

np.savetxt("../ModelData/samples.txt", samples, delimiter=",", fmt="%s")
np.savetxt("../ModelData/labels.txt", labels, delimiter=",", fmt="%s")
np.savetxt("../ModelData/numlabels.csv", numlabels, delimiter=",", fmt="%d")

### Define get data function for machine learning

In [None]:
def getXy(index, samples, dfRNV, propvalues, proplabel, nreplicates=6):
    """Build the spectra matrix X and the target vector y for selected samples.

    Parameters
    ----------
    index : iterable of int
        Positions into ``samples`` of the samples to include.
    samples : sequence of str
        Sample names without the replicate suffix.
    dfRNV : pandas.DataFrame
        Preprocessed spectra, one column per spectrum, with columns named
        ``"<sample>_<k>"`` for ``k = 1 .. nreplicates``.
    propvalues : array-like
        Property values, aligned with ``proplabel``.
    proplabel : array-like of str
        Sample name belonging to each entry of ``propvalues``.
    nreplicates : int, optional
        Number of replicate spectra per sample (default 6).

    Returns
    -------
    X : numpy.ndarray
        One row per spectrum, ordered sample by sample and, within a sample,
        by replicate number.
    y : numpy.ndarray
        The property value of each sample repeated ``nreplicates`` times so
        that it lines up with the rows of ``X``.

    Raises
    ------
    KeyError
        If a replicate column is missing from ``dfRNV`` or a sample has no
        entry in ``proplabel``.
    """
    # resolve the names once so any iterable of positions can be passed in
    names = [samples[i] for i in index]

    # get X
    samplelist = [
        f"{name}_{replicate}"
        for name in names
        for replicate in range(1, nreplicates + 1)
    ]
    X = dfRNV[samplelist].to_numpy().T

    # get y: look up the property value of each sample by name ...
    propdict = pd.Series(propvalues, index=proplabel).to_dict()
    prop = np.array([propdict[name] for name in names])
    # ... and repeat it once per replicate spectrum
    y = np.repeat(prop, nreplicates)

    return X, y

## Centering and Normalization (additional preprocessing after RNV)

In [None]:
def center_and_normalize(Xtrain, Xtest):
    """Standardize both sets with a StandardScaler fitted on the training set.

    The scaler is fitted on ``Xtrain`` only and then applied to both
    ``Xtrain`` and ``Xtest``, so nothing from the test data enters the fit.

    Returns the pair ``(Xtrain_scaled, Xtest_scaled)``.
    """
    fitted_scaler = StandardScaler().fit(Xtrain)

    return fitted_scaler.transform(Xtrain), fitted_scaler.transform(Xtest)

# Define ML models for Property Prediction

## Principal Component Analysis followed by Regression

In [None]:
def PCR(Xtrain, ytrain, Xtest, n):
    """Principal component regression using the first ``n`` components.

    Both sets are standardized with training-set statistics, a PCA is fitted
    on the training spectra, and a linear regression is trained on the first
    ``n`` score columns.

    Returns ``(predictions for Xtest, fitted LinearRegression)``. The returned
    regression operates on PCA scores, not on the original features.
    """
    train_std, test_std = center_and_normalize(Xtrain, Xtest)

    # the decomposition is learned from the training data only; the test data
    # is projected onto the same components
    decomposition = PCA()
    train_scores = decomposition.fit_transform(train_std)[:, :n]
    test_scores = decomposition.transform(test_std)[:, :n]

    # regress the target on the leading n scores
    regressor = LinearRegression()
    regressor.fit(train_scores, ytrain)

    return regressor.predict(test_scores), regressor

## Partial Least Squares Regression

In [None]:
def PLSR(Xtrain, ytrain, Xtest, n):
    """Partial least squares regression with ``n`` components.

    Both sets are standardized with training-set statistics before the model
    is fitted on the training data.

    Returns ``(predictions for Xtest, fitted PLSRegression)``.
    """
    train_std, test_std = center_and_normalize(Xtrain, Xtest)

    model = PLSRegression(n_components=n)
    model.fit(train_std, ytrain)

    return model.predict(test_std), model

## Least Absolute Shrinkage and Selection Operator

In [None]:
def LASSO(Xtrain, ytrain, Xtest, alpha):
    """Lasso regression; ``alpha`` sets the regularization strength.

    Both sets are standardized with training-set statistics. The solver is
    allowed up to 50000 iterations.

    Returns ``(predictions for Xtest, fitted Lasso)``.
    """
    train_std, test_std = center_and_normalize(Xtrain, Xtest)

    model = Lasso(alpha=alpha, max_iter=50000)
    model.fit(train_std, ytrain)

    return model.predict(test_std), model

## Random Forest

In [None]:
def RF(Xtrain, ytrain, Xtest, n_estimators, max_features):
    """Random forest regression.

    ``n_estimators`` is the number of trees; ``max_features`` is the size of
    the random feature subset considered when splitting a node. A smaller
    subset reduces variance more but also increases bias more.

    Both sets are standardized with training-set statistics, and the forest is
    seeded with ``random_state=0``.

    Returns ``(predictions for Xtest, fitted RandomForestRegressor)``.
    """
    train_std, test_std = center_and_normalize(Xtrain, Xtest)

    forest = RandomForestRegressor(
        n_estimators=n_estimators,
        max_features=max_features,
        random_state=0,
    )
    forest.fit(train_std, ytrain)

    return forest.predict(test_std), forest

## Linear Regression

In [None]:
def LR(Xtrain, ytrain, Xtest):
    """Ordinary linear regression on the full standardized spectra.

    Both sets are standardized with training-set statistics; every feature is
    used as a regressor.

    Returns ``(predictions for Xtest, fitted LinearRegression)``.
    """
    train_std, test_std = center_and_normalize(Xtrain, Xtest)

    model = LinearRegression()
    model.fit(train_std, ytrain)

    return model.predict(test_std), model

# Set up training machine learning models

## Nested cross-validation for training models, including hyperparameter optimization

In [None]:
def NestedCrossFold(
    dfRNV,
    samples,
    propvalues,
    proplabel,
    MLmodel,
    param,
    indices,
    numlabels,
    savefile=None,
):
    """Tune and evaluate one model with nested stratified 5-fold cross-validation.

    Samples (not individual spectra) are split, so all replicate spectra of a
    sample always end up in the same fold. For every outer fold, each
    parameter set in ``param`` is scored on 5 inner folds; the set with the
    lowest mean validation RMSE is then refitted on the whole outer training
    set and scored on the outer test set and on the training set itself.

    Parameters
    ----------
    dfRNV : pandas.DataFrame
        Preprocessed spectra, passed through to ``getXy``.
    samples : sequence of str
        Sample names, passed through to ``getXy``.
    propvalues, proplabel : array-like
        Property values and the sample name of each value, passed through to
        ``getXy``.
    MLmodel : callable
        Called as ``MLmodel(Xtrain, ytrain, Xtest, **params)`` and expected to
        return ``(predictions, fitted_model)``. Its ``__name__`` selects the
        tracked hyperparameter and the way model size is counted.
    param : list of dict
        Candidate parameter sets (use ``[{}]`` for a model without any).
    indices : numpy.ndarray
        Positions into ``samples`` of the samples taking part.
    numlabels : numpy.ndarray
        Class label of each entry of ``indices``, used for stratification.
    savefile : str or None, optional
        Path prefix. When given, the data, labels and predictions of every
        outer fold are written as CSV files whose names start with this
        prefix, plus coefficients and intercept for PLSR, PCR, LASSO and LR.

    Returns
    -------
    results : dict
        Per-outer-fold test/train R^2 and RMSE, the selected parameters, the
        model-size measure, and ``val_r2_scores`` (inner-fold validation R^2
        grouped by the value of the tracked hyperparameter).
    val_rmse_scores : defaultdict(list)
        Inner-fold validation RMSE grouped by the value of the tracked
        hyperparameter.
    """

    results = {
        "test_r2_scores": [],
        "train_r2_scores": [],
        "test_rmse_scores": [],
        "train_rmse_scores": [],
        "best_params": [],  # store full best parameters for each outer fold
        "val_r2_scores": defaultdict(list),
        "best_hyperparameters": [],  # store the key hyperparameter for each model type
        "num_parameters": [],  # model-size measure for each outer fold
    }

    model_name = MLmodel.__name__

    val_rmse_scores = defaultdict(list)

    # parameter keys to track for each model type
    param_keys = {"PLSR": "n", "PCR": "n", "LASSO": "alpha", "RF": "n_estimators"}

    key_param = param_keys.get(model_name, None)

    # outer fold
    skf_out = sklearn.model_selection.StratifiedKFold(
        n_splits=5, shuffle=True, random_state=0
    )

    print(f"Running {model_name} with nested cross-validation")
    print("Indices are:", indices)

    for i, (train_out_index, test_out_index) in enumerate(
        skf_out.split(indices, numlabels)
    ):

        # the splitter returns positions into `indices`; map them back to
        # positions into `samples`
        numlabels_train_out = numlabels[train_out_index]
        numlabels_test_out = numlabels[test_out_index]
        train_out_index_fix = indices[train_out_index]
        test_out_index_fix = indices[test_out_index]

        # inner fold
        skf_in = sklearn.model_selection.StratifiedKFold(
            n_splits=5, shuffle=True, random_state=0
        )

        # track performance for each parameter set across all inner folds
        param_performance = defaultdict(list)

        for train_in_index, test_in_index in skf_in.split(
            train_out_index_fix, numlabels_train_out
        ):

            train_in_index_fix = train_out_index_fix[train_in_index]
            test_in_index_fix = train_out_index_fix[test_in_index]

            # get data for training the models
            Xtrain, ytrain = getXy(
                train_in_index_fix, samples, dfRNV, propvalues, proplabel
            )
            Xval, yval = getXy(test_in_index_fix, samples, dfRNV, propvalues, proplabel)

            for params in param:
                # create a parameter identifier
                param_id = str(params)

                # train and evaluate model
                yval_pred, _ = MLmodel(Xtrain, ytrain, Xval, **params)

                r2 = r2_score(yval, yval_pred)
                rmse = np.sqrt(mean_squared_error(yval, yval_pred))

                # store this parameter set's performance
                param_performance[param_id].append({"rmse": rmse, "params": params})

                # keep a record of the validation metrics per value of the
                # tracked hyperparameter, to analyze how it affects performance
                if key_param and key_param in params:
                    key_value = params[key_param]
                    val_rmse_scores[key_value].append(rmse)
                    results["val_r2_scores"][key_value].append(r2)

        # calculate average performance for each parameter set
        avg_performance = {}
        for param_id, performances in param_performance.items():
            avg_rmse = np.mean([p["rmse"] for p in performances])
            avg_performance[param_id] = {
                "avg_rmse": avg_rmse,
                "params": performances[0]["params"],
            }

        # find best parameter set based on lowest average RMSE
        best_param_id = min(
            avg_performance, key=lambda k: avg_performance[k]["avg_rmse"]
        )
        best_params = avg_performance[best_param_id]["params"]
        best_avg_rmse = avg_performance[best_param_id]["avg_rmse"]

        print(f"Outer fold {i}: Best parameters with avg RMSE {best_avg_rmse:.4f}")

        # store the best parameters for this outer fold
        results["best_params"].append(best_params)

        # store the key hyperparameter
        if key_param and key_param in best_params:
            results["best_hyperparameters"].append(best_params[key_param])
            print(f"Best {key_param} for {model_name}: {best_params[key_param]}")

        # **************** below test the models on the test sets on outer loops ********************

        Xtrain_out, ytrain_out = getXy(
            train_out_index_fix, samples, dfRNV, propvalues, proplabel
        )
        Xtest_out, ytest_out = getXy(
            test_out_index_fix, samples, dfRNV, propvalues, proplabel
        )

        # the fit/predict call is the same for every model type, so it is made
        # once here; only the model-size bookkeeping below depends on the type
        ytest_pred, fitted_model = MLmodel(
            Xtrain_out, ytrain_out, Xtest_out, **best_params
        )

        if model_name == "LASSO":
            # count non-zero coefficients
            num_nonzero = np.sum(fitted_model.coef_ != 0)
            total_features = len(fitted_model.coef_)
            results["num_parameters"].append(num_nonzero)

            print(
                f"LASSO with alpha={best_params['alpha']} uses {num_nonzero} out of {total_features} features"
            )

        elif model_name == "PCR":
            n_components = best_params["n"]
            results["num_parameters"].append(n_components)
            print(f"PCR model using {n_components} principal components")

        elif model_name == "PLSR":
            n_components = best_params["n"]
            results["num_parameters"].append(n_components)
            print(f"PLSR model using {n_components} components")

        elif model_name == "RF":
            # extract feature importances and count how many features were used
            importances = fitted_model.feature_importances_
            num_important_features = np.count_nonzero(importances > 1e-6)
            results["num_parameters"].append(num_important_features)
            print(f"RF model using {num_important_features} important features")

        elif model_name == "LR":
            num_features = Xtrain_out.shape[1]  # total number of input features
            results["num_parameters"].append(num_features)
            print(f"LR model using all {num_features} features")

        test_r2 = r2_score(ytest_out, ytest_pred)
        test_rmse = np.sqrt(mean_squared_error(ytest_out, ytest_pred))
        print(
            f"Test R^2 score for outer fold {i}: {test_r2:.4f}, Test RMSE: {test_rmse:.4f}"
        )

        # The model functions only predict on the set passed as their third
        # argument, so the training-set predictions need a second call with
        # the training data in that position.
        ytrain_pred, savedmodel = MLmodel(
            Xtrain_out, ytrain_out, Xtrain_out, **best_params
        )

        # save the results
        if savefile is not None:

            np.savetxt(
                savefile + "labels_train_" + str(i) + ".csv",
                numlabels_train_out,
                delimiter=",",
            )
            np.savetxt(
                savefile + "labels_test_" + str(i) + ".csv",
                numlabels_test_out,
                delimiter=",",
            )

            np.savetxt(
                savefile + "X_train_" + str(i) + ".csv", Xtrain_out, delimiter=","
            )
            np.savetxt(savefile + "X_test_" + str(i) + ".csv", Xtest_out, delimiter=",")
            np.savetxt(
                savefile + "y_train_" + str(i) + ".csv", ytrain_out, delimiter=","
            )
            np.savetxt(savefile + "y_test_" + str(i) + ".csv", ytest_out, delimiter=",")
            np.savetxt(
                savefile + "ypred_" + model_name + "_train_" + str(i) + ".csv",
                ytrain_pred,
                delimiter=",",
            )
            np.savetxt(
                savefile + "ypred_" + model_name + "_test_" + str(i) + ".csv",
                ytest_pred,
                delimiter=",",
            )

            # only these model types are saved with coef_ / intercept_
            if model_name in ["PLSR", "PCR", "LASSO", "LR"]:

                np.savetxt(
                    savefile + model_name + "_coef_" + str(i) + ".csv",
                    savedmodel.coef_,
                    delimiter=",",
                )
                np.savetxt(
                    savefile + model_name + "_intercept_" + str(i) + ".csv",
                    [savedmodel.intercept_],
                    delimiter=",",
                )

        train_r2 = r2_score(ytrain_out, ytrain_pred)
        train_rmse = np.sqrt(mean_squared_error(ytrain_out, ytrain_pred))
        print(
            f"Train R^2 score for outer fold {i}: {train_r2:.4f}, Train RMSE: {train_rmse:.4f}"
        )

        results["test_r2_scores"].append(test_r2)
        results["test_rmse_scores"].append(test_rmse)
        results["train_r2_scores"].append(train_r2)
        results["train_rmse_scores"].append(train_rmse)

    # calculate and print average performance across all outer folds
    avg_test_r2 = np.mean(results["test_r2_scores"])
    avg_test_rmse = np.mean(results["test_rmse_scores"])
    avg_train_r2 = np.mean(results["train_r2_scores"])
    avg_train_rmse = np.mean(results["train_rmse_scores"])

    print(f"\nAverage performance for {model_name}:")
    print(f"Test R² = {avg_test_r2:.4f} ± {np.std(results['test_r2_scores']):.4f}")
    print(
        f"Test RMSE = {avg_test_rmse:.4f} ± {np.std(results['test_rmse_scores']):.4f}"
    )
    print("Test RMSEs =", results["test_rmse_scores"])
    print(f"Train R² = {avg_train_r2:.4f} ± {np.std(results['train_r2_scores']):.4f}")
    print(
        f"Train RMSE = {avg_train_rmse:.4f} ± {np.std(results['train_rmse_scores']):.4f}"
    )
    print("Train RMSEs =", results["train_rmse_scores"])

    # calculate average number of parameters
    if results["num_parameters"]:
        avg_num_params = np.mean(results["num_parameters"])
        print(f"Average number of features used by {model_name}: {avg_num_params:.1f}")

    return results, val_rmse_scores

## Set options for hyperparameters

In [None]:
# candidate hyperparameter sets; every entry is a dict of keyword arguments
# for the matching model function

# PLSR and PCR: 1 to 30 components
param_plsr = [dict(n=k) for k in range(1, 31)]
param_pcr = [dict(n=k) for k in range(1, 31)]

# LASSO: 10 log-spaced regularization strengths from 1e-4 to 1
param_lasso = [dict(alpha=strength) for strength in np.logspace(-4, 0, 10)]

# RF: every combination of tree count and feature-subset size
param_rf = [
    dict(n_estimators=trees, max_features=subset)
    for trees in (20, 50, 100, 200, 300)
    for subset in (2, 5, 10, 15, 20)
]

In [None]:
# each model paired with its hyperparameter candidates; LR has none, hence the
# single empty parameter set
model_grid_pairs = [
    (PLSR, param_plsr),
    (PCR, param_pcr),
    (LASSO, param_lasso),
    (LR, [{}]),
    (RF, param_rf),
]

# the two parallel lists (same order) used by the property loops
modellist = [model_fn for model_fn, _ in model_grid_pairs]
paramlist = [grid for _, grid in model_grid_pairs]

# Loop through the properties

## Density

In [None]:
# summary table for density: one row per model
summary_columns = [
    "Model",
    "Mean Test RMSE",
    "Std Test RMSE",
    "Mean Train RMSE",
    "Std Train RMSE",
]
results_df_den = pd.DataFrame(columns=summary_columns)

for i, (model, grid) in enumerate(zip(modellist, paramlist)):

    # nested cross-validation over all samples; per-fold files are written
    # with the "../ModelData/density/" prefix
    results, _ = NestedCrossFold(
        dfRNV,
        samples,
        dfprop.density.values,
        dfprop.label,
        model,
        grid,
        indices,
        numlabels,
        "../ModelData/density/",
    )
    print(model.__name__, "Results:", results)

    # mean and standard deviation of the RMSE over the outer folds
    test_rmse = results["test_rmse_scores"]
    train_rmse = results["train_rmse_scores"]
    results_df_den.loc[i] = [
        model.__name__,
        np.mean(test_rmse),
        np.std(test_rmse),
        np.mean(train_rmse),
        np.std(train_rmse),
    ]

In [None]:
# write the per-model RMSE summary for density to disk (index column omitted)
results_df_den.to_csv("../ModelData/density/density_results.csv", index=False)
# display the first (up to) 5 rows of the summary table
results_df_den.head(5)

## Crystallinity

In [None]:
# single out non-blended samples only
mask_homopolymers = np.isin(numlabels, [0, 1, 2, 3])
indiceshomopolymer = indices[mask_homopolymers]
numlabelshomopolymer = numlabels[mask_homopolymers]

In [None]:
# summary table for crystallinity: one row per model
summary_columns = [
    "Model",
    "Mean Test RMSE",
    "Std Test RMSE",
    "Mean Train RMSE",
    "Std Train RMSE",
]
results_df_crys = pd.DataFrame(columns=summary_columns)

for i, (model, grid) in enumerate(zip(modellist, paramlist)):

    # nested cross-validation restricted to the non-blended samples; per-fold
    # files are written with the "../ModelData/crystallinity/" prefix
    results, _ = NestedCrossFold(
        dfRNV,
        samples,
        dfprop.crystallinity.values,
        dfprop.label,
        model,
        grid,
        indiceshomopolymer,
        numlabelshomopolymer,
        "../ModelData/crystallinity/",
    )
    print(model.__name__, "Results:", results)

    # mean and standard deviation of the RMSE over the outer folds
    test_rmse = results["test_rmse_scores"]
    train_rmse = results["train_rmse_scores"]
    results_df_crys.loc[i] = [
        model.__name__,
        np.mean(test_rmse),
        np.std(test_rmse),
        np.mean(train_rmse),
        np.std(train_rmse),
    ]

In [None]:
# write the per-model RMSE summary for crystallinity to disk (index column omitted)
results_df_crys.to_csv(
    "../ModelData/crystallinity/crystallinity_results.csv", index=False
)
# display the first (up to) 5 rows of the summary table
results_df_crys.head(5)

## Short Chain Branching

In [None]:
# summary table for short chain branching (SCB): one row per model
summary_columns = [
    "Model",
    "Mean Test RMSE",
    "Std Test RMSE",
    "Mean Train RMSE",
    "Std Train RMSE",
]
results_df_scb = pd.DataFrame(columns=summary_columns)

for i, (model, grid) in enumerate(zip(modellist, paramlist)):

    # nested cross-validation over all samples; per-fold files are written
    # with the "../ModelData/SCB/" prefix
    results, _ = NestedCrossFold(
        dfRNV,
        samples,
        dfprop.SCB.values,
        dfprop.label,
        model,
        grid,
        indices,
        numlabels,
        "../ModelData/SCB/",
    )
    print(model.__name__, "Results:", results)

    # mean and standard deviation of the RMSE over the outer folds
    test_rmse = results["test_rmse_scores"]
    train_rmse = results["train_rmse_scores"]
    results_df_scb.loc[i] = [
        model.__name__,
        np.mean(test_rmse),
        np.std(test_rmse),
        np.mean(train_rmse),
        np.std(train_rmse),
    ]

In [None]:
# write the per-model RMSE summary for SCB to disk (index column omitted)
# NOTE(review): the SCB loop writes its per-fold files under "../ModelData/SCB/"
# (upper case) while this summary goes to "../ModelData/scb/" (lower case); on a
# case-sensitive filesystem these are two different directories -- confirm
# which one is intended.
results_df_scb.to_csv("../ModelData/scb/scb_results.csv", index=False)
# display the first (up to) 5 rows of the summary table
results_df_scb.head(5)