# A. Reverse PCA (experimental)

A notebook to apply PCA and try reversing it while calculating the overall accuracy of the process

In [None]:
# Enable these lines if live changes are made to the codebase
%load_ext autoreload
%autoreload 2

In [None]:
# Disable tensorflow logging
import logging
import os

# TF_CPP_MIN_LOG_LEVEL has to be in the environment *before* tensorflow is
# imported; setting it afterwards does not silence the native runtime.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # or any {'0', '1', '2'}

import tensorflow as tf

# Also silence the Python-side "tensorflow" logger.
logging.getLogger('tensorflow').setLevel(logging.FATAL)

In [None]:
# Specific instruction to run the notebooks from a sub-folder.
import sys

# Only add the parent folder once, so re-executing this cell does not pile up
# duplicate entries in sys.path.
if ".." not in sys.path:
    sys.path.append("..")

In [None]:
import logging
from bugfinder.settings import LOGGER
from bugfinder.dataset import CWEClassificationDataset as Dataset
from bugfinder.models.dnn_classifier import DNNClassifierTraining
from bugfinder.models.linear_classifier import LinearClassifierTraining
from bugfinder.dataset.processing.dataset_ops import CopyDataset, RightFixer
from bugfinder.features.extraction.any_hop.all_flows import FeatureExtractor as AnyHopAllFlowsExtractor
from bugfinder.features.extraction.any_hop.single_flow import FeatureExtractor as AnyHopSingleFlowExtractor
from bugfinder.features.extraction.single_hop.raw import FeatureExtractor as SingleHopRawExtractor
from bugfinder.features.reduction.pca import FeatureSelector as PCA

from os.path import join, exists, basename, dirname
from shutil import rmtree, copytree

import tensorflow as tf
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

from bugfinder.dataset.processing import DatasetProcessing, DatasetProcessingCategory
from bugfinder.settings import LOGGER
from bugfinder.utils.statistics import has_better_metrics
from os.path import join, exists
from shutil import rmtree, copytree

import tensorflow as tf
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

from pprint import pprint
from copy import deepcopy
import numpy as np
import pandas as pd
import xlsxwriter
from xlsxwriter.utility import xl_rowcol_to_cell
import sklearn.decomposition
from tqdm.notebook import tqdm
import math
import pickle

In [None]:
# Keep the project logger's records away from ancestor loggers and lower its
# threshold to INFO (assumes LOGGER is a standard logging.Logger).
LOGGER.propagate = False
LOGGER.setLevel(logging.INFO)

In [None]:
# Dataset under study (alternative location: "../data/cwe121_v112")
test_dataset = "../data/ds-rev-pca"
dataset = Dataset(test_dataset)

# Keep the feature column names as loaded and report how many there are
orig_cols = [*dataset.features.columns]
print(len(orig_cols))

In [None]:
# Work on the feature table without its last two columns
# (presumably non-feature metadata such as the label -- TODO confirm).
trailing_cols = orig_cols[-2:]
df = dataset.features.drop(columns=trailing_cols)

# Keep the column name list in sync with the trimmed table
orig_cols = orig_cols[:-2]
print(len(orig_cols))

In [None]:
# Feature matrix (one row per item) and its per-column mean
X = df.to_numpy()
mu = X.mean(axis=0)
n_comp = 50

# PCA
pca = sklearn.decomposition.PCA(n_components=n_comp).fit(X)

# Reverse PCA: map the reduced coordinates back through the principal axes,
# then add the mean back to return to the original feature space
projected = pca.transform(X)[:, :n_comp]
Xhat = projected.dot(pca.components_[:n_comp, :])
Xhat += mu

In [None]:
# Measure how far the reconstruction `Xhat` is from the original matrix `X`.
#
# Per item (row) this collects:
#   errors[0]  mean absolute error over the item's values
#   errors[1]  median absolute error
#   errors[2]  standard deviation of the absolute error
#   errors[3]  number of values whose rounded reconstruction differs from
#              the original value
max_items = X.shape[0]
# max_items = 1000

# Restrict the evaluation to the first `max_items` items.
X_eval = np.asarray(X[:max_items], dtype=float)
Xhat_eval = np.asarray(Xhat[:max_items], dtype=float)
assert X_eval.shape == Xhat_eval.shape

# Element-wise work is done by NumPy instead of a nested Python loop.
abs_errors = np.abs(Xhat_eval - X_eval)
# np.rint rounds halves to the nearest even integer, like built-in round().
mismatches = np.rint(Xhat_eval) != X_eval

errors = [
    abs_errors.mean(axis=1).tolist(),
    np.median(abs_errors, axis=1).tolist(),
    abs_errors.std(axis=1).tolist(),
    mismatches.sum(axis=1).tolist(),
]

print("Computing stats...")

# Dataset-level figures are averages of the per-item statistics; note that
# `median_err` is therefore the mean of the per-item medians.
avg_err = np.mean(errors[0])
median_err = np.mean(errors[1])
std_err = np.mean(errors[2])

err_count = sum(errors[3])
avg_err_count = np.mean(errors[3])
max_err_count = np.max(errors[3])

# Share of wrongly reconstructed values per item
rate = np.array(errors[3]) / X.shape[1]
err_rate = np.mean(rate)
max_error_rate = np.max(rate)

print("Job done!")

In [None]:
# Report the reconstruction error counts computed above: for the whole
# dataset, on average per item, and for the worst item.
n_features = X.shape[1]  # values per item (columns of X)
n_values = max_items * n_features

print(f"*** Reverse PCA on {max_items} items with {n_features} features ***\n")
print(f"{'=' * 5} Error count {'=' * 5}")
print(
    f"Dataset: {err_count} errors / {n_values} "
    f"({round(100 * err_count / n_values, 2)}%)\n"
)
avg_per_item = round(avg_err_count, 2)
# The percentage is derived from the unrounded average so that it is only
# rounded once.
print(
    f"Single item: {avg_per_item} errors / item "
    f"({round(100 * avg_err_count / n_features, 2)}%)\n"
)
print(
    f"Maximum for a single item: {max_err_count} errors "
    f"({round(100 * max_err_count / n_features, 2)}%)\n"
)

In [None]:
# Take an independent copy of the fitted principal axes, then display the
# axes' shape and values
components = pca.components_
pca_weights = deepcopy(components)

pprint(components.shape)
pprint(components)

In [None]:
# Display the per-column mean vector: its shape first, then its values
for shown in (mu.shape, mu):
    pprint(shown)

In [None]:
# Put the mean vector in front of the principal axes: row 0 is `mu`, rows 1..n
# are the components. The table is rebuilt from `pca.components_` (not from
# `pca_weights` itself) so re-running this cell does not stack `mu` again.
pca_weights = np.append([mu], pca.components_, axis=0)
pca_weights.shape

In [None]:
# Export the weight table (`pca_weights`) to a spreadsheet.
#
# Sheet layout:
#   row 0      "Mult." and a multiplier cell (B1, initially 1)
#   row 2      feature column names, starting at column B
#   rows 3..   one row per entry of `pca_weights`; every value is written as
#              the formula "=<value>*B1" so the table can be rescaled from B1
# Each weight row gets a white-to-green colour scale. As a side product,
# `row_stats` receives, for every row, the column indices sorted by
# ascending weight.
weights_path = '/mnt/data/ai_bugfinder/pca.weights.xlsx'
cols = df.columns
row_stats = list()

# Row 0 of `pca_weights` is the mean vector, the remaining rows are the PCA
# components, so the labels must follow that same order.
row_labels = ["mu"] + [f"pca{x}" for x in range(pca_weights.shape[0] - 1)]

# The context manager closes (and thereby saves) the workbook even if one of
# the writes below raises.
with xlsxwriter.Workbook(weights_path) as workbook:
    worksheet = workbook.add_worksheet()

    worksheet.write(0, 0, "Mult.")
    worksheet.write(0, 1, 1)
    multiplier_cell = xl_rowcol_to_cell(0, 1)

    for y, col_name in enumerate(cols):
        worksheet.write(2, y + 1, col_name)

    for row_idx, (label, row) in enumerate(zip(row_labels, pca_weights), start=3):
        # Label and data are written together so they cannot drift apart.
        worksheet.write(row_idx, 0, label)

        for col_idx, item in enumerate(row, start=1):
            worksheet.write(row_idx, col_idx, f"={item}*{multiplier_cell}")

        start = xl_rowcol_to_cell(row_idx, 1)
        end = xl_rowcol_to_cell(row_idx, len(row))

        row_stats.append(np.argsort(row))

        worksheet.conditional_format(f"{start}:{end}", {
            "type": "2_color_scale",
            "min_color": "white",
            "max_color": "green"
        })

print("Job done!")

In [None]:
# For every feature column, collect its rank within each weight row, scaled
# to a 0-99 percentile (0 = smallest weight of the row).
summary = {col: [] for col in cols}
percentage = len(cols) / 100

# Each entry of `row_stats` is a permutation of the column indices (an
# argsort), and the argsort of a permutation is its inverse. So
# `rank_tables[r][c]` is the position of column `c` in `row_stats[r]`,
# obtained without scanning the row for every column.
rank_tables = [np.argsort(stat) for stat in row_stats]

for col_idx, col in enumerate(cols):
    for ranks in rank_tables:
        summary[col].append(int(ranks[col_idx] / percentage))
    

In [None]:
# Standard deviation of each column's collected rank values, then the average
# of those deviations over all columns
stds = [np.std(ranks) for ranks in summary.values()]

print(np.mean(stds))

In [None]:
# Rebuild the feature matrix, refit the PCA and recompute the reconstruction
X = df.to_numpy()
mu = X.mean(axis=0)

n_comp = 50
pca = sklearn.decomposition.PCA(n_components=n_comp).fit(X)

Xhat = pca.transform(X)[:, :n_comp].dot(pca.components_[:n_comp, :])
Xhat += mu

# Spreadsheet receiving the reconstruction details
workbook = xlsxwriter.Workbook("/mnt/data/ai_bugfinder/pca.xlsx")
worksheet = workbook.add_worksheet()

# Error correction test number
for corr_col, corr_content in enumerate(("Correction", 0)):
    worksheet.write(1, corr_col, corr_content)

# Anchor (row, column) of the per-item summary table; its title and average
# rows sit two rows and one row above the anchor row respectively
offset = 5
mean_xy = (offset, 0)
mean_row = mean_xy[0]
mean_xy_title, mean_xy_avg = mean_row - 2, mean_row - 1

# worksheet.write(mean_xy_title, mean_xy[1]+1, "avg err")
# worksheet.write(mean_xy_title, mean_xy[1]+2, "err count")
# worksheet.write(mean_xy_title, mean_xy[1]+3, "err rate")
# worksheet.write(mean_xy_title, mean_xy[1]+4, "avg err")
# worksheet.write(mean_xy_title, mean_xy[1]+5, "err count")
# worksheet.write(mean_xy_title, mean_xy[1]+6, "err rate")

# worksheet.write(mean_xy_avg, mean_xy[1], "avg.")
# worksheet.write(
#     mean_xy_avg, mean_xy[1]+1, 
#     f"=AVERAGE({xl_rowcol_to_cell(mean_xy[0], mean_xy[1]+1)}:{xl_rowcol_to_cell(mean_xy[0]+5000, mean_xy[1]+1)})"
# )
# worksheet.write(
#     mean_xy_avg, mean_xy[1]+2, 
#     f"=AVERAGE({xl_rowcol_to_cell(mean_xy[0], mean_xy[1]+2)}:{xl_rowcol_to_cell(mean_xy[0]+5000, mean_xy[1]+2)})"
# )
# worksheet.write(
#     mean_xy_avg, mean_xy[1]+3, 
#     f"=AVERAGE({xl_rowcol_to_cell(mean_xy[0], mean_xy[1]+3)}:{xl_rowcol_to_cell(mean_xy[0]+5000, mean_xy[1]+3)})"
# )
# worksheet.write(
#     mean_xy_avg, mean_xy[1]+4, 
#     f"=AVERAGE({xl_rowcol_to_cell(mean_xy[0], mean_xy[1]+4)}:{xl_rowcol_to_cell(mean_xy[0]+5000, mean_xy[1]+4)})"
# )
# worksheet.write(
#     mean_xy_avg, mean_xy[1]+5, 
#     f"=AVERAGE({xl_rowcol_to_cell(mean_xy[0], mean_xy[1]+5)}:{xl_rowcol_to_cell(mean_xy[0]+5000, mean_xy[1]+5)})"
# )
# worksheet.write(
#     mean_xy_avg, mean_xy[1]+6, 
#     f"=AVERAGE({xl_rowcol_to_cell(mean_xy[0], mean_xy[1]+6)}:{xl_rowcol_to_cell(mean_xy[0]+5000, mean_xy[1]+6)})"
# )

# Walk over every value of every item and compare the original value with its
# reconstruction.
# NOTE(review): every worksheet write inside and after the inner loop is
# commented out, so the loop currently only recomputes `orig`, `new` and
# `guess` and throws them away; nothing from it reaches the saved workbook.
max_items = X.shape[0]

for item in tqdm(range(max_items)):    
    # Original and reconstructed values of the current item
    origX = list(X[item,])
    newX = list(Xhat[item,])

    # Both rows must hold the same number of values
    assert len(origX)==len(newX)
    
    # (disabled) first worksheet column of the block reserved for this item
#     col = int(2*offset*item + offset + 4)

    for idx in range(len(origX)):
        orig = float(origX[idx])
        new = float(newX[idx])
        
        # Reconstructed value rounded to the nearest integer
        guess = int(round(new, 0))
        
        
        # (disabled) per-value export: original, reconstruction, absolute
        # error, rounded guess and mismatch flag, followed by the same figures
        # after shifting the reconstruction by the "Correction" cell
#         row = idx + offset
        
#         worksheet.write(row, col, orig)
#         worksheet.write(row, col+1, new)
#         worksheet.write(row, col+2, f"=ABS({xl_rowcol_to_cell(row, col)}-{xl_rowcol_to_cell(row, col+1)})")
#         worksheet.write(row, col+3, f"=ROUND({xl_rowcol_to_cell(row, col+1)}, 0)")
#         worksheet.write(row, col+4, f"={xl_rowcol_to_cell(row, col)}<>{xl_rowcol_to_cell(row, col+3)}")
#         worksheet.write(
#             row, col+5, 
#             f"=SIGN({xl_rowcol_to_cell(row, col+1)})*("
#             f"ABS({xl_rowcol_to_cell(row, col+1)})+{xl_rowcol_to_cell(1, 1)})"
#         )
#         worksheet.write(row, col+6, f"=ABS({xl_rowcol_to_cell(row, col)}-{xl_rowcol_to_cell(row, col+5)})")
#         worksheet.write(row, col+7, f"=ROUND({xl_rowcol_to_cell(row, col+5)}, 0)")
#         worksheet.write(row, col+8, f"={xl_rowcol_to_cell(row, col)}<>{xl_rowcol_to_cell(row, col+7)}")
    
    # (disabled) per-item summary row: average error, error count and error
    # rate, before and after the correction
#     worksheet.write(
#         mean_xy[0]+item, mean_xy[1], 
#         f"item{item}"
#     )
#     worksheet.write(
#         mean_xy[0]+item, mean_xy[1]+1, 
#         f"=AVERAGE({xl_rowcol_to_cell(offset, col+2)}:{xl_rowcol_to_cell(offset+len(origX), col+2)})"
#     )
#     worksheet.write(
#         mean_xy[0]+item, mean_xy[1]+2, 
#         f"=COUNTIF({xl_rowcol_to_cell(offset, col+4)}:{xl_rowcol_to_cell(offset+len(origX), col+4)}, TRUE)"
#     )
#     worksheet.write(
#         mean_xy[0]+item, mean_xy[1]+3, 
#         f"={xl_rowcol_to_cell(mean_xy[0]+item, mean_xy[1]+2)}/{len(origX)}"
#     )
#     worksheet.write(
#         mean_xy[0]+item, mean_xy[1]+4, 
#         f"=AVERAGE({xl_rowcol_to_cell(offset, col+6)}:{xl_rowcol_to_cell(offset+len(origX), col+6)})"
#     )
#     worksheet.write(
#         mean_xy[0]+item, mean_xy[1]+5, 
#         f"=COUNTIF({xl_rowcol_to_cell(offset, col+8)}:{xl_rowcol_to_cell(offset+len(origX), col+8)}, TRUE)"
#     )
#     worksheet.write(
#         mean_xy[0]+item, mean_xy[1]+6, 
#         f"={xl_rowcol_to_cell(mean_xy[0]+item, mean_xy[1]+5)}/{len(origX)}"
#     )
    
# Closing the workbook is what writes the file to disk
workbook.close()
print("Job done!")