In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import os
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from sklearn.metrics import accuracy_score
import tensorflow as tf
from tensorflow.keras import layers, models, initializers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping
import torch
import torch.nn as nn
import torch.optim as optimizers

In [2]:
def read_img(path, s):
    img_bgr = cv2.imread(path + '/' + s)
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
    return img_rgb

def read_fruits_img(name, path='C:/Users/akihiro.tanaka.CORP/Downloads/pictures/'):
    path = path + name
 
    list1 = os.listdir(path)
    tmp = np.array([read_img(path, s) for s in list1])
    tmp_gray = np.array([cv2.cvtColor(s, cv2.COLOR_RGB2GRAY) for s in tmp])
    return tmp, tmp_gray

In [3]:
shiraishi, shiraishi_gray = read_fruits_img('shiraishi', path='C:/Users/akihiro.tanaka.CORP/Downloads/images/images/')
nishino, nishino_gray = read_fruits_img('nishino', path='C:/Users/akihiro.tanaka.CORP/Downloads/images/images/')
akimoto, akimoto_gray = read_fruits_img('akimoto', path='C:/Users/akihiro.tanaka.CORP/Downloads/images/images/')
ikuta, ikuta_gray = read_fruits_img('ikuta', path='C:/Users/akihiro.tanaka.CORP/Downloads/images/images/')
test, test_gray = read_fruits_img('test', path='C:/Users/akihiro.tanaka.CORP/Downloads/images/images/')

FileNotFoundError: [WinError 3] 指定されたパスが見つかりません。: 'C:/Users/akihiro.tanaka.CORP/Downloads/images/images/shiraishi'

In [None]:
X = np.concatenate([shiraishi_gray, nishino_gray, akimoto_gray, ikuta_gray])
X_flat = X.flatten().reshape(len(X),128**2)/255

shira = np.repeat(0, len(shiraishi_gray))
nishi = np.repeat(1, len(nishino_gray))
aki = np.repeat(2, len(akimoto_gray))
iku = np.repeat(3, len(ikuta_gray))
y_label = np.concatenate([shira, nishi, aki, iku])
y = tf.keras.utils.to_categorical(y_label, 0)

X_train, X_test, y_train, y_test = train_test_split(X_flat, y)
#X_train = X_train.reshape(len(X_train),128,128)
#X_test = X_test.reshape(len(X_test),128,128)

# 13-1 ニューラルネットワーク
## 13-1-1 単純パーセプトロン
---
### keras

In [None]:
model = models.Sequential()
model.add(layers.Dense(4, activation='softmax', input_shape=(128*128,), name='softmax'))
model.summary()

In [None]:
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['acc'])

In [None]:
history = model.fit(X_train, y_train,
                    validation_data=(X_test, y_test),
                    batch_size=256, epochs=20)

In [None]:
p_val = model.predict(X_test)
df = pd.DataFrame({'pred': list(map(np.argmax, p_val)),
                   'label': list(map(np.argmax, y_test))})
correct = df[df['pred']==df['label']]
incorrect = df[df['pred']!=df['label']]

In [None]:
fig, ax = plt.subplots(4, 6, figsize=(16, 8), subplot_kw={'xticks': (), 'yticks': ()})
for i in range(4):
    indices = list(correct[correct['pred']==i].index[:3]) + list(incorrect[incorrect['pred']==i].index[:3])
    for j in range(6):
        ax[i][j].imshow(X_test[indices[j]].reshape(128,128))
        ax[i][j].set_title('pred: %d  ans: %d' % (df['label'][indices[j]], i))

In [None]:
print('correct:'+str(len(correct)))
print('incorrect:'+str(len(incorrect)))

## 単層ニューラルネットワーク
---


In [None]:
model = models.Sequential()
model.add(layers.Dense(1024, activation='relu', input_shape=(128*128,),
                       kernel_initializer=initializers.TruncatedNormal(),
                       name='hidden'))
model.add(layers.Dense(4, activation='softmax', name='softmax'))
model.summary()

In [None]:
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['acc'])

In [None]:
history = model.fit(X_train, y_train,
                    validation_data=(X_test, y_test),
                    batch_size=64, epochs=30)

In [None]:
p_val = model.predict(X_test)
df = pd.DataFrame({'pred': list(map(np.argmax, p_val)),
                   'label': list(map(np.argmax, y_test))})
correct = df[df['pred']==df['label']]
incorrect = df[df['pred']!=df['label']]

In [None]:
fig, ax = plt.subplots(4, 6, figsize=(16, 8), subplot_kw={'xticks': (), 'yticks': ()})
for i in range(4):
    indices = list(correct[correct['pred']==i].index[:3]) + list(incorrect[incorrect['pred']==i].index[:3])
    for j in range(6):
        ax[i][j].imshow(X_test[indices[j]].reshape(128,128))
        ax[i][j].set_title('pred: %d  ans: %d' % (df['label'][indices[j]], i))

In [None]:
print('correct:'+str(len(correct)))
print('incorrect:'+str(len(incorrect)))

In [None]:
train_loader = torch.utils.dta.DataLoader(datasets=X_train,
                                         batch_size=10,
                                         shuffle=True)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.l1 = nn.Linear(input_dim, hidden_dim)
        self.a1 = nn.Sigmoid()
        self.l2 = nn.Linear(hidden_dim, output_dim)
        
        self.layers = [self.l1, self.a1, self.l2]
        
    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

In [None]:
model = MLP(128*128, 64, 4)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optimizers.SGD(model.parameters(), lr=0.1)

def compute_loss(t, y):
    return criterion(y, torch.max(t,1)[1].long())

def train_step(x, t):
    model.train() # trainモード
    preds = model(x)
    loss = compute_loss(t, preds)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss, preds

In [None]:
epochs = 50
batch_size = 20
n_batches = X_train.shape[0] // batch_size

for epoch in range(epochs):
    train_loss = 0.
    x_, t_  = shuffle(X_train, y_train)
    x_ = torch.Tensor(x_).to(device)
    t_ = torch.Tensor(t_).to(device)
    
    for n_batch in range(n_batches):
        start = n_batch * batch_size
        end = start + batch_size
        loss, preds = train_step(x_[start:end], t_[start:end])
        train_loss += loss.item()
    
    print('epoch: {}, loss: {:.3}'.format(epoch+1, train_loss))

In [None]:
def test_step(x, t):
    x = torch.Tensor(x).to(device)
    t = torch.Tensor(t).to(device)
    model.eval()
    preds = model(x)
    loss = compute_loss(t, preds)
    return loss, preds

loss, preds = test_step(X_test, y_test)
test_loss = loss.item()
test_acc = accuracy_score(np.argmax(y_test, axis=1).tolist(), preds.argmax(dim=-1).tolist())
print('loss: ', test_loss)
print('acc: ', test_acc)

In [None]:
test_acc

## 多層ニューラルネットワーク
---


In [None]:
model = models.Sequential()
model.add(layers.Dense(1024, activation='relu', input_shape=(128*128,),
                       kernel_initializer=initializers.TruncatedNormal(),
                       name='hidden1'))
model.add(layers.Dense(256, activation='relu', input_shape=(128*128,),
                       kernel_initializer=initializers.TruncatedNormal(),
                       name='hidden2'))
model.add(layers.Dense(4, activation='softmax', name='softmax'))
model.summary()

In [None]:
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['acc'])

In [None]:
history = model.fit(X_train, y_train,
                    validation_data=(X_test, y_test),
                    batch_size=256, epochs=10)

In [None]:
p_val = model.predict(X_test)
df = pd.DataFrame({'pred': list(map(np.argmax, p_val)),
                   'label': list(map(np.argmax, y_test))})
correct = df[df['pred']==df['label']]
incorrect = df[df['pred']!=df['label']]

In [None]:
print('correct:'+str(len(correct)))
print('incorrect:'+str(len(incorrect)))

# 13-2 深層学習

## 畳み込みフィルタ
---


In [None]:
model = models.Sequential()
model.add(layers.Reshape((128, 128, 1), input_shape=(128*128,), name='rehsape'))
model.add(layers.Conv2D(16, (9, 9), padding='same', kernel_initializer=initializers.TruncatedNormal(),
                        use_bias=True, activation='relu',
                        name='conv_filter'))
model.add(layers.MaxPooling2D((2, 2), name='max_pooling'))
model.add(layers.Flatten(name='flatten'))
model.add(layers.Dense(1024, activation='relu', kernel_initializer=initializers.TruncatedNormal(), name='hidden'))
model.add(layers.Dense(4, activation='softmax', name='softmax'))
model.summary()

In [None]:
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['acc'])

In [None]:
history = model.fit(X_train, y_train,
                    validation_data=(X_test, y_test),
                    batch_size=128, epochs=10)

In [None]:
p_val = model.predict(X_test)
df = pd.DataFrame({'pred': list(map(np.argmax, p_val)),
                   'label': list(map(np.argmax, y_test))})
correct = df[df['pred']==df['label']]
incorrect = df[df['pred']!=df['label']]

In [None]:
print('correct:'+str(len(correct)))
print('incorrect:'+str(len(incorrect)))

In [None]:
layer_outputs = [model.get_layer('conv_filter').output,
                 model.get_layer('max_pooling').output]
model2 = models.Model(inputs=model.input, outputs=layer_outputs)

In [None]:
conv_outputs, pool_output = model2.predict(X_test[:4])
filter_vals = model.get_layer('conv_filter').get_weights()[0]

In [None]:
fig, ax = plt.subplots(16, 5, figsize=(8, 20), subplot_kw={'xticks': (), 'yticks': ()})
for i in range(16):
    ax[i][0].imshow(filter_vals[:,:,0,i])
    for j in range(1,5):
        ax[i][j].imshow(conv_outputs[j][:,:,i])

In [None]:
model = models.Sequential()
model.add(layers.Reshape((128,128,1), input_shape=(128*128,),name='reshape'))
model.add(layers.Conv2D(32, (5, 5), padding='same',
                        kernel_initializer=initializers.TruncatedNormal(),
                        use_bias=True, activation='relu',
                        name='conv_filter1'))
model.add(layers.MaxPooling2D((2, 2), name='max_pooling1'))
model.add(layers.Conv2D(64, (5, 5), padding='same',
                        kernel_initializer=initializers.TruncatedNormal(),
                        use_bias=True,
                        activation='relu',
                        name='conv_filter2'))
model.add(layers.MaxPooling2D((2, 2), name='max_pooling2'))
model.add(layers.Flatten(name='flatten'))
model.add(layers.Dropout(rate=0.5, name='dropout'))
model.add(layers.Dense(4, activation='softmax', name='softmax'))
model.summary()

In [None]:
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['acc'])

In [None]:
history = model.fit(X_train, y_train,
                    validation_data=(X_test, y_test),
                    batch_size=128, epochs=30)

In [None]:
p_val = model.predict(X_test)
df = pd.DataFrame({'pred': list(map(np.argmax, p_val)),
                   'label': list(map(np.argmax, y_test))})
correct = df[df['pred']==df['label']]
incorrect = df[df['pred']!=df['label']]

In [None]:
print('correct:'+str(len(correct)))
print('incorrect:'+str(len(incorrect)))

### データの拡張

In [None]:
X = np.concatenate([shiraishi, nishino, akimoto, ikuta])
X_flat = X.flatten().reshape(len(X),128**2*3)

shira = np.repeat(0, len(shiraishi_gray))
nishi = np.repeat(1, len(nishino_gray))
aki = np.repeat(2, len(akimoto_gray))
iku = np.repeat(3, len(ikuta_gray))
y = np.concatenate([shira, nishi, aki, iku])
y = tf.keras.utils.to_categorical(y, 0)

X_train, X_test, y_train, y_test = train_test_split(X_flat, y)
X_train = X_train.reshape(len(X_train),128,128,3)
X_test = X_test.reshape(len(X_test),128,128,3)

In [None]:
datagen = ImageDataGenerator(
    rotation_range=10,
    width_shift_range=0.1,
    height_shift_range=0.1,
    zoom_range=[0.8, 1.2],
    horizontal_flip=True,
    channel_shift_range=0.2
)

In [None]:
fig, ax = plt.subplots(4, 6, figsize=(12, 8), subplot_kw={'xticks': (), 'yticks': ()})

for i in range(4):
    for j in range(len(y_train)):
        if np.argmax(y_train[j])==i:
            break
    ax[i][0].imshow(X_train[j])
    ax[i][0].set_xticks([])
    ax[i][0].set_yticks([])
    for k in range(5):
        img = datagen.flow(np.array([X_train[j]]), batch_size=1)[0][0]
        img = img.astype('uint8')
        ax[i][k+1].imshow(img)

### カラー画像の分類
---


In [None]:
model = models.Sequential()
model.add(layers.Conv2D(32, (3,3), padding='same',
                       kernel_initializer=initializers.TruncatedNormal(),
                       use_bias=True, activation='relu',
                       input_shape=(128, 128, 3),
                       name='conv_filter1-1'))
model.add(layers.Conv2D(32, (3, 3), padding='same',
                       kernel_initializer=initializers.TruncatedNormal(),
                       use_bias=True, activation='relu',
                       name='conv_filter1-2'))
model.add(layers.MaxPooling2D((2, 2), name='max_pooling1'))
model.add(layers.Dropout(rate=0.25, name='dropout1'))
model.add(layers.Conv2D(64, (3, 3), padding='same',
                       kernel_initializer=initializers.TruncatedNormal(),
                       use_bias=True, activation='relu',
                       name='conv_filter2-1'))
model.add(layers.Conv2D(64, (3, 3), padding='same',
                       kernel_initializer=initializers.TruncatedNormal(),
                       use_bias=True, activation='relu',
                       name='conv_filter2-2'))
model.add(layers.MaxPooling2D((2, 2), name='max_pooling2'))
model.add(layers.Dropout(rate=0.25, name='dropout2'))
model.add(layers.Flatten(name='flatten'))
model.add(layers.Dense(512, activation='relu',
                      kernel_initializer=initializers.TruncatedNormal(),
                      name='hidden'))
model.add(layers.Dropout(rate=0.5, name='dropout3'))
model.add(layers.Dense(4, activation='softmax', name='softmax'))
model.summary()

In [None]:
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['acc'])

In [None]:
es = EarlyStopping(monitor='val_loss',
                   patience=5,
                  verbose=1)

In [None]:
batch_size = 64
history = model.fit_generator(
    datagen.flow(X_train, y_train, batch_size=batch_size),
    validation_data = (X_test, y_test),
    steps_per_epoch=len(X_train) / batch_size,
    epochs=30
)

In [None]:
history = model.fit(X_train, y_train,
                    validation_data=(X_test, y_test),
                    batch_size=128, epochs=30,
                    verbose=2,
                    callbacks=[es])

In [None]:
p_val = model.predict(X_test)
df = pd.DataFrame({'pred': list(map(np.argmax, p_val)),
                   'label': list(map(np.argmax, y_test))})
correct = df[df['pred']==df['label']]
incorrect = df[df['pred']!=df['label']]

In [None]:
fig, ax = plt.subplots(4, 6, figsize=(16, 8), subplot_kw={'xticks': (), 'yticks': ()})
for i in range(4):
    indices = list(correct[correct['pred']==i].index[1:4]) + list(incorrect[incorrect['pred']==i].index[:3])
    for j in range(6):
        ax[i][j].imshow(X_test[indices[j]])
        ax[i][j].set_title('pred: %d  ans: %d' % (df['label'][indices[j]], i))

In [None]:
print('correct:'+str(len(correct)))
print('incorrect:'+str(len(incorrect)))

In [None]:
128*128

# 13-3 CNNによる画像認識と画像生成

### オートエンコーダによるアノマリー検知

In [None]:
model = models.Sequential()
model.add(layers.Dense(128*128, activation='relu', input_shape=(128*128,)))
#model.add(layers.Dense(512, activation='relu'))
#model.add(layers.Dense(256, activation='relu'))
#model.add(layers.Dense(512, activation='relu'))
#model.add(layers.Dense(1024, activation='relu'))
model.add(layers.Dense(128*128, activation='sigmoid'))
model.summary()

In [None]:
model.compile(optimizer='adam', loss='mse')

In [None]:
history = model.fit(X_train[np.argmax(y_train, axis=1)==1], X_train[np.argmax(y_train, axis=1)==1], batch_size=256, epochs=5)

In [None]:
fig, ax = plt.subplots(2, 6, figsize=(16, 6), subplot_kw={'xticks': (), 'yticks': ()})
for i in range(6):
    ax[0][i].imshow(X_test[i].reshape((128,128)))
    ax[1][i].imshow(model.predict(X_test[i]).reshape((128,128)))

In [None]:
plt.imshow(model.predict(X_test)[10].reshape((128,128)))