Code for HW4_312551814

In [None]:
# Import Libraries
import os
import pickle
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.applications.efficientnet import preprocess_input
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Input, Attention, Permute
from tensorflow.keras.models import Model, save_model
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier

# Define constants
BASE_PATH = r"/content"
OUTPUT_PATH = r"/kaggle/working/"
TRAIN_PATH = os.path.join(BASE_PATH, 'train')
TEST_PATH = os.path.join(BASE_PATH, 'test')
CLASS_0_PATH = os.path.join(TRAIN_PATH, 'class_0')
CLASS_1_PATH = os.path.join(TRAIN_PATH, 'class_1')
IMAGE_SIZE = (128, 128)
BATCH_SIZE = 128

def load_images_from_pkl(file_path):
    """Load images from a pickle file."""
    with open(file_path, 'rb') as f:
        data = pickle.load(f)
    return np.array(data)

def load_data(class_paths):
    """Load training data and labels."""
    data, labels, files = [], [], []
    for label, class_path in enumerate(class_paths):
        for file_name in os.listdir(class_path):
            file_path = os.path.join(class_path, file_name)
            images = load_images_from_pkl(file_path)
            data.append(images)
            labels.append(label)
            files.append(file_name)
    return np.array(data), np.array(labels), files

def extract_features(model, images):
    """Extract features using a pre-trained model."""
    processed_images = preprocess_input(images)
    features = model.predict(processed_images)
    flattened_features = features.reshape((features.shape[0], -1))
    return flattened_features

def create_efficientnet_model(input_shape):
    """Create an EfficientNet model without the top layer."""
    base_model = EfficientNetB0(weights='imagenet', include_top=False, input_shape=input_shape)
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    return Model(inputs=base_model.input, outputs=x)

def create_submission_file(test_files, predictions, output_folder):
    """Create the submission file."""
    output_file = os.path.join(output_folder, 'submission.csv')
    with open(output_file, 'w') as f:
        f.write('image_id,y_pred\n')
        for file, pred in zip(test_files, predictions):
            file_id = os.path.splitext(file)[0]
            f.write(f'{file_id},{pred}\n')
    print("Submission file created successfully.")

def mil_attention_pooling(input_shape):
    """Create an MIL model with attention mechanism."""
    input_layer = Input(shape=input_shape)
    attention_probs = Dense(input_shape[0], activation='softmax', name='attention_vec')(input_layer)
    attention_mul = tf.keras.layers.multiply([input_layer, attention_probs])
    return Model(inputs=input_layer, outputs=attention_mul)

def main():
    # Load training data
    train_data, train_labels, _ = load_data([CLASS_0_PATH, CLASS_1_PATH])

    # Load test data
    test_data, _, test_files = load_data([TEST_PATH])

    # Create EfficientNet model
    efficientnet_model = create_efficientnet_model((*IMAGE_SIZE, 3))

    # Extract features for training data
    train_features = [extract_features(efficientnet_model, bag) for bag in train_data]
    train_features = np.array(train_features)
    flattened_train_features = train_features.reshape((train_features.shape[0], -1))

    # Split data into training and validation sets
    X_train, X_val, y_train, y_val = train_test_split(flattened_train_features, train_labels, test_size=0.2, random_state=42)

    # Initialize classifiers
    classifiers = {
        "RandomForest": RandomForestClassifier(n_estimators=100, random_state=42),
        "SVC": SVC(kernel='linear', random_state=42),
        "MLP": MLPClassifier(hidden_layer_sizes=(100,), max_iter=1000, random_state=42)
    }

    # Train classifiers and evaluate
    for clf_name, clf in classifiers.items():
        clf.fit(X_train, y_train)
        accuracy = clf.score(X_val, y_val)
        print(f'Validation accuracy {clf_name}: {accuracy}')

    # Choose the best model based on validation accuracy
    best_clf_name = max(classifiers, key=lambda clf_name: classifiers[clf_name].score(X_val, y_val))
    best_clf = classifiers[best_clf_name]
    print(f'Best classifier: {best_clf_name}')

    # Save the best classifier model
    with open(os.path.join(OUTPUT_PATH, 'best_classifier.pkl'), 'wb') as f:
        pickle.dump(best_clf, f)
    print(f'{best_clf_name} model saved successfully.')

    # Extract features for test data
    test_features = [extract_features(efficientnet_model, bag) for bag in test_data]
    test_features = np.array(test_features)
    flattened_test_features = test_features.reshape((test_features.shape[0], -1))

    # Predict using the best classifier
    test_predictions = best_clf.predict(flattened_test_features)

    # Create submission file
    create_submission_file(test_files, test_predictions, OUTPUT_PATH)

if __name__ == "__main__":
    main()


Question 1 code


In [None]:
# Import Libraries
import os
import pickle
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.applications.efficientnet import preprocess_input
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Input, Attention, Permute
from tensorflow.keras.models import Model, save_model
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from sklearn.model_selection import learning_curve

# Define constants
BASE_PATH = r"/content"
OUTPUT_PATH = r"/content/"
TRAIN_PATH = os.path.join(BASE_PATH, 'train')
TEST_PATH = os.path.join(BASE_PATH, 'test')
CLASS_0_PATH = os.path.join(TRAIN_PATH, 'class_0')
CLASS_1_PATH = os.path.join(TRAIN_PATH, 'class_1')
IMAGE_SIZE = (128, 128)
BATCH_SIZE = 128

def load_images_from_pkl(file_path):
    """Load images from a pickle file."""
    with open(file_path, 'rb') as f:
        data = pickle.load(f)
    return np.array(data)

def load_data(class_paths):
    """Load training data and labels."""
    data, labels, files = [], [], []
    for label, class_path in enumerate(class_paths):
        for file_name in os.listdir(class_path):
            file_path = os.path.join(class_path, file_name)
            images = load_images_from_pkl(file_path)
            data.append(images)
            labels.append(label)
            files.append(file_name)
    return np.array(data), np.array(labels), files

def extract_features(model, images):
    """Extract features using a pre-trained model."""
    processed_images = preprocess_input(images)
    features = model.predict(processed_images)
    flattened_features = features.reshape((features.shape[0], -1))
    return flattened_features

def create_efficientnet_model(input_shape):
    """Create an EfficientNet model without the top layer."""
    base_model = EfficientNetB0(weights='imagenet', include_top=False, input_shape=input_shape)
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    return Model(inputs=base_model.input, outputs=x)

def create_submission_file(test_files, predictions, output_folder):
    """Create the submission file."""
    output_file = os.path.join(output_folder, 'submission.csv')
    with open(output_file, 'w') as f:
        f.write('image_id,y_pred\n')
        for file, pred in zip(test_files, predictions):
            file_id = os.path.splitext(file)[0]
            f.write(f'{file_id},{pred}\n')
    print("Submission file created successfully.")

def mil_attention_pooling(input_shape):
    """Create an MIL model with attention mechanism."""
    input_layer = Input(shape=input_shape)
    attention_probs = Dense(input_shape[0], activation='softmax', name='attention_vec')(input_layer)
    attention_mul = tf.keras.layers.multiply([input_layer, attention_probs])
    return Model(inputs=input_layer, outputs=attention_mul)

def plot_learning_curve(estimator, X, y, title="Learning Curve", cv=5, n_jobs=None):
    """Plot learning curve for a given estimator."""
    train_sizes, train_scores, test_scores = learning_curve(estimator, X, y, cv=cv, n_jobs=n_jobs, train_sizes=np.linspace(.1, 1.0, 5))

    train_scores_mean = np.mean(train_scores, axis=1)
    train_scores_std = np.std(train_scores, axis=1)
    test_scores_mean = np.mean(test_scores, axis=1)
    test_scores_std = np.std(test_scores, axis=1)

    plt.figure()
    plt.title(title)
    plt.xlabel("Training examples")
    plt.ylabel("Score")
    plt.ylim(0.0, 1.1)
    plt.grid()

    plt.fill_between(train_sizes, train_scores_mean - train_scores_std, train_scores_mean + train_scores_std, alpha=0.1, color="r")
    plt.fill_between(train_sizes, test_scores_mean - test_scores_std, test_scores_mean + test_scores_std, alpha=0.1, color="g")
    plt.plot(train_sizes, train_scores_mean, 'o-', color="r", label="Training score")
    plt.plot(train_sizes, test_scores_mean, 'o-', color="g", label="Cross-validation score")

    plt.legend(loc="best")
    return plt

def main():
    # Load training data
    train_data, train_labels, _ = load_data([CLASS_0_PATH, CLASS_1_PATH])

    # Load test data
    test_data, _, test_files = load_data([TEST_PATH])

    # Create EfficientNet model
    efficientnet_model = create_efficientnet_model((*IMAGE_SIZE, 3))

    # Extract features for training data
    train_features = [extract_features(efficientnet_model, bag) for bag in train_data]
    train_features = np.array(train_features)
    flattened_train_features = train_features.reshape((train_features.shape[0], -1))

    # Split data into training and validation sets
    X_train, X_val, y_train, y_val = train_test_split(flattened_train_features, train_labels, test_size=0.2, random_state=42)

    # Initialize classifiers
    classifiers = {
        "RandomForest": RandomForestClassifier(n_estimators=100, random_state=42),
        "SVC": SVC(kernel='linear', random_state=42),
        "MLP": MLPClassifier(hidden_layer_sizes=(100,), max_iter=1000, random_state=42)
    }

    # Train classifiers, evaluate, and plot learning curves
    best_accuracy = 0
    best_clf_name = ""
    best_clf = None

    for clf_name, clf in classifiers.items():
        clf.fit(X_train, y_train)
        val_predictions = clf.predict(X_val)

        # Calculate evaluation metrics
        accuracy = accuracy_score(y_val, val_predictions)
        precision = precision_score(y_val, val_predictions)
        recall = recall_score(y_val, val_predictions)
        f1 = f1_score(y_val, val_predictions)

        print(f'Validation metrics for {clf_name}:')
        print(f'Accuracy: {accuracy:.4f}')
        print(f'Precision: {precision:.4f}')
        print(f'Recall: {recall:.4f}')
        print(f'F1-score: {f1:.4f}\n')

        # Plot learning curve
        plot_learning_curve(clf, X_train, y_train, title=f"Learning Curve for {clf_name}")

        # Save the best classifier based on accuracy
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_clf_name = clf_name
            best_clf = clf

    print(f'Best classifier: {best_clf_name}')

    # Save the best classifier model
    with open(os.path.join(OUTPUT_PATH, 'best_classifier.pkl'), 'wb') as f:
        pickle.dump(best_clf, f)
    print(f'{best_clf_name} model saved successfully.')

    # Perform ablation study by modifying one feature and evaluating the impact
    def ablation_study(feature_modification_func):
        modified_train_features = feature_modification_func(flattened_train_features)
        X_train_mod, X_val_mod, y_train_mod, y_val_mod = train_test_split(modified_train_features, train_labels, test_size=0.2, random_state=42)
        best_clf.fit(X_train_mod, y_train_mod)
        val_predictions_mod = best_clf.predict(X_val_mod)
        accuracy_mod = accuracy_score(y_val_mod, val_predictions_mod)
        print(f'Ablation study accuracy with feature modification: {accuracy_mod:.4f}')

    # Example ablation: removing the last feature
    def remove_last_feature(features):
        return features[:, :-1]

    # Perform ablation on the validation set
    ablation_study(remove_last_feature)

    # Extract features for test data
    test_features = [extract_features(efficientnet_model, bag) for bag in test_data]
    test_features = np.array(test_features)
    flattened_test_features = test_features.reshape((test_features.shape[0], -1))

    # Apply the same ablation to the test set
    flattened_test_features = remove_last_feature(flattened_test_features)

    # Predict using the best classifier
    test_predictions = best_clf.predict(flattened_test_features)

    # Create submission file
    create_submission_file(test_files, test_predictions, OUTPUT_PATH)

if __name__ == "__main__":
    main()
