In [90]:
import pandas as pd
import numpy as np

df = pd.read_csv('lab2.cleveland.csv')
df_test = pd.read_csv('test.lab2.cleveland.csv')

In [91]:
columns_with_invalid_data = set()

for col in df:
    for el in df[col]:
        if el == '?':
            columns_with_invalid_data.add(col)
            
print(columns_with_invalid_data)

for col_name in columns_with_invalid_data:
    df.drop(df[df[col_name] == '?'].index, inplace=True)

df.reset_index(inplace=True, drop=True)

{'thal', 'ca'}


In [92]:
data = df.values.tolist()
data_test = df_test.values.tolist()

for i in range(len(data)):
    data[i] = list(map(float, data[i]))

print(data[0])

[41.0, 0.0, 2.0, 105.0, 198.0, 0.0, 0.0, 168.0, 0.0, 0.0, 1.0, 1.0, 3.0, 0.0]


In [93]:
df

Unnamed: 0,age,sex,cp,trestbps,chol,fbs,restecg,thalach,exang,oldpeak,slope,ca,thal,num
0,41,0,2,105,198,0,0,168,0,0.0,1,1,3,0
1,65,1,4,120,177,0,0,140,0,0.4,1,0,7,0
2,44,1,4,112,290,0,2,153,0,0.0,1,1,3,2
3,44,1,2,130,219,0,2,188,0,0.0,1,0,3,0
4,60,1,4,130,253,0,0,144,1,1.4,1,1,7,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
242,57,0,4,140,241,0,0,123,1,0.2,2,0,7,1
243,45,1,1,110,264,0,0,132,0,1.2,2,0,7,1
244,68,1,4,144,193,1,0,141,0,3.4,2,2,7,2
245,57,1,4,130,131,0,0,115,1,1.2,2,1,7,3


#### Преобразуем признаки из диапазона [0:4] в значения [0,1]

[0 -> 0]
[1 -> 0]
[2 -> 1]
[3 -> 1]
[4 -> 1]

In [94]:
for li in data:
    li[-1] = 0 if li[-1] <= 1 else 1

for li in data_test:
    li[-1] = 0 if li[-1] <= 1 else 1

### Подготовка тренировочной и тестовой выборок

In [95]:
print(len(data), len(data_test))

247 50


In [96]:
X_train = [row[:-1] for row in data]
Y_train = [row[-1] for row in data]
X_test = [row[:-1] for row in data_test]
Y_test = [row[-1] for row in data_test]

### Logistic regression

In [143]:
class LogisticRegression:
    def __init__(self, learning_step=0.01, iters=5000):
        self.step = learning_step
        self.num_iter = iters

    def add_ones(self, X):
        return np.concatenate((np.ones((X.shape[0], 1)), X), axis=1)

    def sigm(self, z):
        overflow_pos = sum([i > 4 for i in z])
        overflow_neg = sum([i < -4 for i in z])
        if overflow_pos > 0:
            return [1 for _ in range(len(z))]
        elif overflow_neg > 0:
            return [0 for _ in range(len(z))]
        else:
            return 1 / (1 + np.exp(-z))

    def fit(self, X, Y):
        x = self.add_ones(np.array(X))
        y = np.array(Y)
        self.w = np.zeros(x.shape[1])

        z = np.dot(x, self.w)
        yp = self.sigm(z)

        for i in range(self.num_iter):
            gradient = np.dot(x.T, (yp - y)) / len(y)
            self.w -= self.step * gradient
            z = np.dot(x, self.w)
            yp = self.sigm(z)

        return self

    def predict(self, X):
        x = self.add_ones(np.array(X))
        z = np.dot(x, self.w)
        return self.sigm(z)

    def score(self, X, Y):
        return sum(np.array(Y) == self.predict(X)) / len(X)

In [144]:
lr = LogisticRegression().fit(X_train, Y_train)

In [145]:
print(f'accuracy = {lr.score(X_test, Y_test)*100}%')

accuracy = 74.0%


### Decision Tree

Этропия и полученная информация

In [101]:
def entropy(array):
    total = len(array)
    ones = sum(el[-1] for el in array)
    zeros = total - ones

    p0 = zeros / total
    p1 = ones / total

    if p0 == 0 or p1 == 0:
        return 0

    return -p0 * np.log2(p0) - p1 * np.log2(p1)


def information_gain(root, children):
    n0 = len(root)
    return entropy(root) - sum(len(arr) / n0 * entropy(arr) for arr in children)

Список условий для разбиения подвыборок

In [102]:
class SplitCondition:
    def __init__(self, idx, val, operation):
        self.idx = idx
        self.val = val
        self.operation = operation

    def __str__(self):
        return f'<SplitCond> idx={self.idx}, val={self.val}'


less = lambda x, y: x < y
equal = lambda x, y: x == y

my_split_conditions = [
    # age
    SplitCondition(0, 40, less),
    SplitCondition(0, 50, less),
    SplitCondition(0, 60, less),

    # sex
    SplitCondition(1, 0, equal),

    # cp
    SplitCondition(2, 4, equal),

    # trestbps
    SplitCondition(3, 130, less),
    SplitCondition(3, 150, less),

    # chol
    SplitCondition(4, 150, less),
    SplitCondition(4, 200, less),
    SplitCondition(4, 250, less),
    SplitCondition(4, 300, less),

    # fbs
    SplitCondition(5, 0, equal),

    # restecg
    SplitCondition(6, 0, equal),

    # thalach
    SplitCondition(7, 120, less),
    SplitCondition(7, 145, less),
    SplitCondition(7, 170, less),

    # exang
    SplitCondition(8, 0, equal),

    # oldpeak
    SplitCondition(9, 0.5, less),
    SplitCondition(9, 1.5, less),
    SplitCondition(9, 2.5, less),

    # slope
    SplitCondition(10, 2, equal),

    # ca
    SplitCondition(11, 2, less),

    # thal
    SplitCondition(12, 3, equal),
    SplitCondition(12, 7, equal),
]

In [103]:
# all rows in dataset [0-12]
table_rows = [*range(13)]

Реализация самого дерева решений

In [104]:
class Node:
    def __init__(self, array=None, split_condition=None):
        self.array = array
        self.children = []
        self.split_condition = split_condition

    def __str__(self):
        return f'<Node> with size={len(self.array)}, {len(self.children)} childrens'

    def add_children(self, array):
        self.children.append(Node(array))


class DecisionTree:
    def __init__(self, max_depth=5):
        self.max_depth = max_depth
        self.root = None
        self.split_conditions = my_split_conditions

    def fit(self, x, y, n_usable_columns=None):
        merged = []
        for i in range(len(x)):
            merged.append([*x[i], y[i]])

        self.root = Node(merged)
        self.build(self.root, 0, n_usable_columns)

    def build(self, node, depth, n_usable_columns):
        if depth == self.max_depth:
            return

        left, right, split_condition = self.find_best_split(node, n_usable_columns)
        if left is None or right is None:
            return

        node.add_children(left)
        node.add_children(right)
        node.split_condition = split_condition

        self.build(node.children[0], depth+1, n_usable_columns)
        self.build(node.children[1], depth+1, n_usable_columns)

    def split_by_condition(self, array, condition: SplitCondition):
        left = []
        right = []

        for row in array:
            if condition.operation(row[condition.idx], condition.val):
                left.append(row)
            else:
                right.append(row)

        return left, right

    def find_best_split(self, node, n_usable_columns):
        max_info_gain = 0
        res_left, res_right = None, None
        res_split_condition = None

        available_conditions = []
        available_rows = table_rows

        if n_usable_columns is not None:
            available_rows = random.sample(available_rows, n_usable_columns)
            for condition in self.split_conditions:
                if condition.idx in available_rows:
                    available_conditions.append(condition)
        else:
            available_conditions = self.split_conditions

        for condition in available_conditions:
            left, right = self.split_by_condition(node.array, condition)

            if len(left) == 0 or len(right) == 0:
                continue

            current_info_gain = information_gain(node.array, [left, right])
            if current_info_gain > max_info_gain:
                max_info_gain = current_info_gain
                res_left = left.copy()
                res_right = right.copy()
                res_split_condition = condition

        return res_left, res_right, res_split_condition
    
    def predict(self, x):
        node = self.root

        while len(node.children) != 0:
            if node.split_condition.operation(x[node.split_condition.idx], node.split_condition.val):
                node = node.children[0]
            else:
                node = node.children[1]

        ones = sum(row[-1] for row in node.array)
        zeros = len(node.array) - ones

        return 1 if ones > zeros else 0

    def score(self, x, y):
        success = 0
        for i in range(len(x)):
            success += 1 if y[i] == self.predict(x[i]) else 0
        
        return success / len(x)

    def precision(self, x, y):
        truepos = 0
        falsepos = 0
        for i in range(len(x)):
            truepos += 1 if (y[i] == self.predict(x[i]) and self.predict(x[i]) == 1)  else 0
            falsepos += 1 if (y[i] == 0 and self.predict(x[i]) == 1) else 0
        return (truepos /(truepos + falsepos))*1

   

In [105]:
tree = DecisionTree(max_depth=5)

In [106]:
tree.fit(X_train, Y_train)

In [107]:
print(f'accuracy = {tree.score(X_test, Y_test)*100}%')

accuracy = 88.0%


In [108]:
print(f'precision = {tree.precision(X_test, Y_test)*100}%')

precision = 81.81818181818183%


### Random Forest

In [109]:
import math
import random

class RandomForest:
    def __init__(self, n_usable_columns, n_trees=100, max_tree_depth=5):
        self.trees = []
        self.n_usable_columns = n_usable_columns
        self.n_trees = n_trees
        self.accuracy = 0
        self.max_tree_depth = max_tree_depth

    def fit(self, x, y):
        (x_train, y_train), (x_test, y_test) = self.bootstrap(x, y, int(len(x) / 1.5))
        
        for i in range(self.n_trees):
            tree = DecisionTree(self.max_tree_depth)
            tree.fit(x_train, y_train, self.n_usable_columns)
            self.trees.append(tree)
            
        if len(x_test) != 0:
            self.accuracy = self.score(x_test, y_test)

    def bootstrap(self, x, y, new_size):
        x_new, y_new = [], []
        x_out_of_bag, y_out_of_bag = [], []

        merged = []
        for i in range(len(x)):
            merged.append([*x[i], y[i]])

        for i in range(new_size):
            idx = random.randint(0, len(x)-1)
            x_new.append(x[idx])
            y_new.append(y[idx])

        for i in range(len(x)):
            if x[i] not in x_new:
                x_out_of_bag.append(x[i])
                y_out_of_bag.append(y[i])

        return (x_new, y_new), (x_out_of_bag, y_out_of_bag)

    def predict(self, x):
        ones = sum([tree.predict(x) for tree in self.trees])
        zeros = len(self.trees) - ones
        return 1 if ones > zeros else 0

    def score(self, x, y):
        success = 0
        for i in range(len(x)):
            success += 1 if y[i] == self.predict(x[i]) else 0

        return success / len(x)

    def precision(self, x, y):
        truepos = 0
        falsepos = 0
        for i in range(len(x)):
            truepos += 1 if (y[i] == self.predict(x[i]) and self.predict(x[i]) == 1)  else 0
            falsepos += 1 if (y[i] == 0 and self.predict(x[i]) == 1) else 0
        return (truepos /(truepos + falsepos))*1 

In [110]:
class ExtraRandomForest:
    def __init__(self):
        pass

    def find_best_random_forest(self, x, y, usable_columns_range:range, n_trees=100, max_tree_depth=5):
        best_accuracy = 0
        best_random_forest = None
        
        for usable_column in usable_columns_range:
            #print(f'building forest with {usable_column} columns')
            forest = RandomForest(usable_column, n_trees, max_tree_depth)
            forest.fit(x, y)
            #print(f'{forest.accuracy=}')
            if forest.accuracy > best_accuracy:
                best_random_forest = forest
                best_accuracy = forest.accuracy
    
        return best_random_forest

In [111]:
erf = ExtraRandomForest()
cols = int(math.sqrt(len(X_train[0])))
cols_bottom = max(0, cols-2)
cols_top = min(len(X_train[0]), cols+6)

print([*range(cols_bottom, cols_top)])

best_random_forest = None


[1, 2, 3, 4, 5, 6, 7, 8]


In [116]:
def test_forest(trees, depth):
    global best_random_forest
    best_random_forest = erf.find_best_random_forest(X_train, Y_train, range(cols_bottom, cols_top), 
                                                 n_trees=trees, max_tree_depth=depth)
    n_trees=trees
    max_tree_depth=depth
    accuracy = best_random_forest.score(X_test, Y_test)
    precision = best_random_forest.precision(X_test, Y_test)
    print(f'trees={n_trees}, depth={max_tree_depth}, precision ={precision*100}% accuracy={accuracy*100}%')

    
test_forest(100, 5)
test_forest(100, 7)
test_forest(100, 9)
test_forest(300, 5)

trees=100, depth=5, precision =72.72727272727273% accuracy=84.0%
trees=100, depth=7, precision =57.14285714285714% accuracy=76.0%
trees=100, depth=9, precision =57.14285714285714% accuracy=76.0%
trees=300, depth=5, precision =72.72727272727273% accuracy=84.0%


In [113]:
accuracy = best_random_forest.score(X_test, Y_test)
precision = best_random_forest.precision(X_test, Y_test)

print(f'precision ={precision*100}% accuracy={accuracy*100}%')


precision =60.0% accuracy=76.0%


### Sklearn

#### Logistic regression

In [136]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
lr1 = LogisticRegression(max_iter=1000).fit(X_train, Y_train)
print(f'accuracy = {lr1.score(X_test, Y_test)*100}%')

accuracy = 92.0%


#### Decision Tree

In [133]:
from sklearn.tree import DecisionTreeClassifier

def test_tree(depth):
    tree1 = DecisionTreeClassifier(max_depth=depth).fit(X_train, Y_train)
    print(f'depth={depth}, accuracy = {tree1.score(X_test, Y_test)*100}%')
test_tree(1)
test_tree(2)
test_tree(3)
test_tree(4)
test_tree(5)
test_tree(6)
test_tree(7)

depth=1, accuracy = 72.0%
depth=2, accuracy = 72.0%
depth=3, accuracy = 86.0%
depth=4, accuracy = 86.0%
depth=5, accuracy = 88.0%
depth=6, accuracy = 74.0%
depth=7, accuracy = 86.0%


#### Random Forest

In [134]:
from sklearn.ensemble import RandomForestClassifier

def test_forest(trees, depth):
    print(f'trees={trees}, depth={depth}, accuracy = {RandomForestClassifier(max_depth=depth, n_estimators=trees).fit(X_train, Y_train).score(X_test, Y_test)*100}%')


test_forest(100, 5)
test_forest(100, 7)
test_forest(100, 9)
test_forest(300, 5)

trees=100, depth=5, accuracy = 86.0%
trees=100, depth=7, accuracy = 88.0%
trees=100, depth=9, accuracy = 86.0%
trees=300, depth=5, accuracy = 86.0%
