# In [None]:
import numpy as np
from itertools import combinations
import re
from sklearn.metrics import pairwise_distances
import plotly.graph_objects as go
import plotly.figure_factory as ff

import umap
class Frontera:
    """
    The Frontera class represents a frontier in a high-dimensional space and is used for feature extraction and dimensionality reduction. It takes as input the data points (X), their corresponding labels (y), and parameters for defining the frontier (percentil_min, percentil_max, and N_points_frontera).
    This class provides methods for calculating distances between points, as well as storing and managing information about categories, minimum distances, frontier points, and vectors.
    Attributes:
    - X: Data points
    - y: Labels for data points
    - percentil_min: Minimum percentile for the frontier
    - percentil_max: Maximum percentile for the frontier
    - N_points_frontera: Number of points for the frontier
    - dic_categorias: Dictionary for storing category information
    - dic_min_dst: Dictionary for storing minimum distances
    - list_dist_median: List for storing median distances
    - dic_categorias_UMAP: Dictionary for storing UMAP category information
    - dic_min_dst_UMAP: Dictionary for storing UMAP minimum distances
    - Frontier_Point_A_X: Dictionary for storing frontier points and their corresponding categories (A -> X)
    - Frontier_Point_X_A: Dictionary for storing frontier points and their corresponding categories (X -> A)
    - Frontier_Point: Dictionary for storing frontier points
    - Frontier_Vector: Dictionary for storing frontier vectors
    - class_vector: Dictionary for storing class vectors
    """
    def __init__(self, X, y, percentil_min, percentil_max, N_points_frontera):
        self.X = X  # Data points
        self.y = y  # Labels for data points
        self.percentil_min = percentil_min  # Minimum percentile for the frontier
        self.percentil_max = percentil_max  # Maximum percentile for the frontier
        self.N_points_frontera = N_points_frontera  # Number of points for the frontier
        
        # Dictionaries for storing category information and minimum distances
        self.dic_categorias = {}
        self.dic_min_dst = {}
        self.list_dist_median = []
        
        # Dictionaries for storing UMAP category information and minimum distances
        self.dic_categorias_UMAP = {}
        self.dic_min_dst_UMAP = {}
        
        # Dictionaries for storing frontier points and their corresponding categories
        self.Frontier_Point_A_X = {}
        self.Frontier_Point_X_A = {}
        
        # Dictionaries for storing frontier points, frontier vectors, and class vectors
        self.Frontier_Point = {}
        self.Frontier_Vector = {}
        self.class_vector = {}
        
    def distance(self, x0, x1):
        # This function calculates the pairwise squared Euclidean distance between two sets of points x0 and x1
        # The squared Euclidean distance between two points x and y in Euclidean space is given by the formula:
        # (x - y)^2 = x^2 - 2 * x * y + y^2
        
        # Calculate the pairwise squared Euclidean distance using the following steps:
        # 1. Compute the squared norms of x0 and x1: x0^2 and x1^2
        # 2. Compute the product of x0 and x1: x0 * x1
        # 3. Compute the pairwise distance matrix M_distance using the formula: x0^2 - 2 * x0 * x1 + x1^2
        
        M_distance = np.reshape(
            np.diag(np.dot(x0, x0.T)), (-1, 1)) - 2 * np.dot(x0, x1.T) + np.dot(np.ones((x0.shape[0], 1)), np.reshape(np.diag(np.dot(x1, x1.T)).T, (1, -1)))
        return M_distance
    def get_frontier(self):
        # Iterate through unique elements in self.y
        for i in np.unique(self.y): 
            # Create a dictionary with key format X_i and values being the corresponding X values when self.y is equal to i
            dic_categorias_aux = {'X_' + str(i): self.X[self.y == i]}
            # Update the main dictionary, self.dic_categorias
            self.dic_categorias.update(dic_categorias_aux)

        # Generate a list of all keys in self.dic_categorias
        categorias_list = [key for key in self.dic_categorias]
        # Generate all possible combinations of two categories
        comb_categories = combinations(categorias_list, 2)
        
        # Iterate through all category combinations
        for categories in list(comb_categories):
            # Calculate distance between the two categories using self.distance
            dist = self.distance(self.dic_categorias.get(categories[0]), self.dic_categorias.get(categories[1]))
            # Append median of the distance to self.list_dist_median
            self.list_dist_median.append(np.median(dist))

            # Calculate row-wise mean of the distance matrix
            row = np.mean(dist, axis=1)
            # Select indices of rows that are within the specified percentiles
            select_indices_row = np.where(
                (row > np.percentile(row, self.percentil_min)) & (row < np.percentile(row, self.percentil_max)))[0]
            # Get corresponding data points from the first category
            min_dst_row = self.dic_categorias.get(categories[0])[select_indices_row]
            # Update the self.dic_min_dst dictionary with the new values
            dic_min_dst_aux = {categories[0] + '_with_' + categories[1]: min_dst_row}
            self.dic_min_dst.update(dic_min_dst_aux)
            # Calculate column-wise mean of the distance matrix
            column = np.mean(dist, axis=0)
            # Select indices of columns that are within the specified percentiles
            select_indices_column = np.where(
                (column > np.percentile(column, self.percentil_min)) & (column < np.percentile(column, self.percentil_max)))[0]
            # Get corresponding data points from the second category
            min_dst_column = self.dic_categorias.get(categories[1])[select_indices_column]
            # Update the self.dic_min_dst dictionary with the new values
            dic_min_dst_aux = {categories[1] + '_with_' + categories[0]: min_dst_column}
            self.dic_min_dst.update(dic_min_dst_aux)

        # Generate a list of all keys in self.dic_min_dst
        list_all_frontier = [key for key in self.dic_min_dst]
        
        # Iterate through unique elements in self.y
        for i in np.unique(self.y):
            # Compile regex patterns for matching dictionary keys
            my_regex = r"^X_" + str(i)  # Regex pattern to match keys starting with "X_" and current element value
            p = re.compile(my_regex)   # Compile regex pattern
            list_A_with_X = [s for s in list_all_frontier if p.match(s)]  # Extract matching elements from list_all_frontier
            
            my_regex = r".*" + str(i) + r'$'  # Regex pattern to match keys ending with current element value
            p = re.compile(my_regex)   # Compile regex pattern
            list_X_with_A = [s for s in list_all_frontier if p.match(s)]  # Extract matching elements from list_all_frontier

            # Get the first elements in the lists
            mtz_A = self.dic_min_dst.get(list_A_with_X[0])  # Retrieve value from dic_min_dst using the first element in list_A_with_X
            mtz_X = self.dic_min_dst.get(list_X_with_A[0])  # Retrieve value from dic_min_dst using the first element in list_X_with_A
            
            # Concatenate remaining elements to mtz_A and mtz_X
            for j in range(1,len(list_A_with_X)):
                mtz_A = np.concatenate((mtz_A,self.dic_min_dst.get(list_A_with_X[j])), axis=0)
                mtz_X = np.concatenate((mtz_X,self.dic_min_dst.get(list_X_with_A[j])), axis=0)
                        
            # Create dictionaries to store updated values of mtz_A and mtz_X
            Front_Point_A_X ={'FrontPoints:(' + str(i) + ',X)' : mtz_A}    
            self.Frontier_Point_A_X.update(Front_Point_A_X)  # Update dictionary Frontier_Point_A_X with new values
            Front_Point_X_A ={'FrontPoints:(X,' + str(i) + ')' : mtz_X}    
            self.Frontier_Point_X_A.update(Front_Point_X_A)  # Update dictionary Frontier_Point_X_A with new values
            
    def centroid_regions(self):
        ## CALCULAR LOS CENTROIDES EN REGION DE FRONTERA
        
        # Loop through the dictionaries containing the frontier points
        for (key_A,value_A), (key_X,value_X) in zip(self.Frontier_Point_A_X.items(), self.Frontier_Point_X_A.items()):
            
            # Make copies of the dictionaries to avoid modifying the original dictionaries
            Point_A_X_copy = self.Frontier_Point_A_X.copy()
            Point_X_A_copy = self.Frontier_Point_X_A.copy()
            Front_Point = {}
            
            # Create a matrix to store the points and their coordinates
            points_matriz = np.zeros(shape=(self.N_points_frontera, self.X.shape[1]))
            
            # Iterate through the points on the frontier
            for i in range(0,self.N_points_frontera):
                
                # Calculate the distances between the A and X points
                dist = self.distance(Point_A_X_copy.get(key_A), Point_X_A_copy.get(key_X))
                
                # Find the indices of the minimum distance for row and column
                min_dist_A_with_X = np.where( dist==np.min(dist) )[0]
                min_dist_X_with_A = np.where( dist==np.min(dist) )[1]
                
                # Retrieve the A and X points with the minimum distance
                min_A_with_X = Point_A_X_copy.get(key_A)[min_dist_A_with_X]
                min_X_with_A = Point_X_A_copy.get(key_X)[min_dist_X_with_A]
                
                # Remove the selected points from the dictionaries
                Point_A_X_copy.update({ key_A: np.delete(Point_A_X_copy.get(key_A), min_dist_A_with_X, axis=0)})
                Point_X_A_copy.update({ key_X: np.delete(Point_X_A_copy.get(key_X), min_dist_X_with_A, axis=0)})
                
                # Calculate the centroid of the selected points and store it in the matrix
                point_value = (np.mean(min_A_with_X+min_X_with_A,axis=0))/2
                
                # Check for closeness with existing points and store only unique points
                if i != 0:
                    closeness_criterion = self.distance(np.reshape(point_value,(-1,self.X.shape[1])), points_matriz) < 0.8          #################
                    
                    if not np.any(closeness_criterion == True):
                        points_matriz[i] = point_value
                else:
                    points_matriz[i] = point_value
                
                # If one of the dictionaries is empty, exit the loop
                if (Point_A_X_copy.get(key_A).shape[0] == 0) or (Point_X_A_copy.get(key_X).shape[0] == 0):
                    break
            
            # Remove unused rows from the matrix
            row_points_matriz = np.sum(points_matriz, axis=1)
            select_indices_points_matriz = np.where( row_points_matriz != 0 )[0]
            points_matriz = points_matriz[select_indices_points_matriz]
            
            # Create a dictionary to store the frontier points
            Front_Point ={'Frontier:' + key_A.split(':')[1] : points_matriz}    
            self.Frontier_Point.update(Front_Point)  # Update the dictionary with the frontier points
        
    def calculate_vectors(self):
        # Loop through the categories and their associated frontier points
        for (key_region,region), (key_Frontier,Frontier_Point) in zip(self.dic_categorias.items(), self.Frontier_Point.items()):
            
            # Create a matrix to store the vectors
            Front_vector = {}
            n_row = 0
            vectors_matriz = np.zeros(shape=(self.Frontier_Point.get(key_Frontier).shape[0], self.X.shape[1]))
            
            # Iterate through the frontier points
            for key_Frontier in self.Frontier_Point.get(key_Frontier):

                # Calculate the distances between the category and the frontier point
                dist = self.distance(self.dic_categorias.get(key_region), np.reshape(key_Frontier,(-1,self.X.shape[1])) ) 

                # Find the indices of the minimum distance
                min_dist_A_with_B = np.where( dist <= 2)[0]      #################
                
                # Retrieve the category points with the minimum distance
                min_A_with_B = self.dic_categorias.get(key_region)[min_dist_A_with_B]        
                
                # Calculate the vector and store it in the matrix
                vector_value = np.median(min_A_with_B,axis=0)
                vectors_matriz[n_row] = vector_value
            
                n_row += 1
            
            # Create a dictionary to store the vectors for the current frontier
            Front_vector ={'Vec_Frontier:(X,' + key_region.split('_')[-1] + ')' : vectors_matriz}    
            self.Frontier_Vector.update(Front_vector) 
            
        # Loop through the frontier points and their associated vectors
        for (key_origin,value_origin), (key_vec,value_vector) in zip(self.Frontier_Point.items(), self.Frontier_Vector.items()):
            
            # Calculate the class vector by subtracting the frontier point from the vector
            vector_aux = np.concatenate((value_origin,value_vector-value_origin), axis=1)
            class_vec = {'Class_vector:' + key_vec.split(':')[1] : vector_aux}
            self.class_vector.update(class_vec)
    def filter_vector(self):
        # Combine all frontier points into a single matrix
        mtz_origin = self.Frontier_Point.get([key for key in self.Frontier_Point][0])
        for j in range(1, len(self.Frontier_Point)):
            mtz_origin = np.concatenate((mtz_origin, self.Frontier_Point.get([key for key in self.Frontier_Point][j])), axis=0)

        # Calculate distance matrix between all frontier points and replace diagonal with max distance
        dist_mtz_origin = self.distance(mtz_origin, mtz_origin)
        dis_max = np.max(dist_mtz_origin)
        indent_max = np.nan_to_num(np.identity(dist_mtz_origin.shape[0]) * dis_max)
        dist_mtz_origin = (dist_mtz_origin + indent_max)

        # Create a boolean matrix for distances less than 2
        mtz_bool_eucl = dist_mtz_origin < 2

        # Combine all class vectors into a single matrix
        mtz_vec = self.class_vector.get([key for key in self.class_vector][0])
        for j in range(1, len(self.class_vector)):
            mtz_vec = np.concatenate((mtz_vec, self.class_vector.get([key for key in self.class_vector][j])), axis=0)
        mtz_vec = mtz_vec[:, self.X.shape[1]:]

        # First filter: filter by cosine distance
        dist_cosine = 1 - pairwise_distances(mtz_vec, metric="cosine")
        mtz_bool_cos = dist_cosine <= -0.9

        # Create an upper triangular boolean matrix
        diag = np.triu(np.ones(dist_mtz_origin.shape), 1).T == 1

        # Filter points based on boolean matrices and the upper triangular matrix
        regla = mtz_bool_cos & mtz_bool_eucl & diag
        select_indices_1 = np.where(regla == True)[0]

        # Update origin and vectors based on first filter
        origin_redu = mtz_origin[select_indices_1]
        vec_redu = mtz_vec[select_indices_1]

        # Update dictionaries with reduced frontier points and vectors
        self.Frontier_Point.update({'Frontier: Reduct': origin_redu})
        self.Frontier_Vector.update({'Vec_Frontier: Reduct': vec_redu + origin_redu})
        self.class_vector.update({'Class_vector_Reduct:': np.concatenate((origin_redu, vec_redu), axis=1)})

        # Second filter: filter by cosine distance again
        dist_cosine_2 = 1 - pairwise_distances(vec_redu, metric="cosine")
        mtz_bool_cos_2 = dist_cosine_2 <= 0.8

        # Create an upper triangular boolean matrix
        diag_2 = np.triu(np.ones(dist_cosine_2.shape), 1).T == 1

        # Filter points based on boolean matrices and the upper triangular matrix
        regla_2 = mtz_bool_cos_2 & diag_2
        select_indices_1_1 = np.where(regla_2 == True)[0]

        # Update origin and vectors based on second filter
        origin_redu_2 = origin_redu[select_indices_1_1]
        vec_redu_2 = vec_redu[select_indices_1_1]

        # Update dictionaries with fully reduced frontier points and vectors
        Front_Point = {'Frontier: Full Reduct': origin_redu_2}
        self.Frontier_Point.update(Front_Point)
        Front_vector ={'Vec_Frontier: Full Reduct' : vec_redu_2+origin_redu_2}    
        self.Frontier_Vector.update(Front_vector) 
        class_vec = {'Class_vector_Full_Reduct:': np.concatenate((origin_redu_2,vec_redu_2), axis=1)}
        self.class_vector.update(class_vec) 
        self.color_list = [0] * len(self.dic_categorias.keys())
        for i in range( len(self.dic_categorias.keys()) ):
            self.color_list[i] = np.random.randint(0, 1000)
    def frontier(self):
        self.get_frontier()
        self.centroid_regions()
        self.calculate_vectors()
        self.filter_vector()
    def plot_muestra_2D(self, col_1, col_2, include_layout=True):
        """Show a 2-D scatter plot of the samples, one trace per class.

        Args:
            col_1: Column index of the samples plotted on the x axis.
            col_2: Column index of the samples plotted on the y axis.
            include_layout: When True, apply a fixed 600x600 layout with
                minimal margins; when False, the figure keeps plotly's
                default layout.
        """
        # Start from an empty figure so every class is added the same way and
        # the method still works when ``dic_categorias`` is empty.
        fig = go.Figure()
        for key, value in self.dic_categorias.items():
            fig.add_trace(go.Scatter(x=value[:, col_1], y=value[:, col_2],
                                     mode='markers',
                                     name=key,
                                     marker=dict(
                                         size=6,
                                         colorscale='picnic',
                                         opacity=0.7)
                                     ))

        if include_layout:
            fig.update_layout(
                autosize=False,
                width=600,
                height=600,
                margin=dict(l=0, r=0, b=0, t=10))

        # Display the plot in both cases; only the layout is optional.
        fig.show()
    def plot_frontera_2D(self, col_1, col_2):
        """Show the samples together with the border samples and frontier points.

        Args:
            col_1: Column index plotted on the x axis.
            col_2: Column index plotted on the y axis.
        """
        # Start from an empty figure so every class is added the same way and
        # the method still works when ``dic_categorias`` is empty.
        fig = go.Figure()
        for key, value in self.dic_categorias.items():
            fig.add_trace(go.Scatter(x=value[:, col_1], y=value[:, col_2],
                                     mode='markers',
                                     name=key,
                                     marker=dict(
                                         size=6,
                                         colorscale='picnic',
                                         opacity=0.7)
                                     ))

        # (dictionary, marker symbol, marker size). The style is tied to the
        # dictionary explicitly rather than found by comparing dictionaries
        # with ``==``, which compares their contents, not their identity.
        frontier_styles = (
            (self.Frontier_Point_A_X, 220, 14),
            (self.Frontier_Point_X_A, 220, 14),
            (self.Frontier_Point, 300, 50),
        )
        for point_dict, symbol, size in frontier_styles:
            for key, value_dst in point_dict.items():
                fig.add_trace(go.Scatter(x=value_dst[:, col_1], y=value_dst[:, col_2],
                                         mode='markers',
                                         name=key,
                                         marker=dict(
                                             symbol=symbol,
                                             size=size,
                                             color=np.random.randint(100),  # random colour value per trace
                                             opacity=1)
                                         ))

        fig.update_layout(
            autosize=True,
            width=1200,
            height=750,
            margin=dict(l=10, r=10, b=10, t=20))

        fig.show()
    def plot_Vectors(self, col_1, col_2):
        """Show the samples, the frontier points, the vector heads and the class vectors.

        Args:
            col_1: Column index plotted on the x axis.
            col_2: Column index plotted on the y axis.
        """
        # Start from an empty figure so every class is added the same way and
        # the method still works when ``dic_categorias`` is empty.
        fig = go.Figure()
        for key, value in self.dic_categorias.items():
            fig.add_trace(go.Scatter(x=value[:, col_1], y=value[:, col_2],
                                     mode='markers',
                                     name=key,
                                     marker=dict(
                                         size=6,
                                         colorscale='picnic',
                                         opacity=0.7)
                                     ))

        # Frontier points and vector heads, each with its own marker symbol and size.
        for point_dict, symbol, size in zip([self.Frontier_Point, self.Frontier_Vector], [300, 2], [50, 15]):
            for key, value_dst in point_dict.items():
                fig.add_trace(go.Scatter(x=value_dst[:, col_1], y=value_dst[:, col_2],
                                         mode='markers',
                                         name=key,
                                         marker=dict(
                                             symbol=symbol,
                                             size=size,
                                             color=np.random.randint(100),  # random colour value per trace
                                             opacity=1)
                                         ))

        # Class vectors as arrows. Each matrix holds the origin in its first
        # half of columns and the vector components in the second half, so the
        # u/v columns are the x/y columns shifted by half the width.
        for key, value in self.class_vector.items():
            col_1_ = int(col_1 + value.shape[1] / 2)
            col_2_ = int(col_2 + value.shape[1] / 2)

            quiver_fig = ff.create_quiver(x=value[:, col_1], y=value[:, col_2],
                                          u=value[:, col_1_], v=value[:, col_2_],
                                          scale=1,
                                          arrow_scale=.3,
                                          name=key,
                                          line_width=1.5)

            fig.add_traces(data=quiver_fig.data)

        fig.update_layout(
            autosize=True,
            width=1200,
            height=750,
            margin=dict(l=10, r=10, b=10, t=20))

        fig.show()
    def plot_UMAP(self):
        """Show the samples and the border samples in a UMAP projection.

        A UMAP model (``random_state=42``) is fitted on ``self.X``. The
        per-class samples and the selected border samples are transformed with
        it and stored in ``self.dic_categorias_UMAP`` and
        ``self.dic_min_dst_UMAP``. The first two embedding columns are plotted.
        """
        trans = umap.UMAP(random_state=42).fit(self.X)

        # Project the per-class samples.
        for key, value in self.dic_categorias.items():
            self.dic_categorias_UMAP[key] = trans.transform(value)

        # Project the selected border samples.
        for key, value in self.dic_min_dst.items():
            self.dic_min_dst_UMAP[key] = trans.transform(value)

        # Start from an empty figure so every class is added the same way and
        # the method still works when no class is available.
        fig = go.Figure()
        for key, value in self.dic_categorias_UMAP.items():
            fig.add_trace(go.Scatter(x=value[:, 0], y=value[:, 1],
                                     mode='markers',
                                     name=key,
                                     marker=dict(
                                         size=6,
                                         colorscale='picnic',
                                         opacity=0.7)
                                     ))

        # Border samples drawn on top with a larger marker.
        for key, value_dst in self.dic_min_dst_UMAP.items():
            fig.add_trace(go.Scatter(x=value_dst[:, 0], y=value_dst[:, 1],
                                     mode='markers',
                                     name=key,
                                     marker=dict(
                                         symbol=220,
                                         size=14,
                                         color=np.random.randint(100),  # random colour value per trace
                                         opacity=1)
                                     ))

        fig.update_layout(
            autosize=True,
            width=800,
            height=600,
            margin=dict(l=10, r=10, b=10, t=20))

        fig.show()