In [2]:
import pandas as pd
import numpy as np

class ClusterAnalysis():
    """Summarise how feature values are distributed across the clusters of a dataset.

    The dataset is a pandas DataFrame that already contains a column of cluster
    labels. None of the analysis methods modify the DataFrame that was passed in.
    """

    def __init__(self,dataset,cluster_column=[]):
        """Store the dataset and print the size of every cluster.

        Args:
            dataset: DataFrame holding the features and the cluster label column.
            cluster_column: Name of the column that holds the cluster labels.
                NOTE(review): the ``[]`` default is kept only for signature
                compatibility; it does not look like a usable value, so always
                pass a column name.
        """
        self.dataset = dataset
        self.cluster_column = cluster_column
        cluster_values = self.dataset[cluster_column]
        self.no_clusters = len(cluster_values.unique())
        self.cluster_distribution = dict(cluster_values.value_counts())
        self.cluster_distribution_ratio = dict(cluster_values.value_counts(normalize=True)*100)
        print('Cluster Distributions : {}'.format(self.cluster_distribution))
        print('Cluster Distributions Percentages : {}'.format(self.cluster_distribution_ratio))

    def _feature_labels(self,feature_name,categorical,no_bins):
        """Return the per-row labels of a feature without touching ``self.dataset``.

        Categorical features are stringified element by element; numerical
        features are replaced by their quantile-bin names from ``get_bins``.
        """
        if categorical:
            column = self.dataset[feature_name]
            # Build a new Series instead of assigning back into the dataset, so the
            # caller's column keeps its original dtype.
            return pd.Series([str(value) for value in column],index=column.index,name=feature_name)
        return self.get_bins(feature_name,no_bins)

    def crosstab(self,feature_name,categorical,no_bins=None):
        """Count rows per (cluster, feature value) pair.

        Args:
            feature_name: Column to cross-tabulate against the cluster column.
            categorical: If true, use the stringified raw values; otherwise use
                quantile bins.
            no_bins: Number of quantile bins; needed when ``categorical`` is false.

        Returns:
            DataFrame of counts with one row per cluster.
        """
        labels = self._feature_labels(feature_name,categorical,no_bins)
        return pd.crosstab(self.dataset[self.cluster_column],labels)

    def feature_distribution(self,feature_name:str,categorical=True,no_bins = None,percentiles=False):
        """Describe how a feature is distributed inside every cluster.

        Args:
            feature_name: Column to analyse.
            categorical: If true, return row-normalised shares of each raw value.
            no_bins: Number of quantile bins for the binned (numerical) view.
            percentiles: Only consulted when ``categorical`` is false; if true,
                return per-cluster ``describe`` statistics instead of bins.

        Returns:
            DataFrame with one row per cluster.
        """
        if categorical:
            labels = self._feature_labels(feature_name,True,no_bins)
            return pd.crosstab(self.dataset[self.cluster_column],labels,normalize=0)

        if percentiles:
            get_statistics = lambda group : group[feature_name].describe(
                percentiles=[0.05,0.10,0.25,0.50,0.75,0.90,0.95]
            )
            return self.dataset.groupby(self.cluster_column).apply(get_statistics)

        labels = self._feature_labels(feature_name,False,no_bins)
        return pd.crosstab(self.dataset[self.cluster_column],labels,normalize=0)

    def get_feature_label_dispersion(
            self,
            feature_list,
            n_features,
            n_labels,
            categorical=True,
            no_bins=4,
            agg='sum',
            verbose=True
        ):
        """Rank features, and labels within them, by how much their share varies across clusters.

        For every feature the per-cluster distribution is computed and the variance
        of each label's share across clusters is taken as its dispersion.

        Args:
            feature_list: Features to analyse; features that fail are skipped.
            n_features: Number of top features to keep, ranked by aggregated dispersion.
            n_labels: Number of most dispersed labels to keep per selected feature.
            categorical: Passed on to ``feature_distribution``.
            no_bins: Passed on to ``feature_distribution``.
            agg: Aggregation applied to the label dispersions of a feature.
            verbose: If true, print a progress line per feature.

        Returns:
            DataFrame indexed by (Feature, Label) with a single ``Dispersion`` column.

        Raises:
            ValueError: If no feature distribution could be computed.
        """
        appended_results = []
        total_count = len(feature_list)
        for current_count, feature in enumerate(feature_list, start=1):
            try:
                if verbose:
                    print('[{}/{}] : Feature : {}'.format(current_count,total_count,feature))
                result = self.feature_distribution(feature,categorical=categorical,no_bins=no_bins)
                try:
                    # Prefix every label with its feature name so the concatenated
                    # frame can be split back into (feature, label) pairs.
                    result.columns = pd.MultiIndex.from_tuples([(result.columns.name,col) for col in result.columns])
                except Exception as e:
                    print('Exception : {}'.format(e))
                else:
                    appended_results.append(result)
            except Exception as e:
                print('Exception Occured for {} : {}: skipping'.format(feature,e))

        if not appended_results:
            raise ValueError('No feature distribution could be computed for the given feature_list')

        feature_label_dispersion = pd.concat(appended_results,axis=1).var().reset_index()
        feature_label_dispersion.columns = ['Feature','Label','Dispersion']

        # finding the top features
        top_features = feature_label_dispersion.groupby('Feature').agg(
            {'Dispersion':agg}
        ).sort_values(by = 'Dispersion',ascending =False).index.tolist()[:n_features]

        # finding the top labels: a stable sort keeps tied labels in their original
        # order, then the first n_labels rows of every feature are kept.
        selected = feature_label_dispersion.loc[feature_label_dispersion.Feature.isin(top_features)]
        ordered = selected.sort_values(by = ['Feature','Dispersion'],ascending = [True,False])
        final_result = ordered.groupby('Feature',sort = False).head(n_labels)
        return final_result.set_index(['Feature','Label'])

    def get_bins(self,feature,no_bins):
        """Replace a numerical feature by the name of its quantile bin.

        Bins are named ``'<total>_Bins_<rank>'`` with rank 1 for the lowest interval.
        The returned Series keeps the index of the dataset unchanged.

        Args:
            feature: Numerical column to bin.
            no_bins: Requested number of quantiles; duplicate edges are dropped.

        Returns:
            Series of bin names as strings, or None (after printing the error)
            when binning fails.
        """
        try:
            binned = pd.qcut(self.dataset[feature],q = no_bins,duplicates = 'drop')
            total_bins = len(binned.cat.categories)
            bin_names = ['{}_Bins_{}'.format(total_bins,i) for i in range(1,total_bins+1)]
            # Categories of a qcut result are already in ascending interval order.
            output = binned.cat.rename_categories(bin_names).astype(str)
            output.name = feature
            return output
        except Exception as e:
            print('Exception occured : {}'.format(e))

In [27]:

# Cosine dissimilarity
def cosine_dissimilarity(p, q):
    """Return one minus the cosine similarity of ``p`` and ``q``.

    Both inputs are converted to numpy arrays first. No special handling is
    applied to zero-length vectors; the division is performed as is.
    """
    vec_p, vec_q = np.array(p), np.array(q)
    norm_product = np.linalg.norm(vec_p) * np.linalg.norm(vec_q)
    return 1 - np.dot(vec_p, vec_q) / norm_product

# Create prediction frame
def create_prediction_frame(model, X, y):
    """Combine features, labels, predictions and per-class outcome flags in one frame.

    Args:
        model: Fitted classifier exposing ``predict`` and ``predict_proba``.
        X: Feature DataFrame; it is copied, not modified.
        y: True labels for the rows of ``X``.

    Returns:
        A copy of ``X`` with ``true_label``, ``predicted_label``, one
        ``prob_class_<label>`` column per class, and boolean
        ``tp_``/``fp_``/``fn_``/``tn_`` columns per class.
    """
    probabilities = model.predict_proba(X)
    predicted_classes = model.predict(X)

    # The columns of predict_proba follow model.classes_, not the order in which
    # labels first appear in y, so the class list must come from the model.
    class_labels = getattr(model, 'classes_', None)
    if class_labels is None:
        # NOTE(review): fallback for models without classes_; assumes the
        # probability columns are in sorted label order - confirm for such models.
        class_labels = np.unique(np.asarray(y))

    prediction_frame = X.copy()
    prediction_frame['true_label'] = y
    prediction_frame['predicted_label'] = predicted_classes
    for i, label in enumerate(class_labels):
        prediction_frame[f'prob_class_{label}'] = probabilities[:, i]
    for label in class_labels:
        is_true_class = (y == label)
        is_predicted_class = (predicted_classes == label)

        prediction_frame[f'tp_{label}'] = is_true_class & is_predicted_class
        prediction_frame[f'fp_{label}'] = ~is_true_class & is_predicted_class
        prediction_frame[f'fn_{label}'] = is_true_class & ~is_predicted_class
        prediction_frame[f'tn_{label}'] = ~is_true_class & ~is_predicted_class
    return prediction_frame

# Get feature importance
def get_feature_importance(model, training_features):
    """Tabulate the model's feature importances from highest to lowest.

    Args:
        model: Fitted estimator exposing ``feature_importances_``.
        training_features: Feature names, in the order the model was trained on.

    Returns:
        DataFrame with columns ``feature``, ``score`` and ``cumscore`` (the
        running total of ``score`` in descending order).
    """
    ranked = pd.DataFrame(
        {'feature': training_features, 'score': model.feature_importances_}
    ).sort_values('score', ascending=False)
    ranked['cumscore'] = ranked['score'].cumsum()
    return ranked[['feature', 'score', 'cumscore']]

from sklearn.metrics import confusion_matrix
import pandas as pd
def generate_confusion_matrix_table(prediction_frame, true_column='true_label', predicted_column='predicted_label'):
    """Build a labelled confusion matrix from a prediction frame.

    Args:
        prediction_frame: DataFrame holding the true and the predicted labels.
        true_column: Name of the column with the true labels.
        predicted_column: Name of the column with the predicted labels.

    Returns:
        DataFrame with rows ``True_<label>`` and columns ``Pred_<label>``,
        covering every label found in either column, in sorted order.
    """
    y_true = prediction_frame[true_column]
    y_pred = prediction_frame[predicted_column]

    # Use the labels of both columns: a class that is predicted but never occurs
    # in the true labels would otherwise be dropped from the table.
    class_labels = sorted(set(y_true.unique()) | set(y_pred.unique()))

    conf_matrix = confusion_matrix(y_true, y_pred, labels=class_labels)

    return pd.DataFrame(
        conf_matrix,
        index=[f"True_{label}" for label in class_labels],
        columns=[f"Pred_{label}" for label in class_labels]
    )


In [28]:
import sklearn
from sklearn.metrics import silhouette_score,davies_bouldin_score,calinski_harabasz_score
def train(predict_function, param_dict, input_data,return_clusters=False, subset_fraction=0.05):
    """Cluster the data with ``predict_function`` and evaluate the result.

    Silhouette scores are computed on a random subsample of the rows, while the
    Davies-Bouldin and Calinski-Harabasz scores use all rows.

    Args:
        predict_function: Callable ``(param_dict, data) -> cluster labels``.
        param_dict: Parameters handed to ``predict_function`` unchanged.
        input_data: Array-like of samples; converted to a numpy array.
        return_clusters: If true, also return the labels from ``predict_function``.
        subset_fraction: Fraction of rows used for the silhouette scores; values
            above 1 are treated as "use every row".

    Returns:
        A dict with ``parameters``, ``cluster_distribution``, ``data_size`` and
        ``sample_size``, plus ``eval_metrics`` when the metrics could be computed.
        With ``return_clusters`` the result is ``(cluster_labels, dict)``.
    """
    data = np.array(input_data)

    # The global seed is set on purpose: it fixes the subsample and also any
    # predict_function that draws from numpy's global random state.
    np.random.seed(42)
    n_samples = data.shape[0]
    subset_size = min(int(n_samples * subset_fraction), n_samples)
    subset_indices = np.random.choice(n_samples, subset_size, replace=False)
    data_subset = data[subset_indices]

    cluster_labels = predict_function(param_dict, data)
    # Array view used for positional indexing; the caller still receives the
    # labels exactly as predict_function returned them.
    label_array = np.asarray(cluster_labels)

    unique_clusters, counts_clusters = np.unique(label_array, return_counts=True)
    count_dict = dict(zip(unique_clusters, counts_clusters))

    # Filled before the metrics run so that a failed evaluation still reports
    # which parameters produced the clustering.
    return_dict = {
        'parameters': param_dict,
        'cluster_distribution': count_dict,
        'data_size': data.shape,
        'sample_size': data_subset.shape,
    }

    try:
        subset_labels = label_array[subset_indices]
        silhouette_score_euc = silhouette_score(data_subset, subset_labels, metric='euclidean')
        silhouette_score_man = silhouette_score(data_subset, subset_labels, metric='manhattan')
        db_score = davies_bouldin_score(data, label_array)
        ch_score = calinski_harabasz_score(data, label_array)

        return_dict['eval_metrics'] = {
            'silhouette_score': {
                'euclidean': silhouette_score_euc,
                'manhattan': silhouette_score_man
            },
            'davies_bouldin_score': db_score,
            'calinski_harabaz_score': ch_score
        }
    except Exception as e:
        # Best effort: report the problem and return the dict without metrics.
        print(f"Error calculating metrics: {e}")

    if return_clusters:
        return cluster_labels, return_dict
    else:
        return return_dict

In [45]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

def cluster_classifier_analysis(model, X,y, test_size=0.2, random_state=42):
    """
    Fit a classifier on a train/test split and summarise how well it performs.

    Args:
    - model: The classification model to train; it is fitted in place.
    - X: Feature DataFrame (its column names label the feature importances).
    - y: Target labels for the rows of X.
    - test_size: Proportion of the rows held out for the test split (default 0.2).
    - random_state: Random seed used for the split (default 42).

    Returns:
    - result_dict: Dictionary with the keys 'train_confusion_matrix',
      'test_confusion_matrix' and 'feature_importance'.
    """
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
    model.fit(X_train, y_train)

    splits = (('train', X_train, y_train), ('test', X_test, y_test))

    # Prediction frames first, for both splits ...
    prediction_frames = {
        split_name: create_prediction_frame(model, features, labels)
        for split_name, features, labels in splits
    }
    # ... then one confusion matrix per frame.
    confusion_tables = {
        split_name: generate_confusion_matrix_table(
            frame, true_column='true_label', predicted_column='predicted_label'
        )
        for split_name, frame in prediction_frames.items()
    }

    return {
        'train_confusion_matrix': confusion_tables['train'],
        'test_confusion_matrix': confusion_tables['test'],
        'feature_importance': get_feature_importance(model, X_train.columns),
    }


In [46]:
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt

# 1. Create a Sample Dataset (Synthetic Data)
def create_sample_data():
    """Return a reproducible 200-row frame with two standard-normal features.

    The global numpy seed is reset to 42 on every call, so repeated calls give
    the same values.
    """
    np.random.seed(42)
    # Dict values are evaluated left to right, so feature_1 is drawn first.
    return pd.DataFrame({
        'feature_1': np.random.randn(200),
        'feature_2': np.random.randn(200),
    })

# 2. K-Means Prediction Function
def kmeans_predict(parameter_dict, data):
    """
    Cluster ``data`` with K-Means and return the label of every sample.

    Args:
    - parameter_dict: Dictionary of KMeans settings. Only 'n_clusters' (default 3),
      'init' (default 'k-means++') and 'max_iter' (default 300) are read.
    - data: DataFrame or array-like data to cluster.

    Returns:
    - cluster_labels: The predicted cluster label for each data point.
    """
    defaults = (('n_clusters', 3), ('init', 'k-means++'), ('max_iter', 300))
    settings = {name: parameter_dict.get(name, fallback) for name, fallback in defaults}

    # The random state is fixed so that repeated runs give the same clustering.
    estimator = KMeans(random_state=42, **settings)
    return estimator.fit_predict(data)


# Build the synthetic two-feature dataset that the following cells cluster and classify.
data = create_sample_data()


In [47]:
# Cluster the sample data with the K-Means defaults; silhouette scores use a 90% subsample.
cluster_labels, return_dict = train(
    predict_function=kmeans_predict,
    param_dict={},
    input_data=data,
    return_clusters=True,
    subset_fraction=0.90,
)

In [50]:
# Show the evaluation summary that train() returned for the clustering above.
return_dict

{'parameters': {},
 'cluster_distribution': {0: 58, 1: 72, 2: 70},
 'data_size': (200, 2),
 'sample_size': (180, 2),
 'eval_metrics': {'silhouette_score': {'euclidean': 0.30247555702042617,
   'manhattan': 0.3020365910469795},
  'davies_bouldin_score': 1.008278951845304,
  'calinski_harabaz_score': 101.89019471190366}}

In [51]:
from sklearn.tree import DecisionTreeClassifier
# Decision tree with library defaults; it is fitted on the cluster labels in a later cell.
# NOTE(review): no random_state is passed here - confirm whether run-to-run reproducibility matters.
model = DecisionTreeClassifier()

In [53]:
# Train the tree to predict the K-Means labels from the two features.
classifier_result = cluster_classifier_analysis(
    model,
    X=data,
    y=pd.Series(cluster_labels),
)

In [54]:
# List the sections of the result returned by cluster_classifier_analysis.
classifier_result.keys()

dict_keys(['train_confusion_matrix', 'test_confusion_matrix', 'feature_importance'])

In [55]:
# Confusion matrix on the training split (rows: true label, columns: predicted label).
classifier_result['train_confusion_matrix']

Unnamed: 0,Pred_0,Pred_1,Pred_2
True_0,48,0,0
True_1,0,57,0
True_2,0,0,55


In [57]:
# Feature importances sorted by score, with their running total in `cumscore`.
classifier_result['feature_importance']

Unnamed: 0,feature,score,cumscore
0,feature_1,0.521316,0.521316
1,feature_2,0.478684,1.0
