In [None]:
import os
import numpy as np
import pandas as pd
from PIL import Image
from tensorflow.keras.applications import EfficientNetB7
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import r2_score, mean_absolute_error
import matplotlib.pyplot as plt
from tensorflow.keras.applications.efficientnet import preprocess_input
from sklearn.preprocessing import StandardScaler
import joblib
from sklearn.metrics import mean_squared_error

def extract_parameters_from_filename(filename):
    """Return log10 of the iron concentration encoded in an image file name.

    The name is read as fixed-width fields at hard-coded character positions:

    * ``filename[3:12]``  -- iron concentration (float)
    * ``filename[14:22]`` -- boron concentration (float)
    * ``filename[23:26]`` -- temperature (int)
    * ``filename[27:30]`` -- thickness (float)

    Only the iron concentration contributes to the result. The other fields
    are still parsed so that a name which does not follow the layout is
    rejected instead of silently producing a label.

    Args:
        filename: Base name of the image file (no directory part).

    Returns:
        ``np.log10`` of the iron concentration.

    Raises:
        ValueError: If any field cannot be parsed as a number, or if the iron
            concentration is not strictly positive (its logarithm would be
            ``-inf``/``nan`` and would poison the regression targets).
    """
    iron_concentration = float(filename[3:12])

    # Parsed purely as a format check; the values themselves are not used.
    float(filename[14:22])
    int(filename[23:26])
    float(filename[27:30])

    # `not x > 0` also rejects NaN, which `x <= 0` would let through.
    if not iron_concentration > 0:
        raise ValueError(
            f"Iron concentration must be positive, got {iron_concentration!r} "
            f"in file name {filename!r}"
        )

    return np.log10(iron_concentration)

def augment_image(img, target_size):
    """Crop, resize and expand one PIL image into a list of augmented variants.

    Args:
        img: Source PIL image.
        target_size: ``(width, height)`` passed to ``Image.resize``.

    Returns:
        A list of six images, in this order: the cropped and resized image,
        its left-right mirror, its top-bottom mirror, and copies rotated by
        90, 180 and 270 degrees (with ``expand=True``).
    """
    # Fixed crop window, given as (left, top, right, bottom) pixel coordinates.
    crop_box = (11, 10, 506, 379)
    base = img.crop(crop_box).resize(target_size)

    mirrored = [
        base.transpose(method=flip)
        for flip in (Image.FLIP_LEFT_RIGHT, Image.FLIP_TOP_BOTTOM)
    ]
    rotated = [base.rotate(angle, expand=True) for angle in (90, 180, 270)]

    return [base, *mirrored, *rotated]

# Crop window (left, top, right, bottom) applied to test images before resizing.
_TEST_CROP_BOX = (11, 10, 506, 379)


def _image_to_features(base_model, pil_image):
    """Run one PIL image through ``base_model`` and return its squeezed output."""
    # convert('RGB') yields exactly three channels for every source mode; a
    # plain `[:, :, :3]` slice fails on grayscale/palette images.
    pixels = np.array(pil_image.convert('RGB'))
    batch = np.expand_dims(pixels, axis=0)  # the model expects a batch axis
    return np.squeeze(base_model.predict(preprocess_input(batch)))


def _collect_features(base_model, directory, split_name, make_variants):
    """Return ``(features, labels)`` lists for every '.png' file in ``directory``.

    ``make_variants`` maps an opened PIL image to the list of images that
    should be turned into samples. A file that fails at any stage is reported
    and skipped as a whole, so features and labels always stay aligned.
    """
    features, labels = [], []

    # Sorted so the sample order does not depend on os.listdir's arbitrary order.
    png_names = sorted(f for f in os.listdir(directory) if f.endswith('.png'))

    for filename in png_names:
        print(f"Processing ({split_name}) file: {filename}")
        try:
            label = extract_parameters_from_filename(filename)
            # The context manager releases the file handle once the derived
            # (cropped/resized) images have been created.
            with Image.open(os.path.join(directory, filename)) as img:
                variants = make_variants(img)
            file_features = [_image_to_features(base_model, v) for v in variants]
        except Exception as e:  # deliberate best effort: one bad file must not stop the run
            print(f"Error processing file {filename}: {e}")
            continue

        features.extend(file_features)
        labels.extend([label] * len(file_features))

    return features, labels


def generate_features(image_directory, target_size, test_directory):
    """Extract EfficientNetB7 outputs and log-concentration labels for both splits.

    Training images are expanded with ``augment_image``; test images are only
    cropped and resized. Labels come from ``extract_parameters_from_filename``.

    Args:
        image_directory: Directory holding the training '.png' images.
        target_size: ``(width, height)`` every image is resized to. The model
            is built with ``input_shape=(600, 600, 3)``, so this should be
            ``(600, 600)``.
        test_directory: Directory holding the test '.png' images.

    Returns:
        ``(X_train, X_test, y_train, y_test)`` as NumPy arrays. Each ``X`` has
        one row per processed image (one per variant returned by
        ``augment_image`` for training files); each ``y`` holds the matching
        label. Files that raise during processing are skipped.
    """
    # NOTE: with include_top=True the "features" are the outputs of the
    # ImageNet classification head. Alternative kept from the original code
    # (pooled embeddings instead):
    # base_model = EfficientNetB7(weights='imagenet', include_top=False, pooling='avg', input_shape=(600, 600, 3))
    base_model = EfficientNetB7(weights='imagenet', include_top=True, input_shape=(600, 600, 3))

    # Load training data from CWT_Images (with augmentation).
    train_features, train_labels = _collect_features(
        base_model,
        image_directory,
        "train",
        lambda img: augment_image(img, target_size),
    )

    # Load test data from the TEST directory (crop + resize only).
    test_features, test_labels = _collect_features(
        base_model,
        test_directory,
        "test",
        lambda img: [img.crop(_TEST_CROP_BOX).resize(target_size)],
    )

    X_train = np.array(train_features)
    y_train = np.array(train_labels)
    X_test = np.array(test_features)
    y_test = np.array(test_labels)

    print(f"Train feature shape: {X_train.shape}, Test feature shape: {X_test.shape}")
    print(f"Train labels shape: {y_train.shape}, Test labels shape: {y_test.shape}")

    return X_train, X_test, y_train, y_test

def create_regression_model(input_shape):
    """Build the dense regression head that maps image features to one value.

    The network is a stack of three ReLU ``Dense`` layers (128, 64 and 32
    units, each with ``kernel_regularizer='l2'``) followed by a single-unit
    ``Dense`` output layer with no activation specified.

    Args:
        input_shape: Number of features per sample; it is wrapped into a
            1-tuple for the Keras ``Input`` layer.

    Returns:
        An uncompiled Keras ``Model``.
    """
    features_in = Input(shape=(input_shape,), name='image_features_input')

    hidden = features_in
    for units in (128, 64, 32):
        hidden = Dense(units, activation='relu', kernel_regularizer='l2')(hidden)

    prediction = Dense(1)(hidden)

    return Model(inputs=features_in, outputs=prediction)

# ---------------------------------------------------------------------------
# Configuration: data locations and the image size fed to the feature extractor.
# ---------------------------------------------------------------------------
image_directory = 'C:/Users/user/Desktop/wavelets2024/CWT_Images/'
test_directory = 'C:/Users/user/Desktop/wavelets2024/TEST/'
scaler_directory = 'C:/Users/user/Desktop/wavelets2024/scalers25/'
target_size = (600, 600)

# Feature extraction for both splits; the labels are log10 concentrations.
X_train, X_test, y_train_log, y_test_log = generate_features(
    image_directory, target_size, test_directory
)

if not os.path.exists(scaler_directory):
    os.makedirs(scaler_directory)
    print(f"Creating a folder for scalers: {scaler_directory}")

scaler_X_filename = os.path.join(scaler_directory, 'scalerX.bin')
scaler_y_filename = os.path.join(scaler_directory, 'scalerY.bin')

# Tracks whether the scalers were loaded from disk: both files must exist,
# otherwise both scalers are fitted from scratch and saved.
scalers_loaded = (
    os.path.exists(scaler_X_filename) and os.path.exists(scaler_y_filename)
)

if scalers_loaded:
    # The scalers were loaded: apply them unchanged to the current data.
    print("Downloading Scalers...")
    scaler_X = joblib.load(scaler_X_filename)
    scaler_y = joblib.load(scaler_y_filename)

    X_train_normalized = scaler_X.transform(X_train)
    X_test_normalized = scaler_X.transform(X_test)

    y_train_normalized = scaler_y.transform(y_train_log.reshape(-1, 1)).flatten()
    y_test_normalized = scaler_y.transform(y_test_log.reshape(-1, 1)).flatten()
else:
    # Fit on the training split only, then apply the same scaling to the test split.
    print("Creating and saving Scalers...")
    scaler_X = StandardScaler()
    X_train_normalized = scaler_X.fit_transform(X_train)
    X_test_normalized = scaler_X.transform(X_test)

    joblib.dump(scaler_X, scaler_X_filename)

    # StandardScaler works on 2-D input, hence the reshape to a column and
    # the flatten back to 1-D afterwards.
    scaler_y = StandardScaler()
    y_train_normalized = scaler_y.fit_transform(y_train_log.reshape(-1, 1)).flatten()
    y_test_normalized = scaler_y.transform(y_test_log.reshape(-1, 1)).flatten()

    joblib.dump(scaler_y, scaler_y_filename)

# Optional sanity checks of the standardisation (disabled):
# print(f"X_train mean before standardisation : {np.mean(X_train)}, std before standardisation : {np.std(X_train)}")
# print(f"y_train mean before standardisation : {np.mean(y_train_log)}, std before standardisation : {np.std(y_train_log)}")

# print(f"X_train mean after standardisation: {np.mean(X_train_normalized)}, std after standardisation: {np.std(X_train_normalized)}")
# print(f"y_train mean after standardisation: {np.mean(y_train_normalized)}, std after standardisation: {np.std(y_train_normalized)}")

# Build and train the regression head on the standardised features/targets.
input_shape = X_train.shape[1]
model = create_regression_model(input_shape)
optimizer = Adam(learning_rate=0.0001)
model.compile(
    optimizer=optimizer,
    loss='mean_squared_error',
    metrics=['mean_squared_error'],
)

history = model.fit(
    X_train_normalized,
    y_train_normalized,
    batch_size=1,
    epochs=800,
)

In [None]:
# Evaluate on the test split in the normalised target space.
loss, mse = model.evaluate(X_test_normalized, y_test_normalized)
#print(f"Test Loss: {loss}")
print(f"MSE: {mse}")

# Despite the name, these predictions are still in the normalised target
# space (the model was trained on y_train_normalized).
y_pred_log = model.predict(X_test_normalized)

# Denormalisation: undo the target scaling to get back to log10 values.
y_test_denorm = scaler_y.inverse_transform(y_test_normalized.reshape(-1, 1))
y_pred_denorm = scaler_y.inverse_transform(y_pred_log)

# Undo the log10 to obtain concentrations on the linear scale.
y_test_exp = 10 ** y_test_denorm
y_pred_exp = 10 ** y_pred_denorm

# R2 is reported both in log10 space and on the linear scale.
r2_denorm = r2_score(y_test_denorm, y_pred_denorm)
r2_exp = r2_score(y_test_exp, y_pred_exp)

# Mean absolute percentage error on the linear scale, in percent.
mape = np.abs((y_test_exp - y_pred_exp) / y_test_exp).mean() * 100
print(f"MAPE: {mape:.2f}%")

report_lines = [
    f"MSE (normalized): {mse:.8f}\n",
    f"R2 Score (denorm): {r2_denorm:.8f}\n",
    f"R2 Score (10^x): {r2_exp:.8f}\n",
    f"MAPE (%): {mape:.8f}\n",
]
with open("result_5.dat", "w") as f:
    f.writelines(report_lines)

# Save the actual vs. predicted iron concentration as two columns.
true_vs_predict = np.column_stack((y_test_exp, y_pred_exp))
np.savetxt("TrueVsPredict_5.dat", true_vs_predict, comments='')

In [None]:
# Scatter of predicted vs. actual concentrations on log-log axes.
plt.scatter(y_test_exp, y_pred_exp, alpha=0.7)

# Ideal line (prediction == actual), spanning the range of the actual values.
diagonal = [min(y_test_exp), max(y_test_exp)]
plt.plot(diagonal, diagonal, 'r--')

axes = plt.gca()
axes.set_xscale("log")
axes.set_yscale("log")
axes.set_xlabel("Actual Fe concentration")
axes.set_ylabel("Predicted Fe concentration")
axes.set_title("Predicted vs Actual Fe Concentration")

plt.show()