In [None]:
import os
from pathlib import Path
from typing import Any, Dict, List, Tuple

import matplotlib.pyplot as plt
import numpy as np
from numpy.typing import ArrayLike, NDArray
from dotenv import load_dotenv

from keras.applications.vgg16 import VGG16, preprocess_input
from keras.models import Model
from keras.preprocessing.image import load_img

# clustering and dimension reduction
from sklearn.cluster import KMeans, AgglomerativeClustering, MeanShift
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score

load_dotenv()

# DATA_PATH has to be supplied through the environment (or a .env file).
# Fail with an explicit message instead of the opaque TypeError that
# Path(None) would raise when the variable is missing.
_data_path_value = os.getenv("DATA_PATH")
if _data_path_value is None:
    raise RuntimeError(
        "Environment variable DATA_PATH is not set; "
        "define it in the environment or in a .env file."
    )

DATA_PATH = Path(_data_path_value)

# only for .ipynb because relative imports don't work:
# switch the working directory to the parent of DATA_PATH
root_path = DATA_PATH.parent
os.chdir(str(root_path))

import src.database.db_connector as db

In [None]:
# name of the database that receives the clustering results
db_name = "clustering_db"

# number of clusters; placeholder until the K-Means cell assigns a value
k = None

# directory of the dataset; placeholder until load_dataset() assigns it
datadir = None

In [None]:
# Connect to the results database named by `db_name` and obtain a cursor.
# `cnx` and `cursor` are module-level on purpose: store_clusters() uses them.
# Exceptions are intentionally not caught here, so a failed connection
# stops the notebook at this cell.
cnx = db.connect_to_database(db_name)
cursor = db.get_connection_cursor(cnx)


# Functions

In [None]:
def load_dataset(name: str) -> Tuple[List[str], Model]:
    """Lists the files of a local dataset and builds the VGG16 feature model.

    The dataset is expected at
    ``DATA_PATH/datasets/dataset_<name>_root/dataset_<name>``. As a side effect
    the module-level ``datadir`` is set to that directory; ``extract_features``
    and ``preview_cluster`` resolve file names against it.

    Args:
        name (str): the name of the dataset to be loaded

    Returns:
        Tuple[List[str], Model]: (sorted file names of the dataset, VGG16 model
        that outputs the activations of its second-to-last layer)
    """
    global datadir
    dataset_dir_name = "dataset_" + name
    datadir = (
        Path(DATA_PATH)
        / "datasets"
        / (dataset_dir_name + "_root")
        / dataset_dir_name
    )

    # Keep regular files only (sub-directories are not images) and sort the
    # names: os.scandir yields entries in arbitrary order, which would make
    # the sample order, and with it the clustering, differ between runs.
    with os.scandir(datadir) as entries:
        images = sorted(entry.name for entry in entries if entry.is_file())

    # load pretrained VGG16 model and cut it off before its last layer
    base_model = VGG16()
    model = Model(inputs=base_model.inputs, outputs=base_model.layers[-2].output)
    return (images, model)


In [None]:
def extract_features(file_name: str, model: Model) -> ArrayLike:
    """Computes the model's prediction for a single image of the dataset.

    The file is looked up in the module-level ``datadir``, so ``load_dataset``
    has to run first.

    Args:
        file_name (str): name of the image file inside ``datadir``
        model (Model): (pre-trained) model used for the prediction

    Returns:
        ArrayLike: the model's prediction for the image
    """
    # read the image scaled to 224x224 and turn the PIL image into an array
    image_path = datadir / file_name
    pixels = np.array(load_img(image_path, target_size=(224, 224)))

    # batch of one sample: (num_of_samples, dim 1, dim 2, channels),
    # then apply the VGG16 input preprocessing
    batch = preprocess_input(pixels.reshape(1, 224, 224, 3))

    # feature vector predicted by the model
    return model.predict(batch, use_multiprocessing=True)

In [None]:
def preview_cluster(cluster_files: List[str], cluster_idx: int = 0, cluster_name: str = None) -> None:
    """Draw one row of the cluster preview grid on the current figure.

    The grid has ``k`` rows (module-level ``k``) and 11 columns: the first
    column holds a text label, the remaining ten hold images. Images are read
    from the module-level ``datadir``.

    Args:
        cluster_files (List[str]): all files belonging to cluster
        cluster_idx (int, optional): row of the cluster in the grid; negative
            values are treated as 0
        cluster_name (str, optional): name shown in the label column
    """
    n_columns = 10 + 1
    total_files = len(cluster_files)

    # at most 10 images fit into one row
    shown_files = cluster_files[:10]

    # number of grid cells that precede this row
    row_offset = max(cluster_idx, 0) * n_columns

    # first cell of the row: cluster name and full cluster size
    label_ax = plt.subplot(k, n_columns, 1 + row_offset)
    label_ax.axis("off")
    label_ax.text(0.3, 0.4, f"{cluster_name}\nlen: {total_files}")

    # remaining cells: one image each, starting at the second column
    for position, file in enumerate(shown_files, start=2):
        img = load_img(datadir / file)

        plt.subplot(k, n_columns, position + row_offset)
        plt.imshow(img)
        plt.axis("off")

In [None]:
def preview_all_clusters(clusters: Dict[Any, List[str]]) -> None:
    """Plot previews for all clusters, one grid row per cluster.

    The grid height follows the number of clusters actually passed in, not the
    ``k`` chosen for K-Means. An algorithm that decides the number of clusters
    itself (e.g. Mean Shift) therefore neither overflows the grid nor leaves
    empty rows.

    Args:
        clusters (Dict[Any, List[str]]): dict of all clusters with file names
    """
    global k

    n_rows = len(clusters)
    if n_rows == 0:
        # nothing to draw; a figure of height 0 would be invalid
        return

    # preview_cluster reads the grid height from the module-level `k`, so
    # point it at the real row count for the duration of this plot and
    # restore the previous value afterwards, even if plotting fails.
    previous_k = k
    k = n_rows
    try:
        plt.figure(figsize=(20 + 2, 2 * n_rows))

        # the dict order defines the row order
        for row, (name, files) in enumerate(clusters.items()):
            preview_cluster(files, row, name)

        plt.show()
    finally:
        k = previous_k

In [None]:
def plot_distribution(labels: NDArray, X: ArrayLike, centroids=None) -> None:
    """Scatter-plot the samples, colored by cluster, on their first two features.

    Only columns 0 and 1 of ``X`` (and of ``centroids``) are drawn.

    Args:
        labels (NDArray): predicted cluster for each sample, shape (n_samples,)
        X (ArrayLike): samples of shape (n_samples, n_features), n_features >= 2
        centroids (ArrayLike, optional): cluster centers of shape
            (n_clusters, n_features); drawn as black dots when given
    """
    # accept any array-like (e.g. plain lists): boolean-mask indexing below
    # only works on ndarrays
    labels = np.asarray(labels)
    X = np.asarray(X)

    # one scatter series per distinct label
    for label in np.unique(labels):
        members = labels == label
        plt.scatter(X[members, 0], X[members, 1], label=label)
    plt.legend()

    # plot cluster centroids
    if centroids is not None:
        centroids = np.asarray(centroids)
        plt.scatter(centroids[:, 0], centroids[:, 1], s=80, color="k")
    plt.show()

In [None]:
def store_clusters(
    run_name: str,
    k_value: int,
    n_screenshots: int,
    n_components: int,
    clusters: Dict[str, List[str]]
) -> None:
    """Store a clustering run, its clusters and the file-to-cluster mappings.

    Works on the module-level ``db_name``, ``cnx`` and ``cursor``. Every insert
    is committed on its own and rolled back on failure. Failures do not abort
    the run (best effort), but they are reported on stdout instead of being
    dropped silently.

    Args:
        run_name (str): name of the clustering run
        k_value (int): k-value of clustering run
        n_screenshots (int): total number of screenshots
        n_components (int): number of pca components
        clusters (Dict[str, List[str]]): dict of clusters; keys are converted
            with ``str()`` before they are stored
    """
    # screenshots are looked up by this prefix plus the file name
    path_prefix = "screenshots\\raw\\"

    try:
        db.insert_clustering_run(db_name, run_name, k_value, n_screenshots, n_components, cursor)
        cnx.commit()
    except Exception as err:
        cnx.rollback()
        print(f"Clustering run '{run_name}' (k={k_value}) was not stored, nothing else is written: {err}")
        # return to avoid duplicates
        return

    n_unknown = 0  # files without a screenshot entry in the database
    n_failed = 0  # assignments whose insert raised
    last_error = None

    for cluster_id, cluster_files in clusters.items():
        cluster_key = str(cluster_id)

        # store cluster properties
        try:
            db.insert_cluster(db_name, cluster_key, run_name, k_value, len(cluster_files), cursor)
            cnx.commit()
        except Exception as err:
            cnx.rollback()
            print(f"Cluster '{cluster_key}' of run '{run_name}' was not stored: {err}")
            # do not continue, so more screenshots can be added to cluster

        # store file-to-cluster mappings
        for filename in cluster_files:
            scr_id = db.get_screenshot_by_path(db_name, path_prefix + filename, cursor)

            if scr_id is None:
                n_unknown += 1
                continue

            try:
                db.insert_cluster_assignment(db_name, cluster_key, run_name, k_value, scr_id[0], cursor)
                cnx.commit()
            except Exception as err:
                cnx.rollback()
                n_failed += 1
                last_error = err

    # one summary line instead of one line per file
    if n_unknown or n_failed:
        print(
            f"Run '{run_name}': {n_unknown} file(s) without screenshot entry skipped, "
            f"{n_failed} assignment(s) failed (last error: {last_error})"
        )

In [None]:
def init_training() -> Dict[str, ArrayLike]:
    """Prepare environment and data for training.

    Loads the dataset and the feature model, then extracts the features of
    every file. Files whose extraction fails are reported and left out.

    Returns:
        Dict[str, ArrayLike]: extracted features, keyed by file name
    """
    # parameters
    # dataset_name = "v01_startups_clean"
    dataset_name = "v01_busicorp"

    # load dataset and model; load_dataset takes the dataset name only
    dataset, model = load_dataset(dataset_name)

    data = {}

    for image in dataset:
        # a file that cannot be processed must not stop the whole run
        try:
            data[image] = extract_features(image, model)
        except Exception as err:
            print(f"Skipping '{image}': {err}")

    return data

# Training

In [None]:
# extract the features of the whole dataset: {file name: prediction}
data = init_training()

# file names, in the order of the dict
filenames = np.array(list(data))

# stack the predictions and flatten them to one row of 4096 values per image
feat = np.array(list(data.values())).reshape(-1, 4096)

## PCA

In [None]:
# project the feature vectors onto their first principal components
n_components = 20
pca = PCA(n_components=n_components, random_state=22).fit(feat)
X_reduced = pca.transform(feat)

## K-Means

In [None]:
# K-Means on the reduced features; `k` is the module-level cluster count
k = 25
kmeans = KMeans(n_clusters=k, random_state=22, max_iter=300)
kmeans_labels = kmeans.fit_predict(X_reduced)

# (file name, label) pairs, ordered by ascending label
kmeans_cluster_assignments = sorted(zip(filenames, kmeans_labels), key=lambda pair: pair[1])

# label -> file names of that cluster
kmeans_clusters = {}  # Dict[Any, List[str]]
for file, cluster in kmeans_cluster_assignments:
    kmeans_clusters.setdefault(cluster, []).append(file)

In [None]:
# cluster centers found by K-Means, drawn on top of the samples
centroids = kmeans.cluster_centers_
plot_distribution(kmeans_labels, X_reduced, centroids=centroids)

In [None]:
# number of files per K-Means cluster (array index = cluster label)
kmeans_cluster_sizes = np.bincount([label for _file, label in kmeans_cluster_assignments])

# bar chart: one bar per cluster label
bars = plt.bar(np.arange(len(kmeans_cluster_sizes)), kmeans_cluster_sizes)
plt.xlabel("clusters")
plt.ylabel("size")
plt.show()

In [None]:
# image preview of the K-Means result: one row per cluster, up to 10 images each
preview_all_clusters(kmeans_clusters)

## Agglomerative Clustering

In [None]:
# agglomerative clustering on the reduced features, same cluster count `k`
agglo = AgglomerativeClustering(n_clusters=k)
agglo_labels = agglo.fit_predict(X_reduced)

# (file name, label) pairs, ordered by ascending label
agglo_cluster_assignments = sorted(zip(filenames, agglo_labels), key=lambda pair: pair[1])

# label -> file names of that cluster
agglo_clusters = {}  # Dict[Any, List[str]]
for file, cluster in agglo_cluster_assignments:
    agglo_clusters.setdefault(cluster, []).append(file)

In [None]:
# scatter plot of the agglomerative result on the first two components;
# no centroids are passed: AgglomerativeClustering does not expose
# `cluster_centers_`

plot_distribution(agglo_labels, X_reduced)


In [None]:
# number of files per agglomerative cluster (array index = cluster label)
agglo_cluster_sizes = np.bincount([label for _file, label in agglo_cluster_assignments])

# bar chart: one bar per cluster label
plt.bar(np.arange(len(agglo_cluster_sizes)), agglo_cluster_sizes)
plt.xlabel("clusters")
plt.ylabel("size")
plt.show()

In [None]:
# image preview of the agglomerative result: one row per cluster, up to 10 images each
preview_all_clusters(agglo_clusters)

## Mean Shift Clustering

In [None]:
# mean shift on the reduced features; the number of clusters follows from
# the bandwidth instead of `k`
mshift = MeanShift(bandwidth=45)
mshift_labels = mshift.fit_predict(X_reduced)

# (file name, label) pairs, ordered by ascending label
mshift_cluster_assignments = sorted(zip(filenames, mshift_labels), key=lambda pair: pair[1])

# label -> file names of that cluster
mshift_clusters = {}  # Dict[Any, List[str]]
for file, cluster in mshift_cluster_assignments:
    mshift_clusters.setdefault(cluster, []).append(file)

In [None]:
# cluster centers found by mean shift, drawn on top of the samples
centroids = mshift.cluster_centers_
plot_distribution(mshift_labels, X_reduced, centroids=centroids)

In [None]:
# number of files per mean shift cluster (array index = cluster label)
mshift_cluster_sizes = np.bincount([label for _file, label in mshift_cluster_assignments])

# bar chart: one bar per cluster label
bars = plt.bar(np.arange(len(mshift_cluster_sizes)), mshift_cluster_sizes)
plt.xlabel("clusters")
plt.ylabel("size")
plt.show()

In [None]:
# image preview of the mean shift result: one row per cluster, up to 10 images each
preview_all_clusters(mshift_clusters)

In [None]:
# optional: scan a range of k values to see which one might be the best
list_k = list(range(2, 20))
sse = []  # within-cluster sum of squared errors, one entry per k
sil = []  # silhouette score, one entry per k

for k_var in list_k:
    km = KMeans(n_clusters=k_var, random_state=22, n_init=20).fit(X_reduced)

    # inertia is the within-cluster sum of squared errors
    sse.append(km.inertia_)

    # silhouette method
    sil.append(silhouette_score(X_reduced, km.labels_, metric="euclidean"))



## Within Cluster Sum of Squared Errors (WSS)

In [None]:
# elbow curve: within-cluster SSE for each scanned k
fig, ax = plt.subplots(figsize=(6, 6))
ax.plot(list_k, sse)
ax.set_xlabel(r"Number of clusters *k*")
ax.set_ylabel("Sum of squared distance")
plt.show()

## Silhouette Score

In [None]:
# silhouette score for each scanned k
fig, ax = plt.subplots(figsize=(6, 6))
ax.plot(list_k, sil)
ax.set_xlabel(r"Number of clusters *k*")
ax.set_ylabel("Silhouette score")
plt.show()

# Store Clusters

In [None]:
# persist the K-Means result; pass the `k` that was actually used for the
# clustering instead of repeating the number, so the stored k-value cannot
# drift from the run
store_clusters("kmeans3_test_v01_busicorp", k, len(filenames), n_components, kmeans_clusters)