In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import random
import datetime

In [None]:
SAMPLE_NUM = 4
cross_section_n = 1
train_num = 3
test_num = 1
size1 = 128
size2 = 512

In [None]:
Iteration_NUM = 29 # 迭代步数
DE_NUM = 12 #隐藏层特征图的个数
DE_kernel_size = 37 #卷积核大小
out_channel = 1 #输出
hs_channel = 7 #微分方程的个数-1
bc_channel = 4 #边界条件的图层数
INPUT1_SHAPE = [None,None, bc_channel]
OUTPUT_SHAPE = [None,None, out_channel]
HS_SHAPE = [None,None, hs_channel] 
MIX_SHAPE = [None,None, hs_channel+out_channel]

LAMBDA = 100 #L1损失的权重
# WEIGHT_SSIM = 10 #图像结构相似度损失的权重
G_step_size = 1e-3

In [None]:
# data path
input_path = "./dataset/" # 数据输入路径
save_path = "./output3" #输出结果图片保存路径
current_time = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
train_log_dir = './logs/' + current_time + '/train'
checkpoint_dir = './training_checkpoints/PDE_D12-K37-H7'#检查点读取、保存路径
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")


def itas():
    #读取冷却效率
    itas = []
    for i in range(0,SAMPLE_NUM):
        ita = tf.image.crop_to_bounding_box(plt.imread(input_path+str(i+1)+'.png'), 198, 0, 126, 522)
        ita = ita[:,:,0]
        ita = ita[...,tf.newaxis]
        ita = tf.image.resize(ita,[size1, size2])
        itas.append(ita)
    return itas

def BR():
    #读取吹风比
    BR = []
    for i in range(0,SAMPLE_NUM):
        br = tf.image.crop_to_bounding_box(plt.imread(input_path+str(i+1)+'.png'), 198, 522, 126, 522)
        br = br[:,:,1]
        br = br[...,tf.newaxis]
        br = tf.image.resize(br,[size1, size2])
        BR.append(br)
    return BR

def G(): 
    #读取几何
    G = []
    for i in range(0,SAMPLE_NUM):
        g = tf.image.crop_to_bounding_box(plt.imread(input_path+str(i+1)+'.png'), 198, 522, 126, 522)
        g = g[:,:,1]
        g = g.numpy()
        g = np.int64(g>0)
        g = tf.cast(g, tf.float32)
        g = g[...,tf.newaxis]
        g = tf.image.resize(g,[size1, size2])
        G.append(g)
    return G

In [None]:
itas = itas()
BR = BR()  
G = G()

for i in range(SAMPLE_NUM):
    plt.figure(figsize=(10,10))
    plt.imshow(0.8 - itas[i], cmap=plt.cm.jet, vmin=0, vmax=0.8)
    plt.axis('off') 
    plt.show()
    plt.figure(figsize=(10,10))
    plt.imshow(G[i], cmap=plt.cm.gray,vmin=0, vmax=1)
    plt.axis('off') 
    plt.show()

inlet = np.zeros((size1, size2))
exit = np.zeros((size1, size2))

inlet[:, 0] = 1
exit[:, -1] = 1

inlet = inlet[...,tf.newaxis]
exit = exit[...,tf.newaxis]

In [None]:
class InputSet:
    def __init__(self, path0):
        self.input_path = path0
      
    def loaddata(self):
        in0 = np.zeros((SAMPLE_NUM, size1, size2, bc_channel))
        out0 = np.zeros((SAMPLE_NUM, size1, size2, out_channel))
        
        for i in range(0,SAMPLE_NUM):
            in0[i,:,:,0:1] = G[i]
            in0[i,:,:,1:2] = BR[i]
            in0[i,:,:,2:3] = inlet
            in0[i,:,:,3:4] = exit
            out0[i,:,:,0:1] = itas[i]
        
        self.in0 = in0
        self.out0 = out0
        
        print("shape of all_input:" + str(in0.shape))
        print("shape of all_output:" + str(out0.shape))
        
    def create_input_dataset(self):
        # Split Training and Test Dataset
        train = self.all_input[:train_num, ...]
        test = self.all_input[train_num:, ...]   
        # Change Ndarray to Tensor
        train = tf.cast(train, tf.float32)
        test = tf.cast(test, tf.float32)
        return train, test
    
    def create_output_dataset(self):
        # Split Training and Test Dataset
        train = self.all_output[:train_num, ...]
        test = self.all_output[train_num:, ...]
        # Change Ndarray to Tensor
        train = tf.cast(train, tf.float32)
        test = tf.cast(test, tf.float32)
        return train, test
    
    def generate_dataset(self):
        input_train, input_test = self.create_input_dataset()
        output_train, output_test = self.create_output_dataset()
        input_train_dataset = tf.data.Dataset.from_tensor_slices(input_train)
        input_test_dataset = tf.data.Dataset.from_tensor_slices(input_test)

        output_train_dataset = tf.data.Dataset.from_tensor_slices(output_train)
        output_test_dataset = tf.data.Dataset.from_tensor_slices(output_test)

        train_dataset = tf.data.Dataset.zip((input_train_dataset, output_train_dataset))
        test_dataset = tf.data.Dataset.zip((input_test_dataset, output_test_dataset))

        train_dataset = train_dataset.batch(1).shuffle(train_num)
        test_dataset = test_dataset.batch(1)
        return train_dataset, test_dataset

    def create(self):
        self.loaddata()
        self.all_input = self.in0
        self.all_output = self.out0
        self.train_dataset, self.test_dataset = self.generate_dataset()
    
    def flip_up_down(self):
        #创建一个由原数据集上下翻转后得到的数据集
        self.loaddata()
        self.all_input = tf.image.flip_up_down(self.in0)
        self.all_output = tf.image.flip_up_down(self.out0)
        self.train_dataset, self.test_dataset = self.generate_dataset()
        
    def flip_left_right(self):
        #创建一个由原数据集左右翻转后得到的数据集
        self.loaddata()
        self.all_input = tf.image.flip_left_right(self.in0)
        self.all_output = tf.image.flip_left_right(self.out0)
        self.train_dataset, self.test_dataset = self.generate_dataset()
    
    def rot90(self):
        #创建一个由原数据集顺时针旋转90°后得到的数据集
        self.loaddata()
        self.all_input = tf.image.rot90(self.in0,3)
        self.all_output = tf.image.rot90(self.out0,3)
        self.train_dataset, self.test_dataset = self.generate_dataset()
        
    def crop(self):
        #创建一个由原数据集剪切后得到的数据集
        self.loaddata()
        self.all_input = tf.image.crop_to_bounding_box(self.in0, 0, int(size1/4),size1,int(size2/2))
        self.all_output = tf.image.crop_to_bounding_box(self.out0,0, int(size1/4),size1,int(size2/2))
        self.train_dataset, self.test_dataset = self.generate_dataset()
    
    def resize(self):
        #创建一个由原数据集缩放后得到的数据集
        self.loaddata()
        self.all_input = tf.image.resize(self.in0,[int(size1 * 2), int(size2 * 2)])
        self.all_output = tf.image.resize(self.out0,[int(size1 * 2), int(size2 * 2)])
        self.train_dataset, self.test_dataset = self.generate_dataset()
    
    def add0(self):
        #创建一个在原数据集前补若干列0后得到的数据集
        self.loaddata_1()
        self.all_input = self.in1
        self.all_output = self.out1
        self.train_dataset, self.test_dataset = self.generate_dataset()
    
    def display(self):
        for example_input1, example_target in self.train_dataset.take(1):
            display_list = [example_input1[0, ... , 0], example_target[0, ... , 0]]  
            for i in range(len(display_list)):
                plt.subplot(1, len(display_list), i + 1)        
                plt.imshow(display_list[i], cmap=plt.cm.gray,vmin=0, vmax=1)
                plt.axis('off')
                #plt.colorbar()
            plt.show()

In [None]:
set3 = InputSet(input_path)
set3.create()
print(set3.train_dataset)
set3.display()

# set4 = InputSet(input_path)
# set4.flip_up_down()
# print(set4.train_dataset)
# set4.display()

# set5 = InputSet(input_path)
# set5.flip_left_right()
# print(set5.train_dataset)
# set5.display()

# set6 = InputSet(input_path)
# set6.rot90()
# print(set6.train_dataset)
# set6.display()

# set7 = InputSet(input_path)
# set7.crop()
# print(set7.train_dataset)
# set7.display()

# set8 = InputSet(input_path)
# set8.resize()
# print(set8.train_dataset)
# set8.display()

In [None]:
def Generator():
    input1 = tf.keras.layers.Input(shape=INPUT1_SHAPE)  
    output_pre = tf.keras.layers.Input(shape=OUTPUT_SHAPE) 
    input_hs = tf.keras.layers.Input(shape=HS_SHAPE) 
    
    inputc = tf.keras.layers.concatenate([input1,output_pre,input_hs])      
    
    # 卷积核初始化，均值和标准差
    initializer = tf.random_normal_initializer(0., 0.02)

    y = tf.keras.layers.Conv2D(DE_NUM,
                               DE_kernel_size,
                               strides=1,
                               padding='same',
                               kernel_initializer=initializer,
                               use_bias=False)(inputc)
    y = tf.keras.layers.BatchNormalization()(y)
    y = tf.keras.layers.LeakyReLU()(y)
    
    y = tf.keras.layers.Conv2D(out_channel+hs_channel,
                               1,
                               strides=1,
                               padding='same',
                               kernel_initializer=initializer,
                               use_bias=False)(y)
    #conv_DE.add(tf.keras.layers.BatchNormalization())
    y = tf.keras.layers.LeakyReLU()(y)       
    y = output_pre + y

    return tf.keras.Model(inputs=[input1, output_pre, input_hs], outputs=y)

In [None]:
train_writer = tf.summary.create_file_writer(train_log_dir)
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)
Gen_loss = tf.keras.metrics.Mean('Gen_loss')
# SSIM_loss = tf.keras.metrics.Mean('SSIM_loss')
L1_loss = tf.keras.metrics.Mean('L1_loss')

generator_optimizer = tf.keras.optimizers.Adam(G_step_size, beta_1=0.5)

generator = Generator()
generator.summary()

checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,generator=generator)

In [None]:
def generator_loss(gen_output, target):       
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
    # ssim_loss = 1 - tf.image.ssim(target, gen_output, 1)
    total_gen_loss =  (LAMBDA * l1_loss)
    return total_gen_loss, l1_loss

# 使用AutoGraph转换成静态图表示，加速代码
tf.config.experimental_run_functions_eagerly(True)

@tf.function
def train_step(input1, ini_output, target):          
    with tf.GradientTape() as gen_tape:        
        gen_output, gen_loss, l1_loss = iteration_net(input1, ini_output, target)     #ini_output，  

    generator_gradients = gen_tape.gradient(gen_loss, generator.trainable_variables)   
    generator_optimizer.apply_gradients(zip(generator_gradients, generator.trainable_variables))
    
    Gen_loss(gen_loss)
    L1_loss(l1_loss)

def iteration_net(input1, ini_output, target):
    gen_loss = 0
    l1_loss = 0
    
    ini_hs = tf.tile(tf.zeros_like(input1[:,:,:,0:1]),[1,1,1,hs_channel])
    display_list = []
    gen_output = generator([input1, ini_output, ini_hs], training=True)
        
    for i in range(Iteration_NUM):        
        hs = gen_output[:,:,:,0:hs_channel]
        output = gen_output[:,:,:,hs_channel:hs_channel+out_channel]
        gen_output = generator([input1, output, hs], training=True)
        gen_loss0,l1_loss0 = generator_loss(output, target)
        gen_loss = gen_loss + gen_loss0
        l1_loss = l1_loss + l1_loss0
 
    return output, gen_loss, l1_loss

In [None]:
def boundary_condition(input_1, target):    
    input_1 = input_1.numpy()     
    input_1 = tf.cast(input_1, tf.float32)

    target = target   

    ini_output = tf.tile(tf.zeros_like(input_1[:,:,:,0:1]),[1,1,1,out_channel])
    ini_output = tf.add(ini_output * 0, 0.0*tf.random.normal(shape = ini_output.shape))         
    ini_output = tf.cast(ini_output, tf.float32)
    
    return input_1, ini_output, target

def generate_images(test_input1, test_ini_output, test_tar, name, figsize=(size1, size2)):

    #test_ini_hs = tf.tile(tf.zeros_like(test_input1[:,:,:,0:1]),[1,1,1,hs_channel])
    prediction, gen_loss, l1_loss = iteration_net(test_input1, test_ini_output, test_tar)
    
    plt.figure(figsize=figsize)
    display_list = [test_input1[0, ... , 0],
                    0.8 - test_tar[0, ... , 0] - (test_input1[0, ... , 0]), 
                    0.8 - prediction[0, ... , 0] - (test_input1[0, ... , 0]), 
                    abs(test_tar[0, ... , 0] - prediction[0, ... , 0]) * (1-test_input1[0, ... , 0])]
    title = ['Holes', 'Target', 'Predicted', 'AbsError']
    
    l1_loss = tf.reduce_mean(tf.abs(test_tar[0, ... , 0] - prediction[0, ... , 0]) * (1-test_input1[0, ... , 0]))
    
    for i in range(len(display_list)):
        if i == 0:
            plt.subplot(1, 5, i + 1)
            plt.title(title[i])
            plt.imshow(display_list[i], cmap=plt.cm.gray,vmin=0, vmax=1)
            plt.axis('off')
        if i == 3:
            plt.subplot(1, 5, i + 1)
            plt.title(title[i])
            plt.imshow(display_list[i], cmap=plt.cm.jet, vmin=0.0, vmax=1)
            plt.axis('off') 
        elif (i == 1) or (i == 2):
            plt.subplot(1, 5, i + 1)
            plt.title(title[i])
            plt.imshow(display_list[i], cmap=plt.cm.jet, vmin=0, vmax=0.8)
            plt.axis('off')
            

    #保存图片（训练时注释掉）
#     name = str(name) + '_result.jpg'
#     path = os.path.join(save_path, name)
#     plt.savefig(path)
    plt.show()
    return l1_loss
    
def training(epochs, ispadding=False):
    
    print('Training !\n')
    
    for epoch in range(epochs):
        # Train
        
        for input_1, target in set3.train_dataset:
            input_1, ini_output, target = boundary_condition(input_1, target)
            train_step(input_1, ini_output, target)
            
#         for input_1, target in set4.train_dataset:
#             input_1, ini_output, target = boundary_condition(input_1, target)
#             train_step(input_1, ini_output, target)

#         for input_1, target in set5.train_dataset:
#             input_1, ini_output, target = boundary_condition(input_1, target)
#             train_step(input_1, ini_output, target)

#         for input_1, target in set6.train_dataset:
#             input_1, ini_output, target = boundary_condition(input_1, target)
#             train_step(input_1, ini_output, target)

#         for input_1, target in set7.train_dataset:
#             input_1, ini_output, target = boundary_condition(input_1, target)
#             train_step(input_1, ini_output, target)

#         for input_1, target in set8.train_dataset:
#             input_1, ini_output, target = boundary_condition(input_1, target)
#             train_step(input_1, ini_output, target)
#         print('Epoch {} completed'.format(epoch+1))

        if (epoch + 1) % 20 == 0:
            print('Epoch {}: Gen_loss is {:.3f}, L1_loss is {:.5f}'
                  .format(epoch + 1, Gen_loss.result(), L1_loss.result()))
            
        if (epoch + 1) % 200 == 0:
            for example_input1, example_target in set3.train_dataset.take(2):
                input_1, ini_output, target = boundary_condition(example_input1, example_target)
                generate_images(input_1, ini_output, target, '0')
            for example_input1, example_target in set3.test_dataset.take(2):
                input_1, ini_output, target = boundary_condition(example_input1, example_target)
                generate_images(input_1, ini_output, target, '0') 
        
        if (epoch + 1) > 40 and (epoch + 1) % 50 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)
        
        Gen_loss.reset_states()

In [None]:
# 加载预训练模型
# checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

In [None]:
training(3000)

In [None]:
i = 0
L1 = []
for example_input1, example_target in set3.train_dataset.take(train_num):
        input_1, ini_output, target = boundary_condition(example_input1, example_target)
        l1_loss = generate_images(input_1, ini_output, target, str(i+1))
        L1.append(float(l1_loss))
        i = i + 1
print(sum(L1)/len(L1))
# a = np.array(L1)
# np.savetxt("PDE_D12-K37-H7.txt",a)

In [None]:
i = 0
L1 = []
for example_input1, example_target in set3.test_dataset.take(test_num):
    input_1, ini_output, target = boundary_condition(example_input1, example_target)
    l1_loss = generate_images(input_1, ini_output, target, str(i+1)) 
    L1.append(float(l1_loss))
    i = i + 1
print(sum(L1)/len(L1))
# a = np.array(L1)
# np.savetxt("PDE_D12-K37-H7-test.txt",a)