**Section 1**

- Importing Libraries
- Setting Global Variables
- Setting Seeds for random data splitting


In [None]:
import os
import random
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras import layers, models
from tensorflow.keras.applications import EfficientNetB0, MobileNetV2, ResNet50
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.preprocessing.image import ImageDataGenerator

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

gpus = tf.config.experimental.list_physical_devices("GPU")
if gpus:
    try:
        tf.config.experimental.set_visible_devices(
            gpus[0],
            "GPU",
        )

        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)

        logical_gpus = tf.config.list_logical_devices("GPU")
        print(f"Using {len(logical_gpus)} GPU(s): {logical_gpus}")
    except RuntimeError as e:
        print(f"GPU configuration error: {e}")
else:
    print("No GPU found. Using CPU.")


policy = tf.keras.mixed_precision.Policy("mixed_float16")
tf.keras.mixed_precision.set_global_policy(policy)
print(f"Compute dtype: {policy.compute_dtype}")
print(f"Variable dtype: {policy.variable_dtype}")

IMG_SIZE = 224
BATCH_SIZE = 64
EPOCHS = 20

ERA_RANGES = [
    (1980, 2000, "Container Era (CDs, Cassettes)"),
    (2000, 2024, "Digital Era (Streaming, Downloads)"),
]

NUM_CLASSES = 2

DATA_DIR = Path("album_covers_data")
TRAIN_DIR = DATA_DIR / "train"
TEST_DIR = DATA_DIR / "test"
OUTPUT_DIR = Path("output")
OUTPUT_DIR.mkdir(exist_ok=True)


**Section 2**

- Get Era Labels
- Get Era Indices


In [None]:
def get_era_label(year):
    for start_year, end_year, era in ERA_RANGES:
        if start_year <= year <= end_year:
            return era
    return None


def get_era_index(year):
    for i, (start_year, end_year, _) in enumerate(ERA_RANGES):
        if start_year <= year <= end_year:
            return i
    return None

**Section 3**

- Count images in year-based subdirectories
- Outputs training and dataset counts


In [None]:
def count_images_per_year(directory):
    year_counts = {}

    for year_folder in sorted(os.listdir(directory)):
        if not year_folder.isdigit():
            continue

        year = int(year_folder)
        year_path = directory / year_folder
        if os.path.isdir(year_path):
            num_images = len(
                [
                    f
                    for f in os.listdir(year_path)
                    if f.endswith((".jpg", ".jpeg", ".png"))
                ]
            )
            year_counts[year] = num_images

    return year_counts


print("Counting images per year in training data...")
train_counts = count_images_per_year(TRAIN_DIR)
test_counts = count_images_per_year(TEST_DIR)

print("Training data counts:")
for year, count in sorted(train_counts.items()):
    print(f"{year}: {count}")

print("\nTest data counts:")
for year, count in sorted(test_counts.items()):
    print(f"{year}: {count}")

**Section 4**

- Processes year-based subdirectories
- Displays counts in text and barplot formats for training and testing datasets


In [None]:
train_df = pd.DataFrame(list(train_counts.items()), columns=["Year", "TrainCount"])
test_df = pd.DataFrame(list(test_counts.items()), columns=["Year", "TestCount"])

counts_df = pd.merge(
    train_df,
    test_df,
    on="Year",
    how="outer",
).fillna(0)
counts_df["TotalCount"] = counts_df["TrainCount"] + counts_df["TestCount"]

counts_df["TrainPct"] = counts_df["TrainCount"] / counts_df["TotalCount"] * 100
counts_df["TestPct"] = counts_df["TestCount"] / counts_df["TotalCount"] * 100

counts_df["Era"] = counts_df["Year"].apply(get_era_label)

print("\nData Distribution by Era:")
print(counts_df[["Year", "TrainCount", "TestCount", "TotalCount", "Era"]])

plt.figure(figsize=(14, 7))
sns.barplot(
    x="Year",
    y="TotalCount",
    hue="Era",
    data=counts_df,
    palette="viridis",
)
plt.xticks(rotation=90)
plt.title("Number of Album Covers by Year")
plt.ylabel("Count")
plt.tight_layout()
plt.savefig(OUTPUT_DIR / "album_covers_by_year.png")
plt.show()

era_counts = (
    counts_df.groupby("Era")
    .agg(
        {
            "TrainCount": "sum",
            "TestCount": "sum",
            "TotalCount": "sum",
        }
    )
    .reset_index()
)

plt.figure(figsize=(14, 7))
sns.barplot(
    x="Era",
    y="TotalCount",
    data=era_counts,
    palette="magma",
)
plt.xticks(rotation=45)
plt.title("Number of Album Covers by Era")
plt.ylabel("Count")
plt.tight_layout()
plt.savefig(OUTPUT_DIR / "album_covers_by_era.png")
plt.show()

**Section 5**

- Creates datasets with file paths, years and era labels
- Outputs training and testing dataset sizes
- Displays class distribution


In [None]:
def create_dataset(directory, era_mapping=None):
    labels = []
    years = []
    file_paths = []

    for year_folder in os.listdir(directory):
        if not year_folder.isdigit():
            continue
        year = int(year_folder)
        era_idx = get_era_index(year)

        if era_idx is None:
            continue

        year_path = directory / year_folder

        if os.path.isdir(year_path):
            for img_file in os.listdir(year_path):
                if img_file.endswith((".jpg", ".jpeg", ".png")):
                    img_path = year_path / img_file
                    labels.append(era_idx)
                    years.append(year)
                    file_paths.append(str(img_path))

    return pd.DataFrame(
        {
            "filepath": file_paths,
            "year": years,
            "era_index": labels,
            "era": [ERA_RANGES[idx][2] for idx in labels],
        }
    )


print("\nCreating training dataset...")
train_df = create_dataset(TRAIN_DIR)
test_df = create_dataset(TEST_DIR)

print(f"\nTraining dataset size: {len(train_df)}")
print(f"Test dataset size: {len(test_df)}")

print("\nClass distribution in training dataset:")
print(train_df["era"].value_counts().sort_index())

print("\nClass distribution in test dataset:")
print(test_df["era"].value_counts().sort_index())

**Section 6**

- Combines training and testing dataset into one
- Computes balanced class weights for eras


In [None]:
combined_df = pd.concat([train_df, test_df], ignore_index=True)
print("\nCombined dataset size:", len(combined_df))

class_weights = compute_class_weight(
    class_weight="balanced",
    classes=np.unique(combined_df["era_index"]),
    y=combined_df["era_index"],
)

class_weight_dict = {i: weight for i, weight in enumerate(class_weights)}

print("\nClass weights:")
for era_idx, weight in class_weight_dict.items():
    print(f"Era {ERA_RANGES[era_idx][2]}: {weight:.2f}")


**Section 7**

- Splits combined dataset into training, validation and test sets
- Ensures stratification by era index for balanced splits


In [None]:
train_val_df, final_test_df = train_test_split(
    combined_df, test_size=0.15, stratify=combined_df["era_index"], random_state=SEED
)

train_df, val_df = train_test_split(
    train_val_df,
    test_size=0.1765,
    stratify=train_val_df["era_index"],
    random_state=SEED,
)

print("\nFinal dataset sizes:")
print(f"Training size: {len(train_df)} ({len(train_df) / len(combined_df) * 100:.2f}%)")
print(f"Validation size: {len(val_df)} ({len(val_df) / len(combined_df) * 100:.2f}%)")
print(
    f"Test size: {len(final_test_df)} ({len(final_test_df) / len(combined_df) * 100:.2f}%)"
)

print("\nClass distribution in splits:")
for name, df in [
    ("Training", train_df),
    ("Validation", val_df),
    ("Test", final_test_df),
]:
    era_counts = df["era"].value_counts().sort_index()
    print(f"\n{name} dataset:")
    for era, count in era_counts.items():
        pct = count / len(df) * 100
        print(f"{era}: {count} ({pct:.2f}%)")

**Section 8**

- Prepares data generatoes for training, validation, and testing
- Applies data augmentation (to prevent overfitting and the model from learning images rather than learning patterns)


In [None]:
train_datagen = ImageDataGenerator(
    rescale=1.0 / 255,
    rotation_range=30,
    width_shift_range=0.3,
    height_shift_range=0.3,
    brightness_range=[0.7, 1.3],
    shear_range=0.2,
    zoom_range=0.3,
    horizontal_flip=True,
    vertical_flip=False,
    channel_shift_range=0.2,
    fill_mode="nearest",
)

valid_test_datagen = ImageDataGenerator(rescale=1.0 / 255)

train_df["era_index"] = train_df["era_index"].astype(str)
val_df["era_index"] = val_df["era_index"].astype(str)
final_test_df["era_index"] = final_test_df["era_index"].astype(str)

train_generator = train_datagen.flow_from_dataframe(
    dataframe=train_df,
    x_col="filepath",
    y_col="era_index",
    target_size=(IMG_SIZE, IMG_SIZE),
    batch_size=BATCH_SIZE,
    class_mode="sparse",
    shuffle=True,
)

val_generator = valid_test_datagen.flow_from_dataframe(
    dataframe=val_df,
    x_col="filepath",
    y_col="era_index",
    target_size=(IMG_SIZE, IMG_SIZE),
    batch_size=BATCH_SIZE,
    class_mode="sparse",
    shuffle=False,
)

test_generator = valid_test_datagen.flow_from_dataframe(
    dataframe=final_test_df,
    x_col="filepath",
    y_col="era_index",
    target_size=(IMG_SIZE, IMG_SIZE),
    batch_size=BATCH_SIZE,
    class_mode="sparse",
    shuffle=False,
)

print("\nData Generators Created:")
print(f"Training generator size: {len(train_generator)}")
print(f"Validation generator size: {len(val_generator)}")
print(f"Test generator size: {len(test_generator)}")


**Section 9**

- Saves training, validation, and test datasets


In [None]:
train_df.to_csv(OUTPUT_DIR / "train_data.csv", index=False)
val_df.to_csv(OUTPUT_DIR / "validation_data.csv", index=False)
final_test_df.to_csv(OUTPUT_DIR / "test_data.csv", index=False)

print("\nData preparation complete!")
print("The next step is to define and train the model.")

In [None]:
def create_resnet_model(input_shape=(224, 224, 3), num_classes=NUM_CLASSES):
    base_model = ResNet50(
        input_shape=input_shape,
        include_top=False,
        weights="imagenet",
    )

    for layer in base_model.layers[:-30]:
        layer.trainable = False

    model = models.Sequential(
        [
            base_model,
            layers.GlobalAveragePooling2D(),
            layers.BatchNormalization(),
            layers.Dense(512, activation="relu"),
            layers.Dropout(0.5),
            layers.Dense(256, activation="relu"),
            layers.Dropout(0.3),
            layers.Dense(num_classes, activation="softmax"),
        ]
    )

    return model


def create_efficientnet_model(input_shape=(224, 224, 3), num_classes=len(ERA_RANGES)):
    base_model = EfficientNetB0(
        input_shape=input_shape, include_top=False, weights="imagenet"
    )

    for layer in base_model.layers[:-20]:
        layer.trainable = False

    model = models.Sequential(
        [
            base_model,
            layers.GlobalAveragePooling2D(),
            layers.BatchNormalization(),
            layers.Dense(512, activation="relu"),
            layers.Dropout(0.5),
            layers.Dense(256, activation="relu"),
            layers.Dropout(0.3),
            layers.Dense(num_classes, activation="softmax"),
        ]
    )

    return model


In [None]:
def custom_learning_rate(epoch):
    initial_lr = 5e-5
    max_lr = 2e-4
    min_lr = 1e-6
    warmup_epochs = 2

    if epoch < warmup_epochs:
        return initial_lr + (max_lr - initial_lr) * (epoch / warmup_epochs)

    return float(max_lr * tf.math.exp(0.1 * (warmup_epochs - epoch)))

In [None]:
callbacks = [
    EarlyStopping(
        monitor="val_accuracy",
        patience=10,
        restore_best_weights=True,
        verbose=1,
    ),
    ReduceLROnPlateau(
        monitor="val_loss",
        factor=0.5,
        patience=3,
        min_lr=1e-6,
        verbose=1,
    ),
    ModelCheckpoint(
        filepath=OUTPUT_DIR / "best_model_{epoch:02d}_{val_accuracy:.4f}.keras",
        monitor="val_accuracy",
        save_best_only=True,
        mode="max",
        verbose=1,
    ),
    tf.keras.callbacks.LearningRateScheduler(custom_learning_rate, verbose=1),
]

In [None]:
class GPUMemoryCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        if gpus:
            import subprocess

            result = subprocess.check_output(
                [
                    "nvidia-smi",
                    "--query-gpu=memory.used,memory.total",
                    "--format=csv,nounits,noheader",
                ]
            )
            line = result.decode("utf-8").strip()
            memory_used, memory_total = [s.strip() for s in line.split(",")]
            print(
                f"\nGPU Memory: {int(memory_used)} MB / {int(memory_total)} MB ({int(memory_used) / int(memory_total) * 100:.1f}%)"
            )


callbacks.append(GPUMemoryCallback())

In [None]:
def train_model(
    model, train_generator, validation_generator, class_weight_dict, epochs=30
):
    print("Stage 1: Training only top layers...")
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )

    history1 = model.fit(
        train_generator,
        epochs=5,
        validation_data=validation_generator,
        class_weight=class_weight_dict,
        callbacks=callbacks,
        verbose=1,
    )

    print("Stage 2: Fine-tuning with more layers...")

    if isinstance(model.layers[0], tf.keras.Model) and "resnet" in model.layers[0].name:
        for layer in model.layers[0].layers[-50:]:
            layer.trainable = True

    elif (
        isinstance(model.layers[0], tf.keras.Model)
        and "efficientnet" in model.layers[0].name
    ):
        for layer in model.layers[0].layers[-30:]:
            layer.trainable = True

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=5e-5),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )

    history2 = model.fit(
        train_generator,
        epochs=epochs - 5,
        validation_data=validation_generator,
        class_weight=class_weight_dict,
        callbacks=callbacks,
        verbose=1,
    )

    combined_history = {}
    for k in history1.history.keys():
        combined_history[k] = history1.history[k] + history2.history[k]

    return model, combined_history

In [None]:
print("\nTraining ResNet model with advanced techniques...")
resnet_model = create_resnet_model()
resnet_model, resnet_history = train_model(
    resnet_model, train_generator, val_generator, class_weight_dict
)

resnet_model.save(OUTPUT_DIR / "resnet_final_model.keras")
print("ResNet model training complete!")

In [None]:
print("\nTraining EfficientNet model with advanced techniques...")
efficientnet_model = create_efficientnet_model()
efficientnet_model, efficientnet_history = train_model(
    efficientnet_model, train_generator, val_generator, class_weight_dict
)

efficientnet_model.save(OUTPUT_DIR / "efficientnet_final_model.keras")
print("EfficientNet model training complete!")

In [None]:
plt.figure(figsize=(12, 10))

plt.subplot(2, 2, 1)
plt.plot(resnet_history["accuracy"], label="ResNet Training")
plt.plot(efficientnet_history["accuracy"], label="EfficientNet Training")
plt.title("Training Accuracy")
plt.ylabel("Accuracy")
plt.xlabel("Epoch")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.6)

plt.subplot(2, 2, 2)
plt.plot(resnet_history["val_accuracy"], label="ResNet Validation")
plt.plot(efficientnet_history["val_accuracy"], label="EfficientNet Validation")
plt.title("Validation Accuracy")
plt.ylabel("Accuracy")
plt.xlabel("Epoch")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.6)

plt.subplot(2, 2, 3)
plt.plot(resnet_history["loss"], label="ResNet Training")
plt.plot(efficientnet_history["loss"], label="EfficientNet Training")
plt.title("Training Loss")
plt.ylabel("Loss")
plt.xlabel("Epoch")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.6)

plt.subplot(2, 2, 4)
plt.plot(resnet_history["val_loss"], label="ResNet Validation")
plt.plot(efficientnet_history["val_loss"], label="EfficientNet Validation")
plt.title("Validation Loss")
plt.ylabel("Loss")
plt.xlabel("Epoch")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.6)

plt.tight_layout()
plt.savefig(OUTPUT_DIR / "training_comparison.png")
plt.show()

In [None]:
print("\nEvaluating ResNet model on test data...")
resnet_results = resnet_model.evaluate(test_generator, verbose=1)
print(f"ResNet Test Loss: {resnet_results[0]:.4f}")
print(f"ResNet Test Accuracy: {resnet_results[1]:.4f}")

print("\nEvaluating EfficientNet model on test data...")
efficientnet_results = efficientnet_model.evaluate(test_generator, verbose=1)
print(f"EfficientNet Test Loss: {efficientnet_results[0]:.4f}")
print(f"EfficientNet Test Accuracy: {efficientnet_results[1]:.4f}")

In [None]:
def ensemble_predict(models, image_path, img_size=224):
    img = tf.io.read_file(image_path)
    img = tf.image.decode_jpeg(img, channels=3)  # type: ignore
    img = tf.image.resize(img, (img_size, img_size))
    img = img / 255.0  # type: ignore
    img = tf.expand_dims(img, 0)

    predictions = []
    for model in models:
        pred = model.predict(img)
        predictions.append(pred)

    avg_pred = np.mean(predictions, axis=0)
    predicted_class = np.argmax(avg_pred[0])
    confidence = np.max(avg_pred[0]) * 100

    era_names = [era_name for _, _, era_name in ERA_RANGES]
    predicted_era = era_names[predicted_class]

    return predicted_era, confidence, avg_pred[0]


ensemble_models = [resnet_model, efficientnet_model]
print("\nEnsemble model ready for predictions!")


In [None]:
print("\nTesting ensemble prediction on sample images...")
sample_indices = np.random.choice(len(final_test_df), size=5, replace=False)
for idx in sample_indices:
    image_path = final_test_df.iloc[idx]["filepath"]
    true_era_idx = int(final_test_df.iloc[idx]["era_index"])
    true_era = ERA_RANGES[true_era_idx][2]

    predicted_era, confidence, _ = ensemble_predict(ensemble_models, image_path)

    print(f"Image: {os.path.basename(image_path)}")
    print(f"True era: {true_era}")
    print(f"Predicted era: {predicted_era} (Confidence: {confidence:.2f}%)")
    print("---")

print("\nTraining and evaluation complete!")