In [None]:
import numpy as np
import pandas as pd
from typing import Optional, Union, List, Tuple, Dict, Any

from sklearn.base import BaseEstimator, TransformerMixin
from .utils import *


In [None]:
class FuzzyGranularitySelector(BaseEstimator, TransformerMixin):
    """
    Feature selector based on fuzzy information granules (FIGFS).

    ``fit`` ranks the columns of ``X`` with the FIGFS procedure and stores the
    ranking in ``S``; ``transform`` returns the first ``k`` ranked columns.

    Parameters
    ----------
    k : int
        Number of ranked features kept by ``transform`` (``S[:k]``).
        Must satisfy ``0 < k <= d``.
    eps : float
        Positive divisor applied to each numeric column's standard deviation
        to obtain its neighbourhood radius (``std / eps``).
    d : int
        Feature-count threshold: with fewer than ``d`` features every feature
        is ranked; otherwise the entropy/percentile stopping rule is used.
    sigma : int
        Percentile in ``[1, 100]`` used by the stopping rule.
    random_state : Optional[int]
        Validated and stored; not otherwise used by the code in this class.

    Attributes
    ----------
    S : Optional[List[Any]]
        Ordered list of selected column names, ``None`` before ``fit``.
    """

    def __init__(self, k: int = 3, eps: float = 0.5, d: int = 10, sigma: int = 10, random_state: Optional[int] = None):
        # d is validated first because the k check compares against it; a
        # non-integer d would otherwise surface as a TypeError from `k > d`.
        if not isinstance(d, int) or d <= 0:
            raise ValueError("d must be a positive integer.")
        if not isinstance(k, int) or k <= 0 or k > d:
            raise ValueError("k must be a positive integer and less or equal d.")
        if not isinstance(eps, (int, float)) or eps <= 0:
            raise ValueError("eps must be a positive number.")
        if not isinstance(sigma, int) or not (1 <= sigma <= 100):
            raise ValueError("sigma must be an integer in [1, 100].")
        if random_state is not None and not isinstance(random_state, int):
            raise ValueError("random_state must be an integer or None.")

        self.k = k
        self.eps = float(eps)
        self.d = int(d)
        self.sigma = int(sigma)
        self.random_state = random_state

        self.S: Optional[List[Any]] = None

        # Fitted state. Everything is initialised here so that calling
        # transform() before fit() fails with the intended RuntimeError.
        self._U: Optional[pd.DataFrame] = None            # features + target column
        self._C: Optional[Dict[Any, str]] = None          # feature name -> "numeric" / "nominal"
        self._D: Dict[Any, str] = {}                      # target name  -> "numeric" / "nominal"
        self._n: int = 0                                  # number of rows
        self._m: int = 0                                  # number of features
        self._target_name: str = "target"
        self._fuzzy_adaptive_neighbourhood_radius: Dict[Any, Optional[float]] = {}
        self._similarity_matrices: Dict[Any, np.ndarray] = {}
        self._D_partition: Dict[Any, pd.DataFrame] = {}
        self._delta_cache: Dict[Any, Any] = {}            # feature-subset tuple -> granule sizes per row
        self._entropy_cache: Dict[Any, float] = {}        # (B, type, T) -> entropy value

    def fit(self, X: Union[pd.DataFrame, np.ndarray, List[List[Any]]], y: Optional[Union[pd.Series, np.ndarray, pd.DataFrame]] = None):
        """
        Learn the feature ranking from ``X`` (and optionally ``y``).

        Parameters
        ----------
        X : DataFrame, ndarray or list of lists
            Input samples; converted with ``pd.DataFrame(X)``.
        y : Series, ndarray, DataFrame or None
            Target values. A DataFrame (or 2-D array) contributes its first
            column; ``None`` is replaced by a constant zero target.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``X`` has no rows or ``X`` and ``y`` differ in length.
        """
        # X = check_input_dataset(X, allow_nan=False)
        X = pd.DataFrame(X)  # to be replaced by the validation helper above

        if len(X) == 0:
            raise ValueError("X must contain at least one row.")
        if y is not None and hasattr(y, "__len__") and len(y) != len(X):
            raise ValueError("X and y must have the same number of rows.")

        # Pick a target column name that does not clash with a feature name.
        # Restart from "target" on every fit so refitting is deterministic.
        target_name = "target"
        suffix = 1
        while target_name in X.columns:
            target_name = f"target_{suffix}"
            suffix += 1
        self._target_name = target_name

        if y is None:
            y_values = np.zeros(len(X), dtype=int)
        elif isinstance(y, pd.DataFrame):
            y_values = y.iloc[:, 0].values
        elif isinstance(y, np.ndarray) and y.ndim > 1:
            # Same convention as the DataFrame branch: use the first column.
            y_values = y.reshape(len(y), -1)[:, 0]
        else:
            y_values = pd.Series(y).values

        self._U = X.copy()
        self._U[self._target_name] = y_values
        self._n = len(self._U)

        self._C = {col: "numeric" if pd.api.types.is_numeric_dtype(X[col]) else "nominal" for col in X.columns}
        self._m = len(self._C)

        self._D = {self._target_name: "numeric" if pd.api.types.is_numeric_dtype(self._U[self._target_name]) else "nominal"}

        self._fuzzy_adaptive_neighbourhood_radius = {}
        for col_name, col_type in {**self._C, **self._D}.items():
            if col_type == "numeric":
                std_val = float(self._U[col_name].std(ddof=0))
                self._fuzzy_adaptive_neighbourhood_radius[col_name] = std_val / self.eps if self.eps != 0 else 0.0
            else:
                self._fuzzy_adaptive_neighbourhood_radius[col_name] = None

        # Drop everything derived from a previous fit before recomputing.
        self._delta_cache = {}
        self._entropy_cache = {}
        self._D_partition = self._create_partitions()
        self._similarity_matrices = {
            col: self._calculate_similarity_matrix_for_df(col, self._U) for col in self._U.columns
        }

        self.S = self._FIGFS_algorithm()
        return self

    def transform(self, X: Union[pd.DataFrame, np.ndarray, List[List[Any]]]) -> pd.DataFrame:
        """
        Return the first ``k`` ranked feature columns of ``X``.

        Raises
        ------
        RuntimeError
            If ``fit`` has not been called.
        ValueError
            If the columns of ``X`` differ from those seen in ``fit``.
        """
        # X = check_input_dataset(X, allow_nan=False)
        X = pd.DataFrame(X)  # to be replaced by the validation helper above

        if self._C is None or self.S is None:
            raise RuntimeError("fit() must be called before transform().")
        if list(X.columns) != list(self._C.keys()):
            raise ValueError("Input X columns differ from those used in fit().")

        return X[self.S[:self.k]].copy()

    def _calculate_similarity_matrix_for_df(self, colname: str, df: pd.DataFrame) -> np.ndarray:
        """
        Compute the fuzzy similarity matrix of a single column.

        Numeric columns use ``1 - |x_i - x_j| / (1 + std)`` clipped to
        ``[0, 1]``; entries below ``1 - radius`` are set to 0 when a radius is
        registered for the column. Nominal columns use exact equality (0/1).

        Parameters
        ----------
        colname : str
            Column name; its type is looked up in ``self._C`` / ``self._D``.
        df : pd.DataFrame
            Data to compare (the full dataset or a class partition). The
            standard deviation is taken from ``df``; the radius is the one
            computed in ``fit``.

        Returns
        -------
        np.ndarray
            ``n x n`` similarity matrix, ``n = len(df)``.

        Raises
        ------
        KeyError
            If ``colname`` is neither a known feature nor the target.
        """
        if colname in self._C:
            col_type = self._C[colname]
        elif colname in self._D:
            col_type = self._D[colname]
        else:
            raise KeyError(f"Column {colname!r} is neither a fitted feature nor the target.")

        n = len(df)

        if col_type == 'numeric':
            # Cast to float so boolean/integer columns support subtraction.
            vals = df[colname].astype(float).to_numpy()
            sd = float(df[colname].std(ddof=0)) if n > 1 else 0.0
            denom = 1.0 + sd

            # Broadcasting builds all pairwise differences at once.
            sim = np.clip(1.0 - np.abs(vals[:, None] - vals[None, :]) / denom, 0.0, 1.0)

            radius = self._fuzzy_adaptive_neighbourhood_radius[colname]
            if radius is None:
                return sim
            return np.where(sim >= 1.0 - radius, sim, 0.0)

        vals = df[colname].to_numpy()
        return (vals[:, None] == vals[None, :]).astype(float)

    def _calculate_delta_for_column_subset(self, row_index: int, B: List[str], df: Optional[pd.DataFrame] = None, sim_cache: Optional[Dict[Any, np.ndarray]] = None) -> Tuple[np.ndarray, float]:
        """
        Calculate the granule membership vector and its size for one row.

        The granule is the element-wise minimum of the row's similarity
        vectors over the columns in ``B``. The target column, if present in
        ``B``, contributes a crisp same-class indicator.

        Parameters
        ----------
        row_index : int
            Positional row index within the chosen context.
        B : List[str]
            Column names forming the feature subset.
        df : Optional[pd.DataFrame]
            Local context. If None, the full dataset ``self._U`` and the
            similarity matrices stored by ``fit`` are used.
        sim_cache : Optional[Dict[Any, np.ndarray]]
            Only used with a local ``df``: a dict, owned by the caller, that
            is filled with the local similarity matrices so they are not
            rebuilt for every row of the same partition.

        Returns
        -------
        Tuple[np.ndarray, float]
            ``(granule_vector, size)`` where ``size`` is the sum of the vector.
            An empty ``B`` yields an all-zero granule.
        """
        use_global = df is None
        if use_global:
            df = self._U

        vectors = []
        for colname in B:
            if colname == self._target_name:
                y_vals = df[colname].values
                vec = (y_vals == y_vals[row_index]).astype(float)
            else:
                if use_global:
                    mat = self._similarity_matrices.get(colname)
                    if mat is None:
                        mat = self._calculate_similarity_matrix_for_df(colname, df)
                        self._similarity_matrices[colname] = mat
                elif sim_cache is not None:
                    mat = sim_cache.get(colname)
                    if mat is None:
                        mat = self._calculate_similarity_matrix_for_df(colname, df)
                        sim_cache[colname] = mat
                else:
                    mat = self._calculate_similarity_matrix_for_df(colname, df)

                vec = mat[row_index, :].astype(float)

            vectors.append(vec)

        if len(vectors) == 0:
            granule = np.zeros(len(df), dtype=float)
        else:
            granule = np.minimum.reduce(vectors)

        return granule, float(np.sum(granule))

    def _granule_sizes(self, columns: Tuple[Any, ...]) -> np.ndarray:
        """
        Return the global granule size of every row for the subset ``columns``.

        Results are cached in ``self._delta_cache`` keyed by the subset itself,
        so different subsets never share an entry; only the sizes are kept,
        not the granule vectors.
        """
        sizes = self._delta_cache.get(columns)
        if sizes is None:
            sizes = np.array(
                [self._calculate_delta_for_column_subset(i, columns)[1] for i in range(self._n)],
                dtype=float,
            )
            self._delta_cache[columns] = sizes
        return sizes

    def _calculate_multi_granularity_fuzzy_implication_entropy(self, B: List[str], type: str = 'basic', T: Optional[List[str]] = None)-> float:
        """
        Entropy-style measure computed from the granule sizes of ``B`` and ``T``.

        Parameters
        ----------
        B : List[str]
            Feature subset columns. An empty subset yields 0.0.
        type : str
            'basic', 'conditional' or 'joint'; any other value (including
            'mutual') takes the final branch.
        T : Optional[List[str]]
            Optional secondary subset; its granule sizes are taken as 0 when
            omitted or empty.

        Returns
        -------
        float
            Entropy value; memoised per ``(B, type, T)`` until the next fit.
        """
        B_tuple = tuple(B) if B is not None else ()
        T_tuple = tuple(T) if T is not None else ()

        if len(B_tuple) == 0:
            return 0.0

        cache_key = (B_tuple, type, T_tuple)
        if cache_key in self._entropy_cache:
            return self._entropy_cache[cache_key]

        n = self._n
        n_safe = max(n, 1.0)
        sizes_B = self._granule_sizes(B_tuple)
        sizes_T = self._granule_sizes(T_tuple) if len(T_tuple) > 0 else None

        res = 0.0
        for i in range(n):
            delta_B_size = float(sizes_B[i])
            delta_T_size = float(sizes_T[i]) if sizes_T is not None else 0.0

            if type == 'basic':
                res += (1.0 - delta_B_size / n_safe)
            elif type == 'conditional':
                res += max(delta_B_size, delta_T_size) - delta_B_size
            elif type == 'joint':
                res += 1.0 + max(delta_B_size, delta_T_size) / n_safe - (delta_B_size + delta_T_size) / n_safe
            else:
                res += 1.0 - max(delta_B_size, delta_T_size) / n_safe

        if type == 'conditional':
            out = res / (n ** 2 if n > 0 else 1.0)
        else:
            out = res / n_safe

        self._entropy_cache[cache_key] = out
        return out

    def _granular_consistency_of_B_subset(self, B: List[str]) -> float:
        """
        Average agreement between the granules of ``B`` and the target classes.

        For every row the score is ``1 - sum(|granule - same_class|) / n``;
        the mean over all rows is returned.

        Parameters
        ----------
        B : list
            Feature names forming the subset.

        Returns
        -------
        float
            Mean per-row consistency score.
        """
        total = 0.0
        y_vals = self._U[self._target_name].values

        for i in range(self._n):
            granule = self._calculate_delta_for_column_subset(i, B)[0]
            same_class = (y_vals == y_vals[i]).astype(float)
            # |a - b| equals max(0, a - b) + max(0, b - a).
            total += 1.0 - np.sum(np.abs(granule - same_class)) / self._n

        return total / self._n

    def _local_granularity_consistency_of_B_subset(self, B: List[str]) -> float:
        """
        Average ratio of within-class granule size to global granule size.

        For each class partition, every row's granule size computed inside
        the partition is divided by the same row's granule size in the full
        dataset; ratios are averaged per partition and then across partitions.

        Parameters
        ----------
        B : List[str]
            Feature subset columns.

        Returns
        -------
        float
            Average local consistency across non-empty partitions
            (0.0 if there are none).
        """
        global_sizes = self._granule_sizes(tuple(B))

        total = 0.0
        counted = 0
        for key, df_part in self._D_partition.items():
            part_n = len(df_part)
            if part_n == 0:
                # e.g. a NaN class value, which never equals itself.
                continue

            # Partition rows keep their original relative order, so the i-th
            # local row is the i-th global row of this class.
            global_rows = self._class_row_positions(key)
            sim_cache: Dict[Any, np.ndarray] = {}

            ratio_sum = 0.0
            for i_local in range(part_n):
                _, local_size = self._calculate_delta_for_column_subset(i_local, B, df=df_part, sim_cache=sim_cache)
                global_size = float(global_sizes[global_rows[i_local]])
                # An empty global granule (e.g. empty B) is treated as fully consistent.
                ratio_sum += local_size / global_size if global_size > 0 else 1.0

            total += ratio_sum / part_n
            counted += 1

        return total / counted if counted else 0.0

    def _class_row_positions(self, value: Any) -> np.ndarray:
        """Positional indices of the rows of ``self._U`` whose target equals ``value``."""
        mask = (self._U[self._target_name] == value).fillna(False).to_numpy(dtype=bool)
        return np.where(mask)[0]

    def _create_partitions(self) -> Dict[Any, pd.DataFrame]:
        """
        Partition the dataset into subsets according to target values.

        Returns
        -------
        Dict[Any, pd.DataFrame]
            Maps each distinct target value to a re-indexed sub-DataFrame with
            the rows of that class, in their original relative order.
        """
        return {
            value: self._U.iloc[self._class_row_positions(value)].reset_index(drop=True)
            for value in self._U[self._target_name].unique()
        }

    def _score_candidate(self, colname: Any, S: List[Any], relevance: float) -> Tuple[float, float]:
        """
        Score one candidate feature against the already selected features ``S``.

        Returns
        -------
        Tuple[float, float]
            ``(J, W)`` with ``J = W * relevance - redundancy``.
        """
        entropy = self._calculate_multi_granularity_fuzzy_implication_entropy
        target = self._target_name

        sim = 0
        for s_colname in S:
            fimi_d_cv = entropy([target], type='mutual', T=[colname])
            fimi_cv_cu = entropy([colname], type='mutual', T=[s_colname])
            fimi_cd = entropy([colname], type='mutual', T=[target, s_colname])
            sim += fimi_d_cv + fimi_cv_cu - fimi_cd
        sim = sim / len(S)

        # NOTE(review): with B=S and T=S+[colname] the 'conditional' branch
        # appears to always evaluate to 0 (the granule of a superset cannot be
        # larger), which would make W identical for every candidate. Confirm
        # the intended argument order against the FIGFS reference.
        base = entropy(S, type='conditional', T=[target])
        extended = entropy(S, type='conditional', T=S + [colname])
        W = 1 + (base - extended) / (base + 0.01)

        return W * relevance - sim, W

    def _FIGFS_algorithm(self):
        """
        Execute the Fuzzy Implication Granularity-based Feature Selection (FIGFS) algorithm.

        The first feature is the one with the highest relevance (global plus
        local granular consistency). Further features are added greedily by
        the score ``J`` from ``_score_candidate``. With fewer than ``d``
        features all of them are ranked; otherwise selection stops once the
        conditional entropy of the target given ``S`` equals that given all
        features, or the best candidate's weight falls below the ``sigma``-th
        percentile of the candidate weights.

        Returns
        -------
        List[Any]
            Selected column names in selection order (empty if there are no
            features).
        """
        remaining = list(self._C.keys())
        if not remaining:
            return []

        # Relevance depends only on the single feature, so compute it once.
        relevance = {
            col: self._granular_consistency_of_B_subset([col]) + self._local_granularity_consistency_of_B_subset([col])
            for col in remaining
        }

        first = max(remaining, key=relevance.get)
        S = [first]
        remaining.remove(first)

        if self._m < self.d:
            while remaining:
                scores = {col: self._score_candidate(col, S, relevance[col])[0] for col in remaining}
                best = max(remaining, key=scores.get)
                S.append(best)
                remaining.remove(best)
        else:
            entropy = self._calculate_multi_granularity_fuzzy_implication_entropy
            target = self._target_name

            FIE_dc = entropy([target], type='conditional', T=list(self._C.keys()))
            FIE_ds = entropy([target], type='conditional', T=S)
            # Exact comparison is intentional: both values are built from the
            # same min/sum operations and coincide once S carries the same
            # granules as the full feature set.
            while remaining and FIE_dc != FIE_ds:
                scored = {col: self._score_candidate(col, S, relevance[col]) for col in remaining}
                best = max(remaining, key=lambda col: scored[col][0])

                W_best = scored[best][1]
                percen = np.percentile(np.array([scored[col][1] for col in remaining]), self.sigma)
                if W_best >= percen:
                    S.append(best)
                    remaining.remove(best)
                else:
                    break
                FIE_ds = entropy([target], type='conditional', T=S)

        return S

In [106]:
def sample_data():
    """Return a five-row toy dataset: features ``a``, ``b``, ``c`` and a binary target."""
    columns = dict(
        a=[0.1, 0.4, 0.5, 0.9, 0.3],
        b=[1, 2, 1, 2, 1],
        c=["x", "y", "x", "x", "y"],
    )
    features = pd.DataFrame(columns)
    labels = pd.Series([0, 1, 0, 1, 0])
    return features, labels


X, y = sample_data()

In [107]:
# Build a selector with the default hyper-parameters and fit it on the toy data.
c = FuzzyGranularitySelector()
c.fit(X, y)

0,1,2
,k,3.0
,eps,0.5
,d,10.0
,sigma,10.0
,random_state,


In [108]:
# Keep only the first k ranked features; column order follows the selection ranking.
c.transform(X)

Unnamed: 0,b,a,c
0,1,0.1,x
1,2,0.4,y
2,1,0.5,x
3,2,0.9,x
4,1,0.3,y


In [65]:
# Debugging aid: inspect the per-column similarity matrices stored by fit() (private attribute).
c._similarity_matrices

{'a': array([[1.        , 0.7629077 , 0.68387693, 0.        , 0.84193846],
        [0.7629077 , 1.        , 0.92096923, 0.60484616, 0.92096923],
        [0.68387693, 0.92096923, 1.        , 0.68387693, 0.84193846],
        [0.        , 0.60484616, 0.68387693, 1.        , 0.52581539],
        [0.84193846, 0.92096923, 0.84193846, 0.52581539, 1.        ]]),
 'b': array([[1.        , 0.32881309, 1.        , 0.32881309, 1.        ],
        [0.32881309, 1.        , 0.32881309, 1.        , 0.32881309],
        [1.        , 0.32881309, 1.        , 0.32881309, 1.        ],
        [0.32881309, 1.        , 0.32881309, 1.        , 0.32881309],
        [1.        , 0.32881309, 1.        , 0.32881309, 1.        ]]),
 'c': array([[1., 0., 1., 1., 0.],
        [0., 1., 0., 0., 1.],
        [1., 0., 1., 1., 0.],
        [1., 0., 1., 1., 0.],
        [0., 1., 0., 0., 1.]]),
 'target': array([[1.        , 0.32881309, 1.        , 0.32881309, 1.        ],
        [0.32881309, 1.        , 0.32881309, 1.  

In [13]:
# Show the input frame (same conversion that fit/transform apply to X).
pd.DataFrame(X)

Unnamed: 0,a,b,c
0,0.1,1,x
1,0.4,2,y
2,0.5,1,x
3,0.9,2,x
4,0.3,1,y
