# Import packages

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import confusion_matrix
from tensorflow.keras.applications import EfficientNetB0
import random


In [None]:
# Mount Google Drive at /content/drive so the dataset and saved models
# referenced by later cells are reachable from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Load the pickled wafer-map table and drop the two columns that are not used below.
# NOTE(review): unpickling can execute arbitrary code - only load a trusted file.
# NOTE(review): 'trianTestLabel' is presumably the column's real (misspelled)
# name in the pickle - confirm before "correcting" it.
df = pd.read_pickle("/content/drive/MyDrive/Colab Notebooks/LSWMD.pkl").drop(
    ['trianTestLabel', 'waferIndex'], axis=1
)

In [None]:
# Distinct failure-type labels; rows whose failureType array is empty are skipped.
failure_types = {entry[0][0] for entry in df.failureType if entry.size > 0}
print('Different failure types:', failure_types)

In [None]:
# Integer class id for every failure-type label; 'none' is given id 8.
mapping = {
    'Center': 0,
    'Donut': 1,
    'Edge-Loc': 2,
    'Edge-Ring': 3,
    'Loc': 4,
    'Random': 5,
    'Scratch': 6,
    'Near-full': 7,
    'none': 8,
}
# Copy the label column, then substitute the ids inside the copy only.
df['failureNum'] = df.failureType
df = df.replace({'failureNum': mapping})

In [None]:
# Keep only rows whose class id lies in 0..7, i.e. drop id 8 ('none').
is_defect_class = (df['failureNum'] >= 0) & (df['failureNum'] <= 7)
df = df[is_defect_class]

Resizing wafer maps

In [None]:
# Side length every wafer map is resized to; also the spatial input size of the CNN.
target_dim = 64

# Resize images with TensorFlow 2

In [None]:
def resize_wafer_map(wm, target_size=(target_dim, target_dim)):
    """Resize a wafer map to ``target_size`` with nearest-neighbour sampling.

    A trailing channel axis is appended to ``wm`` before resizing, and the
    result is converted back to a NumPy array.

    Args:
        wm: Wafer map array (assumed 2-D height x width - TODO confirm).
        target_size: Output ``(height, width)``.

    Returns:
        The resized map as a NumPy array, including the added channel axis.
    """
    with_channel = np.expand_dims(wm, axis=-1)
    resized = tf.image.resize(with_channel, target_size, method='nearest')
    return resized.numpy()

In [None]:
def one_hot_img(wm):
    """One-hot encode a wafer map into three channels.

    Args:
        wm: Array of shape ``(h, w)`` or ``(h, w, 1)`` whose entries are the
            pixel codes 0, 1 or 2.

    Returns:
        A float64 array of shape ``(h, w, 3)`` in which ``out[i, j, k]`` is 1
        when ``wm[i, j] == k`` and 0 otherwise.

    Raises:
        IndexError: If a pixel code does not address one of the 3 channels.
    """
    h, w = wm.shape[0], wm.shape[1]
    # Drop the optional trailing channel axis and truncate to integer codes.
    # This avoids calling int() on size-1 arrays, which NumPy has deprecated.
    codes = np.asarray(wm).reshape(h, w).astype(int)
    # Row k of the 3x3 identity is the one-hot vector for code k, so a single
    # fancy-index lookup replaces the per-pixel Python loop.
    return np.eye(3)[codes]

In [None]:
# Scratch check: draws one float between 0.01 and 0.09 and displays it; the
# value is not stored. (`random` is already imported in the first cell.)
import random
random.uniform(0.01,0.09)

In [None]:
# # Noise augmentation (disabled): flip a small random fraction of the on-wafer pixels
# def add_noise(wm):
#     h, w = wm.shape[0], wm.shape[1] 
#     n_change = int(random.uniform(0.01,0.05) * h * w)
#     changed = 0
#     noised = wm.copy()
#     while changed < n_change:
#         i, j = np.random.randint(0, h), np.random.randint(0, w)
#         if wm[i, j, 0] == 0:  # If the point is on the wafer
#             noised[i, j, 1] = int(not noised[i, j, 1])
#             noised[i, j, 2] = int(not noised[i, j, 2])
#             changed += 1
#     return noised

In [None]:
# Resize every wafer map and then one-hot encode it, storing the result in a new column.
df['waferMapResized'] = df.waferMap.apply(
    lambda raw_map: one_hot_img(resize_wafer_map(raw_map))
)

In [None]:
# Sanity check: shape of the encoded wafer map stored under index label 19.
df.waferMapResized[19].shape

In [None]:
# Same shape check as the previous cell (duplicate).
df['waferMapResized'][19].shape

In [None]:
# Fetch the raw and the resized/one-hot versions of the wafer under index
# label 19, then display the raw map.
wm_resized = df.waferMapResized[19]
wm = df.waferMap[19]
#wm_noised = add_noise(wm_resized)
plt.imshow(np.squeeze(wm))

In [None]:
# plt.imshow(wm_resized.squeeze())
# print(wm_resized.shape)

In [None]:
# plt.imshow(wm_noised.squeeze())
# print(wm_noised.shape)

Stratified sampling with proportionate allocation

Center : 4294(2.5%)

Donut : 555(0.3%)

Edge-Loc : 5189(3.0%)

Edge-Ring : 9680(5.6%)

Loc : 3593(2.1%)

Random : 866(0.5%)

Scratch : 1193(0.7%)

Near-full : 149(0.1%)

In [None]:
# Create one shuffled DataFrame per class id, published in the notebook
# namespace as df_<label>. sample(frac=1) returns every row in random order.
# NOTE(review): the range spans len(failure_types) ids, one more than the
# loops below that use len(failure_types) - 1 - confirm this is intended.
for label in range(len(failure_types)):
    rows_of_class = df[df['failureNum'] == label]
    globals()[f'df_{label}'] = rows_of_class.sample(frac=1)

In [None]:
# Report how many rows each per-class frame holds.
print('Number of samples per label:')
for label in range(len(failure_types) - 1):
    class_frame = globals()[f'df_{label}']
    print(f'{label}:', len(class_frame))

In [None]:
# Stratified split: the first 25% of each shuffled class frame goes to test,
# the next 15% to validation and the remainder to train. Results are published
# as df_<label>_test / _valid / _train, and the unsplit frame is removed.
test_ratio, valid_ratio = 0.25, 0.15
for label in range(len(failure_types)):
    base_name = f'df_{label}'
    shuffled = globals()[base_name]
    n_rows = len(shuffled)
    test_end = int(n_rows * test_ratio)
    valid_end = test_end + int(n_rows * valid_ratio)
    slices = {
        'test': shuffled[:test_end],
        'valid': shuffled[test_end:valid_end],
        'train': shuffled[valid_end:],
    }
    for split_name, part in slices.items():
        # reset_index() gives each split a fresh 0..n-1 index.
        globals()[f'{base_name}_{split_name}'] = part.reset_index()
    del globals()[base_name]

In [None]:
# Print the per-class row counts of each split, in test / valid / train order.
split_headers = (
    ('test', '\nNumber of samples per label in test set:'),
    ('valid', '\nNumber of samples per label in valid set:'),
    ('train', 'Number of samples per label in train set:'),
)
for split_name, header in split_headers:
    print(header)
    for label in range(len(failure_types) - 1):
        split_frame = globals()['df_{}_{}'.format(label, split_name)]
        print('{}:'.format(label), len(split_frame))

In [None]:
# Size of the largest training class; the augmentation cell pads every class up to it.
train_sizes = [len(globals()[f'df_{i}_train']) for i in range(len(failure_types) - 1)]
max_class_len_train = np.max(train_sizes)
print(max_class_len_train)

Data augmentation for the train set

In [None]:
# Gather test samples: each map gets a leading batch axis so the list can be
# concatenated along axis 0 later; the class id is its label.
x_test, y_test = [], []
for label in range(len(failure_types) - 1):
    test_frame = globals()[f'df_{label}_test']
    for wm in test_frame.waferMapResized:
        x_test.append(wm[np.newaxis, ...])
        y_test.append(label)

In [None]:
# Number of test samples collected by the previous cell.
len(x_test)

In [None]:
# Gather validation samples the same way: leading batch axis per map, class id as label.
x_valid, y_valid = [], []
for label in range(len(failure_types) - 1):
    valid_frame = globals()[f'df_{label}_valid']
    for wm in valid_frame.waferMapResized:
        x_valid.append(wm[np.newaxis, ...])
        y_valid.append(label)

隨機擴增

In [None]:
# Balance the training set: every class is padded up to max_class_len_train
# with augmented copies of randomly chosen members, then its original samples
# are appended unchanged.
x_train, y_train = [], []
for label in range(len(failure_types) - 1):
    train_frame = globals()['df_{}_train'.format(label)]
    length = len(train_frame)
    n_new = max_class_len_train - length
    for _ in range(n_new):
        # Pick a random existing sample (the split frames have a 0..length-1 index).
        picked = train_frame.waferMapResized.loc[np.random.choice(range(length))]
        # 50/50: mirror horizontally, or rotate by 90 degrees.
        mirror = np.random.random() < 0.5
        augmented = tf.image.flip_left_right(picked) if mirror else tf.image.rot90(picked)
        #new_x = add_noise(new_x)
        x_train.append(np.expand_dims(augmented.numpy(), axis=0))
        y_train.append(label)
    for wm in train_frame.waferMapResized:
        x_train.append(np.expand_dims(wm, axis=0))
        y_train.append(label)

In [None]:
# Stack each split's samples along the batch axis, turn the label lists into
# arrays, then drop the Python lists to free memory.
x_train_arr, y_train_arr = np.concatenate(x_train, axis=0), np.array(y_train)
x_valid_arr, y_valid_arr = np.concatenate(x_valid, axis=0), np.array(y_valid)
x_test_arr, y_test_arr = np.concatenate(x_test, axis=0), np.array(y_test)
del x_test, y_test, x_valid, y_valid, x_train, y_train

In [None]:
# One-hot encode the integer labels. num_classes is pinned to 8 (the width of
# the models' softmax layers) so every split gets the same label width even if
# the highest class id happens to be absent from that split; without it,
# to_categorical infers the width from the largest label it sees.
y_test_arr = to_categorical(y_test_arr, num_classes=8)
y_valid_arr = to_categorical(y_valid_arr, num_classes=8)
y_train_arr = to_categorical(y_train_arr, num_classes=8)

In [None]:
# Spot-check one training sample: show its last one-hot channel and print its label vector.
sample_idx = 11422
plt.imshow(x_train_arr[sample_idx, :, :, -1].squeeze())
print(y_train_arr[sample_idx])

Model definition

In [None]:
# Reset Keras global state so re-running this cell starts from a clean graph.
tf.keras.backend.clear_session()

# Small CNN on a single-channel target_dim x target_dim input: three
# conv + max-pool stages (16, 32, 64 filters), then a dropout-regularised
# dense head with an 8-way softmax output.
cnn = Sequential([
    Conv2D(16, (3, 3), padding='same', activation='relu',
           input_shape=(target_dim, target_dim, 1)),
    MaxPooling2D(),
    Conv2D(32, (3, 3), padding='same', activation='relu'),
    MaxPooling2D(),
    Conv2D(64, (3, 3), padding='same', activation='relu'),
    MaxPooling2D(),
    Flatten(),
    Dropout(0.25),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(8, activation='softmax'),
])

cnn.compile(optimizer='nadam', loss='categorical_crossentropy',
            metrics=['categorical_accuracy'])
cnn.summary()

In [None]:
# Callback list for the CNN fit: early stopping with a patience of 10 epochs
# (the monitored quantity is left at the Keras default).
cb = [EarlyStopping(patience=10)]

In [None]:
# The CNN takes one channel only: the last one-hot plane, kept as a trailing
# axis of length 1 via a length-1 slice.
train_inputs = x_train_arr[:, :, :, -1:]
valid_inputs = x_valid_arr[:, :, :, -1:]
history = cnn.fit(
    train_inputs,
    y_train_arr,
    batch_size=128,
    epochs=25,
    validation_data=(valid_inputs, y_valid_arr),
    callbacks=cb,
)

In [None]:
# Persist the trained CNN to Drive in HDF5 format.
# NOTE(review): the file name says 18 epochs while fit() above requests up to
# 25 with early stopping - presumably the epoch it stopped at; confirm.
cnn.save('/content/drive/MyDrive/Colab Notebooks/18_epochs.h5')

In [None]:
# Training vs. validation loss per epoch for the most recent fit().
fig, ax = plt.subplots()
ax.plot(history.history['loss'], label='Training loss')
ax.plot(history.history['val_loss'], label='Validation loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend()
ax.set_title('Categorical Cross-Entropy')
plt.show()

In [None]:
# Training vs. validation categorical accuracy per epoch for the most recent fit().
fig, ax = plt.subplots()
ax.plot(history.history['categorical_accuracy'], label='Training accuracy')
ax.plot(history.history['val_categorical_accuracy'], label='Validation accuracy')
ax.set_xlabel('Epoch')
ax.set_ylabel('Accuracy')
ax.legend()
ax.set_title('Accuracy')
plt.show()

In [None]:
# Evaluate the CNN on the test set, feeding only channel 2 as a single-channel input.
test_inputs = x_test_arr[:, :, :, 2:3]
score = cnn.evaluate(test_inputs, y_test_arr, verbose=0)
test_loss, test_accuracy = score[0], score[1]
print('Test loss:', test_loss)
print('Test accuracy:', test_accuracy)

In [None]:
# Class-probability predictions of the CNN on the test set (channel 2 only).
predictions = cnn.predict(x_test_arr[:, :, :, 2:3])

In [None]:
# Confusion matrix of true vs. predicted class ids (argmax over the one-hot / softmax axis).
true_ids = np.argmax(y_test_arr, axis=1)
predicted_ids = np.argmax(predictions, axis=1)
cm = confusion_matrix(true_ids, predicted_ids)
cm

In [None]:
import seaborn as sns
# Draw the confusion matrix as a heatmap.
plt.rcParams['figure.figsize'] = (8,8)
# annot supplies the numbers written inside each cell. The entries are integer
# sample counts, so they are formatted with 'd' instead of as one-decimal floats.
sns.heatmap(cm, annot=cm, fmt='d', annot_kws={"size": 13})

# EfficientNet

In [None]:
import warnings
warnings.filterwarnings('ignore')

from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential, load_model, Model
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten,MaxPooling2D, BatchNormalization, GlobalAveragePooling2D
from tensorflow.keras.layers import Conv2D, MaxPooling2D, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.applications import vgg16
from tensorflow.keras.applications.resnet import ResNet50
from tensorflow.keras.applications.densenet import DenseNet201

In [None]:
# Transfer learning: ImageNet-pretrained EfficientNetB0 backbone without its
# classification top, followed by global average pooling and an 8-way softmax.
# NOTE(review): the inputs fed to this model are 0/1 one-hot planes - confirm
# that matches the input range the pretrained backbone expects.
base_model = EfficientNetB0(weights='imagenet', include_top=False, input_shape = (64, 64, 3))
x = base_model.output
x = GlobalAveragePooling2D()(x)
predictions = Dense(8, activation='softmax')(x)
model = Model(base_model.input, predictions)
# Use the 'categorical_accuracy' metric name, as the other compile() calls do,
# so the history keys match what the accuracy-plot cell reads
# ('categorical_accuracy' / 'val_categorical_accuracy').
model.compile(optimizer="nadam", loss="categorical_crossentropy", metrics=["categorical_accuracy"])
model.summary()

In [None]:
# Callback list for the EfficientNet fit: early stopping with a patience of 3
# epochs (monitored quantity left at the Keras default). Rebinds `cb`.
cb = [EarlyStopping(patience=3)]

In [None]:
# Fine-tune on the full three-channel one-hot maps; the arrays are passed as-is.
history = model.fit(
    x_train_arr,
    y_train_arr,
    batch_size=32,
    epochs=25,
    validation_data=(x_valid_arr, y_valid_arr),
    callbacks=cb,
)

In [None]:
# Persist the fine-tuned model to Drive in HDF5 format.
model.save('/content/drive/MyDrive/Colab Notebooks/effnet_25_epochs.h5')

In [None]:
# Reload the saved network into `model`. `history` must stay bound to the
# History object returned by fit(), because the plotting cells below read
# history.history[...]; a loaded model carries no training curves.
model = load_model('/content/drive/MyDrive/Colab Notebooks/effnet_25_epochs.h5')

In [None]:
# Training vs. validation loss per epoch for the most recent fit().
fig, ax = plt.subplots()
ax.plot(history.history['loss'], label='Training loss')
ax.plot(history.history['val_loss'], label='Validation loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend()
ax.set_title('Categorical Cross-Entropy')
plt.show()

In [None]:
# Training vs. validation categorical accuracy per epoch for the most recent fit().
fig, ax = plt.subplots()
ax.plot(history.history['categorical_accuracy'], label='Training accuracy')
ax.plot(history.history['val_categorical_accuracy'], label='Validation accuracy')
ax.set_xlabel('Epoch')
ax.set_ylabel('Accuracy')
ax.legend()
ax.set_title('Accuracy')
plt.show()

In [None]:
# Evaluate the three-channel model on the test set. The stray ')' after
# x_test_arr has been removed: it ended the call early and made the cell a
# syntax error.
score = model.evaluate(x_test_arr, y_test_arr, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

In [None]:
# Class-probability predictions of the three-channel model on the test set.
# This must call `model`, not `cnn`: the CNN accepts a single channel only.
# The leftover `axis=-1)` fragment, which was a syntax error, is removed.
predictions = model.predict(x_test_arr)

In [None]:
# Confusion matrix of true vs. predicted class ids (argmax over the one-hot / softmax axis).
true_ids = np.argmax(y_test_arr, axis=1)
predicted_ids = np.argmax(predictions, axis=1)
cm = confusion_matrix(true_ids, predicted_ids)
cm

# Resnet50

In [None]:
import seaborn as sns
# Draw the confusion matrix as a heatmap.
plt.rcParams['figure.figsize'] = (8,8)
# annot supplies the numbers written inside each cell. The entries are integer
# sample counts, so they are formatted with 'd' instead of as one-decimal floats.
sns.heatmap(cm, annot=cm, fmt='d', annot_kws={"size": 13})

In [None]:

# No layers are frozen in this example, so the whole network is fine-tuned.
model_name = 'VGG16-Fine-Tune'

img_rows, img_cols, img_channel = 64, 64, 3
input_shape = (img_rows, img_cols, img_channel)

# ImageNet-pretrained VGG16 convolutional base without its dense top.
base_model = vgg16.VGG16(weights='imagenet', include_top=False, input_shape=input_shape)
# To freeze the base instead, set layer.trainable = False for every layer in
# base_model.layers.

# Classification head: flatten -> dropout -> 8-way softmax.
x = Flatten()(base_model.output)
x = Dropout(0.5)(x)
predictions = Dense(8, activation='softmax')(x)
model = Model(inputs=base_model.input, outputs=predictions)

model.summary()

In [None]:
# The backbone built below is DenseNet201, so the name now says so (it
# previously read 'Xception-Fine-Tune').
model_name = 'DenseNet201-Fine-Tune'

img_rows, img_cols, img_channel = 64, 64, 3
# ImageNet-pretrained DenseNet201 base without its classification top.
base_model = DenseNet201(weights='imagenet', include_top=False,
                         input_shape=(img_rows, img_cols, img_channel))

# Classification head: global average pooling -> dropout -> 8-way softmax.
x = base_model.output
x = GlobalAveragePooling2D(data_format='channels_last')(x)
x = Dropout(0.5)(x)
predictions = Dense(8, activation='softmax')(x)

# Rebinds `model`, replacing whichever model was built in an earlier cell.
model = Model(inputs=base_model.input, outputs=predictions)

model.summary()

In [None]:
# Compile the current `model` with the same optimizer, loss and metric as the CNN.
model.compile(
    optimizer='nadam',
    loss='categorical_crossentropy',
    metrics=['categorical_accuracy'],
)

# Stop once val_loss has not improved for 10 epochs, logging when that happens.
earlystop = EarlyStopping(monitor='val_loss', patience=10, verbose=1)


In [None]:
# Shape of the training inputs.
x_train_arr.shape

In [None]:
# Shape of the one-hot training labels.
y_train_arr.shape

In [None]:
# Train the current `model` on the full three-channel maps with early stopping.
model.fit(
    x_train_arr,
    y_train_arr,
    batch_size=32,
    epochs=25,
    validation_data=(x_valid_arr, y_valid_arr),
    callbacks=[earlystop],
)