In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout, Flatten, concatenate, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.applications import VGG16, InceptionV3, ResNet50, EfficientNetB0
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
import pickle

def load_csv(csv_file):
    """Read ``csv_file`` with pandas and return the resulting DataFrame.

    Args:
        csv_file: Anything ``pd.read_csv`` accepts (path or file-like object).

    Returns:
        The parsed ``pandas.DataFrame``.
    """
    frame = pd.read_csv(csv_file)
    return frame

def fix_level_cat_format(row, separator=' ', replacement=', '):
    """Turn a stringified label vector into a Python literal.

    Every occurrence of ``separator`` in ``row`` is replaced by
    ``replacement`` and the result is parsed as a Python literal, e.g.
    ``"[0 0 1 0 0]"`` -> ``[0, 0, 1, 0, 0]``.

    Args:
        row: String holding the label (presumably a space-separated one-hot
            vector as stored in the CSV -- confirm against the dataset).
        separator: Substring to replace before parsing.
        replacement: Text substituted for each ``separator``.

    Returns:
        The parsed literal (list, tuple, number, ...).

    Raises:
        ValueError, SyntaxError: If the rewritten string is not a valid
            Python literal.
    """
    # Function-scope import keeps this block self-contained.
    import ast

    # literal_eval only accepts literals, so text coming from the CSV cannot
    # execute arbitrary code the way a bare eval() would.
    return ast.literal_eval(row.replace(separator, replacement))

def extract_labels(df, column_name='level_cat', fix_func=fix_level_cat_format):
    """Parse every entry of one DataFrame column into a label array.

    Args:
        df: DataFrame holding the raw label column.
        column_name: Name of the column to read.
        fix_func: Callable applied to each raw cell value.

    Returns:
        ``np.ndarray`` built from the per-row results of ``fix_func``.
    """
    parsed_rows = []
    for raw_value in df[column_name].values:
        parsed_rows.append(fix_func(raw_value))
    return np.array(parsed_rows)

def extract_additional_features(df, feature_columns):
    """Pull the requested columns out of ``df`` as a plain array.

    Args:
        df: Source DataFrame.
        feature_columns: List of column names, in the order they should
            appear in the output.

    Returns:
        The ``.values`` array of the selected columns (one row per record).
    """
    selected = df[feature_columns]
    return selected.values

def normalize_features(features, scaler=None):
    """Scale ``features``, fitting a new scaler only when none is supplied.

    Args:
        features: Array-like of feature rows.
        scaler: Optional already-fitted scaler. When given it is only used
            to ``transform``; when ``None`` a fresh ``StandardScaler`` is
            fitted on ``features``.

    Returns:
        Tuple ``(scaled_features, scaler)`` where ``scaler`` is the object
        that produced the scaling (the supplied one or the new one).
    """
    # Re-use path: apply an existing scaler without refitting it.
    if scaler is not None:
        return scaler.transform(features), scaler

    # Fit path: learn the statistics from the data passed in.
    fitted_scaler = StandardScaler()
    return fitted_scaler.fit_transform(features), fitted_scaler

def save_scaler(scaler, scaler_file):
    """Pickle ``scaler`` to the path ``scaler_file`` (overwriting it).

    Args:
        scaler: Object to serialise.
        scaler_file: Destination path, opened in binary write mode.
    """
    with open(scaler_file, 'wb') as sink:
        sink.write(pickle.dumps(scaler))

def load_and_preprocess_image(image_path, target_size=(224, 224), preprocessing_function=preprocess_input):
    """Load one image from disk and run it through a preprocessing function.

    Args:
        image_path: Path of the image file.
        target_size: Size passed to ``load_img`` for resizing.
        preprocessing_function: Callable applied to the image array. The
            default is the ``preprocess_input`` imported from the VGG16
            module; NOTE(review): other backbones may need their own
            preprocessing function -- confirm per model.

    Returns:
        Whatever ``preprocessing_function`` returns for the image array.
    """
    pil_image = load_img(image_path, target_size=target_size)
    pixel_array = img_to_array(pil_image)
    return preprocessing_function(pixel_array)

def load_images(image_paths, preprocess_func=load_and_preprocess_image):
    """Load every path in ``image_paths`` and stack the results.

    Args:
        image_paths: Iterable of image paths.
        preprocess_func: Callable mapping one path to one image array.

    Returns:
        ``np.ndarray`` built from the per-path results, in input order.
    """
    loaded = []
    for path in image_paths:
        loaded.append(preprocess_func(path))
    return np.array(loaded)

def split_data(images, additional_features, labels, test_size=0.2, random_state=42):
    """Split images, tabular features and labels with one shared shuffle.

    Args:
        images: Image array.
        additional_features: Tabular feature array, row-aligned with
            ``images``.
        labels: Label array, row-aligned with ``images``.
        test_size: Forwarded to ``train_test_split``.
        random_state: Forwarded to ``train_test_split``.

    Returns:
        The result of ``train_test_split`` for the three arrays; callers
        unpack it as (img_train, img_test, feat_train, feat_test,
        y_train, y_test).
    """
    arrays = (images, additional_features, labels)
    split_options = {'test_size': test_size, 'random_state': random_state}
    return train_test_split(*arrays, **split_options)

def create_vgg16_model(input_shape=(224, 224, 3), base_model_weights='imagenet', include_top=False, custom_layers=None):
    """Build a VGG16 backbone and stack a head on top of its output.

    Args:
        input_shape: Input shape passed to ``VGG16``.
        base_model_weights: ``weights`` argument for ``VGG16``.
        include_top: ``include_top`` argument for ``VGG16``.
        custom_layers: Optional sequence of layers applied in order to the
            backbone output. When ``None`` a default head is used:
            Flatten -> Dense(256, relu) -> Dropout(0.5) -> Dense(128, relu).

    Returns:
        Tuple ``(backbone_input_tensor, head_output_tensor)``.
    """
    backbone = VGG16(weights=base_model_weights, include_top=include_top, input_shape=input_shape)

    if custom_layers is None:
        # Default head; an explicitly passed empty list still means "no head".
        custom_layers = [
            Flatten(),
            Dense(256, activation='relu'),
            Dropout(0.5),
            Dense(128, activation='relu'),
        ]

    features = backbone.output
    for head_layer in custom_layers:
        features = head_layer(features)
    return backbone.input, features

def create_inceptionv3_model(input_shape=(299, 299, 3), base_model_weights='imagenet', include_top=False, custom_layers=None):
    """Build an InceptionV3 backbone and stack a head on top of its output.

    Args:
        input_shape: Input shape passed to ``InceptionV3`` (defaults to
            299x299x3). NOTE(review): ``load_and_preprocess_image`` defaults
            to 224x224, so callers must pass a matching ``target_size``.
        base_model_weights: ``weights`` argument for ``InceptionV3``.
        include_top: ``include_top`` argument for ``InceptionV3``.
        custom_layers: Optional sequence of layers applied in order to the
            backbone output. When ``None`` the default head is
            GlobalAveragePooling2D -> Dense(256, relu) -> Dropout(0.5)
            -> Dense(128, relu).

    Returns:
        Tuple ``(backbone_input_tensor, head_output_tensor)``.
    """
    backbone = InceptionV3(weights=base_model_weights, include_top=include_top, input_shape=input_shape)

    head_layers = custom_layers
    if head_layers is None:
        head_layers = [
            GlobalAveragePooling2D(),
            Dense(256, activation='relu'),
            Dropout(0.5),
            Dense(128, activation='relu'),
        ]

    tensor = backbone.output
    for head_layer in head_layers:
        tensor = head_layer(tensor)
    return backbone.input, tensor

def create_resnet50_model(input_shape=(224, 224, 3), base_model_weights='imagenet', include_top=False, custom_layers=None):
    """Build a ResNet50 backbone and stack a head on top of its output.

    Args:
        input_shape: Input shape passed to ``ResNet50``.
        base_model_weights: ``weights`` argument for ``ResNet50``.
        include_top: ``include_top`` argument for ``ResNet50``.
        custom_layers: Optional sequence of layers applied in order to the
            backbone output. When ``None`` the default head is
            GlobalAveragePooling2D -> Dense(256, relu) -> Dropout(0.5)
            -> Dense(128, relu).

    Returns:
        Tuple ``(backbone_input_tensor, head_output_tensor)``.
    """
    backbone = ResNet50(weights=base_model_weights, include_top=include_top, input_shape=input_shape)

    if custom_layers is None:
        # Fall back to the built-in pooling + two-layer dense head.
        custom_layers = [
            GlobalAveragePooling2D(),
            Dense(256, activation='relu'),
            Dropout(0.5),
            Dense(128, activation='relu'),
        ]

    head_output = backbone.output
    for head_layer in custom_layers:
        head_output = head_layer(head_output)
    return backbone.input, head_output

def create_efficientnetb0_model(input_shape=(224, 224, 3), base_model_weights='imagenet', include_top=False, custom_layers=None):
    """Build an EfficientNetB0 backbone and stack a head on top of its output.

    Args:
        input_shape: Input shape passed to ``EfficientNetB0``.
        base_model_weights: ``weights`` argument for ``EfficientNetB0``.
        include_top: ``include_top`` argument for ``EfficientNetB0``.
        custom_layers: Optional sequence of layers applied in order to the
            backbone output. When ``None`` the default head is
            GlobalAveragePooling2D -> Dense(256, relu) -> Dropout(0.5)
            -> Dense(128, relu).

    Returns:
        Tuple ``(backbone_input_tensor, head_output_tensor)``.
    """
    backbone = EfficientNetB0(weights=base_model_weights, include_top=include_top, input_shape=input_shape)

    head_layers = custom_layers
    if head_layers is None:
        head_layers = [
            GlobalAveragePooling2D(),
            Dense(256, activation='relu'),
            Dropout(0.5),
            Dense(128, activation='relu'),
        ]

    activations = backbone.output
    for head_layer in head_layers:
        activations = head_layer(activations)
    return backbone.input, activations

def create_additional_input(shape):
    """Create the Keras ``Input`` tensor for the extra (non-image) features.

    Args:
        shape: Shape tuple forwarded to ``Input`` (batch dimension excluded).

    Returns:
        The symbolic input tensor.
    """
    feature_input = Input(shape=shape)
    return feature_input

def create_combined_model(vgg_input, vgg_output, additional_input, num_classes=5, additional_layers=None):
    """Fuse an image branch with the extra-feature input into one classifier.

    Despite the ``vgg_`` prefix, the first two arguments are simply the input
    and output tensors of whichever image branch is being used.

    Args:
        vgg_input: Input tensor of the image branch.
        vgg_output: Output tensor of the image branch.
        additional_input: Input tensor for the extra features.
        num_classes: Width of the final softmax layer.
        additional_layers: Optional sequence of layers applied in order to
            the concatenated tensor. When ``None`` the default is
            Dense(128, relu) -> Dropout(0.5).

    Returns:
        A ``Model`` taking ``[vgg_input, additional_input]`` and producing
        the softmax output.
    """
    merged = concatenate([vgg_output, additional_input])

    fusion_layers = additional_layers
    if fusion_layers is None:
        fusion_layers = [Dense(128, activation='relu'), Dropout(0.5)]

    for fusion_layer in fusion_layers:
        merged = fusion_layer(merged)

    classifier = Dense(num_classes, activation='softmax')
    return Model(inputs=[vgg_input, additional_input], outputs=classifier(merged))

def compile_and_train_model(model, X_train_img, X_train_feat, y_train, X_test_img, X_test_feat, y_test, callbacks, optimizer=None, loss='categorical_crossentropy', metrics=None, epochs=12, batch_size=64):
    """Compile ``model`` and fit it on the two-input training data.

    Args:
        model: Model exposing ``compile`` and ``fit``.
        X_train_img, X_train_feat: Training inputs, passed as a two-element list.
        y_train: Training targets.
        X_test_img, X_test_feat, y_test: Held-out data, passed to ``fit`` as
            ``validation_data``. NOTE(review): any callback monitoring
            ``val_loss`` is therefore tuned on these rows.
        callbacks: Callback list forwarded to ``fit``.
        optimizer: Optimizer to compile with. ``None`` (default) creates a
            fresh ``Adam(learning_rate=0.0001)`` for this call.
        loss: Loss passed to ``compile``.
        metrics: Metrics passed to ``compile``. ``None`` (default) means
            ``['accuracy']``.
        epochs: Number of epochs for ``fit``.
        batch_size: Batch size for ``fit``.

    Returns:
        The object returned by ``model.fit``.
    """
    # A default of ``Adam(...)`` in the signature would be built once at
    # definition time, so every model trained with the default would share one
    # stateful optimizer instance. Build a new one per call instead.
    if optimizer is None:
        optimizer = Adam(learning_rate=0.0001)
    # Same reasoning for the list default: never share one mutable object.
    if metrics is None:
        metrics = ['accuracy']

    model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
    history = model.fit(
        [X_train_img, X_train_feat], y_train,
        validation_data=([X_test_img, X_test_feat], y_test),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks
    )
    return history

def evaluate_and_save_model(model, X_test_img, X_test_feat, y_test, model_save_path):
    """Evaluate ``model`` on the held-out data, print the result, save the model.

    Args:
        model: Model exposing ``evaluate`` and ``save``; ``evaluate`` must
            return exactly two values (loss, accuracy).
        X_test_img, X_test_feat: Held-out inputs, passed as a two-element list.
        y_test: Held-out targets.
        model_save_path: Destination forwarded to ``model.save``.
    """
    held_out_inputs = [X_test_img, X_test_feat]
    scores = model.evaluate(held_out_inputs, y_test)
    loss, accuracy = scores
    print(f'Loss: {loss}, Accuracy: {accuracy}')
    model.save(model_save_path)


In [14]:
# Usage example for VGG16
# NOTE(review): the absolute E:\ paths below tie this cell to one machine;
# consider a configurable data directory.
csv_file = r'E:\Computer_Vision_projects\inference_drd\Diabetic_Retinopathy_Detection\remaining_dataset.csv'
df = load_csv(csv_file)
labels = extract_labels(df)
additional_features = extract_additional_features(df, ['HbA1c', 'Systolic_BP', 'Diastolic_BP', 'LDL', 'Duration', 'BMI', 'Glucose_SD', 'Triglycerides', 'Microalbuminuria', 'Smoking_years', 'Alcohol_frequency'])
images = load_images(df['full_path'])

# Split BEFORE scaling: fitting the scaler on every row would leak the
# held-out rows' mean/variance into training.
X_train_img, X_test_img, X_train_feat, X_test_feat, y_train, y_test = split_data(images, additional_features, labels)

# Fit on the training rows only, then apply the same scaler to the held-out rows.
X_train_feat, scaler = normalize_features(X_train_feat)
X_test_feat, scaler = normalize_features(X_test_feat, scaler=scaler)
scaler_file = r'E:\Computer_Vision_projects\inference_drd\Diabetic_Retinopathy_Detection\additional_features_scaler.pkl'
save_scaler(scaler, scaler_file)

# Image branch: VGG16 backbone followed by an explicit dense head.
custom_layers_vgg = [Flatten(), Dense(256, activation='relu'), Dropout(0.5), Dense(128, activation='relu')]
vgg_input, vgg_output = create_vgg16_model(custom_layers=custom_layers_vgg)
# Tabular branch: one input unit per additional feature column.
additional_input = create_additional_input((additional_features.shape[1],))
additional_layers = [Dense(128, activation='relu'), Dropout(0.5)]
model_vgg = create_combined_model(vgg_input, vgg_output, additional_input, additional_layers=additional_layers)

custom_callbacks = [
    ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001),
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
]
history_vgg = compile_and_train_model(model_vgg, X_train_img, X_train_feat, y_train, X_test_img, X_test_feat, y_test, callbacks=custom_callbacks)

model_save_path = r'E:\Computer_Vision_projects\inference_drd\Diabetic_Retinopathy_Detection\vgg16_with_additional_features.h5'
evaluate_and_save_model(model_vgg, X_test_img, X_test_feat, y_test, model_save_path)


Epoch 1/12
[1m 1/34[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m1:37:07[0m 177s/step - accuracy: 0.2969 - loss: 7.4810

In [5]:

# Usage example for EfficientNetB0
# NOTE(review): the absolute E:\ paths below tie this cell to one machine;
# consider a configurable data directory.
csv_file = r'E:\Computer_Vision_projects\inference_drd\Diabetic_Retinopathy_Detection\remaining_dataset.csv'
df = load_csv(csv_file)
labels = extract_labels(df)
additional_features = extract_additional_features(df, ['HbA1c', 'Systolic_BP', 'Diastolic_BP', 'LDL', 'Duration', 'BMI', 'Glucose_SD', 'Triglycerides', 'Microalbuminuria', 'Smoking_years', 'Alcohol_frequency'])

# load_images defaults to the VGG16 preprocess_input (BGR reorder + mean
# subtraction). Keras EfficientNet models expect raw 0-255 pixels, so the
# EfficientNet preprocessing function is passed explicitly instead.
images = load_images(
    df['full_path'],
    preprocess_func=lambda path: load_and_preprocess_image(
        path,
        preprocessing_function=tf.keras.applications.efficientnet.preprocess_input,
    ),
)

# Split BEFORE scaling: fitting the scaler on every row would leak the
# held-out rows' mean/variance into training.
X_train_img, X_test_img, X_train_feat, X_test_feat, y_train, y_test = split_data(images, additional_features, labels)

# Fit on the training rows only, then apply the same scaler to the held-out rows.
X_train_feat, scaler = normalize_features(X_train_feat)
X_test_feat, scaler = normalize_features(X_test_feat, scaler=scaler)
scaler_file = r'E:\Computer_Vision_projects\inference_drd\Diabetic_Retinopathy_Detection\additional_features_scaler.pkl'
save_scaler(scaler, scaler_file)

# Image branch: EfficientNetB0 backbone followed by pooling + dense head.
custom_layers_efficientnet = [GlobalAveragePooling2D(), Dense(256, activation='relu'), Dropout(0.5), Dense(128, activation='relu')]
efficientnet_input, efficientnet_output = create_efficientnetb0_model(custom_layers=custom_layers_efficientnet)
# Tabular branch: one input unit per additional feature column.
additional_input = create_additional_input((additional_features.shape[1],))
additional_layers = [Dense(128, activation='relu'), Dropout(0.5)]
model_efficientnet = create_combined_model(efficientnet_input, efficientnet_output, additional_input, additional_layers=additional_layers)

custom_callbacks = [
    ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001),
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
]
history_efficientnet = compile_and_train_model(model_efficientnet, X_train_img, X_train_feat, y_train, X_test_img, X_test_feat, y_test, callbacks=custom_callbacks)

model_efficientnet_save_path = r'E:\Computer_Vision_projects\inference_drd\Diabetic_Retinopathy_Detection\efficientnet_with_additional_features.h5'
evaluate_and_save_model(model_efficientnet, X_test_img, X_test_feat, y_test, model_efficientnet_save_path)

Epoch 1/12
[1m 8/34[0m [32m━━━━[0m[37m━━━━━━━━━━━━━━━━[0m [1m8:12[0m 19s/step - accuracy: 0.3296 - loss: 1.5173

In [4]:
# Usage example for ResNet50
# NOTE(review): the absolute E:\ paths below tie this cell to one machine;
# consider a configurable data directory.
csv_file = r'E:\Computer_Vision_projects\inference_drd\Diabetic_Retinopathy_Detection\remaining_dataset.csv'
df = load_csv(csv_file)
labels = extract_labels(df)
additional_features = extract_additional_features(df, ['HbA1c', 'Systolic_BP', 'Diastolic_BP', 'LDL', 'Duration', 'BMI', 'Glucose_SD', 'Triglycerides', 'Microalbuminuria', 'Smoking_years', 'Alcohol_frequency'])
# NOTE(review): load_images falls back to the VGG16 preprocess_input here;
# confirm that is the intended preprocessing for ResNet50.
images = load_images(df['full_path'])

# Split BEFORE scaling: fitting the scaler on every row would leak the
# held-out rows' mean/variance into training.
X_train_img, X_test_img, X_train_feat, X_test_feat, y_train, y_test = split_data(images, additional_features, labels)

# Fit on the training rows only, then apply the same scaler to the held-out rows.
X_train_feat, scaler = normalize_features(X_train_feat)
X_test_feat, scaler = normalize_features(X_test_feat, scaler=scaler)
scaler_file = r'E:\Computer_Vision_projects\inference_drd\Diabetic_Retinopathy_Detection\additional_features_scaler.pkl'
save_scaler(scaler, scaler_file)

# Image branch: ResNet50 backbone followed by pooling + dense head.
custom_layers_resnet = [GlobalAveragePooling2D(), Dense(256, activation='relu'), Dropout(0.5), Dense(128, activation='relu')]
resnet_input, resnet_output = create_resnet50_model(custom_layers=custom_layers_resnet)
# Tabular branch: one input unit per additional feature column.
additional_input = create_additional_input((additional_features.shape[1],))
additional_layers = [Dense(128, activation='relu'), Dropout(0.5)]
model_resnet = create_combined_model(resnet_input, resnet_output, additional_input, additional_layers=additional_layers)

custom_callbacks = [
    ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001),
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
]
history_resnet = compile_and_train_model(model_resnet, X_train_img, X_train_feat, y_train, X_test_img, X_test_feat, y_test, callbacks=custom_callbacks)

model_resnet_save_path = r'E:\Computer_Vision_projects\inference_drd\Diabetic_Retinopathy_Detection\resnet_with_additional_features.h5'
evaluate_and_save_model(model_resnet, X_test_img, X_test_feat, y_test, model_resnet_save_path)

Epoch 1/12
[1m 2/34[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m18:37[0m 35s/step - accuracy: 0.1484 - loss: 2.0969   

In [6]:
import pkg_resources

# Function to get installed packages
def get_installed_packages():
    """Return the project names of the distributions installed in this environment.

    Returns:
        List of distribution names (strings), each appearing once, in the
        order they were discovered.
    """
    # Function-scope import: ``importlib.metadata`` is the standard-library
    # replacement for the deprecated ``pkg_resources`` API.
    from importlib import metadata

    installed_packages = []
    seen = set()
    for dist in metadata.distributions():
        name = dist.metadata.get('Name')
        # Skip entries with no usable name and distributions that are
        # discoverable through more than one path entry.
        if not name or name in seen:
            continue
        seen.add(name)
        installed_packages.append(name)
    return installed_packages

# Collect the name of every installed distribution
installed_packages = get_installed_packages()

# Dump them to requirements.txt, one name per line (no version pins)
with open('requirements.txt', 'w') as f:
    f.writelines(package + '\n' for package in installed_packages)

print("Successfully saved installed packages to requirements.txt")


Successfully saved installed packages to requirements.txt
