In [None]:
# Import necessary libraries for data handling, image processing, and model building
import pandas as pd
import numpy as np
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Dense, Input, Dropout,  BatchNormalization, Activation
from tensorflow.keras.layers import Conv2D, Flatten, MaxPooling2D
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import AdamW
from tensorflow.keras.initializers import HeNormal
from tensorflow.keras.models import Model
from tensorflow.keras.utils import Sequence

# Specify the path to the Air-Pollution-Image-Dataset-From-India-and-Nepal CSV files after extraction
Provide the file paths for the training, validation, and testing CSV files from the dataset.

In [None]:
# Locations of the CSV files that list the training, validation and test samples
train_csv_file, val_csv_file, test_csv_file = (
    'Datasets/train_data.csv',
    'Datasets/val_data.csv',
    'Datasets/testing_data.csv',
)

In [None]:
# Implement Mixup augmentation to enhance model generalization
def apply_mixup(images, labels, alpha=0.2):
    """Blend every sample of a batch with a randomly chosen partner (Mixup).

    For each sample ``i`` a coefficient ``lam[i]`` is drawn from
    ``Beta(alpha, alpha)`` and a partner index is taken from a random
    permutation of the batch. Images and labels are mixed with the same
    coefficient: ``lam * sample + (1 - lam) * partner``.

    Args:
        images: Array whose first axis is the batch axis.
        labels: Array whose first axis is the batch axis (same length as
            ``images``).
        alpha: Concentration parameter of the Beta distribution.

    Returns:
        Tuple ``(mixed_images, mixed_labels)`` with the same shapes as the
        inputs. Outputs are always floating point: integer inputs are promoted
        so that the fractional mixing weights are not truncated.
    """
    images = np.asarray(images)
    labels = np.asarray(labels)
    batch_size = images.shape[0]

    # Draw order (beta first, permutation second) is kept so seeded runs stay reproducible.
    lam = np.random.beta(alpha, alpha, batch_size)
    indices = np.random.permutation(batch_size)

    # Promote integer inputs to float; float inputs keep their precision.
    image_dtype = np.result_type(images.dtype, np.float32)
    label_dtype = np.result_type(labels.dtype, np.float32)

    # Reshape lam to (batch, 1, 1, ...) so it broadcasts over the trailing axes.
    lam_images = lam.astype(image_dtype).reshape((batch_size,) + (1,) * (images.ndim - 1))
    lam_labels = lam.astype(label_dtype).reshape((batch_size,) + (1,) * (labels.ndim - 1))

    mixed_images = (lam_images * images + (1 - lam_images) * images[indices]).astype(image_dtype, copy=False)
    mixed_labels = (lam_labels * labels + (1 - lam_labels) * labels[indices]).astype(label_dtype, copy=False)

    return mixed_images, mixed_labels

# Create a custom Sequence class to apply Mixup to the data generator
class MixupSequence(Sequence):
    """Sequence wrapper that applies Mixup to every batch of another generator.

    Batches are fetched from the wrapped ``generator`` by index and passed
    through ``apply_mixup`` before being returned. Any attribute not defined
    on the wrapper itself is looked up on the wrapped generator.

    Args:
        generator: Indexable batch generator yielding ``(images, labels)``.
        alpha: Beta-distribution parameter forwarded to ``apply_mixup``.
        **kwargs: Forwarded to the ``Sequence`` base-class constructor.
    """

    def __init__(self, generator, alpha=0.2, **kwargs):
        # Keras 3's Sequence (PyDataset) expects subclasses to initialise the base class.
        super().__init__(**kwargs)
        self.generator = generator
        self.alpha = alpha
        self.on_epoch_end()

    def __len__(self):
        """Number of batches per epoch, as reported by the wrapped generator."""
        return len(self.generator)

    def __getitem__(self, index):
        """Return batch ``index`` of the wrapped generator after Mixup."""
        images, labels = self.generator[index]
        mixed_images, mixed_labels = apply_mixup(images, labels, self.alpha)
        return mixed_images, mixed_labels

    def on_epoch_end(self):
        """Forward the end-of-epoch hook to the wrapped generator."""
        self.generator.on_epoch_end()

    def __getattr__(self, attr):
        """Delegate unknown attributes to the wrapped generator."""
        # __getattr__ only runs when normal lookup fails. If `generator` itself
        # is missing (e.g. while copying/unpickling, before __init__ has run),
        # delegating would re-enter this method forever.
        if attr == 'generator':
            raise AttributeError(attr)
        return getattr(self.generator, attr)

In [None]:
# Load the dataset splits and derive per-class weights from the training labels
import cv2

# Read the training, validation, and testing CSV files (in that order)
train_df, val_df, test_df = (
    pd.read_csv(csv_path)
    for csv_path in (train_csv_file, val_csv_file, test_csv_file)
)

# Relative frequency of every AQI class in the training split
class_share = train_df['AQI_Class'].value_counts(normalize=True)

# Weight each class by (most frequent share / own share): the majority class
# gets 1.0 and rarer classes get proportionally larger weights.
# NOTE(review): the dict is keyed by the raw AQI_Class labels; Keras' `class_weight`
# expects integer class indices - confirm the mapping before passing it to fit().
max_count = class_share.max()
class_weights = {label: max_count / share for label, share in class_share.items()}

# Define image normalization function using ImageNet statistics
def normalize(image):
    """Scale an image to [0, 1] and standardise it with the ImageNet statistics.

    The input is cast to float32 and divided by 255, then the per-channel
    mean is subtracted and the result divided by the per-channel standard
    deviation. The three statistics broadcast over the last axis of ``image``.
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])
    scaled = image.astype('float32') / 255.0
    return (scaled - channel_mean) / channel_std

# Set the image directory path
path = 'Datasets/Images'

# Configure training data generator with augmentation
# (random shifts, horizontal flips and brightness jitter, followed by `normalize`)
train_datagen = ImageDataGenerator(
    preprocessing_function=normalize,
    width_shift_range=0.1,
    height_shift_range=0.1,
    horizontal_flip=True,
    brightness_range=[0.8, 1.2],
    fill_mode='nearest'
)

# Configure validation and test data generators without augmentation
val_datagen = ImageDataGenerator(preprocessing_function=normalize)

# Arguments shared by every split
flow_kwargs = dict(
    directory=path,
    x_col='Filename',
    y_col='AQI_Class',
    target_size=(224, 224),
    class_mode='categorical',
    batch_size=32
)

# Training batches: shuffled (flow_from_dataframe default) and blended with Mixup
train_generator = MixupSequence(
    train_datagen.flow_from_dataframe(train_df, **flow_kwargs),
    alpha=0.1
)

# Evaluation batches must NOT be shuffled: later cells pair model.predict() output
# with `.classes` / `.filenames`, which are stored in dataframe order. With the
# default shuffle=True the predictions would come back in a different order and
# every metric and the exported results file would be misaligned.
val_generator = val_datagen.flow_from_dataframe(val_df, shuffle=False, **flow_kwargs)

test_generator = val_datagen.flow_from_dataframe(test_df, shuffle=False, **flow_kwargs)

# Build and train a model for AQI class image classification

In [None]:
# Build and train a VGG-19 model for AQI class image classification
# Define a helper function to create a convolutional block with specified filters and layers
def make_layer(x, filters, num_convs, kernel_size=3, stride=1, padding='same', weight_decay=1e-4):
    """Stack ``num_convs`` Conv2D -> BatchNormalization -> ReLU units on ``x``.

    Args:
        x: Input tensor.
        filters: Number of filters of every convolution in the stack.
        num_convs: How many Conv-BN-ReLU units to append.
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        padding: Convolution padding mode.
        weight_decay: L2 regularisation factor applied to each conv kernel.

    Returns:
        The output tensor of the last activation (``x`` itself if
        ``num_convs`` is 0).
    """
    # One He-normal (Kaiming) initializer instance is shared by all convs of the stack
    conv_options = dict(
        strides=stride,
        padding=padding,
        kernel_initializer=HeNormal(seed=42),
    )
    out = x
    for _unit in range(num_convs):
        conv = Conv2D(filters, kernel_size, kernel_regularizer=l2(weight_decay), **conv_options)
        out = conv(out)
        out = BatchNormalization()(out)
        out = Activation('relu')(out)
    return out

# Define the VGG-19 architecture
def create_vgg19_model(input_shape=(224, 224, 3), num_classes=6, weight_decay=1e-4, dropout_rate=0.5):
    """Build a VGG-19 style classifier with batch normalisation.

    The network consists of five convolutional blocks (2, 2, 4, 4 and 4
    conv layers with 64, 128, 256, 512 and 512 filters), each followed by a
    2x2 max-pool, then two 4096-unit dense layers with dropout and a softmax
    output layer.

    Args:
        input_shape: Shape of one input image.
        num_classes: Number of output classes.
        weight_decay: L2 regularisation factor applied to every conv and
            dense kernel.
        dropout_rate: Dropout rate used after each 4096-unit dense layer.

    Returns:
        The (uncompiled) Keras ``Model``.
    """
    initializer = HeNormal(seed=42)  # Use Kaiming (HeNormal) initialization
    inputs = Input(shape=input_shape)

    # (filters, number of conv layers) for blocks 1-5
    block_config = [(64, 2), (128, 2), (256, 4), (512, 4), (512, 4)]

    x = inputs
    for filters, num_convs in block_config:
        # Forward weight_decay so the conv kernels honour the caller's setting
        # instead of always falling back to make_layer's default.
        x = make_layer(x, filters=filters, num_convs=num_convs, weight_decay=weight_decay)
        x = MaxPooling2D(pool_size=(2, 2), strides=2)(x)

    # Flatten and fully connected layers
    x = Flatten()(x)
    for _ in range(2):
        x = Dense(4096, activation='relu', kernel_regularizer=l2(weight_decay), kernel_initializer=initializer)(x)
        x = Dropout(dropout_rate)(x)
    outputs = Dense(num_classes, activation='softmax', kernel_regularizer=l2(weight_decay), kernel_initializer=initializer)(x)

    # Build and return the model
    return Model(inputs=inputs, outputs=outputs)

# Instantiate the VGG-19 classifier for the six AQI classes
weight_decay = 1e-4  # Regularisation strength handed to the model builder
model = create_vgg19_model(
    input_shape=(224, 224, 3),
    num_classes=6,
    weight_decay=weight_decay,
    dropout_rate=0.3,
)

In [None]:
# Attach optimizer, loss and metrics to the model
from tensorflow.keras.metrics import Precision, Recall
from tensorflow.keras.losses import CategoricalCrossentropy

# AdamW with a fixed starting learning rate and decoupled weight decay
initial_learning_rate = 0.0001
optimizer = AdamW(
    learning_rate=initial_learning_rate,
    weight_decay=weight_decay,
)

# Categorical cross-entropy on the softmax output; track accuracy, precision and recall
model.compile(
    loss=CategoricalCrossentropy(),
    optimizer=optimizer,
    metrics=[
        'accuracy',
        Precision(name='precision'),
        Recall(name='recall'),
    ],
)

In [None]:
# Train the model with callbacks for early stopping and learning rate reduction

# Stop when val_loss has not improved for 10 epochs and roll back to the best weights
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1
)

# Multiply the learning rate by 0.2 after 5 epochs without val_loss improvement
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=5,
    min_lr=1e-6,
    verbose=1
)

# Keras expects `class_weight` to be keyed by integer class index, whereas
# `class_weights` is keyed by the label values found in the CSV. Translate the
# keys through the generator's label -> index mapping; keys that are already
# indices (not present in the mapping) are passed through unchanged.
label_to_index = train_generator.class_indices
indexed_class_weights = {
    label_to_index.get(label, label): weight
    for label, weight in class_weights.items()
}

history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=150,
    class_weight=indexed_class_weights,
    callbacks=[early_stopping, reduce_lr],
    verbose=1
)

# Persist the trained weights (best weights if early stopping triggered)
model.save_weights('model.weights.h5')

# Plot your training accuracy and validation accuracy here

In [None]:
import matplotlib.pyplot as plt

# Training vs. validation accuracy per epoch, drawn in legend order
for curve_key in ('accuracy', 'val_accuracy'):
    plt.plot(history.history[curve_key])

plt.title('Model accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(['Train_acc', 'Val_acc'], loc='lower right')
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Training vs. validation loss per epoch
epoch_logs = history.history
plt.plot(epoch_logs['loss'])
plt.plot(epoch_logs['val_loss'])

plt.title('Model loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train_loss', 'Val_loss'], loc='upper right')
plt.show()

# Run the model on the test data, save the results file in this format, and upload your solution file to the Kaggle competition

In [None]:
# Column names of the results file, one per AQI class
probability_columns = [
    'a_Good',
    'b_Moderate',
    'c_Unhealthy_for_Sensitive_Groups',
    'd_Unhealthy',
    'e_Very_Unhealthy',
    'f_Severe',
]

# Predicted class probabilities for every test image
test_probabilities = model.predict(test_generator)
test_filenames = test_generator.filenames

# NOTE(review): pairing rows with `filenames` assumes the generator yields samples
# in dataframe order (i.e. it was created with shuffle=False) - confirm.
test_probabilities_df = pd.DataFrame(test_probabilities, columns=probability_columns)
test_probabilities_df.insert(0, 'Filename', test_filenames)

# Write the results file: Filename followed by one probability column per class
test_probabilities_df.to_csv('Outputs/test_results.csv', index=False)


In [None]:
from sklearn.metrics import f1_score
import matplotlib.pyplot as plt

# Evaluate the model on the test data (loss followed by the compiled metrics)
test_loss, test_accuracy, test_precision, test_recall = model.evaluate(test_generator)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Precision: {test_precision:.4f}")
print(f"Test Recall: {test_recall:.4f}")

# True class indices of the test samples
# NOTE(review): comparing these with predictions assumes the generator yields
# samples in dataframe order (created with shuffle=False) - confirm.
y_true = test_generator.classes

# Run inference once and derive the hard labels from the same probability
# matrix, so y_pred and y_pred_prob are guaranteed to describe the same pass.
y_pred_prob = model.predict(test_generator)
y_pred = np.argmax(y_pred_prob, axis=1)  # Convert predictions to class labels

# Calculate the F1 score for each class
f1_scores = f1_score(y_true, y_pred, average=None)

In [None]:
# Macro-averaged F1 (unweighted mean over classes) and the per-class F1 scores
average_f1_score = f1_score(y_true, y_pred, average='macro')
f1_scores = f1_score(y_true, y_pred, average=None)

# Report one line per class index
for class_idx, class_f1 in enumerate(f1_scores):
    print("Class {}: F1 Score = {:.4f}".format(class_idx, class_f1))

# Summary: macro F1, blank separator line, then the test accuracy from evaluation
print("Average F1 Score: {:.4f}".format(average_f1_score))
print()
print("Test Accuracy: {:.4f}".format(test_accuracy))

In [None]:
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.preprocessing import label_binarize

# One-hot encode the true class indices so each class can be scored one-vs-rest
n_classes = len(test_generator.class_indices)
y_true_binarized = label_binarize(y_true, classes=list(range(n_classes)))

# Micro-averaged ROC AUC over all classes (a single number, shown in the text box below)
roc_auc_scores = roc_auc_score(y_true_binarized, y_pred_prob, average='micro')

# Per-class ROC curve and AUC, keyed by class index
fpr, tpr, roc_auc = {}, {}, {}
for class_idx in range(n_classes):
    class_truth = y_true_binarized[:, class_idx]
    class_scores = y_pred_prob[:, class_idx]
    fpr[class_idx], tpr[class_idx], _ = roc_curve(class_truth, class_scores)
    roc_auc[class_idx] = roc_auc_score(class_truth, class_scores)

# Draw every class curve on one figure
plt.figure(figsize=(8, 6))
for class_idx in range(n_classes):
    plt.plot(
        fpr[class_idx],
        tpr[class_idx],
        label='Class %d (AUC = %0.2f)' % (class_idx, roc_auc[class_idx]),
    )

# Diagonal reference line: performance of a random classifier
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")

# Annotate the plot with the micro-average ROC AUC
plt.text(
    0.555,
    0.34,
    'Micro-average ROC AUC = %0.2f' % roc_auc_scores,
    fontsize=12,
    bbox=dict(facecolor='white', edgecolor='black'),
)

plt.show()


In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, confusion_matrix

# `train_generator` yields shuffled (flow_from_dataframe default), augmented and
# Mixup-blended batches, so predictions made on it cannot be matched against
# `.classes`, which is stored in dataframe order. Score the training split
# through a deterministic, augmentation-free generator instead.
train_eval_generator = val_datagen.flow_from_dataframe(
    train_df,
    directory='Datasets/Images',
    x_col='Filename',
    y_col='AQI_Class',
    target_size=(224, 224),
    class_mode='categorical',
    batch_size=32,
    shuffle=False
)


def predict_classes(generator):
    """Return the arg-max class index predicted by `model` for each sample of `generator`."""
    return np.argmax(model.predict(generator), axis=1)


def mae_and_rmse(true_classes, predicted_classes):
    """Return (MAE, RMSE) between true and predicted class indices."""
    mae = mean_absolute_error(true_classes, predicted_classes)
    rmse = np.sqrt(mean_squared_error(true_classes, predicted_classes))
    return mae, rmse


# Generate class predictions for the training, validation and testing data
train_pred = predict_classes(train_eval_generator)
# NOTE(review): the validation/test comparisons assume those generators yield
# samples in dataframe order (created with shuffle=False) - confirm.
val_pred = predict_classes(val_generator)
test_pred = predict_classes(test_generator)

# Calculate overall MAE and RMSE on the class indices
train_mae, train_rmse = mae_and_rmse(train_eval_generator.classes, train_pred)
val_mae, val_rmse = mae_and_rmse(val_generator.classes, val_pred)
test_mae, test_rmse = mae_and_rmse(test_generator.classes, test_pred)

print("Overall MAE - Training: {:.4f}".format(train_mae))
print("Overall RMSE - Training: {:.4f}".format(train_rmse))
print("Overall MAE - Validation: {:.4f}".format(val_mae))
print("Overall RMSE - Validation: {:.4f}".format(val_rmse))
print("Overall MAE - Testing: {:.4f}".format(test_mae))
print("Overall RMSE - Testing: {:.4f}".format(test_rmse))

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Calculate confusion matrices (rows: true class, columns: predicted class)
train_cm = confusion_matrix(train_generator.classes, train_pred)
val_cm = confusion_matrix(val_generator.classes, val_pred)
test_cm = confusion_matrix(test_generator.classes, test_pred)

# Define class labels (names in the generator's label -> index mapping order)
class_labels = list(test_generator.class_indices.keys())


def plot_confusion_matrix(cm, title, cmap, labels):
    """Draw one annotated confusion-matrix heatmap and show it.

    Args:
        cm: Square matrix of integer counts.
        title: Figure title.
        cmap: Colour map name passed to seaborn.
        labels: Tick labels used for both axes.
    """
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, cmap=cmap, fmt='d', xticklabels=labels, yticklabels=labels)
    plt.title(title)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()


# One figure per split, each with its own colour map
for _cm, _title, _cmap in (
    (train_cm, 'Confusion Matrix - Training', 'Blues'),
    (val_cm, 'Confusion Matrix - Validation', 'Greens'),
    (test_cm, 'Confusion Matrix - Testing', 'Reds'),
):
    plot_confusion_matrix(_cm, _title, _cmap, class_labels)


: 