In [None]:
import os
import re
import yaml
import seaborn as sns
from sklearn.manifold import TSNE


import numpy as np

import pandas as pd
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorboard.plugins import projector
from PIL import Image

import matplotlib.pyplot as plt

# YAML file whose top-level "labels" mapping gives label name -> integer class id.
LABELS_PATH = "/workspaces/motion/labels.yaml"
# Location passed to tf.keras.models.load_model to restore the trained model.
MODEL_PATH = "./output"

# Apply seaborn's "darkgrid" theme to the plots produced by this notebook.
sns.set_style("darkgrid")

In [None]:
# Restore the trained Keras model saved at MODEL_PATH.
# (Run model.summary() here to look up the layer names referenced further down.)
model = tf.keras.models.load_model(filepath=MODEL_PATH)

In [None]:
# Load the label-name -> class-id mapping and build the inverse (id -> name) lookup.
with open(LABELS_PATH) as f:
    label_map = yaml.load(f, Loader=yaml.SafeLoader)["labels"]
reverse_label_map = {v: k for k, v in label_map.items()}
print(reverse_label_map)

# True when the label file already defines an explicit 'discard' class.
HAS_DISCARD = 'discard' in label_map

if not HAS_DISCARD:
    # If no explicit discard label we need to make one so Phoenix has
    # the label to reference. It gets the next free id after the largest
    # existing one (0 if the label file is empty).
    discard_id = max(reverse_label_map, default=-1) + 1
    label_map['discard'] = discard_id
    reverse_label_map[discard_id] = 'discard'

# (label, path) pairs for every image found on disk.
image_files = []

# "Good" images are labelled with the name of the folder that contains them.
GOOD_IMAGES_DIR = "/workspaces/motion/images/esp32/good"
for dirpath, dirnames, filenames in os.walk(GOOD_IMAGES_DIR):
    # Sorting dirnames in place makes os.walk descend in a stable order, and
    # sorting filenames makes the row order reproducible across runs/machines.
    dirnames.sort()
    for filename in sorted(filenames):
        image_files.append(
            (os.path.basename(dirpath), os.path.join(dirpath, filename))
        )

# Everything under the discard tree is labelled 'discard', whatever its folder.
DISCARD_IMAGES_DIR = "/workspaces/motion/images/esp32/discard/"
for dirpath, dirnames, filenames in os.walk(DISCARD_IMAGES_DIR):
    dirnames.sort()
    for filename in sorted(filenames):
        image_files.append(('discard', os.path.join(dirpath, filename)))

# Surface folder names that have no class id now, rather than as a KeyError
# in a later cell that looks labels up in label_map.
unknown_labels = sorted({label for label, _ in image_files} - set(label_map))
if unknown_labels:
    print(f"WARNING: image folders without an entry in label_map: {unknown_labels}")

print(len(image_files))
# Guarded so an empty image tree gives a readable message, not an IndexError.
print(image_files[0] if image_files else "no images found")

In [None]:
# Build a backend function that maps the tensor entering layer 'conv2d' to the
# tensor leaving layer 'dense_1'; its output is used below as the per-image
# embedding vectors.
first_layer = model.get_layer('conv2d')
embedding_layer = model.get_layer('dense_1')
in_layer, out_layer = first_layer.input, embedding_layer.output
func = K.function([in_layer], [out_layer])

In [None]:
# Shape every image is reshaped to before being fed to the model
# (presumably height x width x channels -- confirm against the model input).
IMG_SHAPE = (29, 40, 1)
# Minimum top-class score for a prediction to be kept when the model has no
# explicit 'discard' class; anything lower is reported as 'discard'.
MIN_CONFIDENCE = .98

# Read every image into a uint8 array. The context manager closes each file
# handle as soon as its pixels have been copied out, so a large image set
# cannot exhaust the process's file descriptors.
img_list = []
for _, img_path in image_files:
    with Image.open(img_path) as img:
        img_list.append(np.asarray(img, dtype=np.uint8).reshape(IMG_SHAPE))

# Single batch of shape (num_images, 29, 40, 1).
img_arrays = np.stack(img_list)

# Embedding vectors from the intermediate layer, and class scores from the full model.
vectors = func(img_arrays)[0]
p = model.predict(img_arrays, verbose=0)
pred_scores = np.amax(p, axis=1)
pred_argmax = np.argmax(p, axis=1)

# (predicted label, score) per image, in the same order as image_files.
predicts = []
for score, class_id in zip(pred_scores, pred_argmax):
    if not HAS_DISCARD and score < MIN_CONFIDENCE:
        # If we aren't using explicit discard labels then fabricate one
        # from lower-probability scores which we would throw out at runtime
        predicts.append(('discard', score))
    else:
        predicts.append((reverse_label_map[int(class_id)], score))

vectors.shape

In [None]:
from sklearn.decomposition import PCA

# Reduce the embedding vectors to their first three principal components.
pca = PCA(n_components=3)
pca_result = pca.fit_transform(vectors)

# Ground-truth label of every image, once as a name and once as its class id.
y = [label for label, _ in image_files]
y_int = [label_map[label] for label, _ in image_files]

# One column per principal component, followed by the two label columns.
component_columns = {
    column: pca_result[:, position]
    for position, column in enumerate(("pca-one", "pca-two", "pca-three"))
}
pca_df = pd.DataFrame({**component_columns, "y": y, "y-int": y_int})

variance_ratio = pca.explained_variance_ratio_
summary = "Explained variation per principal component: {}, total explained variance: {}"
print(summary.format(variance_ratio, sum(variance_ratio)))

In [None]:
# 2-D view of the first two principal components, coloured by true label.
ax = sns.scatterplot(
    data=pca_df,
    x="pca-one",
    y="pca-two",
    hue="y",
    alpha=0.4,
    legend="full",
)
# Place the legend's upper-left corner at axes coordinate (1, 1), i.e. just
# outside the plotting area, so it does not cover any points.
sns.move_legend(ax, loc="upper left", bbox_to_anchor=(1, 1))

In [None]:
# 3-D view of the first three principal components, coloured by class id.
fig = plt.figure(figsize=(16, 10))
ax = fig.add_subplot(projection="3d")
scatter = ax.scatter(
    xs=pca_df["pca-one"],
    ys=pca_df["pca-two"],
    zs=pca_df["pca-three"],
    c=pca_df["y-int"],
    cmap="tab10",
)
ax.set_xlabel("pca-one")
ax.set_ylabel("pca-two")
ax.set_zlabel("pca-three")

# num=None asks for exactly one legend entry per distinct class id. The default
# ("auto") switches to locator-chosen tick values once there are more than a
# handful of classes, which can drop classes or yield non-integer entries.
# Handles and labels come from a single call so they always pair up.
handles, id_texts = scatter.legend_elements(num=None)
legend1 = ax.legend(
    handles,
    # Not sure if the legend elements are guaranteed w.r.t. their order so better
    # just parse the math expression
    [
        reverse_label_map[int(re.sub(r"\$\\mathdefault{(\d+)}\$", r"\1", text))]
        for text in id_texts
    ],
    loc="upper right",
    title="Pattern",
)
ax.add_artist(legend1)
plt.show()

In [None]:
# Export the embeddings (plus a metadata file and a sprite sheet) in the layout
# expected by the TensorBoard embedding projector.
log_dir = '/workspaces/motion/logs'

os.makedirs(os.path.join(log_dir, 'embeddings'), exist_ok=True)

# Size of one tile in the sprite sheet, in pixels.
TILE_WIDTH = 40
TILE_HEIGHT = 29

# Load every image fully into memory and close its file straight away;
# keeping all of them open at once would leak one file descriptor per image.
images_pil = []
labels = []
for label, img_path in image_files:
    with Image.open(img_path) as img:
        images_pil.append(img.copy())
    labels.append(label)

# The sprite sheet is a square grid big enough to hold one tile per embedding.
one_square_size = int(np.ceil(np.sqrt(len(vectors))))
master_width = TILE_WIDTH * one_square_size
master_height = TILE_HEIGHT * one_square_size
spriteimage = Image.new(
    mode='RGBA',
    size=(master_width, master_height),
    color=(0,0,0,0) # fully transparent
)
# Tiles are laid out row by row: left to right, then top to bottom.
for count, image in enumerate(images_pil):
    row, col = divmod(count, one_square_size)
    spriteimage.paste(image, (TILE_WIDTH * col, TILE_HEIGHT * row))
# JPEG has no alpha channel, hence the conversion to RGB before saving.
spriteimage.convert("RGB").save(os.path.join(log_dir, 'embeddings/sprite.jpg'))

# Two-column TSV with a header row; the second column is only a placeholder.
with open(os.path.join(log_dir, 'embeddings/metadata.tsv'), 'w') as metadata_file:
    metadata_file.write('shape\ttest\n')
    for label in labels:
        metadata_file.write(f'{label}\tfoo\n')

# Save the embedding vectors as a checkpoint variable the projector can read.
checkpoint = tf.train.Checkpoint(embedding=tf.Variable(vectors))
checkpoint.save(os.path.join(log_dir, 'embedding.ckpt'))

config = projector.ProjectorConfig()
embedding = config.embeddings.add()
# Name under which tf.train.Checkpoint stores the `embedding` attribute.
embedding.tensor_name = "embedding/.ATTRIBUTES/VARIABLE_VALUE"
embedding.metadata_path = 'embeddings/metadata.tsv'
# The sprite sheet is written above but not wired into the projector config:
# embedding.sprite.image_path = 'embeddings/sprite.jpg'
# embedding.sprite.single_image_dim.extend([40,29])
projector.visualize_embeddings(log_dir, config)

In [None]:
import phoenix as px

# Directory whose contents are reachable under IMAGE_BASE_URL.
# NOTE(review): assumes a static file server rooted at this directory is
# listening on localhost:8000 -- confirm before relying on the image links.
IMAGE_ROOT = '/workspaces/motion'
IMAGE_BASE_URL = 'http://localhost:8000'

# One row per image: embedding, true label, predicted label and score, plus a
# link to the image. The URL is built from the path relative to IMAGE_ROOT so
# it has exactly one '/' after the host (slicing the prefix off used to leave
# the leading '/' in place and produced 'http://localhost:8000//images/...').
px_df = pd.DataFrame({
    'image_vector': [list(x) for x in vectors],
    'actual': [label for label, _ in image_files],
    'predict': [label for label, _ in predicts],
    'predict_score': [score for _, score in predicts],
    'image_url': [
        f"{IMAGE_BASE_URL}/{os.path.relpath(path, IMAGE_ROOT)}"
        for _, path in image_files
    ],
})

# Tell Phoenix which columns hold labels, scores and the embedding vectors.
train_schema = px.Schema(
    actual_label_column_name="actual",
    prediction_label_column_name="predict",
    prediction_score_column_name="predict_score",
    embedding_feature_column_names={
        "image_embedding": px.EmbeddingColumnNames(
            vector_column_name="image_vector",
            link_to_data_column_name="image_url"
        ),
    },
)

px_ds = px.Dataset(px_df, train_schema)
px.launch_app(px_ds)