In [None]:
# importing the libraries
import pandas as pd
import numpy as np
from tqdm import tqdm
import os
import gc

# for reading and displaying images 
from skimage.io import imread
import matplotlib.pyplot as plt

# for creating validation set
from sklearn.model_selection import train_test_split
# for evaluating the model
from sklearn.metrics import accuracy_score


# PyTorch libraries and modules
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import *
import h5py

#for random Forest 
from sklearn.ensemble import RandomForestClassifier

#for CNN
import random
import string

## Preparation

### Tool Functions

In [None]:
 def split_samples(cost_df):
    def padding_frames(frames, fix_length):
        if len(frames) < fix_length:
            frames = frames + [[[0] * 8] * 8] * (fix_length - len(frames))
        elif len(frames) > fix_length:
            frames = frames[:fix_length]
        return frames

    touches = []
    gestures = []
    # variants = []

    temp_frames = []
    for index, row in cost_df.iterrows():
        if row['frame'] == 1:
            gestures.append(row['gesture'])
            # variants.append(row['variant'])
            touches.append([padding_frames(temp_frames, 1750)])
            temp_frames = []
        one_frame = np.array(row.tolist()[4:]).reshape((8, 8)).tolist()
        temp_frames.append(one_frame)

    touches.append([padding_frames(temp_frames, 1750)])
    touches = touches[1:]

    return np.array(touches, dtype='int16'), np.array(gestures, dtype='int16') - 1


def touches_3d_to_2d(touches):
    """(M, 1, 1750, 8, 8) -> (M, 1750, 8, 8)"""
    touches = np.apply_along_axis(lambda x: x[0], 1, touches)
    return touches


def one_touch_remove_padding(touch_padding, dimension=1):
    """remove padding of one touch (all frames of a single gesture)

    Parameters:
    touch_padding (ndarray) -- one touch with padding values, shape (1750, 8, 8)
    dimension (int: 1 or 2) -- dimension of frame, 1: 64; 2: 8x8

    Returns:
    lists: one touch without padding values, shape (N, 64)
    """
    frames_no_padding = []
    for frame_iter in touch_padding:
        if np.sum(frame_iter) != 0:
            if dimension == 1:
                frames_no_padding.append(frame_iter.flatten().tolist())
            else:
                frames_no_padding.append(frame_iter.tolist())
    return frames_no_padding


def touches_remove_padding(touches_padding, dimension=1):
    """remove padding of touches (gestures)

    Parameters:
    touches_padding (ndarray) -- touches with padding values, shape (M, 1750, 8, 8) or (M, 1, 1750, 8, 8)
    dimension (int: 1 or 2) -- dimension of frame, 1: 64; 2: 8x8

    Return:
    lists: touches without padding values, shape (M, N, 64), cannot convert to ndarray because N is not fixed.
    """
    if (touches_padding.shape[1] == 1) and (len(touches_padding.shape) == 5):
        touches_padding = touches_3d_to_2d(
            touches_padding
        )  # (M, 1, 1750, 8, 8) to (M, 1750, 8, 8)

    touches_no_padding = []
    for touch_padding in touches_padding:
        touches_no_padding.append(one_touch_remove_padding(touch_padding, dimension=dimension))
    return touches_no_padding

def generate_random_string(N=8):
  """generate a random string as the id of a gesture record
  Parameters:
  N (int) default 8 -- the character number of string generated

  Return:
  String. the generated string
  
  """
  return ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(N))


def padding_sample_3dcnn(touches_list, gestures_list, max_frame=100):
  """
  divede the example to subframe(suitable for 3d cnn)

  Parameters:
  touches_list (list): the list contains each gesture's all frames
  gestures_list (list): the list contains each gesture's label
  max_frame (int): the frame length used to divide sample

  Return:
  touches (ndarray): size (number_of_sample, 1, max_frame, 8, 8 )
  gestures (ndarray): size (number_of_sample,)
  gestures_identity (ndarray): size (number_of_sample), contains the id of each gesture, needed when accuracy test

  """
  def padding_frames(frames, fix_length):
        if len(frames) < fix_length:
            frames = frames + [[[0] * 8] * 8] * (fix_length - len(frames))
        return frames
  touches = []
  gestures = []
  gestures_identity = []

  for touch,gesture in zip(touches_list,gestures_list):
    
    gesture_id = generate_random_string()
    count = len(touch)//max_frame
    if count >0:
      for i in range(count):
        touches.append([touch[0+i*max_frame:max_frame+i*max_frame]])
        gestures.append(gesture)
        gestures_identity.append(gesture_id)
    touches.append([padding_frames(touch[0+count*max_frame:],max_frame)])
    gestures.append(gesture)
    gestures_identity.append(gesture_id)
    
  return np.array(touches, dtype='int16'), np.array(gestures, dtype='int16'), np.array(gestures_identity)


def padding_sample_2dcnn(touches_list, gestures_list, max_frame=100):
  """
  divede the example to subframe(suitable for 3d cnn)

  Parameters:
  touches_list (list): the list contains each gesture's all frames
  gestures_list (list): the list contains each gesture's label
  max_frame (int): the frame length used to divide sample

  Return:
  touches (ndarray): size (number_of_sample, 1, max_frame, 8, 8 )
  gestures (ndarray): size (number_of_sample,)
  gestures_identity (ndarray): size (number_of_sample), contains the id of each gesture, needed when accuracy test

  """
  def padding_frames(frames, fix_length):
        if len(frames) < fix_length:
            frames = frames + [[[0] * 8] * 8] * (fix_length - len(frames))
        return frames
  touches = []
  gestures = []
  gestures_identity = []

  for touch,gesture in zip(touches_list,gestures_list):
    
    gesture_id = generate_random_string()
    count = len(touch)//max_frame
    if count >0:
      for i in range(count):
        touches.append(touch[0+i*max_frame:max_frame+i*max_frame])  #removed the braces here []
        gestures.append(gesture)
        gestures_identity.append(gesture_id)
    touches.append(padding_frames(touch[0+count*max_frame:],max_frame))  #removed the braces []
    gestures.append(gesture)
    gestures_identity.append(gesture_id)
    
  return np.array(touches, dtype='int16'), np.array(gestures, dtype='int16') , np.array(gestures_identity)


def split_samples_subframe(cost_df):
    """
    Return:
    lists: touches without padding values, shape (M, N, 8，8), cannot convert to ndarray because N is not fixed.
    """
    def padding_frames(frames, fix_length):
        if len(frames) < fix_length:
            frames = frames + [[[0] * 8] * 8] * (fix_length - len(frames))
        elif len(frames) > fix_length:
            frames = frames[:fix_length]
        return frames

    touches = []
    gestures = []

    temp_frames = []
    for index, row in cost_df.iterrows():
        if row['frame'] == 1:
            gestures.append(row['gesture'])
            touches.append(temp_frames)
            temp_frames = []
        one_frame = np.array(row.tolist()[4:]).reshape((8, 8)).tolist()
        temp_frames.append(one_frame)

    touches.append(temp_frames)
    touches = touches[1:]

    return touches, gestures


def MN88_to_MN64(touches_88):
    """
    from shape (M, N, 8，8) to shape (M, N, 64)
    """
    touches_64 = []
    for i in range(len(touches_88)):
        temp_sample = []
        for j in range(len(touches_88[i])):
            temp_frame = np.array(touches_88[i][j]).flatten().tolist()
            temp_sample.append(temp_frame)
        touches_64.append(temp_sample)
    return touches_64

### Import Data

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
df = pd.read_csv("/content/drive/My Drive/Colab Notebooks/CoST.csv", sep=', ')

  """Entry point for launching an IPython kernel.


In [None]:
touches_88, gestures = split_samples_subframe(df)
gestures = np.array(gestures) - 1

del df
gc.collect()

0

In [None]:
X_train_val, X_test, y_train_val, y_test = train_test_split(touches_88, gestures, test_size=0.2, random_state=42, stratify=gestures)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.25, random_state=42, stratify=y_train_val)

num_classes = 14

del touches_88
del gestures
# del variants
gc.collect()

0

## Random Forest

### Data Preprocessing

In [None]:
# X_train_rf, X_test_rf: lists; shape: (M, N, 64); M: number of samples; N: number of frames; cannot convert to ndarray because N is not fixed
X_train_rf = MN88_to_MN64(X_train)
X_test_rf = MN88_to_MN64(X_test)

### Feature Extraction-global feature 


In [None]:
#Feature extraction

#global feature group
def flatten_lists(list_of_lists):
    list = [val for sublist in list_of_lists for val in sublist]
    return list

def num_of_frames(touch):
    return len(touch)

def avg_pressure_on_64_chs_over_all_frames(touch):
    all_values = flatten_lists(touch)
    return np.mean(all_values)

def max_value_in_all(touch):
    all_values=flatten_lists(touch)
    return max(all_values)

def no_signal_frames(touch):
    count = 0
    t = 50
    for frame in touch:
        if np.mean(frame) < t:
            count += 1
    return count / len(touch)

#Variations around a specific value c (variations around the central value of temporal signal)
#35th, 50th, 65th, 80th and 95th-percentile of the maximal value. 
#1.number of crossing around c
#2.the average slope of each crossing. 
def variations_around_c(touch):
    def num_of_crossing_and_avg_slope(array, c):
        count = 0
        slopes = []
        total = len(array) - 1
        for i in range(total):
            if (array[i] - c) * (array[i + 1] - c) < 0:
                count += 1
                slopes.append((array[i + 1] - array[i]) / (array[i] + array[i + 1]))
        return count, np.mean(slopes)

    array = np.array(touch).mean(axis=0)
    sorted_array = np.sort(array)
    length = len(sorted_array)
    c_35 = sorted_array[int(65 / 100 * length)]
    c_50 = sorted_array[int(50 / 100 * length)]
    c_65 = sorted_array[int(35 / 100 * length)]
    c_80 = sorted_array[int(20 / 100 * length)]
    c_95 = sorted_array[int(5 / 100 * length)]

    c_list = [c_35, c_50, c_65, c_80, c_95]
    rtn = (
        []
    )  # [num_of_crossing_when_c_35, avg_slope_when_c_35, num_of_crossing_when_c_50, avg_slope_when_c_50, ...]
    for c in c_list:
        rtn.extend(num_of_crossing_and_avg_slope(array, c))

    return rtn

In [None]:
#X_train_rf = touches_remove_padding(X_train_rf)
group1_feature1_data = []  # num_of_frames: value; shape: (N,)
group1_feature2_data = []  # avg_pressure_on_64_chs_over_all_frames; shape: (N,)
group1_feature3_data = []  # max_value_in_all,shape:(N,)
group1_feature4_data = []  # no_signal_frames,shape:(N,)
group1_feature5_data = []  # variations_around_c; shape: (N, 10)

all_data = []
for i in range(len(X_train_rf)):
    group1_feature1_data.append(num_of_frames(X_train_rf[i]))
    group1_feature2_data.append(avg_pressure_on_64_chs_over_all_frames(X_train_rf[i]))
    group1_feature3_data.append(max_value_in_all(X_train_rf[i]))
    group1_feature4_data.append(no_signal_frames(X_train_rf[i]))
    group1_feature5_data.append(variations_around_c(X_train_rf[i]))

    
all_data.append(group1_feature1_data)
all_data.append(group1_feature2_data)
all_data.append(group1_feature3_data)
all_data.append(group1_feature4_data)

#ned_df is the feature group 1 without 5th features 
new_df = pd.DataFrame(all_data)
new_df=new_df.T
new_df.columns=["num_of_frames", "avg_pressure_on_64_chs_over_all_frames","max_value_in_all","no_signal_frames"]

feature5=pd.DataFrame(group1_feature5_data)
feature5.columns=['feature5.1','feature5.2','feature5.3','feature5.4','feature5.5','feature5.6','feature5.7','feature5.8','feature5.9','feature5.10']

#combine feature 5 and other 4 features to get complete feature group 1
new_df_1=new_df.T.append(feature5.T)

  out=out, **kwargs)
  ret = ret.dtype.type(ret / rcount)


In [None]:
#X_test_rf = touches_remove_padding(X_test)
group1_feature1_data0 = []  # num_of_frames: value; shape: (N,)
group1_feature2_data0 = []  # avg_pressure_on_64_chs_over_all_frames; shape: (N,)
group1_feature3_data0 = []  # max_value_in_all,shape:(N,)
group1_feature4_data0 = []  # no_signal_frames,shape:(N,)
group1_feature5_data0 = []  # variations_around_c; shape: (N, 10)

all_data0 = []
for i in range(len(X_test_rf)):
    group1_feature1_data0.append(num_of_frames(X_test_rf[i]))
    group1_feature2_data0.append(avg_pressure_on_64_chs_over_all_frames(X_test_rf[i]))
    group1_feature3_data0.append(max_value_in_all(X_test_rf[i]))
    group1_feature4_data0.append(no_signal_frames(X_test_rf[i]))
    group1_feature5_data0.append(variations_around_c(X_test_rf[i]))

    
all_data0.append(group1_feature1_data0)
all_data0.append(group1_feature2_data0)
all_data0.append(group1_feature3_data0)
all_data0.append(group1_feature4_data0)

#ned_df is the feature group 1 without 5th features 
new_df0 = pd.DataFrame(all_data0)
new_df0=new_df0.T
new_df0.columns=["num_of_frames", "avg_pressure_on_64_chs_over_all_frames","max_value_in_all","no_signal_frames"]

feature5_0=pd.DataFrame(group1_feature5_data0)
feature5_0.columns=['feature5.1','feature5.2','feature5.3','feature5.4','feature5.5','feature5.6','feature5.7','feature5.8','feature5.9','feature5.10']

#combine feature 5 and other 4 features to get complete feature group 1
new_df_1_0=new_df0.T.append(feature5_0.T)

  out=out, **kwargs)
  ret = ret.dtype.type(ret / rcount)


In [None]:
###Feature extraction-channel based feature

In [None]:
#Average pressure of each channel over all frames
#X_train_rf
from pandas.core.frame import DataFrame

def all_channel(touches):
    all_gestures =[]
    for index,touch in enumerate(touches):
        all_channel = []
        for i in range(64):
            data_each_channel = [x[i] for x in touch]
            all_channel.append(sum(data_each_channel)/len(data_each_channel))
        all_gestures.append(all_channel)
    output_df = DataFrame(all_gestures)
    return output_df
all_channel=all_channel(X_train_rf).T

In [None]:
#X_test_rf
def all_channel_0(touches):
    all_gestures_0 =[]
    for index,touch in enumerate(touches):
        all_channel_0 = []
        for i in range(64):
            data_each_channel = [x[i] for x in touch]
            all_channel_0.append(sum(data_each_channel)/len(data_each_channel))
        all_gestures_0.append(all_channel_0)
    output_df = DataFrame(all_gestures_0)
    return output_df
all_channel_0=all_channel_0(X_test_rf).T

In [None]:
#X_train_rf
#Percentage of time when a channel has pressure more than a fixed threshold T: 
#    number of time reach T/number of frames (set T as 90-percentile of all the value in the gesture.)
import sys
import time

def percentage_over_T(touches):
    # find a T as threshold
    percentage_over_T_df = []
    for index,touch in enumerate(touches):
        all_data = []
        for i in range(64):
            channel_data = [x[i] for x in touch]
            all_data += channel_data
        T = sorted(all_data)[int(0.9*len(all_data))]

            # find the percentage when pressure more than T
        more_than_T_list = []
        for i in range(64):
            more_than_T = 0
            channel_data = [x[i] for x in touch]
            for value in channel_data:
                if value > T:
                    more_than_T +=1
            more_than_T_list.append(more_than_T/len(channel_data))
        percentage_over_T_df.append(more_than_T_list)
        print("\r",end="")
        print("progress : {} %".format(100*index/len(touches)),end="")
        sys.stdout.flush()
#         time.sleep(0.05)
    
    output_df = DataFrame(percentage_over_T_df)
    return output_df
percentage_over_T=percentage_over_T(X_train_rf).T

new_df_2=all_channel.append(percentage_over_T)

progress : 99.97864616698698 %

In [None]:
#X_test_rf
def percentage_over_T_0(touches):
    # find a T as threshold
    percentage_over_T_df0 = []
    for index,touch in enumerate(touches):
        all_data = []
        for i in range(64):
            channel_data = [x[i] for x in touch]
            all_data += channel_data
        T = sorted(all_data)[int(0.9*len(all_data))]

            # find the percentage when pressure more than T
        more_than_T_list = []
        for i in range(64):
            more_than_T = 0
            channel_data = [x[i] for x in touch]
            for value in channel_data:
                if value > T:
                    more_than_T +=1
            more_than_T_list.append(more_than_T/len(channel_data))
        percentage_over_T_df0.append(more_than_T_list)
        print("\r",end="")
        print("progress : {} %".format(100*index/len(touches)),end="")
        sys.stdout.flush()
#         time.sleep(0.05)
    
    output_df0 = DataFrame(percentage_over_T_df0)
    return output_df0
percentage_over_T_0=percentage_over_T_0(X_test_rf).T

new_df_2_0=all_channel_0.append(percentage_over_T_0)

progress : 99.93593850096092 %

In [None]:
###combine feature group 1 & 2
X_train_rf=new_df_1.append(new_df_2).T
X_test_rf=new_df_1_0.append(new_df_2_0).T

In [None]:
X_train_rf= np.nan_to_num(X_train_rf.astype(np.float32))
X_test_rf= np.nan_to_num(X_test_rf.astype(np.float32))

In [None]:
###Create RF Model

In [None]:
rf0 =RandomForestClassifier(oob_score=True,random_state=10)   
rf0 = rf0.fit(X_train_rf, y_train)                                         
result0 = rf0.score(X_test_rf, y_test) 

rf1= RandomForestClassifier(n_estimators=140,max_features=9,max_depth=19, min_samples_split=7,min_samples_leaf=1,oob_score=True,random_state=10)  
rf1.fit(X_train_rf, y_train) 
result1 = rf1.score(X_test_rf, y_test) 

print(result0,result1) 

0.57847533632287 0.5816784112748238


### Predict Test Data

In [None]:
y_pred_rf = rf0.predict_proba(X_test_rf) 
 # shape: (N, 14), N == number of len(y_test), 14: probabilty of each class

## 2D CNN

### Data Preprocessing

In [None]:
X_test_2dcnn,y_test_2dcnn,y_test_id_2dcnn = padding_sample_2dcnn(X_test,y_test)
X_train_2dcnn,y_train_2dcnn,y_train_id_2dcnn = padding_sample_2dcnn(X_train,y_train)
X_valid_2dcnn,y_valid_2dcnn,y_valid_id_2dcnn = padding_sample_2dcnn(X_val,y_val)

### Other Codes

In [None]:
train_x_2dcnn = torch.from_numpy(X_train_2dcnn).float()
train_y_2dcnn = torch.from_numpy(y_train_2dcnn).long()
valid_x_2dcnn = torch.from_numpy(X_valid_2dcnn).float()
valid_y_2dcnn = torch.from_numpy(y_valid_2dcnn).long()
test_x_2dcnn = torch.from_numpy(X_test_2dcnn).float()
test_y_2dcnn = torch.from_numpy(y_test_2dcnn).long()
print(train_x_2dcnn.shape)
print(valid_x_2dcnn.shape)
print(test_x_2dcnn.shape)

batch_size = 128


# Pytorch train and test sets
train_2dcnn = torch.utils.data.TensorDataset(train_x_2dcnn,train_y_2dcnn)
valid_2dcnn = torch.utils.data.TensorDataset(valid_x_2dcnn,valid_y_2dcnn)
test_2dcnn = torch.utils.data.TensorDataset(test_x_2dcnn,test_y_2dcnn)

# data loader
train_loader_2dcnn = torch.utils.data.DataLoader(train_2dcnn, batch_size = batch_size, shuffle = False,drop_last=True)
valid_loader_2dcnn = torch.utils.data.DataLoader(valid_2dcnn, batch_size = batch_size, shuffle = False,drop_last=True)
test_loader_2dcnn = torch.utils.data.DataLoader(test_2dcnn, batch_size = batch_size, shuffle = False,drop_last=False)

torch.Size([11296, 100, 8, 8])
torch.Size([3850, 100, 8, 8])
torch.Size([3952, 100, 8, 8])


In [None]:
import torch
import torch.nn as nn

class Network(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=100, out_channels=64, kernel_size=3)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=56, kernel_size=3)
        # self.conv3 = nn.Conv2d(in_channels=56, out_channels=56, kernel_size=3)

        self.fc1 = nn.Linear(in_features=56*2*2, out_features=32)
        self.fc2 = nn.Linear(in_features=32, out_features=14)
        # self.out = nn.Linear(in_features=16, out_features=14)

        # Define proportion or neurons to dropout
        self.dropout = nn.Dropout(0.15)

    def forward(self,x):
        x=self.conv1(x)
        x=F.relu(x)
        x=self.conv2(x)
        x=F.relu(x)
        # x=self.conv3(x)
        # x=F.relu(x)
        x=F.max_pool2d(x,2)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        # x = self.out(x)
        output = F.log_softmax(x, dim=1)
        return output

n_iters = 30000
num_epochs = n_iters / (len(X_train_2dcnn) / batch_size)
num_epochs = int(num_epochs)
print(num_epochs)
from torch.nn import init

# Initilize parameter
def weights_init(m):
    classname=m.__class__.__name__
    if classname.find('Conv') != -1:
        init.xavier_uniform(m.weight.data)
        # torch.nn.init.xavier_uniform(m.bias.data)

# Create CNN
model_2dcnn = Network()
#model.cuda()
print(model_2dcnn)
model_2dcnn.apply(weights_init)

# Cross Entropy Loss 
error = nn.CrossEntropyLoss()
criterion = nn.CrossEntropyLoss()

# SGD Optimizer
learning_rate = 0.001
optimizer = torch.optim.SGD(model_2dcnn.parameters(), lr=learning_rate)

# GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
model_2dcnn.to(device)

339
Network(
  (conv1): Conv2d(100, 64, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(64, 56, kernel_size=(3, 3), stride=(1, 1))
  (fc1): Linear(in_features=224, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=14, bias=True)
  (dropout): Dropout(p=0.15, inplace=False)
)
cuda:0




Network(
  (conv1): Conv2d(100, 64, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(64, 56, kernel_size=(3, 3), stride=(1, 1))
  (fc1): Linear(in_features=224, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=14, bias=True)
  (dropout): Dropout(p=0.15, inplace=False)
)

In [None]:
from torchsummary import summary
summary(model_2dcnn, (100,8,8))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1             [-1, 64, 6, 6]          57,664
            Conv2d-2             [-1, 56, 4, 4]          32,312
            Linear-3                   [-1, 32]           7,200
           Dropout-4                   [-1, 32]               0
            Linear-5                   [-1, 14]             462
Total params: 97,638
Trainable params: 97,638
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.02
Forward/backward pass size (MB): 0.03
Params size (MB): 0.37
Estimated Total Size (MB): 0.42
----------------------------------------------------------------


In [None]:
class EarlyStopping:
    """Early stops the training if validation loss doesn't improve after a given patience."""
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt', trace_func=print):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement. 
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            path (str): Path for the checkpoint to be saved to.
                            Default: 'checkpoint.pt'
            trace_func (function): trace print function.
                            Default: print            
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.Inf
        self.delta = delta
        self.path = path
        self.trace_func = trace_func
    def __call__(self, val_loss, model):

        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            self.trace_func(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.verbose:
            self.trace_func(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

In [None]:
def train_model(model, batch_size, patience, n_epochs):
    # to track the training loss as the model trains
    train_losses = []
    # to track the validation loss as the model trains
    valid_losses = []
    # to track the average training loss per epoch as the model trains
    avg_train_losses = []
    # to track the average validation loss per epoch as the model trains
    avg_valid_losses = [] 
    
    # initialize the early_stopping object
    early_stopping = EarlyStopping(patience=patience, verbose=True)
    
    for epoch in range(1, n_epochs + 1):

        ###################
        # train the model #
        ###################
        model.train() # prep model for training
        for batch, (data, target) in enumerate(train_loader_2dcnn, 1):
            # send data to GPU
            data = data.to(device)
            target = target.to(device)
            # clear the gradients of all optimized variables
            optimizer.zero_grad()
            # forward pass: compute predicted outputs by passing inputs to the model
            output = model(data)
            # calculate the loss
            loss = criterion(output, target)
            # backward pass: compute gradient of the loss with respect to model parameters
            loss.backward()
            # perform a single optimization step (parameter update)
            optimizer.step()
            # record training loss
            train_losses.append(loss.item())

        ######################    
        # validate the model #
        ######################
        
        model.eval() # prep model for evaluation
        for data, target in valid_loader_2dcnn:
            # send data to GPU
            data = data.to(device)
            target = target.to(device)
            # forward pass: compute predicted outputs by passing inputs to the model
            output = model(data)
            # calculate the loss
            loss = criterion(output, target)
            # record validation loss
            valid_losses.append(loss.item())

        # print training/validation statistics 
        # calculate average loss over an epoch
        train_loss = np.average(train_losses)
        valid_loss = np.average(valid_losses)
        avg_train_losses.append(train_loss)
        avg_valid_losses.append(valid_loss)
        
        epoch_len = len(str(n_epochs))
        
        print_msg = (f'[{epoch:>{epoch_len}}/{n_epochs:>{epoch_len}}] ' +
                     f'train_loss: {train_loss:.5f} ' +
                     f'valid_loss: {valid_loss:.5f}')
        
        print(print_msg)
        
        # clear lists to track next epoch
        train_losses = []
        valid_losses = []
        
        # early_stopping needs the validation loss to check if it has decresed, 
        # and if it has, it will make a checkpoint of the current model
        early_stopping(valid_loss, model)
        
        if early_stopping.early_stop:
            print("Early stopping")
            break
        
    # load the last checkpoint with the best model
    model.load_state_dict(torch.load('checkpoint.pt'))

    return  model, avg_train_losses, avg_valid_losses

In [None]:
# patience = 10

# model, train_loss, valid_loss = train_model(model, batch_size, patience, num_epochs)

model_2dcnn.load_state_dict(torch.load("/content/drive/My Drive/Colab Notebooks/2dcnn.pth"))
model_2dcnn.eval()

Network(
  (conv1): Conv2d(100, 64, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(64, 56, kernel_size=(3, 3), stride=(1, 1))
  (fc1): Linear(in_features=224, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=14, bias=True)
  (dropout): Dropout(p=0.15, inplace=False)
)

### Predict Test Data

In [None]:
# =======================================================
# subsample accuracy test
# =======================================================
from collections import Counter
test_loss = 0.0
num_classes = 14
 
model_2dcnn.eval() # prep model for evaluation
batch_no = 0
prediction_weight_list = []
counter_test_id_batch = Counter(y_test_id_2dcnn)
output_sub = 0
# test_id_list = list(set(y_test_id.tolist()))
current_id = y_test_id_2dcnn[0]
batch_size_evaluation= 128
for data, target in test_loader_2dcnn:
    if len(target.data) != batch_size:
        print('here')
        batch_size_evaluation = len(target.data)
    # send data to GPU
    data = data.to(device)
    target = target.to(device)
    # forward pass: compute predicted outputs by passing inputs to the model
    output = model_2dcnn(data)
    # calculate the loss
    loss = criterion(output, target)
    # update test loss 
    test_loss += loss.item()*data.size(0)
    # convert output probabilities to predicted class
    _, pred = torch.max(output, 1)
    # print(output)
    # compare predictions to true label
    correct = np.squeeze(pred.eq(target.data.view_as(pred)))
    # calculate test accuracy for each object class
        
    
    for i in range(batch_size_evaluation):
        
        if y_test_id_2dcnn[i+batch_no*batch_size]==current_id:
          output_sub += output[i]
        
          
        else:
          prediction_weight = (output_sub/counter_test_id_batch[current_id]).cpu().detach().numpy().tolist()
          prediction_weight_list.append(prediction_weight)
          current_id = y_test_id_2dcnn[i+batch_no*batch_size]
          output_sub = output[i]
 
    batch_no +=1
prediction_weight = (output_sub/counter_test_id_batch[current_id]).cpu().detach().numpy().tolist()
prediction_weight_list.append(prediction_weight)
y_pred_2dcnn = torch.Tensor(prediction_weight_list) 
# print_test_accuracy(np.argmax(y_pred_2dcnn,axis =1),np.array(y_test),num_classes) 
# y_pred_3dcnn  # shape: (N, 14), N == number of len(y_test), 14: probabilty of each class

here


## 3D CNN

### Data Preprocessing

In [None]:
X_test_3dcnn,y_test_3dcnn,y_test_id_3dcnn = padding_sample_3dcnn(X_test,y_test) 
X_train_3dcnn,y_train_3dcnn,y_train_id_3dcnn = padding_sample_3dcnn(X_train,y_train)    
X_valid_3dcnn,y_valid_3dcnn,y_valid_id_3dcnn = padding_sample_3dcnn(X_val,y_val)

### Other Codes

In [None]:
train_x_3dcnn = torch.from_numpy(X_train_3dcnn).float()
train_y_3dcnn = torch.from_numpy(y_train_3dcnn).long()
valid_x_3dcnn = torch.from_numpy(X_valid_3dcnn).float()
valid_y_3dcnn = torch.from_numpy(y_valid_3dcnn).long()
test_x_3dcnn = torch.from_numpy(X_test_3dcnn).float()
test_y_3dcnn = torch.from_numpy(y_test_3dcnn).long()
print(train_x_3dcnn.shape)
print(valid_x_3dcnn.shape)
print(test_x_3dcnn.shape)

batch_size = 128


# Pytorch train and test sets
train_3dcnn = torch.utils.data.TensorDataset(train_x_3dcnn,train_y_3dcnn)
valid_3dcnn = torch.utils.data.TensorDataset(valid_x_3dcnn,valid_y_3dcnn)
test_3dcnn = torch.utils.data.TensorDataset(test_x_3dcnn,test_y_3dcnn)

# data loader
train_loader_3dcnn = torch.utils.data.DataLoader(train_3dcnn, batch_size = batch_size, shuffle = False,drop_last=True)
valid_loader_3dcnn = torch.utils.data.DataLoader(valid_3dcnn, batch_size = batch_size, shuffle = False,drop_last=True)
test_loader_3dcnn = torch.utils.data.DataLoader(test_3dcnn, batch_size = batch_size, shuffle = False,drop_last=False)



torch.Size([11296, 1, 100, 8, 8])
torch.Size([3850, 1, 100, 8, 8])
torch.Size([3952, 1, 100, 8, 8])


3D CNN net

In [None]:
import torch
import torch.nn as nn
# from mypath import Path
class C3D(nn.Module):
    """
    The C3D network.
    """

    def __init__(self, num_classes, pretrained=False):
        super(C3D, self).__init__()

        self.conv1 = nn.Conv3d(1, 16, kernel_size=(3, 3, 3),padding=1)   # 1,16,85,8,8
        self.pool1 = nn.MaxPool3d(kernel_size=(3, 1, 1))      # 1,16,350,8,8

        self.conv2 = nn.Conv3d(16, 32, kernel_size=(3, 3, 3),padding=1)  # 1,32,350,8,8
        self.pool2 = nn.MaxPool3d(kernel_size=(3, 2, 2))      # 1,32,70,4,4

        self.conv3 = nn.Conv3d(32, 64, kernel_size=(3, 3, 3),padding=1)   # 1,64,70,8,8
        self.pool3 = nn.MaxPool3d(kernel_size=(3, 1, 1))      # 1,64,23,8,8

        # self.conv4 = nn.Conv3d(64, 128, kernel_size=(3, 3, 3),padding=1)  # 1，128，23，6，6
        # self.pool4 = nn.MaxPool3d(kernel_size=(3, 1,1))      # 1，128，4，2，2

# The number of fully connected layers is 3. The number of units in the fully connected layers is 1024

        self.fc6 = nn.Linear(32*11*4*4, 1024)
        self.fc7 = nn.Linear(1024, 128)
        self.fc8 = nn.Linear(128, num_classes)
        self.soft = nn.Softmax( dim=1)
        self.dropout = nn.Dropout(p=0.5)
        self.batch=nn.BatchNorm1d(128)

        self.relu = nn.ReLU()

        # self.__init_weight()

        # if pretrained:
        #     self.__load_pretrained_weights()

    def forward(self, x):
        # print(x.shape)
        x = self.relu(self.conv1(x))
        x = self.pool1(x)

        x = self.relu(self.conv2(x))
        x = self.pool2(x)
        # x = self.relu(self.conv3(x))
        # x = self.pool3(x)

        print(x.shape)
        # x = x.view(-1, 524288)
        x = x.view(x.size(0), -1)
        x = self.relu(self.fc6(x))
        x = self.dropout(x)
        x = self.relu(self.fc7(x))
        x = self.batch(x)
        x = self.dropout(x)

        logits = self.fc8(x)
        output = self.soft(logits)
#         print(output)
        return output

n_iters = 30000
num_epochs = n_iters / (len(train_x_3dcnn) / batch_size)
num_epochs = int(num_epochs)
print(num_epochs)
from torch.nn import init

# Initilize parameter
def weights_init(m):
    classname=m.__class__.__name__
    if classname.find('Conv') != -1:
        init.xavier_uniform(m.weight.data)
        # torch.nn.init.xavier_uniform(m.bias.data)

# Create CNN
model_3dcnn = C3D(14)
#model_3dcnn.cuda()
print(model_3dcnn)
model_3dcnn.apply(weights_init)

# Cross Entropy Loss 
error = nn.CrossEntropyLoss()
criterion = nn.CrossEntropyLoss()

# SGD Optimizer
learning_rate = 0.001
optimizer = torch.optim.SGD(model_3dcnn.parameters(), lr=learning_rate)

# GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
model_3dcnn.to(device)

339
C3D(
  (conv1): Conv3d(1, 16, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
  (pool1): MaxPool3d(kernel_size=(3, 1, 1), stride=(3, 1, 1), padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv3d(16, 32, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
  (pool2): MaxPool3d(kernel_size=(3, 2, 2), stride=(3, 2, 2), padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv3d(32, 64, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
  (pool3): MaxPool3d(kernel_size=(3, 1, 1), stride=(3, 1, 1), padding=0, dilation=1, ceil_mode=False)
  (fc6): Linear(in_features=5632, out_features=1024, bias=True)
  (fc7): Linear(in_features=1024, out_features=128, bias=True)
  (fc8): Linear(in_features=128, out_features=14, bias=True)
  (soft): Softmax(dim=1)
  (dropout): Dropout(p=0.5, inplace=False)
  (batch): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU()
)
cuda:0




C3D(
  (conv1): Conv3d(1, 16, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
  (pool1): MaxPool3d(kernel_size=(3, 1, 1), stride=(3, 1, 1), padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv3d(16, 32, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
  (pool2): MaxPool3d(kernel_size=(3, 2, 2), stride=(3, 2, 2), padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv3d(32, 64, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
  (pool3): MaxPool3d(kernel_size=(3, 1, 1), stride=(3, 1, 1), padding=0, dilation=1, ceil_mode=False)
  (fc6): Linear(in_features=5632, out_features=1024, bias=True)
  (fc7): Linear(in_features=1024, out_features=128, bias=True)
  (fc8): Linear(in_features=128, out_features=14, bias=True)
  (soft): Softmax(dim=1)
  (dropout): Dropout(p=0.5, inplace=False)
  (batch): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU()
)

In [None]:
class EarlyStopping:
    """Early stops the training if validation loss doesn't improve after a given patience."""
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt', trace_func=print):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement. 
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            path (str): Path for the checkpoint to be saved to.
                            Default: 'checkpoint.pt'
            trace_func (function): trace print function.
                            Default: print            
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.Inf
        self.delta = delta
        self.path = path
        self.trace_func = trace_func
    def __call__(self, val_loss, model):

        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            self.trace_func(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.verbose:
            self.trace_func(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

In [None]:
def train_model(model, batch_size, patience, n_epochs):
    # to track the training loss as the model trains
    train_losses = []
    # to track the validation loss as the model trains
    valid_losses = []
    # to track the average training loss per epoch as the model trains
    avg_train_losses = []
    # to track the average validation loss per epoch as the model trains
    avg_valid_losses = [] 
    
    # initialize the early_stopping object
    early_stopping = EarlyStopping(patience=patience, verbose=True)
    
    for epoch in range(1, n_epochs + 1):

        ###################
        # train the model #
        ###################
        model.train() # prep model for training
        for batch, (data, target) in enumerate(train_loader_3dcnn, 1):
            # send data to GPU
            data = data.to(device)
            target = target.to(device)
            # clear the gradients of all optimized variables
            optimizer.zero_grad()
            # forward pass: compute predicted outputs by passing inputs to the model
            output = model(data)
            # calculate the loss
            loss = criterion(output, target)
            # backward pass: compute gradient of the loss with respect to model parameters
            loss.backward()
            # perform a single optimization step (parameter update)
            optimizer.step()
            # record training loss
            train_losses.append(loss.item())

        ######################    
        # validate the model #
        ######################
        
        model.eval() # prep model for evaluation
        for data, target in valid_loader_3dcnn:
            # send data to GPU
            data = data.to(device)
            target = target.to(device)
            # forward pass: compute predicted outputs by passing inputs to the model
            output = model(data)
            # calculate the loss
            loss = criterion(output, target)
            # record validation loss
            valid_losses.append(loss.item())

        # print training/validation statistics 
        # calculate average loss over an epoch
        train_loss = np.average(train_losses)
        valid_loss = np.average(valid_losses)
        avg_train_losses.append(train_loss)
        avg_valid_losses.append(valid_loss)
        
        epoch_len = len(str(n_epochs))
        
        print_msg = (f'[{epoch:>{epoch_len}}/{n_epochs:>{epoch_len}}] ' +
                     f'train_loss: {train_loss:.5f} ' +
                     f'valid_loss: {valid_loss:.5f}')
        
        print(print_msg)
        
        # clear lists to track next epoch
        train_losses = []
        valid_losses = []
        
        # early_stopping needs the validation loss to check if it has decresed, 
        # and if it has, it will make a checkpoint of the current model
        early_stopping(valid_loss, model)
        
        if early_stopping.early_stop:
            print("Early stopping")
            break
        
    # load the last checkpoint with the best model
    model.load_state_dict(torch.load('checkpoint.pt'))

    return  model, avg_train_losses, avg_valid_losses

In [None]:
# patience = 10
 
# model_3dcnn, train_loss, valid_loss = train_model(model_3dcnn, batch_size, patience, num_epochs)

model_3dcnn.load_state_dict(torch.load("/content/drive/My Drive/Colab Notebooks/3dcnn.pth"))
model_3dcnn.eval()

C3D(
  (conv1): Conv3d(1, 16, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
  (pool1): MaxPool3d(kernel_size=(3, 1, 1), stride=(3, 1, 1), padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv3d(16, 32, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
  (pool2): MaxPool3d(kernel_size=(3, 2, 2), stride=(3, 2, 2), padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv3d(32, 64, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
  (pool3): MaxPool3d(kernel_size=(3, 1, 1), stride=(3, 1, 1), padding=0, dilation=1, ceil_mode=False)
  (fc6): Linear(in_features=5632, out_features=1024, bias=True)
  (fc7): Linear(in_features=1024, out_features=128, bias=True)
  (fc8): Linear(in_features=128, out_features=14, bias=True)
  (soft): Softmax(dim=1)
  (dropout): Dropout(p=0.5, inplace=False)
  (batch): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU()
)

### Predict Test Data

In [None]:
# =======================================================
# subsample accuracy test
# =======================================================
from collections import Counter
test_loss = 0.0
num_classes = 14

model_3dcnn.eval() # prep model for evaluation
batch_no = 0
prediction_weight_list = []
counter_test_id_batch = Counter(y_test_id_3dcnn)
output_sub = 0
# test_id_list = list(set(y_test_id_3dcnn.tolist()))
current_id = y_test_id_3dcnn[0]
batch_size_evaluation= 128
for data, target in test_loader_3dcnn:
    if len(target.data) != batch_size:
        print('here')
        batch_size_evaluation = len(target.data)
    # send data to GPU
    data = data.to(device)
    target = target.to(device)
    # forward pass: compute predicted outputs by passing inputs to the model
    output = model_3dcnn(data)
    # calculate the loss
    loss = criterion(output, target)
    # update test loss 
    test_loss += loss.item()*data.size(0)
    # convert output probabilities to predicted class
    _, pred = torch.max(output, 1)
    # print(output)
    # compare predictions to true label
    correct = np.squeeze(pred.eq(target.data.view_as(pred)))
    # calculate test accuracy for each object class
        
    
    for i in range(batch_size_evaluation):
        
        if y_test_id_3dcnn[i+batch_no*batch_size]==current_id:
          output_sub += output[i]
        
          
        else:
          prediction_weight = (output_sub/counter_test_id_batch[current_id]).cpu().detach().numpy().tolist()
          prediction_weight_list.append(prediction_weight)
          current_id = y_test_id_3dcnn[i+batch_no*batch_size]
          output_sub = output[i]

    batch_no +=1
prediction_weight = (output_sub/counter_test_id_batch[current_id]).cpu().detach().numpy().tolist()
prediction_weight_list.append(prediction_weight)
y_pred_3dcnn = torch.Tensor(prediction_weight_list) 

y_pred_3dcnn  # shape: (N, 14), N == number of len(y_test), 14: probabilty of each class

torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
torch.Size([128, 32, 11, 4, 4])
here
torch.Size([112, 32, 11, 4, 4])


tensor([[8.8621e-04, 9.8607e-04, 3.6517e-04,  ..., 1.8187e-03, 1.7678e-03,
         3.1985e-02],
        [1.2617e-03, 2.4812e-03, 2.5000e-03,  ..., 2.2073e-02, 3.5973e-03,
         6.2896e-01],
        [7.0918e-05, 9.9881e-01, 5.4137e-07,  ..., 8.1263e-06, 1.7068e-04,
         9.3588e-07],
        ...,
        [2.8668e-03, 3.5448e-04, 1.3922e-02,  ..., 2.0583e-03, 7.7945e-04,
         1.2281e-04],
        [5.2617e-05, 9.9872e-01, 9.4594e-07,  ..., 1.8800e-05, 1.1878e-04,
         9.2127e-07],
        [2.2983e-02, 4.2175e-02, 3.8039e-01,  ..., 8.4318e-02, 2.9362e-02,
         4.3146e-02]])

## Ensemble

In [None]:
def print_test_accuracy(y_pred, y_test, num_classes):
    class_correct = list(0. for i in range(num_classes))
    class_total = list(0. for i in range(num_classes))

    for i in range(len(y_test)):
        label = y_test[i]
        class_correct[label] += int(y_pred[i] == y_test[i])
        class_total[label] += 1

    for i in range(num_classes):
        if class_total[i] > 0:
            print('Test Accuracy of %5s: %2.2f%% (%2d/%2d)' % (
                str(i), 100 * class_correct[i] / class_total[i],
                np.sum(class_correct[i]), np.sum(class_total[i])))
        else:
            print('Test Accuracy of %5s: N/A (no training examples)' % (classes[i]))

    print('\nTest Accuracy (Overall): %2.2f%% (%2d/%2d)' % (
        100. * np.sum(class_correct) / np.sum(class_total),
        np.sum(class_correct), np.sum(class_total)))

### Solution 1 (Simple Average)

In [None]:
y_pred_s1 = y_pred_rf + y_pred_2dcnn.numpy() + y_pred_3dcnn.numpy()
y_pred_s1 = np.argmax(y_pred_s1, axis=1)

print_test_accuracy(y_pred_s1, y_test, num_classes)

Test Accuracy of     0: 78.57% (88/112)
Test Accuracy of     1: 58.04% (65/112)
Test Accuracy of     2: 81.98% (91/111)
Test Accuracy of     3: 41.44% (46/111)
Test Accuracy of     4: 71.43% (80/112)
Test Accuracy of     5: 64.29% (72/112)
Test Accuracy of     6: 60.71% (68/112)
Test Accuracy of     7: 43.24% (48/111)
Test Accuracy of     8: 45.05% (50/111)
Test Accuracy of     9: 52.68% (59/112)
Test Accuracy of    10: 26.13% (29/111)
Test Accuracy of    11: 61.26% (68/111)
Test Accuracy of    12: 24.11% (27/112)
Test Accuracy of    13: 79.28% (88/111)

Test Accuracy (Overall): 56.31% (879/1561)


### Solution 2 (Weighted Average)

In [None]:
weights = [0.38, 0.12, 0.5]

y_pred_s2 = y_pred_rf * weights[0] + y_pred_2dcnn.numpy() * weights[1] + y_pred_3dcnn.numpy() * weights[2]
y_pred_s2 = np.argmax(y_pred_s2, axis=1)

print_test_accuracy(y_pred_s2, y_test, num_classes)

Test Accuracy of     0: 79.46% (89/112)
Test Accuracy of     1: 59.82% (67/112)
Test Accuracy of     2: 81.08% (90/111)
Test Accuracy of     3: 50.45% (56/111)
Test Accuracy of     4: 77.68% (87/112)
Test Accuracy of     5: 76.79% (86/112)
Test Accuracy of     6: 66.96% (75/112)
Test Accuracy of     7: 44.14% (49/111)
Test Accuracy of     8: 50.45% (56/111)
Test Accuracy of     9: 72.32% (81/112)
Test Accuracy of    10: 32.43% (36/111)
Test Accuracy of    11: 68.47% (76/111)
Test Accuracy of    12: 26.79% (30/112)
Test Accuracy of    13: 84.68% (94/111)

Test Accuracy (Overall): 62.27% (972/1561)


### Independent Models Accuracy

In [None]:
## Random Forest ##

print_test_accuracy(np.argmax(y_pred_rf, axis=1), np.array(y_test), num_classes)

Test Accuracy of     0: 73.21% (82/112)
Test Accuracy of     1: 52.68% (59/112)
Test Accuracy of     2: 75.68% (84/111)
Test Accuracy of     3: 45.95% (51/111)
Test Accuracy of     4: 67.86% (76/112)
Test Accuracy of     5: 75.00% (84/112)
Test Accuracy of     6: 67.86% (76/112)
Test Accuracy of     7: 41.44% (46/111)
Test Accuracy of     8: 39.64% (44/111)
Test Accuracy of     9: 62.50% (70/112)
Test Accuracy of    10: 38.74% (43/111)
Test Accuracy of    11: 68.47% (76/111)
Test Accuracy of    12: 45.54% (51/112)
Test Accuracy of    13: 54.95% (61/111)

Test Accuracy (Overall): 57.85% (903/1561)


In [None]:
## 2D CNN ##

print_test_accuracy(np.argmax(y_pred_2dcnn, axis=1), np.array(y_test), num_classes)

Test Accuracy of     0: 78.57% (88/112)
Test Accuracy of     1: 46.43% (52/112)
Test Accuracy of     2: 81.08% (90/111)
Test Accuracy of     3: 28.83% (32/111)
Test Accuracy of     4: 61.61% (69/112)
Test Accuracy of     5: 58.04% (65/112)
Test Accuracy of     6: 47.32% (53/112)
Test Accuracy of     7: 36.04% (40/111)
Test Accuracy of     8: 14.41% (16/111)
Test Accuracy of     9: 25.00% (28/112)
Test Accuracy of    10: 15.32% (17/111)
Test Accuracy of    11: 35.14% (39/111)
Test Accuracy of    12: 12.50% (14/112)
Test Accuracy of    13: 53.15% (59/111)

Test Accuracy (Overall): 42.41% (662/1561)


In [None]:
## 3D CNN ##

print_test_accuracy(np.argmax(y_pred_3dcnn, axis=1), np.array(y_test), num_classes)

Test Accuracy of     0: 76.79% (86/112)
Test Accuracy of     1: 55.36% (62/112)
Test Accuracy of     2: 79.28% (88/111)
Test Accuracy of     3: 54.05% (60/111)
Test Accuracy of     4: 79.46% (89/112)
Test Accuracy of     5: 68.75% (77/112)
Test Accuracy of     6: 66.07% (74/112)
Test Accuracy of     7: 42.34% (47/111)
Test Accuracy of     8: 51.35% (57/111)
Test Accuracy of     9: 75.89% (85/112)
Test Accuracy of    10: 33.33% (37/111)
Test Accuracy of    11: 63.06% (70/111)
Test Accuracy of    12: 17.86% (20/112)
Test Accuracy of    13: 85.59% (95/111)

Test Accuracy (Overall): 60.67% (947/1561)
