In [None]:
import os
from typing import Dict

import numpy as np
import pandas as pd
import tensorflow as tf

In [None]:
# Project root holding the `data/` and `results/` directories used by the
# relative paths further down. Set the CS231N_DIR environment variable to run
# somewhere other than the original EC2 host.
PROJECT_DIR = os.environ.get("CS231N_DIR", "/home/ec2-user/cs231n")
os.chdir(PROJECT_DIR)
os.listdir()

In [None]:
# Upper bound on the number of training epochs; used as the default
# `max_epochs` of `train_and_eval`.
MAX_EPOCHS = 2

# Notes
* `load_data` resizes every image to 224 × 224 by default, i.e. shape (224, 224, 3) for 3-channel images. This size is compatible with all candidate embedding architectures (MobileNetV2, ResNet50, Xception).

* Each embedding architecture has a corresponding `preprocess_input` function through which the image should pass first.

# Load data

In [None]:
def load_data(
    dir: str,
    target_shape=[224, 224]
  ) -> np.ndarray:
  """Load every array stored in a directory and stack them into one batch.

  Args
  ----
    dir: Directory from which to load images. Every regular file in it is
      read with `np.load`; sub-directories (e.g. `.ipynb_checkpoints`) are
      skipped.
    target_shape: Spatial [height, width] for the output. The stacked batch
      is passed through `tf.image.resize` only when its own spatial shape
      (axes 1 and 2) differs from this; otherwise it is returned as loaded.

  Returns
  -------
    Array with one entry per file along axis 0, in sorted file-name order.

  Raises
  ------
    ValueError: If `dir` contains no files.

  """
  # Build full paths instead of changing the working directory, so a failed
  # load cannot leave the process stranded inside `dir`. Sorting makes the
  # row order (and any later positional train/validation split) reproducible,
  # since `os.listdir` returns entries in arbitrary order.
  paths = [os.path.join(dir, name) for name in sorted(os.listdir(dir))]
  paths = [path for path in paths if os.path.isfile(path)]
  if not paths:
    raise ValueError(f"No files to load in directory: {dir}")

  # NOTE(review): allow_pickle=True can execute arbitrary code from a crafted
  # file; only point this at trusted data.
  out = np.stack([np.load(path, allow_pickle=True) for path in paths])

  # Decide on the data's real spatial shape rather than assuming the files
  # are 256x256.
  if tuple(out.shape[1:3]) != tuple(target_shape):
    out = tf.image.resize(out, target_shape)
    out = out.numpy()
  return out

## Training/validation split


In [None]:
def gen_sampling_weights(y: np.ndarray) -> np.ndarray:
  """Inverse probability sampling weights.

  Prepares a weight vector with weight inversely proportional to the
  frequency of the class in the input vector.

  Args
  ----
    y: Binary [0, 1] labels. Entries that are neither 0 nor 1 keep a
      weight of 1.

  Returns
  -------
    Floating-point array with the same shape as `y`. Each 1-label gets
    1 / P(y == 1) and each 0-label gets 1 / P(y == 0), where the
    probabilities are taken over the 0/1 entries of `y`. If a class is
    absent (or `y` is empty) no weight is computed for it.

  """
  y = np.asarray(y)
  # Always produce floating-point weights: inheriting an integer dtype from
  # `y` would silently truncate fractional weights such as 4/3 to 1.
  dtype = y.dtype if np.issubdtype(y.dtype, np.floating) else np.float64
  out = np.ones(y.shape, dtype=dtype)

  is_pos = y == 1
  is_neg = y == 0
  n1 = np.count_nonzero(is_pos)
  n0 = np.count_nonzero(is_neg)
  if n1 + n0 == 0:
    # Nothing labelled 0 or 1 (including empty input): leave weights at 1.
    return out

  p1 = n1 / (n1 + n0)
  p0 = 1.0 - p1
  # Only invert the frequency of classes that actually occur, which avoids a
  # division by zero when one class is missing.
  if n1 > 0:
    out[is_pos] = 1.0 / p1
  if n0 > 0:
    out[is_neg] = 1.0 / p0
  return out

In [None]:
def prep_data_for_model(
    pos: np.ndarray,
    neg: np.ndarray,
    train_prop = 0.6,
) -> Dict[str, np.ndarray]:
  """Split both classes into labelled training and validation sets.

  The leading `train_prop` fraction of each class, in the order given, goes
  to training and the remainder to validation. Within each split the
  positives come first (label 1) followed by the negatives (label 0), and
  each split gets its own weights from `gen_sampling_weights`.

  Args
  ----
    pos: Positive example images, one example per entry along axis 0.
    neg: Negative example images, one example per entry along axis 0.
    train_prop: Proportion of data for training. Remaining is allocated to
    validation.

  Returns
  -------
    Dict with keys "x_train", "y_train", "w_train", "x_val", "y_val" and
    "w_val".

  """
  def _head_tail(images, cut):
    # (training part, validation part) of one class.
    return images[:cut, :], images[cut:, :]

  def _labels(n_pos, n_neg):
    # Ones for the positives followed by zeros for the negatives.
    return np.concatenate((np.ones(n_pos), np.zeros(n_neg)), axis=0)

  pos_train, pos_val = _head_tail(pos, int(train_prop * pos.shape[0]))
  neg_train, neg_val = _head_tail(neg, int(train_prop * neg.shape[0]))

  train_labels = _labels(pos_train.shape[0], neg_train.shape[0])
  val_labels = _labels(pos_val.shape[0], neg_val.shape[0])

  prepared = {}
  prepared["x_train"] = np.concatenate((pos_train, neg_train), axis=0)
  prepared["y_train"] = train_labels
  prepared["w_train"] = gen_sampling_weights(train_labels)
  prepared["x_val"] = np.concatenate((pos_val, neg_val), axis=0)
  prepared["y_val"] = val_labels
  prepared["w_val"] = gen_sampling_weights(val_labels)
  return prepared

# Model

## Generators

In [None]:
def prep_generators(
    data: Dict[str, np.ndarray],
    train_batch=64,
    val_batch=32,
) -> Dict:
  """Prepare data generators.

  Both generators are built from one shared `ImageDataGenerator` that applies
  random reflections (horizontal and vertical) and random rotations
  (`rotation_range=360`). Note that this means validation batches are
  augmented in the same way as training batches.

  Args
  ----
    data: Dict providing "x_train", "y_train", "w_train", "x_val", "y_val"
      and "w_val".
    train_batch: Batch size of the training generator.
    val_batch: Batch size of the validation generator.

  Returns
  -------
    Dict with the training generator under "train_gen" and the validation
    generator under "val_gen".

  """
  augmenter = tf.keras.preprocessing.image.ImageDataGenerator(
    horizontal_flip=True,
    vertical_flip=True,
    rotation_range=360,
  )

  generators = {}
  generators["train_gen"] = augmenter.flow(
    x=data["x_train"],
    y=data["y_train"],
    batch_size=train_batch,
    sample_weight=data["w_train"],
  )
  generators["val_gen"] = augmenter.flow(
    x=data["x_val"],
    y=data["y_val"],
    batch_size=val_batch,
    sample_weight=data["w_val"],
  )
  return generators


## Compile

In [None]:
def define_mode(
    embed="resnet",
    input_shape=(224, 224, 3),
    lr=1e-4,
) -> tf.keras.Model:
  """Define and compile the classifier.

  The model has the form:
  Image -> Preprocessor -> Embedder -> Dense(1) -> Sigmoid.
  The embedder is frozen, so only the final dense layer is trainable.

  Args
  ----
    embed: Embedding architecture. "mobile" selects MobileNetV2 and
      "xception" selects Xception; any other value selects ResNet50.
    input_shape: Shape of one input image, used for both the model input and
      the embedder.
    lr: Learning rate.

  Returns
  -------
    Model compiled with Adam, binary cross-entropy loss, and the metrics
    "acc" (binary accuracy) and "auc".

  """
  apps = tf.keras.applications

  # Choose the matching (preprocess function, architecture constructor) pair.
  if embed == "mobile":
    preprocess = apps.mobilenet_v2.preprocess_input
    build_embedder = apps.mobilenet_v2.MobileNetV2
  elif embed == "xception":
    preprocess = apps.xception.preprocess_input
    build_embedder = apps.xception.Xception
  else:
    # Resnet50 (default).
    preprocess = apps.resnet.preprocess_input
    build_embedder = apps.resnet50.ResNet50

  # Headless network with global average pooling, kept frozen.
  backbone = build_embedder(
    include_top=False,
    input_shape=input_shape,
    pooling="avg"
  )
  backbone.trainable = False

  # Wire the pieces together; the embedder is called with training=False.
  image = tf.keras.Input(shape=input_shape, name="input")
  features = backbone(preprocess(image), training=False)
  head = tf.keras.layers.Dense(1, activation="sigmoid", name="output")
  model = tf.keras.Model(image, head(features))

  tracked_metrics = [
    tf.keras.metrics.BinaryAccuracy(name="acc"),
    tf.keras.metrics.AUC(name="auc"),
  ]
  model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
    loss=tf.keras.losses.BinaryCrossentropy(),
    metrics=tracked_metrics,
  )
  return model

## Training

In [None]:
def train_and_eval(
    pos_dir: str,
    neg_dir: str,
    lr=1e-4,
    max_epochs=MAX_EPOCHS,
    embed="resnet",
    patience=10,
    validation_freq=1,
    log_dir="final",
) -> Dict[str, float]:
  """Train and evaluate a model given the input data.

  Loads both classes, splits them into training and validation sets, fits
  the model from `define_mode` on the generators from `prep_generators`, and
  then scores the fitted model directly on the training and validation
  arrays (not through the generators).

  Args
  ----
    pos_dir: Directory containing positive examples.
    neg_dir: Directory containing negative examples.
    lr: Learning rate passed to `define_mode`.
    max_epochs: Maximum training epochs.
    embed: Embedding architecture name passed to `define_mode`.
    patience: Patience for early stopping.
    validation_freq: Number of training epochs between validation runs.
      Defaults to every epoch, because early stopping can only act on epochs
      for which a validation loss was computed.
    log_dir: Directory that receives the TensorBoard logs. Pass a distinct
      value per run to keep runs from sharing one log directory.

  Returns
  -------
    Dict with keys "train_loss", "train_acc", "train_auc", "val_loss",
    "val_acc" and "val_auc".

  """
  # Load data.
  pos = load_data(pos_dir)
  neg = load_data(neg_dir)
  data = prep_data_for_model(pos, neg)

  # Prepare model.
  model = define_mode(embed=embed, lr=lr)

  # Prepare generators.
  gen = prep_generators(data)

  # Callbacks.
  callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.EarlyStopping(
        patience=patience, restore_best_weights=True)
  ]

  # Train model. Validation used to run only every 10th epoch, which with
  # the default `max_epochs` meant it never ran and the early-stopping
  # callback never received a validation loss to monitor.
  model.fit(
    x=gen["train_gen"],
    validation_data=gen["val_gen"],
    validation_freq=validation_freq,
    epochs=max_epochs,
    callbacks=callbacks,
    verbose=0
  )

  # Evaluate model. `evaluate` returns [loss, acc, auc], following the
  # metric order set when the model was compiled in `define_mode`.
  train_eval = model.evaluate(
    x=data["x_train"],
    y=data["y_train"]
  )
  val_eval = model.evaluate(
    x=data["x_val"],
    y=data["y_val"]
  )

  # Output.
  return {
      "train_loss": train_eval[0],
      "train_acc":  train_eval[1],
      "train_auc":  train_eval[2],
      "val_loss": val_eval[0],
      "val_acc":  val_eval[1],
      "val_auc":  val_eval[2]
  }

# Depth Experiment

In [None]:
# Projection depths to compare (1, 3, 5, 7); each value `d` selects the
# `data/pos_d{d}` and `data/neg_d{d}` directories.
depths = list(range(1, 8, 2))

In [None]:
# Column-oriented accumulator: one independent empty list per output column,
# filled with one value per experiment run.
results = {
  column: []
  for column in (
    "depth",
    "train_loss",
    "train_acc",
    "train_auc",
    "val_loss",
    "val_acc",
    "val_auc",
  )
}

In [None]:
# Run experiment: train and evaluate one model per projection depth.
for d in depths:
  print(f"\n\nStarting depth {d}.\n\n")
  out = train_and_eval(
    pos_dir=f"data/pos_d{d}",
    neg_dir=f"data/neg_d{d}",
  )

  # Record the depth only after the run has succeeded, so a failed run
  # cannot leave `results["depth"]` longer than the metric columns.
  results["depth"].append(d)
  for key in out:
    results[key].append(out[key])


In [None]:
# Save results as a tab-separated table.
# `to_csv` does not create missing parent directories, so make sure the
# output directory exists first.
os.makedirs("results", exist_ok=True)
out = pd.DataFrame(results)
out.to_csv("results/depth_experiment.tsv", sep="\t")

# Embedding Experiment

In [None]:
# Embedders: the names accepted by the `embed` argument of `define_mode`.
embedder = [
  "mobile",    # MobileNetV2
  "resnet",    # ResNet50
  "xception",  # Xception
]

In [None]:
# Fresh column-oriented accumulator for the embedding experiment: one
# independent empty list per output column.
results = {
  column: []
  for column in (
    "embedder",
    "train_loss",
    "train_acc",
    "train_auc",
    "val_loss",
    "val_acc",
    "val_auc",
  )
}

In [None]:
# Run experiment: train and evaluate one model per embedder, all on the
# depth-1 data.
d = 1
for e in embedder:
  print(f"\n\nStarting model with {e} embedding.\n\n")
  out = train_and_eval(
    pos_dir=f"data/pos_d{d}",
    neg_dir=f"data/neg_d{d}",
    embed=e,
  )

  # Record the embedder only after the run has succeeded, so a failed run
  # cannot leave `results["embedder"]` longer than the metric columns.
  results["embedder"].append(e)
  for key in out:
    results[key].append(out[key])


In [None]:
# Save results as a tab-separated table.
# `to_csv` does not create missing parent directories, so make sure the
# output directory exists first.
os.makedirs("results", exist_ok=True)
out = pd.DataFrame(results)
out.to_csv("results/embed_experiment.tsv", sep="\t")