In [1]:
import pickle

import numpy as np
import pandas as pd
import tensorflow as tf
import sklearn.model_selection as ms
import sklearn.preprocessing as pp
import sklearn.metrics as metrics

from tensorflow import keras
from tensorflow.keras import backend as K

# 1D CNN

## CNN Model

In [2]:
def cnn(input_shape, output_shape):
    """Build the (uncompiled) 1D convolutional classifier.

    Layout: batch normalisation of the input, two convolutional stages
    (Conv1D -> Conv1D -> MaxPool1D), a flatten, three Dense(64) + Dropout(0.2)
    pairs and a final softmax layer.

    Args:
        input_shape (tuple): shape of a single sample, handed to the Input layer
        output_shape (int): number of units of the softmax output layer

    Returns:
        keras.Model: the assembled model, named 'cnn1d'
    """
    # the TensorFlow and NumPy global seeds are reset on every build
    tf.random.set_seed(0)
    np.random.seed(0)

    n_conv_stages = 2
    n_dense_stages = 3

    model_input = keras.layers.Input(input_shape)
    features = keras.layers.BatchNormalization()(model_input)

    # convolutional feature extractor: (conv, conv, pool) per stage
    for _stage in range(n_conv_stages):
        for _conv in range(2):
            features = keras.layers.Conv1D(
                filters=64, kernel_size=3, activation='relu')(features)
        features = keras.layers.MaxPool1D()(features)

    # fully connected head
    features = keras.layers.Flatten()(features)
    for _stage in range(n_dense_stages):
        features = keras.layers.Dense(units=64, activation='relu')(features)
        features = keras.layers.Dropout(rate=0.2)(features)

    probabilities = keras.layers.Dense(
        output_shape, activation='softmax')(features)
    return keras.Model(model_input, probabilities, name='cnn1d')

## Load Data

In [3]:
def load_data(k=1, name='train'):
    """Load a data pickle and return the train/valid/test split of fold ``k``.

    The file ``./data/{name}.pickle`` is read as two consecutive pickled
    objects: the samples ``x`` followed by the string labels ``y``. Labels are
    integer-encoded, samples and labels are shuffled with the same fixed
    permutation, and a seeded 5-fold stratified split selects the test set.
    The remaining samples of the selected fold are split 70/30 into training
    and validation sets.

    Args:
        k (int, optional): k fold index between 1 and 5. Defaults to 1.
        name (str, optional): name of pickle file. Defaults to 'train'.

    Returns:
        tuple: (x_train, x_valid, x_test, y_train, y_valid, y_test)

    Raises:
        ValueError: if ``k`` is not a fold index between 1 and 5.
    """
    n_splits = 5

    # an out-of-range k used to fall through the loop and silently return the
    # last fold; fail loudly instead
    if k not in range(1, n_splits + 1):
        raise ValueError(f'k must be an integer between 1 and {n_splits}, got {k!r}')

    # load pickle file
    # NOTE: pickle.load can execute arbitrary code - only open trusted files
    with open(f'./data/{name}.pickle', 'rb') as handle:
        x = pickle.load(handle)
        y = pickle.load(handle)

    # encode label
    le = pp.LabelEncoder()
    le.fit(['BK', 'PE', 'PP', 'PS', 'PVC'])
    y = le.transform(y)

    # random shuffle (same seed for both, so samples and labels stay aligned)
    x = np.random.RandomState(0).permutation(x)[..., None] # CNN needs at least 1 channel
    y = np.random.RandomState(0).permutation(y)

    # k-fold cross validation: advance to the k-th fold
    skf = ms.StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=0)
    for fold, (train_index, test_index) in enumerate(skf.split(x, y), start=1):
        if fold == k:
            break

    x_train, x_test = x[train_index], x[test_index]
    y_train, y_test = y[train_index], y[test_index]

    # carve the validation set out of the selected fold only; the split is
    # seeded, so doing it once here yields the same result as splitting every
    # fold on the way
    x_train, x_valid, y_train, y_valid = ms.train_test_split(
        x_train, y_train, random_state=0, test_size=0.3)

    # output data
    data = (x_train, x_valid, x_test, y_train, y_valid, y_test)
    return data

## Training Function

In [4]:
def train_kernel(data, name='train', train=False, verbose=2):
    """Train the CNN on one data split, or reload a previously trained run.

    With ``train=True`` the model is fitted, the weights with the lowest
    validation loss are written to ``./h5/{name}.h5``, the test set is
    predicted with those weights, and the training history, test labels and
    predictions are pickled to ``./pickle/{name}.pickle``.

    With ``train=False`` nothing is fitted or predicted: the weights are
    loaded from the h5 file and the history, test labels and predictions are
    read back from the pickle file (the test data passed in ``data`` is not
    used in this mode).

    Args:
        data (tuple): (x_train, x_valid, x_test, y_train, y_valid, y_test)
        name (str, optional): base name of the h5/pickle files. Defaults to 'train'.
        train (bool, optional): if train the model or just test it. Defaults to False.
        verbose (int, optional): if show details of training. Defaults to 2.

    Returns:
        tuple: model, hist, y_test, y_pred
    """
    # clear all states set by Keras
    K.clear_session()

    # expand data
    x_train, x_valid, x_test, y_train, y_valid, y_test = data

    # convert data to categorical labels (y_test stays integer-encoded)
    y_train = keras.utils.to_categorical(y_train, num_classes=5)
    y_valid = keras.utils.to_categorical(y_valid, num_classes=5)

    # weight h5 and result pickle paths
    h5_path = f'./h5/{name}.h5'
    pickle_path = f'./pickle/{name}.pickle'

    # compile model
    model = cnn(input_shape=x_train.shape[1:], output_shape=y_train.shape[-1])
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-3), loss='categorical_crossentropy', metrics=['acc'])

    # early stopping
    es = keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=25)

    # model check point: keep only the weights with the best validation loss
    mcp = keras.callbacks.ModelCheckpoint(
        monitor='val_loss',
        filepath=h5_path,
        verbose=verbose,
        save_weights_only=True,
        save_best_only=True)

    if train:
        # train model
        hist = model.fit(
            x_train,
            y_train,
            epochs=50,
            batch_size=256,
            validation_data=(x_valid, y_valid),
            callbacks=[es, mcp],
            verbose=verbose,
            shuffle=True).history

        # predict the test set with the best checkpoint, not the last epoch
        model.load_weights(h5_path)
        y_pred = model.predict(x_test).squeeze()

        # cache the results so later runs can use train=False
        with open(pickle_path, 'wb') as f:
            pickle.dump(hist, f)
            pickle.dump(y_test, f)
            pickle.dump(y_pred, f)

    else:
        model.load_weights(h5_path)

        # NOTE: pickle.load can execute arbitrary code - only open trusted files
        with open(pickle_path, 'rb') as f:
            hist = pickle.load(f)
            y_test = pickle.load(f)
            y_pred = pickle.load(f)

    # must sit outside the if/else: it used to be nested in the else branch,
    # so train=True returned None and broke the callers' tuple unpacking
    return model, hist, y_test, y_pred

# Train CNN Models

In [10]:
names = ['train', 'smooth_train', 'detrend_train', 'both_train']

# visit every (data set, fold) combination: folds 1..5 of each data set in turn
fold_jobs = [(variant, fold) for variant in names for fold in range(1, 6)]

for name, k in fold_jobs:
    data = load_data(k=k, name=name)

    # change train=True if you want retraining
    model, hist, y_true, y_pred = train_kernel(
        data=data,
        name=f'{name}_{k}',
        train=False,
        verbose=0,
    )

# Test on Unseen Data

In [10]:
def predict_unseen(name='train'):
    """A function to predict unseen plastic mix composition

    The fold model with the highest test-set accuracy classifies every unseen
    spectrum as one of the pure classes. For each mixture the share of spectra
    assigned to each pure class is reported in percent, printed as a table and
    pickled to ``./unseen_pickle/{name_unseen}.pickle``.

    Args:
        name (str, optional): file name. Defaults to 'train'. The unseen data
            file name is obtained by replacing 'train' with 'unseen'.
    """
    # use the best k-fold model with the highest test set accuracy; ties keep
    # the earliest fold. Starting below any possible accuracy guarantees that a
    # model is selected even if every fold scores 0.
    acc_best = -np.inf
    model_best = None
    for k in range(1, 6):
        data = load_data(k=k, name=name)
        model, _hist, y_true, y_fold_pred = train_kernel(
            data=data, name=f'{name}_{k}', train=False, verbose=0)
        acc = metrics.accuracy_score(y_true, y_fold_pred.argmax(axis=1))
        if acc > acc_best:
            acc_best = acc
            model_best = model

    # load unseen data
    # NOTE: pickle.load can execute arbitrary code - only open trusted files
    name_unseen = name.replace('train', 'unseen')
    with open(f'./data/{name_unseen}.pickle', 'rb') as handle:
        x_unseen = pickle.load(handle)
        y_unseen = pickle.load(handle)
    x_unseen = np.asarray(x_unseen)[..., None]
    # a plain list would make `y_unseen == name_` a single scalar comparison
    y_unseen = np.asarray(y_unseen)

    # predict unseen data
    y_unseen_pred = model_best.predict(x_unseen)

    name_pure = ['BK', 'PE', 'PP', 'PS', 'PVC']
    name_mix = ['BKPS', 'BKPVC', 'PVCPE', 'PSPP', 'BKPVCPPPS']

    # percentage of each mixture's spectra assigned to each pure class
    result = np.zeros((len(name_mix), len(name_pure)))
    for i, name_ in enumerate(name_mix):
        idx_ = np.where(y_unseen == name_)[0]
        if idx_.size == 0:
            # no spectra for this mixture: leave the row at zero
            continue
        labels_ = y_unseen_pred[idx_].argmax(axis=1)
        counts_ = np.bincount(labels_, minlength=len(name_pure))[:len(name_pure)]
        # divide by the real group size instead of a hard-coded 5 (which was
        # only a percentage for exactly 500 spectra per mixture)
        result[i] = 100.0 * counts_ / idx_.size

    # visualize unseen data
    df = pd.DataFrame(result, columns=name_pure)
    df.insert(0, column='comb', value=name_mix)
    print(' '.join(name.split('_')[:-1]))
    print(df)

    # save result
    with open(f'./unseen_pickle/{name_unseen}.pickle', 'wb') as handle:
        pickle.dump(result, handle)

## Raw Spectra

In [11]:
predict_unseen('train')


        comb    BK   PE    PP    PS   PVC
0       BKPS  48.6  0.0   0.0  51.4   0.0
1      BKPVC  48.8  0.0   0.0   0.0  51.2
2      PVCPE   0.0  0.0  39.6   0.0  60.4
3       PSPP   0.6  0.0   0.0  99.0   0.4
4  BKPVCPPPS  72.4  1.0   7.2   9.4  10.0


## Smoothed Spectra

In [12]:
predict_unseen('smooth_train')

smooth
        comb    BK   PE    PP    PS   PVC
0       BKPS  48.8  0.0   0.0  51.2   0.0
1      BKPVC  48.6  0.0   0.4   0.8  50.2
2      PVCPE   0.0  0.2  49.2   0.2  50.4
3       PSPP   1.2  0.0  15.4  83.4   0.0
4  BKPVCPPPS  81.8  0.0   3.0   6.0   9.2


## Detrended Spectra

In [13]:
predict_unseen('detrend_train')

detrend
        comb    BK    PE   PP    PS   PVC
0       BKPS  47.4   0.0  0.0  52.6   0.0
1      BKPVC  48.6   0.0  0.0   0.0  51.4
2      PVCPE   0.0  50.4  0.0   0.0  49.6
3       PSPP   0.0   0.0  1.8  98.2   0.0
4  BKPVCPPPS  72.4   0.0  8.6  10.0   9.0


## Both

In [14]:
predict_unseen('both_train')

both
        comb    BK    PE    PP    PS   PVC
0       BKPS  47.6   0.0   0.0  52.4   0.0
1      BKPVC  48.8   0.0   0.0   0.0  51.2
2      PVCPE   0.0  50.8   0.2   0.0  49.0
3       PSPP   0.0   0.0  52.6  47.0   0.4
4  BKPVCPPPS  72.8   0.2   8.8   9.6   8.6
