Load Dataset from .pth Files

In [None]:
def load_pth_data(file_path):
    """
    Load a feature/label dataset from a .pth file.

    The file must contain a mapping with the keys 'features' and 'labels',
    each holding a PyTorch tensor.

    Parameters:
        file_path (str): Path to the .pth file containing the dataset.

    Returns:
        X (numpy array): Feature vectors (input data).
        y (numpy array): Labels (target data).
    """
    # map_location="cpu" lets a file that was saved from a GPU session be
    # opened on a machine without CUDA.
    data = torch.load(file_path, map_location="cpu")

    # Tensor.numpy() raises for tensors that require grad or live on another
    # device, so detach from the autograd graph and move to the CPU first.
    X = data['features'].detach().cpu().numpy()
    y = data['labels'].detach().cpu().numpy()

    return X, y  # Feature vectors and labels as numpy arrays

# Locations of the preprocessed training and testing splits
train_file = "../Data/ProcessedData/train_data.pth"
test_file = "../Data/ProcessedData/test_data.pth"

# Read each split into a (features, labels) pair of numpy arrays
X_train, y_train = load_pth_data(train_file)
X_test, y_test = load_pth_data(test_file)

# Folder that receives the serialized models; created only when missing
weights_dir = "Weights"
os.makedirs(weights_dir, exist_ok=True)

# One display name per distinct training label: "Class 0", "Class 1", ...
class_names = [f"Class {i}" for i, _ in enumerate(np.unique(y_train))]

# Report the array shapes of both splits
print(f"Training data shape: {X_train.shape}, Training labels shape: {y_train.shape}")
print(f"Testing data shape: {X_test.shape}, Testing labels shape: {y_test.shape}")

Define the Custom Decision Tree

In [None]:
# Node class for the Custom Decision Tree
class Node:
    """
    One vertex of the decision tree: either an internal split or a leaf.

    Parameters:
        feature (int): Index of the feature used for splitting at this node.
        threshold (float): Threshold value for splitting the data.
        left (Node): Left child node.
        right (Node): Right child node.
        value (int): Class label if the node is a leaf (no children).
    """

    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        # Split rule stored on internal nodes
        self.feature, self.threshold = feature, threshold
        # Child subtrees
        self.left, self.right = left, right
        # Class label stored on leaves
        self.value = value


# Custom Decision Tree class
class CustomDecisionTree:
    """
    Binary decision-tree classifier grown greedily with the Gini criterion.

    Every internal node sends a sample to the left child when
    `x[feature] <= threshold` and to the right child otherwise. The fitted
    tree is kept in `self.tree` and can be saved to / restored from disk
    with `save_tree` / `load_tree`.
    """

    def __init__(self):
        """
        Initializes an empty decision tree.
        """
        self.tree = None  # Root node of the tree, initially None

    def fit(self, X, y, max_depth=50):
        """
        Trains the decision tree by building its structure.

        Parameters:
            X (numpy array): Feature matrix (samples x features).
            y (numpy array): Target labels, one per sample.
            max_depth (int): Maximum depth of the tree.

        Raises:
            ValueError: If X is not 2-D, y is not 1-D, their lengths differ,
                or the dataset is empty.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if X.ndim != 2:
            raise ValueError("X must be a 2-D array of shape (samples, features).")
        if y.ndim != 1:
            raise ValueError("y must be a 1-D array of labels.")
        if len(X) != len(y):
            raise ValueError("X and y must contain the same number of samples.")
        if len(y) == 0:
            raise ValueError("Cannot fit a decision tree on an empty dataset.")
        self.tree = self.build_tree(X, y, depth=0, max_depth=max_depth)

    def predict(self, X):
        """
        Predicts class labels for a given dataset.

        Parameters:
            X (numpy array): Feature matrix (samples x features).

        Returns:
            numpy array: Predicted class labels.

        Raises:
            ValueError: If no tree has been trained or loaded yet.
        """
        if self.tree is None:
            raise ValueError("Model has not been trained or loaded.")
        # Predict for each sample in X
        return np.array([self.predict_single(self.tree, x) for x in X])

    def predict_single(self, node, x):
        """
        Walks the tree from `node` down to a leaf to predict a single sample.

        Parameters:
            node (Node): Node at which the traversal starts.
            x (numpy array): Feature vector for the sample.

        Returns:
            int: Predicted class label.
        """
        # A node is a leaf as soon as it carries a value; descend until then.
        while node.value is None:
            if x[node.feature] <= node.threshold:
                node = node.left
            else:
                node = node.right
        return node.value

    def gini_impurity(self, y):
        """
        Calculates the Gini impurity for a given set of labels.

        Parameters:
            y (numpy array): Target labels.

        Returns:
            float: Gini impurity value.
        """
        _, counts = np.unique(y, return_counts=True)
        probs = counts / len(y)  # Probability of each class
        return 1 - np.sum(probs ** 2)

    def split_dataset(self, X, y, feature, threshold):
        """
        Splits the dataset into two subsets based on a feature and threshold.

        The right subset is the exact complement of the left one, so every
        sample lands in exactly one subset. Samples whose feature value is
        NaN therefore go right, which is also where predict_single sends them.

        Parameters:
            X (numpy array): Feature matrix.
            y (numpy array): Target labels.
            feature (int): Feature index used for the split.
            threshold (float): Threshold value for the split.

        Returns:
            tuple: (X_left, y_left, X_right, y_right) - Subsets of the data.
        """
        left_idx = X[:, feature] <= threshold  # Boolean mask for the left subset
        right_idx = ~left_idx                  # Everything else goes right
        return X[left_idx], y[left_idx], X[right_idx], y[right_idx]

    def find_best_split(self, X, y):
        """
        Finds the best feature and threshold to split the dataset.

        Candidates are tried feature by feature in ascending threshold order;
        the first candidate with the lowest weighted Gini impurity wins.

        Parameters:
            X (numpy array): Feature matrix.
            y (numpy array): Target labels.

        Returns:
            tuple: (best_feature, best_threshold) - Feature index and threshold
                for the best split, or (None, None) when no split separates
                the samples into two non-empty groups.
        """
        best_feature, best_threshold = None, None
        best_impurity = float("inf")  # Initialize with the highest possible impurity
        n_samples = len(y)
        for feature in range(X.shape[1]):  # Iterate over all features
            column = X[:, feature]
            # The largest unique value can never produce a valid split (it
            # leaves one side empty), so it is not evaluated.
            for threshold in np.unique(column)[:-1]:
                # Only the labels are needed to score a candidate, so the
                # feature matrix itself is not sliced here.
                goes_left = column <= threshold
                y_left, y_right = y[goes_left], y[~goes_left]
                # Skip if no valid split
                if len(y_left) == 0 or len(y_right) == 0:
                    continue
                # Calculate weighted impurity for the split
                impurity = (
                    len(y_left) / n_samples * self.gini_impurity(y_left) +
                    len(y_right) / n_samples * self.gini_impurity(y_right)
                )
                # Update the best split if this split has lower impurity
                if impurity < best_impurity:
                    best_impurity = impurity
                    best_feature, best_threshold = feature, threshold
        return best_feature, best_threshold

    def _majority_class(self, y):
        """
        Returns the most frequent label in y (the smallest one on ties).

        Uses np.unique instead of np.bincount so that negative and
        non-integer labels are supported as well.

        Raises:
            ValueError: If y is empty.
        """
        if len(y) == 0:
            raise ValueError("Cannot determine the majority class of an empty label set.")
        values, counts = np.unique(y, return_counts=True)
        return values[counts.argmax()]

    def build_tree(self, X, y, depth=0, max_depth=50):
        """
        Recursively builds the decision tree.

        Parameters:
            X (numpy array): Feature matrix.
            y (numpy array): Target labels.
            depth (int): Current depth of the tree.
            max_depth (int): Maximum allowed depth of the tree.

        Returns:
            Node: Root node of the built subtree.
        """
        # Stop condition: maximum depth reached or pure node
        if depth == max_depth or len(np.unique(y)) == 1:
            return Node(value=self._majority_class(y))

        # Find the best feature and threshold for splitting
        feature, threshold = self.find_best_split(X, y)
        if feature is None:  # No valid split
            return Node(value=self._majority_class(y))

        # Split the dataset and build child nodes recursively
        X_left, y_left, X_right, y_right = self.split_dataset(X, y, feature, threshold)
        left_child = self.build_tree(X_left, y_left, depth + 1, max_depth)
        right_child = self.build_tree(X_right, y_right, depth + 1, max_depth)
        return Node(feature, threshold, left_child, right_child)

    def save_tree(self, filepath):
        """
        Saves the trained tree structure to a file.

        Parameters:
            filepath (str): Path to save the tree structure.

        Raises:
            ValueError: If there is no trained tree to save.
        """
        if self.tree is None:
            raise ValueError("No tree structure to save. Train the model first.")
        with open(filepath, "wb") as file:
            pickle.dump(self.tree, file)  # Serialize the tree object
        print(f"Tree structure saved to {filepath}")

    def load_tree(self, filepath):
        """
        Loads a saved tree structure from a file.

        Parameters:
            filepath (str): Path to the saved tree structure.
        """
        # NOTE: pickle can execute arbitrary code while loading; only open
        # files that were produced by save_tree from a trusted source.
        with open(filepath, "rb") as file:
            self.tree = pickle.load(file)  # Deserialize the tree object
        print(f"Tree structure loaded from {filepath}")

Metrics and Visualization Utilities

In [None]:
def calculate_metrics(y_true, y_pred, average="macro"):
    """
    Compute the overall accuracy together with averaged precision, recall
    and F1-score for a set of predictions.

    Parameters:
        y_true (array): True labels.
        y_pred (array): Predicted labels.
        average (str): Averaging method passed to the precision, recall and
            F1-score functions.

    Returns:
        dict: Scores keyed by "Accuracy", "Precision", "Recall", "F1-Score".
    """
    # Accuracy takes no averaging argument, so it is computed on its own
    scores = {"Accuracy": accuracy_score(y_true, y_pred)}

    # The remaining scorers share one call signature
    averaged_scorers = (
        ("Precision", precision_score),
        ("Recall", recall_score),
        ("F1-Score", f1_score),
    )
    for label, scorer in averaged_scorers:
        scores[label] = scorer(y_true, y_pred, average=average)
    return scores

def plot_confusion_matrix(y_true, y_pred, class_names, title="Confusion Matrix"):
    """
    Draw the confusion matrix of a set of predictions as an annotated image.

    Parameters:
        y_true (array): True labels.
        y_pred (array): Predicted labels.
        class_names (list): Names used as tick labels on both axes.
        title (str): Title of the plot.
    """
    matrix = confusion_matrix(y_true, y_pred)

    # Image of the matrix with title and color scale
    plt.figure(figsize=(8, 8))
    plt.imshow(matrix, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title(title)
    plt.colorbar()

    # One tick per class name on each axis
    positions = np.arange(len(class_names))
    plt.xticks(positions, class_names, rotation=45)
    plt.yticks(positions, class_names)

    # Write each count into its cell; counts above half of the largest
    # entry are drawn in white, the others in black
    midpoint = matrix.max() / 2.0
    n_rows, n_cols = matrix.shape
    for row in range(n_rows):
        for col in range(n_cols):
            count = matrix[row, col]
            text_color = "white" if count > midpoint else "black"
            plt.text(col, row, format(count, "d"),
                     ha="center", va="center",
                     color=text_color)

    plt.ylabel("True label")
    plt.xlabel("Predicted label")
    plt.tight_layout()
    plt.show()



Train Models

In [None]:
for depth in depths:
    print(f"\nTraining Models with Depth: {depth}")

    # Output files for the two models trained at this depth
    model_path_sklearn = os.path.join(weights_dir, f"Decision_Tree_Scikit_Depth{depth}.pth")
    model_path_custom = os.path.join(weights_dir, f"CustomDecisionTree_Depth{depth}.pth")

    # Reference model: scikit-learn tree with the Gini criterion and a fixed seed
    clf = DecisionTreeClassifier(criterion="gini", max_depth=depth, random_state=42)
    y_pred_sklearn = clf.fit(X_train, y_train).predict(X_test)
    # The fitted estimator is stored with pickle (despite the .pth extension)
    with open(model_path_sklearn, "wb") as file:
        pickle.dump(clf, file)

    # Hand-written tree trained with the same depth limit
    custom_tree = CustomDecisionTree()
    custom_tree.fit(X_train, y_train, max_depth=depth)
    y_pred_custom = custom_tree.predict(X_test)
    custom_tree.save_tree(model_path_custom)



Loading and Evaluating models

P.S. In the testing flow, after the imports, please run the data-loading cell (2nd cell) first, then the CustomDecisionTree class cell (3rd cell)
and the metrics cell (4th cell) before prediction. After the predictions, please run the last cell to display the results.

In [None]:
# Loading and evaluating models
# Imported once here rather than inside the loop body, where the import
# statement was re-executed on every iteration.
from sklearn.metrics import precision_recall_fscore_support

for depth in depths:
    print(f"\nEvaluating Models with Depth: {depth}")

    # Load and Evaluate Scikit-learn Model
    # NOTE: pickle can execute arbitrary code while loading; only open model
    # files that were written by the training cell of this notebook.
    model_path_sklearn = os.path.join(weights_dir, f"Decision_Tree_Scikit_Depth{depth}.pth")
    with open(model_path_sklearn, "rb") as file:
        clf = pickle.load(file)
    y_pred_sklearn = clf.predict(X_test)

    # Calculate overall metrics
    metrics_sklearn = calculate_metrics(y_test, y_pred_sklearn)
    print(f"Scikit-learn Metrics at Depth {depth}: {metrics_sklearn}")

    # Calculate and print per-class metrics (average=None keeps one score per class)
    precision, recall, f1, _ = precision_recall_fscore_support(y_test, y_pred_sklearn, average=None)
    print(f"Scikit-learn Per-class Precision at Depth {depth}: {precision}")
    print(f"Scikit-learn Per-class Recall at Depth {depth}: {recall}")
    print(f"Scikit-learn Per-class F1-Score at Depth {depth}: {f1}")

    # Plot confusion matrix
    plot_confusion_matrix(y_test, y_pred_sklearn, class_names, title=f"Scikit-learn Confusion Matrix (Depth {depth})")

    # Load and Evaluate Custom Decision Tree
    model_path_custom = os.path.join(weights_dir, f"CustomDecisionTree_Depth{depth}.pth")
    custom_tree = CustomDecisionTree()
    custom_tree.load_tree(model_path_custom)
    y_pred_custom = custom_tree.predict(X_test)

    # Calculate overall metrics
    metrics_custom = calculate_metrics(y_test, y_pred_custom)
    print(f"Custom Decision Tree Metrics at Depth {depth}: {metrics_custom}")

    # Calculate and print per-class metrics
    precision, recall, f1, _ = precision_recall_fscore_support(y_test, y_pred_custom, average=None)
    print(f"Custom Tree Per-class Precision at Depth {depth}: {precision}")
    print(f"Custom Tree Per-class Recall at Depth {depth}: {recall}")
    print(f"Custom Tree Per-class F1-Score at Depth {depth}: {f1}")

    # Plot confusion matrix
    plot_confusion_matrix(y_test, y_pred_custom, class_names, title=f"Custom Decision Tree Confusion Matrix (Depth {depth})")

Summarize Findings in a Table

In [None]:
# Gather the overall metrics of every saved model into one comparison table
metrics_summary = []  # One dict per (model, depth) pair; starts empty on every run

for depth in depths:
    # Scikit-learn tree saved for this depth: load, predict, score
    model_path_sklearn = os.path.join(weights_dir, f"Decision_Tree_Scikit_Depth{depth}.pth")
    with open(model_path_sklearn, "rb") as file:
        clf = pickle.load(file)

    y_pred_sklearn = clf.predict(X_test)
    metrics_sklearn = calculate_metrics(y_test, y_pred_sklearn)
    # Tag the scores so the row can be identified in the table
    metrics_sklearn.update({"Depth": depth, "Model": "Scikit-Learn Decision Tree"})
    metrics_summary.append(metrics_sklearn)

    # Custom tree saved for this depth: load, predict, score
    model_path_custom = os.path.join(weights_dir, f"CustomDecisionTree_Depth{depth}.pth")
    custom_tree = CustomDecisionTree()
    custom_tree.load_tree(model_path_custom)

    y_pred_custom = custom_tree.predict(X_test)
    metrics_custom = calculate_metrics(y_test, y_pred_custom)
    metrics_custom.update({"Depth": depth, "Model": "Custom Decision Tree"})
    metrics_summary.append(metrics_custom)

# Build the table and put the identifying columns first
metrics_table = pd.DataFrame(metrics_summary)[
    ["Model", "Depth", "Accuracy", "Precision", "Recall", "F1-Score"]
]

# Render the table in the notebook
display(metrics_table)