In [None]:
import os
import sqlite3
from pathlib import Path
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, mean_squared_error, confusion_matrix, r2_score
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import (CSVLogger, EarlyStopping, LearningRateScheduler, ModelCheckpoint, ReduceLROnPlateau)
from tensorflow.keras.layers import (BatchNormalization, Conv2D, Dense, Dropout, Flatten, GlobalAveragePooling2D, MaxPooling2D)
from tensorflow.keras.metrics import RootMeanSquaredError
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import EfficientNetB3

In [None]:
# Show the GPU devices TensorFlow can see (an empty list means none were found).
tf.config.list_physical_devices(device_type='GPU')

In [None]:
# --- Reproducibility -------------------------------------------------------
# Seed passed to train_test_split.
RANDOM_STATE = 32

# --- Input data ------------------------------------------------------------
SQL_DATABASE = "data/galaxy_data.sqlite"
TRAIN_IMAGES_DIR = "data/train_images/"
TEST_IMAGES_DIR = "data/test_images/"

# --- Cached image arrays (created on first run, re-used afterwards) ---------
TRAIN_IMAGES_ARRAY_NPY = "data/training_images_array.npy"
TEST_IMAGES_ARRAY_NPY = "data/testing_images_array.npy"

# --- Model artefacts -------------------------------------------------------
MODEL_SAVE_PATH = "data/model/GalaxyModel.keras"
TRAINING_LOG_PATH = "data/model/training_log.csv"
# `{epoch:03d}` is a format placeholder, so each checkpoint gets its own file.
CHECKPOINT_PATH = "data/model/checkpoints/cp-{epoch:03d}.weights.h5"

# Placeholder until the model is built in a later cell.
model = None

In [None]:
# sqlite3.connect() creates an empty database when the file does not exist,
# which would surface later as a confusing "no such table" error. Fail early
# with a clear message instead.
if not Path(SQL_DATABASE).is_file():
    raise FileNotFoundError(f"SQLite database not found: {SQL_DATABASE}")

connection = sqlite3.connect(SQL_DATABASE)
try:
    df_import = pd.read_sql("SELECT * from galaxy_data", connection)
finally:
    # Always release the connection, even if the query fails.
    connection.close()

# Number of rows that contain at least one missing value.
df_import.isnull().any(axis=1).sum()

In [None]:
# Remove every row with a missing value (modifies df_import in place).
df_import.dropna(inplace=True)

# Labels used to stratify the split, and the ids used to locate image files.
stratify_data = df_import["class_reduced"].values
x_image_id_names = df_import["asset_id"]

# Everything that is not an identifier, coordinate or class label is a target.
y_output_data = df_import.drop(
    columns=[
        "objid",
        "sample",
        "asset_id",
        "dr7objid",
        "ra",
        "dec",
        "gz2_class",
        "class_reduced",
    ]
)
y_output_data.shape

In [None]:
# Stratified split of image ids and targets (default train/test proportions).
X_train_assets, X_test_assets, y_train, y_test = train_test_split(
    x_image_id_names,
    y_output_data,
    random_state=RANDOM_STATE,
    stratify=stratify_data,
)

# Cast the targets to float32 for training.
y_train, y_test = y_train.astype("float32"), y_test.astype("float32")

In [None]:
def load_images(directory, image_list, image_size=(128, 128)):
    """Load PNG images as a normalised float32 array.

    Each entry of ``image_list`` is turned into ``<directory>/<name>.png``,
    read as grayscale, resized to ``image_size`` and scaled to [0, 1].

    Args:
        directory: Folder containing the PNG files.
        image_list: Iterable of image names without the ``.png`` extension.
        image_size: Target ``(width, height)`` passed to ``cv2.resize``.

    Returns:
        Array of shape ``(len(image_list), height, width, 1)`` and dtype
        float32. An empty ``image_list`` yields an array with zero rows.

    Raises:
        FileNotFoundError: If an image is missing or cannot be decoded.
    """
    names = list(image_list)
    width, height = image_size
    # Pre-allocate the result so the images are not held twice in memory
    # (once in a Python list and once in the stacked array).
    images = np.empty((len(names), height, width, 1), dtype="float32")
    for index, img_name in enumerate(names):
        img_path = os.path.join(directory, f"{img_name}.png")
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {img_path}")
        img = cv2.resize(img, (width, height))
        images[index, ..., 0] = img.astype("float32") / 255.0
    return images

In [None]:
def _load_or_build_image_array(cache_path, image_dir, asset_ids):
    """Return the image array for ``asset_ids``, using ``cache_path`` as cache.

    If ``cache_path`` exists it is opened as a read-only memory map; otherwise
    the images are loaded from ``image_dir`` with ``load_images`` and the
    result is saved to ``cache_path``.

    Raises:
        ValueError: If the number of images does not match ``asset_ids``
            (for example a cache written for a different split).
    """
    if Path(cache_path).exists():
        images = np.load(cache_path, mmap_mode="r")
    else:
        print(f"Creating `{Path(cache_path).name}` file...")
        images = load_images(image_dir, asset_ids)
        np.save(cache_path, images)

    if len(images) != len(asset_ids):
        raise ValueError(
            f"{cache_path} holds {len(images)} images but the current split "
            f"has {len(asset_ids)} samples; delete the file to rebuild it."
        )
    return images


# NOTE(review): both asset lists come from the same table, yet test images are
# read from a separate directory - confirm the files are laid out that way.
X_train_images = _load_or_build_image_array(TRAIN_IMAGES_ARRAY_NPY, TRAIN_IMAGES_DIR, X_train_assets)
X_test_images = _load_or_build_image_array(TEST_IMAGES_ARRAY_NPY, TEST_IMAGES_DIR, X_test_assets)

In [None]:
# Quick sanity check of the loaded arrays and targets.
print("X_train_images Shape:", X_train_images.shape)
print("X_train_images Size", X_train_images.nbytes, "bytes")
for label, targets in (("y_train Shape:", y_train), ("y_test Shape:", y_test)):
    print(label, targets.shape)

In [None]:
# The model expects (samples, height, width, channels). Only add the channel
# axis when it is missing: load_images() already returns 4-D arrays, and an
# unconditional np.newaxis would add another axis on every re-run of this cell.
if X_train_images.ndim == 3:
    X_train_images = X_train_images[..., np.newaxis]
if X_test_images.ndim == 3:
    X_test_images = X_test_images[..., np.newaxis]

In [None]:
# Make sure the output directories exist before training starts; the CSV log
# is opened for writing and fails if its parent directory is missing.
Path(CHECKPOINT_PATH).parent.mkdir(parents=True, exist_ok=True)
Path(TRAINING_LOG_PATH).parent.mkdir(parents=True, exist_ok=True)

# Keep only the weights of the best epoch (lowest validation loss).
checkpoints = ModelCheckpoint(CHECKPOINT_PATH, monitor="val_loss", save_weights_only=True, save_best_only=True)
# Stop after 10 epochs without improvement and roll back to the best weights.
early_stopping = EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)
# Halve the learning rate after 5 stagnant epochs, down to 1e-6.
reduce_lr_plateau = ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=5, min_lr=1e-6)
# append=True keeps the log of earlier runs instead of overwriting it.
csv_logger = CSVLogger(TRAINING_LOG_PATH, append=True)

callbacks = [checkpoints, early_stopping, reduce_lr_plateau, csv_logger]

In [None]:
def r2_score(y_true, y_pred):
    """Coefficient of determination computed with TensorFlow ops.

    Returns ``1 - SS_res / (SS_tot + epsilon)``, where both sums run over all
    elements of the tensors and epsilon guards against division by zero.

    NOTE(review): this definition replaces the ``r2_score`` imported from
    sklearn.metrics at the top of the notebook.
    """
    residual_sum = tf.reduce_sum(tf.square(y_true - y_pred))
    centered = y_true - tf.reduce_mean(y_true)
    total_sum = tf.reduce_sum(tf.square(centered))
    return 1 - residual_sum / (total_sum + tf.keras.backend.epsilon())

In [None]:
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, BatchNormalization, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import BinaryAccuracy

def f1_score(y_true, y_pred):
    """Batch-wise macro F1 used as a Keras metric.

    Predictions are rounded to 0/1, precision and recall are computed per
    output column (reduction over axis 0), and the per-column F1 values are
    averaged into a single scalar.

    NOTE(review): ``y_true`` is used as-is (not rounded), so the counts are
    only true/false positives if the targets are 0/1 - confirm.
    NOTE(review): this definition replaces the ``f1_score`` imported from
    sklearn.metrics at the top of the notebook.
    """
    eps = tf.keras.backend.epsilon()
    hard_pred = tf.round(y_pred)

    true_pos = tf.reduce_sum(y_true * hard_pred, axis=0)
    false_pos = tf.reduce_sum((1 - y_true) * hard_pred, axis=0)
    false_neg = tf.reduce_sum(y_true * (1 - hard_pred), axis=0)

    precision = true_pos / (true_pos + false_pos + eps)
    recall = true_pos / (true_pos + false_neg + eps)

    per_output_f1 = 2 * precision * recall / (precision + recall + eps)
    return tf.reduce_mean(per_output_f1)

# Seed Python, NumPy and TensorFlow so weight initialisation and dropout are
# reproducible each time this cell is run.
tf.keras.utils.set_random_seed(RANDOM_STATE)

# ResNet50 backbone trained from scratch (weights=None) on single-channel
# 128x128 inputs; all of its layers are trainable.
base_model = ResNet50(include_top=False, weights=None, input_shape=(128, 128, 1))
base_model.trainable = True

# Regression/classification head: one sigmoid unit per target column.
model = Sequential([
    base_model,
    GlobalAveragePooling2D(),
    Dense(256, activation='relu'),
    BatchNormalization(),
    Dropout(0.3),
    Dense(128, activation='relu'),
    Dropout(0.3),
    Dense(37, activation='sigmoid')  # NOTE(review): 37 must equal y_train.shape[1]
])

model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='binary_crossentropy',
    metrics=['binary_accuracy', f1_score]
)

model.summary()

In [None]:
# First training run: up to 50 epochs, holding out 10% of the training data
# for validation.
history = model.fit(
    x=X_train_images,
    y=y_train,
    batch_size=32,
    epochs=50,
    validation_split=0.1,
    callbacks=callbacks,
)

In [None]:
# Persist the model after the first training run.
model.save(filepath=MODEL_SAVE_PATH)

In [None]:
# Remember the metrics of the previous run: rebinding `history` below would
# otherwise drop them, and the training plot would only show the resumed epochs.
previous_run = globals().get("history")
earlier_epochs = (
    {name: list(values) for name, values in previous_run.history.items()}
    if previous_run is not None
    else {}
)

# Continue training; epoch numbering resumes at 50 and runs up to 120.
history = model.fit(
    X_train_images, y_train,
    validation_split=0.1,
    initial_epoch=50,
    epochs=120,
    batch_size=32,
    callbacks=callbacks
)

# Prepend the earlier epochs so `history.history` covers the whole training.
for name, values in earlier_epochs.items():
    history.history[name] = values + list(history.history.get(name, []))

In [None]:
# Persist the model again after the resumed training run.
model.save(filepath=MODEL_SAVE_PATH)

In [None]:
import numpy as np
from sklearn.metrics import f1_score, confusion_matrix, log_loss
import matplotlib.pyplot as plt

def evaluate_metrics(y_true, y_pred, threshold=0.5):
    """Compute weighted F1 and mean binary cross-entropy for multi-output predictions.

    Args:
        y_true: Array-like of targets, same shape as ``y_pred``.
        y_pred: Array-like of predicted probabilities.
        threshold: Cut-off used to turn values into 0/1 labels for the F1 score.

    Returns:
        Tuple ``(f1, bce)``: the weighted-average F1 of the thresholded labels
        and the element-wise binary cross-entropy averaged over all entries.
    """
    # Import under a local alias: the notebook also defines a Keras metric
    # called `f1_score`, so the global name depends on cell execution order.
    from sklearn.metrics import f1_score as sklearn_f1_score

    y_true_arr = np.asarray(y_true, dtype="float64")
    y_pred_arr = np.asarray(y_pred, dtype="float64")

    # NOTE(review): targets are thresholded like the predictions so that
    # non-binary targets do not make sklearn reject the input; for 0/1 targets
    # and a threshold in (0, 1] this leaves them unchanged - confirm this is
    # the intended definition for soft targets.
    y_true_binary = (y_true_arr >= threshold).astype(int)
    y_pred_binary = (y_pred_arr >= threshold).astype(int)
    f1 = sklearn_f1_score(y_true_binary, y_pred_binary, average='weighted')

    # Element-wise binary cross-entropy, computed directly instead of via
    # sklearn's log_loss (whose `eps` argument is gone in recent releases and
    # which treats each row as one multi-class sample).
    eps = 1e-7
    clipped = np.clip(y_pred_arr, eps, 1 - eps)
    bce = float(-np.mean(y_true_arr * np.log(clipped) + (1 - y_true_arr) * np.log(1 - clipped)))
    return f1, bce

In [None]:
# Model outputs for the held-out test images.
y_pred = model.predict(x=X_test_images)

In [None]:
def plot_training_history(history):
    """Plot accuracy and loss curves side by side.

    Args:
        history: Object with a ``history`` dict mapping metric names to
            per-epoch values (as returned by ``model.fit``). The accuracy
            panel uses ``binary_accuracy`` when present, else ``accuracy``.

    Curves whose key is missing (for example when training ran without
    validation data) are skipped instead of raising ``KeyError``.
    """
    metrics = history.history
    if 'binary_accuracy' in metrics:
        accuracy_key, accuracy_name = 'binary_accuracy', 'Binary Accuracy'
    else:
        accuracy_key, accuracy_name = 'accuracy', 'Accuracy'

    fig, (accuracy_ax, loss_ax) = plt.subplots(1, 2, figsize=(14, 6))
    panels = (
        (accuracy_ax, accuracy_key, accuracy_name, 'Accuracy'),
        (loss_ax, 'loss', 'Loss', 'Loss'),
    )
    for ax, key, curve_name, axis_name in panels:
        plotted = False
        for prefix, split in (('', 'Train'), ('val_', 'Validation')):
            values = metrics.get(prefix + key)
            if values is None:
                continue
            ax.plot(values, label=f'{split} {curve_name}')
            plotted = True
        ax.set_title(f'Train vs Validation {axis_name}')
        ax.set_xlabel('Epochs')
        ax.set_ylabel(axis_name)
        if plotted:
            # Calling legend() on an empty panel only produces a warning.
            ax.legend()

    fig.tight_layout()
    plt.show()

In [None]:
# Score the test-set predictions, then show the training curves and the scores.
f1, bce = evaluate_metrics(y_true=y_test, y_pred=y_pred)

plot_training_history(history=history)

print("\nTest Sonuçları:")
print(f"F1 Score: {f1}")
print(f"Binary Cross-Entropy (BCE): {bce}")

In [None]:
def prepare_image(image_path: str, square_size: int=106, pad_value: int=7) -> np.ndarray:
    """Load an image as a padded, normalised grayscale square.

    The image is read as grayscale, scaled so its longer side equals
    ``square_size`` (aspect ratio preserved), centred on a square canvas
    filled with ``pad_value`` and scaled to [0, 1].

    Args:
        image_path: Path of the image file to read.
        square_size: Side length of the output square in pixels.
        pad_value: Gray level (0-255) used for the padding around the image.

    Returns:
        float32 array of shape ``(square_size, square_size, 1)``.

    Raises:
        FileNotFoundError: If the image is missing or cannot be decoded.
    """
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read image: {image_path}")

    height, width = image.shape[:2]
    # cv2.resize expects (width, height). Clamp the shorter side to at least
    # one pixel so extreme aspect ratios do not produce an empty target size.
    if height > width:
        scale = square_size / float(height)
        dim = (max(1, int(width * scale)), square_size)
    else:
        scale = square_size / float(width)
        dim = (square_size, max(1, int(height * scale)))

    resized = cv2.resize(image, dim, interpolation=cv2.INTER_CUBIC)

    # Centre the resized image on the padded canvas.
    prepared_image = np.full((square_size, square_size, 1), pad_value, np.uint8)
    x_offset = (square_size - resized.shape[1]) // 2
    y_offset = (square_size - resized.shape[0]) // 2
    prepared_image[y_offset:y_offset + resized.shape[0], x_offset:x_offset + resized.shape[1], 0] = resized

    return prepared_image.astype("float32") / 255.0

def predict_image(file_path: str) -> pd.Series:
    """Run the model on one image file and label the outputs.

    Args:
        file_path: Path of the image to classify.

    Returns:
        Series of model outputs indexed by the column names of
        ``y_output_data``.

    Raises:
        RuntimeError: If the global ``model`` has not been built yet.
    """
    if model is None:
        raise RuntimeError("No model available: build or load the model before calling predict_image().")

    # Size the image to what the model was built for instead of relying on
    # prepare_image's default; input_shape is (batch, height, width, channels).
    # NOTE(review): training images were resized without padding, while
    # prepare_image pads to a square - confirm the preprocessing mismatch is
    # acceptable.
    target_size = model.input_shape[1]
    if target_size is None:
        image_array = prepare_image(file_path)
    else:
        image_array = prepare_image(file_path, square_size=target_size)

    # Add the batch dimension expected by predict().
    image_array = np.expand_dims(image_array, axis=0)
    predictions = model.predict(image_array)
    readable_predict = pd.Series(predictions[0], index=y_output_data.columns.values)
    return readable_predict

In [None]:
# Build the path with os.path.join so it works on every OS (the previous
# literal used a Windows-only backslash separator).
test_image_path = os.path.join("images", "Ex_SBb_NGC1300-HST.jpg")

# Show the example image, then display the model outputs for it.
plt.imshow(plt.imread(test_image_path))
plt.axis('off')
plt.show()
result = predict_image(test_image_path)
result