In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import warnings
import meshio
import os
from scipy.interpolate import griddata
from sklearn.model_selection import train_test_split
from tensorflow import keras

warnings.filterwarnings('ignore')
%matplotlib inline
import tensorflow as tf





# Loading Dataset

In [9]:
x = np.arange(0.006, 0.0135, (0.0135-0.006)/300)
y = np.arange(0, 0.0025, 0.0025/75)

data_path = './Data'
path_sep = '\\' # use '/' for Unix and '\\' for Windows
folders = os.listdir(data_path)
        
subfolder = []
path = []
names = []
condition = []

for i in folders:
    if os.path.isdir(data_path + path_sep + i): 
        subfolder.append(data_path + path_sep + i)

for folder in subfolder:
    files = os.listdir(folder + path_sep)
    for i in files:
        ext = os.path.splitext(i)
        if (ext[-1].lower() == '.vtk') & (ext[0][-2] != '_'):
            names.append(ext[0])
            string = ext[0].replace('ER', '').replace('Tin', '').replace('Uin', '').replace('Twall', '').split('_')[0:4]
            var = []
            for j in string:
                if j == 'Adiabatic':
                    var.append(0.)
                else:
                    var.append(float(j))
            condition.append(var)

Q_list = []
for folder in subfolder:
    files = os.listdir(folder + path_sep)
    for counter,file in enumerate(files):
        mesh = meshio.read(folder+ path_sep +file)
        points = mesh.points
        Qdot = mesh.point_data['Qdot']
        boolArr = (points[:,1] == 0) & (points[:,0] >= 0.006)  
        Qdot = Qdot[boolArr]
        points = points[boolArr]
        old_points = points[:,[0, 2]]
        grid_x, grid_y = np.meshgrid(x, y)
        grid_new = griddata(old_points, Qdot, (grid_x, grid_y), method='nearest')
        Q_list.append(grid_new)

In [None]:
Q_list

# Data Preparation

In [None]:
Qdot = np.array(Q_list)
Qdot = Qdot / np.max(Qdot)
mean = Qdot.mean(axis = 0)
Qdot = np.reshape(Qdot, (-1, 75, 300, 1))

normaliser = []
conditions = np.array(condition)
df = np.zeros(conditions.shape)
for i in range(conditions.shape[1]):
    df[:,i] = conditions[:,i] / np.max(conditions[:,i])
    normaliser.append(np.max(conditions[:,i]))

train_data, test_data, label_train, label_test = train_test_split(Qdot, df, test_size = 0.15)
print(Qdot.shape)
print(df.shape)
# Manual train test split if needed
#test_index = 

#train_index = 

#test_data = Qdot[test_index]
#train_data = Qdot[train_index]
#label_train = df[train_index]
#label_test = df[test_index]

In [None]:
fig = plt.figure(figsize=(16, 80))
plt.imshow(np.reshape(Qdot[0],(75, 300)))
# plt.axis('off')

In [None]:
fig = plt.figure(figsize=(12, 16))
columns = 3
rows = 12

for i in range(1, columns * rows):
    img = Qdot[(i-1)]
    fig.add_subplot(rows, columns, i)
    plt.imshow(img)
plt.show()



## Convolutional Autoencoder 

In [None]:
class CVAE(tf.keras.Model):
  """Convolutional autoencoder."""

  def __init__(self, latent_dim):
    super(CVAE, self).__init__()
    self.latent_dim = latent_dim
    self.encoder = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(75, 300, 1)),
            # 75x300x1
            tf.keras.layers.Conv2D(filters=32, kernel_size=3, strides=(1, 1), padding='same',
                                   activation='relu', name='conv_1'),
            tf.keras.layers.BatchNormalization(name='bn_1'),
            tf.keras.layers.MaxPooling2D(pool_size=(1, 2), name='maxpool_1')(x),
            # 75x150x32
            
            tf.keras.layers.Conv2D(filters=64, kernel_size=3, strides=(1, 1), padding='same',
                                   activation='relu', name='conv_2'),
            tf.keras.layers.BatchNormalization(name='bn_2'),
            tf.keras.layers.MaxPooling2D(pool_size=(1, 2), name='maxpool_2')(x),
            # 75x75x64
            
            tf.keras.layers.Conv2D(filters=64, kernel_size=3, strides=(2, 2), padding='same',
                                   activation='relu', name='conv_3'),
            # 38x38x64
            tf.keras.layers.BatchNormalization(name='bn_3'),
            tf.keras.layers.MaxPooling2D(pool_size=(1, 2), name='maxpool_3')(x),
            # 19x19x64
              
            tf.keras.layers.Flatten(),
            # No activation
            tf.keras.layers.Dense(latent_dim + latent_dim),
        ]
    )

    self.decoder = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
            tf.keras.layers.Dense(units=19*19*64, activation='relu'),
            tf.keras.layers.Reshape(target_shape=(19,19,64)),
            tf.keras.layers.Conv2DTranspose(filters=64, kernel_size=3, strides=2, padding='same',
                                            activation='relu'),
            tf.keras.layers.Conv2DTranspose(filters=32, kernel_size=3, strides=2, padding='same',
                                            activation='relu'),
            # No activation
            tf.keras.layers.Conv2DTranspose(filters=1, kernel_size=3, strides=1, padding='same'),
        ]
    )

  @tf.function
  def sample(self, eps=None):
    if eps is None:
      eps = tf.random.normal(shape=(100, self.latent_dim))
    return self.decode(eps, apply_sigmoid=True)

  def encode(self, x):
    mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
    return mean, logvar

  def reparameterize(self, mean, logvar):
    eps = tf.random.normal(shape=mean.shape)
    return eps * tf.exp(logvar * .5) + mean

  def decode(self, z, apply_sigmoid=False):
    logits = self.decoder(z)
    if apply_sigmoid:
      probs = tf.sigmoid(logits)
      return probs
    return logits

## Encoder Part

In [None]:
# def encoder(input_encoder):

inputs = keras.Input(shape=[75,300,1], name='input_layer')
#75x300x1

# Conv + MaxPooling
x = keras.layers.Conv2D(32, kernel_size=3, strides= 1, padding='same', name='conv_1')(inputs)
x = keras.layers.BatchNormalization(name='bn_1')(x)
x = keras.layers.ReLU(name='relu_1')(x)
x = keras.layers.MaxPooling2D(pool_size=(1, 2), name='maxpool_1')(x) 
#75x150x32

# Conv + MaxPooling
x = keras.layers.Conv2D(64, kernel_size=3, strides=1, padding='same', name='conv_2')(x)  # 卷积步幅设置为1
x = keras.layers.BatchNormalization(name='bn_2')(x)
x = keras.layers.ReLU(name='relu_2')(x)
x = keras.layers.MaxPooling2D(pool_size=(1, 2), name='maxpool_2')(x) 
#75x75x64

# Conv + MaxPooling
x = keras.layers.Conv2D(64, kernel_size=3, strides=2, padding='same', name='conv_3')(x)  # 卷积步幅设置为1
#38x38x64
x = keras.layers.BatchNormalization(name='bn_3')(x)
x = keras.layers.ReLU(name='relu_3')(x)
x = keras.layers.MaxPooling2D(pool_size=(2, 2), name='maxpool_3')(x) 
#19x19x64
    
# Flatten
flatten = keras.layers.Flatten(name='flatten_1')(x)
bottleneck = keras.layers.Dense(16, name='dense_1')(flatten)

model = tf.keras.Model(inputs, bottleneck, name="Encoder")
model.summary()
    # return model
    

## Decoder Part

In [None]:
def decoder(input_decoder):
    # Input is the bottleneck vector

