In [1]:
import json
import pandas as pd
from pathlib import Path
import numpy as np
from sklearn.preprocessing import MultiLabelBinarizer
import h5py
import tqdm
import plotly.express as px
root = Path("../data/")
import pyperclip as pc
import sys
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import pairwise_distances 
from colorama import Fore, Style, init
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML

In [2]:
# Adaptation of LineTracker notebook
# 1. Pick the first distance matrix in the HDF5 file that has at least MIN_LINES lines.
MIN_LINES = 10
distance_matrix = None
choice_build_log = ""
# Open read-only explicitly: the file is only inspected here, never written.
with h5py.File(root / "distances" / "distances_cos_13365.hdf5", "r") as fp:
    for k in fp:
        # Check the shape on the dataset handle first so that matrices that are
        # too small are never read from disk, and so that a too-small matrix can
        # never be kept by accident when no dataset qualifies.
        if fp[k].shape[0] >= MIN_LINES:
            choice_build_log = k
            distance_matrix = np.copy(fp[k])
            break
assert distance_matrix is not None, f"no distance matrix with at least {MIN_LINES} lines found"
# Apply normalization: halve every distance.
# NOTE(review): presumably these are cosine distances in [0, 2], so halving maps
# them to [0, 1] like the other two matrices - confirm against the producer.
# Not done in place so that an integer-typed dataset is promoted to float
# instead of raising.
distance_matrix = distance_matrix / 2
# 2. Get variables matrix
# Scan the variables_*.json files until one contains the chosen build log, then
# derive from it the line-id -> text mapping and the Jaccard distance matrix of
# the per-line parameter lists.
pathes = list(root.rglob("variables/variables_*.json"))
mapping = {}
variable_matrix = None
for json_path in tqdm.tqdm(pathes):
    with open(json_path) as json_file:
        logs_by_name = json.load(json_file)
    if choice_build_log not in logs_by_name:
        continue  # this file does not describe the selected build log

    d1 = pd.DataFrame(logs_by_name[choice_build_log]["log_df"])
    # Positional line index -> raw line content, used later for display.
    mapping.update(enumerate(d1["Content"]))
    var_df = d1["ParameterList"]

    # One binary column per distinct parameter value, one row per log line.
    mlb = MultiLabelBinarizer(sparse_output=False)
    binarized_params = mlb.fit_transform(var_df)
    variable_matrix = pairwise_distances(binarized_params, metric="jaccard", n_jobs=-1)
    break  # the first matching file is enough
assert variable_matrix is not None
# 3. Get count matrix: entry (i, j) is the distance in line numbers |i - j|,
# rescaled to [0, 1].
n_lines = len(distance_matrix)
line_idx = np.arange(n_lines, dtype=float)
# Broadcasting a column against a row yields all pairwise differences at once,
# replacing the O(n^2) pure-Python double loop.
count_matrix = np.abs(line_idx[:, None] - line_idx[None, :])
min_val = 0.0                          # the diagonal, |i - i|
max_val = float(max(n_lines - 1, 0))   # the corners, |0 - (n_lines - 1)|
# With fewer than two lines the span is zero; keep the all-zero matrix rather
# than dividing by zero and filling it with NaN.
if max_val > min_val:
    count_matrix = (count_matrix - min_val) / (max_val - min_val)
# Four sliders sharing the same [0, 1] range and 0.01 step: three weights for
# the matrices and one value passed to DBSCAN as eps. The Output widget is where
# the callback renders its result.
_unit_range = dict(min=0.0, max=1.0, step=0.01)
slider_distance = widgets.FloatSlider(value=0.6, description='slider_distance:', **_unit_range)
slider_variables = widgets.FloatSlider(value=0.4, description='slider_variables:', **_unit_range)
slider_line_dist = widgets.FloatSlider(value=0.0, description='slider_line_dist:', **_unit_range)
slider_eps = widgets.FloatSlider(value=0.9, description='slider_eps:', **_unit_range)
output = widgets.Output()

# Define a callback function that will be called when sliders are changed
def on_slider_change(change):
    """Re-cluster the log lines with the current slider settings and render them.

    Registered as the ``observe`` callback of the sliders. It reads the
    module-level ``distance_matrix``, ``variable_matrix`` and ``count_matrix``
    without modifying them, blends them with the three weight sliders, runs
    DBSCAN (``metric='precomputed'``) on the blend with ``slider_eps`` as
    ``eps``, and displays every line of ``mapping`` inside ``output``, coloured
    by its cluster label.

    Parameters
    ----------
    change
        Change notification supplied by the widget. It is not inspected; the
        current values are read directly from the sliders.

    Returns
    -------
    None
        Everything is emitted into the ``output`` widget. If the three weights
        do not sum to 1, only a message is printed.
    """
    from html import escape  # local import: only needed to escape log text

    with output:
        # Replace whatever the previous slider position rendered.
        clear_output(wait=True)

        alpha = slider_distance.value
        beta = slider_variables.value
        gamma = slider_line_dist.value
        eps = slider_eps.value
        # Slider values are binary floats (0.7 + 0.2 + 0.1 is not exactly 1),
        # so compare with a tolerance; it is far below the 0.01 slider step.
        if not np.isclose(alpha + beta + gamma, 1.0):
            print("slider_distance+slider_variables+slider_line_dist should be equal to 1")
            return

        # 4. Build final matrix: weighted sum of the three base matrices.
        # Stored in a local so the base matrices stay untouched; rebinding the
        # global would make every slider move compound the previous blend.
        combined_matrix = np.asarray(
            alpha * distance_matrix + beta * variable_matrix + gamma * count_matrix
        )
        print(f"Is the distance matrix symmetric? {np.allclose(combined_matrix, combined_matrix.T)}")

        clusterer = DBSCAN(eps=eps, min_samples=2, metric='precomputed', algorithm='auto', n_jobs=-1)
        clusterer.fit(combined_matrix)
        labels = clusterer.labels_

        # DBSCAN marks noise with the label -1; only discount it when present,
        # otherwise the cluster count would be one too low.
        unique_labels = np.unique(labels)
        has_outliers = bool(np.any(labels == -1))
        n_clusters = len(unique_labels) - int(has_outliers)
        print(f"The number of clusters is {n_clusters} including outliers? {has_outliers}")

        # Show the clusters: one CSS colour per label, cycling when there are
        # more labels than colours.
        available_colors = ["red", "green", "brown", "blue", "orange", "darkblue"]
        color_mapping = {
            int(label): available_colors[position % len(available_colors)]
            for position, label in enumerate(unique_labels)
        }
        print("Mapping is ", color_mapping)
        for line_id, label in enumerate(labels):
            color = color_mapping[int(label)]
            # Log lines are arbitrary text: escape them so characters such as
            # '<' or '&' are shown literally instead of being parsed as HTML.
            line_text = escape(str(mapping[line_id]))
            display(HTML(f'<span style="color:{color}">{line_text}</span>'))
# Re-run the clustering whenever any slider value changes, then show the
# controls followed by the area the callback renders into.
_sliders = (slider_distance, slider_variables, slider_line_dist, slider_eps)
for _slider in _sliders:
    _slider.observe(on_slider_change, names='value')
display(*_sliders, output)

  0%|          | 0/50 [00:00<?, ?it/s]

 10%|█         | 5/50 [00:00<00:07,  5.86it/s]


FloatSlider(value=0.6, description='slider_distance:', max=1.0, step=0.01)

FloatSlider(value=0.4, description='slider_variables:', max=1.0, step=0.01)

FloatSlider(value=0.0, description='slider_line_dist:', max=1.0, step=0.01)

FloatSlider(value=0.9, description='slider_eps:', max=1.0, step=0.01)

Output()