In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset

device = torch.device('cuda:{}'.format(0) if torch.cuda.is_available() else 'cpu')
if torch.cuda.is_available():
    torch.cuda.set_device(device) # change allocation of current GPU 

from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler

  return torch._C._cuda_getDeviceCount() > 0


Generete dataset

In [2]:
sensor_num = 11
sensor_list = [f'Sensor_{num}' for num in range(1, sensor_num + 1)]

classes = [0, 1]
class_probabilities = [0.9, 0.1]

min_data_length, max_data_length = 90, 102
data_cycle_num = 5
data_num = 50
data_key_list = ['key','cycle']

dataset_df_list = []

for cycle in range(1, data_cycle_num + 1):
    for data_index in range(data_num):
        sampling_size = random.sample(range(min_data_length, max_data_length), 1)[0]
        class_label = random.choices(classes, class_probabilities, k=1)[0]    

        generation_array = np.random.rand(sampling_size, sensor_num)
        generation_df = pd.DataFrame(generation_array, columns=sensor_list)
        generation_df['cycle'] = cycle
        generation_df['key'] = data_index
        generation_df['label'] = class_label
        
        dataset_df_list.append(generation_df)

concat_df = pd.concat(dataset_df_list, axis=0).reset_index(drop=True)

Get cycle statistic

In [25]:
cycle_num = concat_df.cycle.unique()
cycle_dict = {}

c_m_list = []
c_s_list = []

for cycle in cycle_num:
    cycle_df = concat_df[concat_df.cycle == cycle]
    wafer_unit = list(cycle_df.groupby(data_key_list)) # KEY 값

    w_mean = np.array([df.loc[:, sensor_list].mean().values for info, df in wafer_unit])
    w_std = np.array([df.loc[:, sensor_list].std().values for info, df in wafer_unit])

    c_mean = np.mean(w_mean, axis = 0)
    c_m_list.append(c_mean)
    
    c_std = np.mean(w_std, axis = 0)
    c_s_list.append(c_std)
    
cycle_dict['mean'] = np.array(c_m_list)
cycle_dict['std'] = np.array(c_s_list)

dataset_list = list(concat_df.groupby(data_key_list))

Dataset

In [26]:
class datasetAE(Dataset):
    def __init__(self, wafer_unit,  max_len, cycle_dict, sensor_list):
        
        super(datasetAE, self).__init__()
        self.wafer_unit = wafer_unit
        self.max_len = max_len
        self.cycle_dict = cycle_dict
    
    def __getitem__(self, idx):
        
        info = self.wafer_unit[idx][0]
        df = self.wafer_unit[idx][1].reset_index(drop=True)
    
        c_mean = self.cycle_dict['mean'][0]
        c_std = self.cycle_dict['std'][0]
        y = df.label.unique()[0]
        
        sensor_df = df.loc[:, sensor_list].reset_index(drop=True)
                
        # padding length
        if len(sensor_df) < self.max_len:
            new_index = list(range(self.max_len))
            sensor_df = sensor_df.reindex(new_index).ffill()
                
        sensor_array = np.array(sensor_df)
        
        # nomralization with cycle mean and std
        normed_array = (sensor_array - c_mean) / (c_std)
        
        # (time_length, sensor_num) -> (sensor_num, time_length)
        x = np.array(normed_array).T
        
        return np.array(x), np.array(y)
    
    def __len__(self):
        return len(self.wafer_unit)

In [27]:
max_len = max(set([len(dataset[1]) for dataset in dataset_list]))

torch_dataset = datasetAE(dataset_list, max_len, cycle_dict, sensor_list)
torch_loader = DataLoader(torch_dataset, batch_size=64, drop_last=True)

In [29]:
torch_dataset[0][0].shape

(11, 101)

Model

In [44]:
class I_CNN(torch.nn.Module):
    def __init__(self, sensor_num):
        super(I_CNN, self).__init__()

        layer_list = []
        for num in range(sensor_num):
            extract_layer = torch.nn.Sequential(
                torch.nn.Conv1d(in_channels=1, out_channels=256, kernel_size=9, stride=2),
                torch.nn.ReLU(),
                torch.nn.MaxPool1d(kernel_size=3, stride =3),
                torch.nn.Conv1d(in_channels=256, out_channels=256, kernel_size=5, stride=2),
                torch.nn.ReLU(),
                torch.nn.MaxPool1d(kernel_size=3, stride =3),
            )

            layer_list.append(extract_layer)

        self.extract_layer = nn.ModuleList(layer_list)

        self.diagnosis_layer = torch.nn.Sequential(
            torch.nn.Conv1d(in_channels=sensor_num, out_channels=256, kernel_size=512),
            torch.nn.ReLU(),
        )

        self.detection_layer = torch.nn.Sequential(
            torch.nn.Linear(256, 128),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.5),
            torch.nn.Linear(128, 2),
        )

    def forward(self, x):
        
        batch_size, sensor_num, sequence = x.shape

        # (batch_size, sensor_num, sequence) -> (sensor_num, batch_size, sequence)
        x_ = x.transpose(1,0)

        # 센서 별로 convolution 적용        
        stack_sensor = []
        for sensor_num, x__ in enumerate(x_):
            # (batch_size, sequence)
            # Feature Extraction Layer
            x__ = x__.unsqueeze(1)
            feature = self.extract_layer[sensor_num](x__)
            # (batch_size, (*feature_size))
            flatten = feature.view(batch_size, -1)
            # (batch_size, 1)
            stack_sensor.append(flatten)

        # Stack By Sensor
        # (sensor_num, batch_size, out_channels_num) -> (batch_size, sensor_num, out_channels_num)
        feature_stack = torch.stack(stack_sensor).transpose(0, 1)
        
        spartial_out = self.diagnosis_layer(feature_stack)
        out = self.detection_layer(spartial_out.squeeze())

        return out

In [45]:
test_tensor = torch.rand(64, 11, 101)
model = I_CNN(11)
output = model(test_tensor)

In [47]:
for name, param in model.named_parameters():
    if name == 'diagnosis_layer.0.weight':
        weight = param.detach().cpu().numpy()
        break

In [49]:
def sparse_group_regul(model, sensor_num, alpha, mode):
    '''
    model : I-CNN model
    sensor_num : number of sensors
    alpha : weights of l1, 12 loss
    mode : train (training mode) , valid (validation mode)
    '''
    
    for name, param in model.named_parameters():
        if name == 'diagnosis_layer.0.weight':
            weight = param

    if mode == 'train':
        norm_weights = torch.tensor(0.0, requires_grad=True)
    elif mode == 'valid':
        norm_weights = torch.tensor(0.0)

    for sensor in range(0, sensor_num):
        sensor_weight = weight[:, sensor, :]
        para_m = np.sqrt(len(sensor_weight.flatten()))

        # L1 part
        l1_loss = (alpha) * torch.norm(sensor_weight, p=1)

        # L2 part
        l2_loss = (1 - alpha) * (para_m) * torch.norm(sensor_weight, p=2)

        norm_weights = norm_weights + (l1_loss + l2_loss)

    return norm_weights

def loss_icnn(model, x, y, criterion, sensor_num, alpha = 0.5, lambda_ = 0.1, mode = 'train'):
    '''
    model : I-CNN model
    x : input data
    y : label
    criterion : loss funcion e.g. nn.CrossEntrophy
    sensor_num : number of sensors
    alpha : weights of l1, 12 loss
    mode : t (training mode) , v (validation mode)
    lambda_ : weight of sparse_group_regul_loss
    '''
    
    output = model(x)
    loss_ = criterion(output, y)
    sparse_group_regul_loss = sparse_group_regul(model = model, sensor_num = sensor_num,
                                                 alpha = alpha, mode = mode)
    loss = loss_ + (lambda_ * sparse_group_regul_loss)
    
    return loss