In [1]:
import time
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from scipy.ndimage import gaussian_filter, median_filter
import plotly.express as px

## PREPROCESSING

In [2]:
X = pd.read_csv("src/R.csv")
Y = pd.read_csv("src/G.csv")
Z = pd.read_csv("src/B.csv")

In [21]:
X.shape

(49, 4900)

In [6]:
X_train = X.sample(10)

## DISPLAY

In [7]:
def show_images(data: pd.DataFrame, title="Result", col_wrap=10):
    side_length = int(np.sqrt(data.shape[1]))
    fig = px.imshow(data.values.reshape(-1, side_length, side_length),
                    binary_string=True,
                    facet_col=0,
                    facet_col_wrap=col_wrap,
                    title=title)
    fig.show()

In [8]:
col_wrap = 10
show_images(X_train, "Original", col_wrap=col_wrap)

In [9]:
def apply_filters(images, side_length: int, sigma: int = None, size: int = None):
    rows = []
    for image in images:
        image = image.reshape(side_length, side_length)
        if size:
            image = median_filter(image, size=size)
        if sigma:
            image = gaussian_filter(image, sigma=sigma)
        rows.append(image.flatten())

    return pd.DataFrame(rows)

In [10]:
class MyPCA:
    def __init__(self, n_components: int = None) -> None:
        self._components = n_components

    def normalize_x(self, X):
        mean = np.mean(X, axis=0)
        std_dev = np.std(X, axis=0)

        X = (X - mean) / std_dev

        return X, mean, std_dev

    def fit_transform(self, X):
        if self._components is None:
            self._components = X.shape[1]

        # 1. Normalize X
        X, self._X_mean, self._X_std_dev = self.normalize_x(X)

        # 2. Calculate covariance matrix
        cov_matrix = np.cov(X, rowvar=False)

        # 3. Calculate eigen-vectors and eigen-values
        self._eig_vals, self._eig_vecs = np.linalg.eig(cov_matrix)

        # 4. Deduct PCA
        indices = np.argsort(self._eig_vals)[::-1]

        sort_eig_vecs = self._eig_vecs[:, indices]
        self._sort_eig_vals = self._eig_vals[indices]

        self._sel_eig_vecs = sort_eig_vecs[:, :self._components]

        return np.dot(X, self._sel_eig_vecs)

    def explained_variance_ratio_(self):
        return np.real(self._sort_eig_vals / np.sum(self._sort_eig_vals))[:self._components]

    def inverse_transform(self, X_pca):
        dot_product = np.dot(X_pca, self._sel_eig_vecs.T)
        return np.real((dot_product * self._X_std_dev) + self._X_mean)  # avoid complex issues

In [17]:
def denoise(images: pd.DataFrame,
            pca_components: float = None,
            gaussian_strength: int = None,
            median_strength: int = None):

    side_length = int(np.sqrt(images.shape[1]))

    my_pca = MyPCA(n_components=pca_components)
    images = my_pca.fit_transform(images.values)
    images = my_pca.inverse_transform(images)

    images = apply_filters(images,
                           side_length,
                           sigma=gaussian_strength,
                           size=median_strength)

    return pd.DataFrame(images), my_pca

In [18]:
start = time.time()
my_pca_result, my_pca = denoise(X_train, pca_components=8, gaussian_strength=.5, median_strength=1)
show_images(my_pca_result, "Sample", col_wrap=col_wrap)
my_time = time.time() - start

## ANALYSING RESULTS

In [19]:
print(f"Manual PCA -> {my_pca.explained_variance_ratio_().sum()} in {my_time} seconds")

Manual PCA -> 0.9642583171094041 in 43.51296138763428 seconds


## PRODUCTION

In [23]:
for layer, df in zip(("R", "G", "B"), (X, Y, Z)):
    result, pca = denoise(df, pca_components=30, gaussian_strength=.5, median_strength=1)
    result.to_csv(f"output/denoised_{layer}.csv", index=False)