In [None]:
import os
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID" 
os.environ["CUDA_VISIBLE_DEVICES"]="1"

device = "cuda:0"

In [None]:
import torch
import numpy as np
import h5py
import time
import os
import sys
import io
import imageio
import matplotlib.pyplot as plt
import sklearn
import sklearn.linear_model
import sklearn.model_selection
import sklearn.metrics
import skimage.exposure
import sklearn.manifold

import deepVAEHelpers.hps as hps
from deepVAEHelpers.train_helpers import set_up_hyperparams, load_opt
from deepVAEHelpers.vae import VAE

In [None]:
############ set up vae model in order to get latents #############

# set hyperparameter
H = set_up_hyperparams(s=["--dataset=i64"])
H_ = hps.ffhq_256
H_["image_channels"] = 1
H_["image_size"] = 128
H_["width"] = 128
H_["n_batch"] = 2
H_.dec_blocks = "1x2,4m1,4x3,8m4,8x4,16m8,16x9,32m16,32x20,64m32,64x12,128m64"
H_.enc_blocks = "128x4,128d2,64x7,64d2,32x7,32d2,16x7,16d2,8x7,8d2,4x7,4d4,1x8"
H_["adam_warmup_iters"] = 100
H.update(H_)
H.lr = 0.0001
H.num_epochs = 1
H["skip_threshold"] = -1

with h5py.File('/storage/mi/jennyonline/data/data_2020_100000_unbiased.h5', 'r') as f:
    sample_image = f['images'][:8].astype(np.float32)
    sample_mask = f['tag_masks'][:8].astype(np.float32)
    sample_loss_mask = f['loss_masks'][:8].astype(np.float32)
    H["std"] = f['std'][()]
    std = H["std"]
    H["mean"] = f['mean'][()]
    mean = H["mean"]

# load model
vae = VAE(H).to(device)
vae.load_state_dict(torch.load("/storage/mi/jennyonline/data/vae_supervised.pt"))
_ = vae.eval()

In [None]:
f = h5py.File('/storage/mi/jennyonline/data/data_2020_100000_unbiased.h5', 'r')

images = f['images']
tag_masks = f['tag_masks']
loss_masks = f['loss_masks']
labels = f['labels']

In [None]:
idxs_deformed = np.argwhere(labels).flatten()
images_deformed = images[idxs_deformed]
tag_masks_deformed = tag_masks[idxs_deformed]

idxs_normal = np.argwhere(~labels[:]).flatten()
images_normal = images[idxs_normal]
tag_masks_normal = tag_masks[idxs_normal]

print(images_deformed.shape, images_normal.shape)

In [None]:
def get_latents_batch(images, tag_mask, latent_dimensions=None):
    with torch.no_grad():
        x = torch.from_numpy(images.astype(np.float32))[:, :, :, None]

        x -= H["mean"]
        x /= H["std"]
        x = x.to(device)

        tag_mask = torch.from_numpy(tag_mask.astype(np.float32))[:, :, :, None].to(device)
        data_input = (x * tag_mask).float()
        target = data_input.clone().detach()

        stats = vae.forward_get_latents(data_input)
        
        zs = []
        
        if latent_dimensions is None:
            latent_dimensions = np.arange(len(stats))
        
        for dim in latent_dimensions:
            z = stats[dim]['qm'].cpu().numpy()

            if z.shape[-1] == 1:
                zs.append(z[:, :, 0, 0])
            else:
                zs.append(z.mean(axis=(2, 3)))
                
        return np.concatenate(zs, axis=1)

In [None]:
def get_latents(images, tag_masks, batch_size=8, num_dimensions=1):
    zs = []
    i = 0
    while sum(map(len, zs)) < len(images):
        with torch.no_grad():
            idxer = slice(i, min(len(images), i + batch_size))
            zs.append(get_latents_batch(images[idxer], tag_masks[idxer], latent_dimensions=np.arange(num_dimensions)))

            sys.stdout.write(f'\r{i}/{len(images)}')
            i += batch_size

    zs = np.concatenate(zs)
    return zs

In [None]:
latents_deformed = get_latents(images_deformed, tag_masks_deformed)
labels_deformed = np.ones(len(latents_deformed))
latents_normal = get_latents(images_normal, tag_masks_normal)
labels_normal = np.zeros(len(latents_normal))

latents = np.concatenate((latents_deformed, latents_normal))
labels = np.concatenate((labels_deformed, labels_normal))
images_samples = np.concatenate((images_deformed, images_normal))
tag_masks_samples = np.concatenate((tag_masks_deformed, tag_masks_normal))

In [None]:
latents.shape, labels.shape

In [None]:
# f1 score
linear = sklearn.linear_model.LogisticRegression(class_weight='balanced', max_iter=1000)

sklearn.model_selection.cross_val_score(
    linear, 
    latents, 
    labels, 
    cv=sklearn.model_selection.StratifiedShuffleSplit(), 
    scoring=sklearn.metrics.make_scorer(sklearn.metrics.roc_auc_score, needs_proba=True)
).mean()

In [None]:
# Accuracy f
(linear.fit(latents, labels).predict(latents) == labels).mean()

In [None]:
cv = sklearn.model_selection.StratifiedShuffleSplit()

X = latents
y = labels

tprs = []
aucs = []
mean_fpr = np.linspace(0, 1, 100)

fig, ax = plt.subplots(figsize=(8, 6))
for i, (train, test) in enumerate(cv.split(X, y)):
    linear = sklearn.linear_model.LogisticRegression(class_weight='balanced', max_iter=1000)
    linear.fit(X[train], y[train])
    viz = sklearn.metrics.plot_roc_curve(linear, X[test], y[test],
                         name='ROC fold {}'.format(i),
                         alpha=0.3, lw=1, ax=ax)
    interp_tpr = np.interp(mean_fpr, viz.fpr, viz.tpr)
    interp_tpr[0] = 0.0
    tprs.append(interp_tpr)
    aucs.append(viz.roc_auc)

ax.plot([0, 1], [0, 1], linestyle='--', lw=2, color='r',
        label='Chance', alpha=.8)

mean_tpr = np.mean(tprs, axis=0)
mean_tpr[-1] = 1.0
mean_auc = sklearn.metrics.auc(mean_fpr, mean_tpr)
std_auc = np.std(aucs)
ax.plot(mean_fpr, mean_tpr, color='b',
        label=r'Mean ROC (AUC = %0.2f $\pm$ %0.2f)' % (mean_auc, std_auc),
        lw=2, alpha=.8)

std_tpr = np.std(tprs, axis=0)
tprs_upper = np.minimum(mean_tpr + std_tpr, 1)
tprs_lower = np.maximum(mean_tpr - std_tpr, 0)
ax.fill_between(mean_fpr, tprs_lower, tprs_upper, color='grey', alpha=.2,
                label=r'$\pm$ 1 std. dev.')

ax.set(xlim=[-0.05, 1.05], ylim=[-0.05, 1.05],
       title="Receiver operating characteristic")
ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.savefig("/storage/mi/jennyonline/new_images/roc.pdf")
plt.show()

In [None]:
linear = linear.fit(X, y)
yp = linear.predict_proba(X)[:, 1]

In [None]:
fig, axes = plt.subplots(5, 5, figsize=(16, 16))

for i in range(5 * 5):
    idx = np.argsort(yp)[::-1][i]
    r, c = divmod(i, 5)
    axes[r, c].imshow(images_samples[idx], cmap=plt.cm.gray, vmin=0, vmax=255)
    axes[r, c].set_title(f'label={bool(labels[idx])} | p={yp[idx]:.2f}')
    axes[r, c].axis('off')
    
fig.suptitle('Samples with highest probability for deformed wings', fontsize=16, y=1.01)
plt.tight_layout()

In [None]:
fig, axes = plt.subplots(5, 5, figsize=(16, 16))

for i in range(5 * 5):
    idx = np.argsort(yp)[i]
    r, c = divmod(i, 5)
    axes[r, c].imshow(images_samples[idx], cmap=plt.cm.gray, vmin=0, vmax=255)
    axes[r, c].set_title(f'label={bool(labels[idx])} | p={yp[idx]:.2f}')
    axes[r, c].axis('off')
    
fig.suptitle('Samples with lowest probability for deformed wings', fontsize=16, y=1.01)
plt.tight_layout()

In [None]:
fig, axes = plt.subplots(5, 5, figsize=(16, 16))

for i in range(5 * 5):
    idx = np.argsort(np.abs(yp - 0.5))[i]
    r, c = divmod(i, 5)
    axes[r, c].imshow(images_samples[idx], cmap=plt.cm.gray, vmin=0, vmax=255)
    axes[r, c].set_title(f'label={bool(labels[idx])} | p={yp[idx]:.2f}')
    axes[r, c].axis('off')
    
fig.suptitle('Samples with highest uncertainty for deformed wings', fontsize=16, y=1.01)
plt.tight_layout()

## Null hypothesis: Main contributing factor is image brightness

In [None]:
Xi = images_samples.mean(axis=(1, 2))[:, None]

plt.scatter(
    yp,
    Xi
)

In [None]:
linear = sklearn.linear_model.LogisticRegression(class_weight='balanced')

sklearn.model_selection.cross_val_score(
    linear, 
    Xi,
    labels, 
    cv=sklearn.model_selection.StratifiedShuffleSplit(), 
    scoring=sklearn.metrics.make_scorer(sklearn.metrics.roc_auc_score, needs_proba=True)
).mean()

In [None]:
(linear.fit(Xi, labels).predict(Xi) == labels).mean()