In [39]:
import pandas as pd
import numpy as np
from statistics import NormalDist
from sklearn.metrics import confusion_matrix, recall_score
from sklearn.model_selection import train_test_split
import scipy
from numpy.typing import NDArray
from dataclasses import dataclass ,field
from typing import Any, Self, Deque
import plotly.express as px
from math import ceil, inf
from collections import deque



zdroj dat: https://www.kaggle.com/datasets/fedesoriano/heart-failure-prediction?resource=download

In [38]:
df = pd.read_csv("heart.csv", dtype={
    "Sex":"category",
    "ChestPainType": "category",
    "FastingBS":"category",
    "RestingECG":"category",
    "ExerciseAngina":"category",
    "ST_Slope":"category",
    "HeartDisease":"category"
    })
df

Unnamed: 0,Age,Sex,ChestPainType,RestingBP,Cholesterol,FastingBS,RestingECG,MaxHR,ExerciseAngina,Oldpeak,ST_Slope,HeartDisease
0,40,M,ATA,140,289,0,Normal,172,N,0.0,Up,0
1,49,F,NAP,160,180,0,Normal,156,N,1.0,Flat,1
2,37,M,ATA,130,283,0,ST,98,N,0.0,Up,0
3,48,F,ASY,138,214,0,Normal,108,Y,1.5,Flat,1
4,54,M,NAP,150,195,0,Normal,122,N,0.0,Up,0
...,...,...,...,...,...,...,...,...,...,...,...,...
913,45,M,TA,110,264,0,Normal,132,N,1.2,Flat,1
914,68,M,ASY,144,193,1,Normal,141,N,3.4,Flat,1
915,57,M,ASY,130,131,0,Normal,115,Y,1.2,Flat,1
916,57,F,ATA,130,236,0,LVH,174,N,0.0,Flat,1


In [2]:
bar = px.histogram(df,x="HeartDisease", color="HeartDisease")
bar.update_traces(showlegend=False)

In [3]:
df["ChestPainType"] = df["ChestPainType"].map({'ATA':0,'NAP':1,'ASY':2,'TA':3})
df["RestingECG"] = df["RestingECG"].map({"Normal":0,"ST":1,"LVH":2})
df["ExerciseAngina"] = df["ExerciseAngina"].map({"N":0,"Y":1})
df["ST_Slope"] =  df["ST_Slope"].map({"Up":0 ,"Flat":1,"Down":2})
df["FastingBS"] =  df["FastingBS"].map({"0": 0, "1": 1})
df["Sex"] = df["Sex"].map({'M':0,'F':1})

In [4]:
x_train,x_test = train_test_split(df,test_size=0.2,stratify=df["HeartDisease"])
y_test = x_test.pop("HeartDisease")

## SVM

In [5]:
class Adam:
    def __init__(
        self,
        n_features: int,
        beta1: float = 0.9,
        beta2: float = 0.999,
        epsilon: float = 1e-8,
        lr: float = 0.004
    ) -> None:
        """
        Adam optimizer

        Args:
            n_features (int): Number of features
            beta1 (float, optional): Decay for first moment. Defaults to 0.9.
            beta2 (float, optional): Decay for secodn moment. Defaults to 0.999.
            epsilon (float, optional): Small value to prevent dividing by 0. Defaults to 1e-8.
            lr (float, optional): Learning rate. Defaults to 0.004.
        """
        self._counter = 1
        self._weight_moments = np.zeros(n_features)
        self._weight_variance, self._bias_variance = 0, 0
        self._bias_moments = 0
        self._beta1 = beta1
        self._beta2 = beta2
        self._epsilon = epsilon
        self._lr = lr
            
    def step(self, gradient_weights: NDArray[np.float32], gradient_bias: NDArray[np.bool_]) -> tuple:
        """
        Calculate optimizing step

        Args:
            gradient_weights (NDArray[np.float32]): Calculated gradients of model weights.
            gradient_bias (NDArray[np.bool_]): Calculated bias of model.

        Returns:
            tuple: Tuple where first item is step for weights and second for bias.
        """
        # Update moment vectors
        self._weight_moments = self._beta1 * self._weight_moments + (1 - self._beta1) * gradient_weights
        self._bias_moments = self._beta1 * self._bias_moments + (1 - self._beta1) * gradient_bias

        #rms 
        self._weight_variance = self._beta2 * self._weight_variance + (1 - self._beta2) * (gradient_weights ** 2)
        self._bias_variance = self._beta2 * self._bias_variance + (1 - self._beta2) * gradient_bias

        # Bias correction for moment vectors
        m_hat = self._weight_moments / (1 - self._beta1 ** self._counter)
        m_b_hat = self._bias_moments / (1 - self._beta1 ** self._counter)
        
        v_hat = self._weight_variance / (1 - self._beta2 ** self._counter)
        v_b_hat = self._bias_variance / (1 - self._beta2 ** self._counter)

        self._counter += 1
        return self._lr * (m_hat / (np.sqrt(np.abs(v_hat) + self._epsilon))), self._lr * (m_b_hat / (np.sqrt(np.abs(v_b_hat) + self._epsilon)))

class SVM:
    def __init__(
        self,
        n_features: int,
        learning_rate: float =0.001,
        lambda_param: float = 0.01,
        num_iterations: int = 1000,
        beta1: float  = 0.9,
        beta2: float  = 0.999,
        epsilon: float = 1e-8
    ):
        """
        SVM model.

        Args:
            n_features (int): Number of input features to the model
            learning_rate (float, optional): Learning grade ffor optimization. Defaults to 0.001.
            lambda_param (float, optional): Constrain checking constant. Defaults to 0.01.
            num_iterations (int, optional): Number of iterations for optimization. Defaults to 1000.
            beta1 (float, optional): Adam optimizer argument. Defaults to 0.9.
            beta2 (float, optional): Adam optimizer argument. Defaults to 0.999.
            epsilon (float, optional): Adam optimizer argument. Defaults to 1e-8.
        """
        self._optimizer = Adam(n_features= n_features, beta1 = beta1, beta2 = beta2, epsilon = epsilon, lr = learning_rate)
        self._learning_rate = learning_rate
        self._lr = lambda_param
        self._num_iterations = num_iterations
        self._bias = np.random.randn() + 0.001
    
    def _satisfy_constraint(self, x: NDArray, y: NDArray) -> NDArray[np.bool_]:
        """
        Calculate condition for each sample

        Args:
            x (NDArray): Array of features
            y (NDArray): Array of targeted feature

        Returns:
            NDArray[np.bool_]: If constrain is satisfied for each sample/
        """
        linear = np.dot(x, self._weights) + self._bias
        return (y * linear >= 1).astype(int)
    
    def _get_gradients(self, constrain: NDArray[np.bool_],x: NDArray,y: NDArray) -> tuple:
        """
        Calculate gradient for svm  model.

        Args:
            constrain (NDArray[np.bool_]): if the constrain is satisfied for each sample.
            x (_type_): Array of features.
            y (_type_): Array of targeted feature.

        Returns:
            tuple: First value is gradient for weaight ans second for bias.
        """
        constrain = np.expand_dims(constrain,-1)
        grad =  constrain * self._weights + (1 - constrain) * (self._weights - np.expand_dims(y,-1) * x)
        bias = (1 - constrain) * np.expand_dims(-y,-1)
        return grad, bias
        
    def fit(self, x: NDArray, y: NDArray , batch_size: int = 64) -> None:
        """
        Fit the model.

        Args:
            x (NDArray): Two dimensional array of features. First dimension is for samples, second for feature 
            y (NDArray): One dimensional array of targeted feature.
            batch_size (int, optional): Batch size for gradient step. Defaults to 64.
        """
        y =  np.where( y <= 0, -1,1)
        n_features = x.shape[-1]
        data = np.concatenate((x,np.expand_dims(y,-1)),axis=1)

        # Initialize the model parameters
        self._weights = np.zeros(n_features, dtype=np.float64)
        self._bias = 0

        for _ in range(self._num_iterations):
            #shuffle and split
            np.random.shuffle(data)
            y = np.array_split(data[:,-1], ceil(data.shape[0]/batch_size)) # type: ignore
            x = np.array_split(data[:,:-1], ceil(data.shape[0]/batch_size)) # type: ignore
            gradients = 0
            for x_batch, y_batch in zip(x,y):
                #map binary input to {-1,1}
                constrains = self._satisfy_constraint(x_batch,y_batch)
                res = self._get_gradients(constrains,x_batch,y_batch)
                gradient = np.sum(res[0],0)/x_batch.shape[0]
                bias = np.sum(res[1],0)/x_batch.shape[0]
                
                gradients += np.sum(gradient) + np.sum(bias)

                # Update weights and bias with Adam optimizer
                step = self._optimizer.step(gradient, bias)
                self._weights -= step[0]
                self._bias -= step[1]
                # self._weights -= self._optimizer._lr * gradient
                # self._bias -= self._optimizer._lr * bias
            print(f"iteration: {_} grad_size: {gradients}")

    def predict(self, x: NDArray) -> NDArray:
        """
        Predict feature for given rows 

        Args:
            x (NDArray): two dimensional input data. First dimension are samples, second features.

        Returns:
            NDArray: Predicted feature for each sample.
        """
        # Compute the predicted class labels
        scores = np.dot(x, self._weights) + self._bias
        scores = np.sign(scores)
        return np.where(scores == -1, 0, 1)


data pocházejí z odkazu https://www.kaggle.com/datasets/kamilpytlak/personal-key-indicators-of-heart-disease .

In [6]:
svm = SVM(11, num_iterations=1000, learning_rate=0.00001)
svm.fit(x_train.drop("HeartDisease",axis=1).to_numpy(),x_train["HeartDisease"].astype(int).to_numpy(), batch_size=32)

iteration: 0 grad_size: -570.3335096736804
iteration: 1 grad_size: -564.4840330353323
iteration: 2 grad_size: -561.1280389282784
iteration: 3 grad_size: -561.363418849095
iteration: 4 grad_size: -566.9273863663117
iteration: 5 grad_size: -567.9324670035543
iteration: 6 grad_size: -573.3267380822613
iteration: 7 grad_size: -568.970751925629
iteration: 8 grad_size: -573.3406411185413
iteration: 9 grad_size: -569.8311552649326
iteration: 10 grad_size: -569.4990978118834
iteration: 11 grad_size: -574.1949510294008
iteration: 12 grad_size: -571.8403106525541
iteration: 13 grad_size: -567.334050615101
iteration: 14 grad_size: -559.1071523394788
iteration: 15 grad_size: -565.8525989294014
iteration: 16 grad_size: -565.3443479860648
iteration: 17 grad_size: -567.0315977465759
iteration: 18 grad_size: -573.0127307043889
iteration: 19 grad_size: -572.8696903177419
iteration: 20 grad_size: -562.1947077579001
iteration: 21 grad_size: -568.5129372688206
iteration: 22 grad_size: -557.9121397477794
i

In [7]:
print(confusion_matrix(y_test.to_numpy().astype(np.int16),svm.predict(x_test.to_numpy()) > 0))
print(recall_score(y_test.to_numpy().astype(np.int16),svm.predict(x_test.to_numpy()) > 0))

[[55 27]
 [11 91]]
0.8921568627450981


## decision tree

In [13]:
@dataclass
class LeafNode():
    value: int


@dataclass    
class Node():
    depth: int
    column_index: int
    column_value: int
    left_child: Self | LeafNode | None = None
    right_child: Self | LeafNode | None = None

@dataclass
class BuildingNode():
    depth: int
    data: pd.DataFrame
    parent: Node| None = None
    is_left_child: bool = True

    def _add_to_parent(self, node: LeafNode | Node):
        if self.parent is not None:
            if self.is_left_child:
                self.parent.left_child = node
            else:
                self.parent.right_child = node


@dataclass
class Cut():
    gain: float
    column_index: int
    value: int | float

class DecisionTree():

    def __init__(self) -> None:
        self.root = None

    def fit(
        self,
        x: pd.DataFrame,
        target_index: int,
        min_samples_split = 30,
        min_gain = 0.01,
        max_depth = 6
    ) -> None:
        """
        Fit the model with data

        Args:
            x (pd.DataFrame): Fitting data.
            target_index (int): Column index of target feature to predict in data
            min_samples_split (int, optional): Minimal samples to concider for cut. Defaults to 30.
            min_gain (float, optional): Minimal acceptable information gain from cut. Defaults to 0.01.
            max_depth (int, optional): Maximal depth of tree. Defaults to 6.
        """
        stack: Deque["BuildingNode"] = deque()
        stack.append(BuildingNode(0,x.copy(deep=False)))

        while True:
            print()
            print(len(stack))
            if len(stack) == 0:
                return
            splitting_node  = stack.pop()
            print(f"depth:{splitting_node.depth} shape: {splitting_node.data.shape}, is left: {splitting_node.is_left_child}")

            #=== finding minimal split and checking end conditions =======
            if splitting_node.data.shape[0] <= min_samples_split or splitting_node.depth == max_depth :
                self._create_leaf_node(splitting_node, target_index)
                continue

            cut = self._find_cut(splitting_node.data, target_index)
            print(f"gain: {cut.gain}, column: {cut.column_index}")

            if cut.gain <= min_gain:
                self._create_leaf_node(splitting_node, target_index)
                continue
            #======================+=======================================

            #=== adding non leaf node ========
            new_node = Node(splitting_node.depth,cut.column_index, cut.value) # type: ignore
            if self.root is None:
                self.root = new_node
            else:
                splitting_node._add_to_parent(new_node)
            #=================================

            #=== creating and adding left and right nodes to cut =======
            right_x = splitting_node.data[splitting_node.data.iloc[:,cut.column_index] != cut.value]
            stack.append(BuildingNode(splitting_node.depth + 1, right_x, new_node, is_left_child= False))
            left_x = splitting_node.data[splitting_node.data.iloc[:,cut.column_index] == cut.value]
            stack.append(BuildingNode(splitting_node.depth + 1, left_x,new_node))
            #============================================================

            print(f"left: {left_x.shape}, right: {right_x.shape}")

    def predict(self, x: pd.DataFrame) -> list:
        """
        predicting data

        Args:
            x (pd.DataFrame): dataframe without target column

        Raises:
            Exception: When the data aren't in right format

        Returns:
            list: predicted value for each row
        """
        if self.root == None:
            raise Exception("Tree ins't built")
        result = []
        for _,row in x.iterrows(): #HACK: slow solution
            result.append(int(self._find_result(row,self.root)))
        return result
    
    def _find_result(self, row:pd.Series, node: Node | LeafNode) -> int:
        """
        Recursively traverse the tree.

        Args:
            row (pd.Series): Row to predict without targeted column.
            node (Node | LeafNode, Optional): Current node in tree.

        Returns:
            int: value 
        """

        if type(node) is LeafNode:
            return node.value
        elif row[node.column_index] == node.column_value: # type: ignore
            return self._find_result(row, node.left_child) # type: ignore
        return self._find_result(row,node.right_child) # type: ignore
        

    def _create_leaf_node(self, splitting_node: BuildingNode, target_index: int) -> None:
            leaf_node = LeafNode(value=splitting_node.data.iloc[:,target_index].mode()[0])
            splitting_node._add_to_parent(leaf_node)

    def _find_cut(self, x: pd.DataFrame, target_index: int) -> Cut:
        """
        Find the best cut for given data.

        Args:
            x (pd.DataFrame): Fitting data for splitting.
            target_index (int): Index column with targeted feature.

        Raises:
            Exception: If there are not any possible cut.

        Returns:
            Cut: Best cut for current data.
        """
        pre_cut_impurity = self._qini_impurity(x.iloc[:,target_index])
        best_cut = Cut(0,0,float("inf"))

        for column_index,_ in enumerate(x.columns):
            if x.columns[target_index] == x.columns[column_index]:
                continue
            best_column_cut = self._best_cut(
                x,
                index = column_index,
                pre_cut_impurity = pre_cut_impurity,
                target_index = target_index
            )
            if best_column_cut.gain > best_cut.gain:
                best_cut = best_column_cut

        if type(best_cut.value) is inf:
            raise Exception("Cut wasn't found.")
        return  best_cut

    def _best_cut(
        self,
        x: pd.DataFrame,
        index: int,
        pre_cut_impurity: float,
        target_index: int
    ) -> Cut:
        """
        Find best cut for specified feature.

        Args:
            x (pd.DataFrame): Fitting data.
            index (int): Index of splitting feature.
            pre_cut_impurity (float): Precomputed qini impurity for pre_splitted data.
            target_index (int): Column index with target feature.

        Returns:
            Cut: Best feature cut.
        """

        def best_continuous_cut(
            x: pd.DataFrame,
            index: int,
            pre_cut_impurity: float,
            target_index: int
        ) -> Cut:
            best_cut = Cut(0,index,float("inf"))
            diffs = x.iloc[:,index].sort_values().diff()

            for diff in diffs: #HACK: lot of values really slow
                gain =self._information_gain(pre_cut_impurity, x[x.iloc[:,index] <= diff].iloc[:,target_index])
                if gain > best_cut.value:
                    best_cut = Cut(gain,index,gain)
            if type(best_cut.value) is inf:
                raise Exception(" cut wasn't found on column.")
            return best_cut

        def best_categorical_cut(
                x: pd.DataFrame,
                index: int,
                pre_cut_impurity: float,
                target_index
        ) -> Cut:
            unique_values = x.iloc[:,index].unique()
            # if x[:,target_index].unique().shape[0] == unique.shape[0] == 2:
            #     return pre_cut_impurity - x[x.iloc[:,index] == unique[0]].iloc[:,target_index]
            best_cut = Cut(0,index,float("inf"))

            for unique_value in unique_values:
                gain = self._information_gain(pre_cut_impurity, x[x.iloc[:,index] == unique_value].iloc[:,target_index])
                if gain > best_cut.gain:
                    best_cut = Cut(gain,index,unique_value)
            if type(best_cut.value) is inf:
                raise Exception(" cut wasn't found on column.")
            return best_cut
        
        if x.iloc[:,index].dtype.name == "category":
            return best_categorical_cut(
                x = x,
                index = index,
                pre_cut_impurity = pre_cut_impurity,
                target_index = target_index
            )
        return best_continuous_cut(
            x = x,
            index = index,
            pre_cut_impurity = pre_cut_impurity, 
            target_index= target_index 
        )    
        
    def _information_gain(self, pre_cut_impurity, x:pd.Series) -> float: 
        return pre_cut_impurity - self._qini_impurity(x)
    
    def _qini_impurity(self, x: pd.Series) -> float:
        p = x.value_counts()/x.shape[0]
        return 1 - np.sum(p**2)

In [19]:
tree = DecisionTree()
tree.fit(x_train,-1,min_gain=0.01, max_depth=6)
pred = tree.predict(x_test)



1
depth:0 shape: (734, 11), is left: True
gain: 0.2468832719204046, column: 2
left: (148, 11), right: (586, 11)

2
depth:1 shape: (148, 11), is left: True
gain: 0.05023998350253167, column: 6
left: (105, 11), right: (43, 11)

3
depth:2 shape: (105, 11), is left: True
gain: 0.0423364227187315, column: 8
left: (97, 11), right: (8, 11)

4
depth:3 shape: (97, 11), is left: True
gain: 0.018615958233649477, column: 1
left: (35, 11), right: (62, 11)

5
depth:4 shape: (35, 11), is left: True
gain: 0.20244897959183672, column: 5
left: (1, 11), right: (34, 11)

6
depth:5 shape: (1, 11), is left: True

5
depth:5 shape: (34, 11), is left: False
gain: 0, column: 0

4
depth:4 shape: (62, 11), is left: False
gain: 0.005881814224128812, column: 5

3
depth:3 shape: (8, 11), is left: False

2
depth:2 shape: (43, 11), is left: False
gain: 0.10436063272977603, column: 6
left: (22, 11), right: (21, 11)

3
depth:3 shape: (22, 11), is left: True

2
depth:3 shape: (21, 11), is left: False

1
depth:1 shape: (

In [20]:
print(confusion_matrix(y_test.astype(int),pred))
print(recall_score(y_test.astype(int),pred))

[[49 33]
 [13 89]]
0.8725490196078431


## naivní Bayes

In [34]:
class Probability():

    def get_probability(self):
        pass

@dataclass
class CategoricalProbability(Probability):
    categories: dict[Any,float] 

    def get_probability(self, value: Any):
        return self.categories[value]

@dataclass
class ContinuousLikelihood(Probability):
    dist: NormalDist

    def get_probability(self, value: float):
        return self.dist.pdf(value)


@dataclass
class Probabilities():
    probabilities: dict[Any,list[Probability]]
    target_values: list = field(init=False)
    size_column: int = field(init=False)

    def get_probability(self, target_value, index, value)-> float:
        return self.probabilities[target_value][index].get_probability(value)
        
    def __post_init__(self):
        self.target_values = list(self.probabilities.keys())
        self.size_column = len(self.probabilities[self.target_values[0]])

class NaiveBayes():

    def __init__(self, alpha: int = 1) -> None:
        self._alpha = alpha


    def fit(self, x: pd.DataFrame, target_index: int) -> None:
        """
        Fit the model with data.

        Args:
            x (pd.DataFrame): data
            target_index (int): column index of target feature to predict in data
        """
        probabilities = {}
        total = x.shape[0]
        y = x.pop(str(x.columns[target_index]))

        for target_value in y.unique():
            filtered_data = x[x.iloc[:,target_index] == target_value]
            probabilities[target_value] = []
            for column in x.columns:
                probabilities[target_value].append(self._calculate_probabilities(filtered_data[column], total))
        self._probabilities = Probabilities(probabilities)


    def predict(self, rows: pd.DataFrame)-> list:
        """
        Predict values for given rows.

        Args:
            rows (pd.DataFrame): Rows of predicting data without target column.

        Returns:
            list: Predicted values for each row.
        """
        probabilities = []
        for _,row in rows.iterrows(): # HACK: slow solution
            probabilities .append(self._get_category(row))
        return probabilities

    
    def _get_category(self, row: pd.Series) -> Any:
        """
        Calculate probability for all classes.

        Args:
            row (pd.Series): Row to predict.

        Returns:
            Any : Category with biggest probability.
        """
        max_prob = -9999
        target = None
        for target_value in self._probabilities.target_values:
            probability = 1
            for index_column in range(self._probabilities.size_column):
                probability *= np.log(self._probabilities.get_probability(target_value,index_column,row.iloc[index_column]))
            if probability > max_prob:
                max_prob = probability
                target = target_value
        return target


    def _calculate_probabilities(self, x: pd.Series, total: int) -> Probability:
        """
        Calculate probability metrics.

        Args:
            x (pd.Series): Feature ( column )

        Returns:
            Probability: return Probability object for given class ( category or continous ) which can calculate probability.
        """
        if x.dtype.name == "category":
            probabilities = {}
            for index, value in x.value_counts().items():
                probabilities[index] = ((value + self._alpha) / (total + self._alpha))
            return CategoricalProbability(probabilities) 
        return ContinuousLikelihood(NormalDist(mu=x.mean() ,sigma = x.std()))

In [35]:
bayes_classifier = NaiveBayes()
bayes_classifier.fit(x_train,-1)
pred = bayes_classifier.predict(x_test)

In [37]:
print(recall_score(y_test.astype(int),pred))
print(confusion_matrix(y_test.astype(int),pred))

0.8333333333333334
[[ 0 82]
 [17 85]]
