In [1]:
import torch

from models.embeddings.embedding_v1 import Embedding_v1

%load_ext autoreload
%autoreload 2

  warn("The installed version of bitsandbytes was compiled without GPU support. "


'NoneType' object has no attribute 'cadam32bit_grad_fp32'
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [3]:
import os
import pandas as pd
import numpy as np
from PIL import Image
import CONST
from utils import (
    filter_empty_columns,
    num_duplicated_values,
    load_onnx,
    init_session_onnx,
    find_path,
    load_image,
    forward_transform,
    get_segmentation,
    resize_segmentation,
    remove_image_extension,
    split_and_flatten,
    split_string,
)
import cv2
import plotly.graph_objects as go
import plotly.io as pio
import plotly.express as px
pio.renderers.default = "jupyterlab"
np.seterr(divide="ignore", invalid="ignore")

{'divide': 'warn', 'over': 'warn', 'under': 'ignore', 'invalid': 'warn'}

# Preliminary analysis

In this section we will examine colour distribution in all un-processed images

In [None]:
def calculate_average_histogram(folder_path):
    histogram_sums = np.zeros((3, 256))
    image_count = 0

    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.lower().endswith((".jpg", ".jpeg")):
                image_path = os.path.join(root, file)
                image = cv2.imread(image_path)
                if image is not None:
                    image = cv2.resize(image, (64, 64))  # Resize to 64x64
                    for i in range(3):
                        hist = cv2.calcHist(
                            [image], [i], None, [256], [0, 256]
                        ).flatten()
                        histogram_sums[i] += hist
                    image_count += 1

    # Calculate the average histogram for each channel
    return histogram_sums / image_count if image_count > 0 else None


# Trigger calculation and then store to numpy array
average_histograms = calculate_average_histogram(CONST.UNPROCESSED_IMAGES_DIR)

In [58]:
average_histograms

array([[ 4.64168366,  4.26669864,  3.69372826,  3.99604269,  4.37198705,
         4.82959587,  5.3047128 ,  5.56709438,  6.14030459,  6.61638086,
         7.01498981,  7.58616141,  8.10792661,  8.36743015,  8.96606308,
         9.53039933, 10.02542271, 10.64492145, 11.17616021, 11.69780549,
        12.45640964, 12.89782948, 13.29547907, 14.01822761, 14.60678738,
        15.03021945, 15.75920374, 16.30567214, 16.79386018, 17.49262501,
        17.66566735, 18.16320902, 18.4925051 , 19.0077947 , 19.42163329,
        19.88295959, 20.17364192, 20.47643602, 20.90082744, 21.18899149,
        21.39081425, 21.94052045, 22.11979854, 22.44393812, 22.81844346,
        23.10432906, 23.33445257, 23.53627533, 23.71879122, 23.91761602,
        24.17136347, 24.32953592, 24.44933445, 24.67538074, 24.75092937,
        25.15097734, 25.25638566, 25.40436503, 25.65667346, 26.05252428,
        26.01714834, 26.1260343 , 25.9285286 , 26.19582684, 26.39752968,
        26.40460487, 26.4567694 , 26.53363713, 26.3

In [57]:
# Create traces for each color channel
trace_red = go.Bar(
    x=list(range(256)), y=average_histograms[0], name="Red", marker=dict(color="red")
)
trace_green = go.Bar(
    x=list(range(256)),
    y=average_histograms[1],
    name="Green",
    marker=dict(color="green"),
)
trace_blue = go.Bar(
    x=list(range(256)), y=average_histograms[2], name="Blue", marker=dict(color="blue")
)

# Combine the traces in a single figure
fig = go.Figure(data=[trace_red, trace_green, trace_blue])

# Update the layout for a more readable chart
fig.update_layout(
    title="Average RGB Histogram",
    xaxis_title="Pixel values",
    yaxis_title="Pixel counts",
    barmode="overlay",  # Allows bars to overlap
)

# Reduce the opacity to see all colors
fig.update_traces(opacity=0.3)

# Show the figure
fig.show()

# Annotation cleaning

In [None]:
# Check annotation dataframe
df = pd.read_csv(CONST.ANNOTATION_PATH)
df

In [None]:
# Filter the data into only selected column
df = df.filter(items=CONST.SELECT_COLUMN)

# If the wound type and wound location is empty, we will drop the data
df = filter_empty_columns(df, CONST.WOUND_TYPE)
df = filter_empty_columns(df, CONST.WOUND_LOCATION)

# There are many duplications on the file name that we need to drop them
df = df.drop_duplicates()
df = df.drop_duplicates(subset=[CONST.FILE_NAME], keep="first")

In [None]:
# Re-check to see if there is still any duplicated file name
print(f"Number of overlapped file: {num_duplicated_values(df, CONST.FILE_NAME)}")

In [None]:
# Check the data again
df

# Annotation preprocessing

As there are many data with duplicated semantics (for example: a same wound type in a same location but different shape will cause the model confuse of what to render at each pixel), therefore, we will need to tidy up the dataset so that:
- Different sample will have different semantics
- This will make the model less confusing while generating the images

In [None]:
# Check for duplicated semantics
df_duplicated = df.duplicated(
    subset=[
        CONST.WOUND_RULER,
        CONST.WOUND_TYPE,
        CONST.WOUND_BED,
        CONST.WOUND_DEPTH,
        CONST.WOUND_LOCATION,
    ],
    keep="last",
)
df_cleaned = df[df_duplicated == False]
df_cleaned

In [None]:
# Save the processed annotation file
df_cleaned.to_csv(CONST.ANNOTATION_PROCESSED_PATH, index=False)

# Sample segmentation

In [None]:
# Read the processed annotation
annotation = pd.read_csv(CONST.ANNOTATION_PROCESSED_PATH)
annotation

In [None]:
# Initialise the model
model = load_onnx(CONST.SEG_MODEl_PATH)
session = init_session_onnx(model)

In [None]:
# For each record, load file
for _, row in annotation.iterrows():
    # Get file path
    file_path = find_path(
        filename=row[CONST.FILE_NAME], root_dir=CONST.UNPROCESSED_IMAGES_DIR
    )

    # Skip the missing file
    if file_path is None:
        continue

    # Crop the ROI
    if row[CONST.ROI_Y_HEIGHT] != 0 and row[CONST.ROI_X_WIDTH] != 0:
        image = load_image(file_path)[
            row[CONST.ROI_Y] : (row[CONST.ROI_Y] + row[CONST.ROI_Y_HEIGHT]),
            row[CONST.ROI_X] : (row[CONST.ROI_X] + row[CONST.ROI_X_WIDTH]),
        ]
    else:
        image = load_image(file_path)

    # Prepare for tensor
    model_input = np.expand_dims(
        forward_transform(
            image=image, target_size=CONST.SEG_INPUT_SIZE, to_tensor=False
        ),
        axis=0,
    ).transpose((0, 3, 1, 2))

    # Get segments
    segment = get_segmentation(_sample=model_input, _session=session)
    segment = resize_segmentation(
        _segmentation_matrix=segment, _size=image.shape[:-1][::-1]
    )

    # Save images and segments
    Image.fromarray(image).save(
        os.path.join(CONST.PROCESSED_IMAGES_DIR, row[CONST.FILE_NAME])
    )
    np.save(
        os.path.join(
            CONST.PROCESSED_SEGMENT_DIR,
            remove_image_extension(row[CONST.FILE_NAME]),
        )
        + ".npy",
        arr=segment,
    )

print("Done segmentation!")

# Sample annotations (embeddings)

In [None]:
# Read the annotation again
annotation = pd.read_csv(
    CONST.ANNOTATION_PROCESSED_PATH, dtype={CONST.WOUND_RULER: str}
)

# Get all important image annotations
wound_ruler = split_and_flatten(set(annotation[CONST.WOUND_RULER].astype(str)))
wound_type = split_and_flatten(set(annotation[CONST.WOUND_TYPE].astype(str)))
wound_bed = split_and_flatten(set(annotation[CONST.WOUND_BED].astype(str)))
wound_depth = split_and_flatten(set(annotation[CONST.WOUND_DEPTH].astype(str)))
wound_location = split_and_flatten(set(annotation[CONST.WOUND_LOCATION].astype(str)))

embedding_arr = [
    list(wound_ruler),
    list(wound_type),
    list(wound_bed),
    list(wound_depth),
    list(wound_location),
]

# Get the embedder
embedder = Embedding_v1(
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    all_embeddings=embedding_arr,
    embedding_dim=256,
)

In [None]:
# Iterate the data
for _, row in annotation.iterrows():
    single_annotation = []
    single_annotation.append(split_string(str(row[CONST.WOUND_RULER])))
    single_annotation.append(split_string(str(row[CONST.WOUND_TYPE])))
    single_annotation.append(split_string(str(row[CONST.WOUND_BED])))
    single_annotation.append(split_string(str(row[CONST.WOUND_DEPTH])))
    single_annotation.append(split_string(str(row[CONST.WOUND_LOCATION])))
    semantics = (
        embedder.single_semantic_embedding(single_annotation).squeeze().detach().clone()
    )
    torch.save(
        semantics,
        (
            os.path.join(
                CONST.PROCESSED_EMBEDDING_DIR,
                remove_image_extension(row[CONST.FILE_NAME]),
            )
            + ".pt"
        ),
    )

print("Done processing embeddings!")

# Inspect the distribution of the embeddings

In [4]:
import os
import torch
from sklearn.decomposition import PCA
import plotly.express as px

# Load all tensors
tensors = []
for filename in os.listdir(CONST.PROCESSED_EMBEDDING_DIR):
    if filename.endswith('.pt'):  # Assuming the tensors have a .pt extension
        tensor_path = os.path.join(CONST.PROCESSED_EMBEDDING_DIR, filename)
        tensor = torch.load(tensor_path)
        tensors.append(tensor)

# Stack the tensors into a 2D tensor
tensor_stack = torch.stack(tensors)

# Perform PCA to reduce to 2 dimensions
pca = PCA(n_components=2)
pca_result = pca.fit_transform(tensor_stack)

# Plot the 2D PCA components
fig = px.scatter(x=pca_result[:, 0], y=pca_result[:, 1], labels={'x': 'PCA 1', 'y': 'PCA 2'}, marginal_x="rug", marginal_y="rug")
fig.update_layout(title='PCA Result of constructed embeddings', autosize=False, width=800, height=800)
fig.show()

In [5]:
pca.components_

array([[-8.52500024e-04,  4.17363506e-02, -8.86399995e-03,
        -3.33493710e-02, -4.12162780e-02,  5.01002176e-02,
        -2.79453324e-02, -2.85693166e-02, -1.33877023e-02,
        -1.95502195e-02,  8.11798603e-02,  1.17914699e-01,
        -1.83899263e-02, -3.35477039e-02, -5.91026238e-02,
        -1.19102694e-01, -1.26522116e-02,  6.81426141e-02,
         5.49546624e-02,  5.38856405e-02, -6.05234271e-02,
         8.56601796e-02, -3.47555378e-02,  2.72539345e-03,
         3.16049027e-03,  8.91372093e-02, -3.56098559e-02,
         3.95724902e-02, -4.85481852e-02, -4.87799416e-02,
         6.67853560e-02,  1.90188228e-02, -8.60180914e-02,
         1.23769979e-02,  1.15878577e-01,  1.43136127e-01,
        -1.37371181e-01, -5.83295257e-02, -2.56714820e-04,
        -1.12419709e-01,  9.99412565e-02,  9.80244232e-02,
        -1.02890727e-01,  3.90018593e-02, -2.36796337e-01,
         4.31626071e-02, -2.82378364e-02, -2.41691763e-02,
        -2.55595564e-02,  4.27286361e-02,  1.50241468e-0

In [6]:
averages = np.zeros((2, 8))

# Calculate the average for each segment of 32 continuous columns
for i in range(8):  # 8 segments
    start_col = i * 32
    end_col = start_col + 32
    averages[:, i] = np.mean(pca.components_[:, start_col:end_col], axis=1)

averages

array([[ 0.00403835, -0.00900515,  0.00051007,  0.02839388, -0.0119021 ,
         0.        ,  0.        ,  0.        ],
       [-0.00113447,  0.0148038 , -0.00010778, -0.01552183, -0.00690196,
         0.        ,  0.        ,  0.        ]])

In [7]:
# Log the scale of PCA components contribution so that it is easier to view
log_scaled_ndarray = np.log(averages + 1)
log_scaled_ndarray

# Get the current max so that we will scale the indicator arrow
current_max = np.max(log_scaled_ndarray)

# Calculate the scale factor necessary to make the maximum value of 15
scale_factor = 12 / current_max

# Scale the entire array by this scale factor
scaled_ndarray = averages * scale_factor

scaled_ndarray

array([[ 1.7308293 , -3.85959329,  0.21861543, 12.16956832, -5.10122088,
         0.        ,  0.        ,  0.        ],
       [-0.48623391,  6.34488463, -0.0461931 , -6.65263011, -2.9581683 ,
         0.        ,  0.        ,  0.        ]])

In [8]:
# The features must be in the order that we used to construct the embeddings
features = ["wound_ruler", "wound_type", "wound_bed", "wound_depth", "wound_location"]

In [9]:
# Add the arrow to indicate the contribution of PCA components
for i, feature in enumerate(features):
    fig.add_annotation(
        ax=0, ay=0,
        axref="x", ayref="y",
        x=scaled_ndarray[0, i],
        y=scaled_ndarray[1, i],
        showarrow=True,
        arrowsize=2,
        arrowhead=1,
        xanchor="right",
        yanchor="top"
    )
    fig.add_annotation(
        x=scaled_ndarray[0, i],
        y=scaled_ndarray[1, i],
        ax=0, ay=0,
        xanchor="center",
        yanchor="bottom",
        text=feature,
        yshift=5,
    )
fig.show()