In [None]:
!pip install tqdm


In [None]:
import os
import sys
import math
import json
import shutil
import pathlib
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import tensorflow as tf
from tqdm import tqdm, tqdm_notebook

sns.set()
%matplotlib inline

In [None]:
!pip install tf-keras-vis

### Build Dataset

In [None]:
!unzip "/content/drive/MyDrive/anupam/datasets/building_age_network/images/cleaned/BuildingAgeSplit_5CLASS.zip" -d /home/

In [None]:
!rm -rf /home/*
!cp -r "/content/drive/My Drive/anupam/datasets/building_age_network/images/cleaned/BuildingAgeSplit_950.zip" /home/.
!unzip -o '/home/BuildingAgeSplit_950.zip' -d '/'

In [None]:
!find /home/BuildingAgeSplit_950/ -name "*.jpg" | wc -l

In [None]:
# !rm -rf /home/home/BuildingAgeSplit_5CLASS/

In [None]:
!find /home/home/BuildingAgeSplit_5CLASS -name "*.jpg" | wc -l

In [None]:
# Dataset split ratios
np.random.seed(42)
VAL_PERC = 0.125
TEST_PERC = 0.125
TRAIN_PERC = 0.75

# Folder names reserved for the generated splits. They live next to the class
# folders, so they must be filtered out or re-running this cell would treat
# them as classes (and the cleanup cell would then delete them).
SPLIT_DIR_NAMES = ("train", "val", "test")

# Split each folder into train, val and test sets
base = pathlib.Path("/home/home/BuildingAgeSplit_5CLASS")
# Sorted so the class order does not depend on filesystem listing order.
base_dirs = sorted(
    x for x in base.iterdir() if x.is_dir() and x.name not in SPLIT_DIR_NAMES
)
# Derived from base_dirs (not a second iterdir) so both lists stay aligned.
labels = [x.stem for x in base_dirs]
base_dirs, labels

In [None]:
# Build an empty <split>/<class> folder tree under `base`.
dirs = ['train', 'val', 'test']
print(dirs)
CLASS_LABELS = np.unique(labels)

for dirname in dirs:
  cur_dir = base / dirname
  # Wipe any split left behind by an earlier run before recreating it.
  if cur_dir.is_dir():
    shutil.rmtree(cur_dir)
    print("DIR DELETED:", cur_dir)
  cur_dir.mkdir(parents=True)
  for label in CLASS_LABELS:
    class_dir = cur_dir / label
    class_dir.mkdir(parents=True)
    print("DIR MADE FOR", class_dir)

In [None]:
# Create data splits and copy files.
# Every class is capped at the same number of images so the splits stay balanced.
MAX_IMAGES_PER_CLASS = 615
# Explicitly seeded local generator: re-running this cell alone reproduces the
# same shuffle instead of depending on the global NumPy state of another cell.
split_rng = np.random.RandomState(42)

for dirname in base_dirs:
  # The class label is the folder name. It is kept in its own variable so the
  # notebook-level `labels` list (used to derive CLASS_LABELS) is not overwritten.
  class_label = dirname.stem
  # Sort before truncating: iterdir() order is filesystem dependent, so an
  # unsorted "first N" selection is not reproducible.
  class_images = sorted(x for x in dirname.iterdir() if x.suffix == ".jpg")
  images = np.array(class_images[:MAX_IMAGES_PER_CLASS])

  # Train and validation sizes are rounded down; the test split receives the
  # remainder, so the reported sizes always add up to the number of images.
  train_size = int(len(images) * TRAIN_PERC)
  val_size = int(len(images) * VAL_PERC)
  test_size = len(images) - train_size - val_size
  print("[INFO] Split size for: ", dirname, ":", train_size, val_size, test_size,
        (train_size + val_size + test_size))

  # Create a random split of files
  indices = np.arange(len(images))
  split_rng.shuffle(indices)
  split_images = {
      "train": images[indices[:train_size]],
      "val": images[indices[train_size:train_size + val_size]],
      "test": images[indices[train_size + val_size:]],
  }

  # Copy the files of each split into <base>/<split>/<class>/
  for split_name, selected in split_images.items():
    for image in selected:
      shutil.copyfile(image, base / split_name / class_label / image.name)
    # Report a per-split count; this also works when a split is empty.
    print("{} : {} images copied for class {}".format(
        split_name.upper(), len(selected), class_label))

In [None]:
# Remove the original per-class folders now that their images were copied
# into the train/val/test tree.
for dirname in base_dirs:
  # Skip folders that are already gone (e.g. the cell was run before).
  if not dirname.is_dir():
    continue
  shutil.rmtree(dirname)
  print("DIR DELETED:", dirname)

In [None]:
# Count the images that remain after the split (same dataset root as the cells above).
!find /home/home/BuildingAgeSplit_5CLASS -name "*.jpg" | wc -l

In [None]:
# !zip -r /home/BAv5.zip /home/BAv5/
# !cp -r /home/BAv5.zip "/content/drive/My Drive/backup/home/private/datasets/building_age/"

### Extract Dataset

In [None]:
# Copy the archived dataset from Drive and unpack it under /home/.
!cp -r "/content/drive/My Drive/backup/home/private/datasets/building_age/BAv5.zip" /home/
# -d selects the extraction directory; without it unzip treats "/home/" as the
# name of an archive member to extract.
!unzip -o "/home/BAv5.zip" -d /home/

In [None]:
!ls /home/
!find /home/home/BAv5/ -name "*.jpg" | wc -l

### Build Network

In [None]:
# Imports
import os
import time
import logging
import pathlib
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt


class BuildingAgeNetwork:
    """EfficientNet transfer-learning pipeline for the building-age classifier.

    Bundles dataset loading, model construction, training, history plotting
    and test-set evaluation around one EfficientNet variant that is selected
    when the object is created.
    """

    def __init__(self, network, ds_num):
        """Select the backbone and label set and initialise hyperparameters.

        Args:
            network: Index into ``network_list`` / ``image_res_list``
                (0 selects EfficientNetB0 at 224 px ... 7 selects
                EfficientNetB7 at 600 px).
            ds_num: Index into ``label_list`` defined below; only index 0
                exists at the moment.

        Side effects:
            Creates ``./model_runs/<YYYYMMDD>`` for checkpoints and plots.
        """
        # Define logger.
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        # self.logger = config.start_logging_to_stdout(self.logger)
        # self.logger = config.start_logging_to_file(self.logger)

        # Define EfficientNet options. The two lists are parallel: entry i of
        # image_res_list is the input resolution used for entry i of network_list.
        self.image_res_list = [224, 240, 260, 300, 380, 456, 528, 600]
        self.network_list = [
            "EfficientNetB0",
            "EfficientNetB1",
            "EfficientNetB2",
            "EfficientNetB3",
            "EfficientNetB4",
            "EfficientNetB5",
            "EfficientNetB6",
            "EfficientNetB7",
        ]
        # Fine-tuning depths: entry k lists the layer-name fragments whose
        # layers are unfrozen when ``levels == k``.
        self.levels_list = [
            ["top"],
            ["top", "block7"],
            ["top", "block7", "block6"],
            ["top", "block7", "block6", "block5"],
            ["top", "block7", "block6", "block5", "block4"],
        ]

        # Load config/environment variables.
        # self.ds_path = pathlib.Path(os.getenv("DATASET_PATH"))
        # self.num_classes = os.getenv("DATASET_NUM_CLASSES")
        self.ds_path = pathlib.Path("./data")
        timestr = time.strftime("%Y%m%d")
        self.ckpt_path = pathlib.Path("./model_runs") / timestr
        self.ckpt_path.mkdir(parents=True, exist_ok=True)
        label_list = [["0", "1", "2", "3", "4"]]
        self.ds_labels = label_list[ds_num]
        self.num_classes = len(self.ds_labels)

        # Define network.
        # network_index = int(os.getenv("NETWORK_INDEX"))
        self.model_name = self.network_list[network]
        self.IMG_HEIGHT = self.IMG_WIDTH = self.image_res_list[network]

        # Load default hyperparameters.
        self.BATCH_SIZE = 32
        self.NUM_EPOCHS = 50
        # self.LEARNING_RATE = 9.359659238480389e-05 # 4C
        # self.LEARNING_RATE = 5.488401558788082e-05 # 3C
        self.LEARNING_RATE = 6.523389943576093e-05 # 5C
        self.DROPOUT_RATE = 0.35
        self.NUM_CHANNELS = 3
        self.INPUT_SHAPE = (self.IMG_HEIGHT, self.IMG_WIDTH, self.NUM_CHANNELS)

        # model placeholder.
        self.model = None

    def make_datasets(self, type):
        """Load one split (sub-folder of ``ds_path``) as a batched image dataset.

        Args:
            type: Name of the split folder, e.g. ``"train"``, ``"val"`` or
                ``"test"``.

        Returns:
            A shuffled, batched ``tf.data.Dataset`` of (image, one-hot label)
            pairs with prefetching enabled. The ``class_names`` attribute set
            by Keras is carried over to the returned dataset.
        """
        print(self.ds_path / type)
        dataset = tf.keras.preprocessing.image_dataset_from_directory(
            directory=(self.ds_path / type),
            labels="inferred",
            label_mode="categorical",
            batch_size=self.BATCH_SIZE,
            image_size=(self.IMG_HEIGHT, self.IMG_WIDTH),
            shuffle=True,
        )
        # tf.data transformations return a new dataset, so the result has to
        # be kept. Only prefetch is applied: a cache() placed after the
        # shuffle would replay the first epoch's order on every later epoch.
        class_names = dataset.class_names
        dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
        # The transformed dataset is a new object; re-attach the attribute
        # that notebook cells read via `test_ds.class_names`.
        dataset.class_names = class_names
        return dataset

    def get_aug(self):
        """Return the augmentation stack (small random rotation + horizontal flip)."""
        # Define Image Augmentation Strategies
        img_augmentation = tf.keras.models.Sequential(
            [
                tf.keras.layers.experimental.preprocessing.RandomRotation(factor=0.05),
                tf.keras.layers.experimental.preprocessing.RandomFlip(
                    mode="horizontal"
                ),
            ],
            name="img_augmentation",
        )
        print("Image Augmentation strategy defined.")
        return img_augmentation

    def build_and_compile_model(self, ft=True, levels=1):
        """Build the EfficientNet classifier and store it in ``self.model``.

        Args:
            ft: When true, layers whose name contains one of the fragments in
                ``levels_list[levels]`` are unfrozen (batch-norm layers stay
                frozen). When false the whole backbone stays frozen.
            levels: Index into ``levels_list`` selecting the fine-tuning depth.
        """
        # Define model inputs.
        inputs = tf.keras.layers.Input(self.INPUT_SHAPE)
        # Apply data augmentation to inputs.
        x = self.get_aug()(inputs)

        # Load base model from Keras Applications.
        base_model = getattr(tf.keras.applications, self.model_name)(
            include_top=False, input_tensor=x, weights="imagenet"
        )
        # Freeze base model.
        base_model.trainable = False

        # Unfreeze parts of model.
        if ft:
            # Set trainable layers, but keep batch norm layers frozen.
            for layer in base_model.layers:
                if all(
                    lname not in layer.name for lname in self.levels_list[levels]
                ) or isinstance(layer, tf.keras.layers.BatchNormalization):
                    layer.trainable = False
                else:
                    layer.trainable = True

        # Rebuild classifier.
        x = tf.keras.layers.GlobalAveragePooling2D(name="tl_avgpool")(base_model.output)
        x = tf.keras.layers.Dropout(self.DROPOUT_RATE, name="tl_dropout")(x)
        outputs = tf.keras.layers.Dense(
            self.num_classes, activation="softmax", name="tl_pred"
        )(x)

        # Compile model.
        model = tf.keras.Model(inputs, outputs, name=self.model_name)
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.LEARNING_RATE)
        model.compile(
            optimizer=optimizer, loss="categorical_crossentropy", metrics=["accuracy"]
        )
        print("Model {} successfully compiled.".format(self.model_name))
        self.model = model

    def show_trainable_layers(self):
        """Print name and input/output shapes of every trainable layer of ``self.model``."""
        print("Trainable layers for model {} are:".format(self.model_name))
        for layer in self.model.layers:
            if layer.trainable:
                print(
                    "LAYER: {}, TRAINABLE: {}, I/P SHAPE: {}, O/P SHAPE: {}".format(
                        layer.name,
                        layer.trainable,
                        layer.input_shape,
                        layer.output_shape,
                    )
                )

    def train_model(self, train_ds, val_ds, levels, use_early_stopping=False):
        """Fit ``self.model`` and checkpoint the best weights by validation accuracy.

        Args:
            train_ds: Training dataset passed to ``fit``.
            val_ds: Validation dataset passed to ``fit``.
            levels: Fine-tuning depth; only used in the checkpoint file name.
            use_early_stopping: When true, training also stops once
                ``val_loss`` has not improved by at least 1e-3 for 3 epochs.
                Defaults to false, which keeps the previous behaviour of
                always training for ``NUM_EPOCHS`` epochs.

        Returns:
            The ``History`` object returned by ``fit``.
        """
        checkpoint_name = (
            "building_age."
            + self.model_name
            + "-LEV-"
            + str(levels)
            + ".weights.{epoch:02d}-{val_loss:.2f}-{val_accuracy:.2f}.hdf5"
        )
        model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
            filepath=self.ckpt_path / checkpoint_name,
            save_weights_only=True,
            monitor="val_accuracy",
            mode="max",
            save_best_only=True,
        )
        callbacks = [model_checkpoint_callback]
        if use_early_stopping:
            callbacks.append(
                tf.keras.callbacks.EarlyStopping(
                    # Stop training when `val_loss` is no longer improving,
                    monitor="val_loss",
                    # where an improvement must be at least 1e-3,
                    min_delta=1e-3,
                    # and must be missing for 3 consecutive epochs.
                    patience=3,
                    verbose=1,
                )
            )

        print(
            "Starting model training for {} epochs.".format(self.NUM_EPOCHS)
        )
        history = self.model.fit(
            train_ds,
            epochs=self.NUM_EPOCHS,
            callbacks=callbacks,
            validation_data=val_ds,
            verbose=1,
        )
        print("Model training finished.")
        return history

    def save_hist_plot(self, hist, metric):
        """Plot train vs. validation curves for ``metric`` and save them as PNG.

        Args:
            hist: ``History`` object returned by ``train_model``.
            metric: Key in ``hist.history`` (its ``"val_"`` counterpart is
                plotted as well), e.g. ``"accuracy"`` or ``"loss"``.
        """
        timestr = time.strftime("%Y%m%d-%H%M%S")
        plot_name = (
            "building_age." + self.model_name + "." + metric + "." + timestr + ".png"
        )
        plt.plot(hist.history[metric])
        plt.plot(hist.history["val_" + metric])
        plt.title("model " + metric)
        plt.ylabel(metric)
        plt.xlabel("epoch")
        plt.legend(["train", "validation"], loc="upper left")
        plt.savefig(self.ckpt_path / plot_name)
        print("Model plot for {} saved.".format(metric))
        plt.show()

    def evaluate_model(self, test_ds):
        """Print overall metrics and per-class accuracy on ``test_ds``.

        Args:
            test_ds: Batched dataset of (images, one-hot labels).
        """
        print("Starting model evaluation on test set.")
        result = self.model.evaluate(test_ds)
        print(dict(zip(self.model.metrics_names, result)))

        y_true = list()
        y_pred = list()
        # Predict batch by batch in a single pass so labels and predictions
        # stay paired even though the dataset may be shuffled, and to avoid
        # one predict() call per image.
        for images, labels in test_ds:
            y_pred.extend(np.asarray(self.model.predict_on_batch(images)))
            y_true.extend(labels.numpy())

        for i in range(len(self.ds_labels)):
            self.show_per_class_accuracy(y_true, y_pred, i)

    def show_per_class_accuracy(self, y_true, y_pred, class_num):
        """Print the share of samples of class ``class_num`` predicted correctly.

        Args:
            y_true: Sequence of one-hot (or score) vectors, one per sample.
            y_pred: Sequence of prediction vectors, aligned with ``y_true``.
            class_num: Index of the class to report.
        """
        hits = 0
        total = 0
        for yt, yp in zip(y_true, y_pred):
            if np.argmax(yt) != class_num:
                continue
            total += 1
            if np.argmax(yp) == class_num:
                hits += 1
        # Divide by the number of samples that really belong to the class;
        # dividing by len(y_true) / num_classes is only right for perfectly
        # balanced test sets.
        if total == 0:
            print(
                "Accuracy for class {} is undefined (no samples in y_true)".format(class_num)
            )
            return
        print(
            "Accuracy for class {} is {}%".format(class_num, (hits / total) * 100)
        )

### Train and Eval Network

In [None]:
!ls /home/

In [None]:
# Experiment selection: EfficientNet variant, fine-tuning depth and label set.
network, levels, ds_num = 3, 4, 0

# Create the pipeline object, then load the splits in train/val/test order.
building_age_network = BuildingAgeNetwork(network, ds_num)
train_ds, val_ds, test_ds = [
    building_age_network.make_datasets(split) for split in ("train", "val", "test")
]

In [None]:
# Assemble and compile the network with fine-tuning enabled at depth `levels`.
building_age_network.build_and_compile_model(ft=True, levels=levels)
# List the layers that are flagged trainable.
building_age_network.show_trainable_layers()

In [None]:
# Fit the network, then save one train/validation curve per tracked metric.
history = building_age_network.train_model(train_ds, val_ds, levels)
for metric in ("accuracy", "loss"):
    building_age_network.save_hist_plot(history, metric)

In [None]:
# Restore weights from a specific earlier checkpoint file.
# NOTE(review): the path is hard-coded to a dated run folder and the file name
# starts with "house_detector." while train_model() writes "building_age." —
# confirm this is the intended checkpoint.
building_age_network.model.load_weights('./model_runs/20210616/house_detector.EfficientNetB3-LEV-4.weights.44-0.85-0.55.hdf5')

In [None]:
# Persist the training curves. "w" overwrites the file: in append mode a
# re-run would write a second JSON document after the first and the file
# could no longer be parsed.
with open("/content/age_b1_0.json", "w") as f:
    json.dump(history.history, f)

In [None]:
# Evaluate model: prints the overall test metrics followed by one accuracy
# line per class.
building_age_network.evaluate_model(test_ds)

Y_pred

In [None]:
# Show the class names attached to the test dataset.
test_ds.class_names

In [None]:
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
def get_test_ds(IMG_HEIGHT, IMG_WIDTH, ds_path="./data"):
    """Load the ``test`` split in a fixed (unshuffled) order.

    Args:
        IMG_HEIGHT: Height every image is resized to.
        IMG_WIDTH: Width every image is resized to.
        ds_path: Dataset root that contains the ``test`` folder. Defaults to
            the path this cell used before the parameter existed.

    Returns:
        A batched ``tf.data.Dataset`` of (images, one-hot labels) with
        caching and prefetching enabled and ``class_names`` preserved.
    """
    dataset = tf.keras.preprocessing.image_dataset_from_directory(
        directory=(pathlib.Path(ds_path) / "test"),
        labels="inferred",
        label_mode="categorical",
        batch_size=32,
        image_size=(IMG_HEIGHT, IMG_WIDTH),
        shuffle=False,
    )
    # cache()/prefetch() return a new dataset, so the result must be kept.
    # Caching is safe here because the order is fixed (shuffle=False) and the
    # dataset is iterated more than once (labels first, predictions later).
    class_names = dataset.class_names
    dataset = dataset.cache().prefetch(tf.data.experimental.AUTOTUNE)
    # Re-attach the attribute that later cells read via `test_ds.class_names`.
    dataset.class_names = class_names
    return dataset
test_ds = get_test_ds(300, 300)
# Collect the one-hot labels of all batches and turn them into class indices.
y_true = np.concatenate([y for x, y in test_ds], axis=0)
y_true = np.argmax(y_true, axis=1)
y_true.shape

In [None]:
# Run the model over the whole test dataset; Y_pred holds one row of class
# scores per image.
Y_pred = building_age_network.model.predict(test_ds)

In [None]:
# Display the raw prediction array.
Y_pred

In [None]:
# Reduce the per-class scores to one predicted class index per image.
y_pred = Y_pred.argmax(axis=1)

# Confusion matrix: printed as text and drawn as an annotated heatmap.
print('Confusion Matrix')
cmat = confusion_matrix(y_true, y_pred)
print(cmat)
sns.set_style("whitegrid")
sns.set_palette("muted")
sns.heatmap(cmat, annot=True)

# Per-class precision / recall / F1, labelled with the dataset's class names.
print('Classification Report')
target_names = test_ds.class_names
report = classification_report(y_true, y_pred, target_names=target_names)
print(report)

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_cm(y_true, y_pred, class_names, fname, figsize=(5, 5)):
    """Draw an annotated confusion-matrix heatmap and save it to ``fname``.

    Each cell shows the row percentage and the raw count; diagonal cells also
    show the row total. Cells with a zero count are left blank.

    Args:
        y_true: Sequence of true class indices.
        y_pred: Sequence of predicted class indices, aligned with ``y_true``.
        class_names: Sequence mapping a class index to its display name.
        fname: Output path handed to ``plt.savefig``; a missing parent folder
            is created.
        figsize: Figure size passed to ``plt.subplots``.
    """
    sns.set_style("whitegrid")
    sns.set_palette("muted")
    y_true = [class_names[idx] for idx in y_true]
    y_pred = [class_names[idx] for idx in y_pred]
    # Only classes that occur in y_true get a row/column, so every row total
    # is at least 1 and the percentage division cannot hit zero.
    labels = np.unique(y_true)
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    cm_sum = np.sum(cm, axis=1, keepdims=True)
    cm_perc = cm / cm_sum.astype(float) * 100
    # Object array of strings; every cell is assigned in the loop below.
    annot = np.empty(cm.shape, dtype=object)
    nrows, ncols = cm.shape
    for i in range(nrows):
        for j in range(ncols):
            c = cm[i, j]
            p = cm_perc[i, j]
            if i == j:
                # Index the scalar: cm_sum[i] is a 1-element array, and
                # formatting an array with %d relies on deprecated
                # array-to-int conversion.
                s = cm_sum[i, 0]
                annot[i, j] = '%.1f%%\n%d/%d' % (p, c, s)
            elif c == 0:
                annot[i, j] = ''
            else:
                annot[i, j] = '%.1f%%\n%d' % (p, c)
    cm = pd.DataFrame(cm, index=labels, columns=labels)
    cm.index.name = 'Actual'
    cm.columns.name = 'Predicted'
    fig, ax = plt.subplots(figsize=figsize)
    fig.suptitle("Confusion Matrix")
    sns.heatmap(cm, cmap="coolwarm", annot=annot, fmt='', ax=ax)
    plt.xticks(rotation=90)
    plt.yticks(rotation=0)
    # savefig does not create folders; make sure the target folder exists.
    if isinstance(fname, (str, os.PathLike)):
        parent = os.path.dirname(os.fspath(fname))
        if parent:
            os.makedirs(parent, exist_ok=True)
    plt.savefig(fname, bbox_inches="tight")

plot_cm(y_true, y_pred, test_ds.class_names, "./content/class_hmap.png")

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

### 3 Class HP Run

In [None]:
!ls /home/

In [None]:
# Experiment selection: EfficientNet variant, fine-tuning depth and label set.
network, levels, ds_num = 1, 4, 0

# Create the pipeline object, then load the splits in train/val/test order.
building_age_network = BuildingAgeNetwork(network, ds_num)
train_ds, val_ds, test_ds = [
    building_age_network.make_datasets(split) for split in ("train", "val", "test")
]

In [None]:
# Assemble and compile the network with fine-tuning enabled at depth `levels`.
building_age_network.build_and_compile_model(ft=True, levels=levels)
# List the layers that are flagged trainable.
building_age_network.show_trainable_layers()

In [None]:
# Fit the network, then save one train/validation curve per tracked metric.
history = building_age_network.train_model(train_ds, val_ds, levels)
for metric in ("accuracy", "loss"):
    building_age_network.save_hist_plot(history, metric)

In [None]:
# Persist the training curves. "w" overwrites the file: in append mode a
# re-run would write a second JSON document after the first and the file
# could no longer be parsed.
with open("/content/age_b1_3.json", "w") as f:
    json.dump(history.history, f)

In [None]:
# Evaluate model: prints the overall test metrics followed by one accuracy
# line per class.
building_age_network.evaluate_model(test_ds)

Y_pred

In [None]:
# Show the class names attached to the test dataset.
test_ds.class_names

In [None]:
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
def get_test_ds(IMG_HEIGHT, IMG_WIDTH, ds_path="/home/BAv6/"):
    """Load the ``test`` split in a fixed (unshuffled) order.

    Args:
        IMG_HEIGHT: Height every image is resized to.
        IMG_WIDTH: Width every image is resized to.
        ds_path: Dataset root that contains the ``test`` folder. Defaults to
            the path this cell used before the parameter existed.

    Returns:
        A batched ``tf.data.Dataset`` of (images, one-hot labels) with
        caching and prefetching enabled and ``class_names`` preserved.
    """
    dataset = tf.keras.preprocessing.image_dataset_from_directory(
        directory=(pathlib.Path(ds_path) / "test"),
        labels="inferred",
        label_mode="categorical",
        batch_size=32,
        image_size=(IMG_HEIGHT, IMG_WIDTH),
        shuffle=False,
    )
    # cache()/prefetch() return a new dataset, so the result must be kept.
    # Caching is safe here because the order is fixed (shuffle=False) and the
    # dataset is iterated more than once (labels first, predictions later).
    class_names = dataset.class_names
    dataset = dataset.cache().prefetch(tf.data.experimental.AUTOTUNE)
    # Re-attach the attribute that later cells read via `test_ds.class_names`.
    dataset.class_names = class_names
    return dataset
test_ds = get_test_ds(240, 240)
# Collect the one-hot labels of all batches and turn them into class indices.
y_true = np.concatenate([y for x, y in test_ds], axis=0)
y_true = np.argmax(y_true, axis=1)
y_true

In [None]:
# Run the model over the whole test dataset; Y_pred holds one row of class
# scores per image.
Y_pred = building_age_network.model.predict(test_ds)

In [None]:
# Reduce the per-class scores to one predicted class index per image.
y_pred = Y_pred.argmax(axis=1)

# Confusion matrix: printed as text and drawn as an annotated heatmap.
print('Confusion Matrix')
cmat = confusion_matrix(y_true, y_pred)
print(cmat)
sns.set_style("whitegrid")
sns.set_palette("muted")
sns.heatmap(cmat, annot=True)

# Per-class precision / recall / F1, labelled with the dataset's class names.
print('Classification Report')
target_names = test_ds.class_names
report = classification_report(y_true, y_pred, target_names=target_names)
print(report)

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_cm(y_true, y_pred, class_names, fname, figsize=(5, 5)):
    """Draw an annotated confusion-matrix heatmap and save it to ``fname``.

    Each cell shows the row percentage and the raw count; diagonal cells also
    show the row total. Cells with a zero count are left blank.

    Args:
        y_true: Sequence of true class indices.
        y_pred: Sequence of predicted class indices, aligned with ``y_true``.
        class_names: Sequence mapping a class index to its display name.
        fname: Output path handed to ``plt.savefig``; a missing parent folder
            is created.
        figsize: Figure size passed to ``plt.subplots``.
    """
    sns.set_style("whitegrid")
    sns.set_palette("muted")
    y_true = [class_names[idx] for idx in y_true]
    y_pred = [class_names[idx] for idx in y_pred]
    # Only classes that occur in y_true get a row/column, so every row total
    # is at least 1 and the percentage division cannot hit zero.
    labels = np.unique(y_true)
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    cm_sum = np.sum(cm, axis=1, keepdims=True)
    cm_perc = cm / cm_sum.astype(float) * 100
    # Object array of strings; every cell is assigned in the loop below.
    annot = np.empty(cm.shape, dtype=object)
    nrows, ncols = cm.shape
    for i in range(nrows):
        for j in range(ncols):
            c = cm[i, j]
            p = cm_perc[i, j]
            if i == j:
                # Index the scalar: cm_sum[i] is a 1-element array, and
                # formatting an array with %d relies on deprecated
                # array-to-int conversion.
                s = cm_sum[i, 0]
                annot[i, j] = '%.1f%%\n%d/%d' % (p, c, s)
            elif c == 0:
                annot[i, j] = ''
            else:
                annot[i, j] = '%.1f%%\n%d' % (p, c)
    cm = pd.DataFrame(cm, index=labels, columns=labels)
    cm.index.name = 'Actual'
    cm.columns.name = 'Predicted'
    fig, ax = plt.subplots(figsize=figsize)
    fig.suptitle("Confusion Matrix")
    sns.heatmap(cm, cmap="coolwarm", annot=annot, fmt='', ax=ax)
    plt.xticks(rotation=90)
    plt.yticks(rotation=0)
    # savefig does not create folders; make sure the target folder exists.
    if isinstance(fname, (str, os.PathLike)):
        parent = os.path.dirname(os.fspath(fname))
        if parent:
            os.makedirs(parent, exist_ok=True)
    plt.savefig(fname, bbox_inches="tight")

plot_cm(y_true, y_pred, test_ds.class_names, "/content/age_b1_3_hmap.png")

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

### Evaluate Model Weights

In [None]:
class TestENet:
  """Rebuild an EfficientNet classifier from a checkpoint and score it on a test split.

  The EfficientNet variant and the fine-tuning depth are read from the
  checkpoint file name. Constructing the object runs the whole evaluation and
  appends one row to the summary CSV.
  """

  def __init__(self, d_path, w_path, ds_num):
    """Configure the model from the checkpoint name and run the evaluation.

    Args:
      d_path: ``pathlib.Path`` of the dataset root that contains ``test``.
      w_path: ``pathlib.Path`` of the ``.hdf5`` weights file. Its name must
        contain ``EfficientNetB<digit>`` and ``-LEV-<digit>``.
      ds_num: Index into ``label_list`` selecting the class labels.
    """
    # Define EfficientNet options.
    self.image_res_list = [224, 240, 260, 300, 380, 456, 528, 600]
    self.network_list = [
        "EfficientNetB0",
        "EfficientNetB1",
        "EfficientNetB2",
        "EfficientNetB3",
        "EfficientNetB4",
        "EfficientNetB5",
        "EfficientNetB6",
        "EfficientNetB7",
    ]
    self.levels_list = [
      ["top"],
      ["top", "block7"],
      ["top", "block7", "block6"],
      ["top", "block7", "block6", "block5"],
      ["top", "block7", "block6", "block5", "block4"],
    ]
    self.model = None
    # Single-digit indices taken from the file name, e.g. "...EfficientNetB3-LEV-4...".
    model_idx = int(w_path.name.split("EfficientNetB")[1][0])
    lev_idx = int(w_path.name.split("-LEV-")[1][0])

    # Define network and load default hyperparameters.
    self.model_name = self.network_list[model_idx]
    self.BATCH_SIZE = 32
    self.NUM_EPOCHS = 50
    self.LEARNING_RATE = 1e-4
    self.DROPOUT_RATE = 0.25
    self.NUM_CHANNELS = 3
    self.IMG_HEIGHT = self.IMG_WIDTH = self.image_res_list[model_idx]
    self.INPUT_SHAPE = (self.IMG_HEIGHT, self.IMG_WIDTH, self.NUM_CHANNELS)

    # self.ds_path = pathlib.Path("/home/BuildingAgeSplit_750")
    # timestr = time.strftime("%Y%m%d")
    # self.ckpt_path = pathlib.Path("/content/drive/My Drive/anupam/model_runs/bage_runs/class_exp") / timestr
    # self.ckpt_path.mkdir(parents=True, exist_ok=True)
    label_list = [
                  ["1850 - 1978", "1979 - 2001", "2002 - 2020"],
                  ['1850-1955', '1955-1990', '1990-2005', '2005-2020'],
                  ['1850-1935', '1935-1980', '1980-1995', '1995-2010', '2010-2020'],
                  ['1850-1930', '1930-1975', '1975-1992', '1992-2000', '2000-2015', '2015-2020'],
                  ['1850-1920', '1920-1960', '1960-1980', '1980-1992', '1992-1999', '1999-2015', '2015-2020']]
    self.ds_labels = label_list[ds_num]
    self.num_classes = len(self.ds_labels)

    self.run_eval(lev_idx, d_path, w_path)

  @staticmethod
  def _parse_checkpoint_name(name):
    """Return ``(epoch, val_loss)`` encoded in a checkpoint file name.

    Accepts both ``...weights.<epoch>-<val_loss>.hdf5`` and
    ``...weights.<epoch>-<val_loss>-<val_accuracy>.hdf5``.

    Raises:
      ValueError: If ``name`` matches neither layout.
    """
    import re

    match = re.search(
        r"\.weights\.(\d+)-(\d+(?:\.\d+)?)(?:-\d+(?:\.\d+)?)?\.hdf5$", name
    )
    if match is None:
      raise ValueError("Unrecognised checkpoint file name: {}".format(name))
    return int(match.group(1)), float(match.group(2))

  def run_eval(self, lev_idx, d_path, w_path):
    """Build the model, load ``w_path``, evaluate on the test split and log a CSV row.

    Args:
      lev_idx: Fine-tuning depth used when the weights were trained.
      d_path: Dataset root that contains ``test``.
      w_path: Path of the weights file to load.
    """
    # Local import: `csv` is not imported by the notebook's import cells.
    import csv

    # Build and Compile Model
    self.build_and_compile_model(levels = lev_idx)
    self.model.load_weights(str(w_path))

    # Make test dataset
    test_ds = self.make_datasets(d_path, "test")

    # Eval model on test set
    r, pc = self.evaluate_model(test_ds)

    epo, valloss = self._parse_checkpoint_name(w_path.name)

    # One column per class, whatever the size of the selected label set.
    fields = [self.model_name, lev_idx, epo, valloss, r[0], r[1], *pc]
    print("---".join([str(elem) for elem in fields]))

    summary_path = pathlib.Path("/content/drive/My Drive/anupam/model_runs/bage_runs/class_exp") / "eNet_class_summary.csv"
    # newline='' is what the csv module expects from the file it writes to.
    with open(summary_path, 'a', newline='') as f:
      writer = csv.writer(f)
      writer.writerow(fields)

  def get_aug(self):
    """Return the augmentation stack (small random rotation + horizontal flip)."""
    # Define Image Augmentation Strategies
    img_augmentation = tf.keras.models.Sequential(
        [
            tf.keras.layers.experimental.preprocessing.RandomRotation(factor=0.05),
            tf.keras.layers.experimental.preprocessing.RandomFlip(
                mode="horizontal"
            ),
        ],
        name="img_augmentation",
    )
    # print("Image Augmentation strategy defined.")
    return img_augmentation

  def build_and_compile_model(self, ft=True, levels=1):
    """Build the EfficientNet classifier and store it in ``self.model``.

    Args:
      ft: When true, layers whose name contains one of the fragments in
        ``levels_list[levels]`` are unfrozen (batch-norm layers stay frozen).
      levels: Index into ``levels_list`` selecting the fine-tuning depth.
    """
    # Define model inputs.
    inputs = tf.keras.layers.Input(self.INPUT_SHAPE)
    # Apply data augmentation to inputs.
    x = self.get_aug()(inputs)

    # Load base model from Keras Applications.
    base_model = getattr(tf.keras.applications, self.model_name)(
        include_top=False, input_tensor=x, weights="imagenet"
    )
    # Freeze base model.
    base_model.trainable = False

    # Unfreeze parts of model.
    if ft:
      # Set trainable layers, but keep batch norm layers frozen.
      for layer in base_model.layers:
        if all(
            lname not in layer.name for lname in self.levels_list[levels]
        ) or isinstance(layer, tf.keras.layers.BatchNormalization):
          layer.trainable = False
        else:
          layer.trainable = True

    # Rebuild classifier.
    x = tf.keras.layers.GlobalAveragePooling2D(name="tl_avgpool")(base_model.output)
    x = tf.keras.layers.Dropout(self.DROPOUT_RATE, name="tl_dropout")(x)
    outputs = tf.keras.layers.Dense(
        self.num_classes, activation="softmax", name="tl_pred"
    )(x)

    # Compile model.
    model = tf.keras.Model(inputs, outputs, name=self.model_name)
    optimizer = tf.keras.optimizers.Adam(learning_rate=self.LEARNING_RATE)
    model.compile(
        optimizer=optimizer, loss="categorical_crossentropy", metrics=["accuracy"]
    )
    # print("Model {} successfully compiled.".format(self.model_name))
    self.model = model

  def make_datasets(self, d_path, d_type):
    """Load the ``d_type`` sub-folder of ``d_path`` as a batched image dataset.

    Returns:
      A shuffled, batched ``tf.data.Dataset`` of (image, one-hot label) pairs
      with prefetching enabled and ``class_names`` preserved.
    """
    dataset = tf.keras.preprocessing.image_dataset_from_directory(
        directory=(d_path / d_type),
        labels="inferred",
        label_mode="categorical",
        batch_size=self.BATCH_SIZE,
        image_size=(self.IMG_HEIGHT, self.IMG_WIDTH),
        shuffle=True,
    )
    # tf.data transformations return a new dataset, so the result has to be
    # kept. Only prefetch is applied because the dataset is shuffled.
    class_names = dataset.class_names
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
    dataset.class_names = class_names
    return dataset

  def evaluate_model(self, test_ds):
    """Evaluate ``self.model`` on ``test_ds``.

    Returns:
      ``(result, per_class)``: the list returned by ``model.evaluate`` and a
      list with one accuracy percentage per entry of ``ds_labels``.
    """
    # print("Starting model evaluation on test set.")
    result = self.model.evaluate(test_ds)
    # print(dict(zip(self.model.metrics_names, result)))
    # print(result)

    per_class = list()
    y_true = list()
    y_pred = list()
    # Predict batch by batch in a single pass so labels and predictions stay
    # paired even though the dataset is shuffled.
    for images, labels in test_ds:
      y_pred.extend(np.asarray(self.model.predict_on_batch(images)))
      y_true.extend(labels.numpy())

    for i in range(len(self.ds_labels)):
      per_class.append(self.show_per_class_accuracy(y_true, y_pred, i, self.ds_labels[i]))

    return result, per_class

  def show_per_class_accuracy(self, y_true, y_pred, class_num, class_lab):
    """Return the percentage of samples of class ``class_num`` predicted correctly.

    Args:
      y_true: Sequence of one-hot (or score) vectors, one per sample.
      y_pred: Sequence of prediction vectors, aligned with ``y_true``.
      class_num: Index of the class to score.
      class_lab: Display label of the class (currently unused).

    Returns:
      Accuracy in percent, or ``nan`` when ``y_true`` holds no sample of the class.
    """
    hits = 0
    total = 0
    for yt, yp in zip(y_true, y_pred):
      if np.argmax(yt) != class_num:
        continue
      total += 1
      if np.argmax(yp) == class_num:
        hits += 1
    # Divide by the real class size; len(y_true) / num_classes is only
    # correct for perfectly balanced test sets.
    if total == 0:
      return float("nan")
    return (hits / total) * 100

In [None]:
# Evaluation inputs. TestENet receives only d_path, w_path and ds_num; it reads
# the network and level indices from each weights file name, so `network` and
# `levels` below are not passed to it.
network = 2
levels = 2
ds_num = 1
# Every .hdf5 checkpoint below the experiment folder, in sorted order.
weight_list = sorted(pathlib.Path("/content/drive/My Drive/anupam/model_runs/bage_runs/class_exp").rglob("*.hdf5"))
d_path = pathlib.Path("/home/BuildingAgeSplit_950")
weight_list

In [None]:
# List the checkpoints that would be evaluated. The evaluation itself is
# commented out, so this loop only prints.
for i, w_path in enumerate(weight_list):
  print(i, d_path, w_path)
  # tenet = TestENet(d_path, w_path, ds_num)

### 5 Class HP Run

In [None]:
!ls /home/

In [None]:
network = 1
levels = 3
# BuildingAgeNetwork defines a single label set (index 0, five classes);
# any other index raises IndexError in its constructor.
ds_num = 0

# Make in memory datasets from images to train the network.
building_age_network = BuildingAgeNetwork(network, ds_num)
train_ds = building_age_network.make_datasets("train")
val_ds = building_age_network.make_datasets("val")
test_ds = building_age_network.make_datasets("test")

In [None]:
# Assemble and compile the network with fine-tuning enabled at depth `levels`.
building_age_network.build_and_compile_model(ft=True, levels=levels)
# List the layers that are flagged trainable.
building_age_network.show_trainable_layers()

In [None]:
# Fit the network, then save one train/validation curve per tracked metric.
history = building_age_network.train_model(train_ds, val_ds, levels)
for metric in ("accuracy", "loss"):
    building_age_network.save_hist_plot(history, metric)

In [None]:
# Persist the training curves. "w" overwrites the file: in append mode a
# re-run would write a second JSON document after the first and the file
# could no longer be parsed.
with open("/content/age_b1_5.json", "w") as f:
    json.dump(history.history, f)

In [None]:
# Evaluate model: prints the overall test metrics followed by one accuracy
# line per class.
building_age_network.evaluate_model(test_ds)

Y_pred

In [None]:
# Show the class names attached to the test dataset.
test_ds.class_names

In [None]:
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
def get_test_ds(IMG_HEIGHT, IMG_WIDTH, ds_path="/home/home/BuildingAgeSplit_5CLASS"):
    """Load the ``test`` split in a fixed (unshuffled) order.

    Args:
        IMG_HEIGHT: Height every image is resized to.
        IMG_WIDTH: Width every image is resized to.
        ds_path: Dataset root that contains the ``test`` folder. Defaults to
            the path this cell used before the parameter existed.

    Returns:
        A batched ``tf.data.Dataset`` of (images, one-hot labels) with
        caching and prefetching enabled and ``class_names`` preserved.
    """
    dataset = tf.keras.preprocessing.image_dataset_from_directory(
        directory=(pathlib.Path(ds_path) / "test"),
        labels="inferred",
        label_mode="categorical",
        batch_size=32,
        image_size=(IMG_HEIGHT, IMG_WIDTH),
        shuffle=False,
    )
    # cache()/prefetch() return a new dataset, so the result must be kept.
    # Caching is safe here because the order is fixed (shuffle=False) and the
    # dataset is iterated more than once (labels first, predictions later).
    class_names = dataset.class_names
    dataset = dataset.cache().prefetch(tf.data.experimental.AUTOTUNE)
    # Re-attach the attribute that later cells read via `test_ds.class_names`.
    dataset.class_names = class_names
    return dataset
test_ds = get_test_ds(240, 240)
# Collect the one-hot labels of all batches and turn them into class indices.
y_true = np.concatenate([y for x, y in test_ds], axis=0)
y_true = np.argmax(y_true, axis=1)
y_true

In [None]:
# Run the model over the whole test dataset; Y_pred holds one row of class
# scores per image.
Y_pred = building_age_network.model.predict(test_ds)

In [None]:
# Score the test set and reduce the scores to one class index per image.
Y_pred = building_age_network.model.predict(test_ds)
y_pred = Y_pred.argmax(axis=1)

# Confusion matrix: printed as text and drawn as an annotated heatmap.
print('Confusion Matrix')
cmat = confusion_matrix(y_true, y_pred)
print(cmat)
sns.set_style("whitegrid")
sns.set_palette("muted")
sns.heatmap(cmat, annot=True)

# Per-class precision / recall / F1, labelled with the dataset's class names.
print('Classification Report')
target_names = test_ds.class_names
report = classification_report(y_true, y_pred, target_names=target_names)
print(report)

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_cm(y_true, y_pred, class_names, fname, figsize=(5, 5)):
    """Draw an annotated confusion-matrix heatmap and save it to ``fname``.

    Each cell shows the row percentage and the raw count; diagonal cells also
    show the row total. Cells with a zero count are left blank.

    Args:
        y_true: Sequence of true class indices.
        y_pred: Sequence of predicted class indices, aligned with ``y_true``.
        class_names: Sequence mapping a class index to its display name.
        fname: Output path handed to ``plt.savefig``; a missing parent folder
            is created.
        figsize: Figure size passed to ``plt.subplots``.
    """
    sns.set_style("whitegrid")
    sns.set_palette("muted")
    y_true = [class_names[idx] for idx in y_true]
    y_pred = [class_names[idx] for idx in y_pred]
    # Only classes that occur in y_true get a row/column, so every row total
    # is at least 1 and the percentage division cannot hit zero.
    labels = np.unique(y_true)
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    cm_sum = np.sum(cm, axis=1, keepdims=True)
    cm_perc = cm / cm_sum.astype(float) * 100
    # Object array of strings; every cell is assigned in the loop below.
    annot = np.empty(cm.shape, dtype=object)
    nrows, ncols = cm.shape
    for i in range(nrows):
        for j in range(ncols):
            c = cm[i, j]
            p = cm_perc[i, j]
            if i == j:
                # Index the scalar: cm_sum[i] is a 1-element array, and
                # formatting an array with %d relies on deprecated
                # array-to-int conversion.
                s = cm_sum[i, 0]
                annot[i, j] = '%.1f%%\n%d/%d' % (p, c, s)
            elif c == 0:
                annot[i, j] = ''
            else:
                annot[i, j] = '%.1f%%\n%d' % (p, c)
    cm = pd.DataFrame(cm, index=labels, columns=labels)
    cm.index.name = 'Actual'
    cm.columns.name = 'Predicted'
    fig, ax = plt.subplots(figsize=figsize)
    fig.suptitle("Confusion Matrix")
    sns.heatmap(cm, cmap="coolwarm", annot=annot, fmt='', ax=ax)
    plt.xticks(rotation=90)
    plt.yticks(rotation=0)
    # savefig does not create folders; make sure the target folder exists.
    if isinstance(fname, (str, os.PathLike)):
        parent = os.path.dirname(os.fspath(fname))
        if parent:
            os.makedirs(parent, exist_ok=True)
    plt.savefig(fname, bbox_inches="tight")

plot_cm(y_true, y_pred, test_ds.class_names, "/content/age_b1_5_hmap.png")

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

### Hyperparameter Search (Keras Tuner)

In [None]:
!pip install keras-tuner
import kerastuner as kt
from kerastuner import HyperModel
from kerastuner.tuners import RandomSearch

In [None]:
class BuildingAgeHyperModel4C(HyperModel):
    """Keras Tuner hypermodel for the EfficientNet building-age classifier.

    The EfficientNet variant is fixed per instance; the fine-tuning depth,
    the dropout rate of the classifier head and the learning rate are tuned.
    NOTE(review): the class name says "4C" but ``ds_labels`` lists five
    classes — confirm which one is intended.
    """

    def __init__(self, network_idx):
        """Store the backbone choice and the static model options.

        Args:
            network_idx: Index into ``network_list`` / ``image_res_list``.
        """
        # Run the base-class initialiser so the attributes it sets up exist.
        super().__init__()

        # Define EfficientNet options.
        self.image_res_list = [224, 240, 260, 300, 380, 456, 528, 600]
        self.network_list = [
            "EfficientNetB0",
            "EfficientNetB1",
            "EfficientNetB2",
            "EfficientNetB3",
            "EfficientNetB4",
            "EfficientNetB5",
            "EfficientNetB6",
            "EfficientNetB7",
        ]
        self.levels_list = [
            ["top"],
            ["top", "block7"],
            ["top", "block7", "block6"],
            ["top", "block7", "block6", "block5"],
            ["top", "block7", "block6", "block5", "block4"],
        ]
        self.ds_labels = ['0', '1', '2', '3', '4']
        self.num_classes = len(self.ds_labels)

        # Input shape.
        self.NUM_CHANNELS = 3
        self.network_idx = network_idx

    def get_aug(self):
        """Return the augmentation stack (small random rotation + horizontal flip)."""
        # Define Image Augmentation Strategies
        img_augmentation = tf.keras.models.Sequential(
            [
                tf.keras.layers.experimental.preprocessing.RandomRotation(factor=0.05),
                tf.keras.layers.experimental.preprocessing.RandomFlip(
                    mode="horizontal"
                ),
            ],
            name="img_augmentation",
        )
        # print("Image Augmentation strategy defined.")
        return img_augmentation

    def build(self, hp):
        """Build and compile one candidate model from the sampled hyperparameters.

        Args:
            hp: Keras Tuner ``HyperParameters`` object. Three values are
                drawn from it: ``levels_idx`` (0-4), ``tl_dropout``
                (0.0-0.5 in steps of 0.05) and ``learning_rate``
                (1e-5 to 1e-2, log-sampled).

        Returns:
            The compiled ``tf.keras.Model``.
        """
        # Define hp
        network_idx = self.network_idx
        levels_idx = hp.Int('levels_idx', min_value=0, max_value=4, step=1)
        tl_dropout = hp.Float('tl_dropout', min_value=0.0, max_value=0.5,
                              default=0.25, step=0.05)
        learning_rate = hp.Float('learning_rate', min_value=1e-5, max_value=1e-2,
                                 sampling='LOG',
                                 default=1e-3)
        print("MODEL HP: ", network_idx, levels_idx, tl_dropout, learning_rate)

        # Define network.
        self.model_name = self.network_list[network_idx]
        self.IMG_HEIGHT = self.IMG_WIDTH = self.image_res_list[network_idx]
        self.INPUT_SHAPE = (self.IMG_HEIGHT, self.IMG_WIDTH, self.NUM_CHANNELS)

        # Define model inputs.
        inputs = tf.keras.layers.Input(self.INPUT_SHAPE)

        # Apply data augmentation to inputs.
        x = self.get_aug()(inputs)

        # Load base model from Keras Applications.
        base_model = getattr(tf.keras.applications, self.model_name)(
            include_top=False, input_tensor=x, weights="imagenet"
        )

        # Freeze base model.
        base_model.trainable = False

        # Set trainable layers, but keep batch norm layers frozen.
        for layer in base_model.layers:
            if all(
                lname not in layer.name for lname in self.levels_list[levels_idx]
            ) or isinstance(layer, tf.keras.layers.BatchNormalization):
                layer.trainable = False
            else:
                layer.trainable = True

        # Rebuild classifier.
        x = tf.keras.layers.GlobalAveragePooling2D(name="tl_avgpool")(base_model.output)
        x = tf.keras.layers.Dropout(tl_dropout, name="tl_dropout")(x)
        outputs = tf.keras.layers.Dense(
            self.num_classes, activation="softmax", name="tl_pred"
        )(x)

        # Compile model.
        model = tf.keras.Model(inputs, outputs, name=self.model_name)
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
        model.compile(
            optimizer=optimizer, loss="categorical_crossentropy", metrics=["accuracy"]
        )
        return model

In [None]:
def make_dataset(ds_path, type, img_size):
    """Load one split of an image-folder dataset as a cached, prefetched dataset.

    Args:
        ds_path: `pathlib.Path` of the dataset root; the split is read from
            `ds_path / type`.
        type: Name of the split sub-directory (e.g. "train" or "val"). The
            name shadows the builtin but is kept so keyword callers still work.
        img_size: `(height, width)` passed to the loader as `image_size`.

    Returns:
        The `tf.data.Dataset` built by `image_dataset_from_directory`
        (shuffled, one-hot labels) with `cache()` and `prefetch()` applied.
        The loader's `class_names` list is copied onto the returned dataset.
    """
    split_dir = ds_path / type
    print(split_dir)
    dataset = tf.keras.preprocessing.image_dataset_from_directory(
        directory=split_dir,
        labels="inferred",
        label_mode="categorical",
        shuffle=True,
        image_size=img_size,
    )
    # `class_names` lives only on the object returned by the loader, so keep a
    # reference before wrapping the dataset in further transformations.
    class_names = dataset.class_names
    # Dataset transformations return a NEW dataset. The previous code dropped
    # that return value, so neither caching nor prefetching was ever applied.
    # NOTE(review): caching sits after the loader's shuffle, so the batch order
    # of the first pass is presumably replayed on later epochs -- confirm this
    # is acceptable for training.
    dataset = dataset.cache().prefetch(tf.data.experimental.AUTOTUNE)
    dataset.class_names = class_names
    return dataset

In [None]:
class MyRSTuner(kt.tuners.RandomSearch):
    """Random-search tuner with a hook for per-trial tweaks of the fit arguments."""

    def run_trial(self, trial, *args, **kwargs):
        """Run a single trial by delegating to `RandomSearch.run_trial`.

        Args:
            trial: The KerasTuner trial being executed.
            *args: Positional arguments forwarded unchanged to the parent.
            **kwargs: Keyword arguments forwarded unchanged to the parent.

        Returns:
            Whatever the parent `run_trial` returns. Propagating it matters:
            newer KerasTuner releases read the trial results from this return
            value, while older releases return `None`, so this stays
            backward-compatible.
        """
        # Disabled experiment: tune the batch size per trial by re-batching
        # the datasets before handing them to the parent implementation.
        # hp = trial.hyperparameters
        # kwargs['batch_size'] = hp.Int('batch_size', 16, 64, step=8, default=32)
        # kwargs['x'] = kwargs['x'].batch(kwargs['batch_size'])
        # kwargs['validation_data'] = kwargs['validation_data'].batch(kwargs['batch_size'])
        return super().run_trial(trial, *args, **kwargs)

In [None]:
# Random-search setup for the "B0" run (network index 0).
SEED, MAX_TRIALS, EXECUTION_PER_TRIAL = 42, 20, 3
nw_idx = 0

# The hypermodel exposes one input resolution per network index; the datasets
# are resized to the resolution of the selected network.
hypermodel = BuildingAgeHyperModel4C(nw_idx)
img_size = hypermodel.image_res_list[nw_idx]
train_ds, val_ds = (
    make_dataset(pathlib.Path(root), split, (img_size, img_size))
    for root, split in (("./data/", "train"), ("./data", "val"))
)

# Tuner that scores each trial on validation accuracy.
tuner = MyRSTuner(
    hypermodel,
    objective='val_accuracy',
    max_trials=MAX_TRIALS,
    executions_per_trial=EXECUTION_PER_TRIAL,
    seed=SEED,
    directory='random_search',
    project_name='building_age_4C_B0',
)

In [None]:
# Run the hyperparameter search. `train_ds` / `val_ds` are tf.data datasets
# that already yield batches, so no `batch_size` is forwarded to `fit`: Keras
# documents that it must not be specified for dataset inputs.
# Each trial stops early once `val_loss` has not improved for 3 epochs.
tuner.search(
    x=train_ds,
    validation_data=val_ds,
    epochs=25,
    callbacks=[tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)],
)

In [None]:
# Print the search summary. The call reports by printing, so its result is not
# bound to a variable (the old `x = ...` only clobbered a generic name).
tuner.results_summary()

In [None]:
# Show the best trial(s) recorded by the tuner's oracle as the cell output.
# NOTE(review): called with default arguments -- confirm how many trials that returns.
tuner.oracle.get_best_trials()

### Best Model HeatMaps and Saliency Maps

In [None]:
def get_test_ds(ds_path, IMG_HEIGHT, IMG_WIDTH):
    """Load the `test` split of an image-folder dataset in a fixed order.

    Args:
        ds_path: Dataset root (string or path); images are read from
            `<ds_path>/test`.
        IMG_HEIGHT: Height passed to the loader in `image_size`.
        IMG_WIDTH: Width passed to the loader in `image_size`.

    Returns:
        An unshuffled `tf.data.Dataset` of `(images, one_hot_labels)` batches
        of size 32 with `cache()` and `prefetch()` applied. The loader's
        `class_names` list is copied onto the returned dataset so callers can
        keep using `test_ds.class_names`.
    """
    dataset = tf.keras.preprocessing.image_dataset_from_directory(
        directory=(pathlib.Path(ds_path) / "test"),
        labels="inferred",
        label_mode="categorical",
        batch_size=32,
        image_size=(IMG_HEIGHT, IMG_WIDTH),
        # Keep file order stable so labels collected in one pass line up with
        # predictions made in another pass.
        shuffle=False,
    )
    # `class_names` exists only on the loader's own return value; save it
    # before the dataset is wrapped by further transformations.
    class_names = dataset.class_names
    # Transformations return a NEW dataset. The previous code discarded it, so
    # caching and prefetching were silently never applied.
    dataset = dataset.cache().prefetch(tf.data.experimental.AUTOTUNE)
    dataset.class_names = class_names
    return dataset

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_cm(y_true, y_pred, class_names, fname, figsize=(5, 5)):
    """Plot an annotated confusion-matrix heatmap and save it to `fname`.

    Every cell shows the row-normalised percentage and the raw count. Diagonal
    cells also show the row total as `count/total`; off-diagonal cells with a
    zero count are left blank.

    Args:
        y_true: Sequence of integer class indices (ground truth).
        y_pred: Sequence of integer class indices (predictions).
        class_names: Sequence mapping a class index to its display name.
        fname: Output path handed to `plt.savefig`.
        figsize: Figure size forwarded to `plt.subplots`.
    """
    sns.set_style("whitegrid")
    sns.set_palette("muted")
    true_names = [class_names[idx] for idx in y_true]
    pred_names = [class_names[idx] for idx in y_pred]

    # Use every class that occurs in EITHER vector (sorted, as before). Taking
    # labels from `y_true` alone silently dropped predictions of a class that
    # never appears in the ground truth, which skewed counts and percentages.
    labels = np.unique(true_names + pred_names)
    cm = confusion_matrix(true_names, pred_names, labels=labels)
    row_totals = np.sum(cm, axis=1, keepdims=True)
    # A class without true samples has a zero row total; report 0% there
    # instead of dividing by zero.
    cm_perc = np.divide(
        cm,
        row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals != 0,
    ) * 100

    nrows, ncols = cm.shape
    annot_rows = []
    for i in range(nrows):
        row = []
        for j in range(ncols):
            count = cm[i, j]
            perc = cm_perc[i, j]
            if i == j:
                # Index the scalar explicitly: formatting a 1-element array
                # with %d relies on a deprecated NumPy conversion.
                row.append('%.1f%%\n%d/%d' % (perc, count, row_totals[i, 0]))
            elif count == 0:
                row.append('')
            else:
                row.append('%.1f%%\n%d' % (perc, count))
        annot_rows.append(row)
    annot = np.array(annot_rows, dtype=str)

    cm_frame = pd.DataFrame(cm, index=labels, columns=labels)
    cm_frame.index.name = 'Actual'
    cm_frame.columns.name = 'Predicted'
    fig, ax = plt.subplots(figsize=figsize)
    fig.suptitle("Confusion Matrix")
    # fmt='' because the annotations are already fully formatted strings.
    sns.heatmap(cm_frame, cmap="coolwarm", annot=annot, fmt='', ax=ax)
    plt.xticks(rotation=90)
    plt.yticks(rotation=0)
    plt.savefig(fname, bbox_inches="tight")

In [None]:
!ls -lrt /content/drive/MyDrive/anupam/model_runs/bage_runs/class_exp/20210420

In [None]:
# Best 3-class model.
# Checkpoint: building_age.EfficientNetB1-LEV-4.weights.26-1.03-0.79.hdf5
# NOTE(review): the positional arguments (1, 0) and (True, 4) are defined by
# BuildingAgeNetwork elsewhere in the notebook -- confirm they match the
# configuration this checkpoint was trained with.
model3c = BuildingAgeNetwork(1, 0)
model3c.build_and_compile_model(True, 4)
model3c.model.load_weights(
    "/content/drive/MyDrive/anupam/model_runs/bage_runs/class_exp/20210420/"
    "building_age.EfficientNetB1-LEV-4.weights.26-1.03-0.79.hdf5"
)

In [None]:
# Load the 3-class test split at 240x240.
test_ds = get_test_ds("/home/BAv6", 240, 240)
# Gather the one-hot label batches in dataset order and reduce them to
# integer class indices.
y_true = np.argmax(
    np.concatenate([labels for _, labels in test_ds], axis=0),
    axis=1,
)
y_true

In [None]:
# Predict on the 3-class test split and reduce class scores to class indices.
Y_pred = model3c.model.predict(test_ds)
y_pred = np.argmax(Y_pred, axis=1)

# Compute the confusion matrix once and reuse it for printing and plotting.
cmat = confusion_matrix(y_true, y_pred)
print('Confusion Matrix')
print(cmat)
sns.set_style("whitegrid")
sns.set_palette("muted")
# fmt="d": the cells are integer counts; seaborn's default '.2g' format would
# render counts of 100 or more in scientific notation.
sns.heatmap(cmat, annot=True, fmt="d")

print('Classification Report')
target_names = test_ds.class_names
print(classification_report(y_true, y_pred, target_names=target_names))

# Save the annotated confusion-matrix figure.
plot_cm(y_true, y_pred, target_names, "/content/age_b1_3_hmap_v2.png")

In [None]:
# Best 4-class model.
# Checkpoint: building_age.EfficientNetB1-LEV-3.weights.23-1.32-0.74.hdf5
# (3-class checkpoint, for reference:
#  building_age.EfficientNetB1-LEV-4.weights.26-1.03-0.79.hdf5)
# NOTE(review): the positional arguments (1, 1) and (True, 3) are defined by
# BuildingAgeNetwork elsewhere in the notebook -- confirm they match the
# configuration this checkpoint was trained with.
model4c = BuildingAgeNetwork(1, 1)
model4c.build_and_compile_model(True, 3)
model4c.model.load_weights(
    "/content/drive/MyDrive/anupam/model_runs/bage_runs/class_exp/20210420/"
    "building_age.EfficientNetB1-LEV-3.weights.23-1.32-0.74.hdf5"
)

In [None]:
# Load the 4-class test split at 240x240 and collect the true class indices
# in dataset order.
test_ds = get_test_ds("/home/home/BuildingAgeSplit_4CLASS", 240, 240)
y_true = np.concatenate([y for x, y in test_ds], axis=0)
y_true = np.argmax(y_true, axis=1)
print(y_true)

# Predict on the same split and reduce class scores to class indices.
Y_pred = model4c.model.predict(test_ds)
y_pred = np.argmax(Y_pred, axis=1)

# Compute the confusion matrix once and reuse it for printing and plotting.
cmat = confusion_matrix(y_true, y_pred)
print('Confusion Matrix')
print(cmat)
sns.set_style("whitegrid")
sns.set_palette("muted")
# fmt="d": the cells are integer counts; seaborn's default '.2g' format would
# render counts of 100 or more in scientific notation.
sns.heatmap(cmat, annot=True, fmt="d")

print('Classification Report')
target_names = test_ds.class_names
print(classification_report(y_true, y_pred, target_names=target_names))

# Save the annotated confusion-matrix figure.
plot_cm(y_true, y_pred, target_names, "/content/age_b1_4_hmap_v2.png")

In [None]:
# Best 5-class model.
# Checkpoint: building_age.EfficientNetB1-LEV-3.weights.26-1.36-0.70.hdf5
# (3- and 4-class checkpoints, for reference:
#  building_age.EfficientNetB1-LEV-4.weights.26-1.03-0.79.hdf5
#  building_age.EfficientNetB1-LEV-3.weights.23-1.32-0.74.hdf5)
# NOTE(review): the positional arguments (1, 2) and (True, 3) are defined by
# BuildingAgeNetwork elsewhere in the notebook -- confirm they match the
# configuration this checkpoint was trained with.
model5c = BuildingAgeNetwork(1, 2)
model5c.build_and_compile_model(True, 3)
model5c.model.load_weights(
    "/content/drive/MyDrive/anupam/model_runs/bage_runs/class_exp/20210420/"
    "building_age.EfficientNetB1-LEV-3.weights.26-1.36-0.70.hdf5"
)

In [None]:
# Load the 5-class test split at 240x240 and collect the true class indices
# in dataset order.
test_ds = get_test_ds("/home/home/BuildingAgeSplit_5CLASS", 240, 240)
y_true = np.concatenate([y for x, y in test_ds], axis=0)
y_true = np.argmax(y_true, axis=1)
print(y_true)

# Predict on the same split and reduce class scores to class indices.
Y_pred = model5c.model.predict(test_ds)
y_pred = np.argmax(Y_pred, axis=1)

# Compute the confusion matrix once and reuse it for printing and plotting.
cmat = confusion_matrix(y_true, y_pred)
print('Confusion Matrix')
print(cmat)
sns.set_style("whitegrid")
sns.set_palette("muted")
# fmt="d": the cells are integer counts; seaborn's default '.2g' format would
# render counts of 100 or more in scientific notation.
sns.heatmap(cmat, annot=True, fmt="d")

print('Classification Report')
target_names = test_ds.class_names
print(classification_report(y_true, y_pred, target_names=target_names))

# Save the annotated confusion-matrix figure.
plot_cm(y_true, y_pred, target_names, "/content/age_b1_5_hmap_v2.png")

### GradCAM / SaliencyMaps

In [None]:
!pip install tf-keras-vis

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras

# Display
from IPython.display import Image, display
import matplotlib.pyplot as plt
import matplotlib.cm as cm

In [None]:
# Print the layer-by-layer summary of the model, e.g. to look up the layer
# name used for Grad-CAM below.
# NOTE(review): `building_age_network` is created in an earlier cell; this
# cell fails on a fresh kernel unless that cell has been run.
building_age_network.model.summary()

In [None]:
# model_builder = keras.applications.xception.Xception
# m = model_builder(weights="imagenet")
# m.summary()

In [None]:
# Name of the layer whose activations are handed to make_gradcam_heatmap.
last_conv_layer_name = "top_activation"

# Sample images to explain with Grad-CAM.
i0, i1, i2, i3 = (
    "./content/cork1.png",
    "./content/cork2.png",
    "./content/cork3.png",
    "./content/cork4.png",
)

In [None]:
def get_img_array(img_path, size):
    """Load an image from disk as a one-image batch.

    Args:
        img_path: Path of the image file to load.
        size: Target size forwarded to `load_img` as `target_size`.

    Returns:
        The array produced by `img_to_array` with a new leading batch axis of
        length 1.
    """
    pil_img = keras.preprocessing.image.load_img(img_path, target_size=size)
    pixels = keras.preprocessing.image.img_to_array(pil_img)
    # Prepend the batch axis so the array can be fed straight to a model.
    return pixels[np.newaxis, ...]


def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    """Compute a Grad-CAM heatmap for the first image of a batch.

    Args:
        img_array: Input batch fed to the model. Gradients are pooled over the
            whole batch, but only the first item's activations are used.
        model: Keras model containing a layer named `last_conv_layer_name`.
        last_conv_layer_name: Name of the layer whose output is explained.
            Assumes that output is (batch, height, width, channels) -- the
            gradient is averaged over axes 0-2; TODO confirm for other layers.
        pred_index: Class index to explain. Defaults to the top-scoring class
            of the first image.

    Returns:
        NumPy heatmap with negative evidence clipped to 0 and scaled so its
        maximum is 1. If the map has no positive value the result is all
        zeros rather than NaN.
    """
    # Model mapping the input image to both the chosen layer's activations and
    # the final predictions. `model.inputs` is already a list, so it is passed
    # as-is instead of being wrapped in another list.
    grad_model = tf.keras.models.Model(
        model.inputs, [model.get_layer(last_conv_layer_name).output, model.output]
    )

    # Record the forward pass so the class score can be differentiated with
    # respect to the layer activations.
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    # Gradient of the selected class score w.r.t. the layer's feature map.
    grads = tape.gradient(class_channel, last_conv_layer_output)

    # One importance weight per channel: mean gradient over axes 0-2.
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Weight each channel of the first image's feature map by its importance
    # and sum over channels to get the class activation map.
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # Keep positive evidence only, then scale to [0, 1]. `divide_no_nan`
    # returns 0 where the maximum is 0; the previous plain division produced
    # NaN for a zero maximum and -0.0 for a negative one.
    heatmap = tf.maximum(heatmap, 0)
    heatmap = tf.math.divide_no_nan(heatmap, tf.math.reduce_max(heatmap))
    return heatmap.numpy()

In [None]:
import tensorflow as tf

In [None]:
# Prepare image i3: load it resized to 300x300 as a one-image batch and apply
# ResNet preprocessing.
# NOTE(review): ResNet's `preprocess_input` is used while the Grad-CAM layer is
# "top_activation" -- confirm this preprocessing (and the 300x300 size) matches
# what `building_age_network` was trained with.
img_array = tf.keras.applications.resnet.preprocess_input(get_img_array(i3, size=(300, 300)))

# Remove last layer's softmax
# NOTE(review): this mutates the shared model in place, so every later use of
# `building_age_network.model` also sees the changed last layer; re-running
# cells that expect probabilities needs the model rebuilt or reloaded.
building_age_network.model.layers[-1].activation = None

# Print what the top predicted class is
preds = building_age_network.model.predict(img_array)
print("Predicted:", np.argmax(preds))

# Generate class activation heatmap
heatmap = make_gradcam_heatmap(img_array, building_age_network.model, last_conv_layer_name)

# Display heatmap

plt.matshow(heatmap)
plt.show()

In [None]:
# Render the Grad-CAM heatmap without axis ticks and save it next to the inputs.
sns.set_style("whitegrid")
sns.set_palette("muted")
heatmap_ax = sns.heatmap(heatmap, fmt='', cmap="coolwarm")
heatmap_ax.set_xticks([])
heatmap_ax.set_yticks([])
plt.savefig("./content/B1_4_gradCAM_3_hmap.png", bbox_inches="tight")

In [None]:
def save_and_display_gradcam(img_path, heatmap, cam_path="cam.jpg", alpha=0.75):
    """Overlay a Grad-CAM heatmap on its source image, save it and display it.

    Args:
        img_path: Path of the original image (loaded at its native size).
        heatmap: 2-D array of values in [0, 1], e.g. the output of
            `make_gradcam_heatmap`.
        cam_path: Where the superimposed image is written.
        alpha: Weight applied to the colourised heatmap before it is added to
            the original image.
    """
    # Load the original image as a float array.
    img = keras.preprocessing.image.load_img(img_path)
    img = keras.preprocessing.image.img_to_array(img)

    # Rescale the heatmap to integer colormap indices 0-255.
    heatmap = np.uint8(255 * heatmap)

    # Look up the "jet" colormap through pyplot: `matplotlib.cm.get_cmap` is
    # deprecated and removed in recent Matplotlib releases, while
    # `plt.get_cmap` is available in old and new versions alike.
    jet = plt.get_cmap("jet")

    # Keep only the RGB columns of the colormap (drop alpha) and colourise.
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]

    # Resize the colourised heatmap to the original image's width and height.
    jet_heatmap = keras.preprocessing.image.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = keras.preprocessing.image.img_to_array(jet_heatmap)

    # Superimpose the heatmap on the original image.
    superimposed_img = jet_heatmap * alpha + img
    superimposed_img = keras.preprocessing.image.array_to_img(superimposed_img)

    # Save the result, then show the saved file in the notebook.
    superimposed_img.save(cam_path)
    display(Image(cam_path))


# Overlay the heatmap on image i3 and write the result next to the inputs.
save_and_display_gradcam(i3, heatmap, cam_path='./content/B1_4_gradCAM_3.png')