In [None]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import Sequence
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Conv2D, Input, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.layers import MaxPooling2D, BatchNormalization
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.regularizers import l2
import os
from keras.applications.vgg16 import VGG16
from keras.applications.resnet import ResNet50
from keras.models import load_model
import tensorflow as tf
import pandas as pd
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras_tuner.tuners import RandomSearch

# os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings
warnings.filterwarnings("ignore") 
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

In [None]:
import tensorflow as tf
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

## read data

In [None]:

labels=pd.read_excel('data_u.xlsx')

In [None]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import joblib


cols = ['p', 'Lmin', 'a_4', 'b_4'] 
# cols = ['Lmin', 'a_4', 'b_4'] 

cols2 = ['u']       


ss_input = MinMaxScaler()
labels[cols] = ss_input.fit_transform(labels[cols])
joblib.dump(ss_input, 'scaler_all.pkl')  


ss_output = MinMaxScaler()
labels[cols2] = ss_output.fit_transform(labels[cols2])
joblib.dump(ss_output, 'ss.pkl')       


labels.to_excel('normalized_data.xlsx', index=False)


In [None]:
labels

## Create training and testing data – same as Xception_multimodal

In [None]:
import os
import shutil
import random

image_folder = "./images"
train_folder = './train'
test_folder = './test'

# if os.path.exists(train_folder):
#     shutil.rmtree(train_folder)
# if os.path.exists(test_folder):
#     shutil.rmtree(test_folder)
# os.makedirs(train_folder, exist_ok=True)
# os.makedirs(test_folder, exist_ok=True)

# image_files = [i for i in os.listdir(image_folder) if i.find('jpg')>=0]
# random.shuffle(image_files)


# total_images = len(image_files)
# train_images = int(total_images * 0.7)
# test_images = total_images - train_images


# for i, image_file in enumerate(image_files):
#     source_path = os.path.join(image_folder, image_file)


#     if i < train_images:
#         destination_folder = train_folder
#     else:
#         destination_folder = test_folder


#     destination_path = os.path.join(destination_folder, image_file)
#     shutil.copyfile(source_path, destination_path)

In [None]:
train_list=[i for i in os.listdir(train_folder) if i.find('jpg')>=0]
test_list=[i for i in os.listdir(test_folder) if i.find('jpg')>=0]

In [None]:
train_csv=labels[labels.name.isin(train_list)]
test_csv=labels[labels.name.isin(test_list)]

In [None]:
label_list = ['p', 'Lmin', 'a_4', 'b_4', 'u']

## Load dataset

In [None]:

from tensorflow.keras.preprocessing.image import load_img, img_to_array
class CustomDataGenerator(Sequence):
    def __init__(self, csv_file, directory, batch_size, target_size, label_list, shuffle=True, augment=False):
        self.csv_file = csv_file
        self.directory = directory
        self.batch_size = batch_size
        self.target_size = target_size
        self.label_list = label_list
        self.shuffle = shuffle
        self.augment = augment
        self.on_epoch_end()
        
        if self.augment:
            self.image_data_generator = ImageDataGenerator(
                rotation_range=20,
                width_shift_range=0.2,
                height_shift_range=0.2,
                shear_range=0.2,
                zoom_range=0.2,
                horizontal_flip=True,
                fill_mode='nearest')
        else:
            self.image_data_generator = ImageDataGenerator()

    def __len__(self):
        return int(np.floor(len(self.csv_file) / self.batch_size))

    def __getitem__(self, index):
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        batch = [self.csv_file.iloc[k] for k in indexes]
        return self.__data_generation(batch)

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.csv_file))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, batch):
        X1 = np.empty((self.batch_size, *self.target_size, 3))
        X2 = np.empty((self.batch_size, 4)) 
        y = np.empty((self.batch_size,1))  

        for i, data in enumerate(batch):
            img_path = f"{self.directory}/{data['name']}"
            image = load_img(img_path, target_size=self.target_size)
            # image = img_to_array(image) / 255.0
            image = img_to_array(image)
            

            if self.augment:
                image = self.image_data_generator.random_transform(image)
            
            X1[i] = image
            X2[i] = data[self.label_list[:4]]  # [p'Lmin', 'a_4', 'b_4']
            y[i] = data[self.label_list[4]]    # ['u']


        return [X1, X2], y


In [None]:
batch_size=32
target_size = (256, 256)

train_generator = CustomDataGenerator(train_csv, './train', batch_size, target_size, label_list, shuffle=False)
test_generator = CustomDataGenerator(test_csv, './test', batch_size, target_size, label_list, shuffle=False)

In [None]:
import tensorflow as tf
tf.test.gpu_device_name()  
physical_devices = tf.config.experimental.list_physical_devices('GPU')  
for device in physical_devices:  
     print(device)

In [None]:
from tqdm import tqdm

y_train = []

for i in tqdm(range(len(train_generator))):
    batch_data, batch_labels = train_generator[i]
    y_train.append(batch_labels)

y_train = np.concatenate(y_train)
y_test = []

for i in tqdm(range(len(test_generator))):
    batch_data, batch_labels = test_generator[i]
    y_test.append(batch_labels)

y_test = np.concatenate(y_test)

In [None]:
def plot_model_history(model_history,model_name):
    fig, axs = plt.subplots(1,2,figsize=(12,4),dpi=120)
    # summarize history for accuracy
    axs[0].plot(range(1,len(model_history.history['mse'])+1),model_history.history['mse'])
    axs[0].plot(range(1,len(model_history.history['val_mse'])+1),model_history.history['val_mse'])
    axs[0].set_title('Model mse')
    axs[0].set_ylabel('mse')
    axs[0].set_xlabel('Epoch')
    # axs[0].set_xticks(np.arange(1,len(model_history.history['accuracy'])+1),len(model_history.history['accuracy'])/10)
    # axs[0].set_xticks(np.arange(1,len(model_history.history['accuracy'])+1),len(model_history.history['accuracy'])/10)
    axs[0].legend(['train', 'val'], loc='best')
    # summarize history for loss
    axs[1].plot(range(1,len(model_history.history['loss'])+1),model_history.history['loss'])
    axs[1].plot(range(1,len(model_history.history['val_loss'])+1),model_history.history['val_loss'])
    axs[1].set_title('Model Loss')
    axs[1].set_ylabel('Loss')
    axs[1].set_xlabel('Epoch')
    # axs[1].set_xticks(np.arange(1,len(model_history.history['loss'])+1),len(model_history.history['loss'])/10)
    axs[1].legend(['train', 'val'], loc='best')
    fig.savefig(model_name+'_loss.jpg',dpi=600)
    plt.show()

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Flatten, Concatenate, Multiply, BatchNormalization, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2

def build_model2(hp):
    # —— 1) Image input (299 + official preprocessing) ——
    from tensorflow.keras.applications.xception import Xception, preprocess_input
    input_image = Input(shape=(299, 299, 3), name='img')
    x_in = tf.keras.layers.Lambda(preprocess_input, name='xcep_pre')(input_image)

    # Xception backbone (without top layers; GAP is generally more stable, can switch back to Flatten for a fair comparison)
    base_model = Xception(weights='imagenet', include_top=False,
                          input_shape=(299, 299, 3), pooling='avg')

    # Freeze only BatchNorm layers to avoid updating statistics with small batches; forward pass in inference mode
    for l in base_model.layers:
        if isinstance(l, tf.keras.layers.BatchNormalization):
            l.trainable = False
    x_img = base_model(x_in, training=False)   # BN uses inference statistics

    # —— 2) Numerical input branch (4 auxiliary features) + lightweight attention + stabilization ——
    input_features = Input(shape=(4,), name='aux4')

    # Attention weights (sigmoid) for feature recalibration
    att = Dense(4, activation='sigmoid', name='attention_weights')(input_features)
    aux = Multiply(name='aux_weighted')([input_features, att])

    # Small MLP + BN + Dropout (more stable)
    aux = Dense(32, activation='relu')(aux)
    aux = BatchNormalization()(aux)
    aux = Dropout(0.1)(aux)
    aux = Dense(16, activation='relu')(aux)

    # —— 3) Fusion ——
    x = Concatenate(name='concat_img_aux')([x_img, aux])

    # —— 4) Three fully connected layers: relu → tanh → relu (keeping your style), with a bit of L2 regularization for stability ——
    x = Dense(hp.Int('units_0', 96, 512, step=16),
              activation='relu', kernel_regularizer=l2(1e-5))(x)
    x = Dense(hp.Int('units_1', 96, 512, step=16),
              activation='tanh', kernel_regularizer=l2(1e-5))(x)
    x = Dense(hp.Int('units_2', 96, 512, step=16),
              activation='relu', kernel_regularizer=l2(1e-5))(x)

    output = Dense(1, name='target')(x)

    model = Model(inputs=[input_image, input_features], outputs=output)

    # Slightly lower the minimum learning rate; Xception fine-tuning is more stable this way
    lr = hp.Float('lr', min_value=3e-5, max_value=1e-3, sampling='LOG')
    try:
        # If AdamW is available in the environment, use it with mild weight decay
        from tensorflow.keras.optimizers import AdamW
        opt = AdamW(learning_rate=lr, weight_decay=1e-5)
    except Exception:
        opt = Adam(learning_rate=lr)

    model.compile(optimizer=opt, loss='mse', metrics=['mse'])
    return model


# Tuner configuration
from keras_tuner.tuners import RandomSearch
tuner = RandomSearch(
    build_model2,
    objective='val_loss',
    max_trials=25,
    executions_per_trial=1,
    directory='my_dir',
    project_name='xception_tuning'
)

tuner.search_space_summary()
tuner.search(train_generator, epochs=10, validation_data=test_generator)

best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

print("Best hyperparameters:")
print(f"Learning rate: {best_hps.get('lr')}")
print(f"Layer 1: units={best_hps.get('units_0')}, activation=relu")
print(f"Layer 2: units={best_hps.get('units_1')}, activation=tanh")
print(f"Layer 3: units={best_hps.get('units_2')}, activation=relu")


In [None]:

model = tuner.hypermodel.build(best_hps)

checkpoint = tf.keras.callbacks.ModelCheckpoint("./model.ckpt", monitor='val_loss', verbose=1,
                                                save_best_only=True, mode='min', save_weights_only=True)

early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, verbose=1, restore_best_weights=True)


In [None]:
model_info = model.fit(
    train_generator,
    steps_per_epoch=len(train_generator),
    epochs=50,
    validation_data=test_generator,
    validation_steps=len(test_generator),
    callbacks=[checkpoint, early_stopping])

In [None]:
plot_model_history(model_info, 'Xception')

In [None]:

model.save('./best_model.h5')

## Model evaluation

In [None]:

from tensorflow.keras.models import load_model
model = load_model('./best_model.h5')

In [None]:

ss_output = joblib.load('ss.pkl')

In [None]:
from sklearn import metrics
y_test_pred=model.predict(test_generator)
y_train_pred=model.predict(train_generator)

In [None]:
y_test_pred.shape

In [None]:
y_test.shape

In [None]:

y_test_pred=ss_output.inverse_transform(y_test_pred)
y_test_true=ss_output.inverse_transform(y_test)

y_train_pred=ss_output.inverse_transform(y_train_pred)
y_train_true=ss_output.inverse_transform(y_train)

In [None]:
# === evaluation ===
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np

def model_result(y_train, y_pred_train, y_test, y_pred_test):
    print('📊 train:')
    print('MSE:  ', mean_squared_error(y_train, y_pred_train))
    print('MAE:  ', mean_absolute_error(y_train, y_pred_train))
    print('RMSE: ', np.sqrt(mean_squared_error(y_train, y_pred_train)))
    print('R²:   ', r2_score(y_train, y_pred_train))
    print('\n📊 test:')
    print('MSE:  ', mean_squared_error(y_test, y_pred_test))
    print('MAE:  ', mean_absolute_error(y_test, y_pred_test))
    print('RMSE: ', np.sqrt(mean_squared_error(y_test, y_pred_test)))
    print('R²:   ', r2_score(y_test, y_pred_test))

model_result(y_train_true, y_train_pred, y_test_true, y_test_pred)

In [None]:
import matplotlib.pyplot as plt
import matplotlib

matplotlib.rcParams['axes.unicode_minus'] = False

color_list = ['red','blue','purple','orange','yellow','green',
              'pink','aquamarine','dodgerblue','purple',
              'red','blue','orange','yellow','green']

plt.figure(figsize=(12,10), dpi=120)

for i in range(1, len(label_list)+1):
    if y_test_pred.shape[1] < i:
        print(f" The {i}-th label exceeds the prediction dimension, skipped")
        continue

    plt.subplot(3, 3, i)
    plt.scatter(y_test_pred[:, i-1].ravel(), y_test_true[:, i-1].ravel(),
                color=color_list[i-1], alpha=0.6)
    
    min_val = min(y_test_pred[:, i-1].min(), y_test_true[:, i-1].min())
    max_val = max(y_test_pred[:, i-1].max(), y_test_true[:, i-1].max())
    plt.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=1)
    
    plt.grid(True, linestyle='--', linewidth=0.5)
    plt.title(label_list[i-1])
    plt.xlabel('Predict')
    plt.ylabel('True')

plt.tight_layout()
plt.savefig('index.jpg', dpi=600, bbox_inches='tight')
plt.show()


In [None]:

df_result = pd.DataFrame({
    'u_true': y_test_true.ravel(),
    'u_predicted': y_test_pred.ravel()
})
df_result.to_excel('u_true_vs_predicted.xlsx', index=False)

## Model prediction

In [None]:
# ===================== Path Configuration =====================
test_folder = './val_images'                  # Path to the images for prediction
model_path = './best_model.h5'                # Trained model (image + 4 features)
scaler_input_path = './scaler_all.pkl'        # Normalizer for the 4 input features (fitted on the training set)
scaler_output_path = './ss.pkl'               # Output denormalizer (fitted on the training set)
output_excel_path = 'predicted_u_output.xlsx' # Output Excel file

# ===================== Load Model and Scalers =====================
model = load_model(model_path, compile=False)

# Read the model's expected image input size (H, W, C)
# Note: model.inputs[0] corresponds to the image input; if your model has only one input, this is it.
# If the model has multiple inputs, the first one is usually the image 
# (as in your training where inputs=[input_image, input_features])
target_h = int(model.inputs[0].shape[1])
target_w = int(model.inputs[0].shape[2])

# Check whether the model already contains the Xception preprocessing layer 
# (for example, during training you named the Lambda layer 'xcep_pre')
has_internal_preprocess = any(
    (getattr(layer, "name", "") == "xcep_pre") for layer in model.layers
)

# Load scalers
ss_input = joblib.load(scaler_input_path)    # For [porosity, Lmin, a_4, b_4]
ss_output = joblib.load(scaler_output_path)  # For denormalizing u


In [None]:
# ===================== Image List =====================
image_files = [f for f in os.listdir(test_folder) if f.lower().endswith('.jpg')]
image_files = natsorted(image_files)

# ===================== Helper function for prediction: compute four handcrafted features =====================
def compute_aux_features(gray_img, black_threshold=80):
    """
    gray_img: uint8, shape [H, W], range 0~255
    Returns: porosity, Lmin, a_4, b_4
    """
    H, W = gray_img.shape

    # 1) Porosity (ratio of white pixels; threshold consistent with training)
    white_pixel_count = np.sum(gray_img > black_threshold)
    total_pixels = gray_img.size
    porosity = white_pixel_count / total_pixels if total_pixels > 0 else 0.0

    # 2) Lmin: minimum number of black pixels (<128) per row
    black_pixel_counts_per_row = np.sum(gray_img < 128, axis=1)
    Lmin = int(np.min(black_pixel_counts_per_row)) if black_pixel_counts_per_row.size > 0 else 0

    # 3) a_4: Proportion of black pixels in region D (circle centered at image center, radius = H/2)
    center = (W // 2, H // 2)
    mask_total = np.ones_like(gray_img, dtype=np.uint8) * 255
    mask_D = mask_total.copy()
    cv2.circle(mask_D, center, H // 2, 0, -1)  # Radius = H/2
    total_black_pixels = np.sum(gray_img < black_threshold)
    D_black_pixels = np.sum((gray_img < black_threshold) & (mask_D > 0))
    a_4 = (D_black_pixels / total_black_pixels) if total_black_pixels > 0 else 0.0

    # 4) b_4: Proportion of black pixels within a circle of radius 32 (center based on the midline of the image)
    # Reuse your previous formula (derive x0 and r with respect to image height)
    y0 = H / 2
    x_r = 32.0
    h_half = H / 2
    x0 = (x_r**2 - h_half**2) / (2 * x_r)
    r = x_r - x0
    yy, xx = np.meshgrid(np.arange(H), np.arange(W), indexing='ij')
    circular_mask = ((xx - x0)**2 + (yy - y0)**2) <= (r**2)
    black_mask = (gray_img < black_threshold)
    black_in_sector = np.sum(circular_mask & black_mask)
    b_4 = (black_in_sector / total_black_pixels) if total_black_pixels > 0 else 0.0

    return porosity, Lmin, a_4, b_4

# ===================== Prediction Loop =====================
results = []

# If the model does not have built-in preprocess_input, external preprocessing is required (Xception’s [-1, 1])
if not has_internal_preprocess:
    from tensorflow.keras.applications.xception import preprocess_input

for image_name in tqdm(image_files, desc="Processing images"):
    img_path = os.path.join(test_folder, image_name)
    img_bgr = cv2.imread(img_path)
    if img_bgr is None:
        continue

    # ---- Resize to match model requirements ----
    img_resized = cv2.resize(img_bgr, (target_w, target_h))

    # ---- Compute four auxiliary features using grayscale image 
    # (based on resized image to match the model’s input resolution) ----
    gray_img = cv2.cvtColor(img_resized, cv2.COLOR_BGR2GRAY)
    porosity, Lmin, a_4, b_4 = compute_aux_features(gray_img, black_threshold=80)

    # ---- Prepare image input ----
    # OpenCV loads images as BGR, convert to RGB
    img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB).astype(np.float32)

    if has_internal_preprocess:
        # If the model already has Lambda(preprocess_input), skip /255 or external preprocess
        img_input = img_rgb  # Keep as float32 in 0~255 range
    else:
        # If no preprocessing layer in model: apply Xception’s preprocess_input ([-1, 1]) at inference
        img_input = preprocess_input(img_rgb)

    image_input = np.expand_dims(img_input, axis=0)  # [1, H, W, 3]

    # ---- Build and normalize the 4 auxiliary features (order must match training) ----
    features = np.array([[porosity, Lmin, a_4, b_4]], dtype=np.float32)
    features_scaled = ss_input.transform(features)   # shape: [1, 4]

    # ---- Feed data depending on the number of model inputs ----
    if len(model.inputs) == 2:
        pred_scaled = model.predict([image_input, features_scaled], verbose=0)
    else:
        # If the model has only image input, pass only image_input
        pred_scaled = model.predict(image_input, verbose=0)

    # ---- Denormalize to original scale ----
    # pred_scaled should have shape [1,1]; ss_output must match the one used during training (for single output)
    u_pred = ss_output.inverse_transform(pred_scaled.reshape(-1, 1))[0, 0]

    results.append([image_name, porosity, Lmin, a_4, b_4, u_pred])

# ===================== Save results =====================
df_result = pd.DataFrame(
    results,
    columns=["Image_Name", "Porosity", "Lmin", "a_4", "b_4", "Predicted_u"]
)
df_result.to_excel(output_excel_path, index=False, engine='openpyxl')
print(f"\n✅ All predictions completed, results saved to: {output_excel_path}")

# Additional tip: If you want to check the expected input size/channels for the model, you can print:
print(f"Model expected image input size: ({target_h}, {target_w}, 3)")
print(f"Model contains internal preprocessing layer xcep_pre: {has_internal_preprocess}")
