In [1]:
from keras.models import Model
from keras import backend as K
from scipy.stats import norm
from keras import metrics
from keras.layers import *
from tqdm.notebook import tqdm as tqdm
from keras.losses import mse, binary_crossentropy
from Dental_Tool.KFold_v2 import *

from keras.utils import *
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np

Using TensorFlow backend.


In [2]:
batch_size = 8
original_dim = 128 * 128
latent_dim = 2
intermediate_dim = 256
epochs = 50
epsilon_std = 1.0
classes = 3

In [3]:
def sampling(args: tuple):
        # we grab the variables from the tuple
        z_mean, z_log_var = args
        epsilon = K.random_normal(shape=(K.shape(z_mean)[0], latent_dim), mean=0.,
                                  stddev=epsilon_std)
        return z_mean + K.exp(z_log_var / 2) * epsilon

In [4]:
def build_encoder(input_shape = (200, 180, 1)): 
        inputs = Input(shape=input_shape)
        x = Conv2D(32, kernel_size=3, strides=(2, 2), padding="same")(inputs)
        x = LeakyReLU(alpha=0.2)(x)
        x = Dropout(0.25)(x)

        x = Conv2D(64, kernel_size=3, strides=(2, 2), padding="same")(x)
        x = BatchNormalization(momentum=0.8)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = Dropout(0.25)(x)

        x = Conv2D(128, kernel_size=3, strides=(2, 2), padding="same")(x)
        x = BatchNormalization(momentum=0.8)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = Dropout(0.25)(x)

        x = Conv2D(256, kernel_size=3, strides=(2, 2), padding="same")(x)
        x = BatchNormalization(momentum=0.8)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = Dropout(0.25)(x)

        x = Conv2D(512, kernel_size=3, strides=(2, 2), padding="same")(x)
        x = BatchNormalization(momentum=0.8)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = Dropout(0.25)(x)

        x = Conv2D(1024, kernel_size=3, strides=(2, 2), padding="same")(x)
        x = BatchNormalization(momentum=0.8)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = Dropout(0.25)(x)

        x = Flatten()(x)
        x = Dense(1024)(x)
        x = LeakyReLU(alpha=0.2)(x)
        
        global z_mean
        global z_log_var
        # defining the mean of the latent space
        z_mean = Dense(latent_dim, name="mean")(x)
        # defining the log variance of the latent space
        z_log_var = Dense(latent_dim, name="log-variance")(x)

        z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
        # defining the encoder as a keras model
        encoder = Model(inputs, [z_mean, z_log_var, z], name="encoder")
        # print out summary of what we just did

        encoder.summary()
        return encoder

In [5]:
encoder = build_encoder(input_shape = (128, 128, 1))

Model: "encoder"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 128, 128, 1)  0                                            
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (None, 64, 64, 32)   320         input_1[0][0]                    
__________________________________________________________________________________________________
leaky_re_lu_1 (LeakyReLU)       (None, 64, 64, 32)   0           conv2d_1[0][0]                   
__________________________________________________________________________________________________
dropout_1 (Dropout)             (None, 64, 64, 32)   0           leaky_re_lu_1[0][0]              
____________________________________________________________________________________________

In [6]:
def build_decoder(input_shape):
        latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
        label_inputs = Input(shape=(classes,), name='label')
        x = Concatenate()([latent_inputs, label_inputs])
        x = Dense(128, activation='relu')(x)
        x = Dense(4 * 4 * 32, activation='relu')(x)
        x = Reshape((4, 4, 32))(x)
        x = Dropout(0.25)(x)
        
        x = Conv2DTranspose(256, kernel_size=3, strides=(2, 2), padding="same")(x)
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
        
        x = Conv2DTranspose(128, kernel_size=3, strides=(2, 2), padding="same")(x)
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
        
        x = Conv2DTranspose(64, kernel_size=3, strides=(2, 2), padding="same")(x)
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
        
        x = Conv2DTranspose(32, kernel_size=3, strides=(2, 2), padding="same")(x)
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
        
        x = Conv2DTranspose(16, kernel_size=3, strides=(2, 2), padding="same")(x)
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
        
        x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
        x = Conv2D(1, (3, 3), activation='sigmoid', padding='same')(x)
        
        outputs = Reshape(input_shape)(x)
        
        decoder = Model([latent_inputs, label_inputs], outputs, name='decoder')
        decoder.summary()
        return decoder

In [7]:
build_decoder(input_shape=(128, 128, 1))

Model: "decoder"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
z_sampling (InputLayer)         (None, 2)            0                                            
__________________________________________________________________________________________________
label (InputLayer)              (None, 3)            0                                            
__________________________________________________________________________________________________
concatenate_1 (Concatenate)     (None, 5)            0           z_sampling[0][0]                 
                                                                 label[0][0]                      
__________________________________________________________________________________________________
dense_2 (Dense)                 (None, 128)          768         concatenate_1[0][0]        

<keras.engine.training.Model at 0x224f849a208>

In [8]:
def vae_loss(x: tf.Tensor, x_decoded_mean: tf.Tensor,
                z_log_var=z_log_var, z_mean=z_mean,
                original_dim=original_dim):
                xent_loss = original_dim * metrics.binary_crossentropy(x, x_decoded_mean)
                kl_loss = - 0.5 * K.sum(
                    1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
                vae_loss = K.mean(xent_loss + kl_loss)
                return vae_loss
            
def build_vae(encoder, decoder, input_shape):
        inputs = Input(shape=input_shape)
        label_inputs = Input(shape=(classes,), name='label')
        z_mean, z_log_var, z = encoder(inputs)
#         outputs = decoder(z)
#         vae = Model(inputs, outputs, name='vae_mlp')
        outputs = decoder([z, label_inputs])
        vae = Model([inputs, label_inputs], outputs, name='vae_mlp')
        
#         z_mean = encoder(inputs)[0]
#         z_log_var = encoder(inputs)[1]
        
#         reconstruction_loss = mse(inputs, outputs)
#         reconstruction_loss *= original_dim
#         kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
#         kl_loss = K.sum(kl_loss, axis=-1)
#         kl_loss *= -0.5
#         vae_loss = K.mean(reconstruction_loss + kl_loss)
#         vae.add_loss(vae_loss)
        vae.compile(optimizer='adam', loss=vae_loss)
        vae.summary()
        return vae

input_shape = (128, 128, 1)
encoder = build_encoder(input_shape=input_shape)
decoder = build_decoder(input_shape=input_shape)
vae = build_vae(encoder, decoder, input_shape)

Model: "encoder"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            (None, 128, 128, 1)  0                                            
__________________________________________________________________________________________________
conv2d_9 (Conv2D)               (None, 64, 64, 32)   320         input_2[0][0]                    
__________________________________________________________________________________________________
leaky_re_lu_8 (LeakyReLU)       (None, 64, 64, 32)   0           conv2d_9[0][0]                   
__________________________________________________________________________________________________
dropout_8 (Dropout)             (None, 64, 64, 32)   0           leaky_re_lu_8[0][0]              
____________________________________________________________________________________________

Model: "vae_mlp"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_3 (InputLayer)            (None, 128, 128, 1)  0                                            
__________________________________________________________________________________________________
encoder (Model)                 [(None, 2), (None, 2 10494980    input_3[0][0]                    
__________________________________________________________________________________________________
label (InputLayer)              (None, 3)            0                                            
__________________________________________________________________________________________________
decoder (Model)                 (None, 128, 128, 1)  539633      encoder[1][2]                    
                                                                 label[0][0]                

In [9]:
vae.summary()

Model: "vae_mlp"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_3 (InputLayer)            (None, 128, 128, 1)  0                                            
__________________________________________________________________________________________________
encoder (Model)                 [(None, 2), (None, 2 10494980    input_3[0][0]                    
__________________________________________________________________________________________________
label (InputLayer)              (None, 3)            0                                            
__________________________________________________________________________________________________
decoder (Model)                 (None, 128, 128, 1)  539633      encoder[1][2]                    
                                                                 label[0][0]                

In [10]:
# dir_path = "10_20200901_Original"

# mapping_data = json.load(open(f"Dental_Data/PBL/{dir_path}/mapping.json", "r"))
# filter_data = { path: max(list(map(int, state))) for path, state in mapping_data.items() if max(list(map(int, state))) >= 0 }

from Dental_Tool.Data_processing import *
from Dental_Tool.Dental_Model import *
from Dental_Tool.Process_results import *
from Dental_Tool.Dataloader import *
from Dental_Tool.KFold_v3 import *

directory = [ 
                "Dental_Data/PBL/10_20200901", 
                "Dental_Data/PBL/10_20200901_Flip", 
                "Dental_Data/PBL/10_clahe_20200901", 
                "Dental_Data/PBL/10_clahe_20200901_Flip"
            ]

directory = [ i + "/mapping.json" for i in directory]
argscale_num = len(directory) * 20



data = load_json(directory, interdental=False)
dataset = json_2_dataframe_PBL(data, mode="molar")
dataset
# dataset = pd.DataFrame(columns=PBL_Columns)
# dataset = json_2_dataframe_PBL(dataset, filter_data, mode="premolar")

# severe = dataset[dataset["Class"] == 2]
# severe

Unnamed: 0,Path,State,Class,source,tooth_num,angle
0,Dental_Data/PBL/10_20200901/15-53-19-666_00040...,3,2,NN_191024_151631_BE78BA_2,2,-10
1,Dental_Data/PBL/10_20200901_Flip/15-53-19-666_...,3,2,NN_191024_151631_BE78BA_2,2,-10
2,Dental_Data/PBL/10_clahe_20200901/16-31-09-808...,3,2,NN_191024_151631_BE78BA_2,2,-10
3,Dental_Data/PBL/10_clahe_20200901_Flip/16-31-0...,3,2,NN_191024_151631_BE78BA_2,2,-10
4,Dental_Data/PBL/10_20200901/15-53-19-697_00040...,3,2,NN_191024_151631_BE78BA_2,2,-9
...,...,...,...,...,...,...
106475,Dental_Data/PBL/10_clahe_20200901_Flip/17-03-1...,1,0,NN_140521_095055_C0A4EA_19,19,8
106476,Dental_Data/PBL/10_20200901/16-30-47-021_S4210...,1,0,NN_140521_095055_C0A4EA_19,19,9
106477,Dental_Data/PBL/10_20200901_Flip/16-30-47-021_...,1,0,NN_140521_095055_C0A4EA_19,19,9
106478,Dental_Data/PBL/10_clahe_20200901/17-03-16-474...,1,0,NN_140521_095055_C0A4EA_19,19,9


In [11]:
def load_images(path_list, resize):
        X = []
        for path in tqdm(path_list):
                image = cv2.imread(path, 0)
                image = cv2.resize(image, resize)
                image = image.astype("float32") / 255.0
#                 image = image - np.mean(image)
                image = np.expand_dims(image, axis=2)
                X.append(image)
        return np.array(X)


for train, valid, test, train_gen, valid_gen, test_gen in \
    K_Fold_balance_data_generator(dataset, argscale_num, batch_size=32, k_fold_num=5):
        
        x_train, y_train = load_images(train["Path"], (128, 128)), to_categorical(train["tooth_type"], classes)
        x_valid, y_valid = load_images(valid["Path"], (128, 128)), to_categorical(valid["tooth_type"], classes)
        x_test, y_test = load_images(test["Path"], (128, 128)), to_categorical(test["tooth_type"], classes)
        
        print(x_train.shape), print(y_train.shape)
        
#         vae.fit([x_train, y_train], x_train,
#                  epochs=epochs, 
#                  batch_size=batch_size,validation_data=([x_valid, y_valid], None),
#                  verbose=2)


stage 0: 10400, stage 1: 27120, stage 2: 17360, stage 3: 8640
stage 0: 3600, stage 1: 9120, stage 2: 5680, stage 3: 2880
stage 0: 3520, stage 1: 9200, stage 2: 5840, stage 3: 3120


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=25920.0), HTML(value='')))




HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=8640.0), HTML(value='')))




HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=9360.0), HTML(value='')))


(25920, 128, 128, 1)
(25920, 3)
Train on 25920 samples, validate on 8640 samples
Epoch 1/50


InvalidArgumentError: 2 root error(s) found.
  (0) Invalid argument:  You must feed a value for placeholder tensor 'input_1' with dtype float and shape [?,128,128,1]
	 [[node input_1 (defined at C:\Users\Lab620\Anaconda3\envs\lawrence\lib\site-packages\tensorflow_core\python\framework\ops.py:1751) ]]
  (1) Invalid argument:  You must feed a value for placeholder tensor 'input_1' with dtype float and shape [?,128,128,1]
	 [[node input_1 (defined at C:\Users\Lab620\Anaconda3\envs\lawrence\lib\site-packages\tensorflow_core\python\framework\ops.py:1751) ]]
	 [[gradients/loss/decoder_loss/vae_loss/weighted_loss/mul_grad/Shape/_181]]
0 successful operations.
0 derived errors ignored. [Op:__inference_keras_scratch_graph_17079]

Function call stack:
keras_scratch_graph -> keras_scratch_graph


In [None]:
batch_size = 100
original_dim = 128*128
latent_dim = 128 # 隐变量取2维只是为了方便后面画图
intermediate_dim = 256
epochs = 20000
num_classes = 3

x = Input(shape=(original_dim,))
h = Dense(intermediate_dim, activation='relu')(x)

# 算p(Z|X)的均值和方差
z_mean = Dense(latent_dim)(h)
z_log_var = Dense(latent_dim)(h)

y = Input(shape=(num_classes,)) # 输入类别
yh = Dense(latent_dim)(y) # 这里就是直接构建每个类别的均值

# 重参数技巧
def sampling(args):
    z_mean, z_log_var = args
    epsilon = K.random_normal(shape=K.shape(z_mean))
    return z_mean + K.exp(z_log_var / 2) * epsilon

# 重参数层，相当于给输入加入噪声
z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])

# 解码层，也就是生成器部分
decoder_h = Dense(intermediate_dim, activation='relu')
decoder_mean = Dense(original_dim, activation='sigmoid')
h_decoded = decoder_h(z)
x_decoded_mean = decoder_mean(h_decoded)

# 建立模型
vae = Model([x, y], [x_decoded_mean, yh])

# xent_loss是重构loss，kl_loss是KL loss
xent_loss = K.sum(K.binary_crossentropy(x, x_decoded_mean), axis=-1)

# 只需要修改K.square(z_mean)为K.square(z_mean - yh)，也就是让隐变量向类内均值看齐
kl_loss = - 0.5 * K.sum(1 + z_log_var - K.square(z_mean - yh) - K.exp(z_log_var), axis=-1)
vae_loss = K.mean(xent_loss + kl_loss)

# add_loss是新增的方法，用于更灵活地添加各种loss
vae.add_loss(vae_loss)
vae.compile(optimizer='rmsprop')
vae.summary()



x_train, y_train = load_images(train["Path"], (128, 128)), to_categorical(train["Class"], classes)
x_valid, y_valid = load_images(valid["Path"], (128, 128)), to_categorical(valid["Class"], classes)
        
x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:])))
x_valid = x_valid.reshape((len(x_valid), np.prod(x_valid.shape[1:])))

vae.fit([x_train, y_train], 
        shuffle=True,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=([x_valid, y_valid], None))


In [None]:
# 构建encoder，然后观察各个数字在隐空间的分布
encoder = Model(x, z_mean)

# x_test_encoded = encoder.predict(x_valid, batch_size=batch_size)
# plt.figure(figsize=(6, 6))
# plt.scatter(x_test_encoded[:, 0], x_test_encoded[:, 1], c=y_valid)
# plt.colorbar()
# plt.show()

# 构建生成器
decoder_input = Input(shape=(latent_dim,))
_h_decoded = decoder_h(decoder_input)
_x_decoded_mean = decoder_mean(_h_decoded)
generator = Model(decoder_input, _x_decoded_mean)

# 输出每个类的均值向量
mu = Model(y, yh)
mu = mu.predict(np.eye(num_classes))

# 观察能否通过控制隐变量的均值来输出特定类别的数字
n = 15  # figure with 15x15 digits
digit_size = 128
figure = np.zeros((digit_size * n, digit_size * n))

output_digit = 2 # 指定输出数字

# 用正态分布的分位数来构建隐变量对
grid_x = norm.ppf(np.linspace(0.05, 0.95, n)) + mu[output_digit][1]
grid_y = norm.ppf(np.linspace(0.05, 0.95, n)) + mu[output_digit][0]

for i, yi in enumerate(grid_x):
    for j, xi in enumerate(grid_y):
        z_sample = np.array([[xi, yi]])
        x_decoded = generator.predict(z_sample)
        digit = x_decoded[0].reshape(digit_size, digit_size)
        figure[i * digit_size: (i + 1) * digit_size,
               j * digit_size: (j + 1) * digit_size] = digit

plt.figure(figsize=(10, 10))
plt.imshow(figure, cmap='Greys_r')
plt.show()

In [None]:
def fc(input_img):
#         flat = Flatten()(enco)
#         x = Input(shape=(original_dim,))
        h = Dense(intermediate_dim, activation='relu')(input_img)

        # 算p(Z|X)的均值和方差
        z_mean = Dense(latent_dim)(h)
        z_log_var = Dense(latent_dim)(h)
        z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
        
        den = Dense(128, activation='relu')(z)
        out = Dense(num_classes, activation='softmax')(den)
        return out

input_img = Input(shape=(original_dim,))
# encoder = Model(x, z_mean)
# encode = encoder(input_img)
# print(encode)
full_model = Model(input_img, fc(input_img))
# for l1,l2 in zip(full_model.layers[:3],vae.layers[0:3]):
#     l1.set_weights(l2.get_weights())

full_model.summary()

In [None]:
vae.summary()

In [None]:
for l1,l2 in zip(full_model.layers[:5], vae.layers[0:5]):
    l1.set_weights(l2.get_weights())
    
for layer in full_model.layers[0:5]:
    layer.trainable = False
full_model.compile(loss=keras.losses.categorical_crossentropy, optimizer=keras.optimizers.Adam(),metrics=['accuracy'])

In [None]:
classify_train = full_model.fit(x_train, y_train, batch_size=64,epochs=20000,verbose=1,validation_data=(x_valid, y_valid))

In [None]:
full_model.evaluate(x_valid, y_valid, batch_size=batch_size, verbose=1)