# Ensemble Soft Voting

In [None]:
import os
# Switch the working directory to the project checkout. Presumably this is what lets the
# `models` package import and the relative .npz file names used further down resolve.
# NOTE(review): the absolute Windows path is machine-specific — adjust before running elsewhere.
os.chdir('D:\\Code\\maa\\')

In [None]:
from typing import List, Dict
from numpy import load, ndarray
from sqlmodel import SQLModel, create_engine, Session, select

from models.config import Config
from models.models import Sample, EnsembleSample


__author__ = "Marius Benthin"


# Settings are provided by the project's Config object.
config = Config()

# Open the database and create every table registered on the SQLModel metadata.
sql_engine = create_engine(config.database_url)
SQLModel.metadata.create_all(sql_engine)

# Read each model's stored output matrix ('X') together with the sample id of every row.
# An earlier archive layout used the key 'P' for the matrix and stored 'sample_ids' as pairs
# whose first element is the id; the loaders below expect plain ids instead.
# NOTE(review): allow_pickle=True unpickles objects from the archive — only load trusted files.
with load(file='model_A_fnn_probabilities.npz', allow_pickle=True) as model_A:
    p_model_A: ndarray = model_A['X']
    x_model_A: list[int] = list(model_A['sample_ids'])

with load(file='model_B_rfc_probabilities.npz', allow_pickle=True) as model_B:
    p_model_B: ndarray = model_B['X']
    x_model_B: list[int] = list(model_B['sample_ids'])

with load(file='model_C_rfc_probabilities.npz', allow_pickle=True) as model_C:
    p_model_C: ndarray = model_C['X']
    x_model_C: list[int] = list(model_C['sample_ids'])

# Record, for every sample assigned to a fold (and for each of its children), which of the
# three models has an output row for it.

# Membership sets are built once so that each lookup in the loop is O(1) instead of a list scan.
ids_model_A = set(x_model_A)
ids_model_B = set(x_model_B)
ids_model_C = set(x_model_C)

with Session(sql_engine) as session:
    # NOTE(review): one label is appended per sample, yet the voting cell indexes `labels` with the
    # arg-max class position and uses len(labels) as a size — confirm this matches the class order
    # of the model outputs.
    labels: List[str] = []
    # `!= None` is deliberate: on a mapped column it builds the SQL `IS NOT NULL` condition.
    samples: List[Sample] = session.exec(select(Sample).where(Sample.fold_id != None)).all()  # noqa: E711
    parents: Dict[str, EnsembleSample] = {}
    children: Dict[str, EnsembleSample] = {}
    for sample in samples:
        label: str = sample.group.name
        fold_id: int = sample.fold_id

        # The flags are recomputed for every sample; previously they were set once before the
        # loop and never reset, so a single match marked all later samples as covered.
        # A sample that has children is represented by those children only.
        is_leaf: bool = len(sample.children) == 0
        parent_model_A: bool = is_leaf and sample.id in ids_model_A
        parent_model_B: bool = is_leaf and sample.id in ids_model_B
        parent_model_C: bool = is_leaf and sample.id in ids_model_C

        for child in sample.children:
            # Each child gets its own flags (previously unset until the first match, then sticky).
            child_model_A: bool = child.id in ids_model_A
            child_model_B: bool = child.id in ids_model_B
            child_model_C: bool = child.id in ids_model_C
            # children inherit fold and label from their parent sample
            children[child.id] = EnsembleSample(
                fold_id=fold_id,
                label=label,
                model_A=child_model_A,
                model_B=child_model_B,
                model_C=child_model_C
            )

        # NOTE(review): the voting cell iterates `.children` of these entries, but no children are
        # passed here — confirm how EnsembleSample obtains them.
        parents[sample.id] = EnsembleSample(
            fold_id=fold_id,
            label=label,
            model_A=parent_model_A,
            model_B=parent_model_B,
            model_C=parent_model_C
        )

        labels.append(label)

In [None]:
from numpy import sum, full, divide, argmax
from sklearn.metrics import classification_report

def _first_positions(sample_ids):
    """Map every sample id to the index of its first occurrence (same row `list.index` returns)."""
    positions = {}
    for position, sample_id in enumerate(sample_ids):
        positions.setdefault(sample_id, position)
    return positions


# Row lookup tables, built once instead of scanning the id lists for every sample.
rows_model_A = _first_positions(x_model_A)
rows_model_B = _first_positions(x_model_B)
rows_model_C = _first_positions(x_model_C)

y = []          # true label of every parent sample
y_cm = []       # true labels restricted to samples that received a prediction
y_pred = []     # predicted label, or 'unknown' when no model output was available
y_pred_cm = []  # predicted labels aligned with y_cm (input of the confusion matrix)

for parent_id, sample in parents.items():

    predictions_model_1 = []
    predictions_model_2 = []
    predictions_model_3 = []

    # output rows of the parent sample itself
    if sample.model_A and int(parent_id) in rows_model_A:
        predictions_model_1.append(p_model_A[rows_model_A[int(parent_id)]])
    if sample.model_B and int(parent_id) in rows_model_B:
        predictions_model_2.append(p_model_B[rows_model_B[int(parent_id)]])
    if sample.model_C and int(parent_id) in rows_model_C:
        predictions_model_3.append(p_model_C[rows_model_C[int(parent_id)]])

    # output rows of the children
    # NOTE(review): model C is not consulted for children although its flag is recorded — confirm
    # whether that is intended.
    for child_id in sample.children:
        if children[child_id].model_A and int(child_id) in rows_model_A:
            predictions_model_1.append(p_model_A[rows_model_A[int(child_id)]])
        if children[child_id].model_B and int(child_id) in rows_model_B:
            predictions_model_2.append(p_model_B[rows_model_B[int(child_id)]])

    # Average within each model first, so every model contributes one vector to the vote no
    # matter how many rows it has. `sum` is numpy.sum (imported above), hence `axis=0`.
    predictions = [
        sum(vectors, axis=0) / len(vectors)
        for vectors in (predictions_model_1, predictions_model_2, predictions_model_3)
        if vectors
    ]

    y.append(sample.label)
    if not predictions:
        # No model output at all. Averaging an empty list used to yield NaN, for which argmax
        # returns 0, so such samples were silently scored as labels[0].
        y_pred.append('unknown')
        continue

    # soft vote: class with the highest mean score across the contributing models
    # NOTE(review): `labels` is filled with one entry per sample in the previous cell; confirm that
    # indexing it with the class position is intended.
    prediction = argmax(sum(predictions, axis=0) / len(predictions))
    y_cm.append(sample.label)
    y_pred.append(labels[prediction])
    y_pred_cm.append(labels[prediction])

print(classification_report(y_true=y, y_pred=y_pred))

In [None]:
from matplotlib import pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay

# Square figure whose side length is taken from the configuration.
fig, ax = plt.subplots(figsize=(config.cm_size,) * 2)
# Confusion matrix over the samples that received a prediction, normalised over the true
# labels and printed with two decimals.
ConfusionMatrixDisplay.from_predictions(
    y_true=y_cm,
    y_pred=y_pred_cm,
    ax=ax,
    normalize='true',
    values_format='.2f',
    xticks_rotation='vertical',
)