In [2]:
import json
import os
import warnings

import numpy as np
import pandas as pd
import scipy.stats as sps
from tqdm.notebook import tqdm

%matplotlib notebook

In [3]:
def flatten_dict(d, parent_key='', sep=':'):
    """Collapse a nested dict into a single-level dict.

    Keys of nested dicts are joined to their parent's key with ``sep``.
    A key is used as-is (not converted to a string) whenever the prefix
    accumulated so far is falsy, e.g. at the top level with the default
    ``parent_key``. Empty nested dicts contribute nothing.

    Args:
        d: The (possibly nested) dict to flatten.
        parent_key: Prefix prepended to every top-level key of ``d``.
        sep: Separator placed between a prefix and a key.

    Returns:
        A new dict mapping flattened keys to the non-dict leaf values, in
        depth-first order of first occurrence.
    """
    flat = {}
    # Each stack entry is (prefix, partially consumed items iterator), so
    # the parent level resumes where it stopped once a child is exhausted.
    pending = [(parent_key, iter(d.items()))]
    while pending:
        prefix, entries = pending[-1]
        for key, value in entries:
            path = f"{prefix}{sep}{key}" if prefix else key
            if isinstance(value, dict):
                # Descend first; the parent's iterator keeps its position.
                pending.append((path, iter(value.items())))
                break
            flat[path] = value
        else:
            pending.pop()
    return flat

In [4]:
def _standardize_feature(values):
    """Z-score one feature vector; a constant vector becomes all zeros."""
    if max(values) == min(values):
        return np.zeros_like(values)
    with warnings.catch_warnings():
        # Silence RuntimeWarnings raised while z-scoring.
        warnings.simplefilter("ignore", category=RuntimeWarning)
        # NOTE(review): any NaN left by zscore is replaced with 0 here, so the
        # later dropna() cannot see it -- with scipy's default NaN propagation
        # a vector containing a NaN would presumably become all zeros. Confirm
        # this is intended.
        return np.nan_to_num(sps.zscore(values), nan=0)


def get_dataframes(folder, inclusion_rules=None):
    """Load and normalise the feature files found in ``features/<folder>``.

    Every selected file is parsed as JSON, flattened with ``flatten_dict``
    and turned into a DataFrame. Each feature whose name does not contain
    ``"annotations"`` is z-scored within its own file (constant features
    become zeros).

    Args:
        folder: Sub-directory of ``features/`` to read.
        inclusion_rules: Substrings that must all occur in a file name for
            the file to be loaded. ``None`` or an empty list selects every
            file.

    Returns:
        A ``(df, per_file)`` tuple. ``df`` is the row-wise concatenation of
        all files with NaN rows dropped and the annotation columns cast to
        ``int``. ``per_file`` maps each file name to its own DataFrame as it
        was before concatenation (no NaN dropping, no integer cast).

    Raises:
        ValueError: If no file in the folder satisfies the inclusion rules.
    """
    if inclusion_rules is None:
        inclusion_rules = []

    folder_path = f"features/{folder}"
    # os.listdir returns entries in arbitrary order; sort so that the row
    # order of the result and the order of the mapping are reproducible.
    filenames = sorted(
        filename
        for filename in os.listdir(folder_path)
        if all(inclusion_rule in filename for inclusion_rule in inclusion_rules)
    )
    if not filenames:
        raise ValueError(
            f"no files in '{folder_path}' match inclusion rules {inclusion_rules!r}"
        )

    dataframes = []
    for filename in filenames:
        with open(f"{folder_path}/{filename}", mode="r", encoding="utf-8") as file:
            features = flatten_dict(json.load(file))

        for feature in features:
            if "annotations" not in feature:
                features[feature] = _standardize_feature(features[feature])
        dataframes.append(pd.DataFrame(features))

    df = pd.concat(dataframes, ignore_index=True).dropna()
    for key in df.keys():
        if "annotations" in key:
            df[key] = df[key].astype(int)

    return df, dict(zip(filenames, dataframes))

In [5]:
def get_iv(df, annotation_type="main"):
    """Compute the Information Value (IV) of every feature, one-vs-rest per class.

    Each feature column is split into up to 10 quantile bins. For every
    class found in ``annotations:<annotation_type>`` the rows of that class
    count as "good" and all other rows as "bad"; per bin, the Weight of
    Evidence is ``log(share_of_good / share_of_bad)`` and the bin's IV is
    ``(share_of_good - share_of_bad) * WoE``. A feature's IV is the sum over
    its bins. Features that end up with fewer than two bins get an IV of 0.

    Args:
        df: Frame with feature columns plus the column
            ``annotations:<annotation_type>``. Columns whose name contains
            ``"annotations"`` are not treated as features. Rows containing
            NaN are ignored; the frame itself is not modified.
        annotation_type: Suffix selecting the annotation column that holds
            the class labels.

    Returns:
        DataFrame indexed by feature name with one column per class,
        holding the IV values.

    Raises:
        KeyError: If ``annotations:<annotation_type>`` is not a column.
    """
    label_column = f"annotations:{annotation_type}"

    # Work on a NaN-free copy so the caller's frame is left untouched.
    df = df.dropna()
    labels = df[label_column]
    annotations = sorted(set(labels))

    # Class totals do not depend on the feature or the bin: compute them once.
    # NOTE: with a single class present, total_bad is 0 and the divisions
    # below yield non-finite values.
    row_count = df.shape[0]
    totals = {}
    for annotation in annotations:
        total_good = (labels == annotation).sum()
        totals[annotation] = (total_good, row_count - total_good)

    results = {}
    for column in df.columns:
        if "annotations" in column:
            continue

        column_binned, bins = pd.qcut(
            df[column], q=10, labels=False, retbins=True, duplicates="drop"
        )
        bin_count = len(bins) - 1

        if bin_count < 2:
            # Fewer than two distinct bins: the feature cannot separate classes.
            results[column] = {annotation: 0 for annotation in annotations}
            continue

        iv_by_bin = {annotation: [] for annotation in annotations}
        for bin_index in range(bin_count):
            bin_labels = labels[column_binned == bin_index]
            bin_size = len(bin_labels)

            for annotation in annotations:
                good = (bin_labels == annotation).sum()
                bad = bin_size - good
                # Replace empty counts with 0.5 so the logarithm stays finite.
                if good == 0:
                    good = 0.5
                if bad == 0:
                    bad = 0.5

                total_good, total_bad = totals[annotation]
                per_good = good / total_good
                per_bad = bad / total_bad
                woe = np.log(per_good / per_bad)
                iv_by_bin[annotation].append((per_good - per_bad) * woe)

        # Series.sum() skips NaN entries, matching a DataFrame column sum.
        results[column] = {
            annotation: pd.Series(values).sum()
            for annotation, values in iv_by_bin.items()
        }

    return pd.DataFrame(results).T

In [6]:
def translate_indexes(df):
    """Rewrite the row labels of ``df`` into their Russian display form.

    The labels are converted to strings and a fixed list of literal
    substring replacements is applied to all of them, one replacement after
    another in the order listed below. The index of ``df`` is replaced in
    place and the same object is returned.

    Args:
        df: DataFrame whose index holds the labels to translate.

    Returns:
        ``df`` itself, with the translated index.
    """
    # Order matters: each replacement sees the output of the previous ones,
    # and the generic ":" / "_" clean-up has to come last.
    replacements = (
        # Frequency band names.
        ("delta", "дельта"),
        ("theta", "тета"),
        ("alpha", "альфа"),
        ("beta", "бета"),
        ("gamma", "гамма"),
        # Feature family prefixes.
        ("psd:", "[PSD] "),
        ("-", " "),
        ("psd_ratio:", "[отн-е PSD] "),
        ("coh:", "[когер.] "),
        ("plv:", "[PLV] "),
        ("lzc", "[LZC] "),
        ("S:", "[энтроп. К.-С.] "),
        ("Lambda:", "[мера Л.] "),
        # Binarisation / interval-count markers.
        (":median", "(бин. по медиане)"),
        (":3:", "(3 инт-ла) "),
        (":4:", "(4 инт-ла) "),
        (":5:", "(5 инт-лов) "),
        (":6:", "(6 инт-лов) "),
        ("spatial4", "(4 инт-ла)"),
        ("spatial6", "(6 инт-лов)"),
        ("spatial10", "(10 инт-лов)"),
        ("spatial14", "(14 инт-лов)"),
        ("spatial20", "(20 инт-лов)"),
        ("spatial24", "(24 инт-ла)"),
        ("temporal3", "(врем., окно 3)"),
        ("temporal4", "(врем., окно 4)"),
        # Remaining separators become plain spaces.
        (":", " "),
        ("_", " "),
    )

    labels = np.array(df.index, dtype="str")
    for pattern_and_substitute in replacements:
        labels = np.char.replace(labels, *pattern_and_substitute)

    df.index = labels
    return df

In [7]:
def get_iv_table(folder, inclusion_rules=None, ignore_separate_files=False):
    """Build one wide table of IV values for a feature folder.

    The table holds the IV computed on the concatenation of all selected
    files (columns suffixed ``_ALL``) and, unless disabled, the IV computed
    on every file separately (columns suffixed with the file name without
    its extension). Row labels are translated with ``translate_indexes``.

    Args:
        folder: Sub-directory of ``features/`` passed to ``get_dataframes``.
        inclusion_rules: Substrings a file name must contain to be loaded;
            ``None`` means no filtering.
        ignore_separate_files: When true, only the ``_ALL`` columns are
            produced.

    Returns:
        DataFrame with one row per feature and columns named
        ``<class>_<label>``, sorted by column name.
    """
    if inclusion_rules is None:
        inclusion_rules = []

    def labelled_iv(frame, label):
        # IV table of one frame, with every class column tagged by `label`.
        table = translate_indexes(get_iv(frame))
        return table.rename(
            columns={stage: f"{stage}_{label}" for stage in table.columns}
        )

    combined_df, per_file_dfs = get_dataframes(folder, inclusion_rules)
    iv_tables = [labelled_iv(combined_df, "ALL")]

    if not ignore_separate_files:
        for filename, file_df in tqdm(per_file_dfs.items()):
            # Strip the real extension instead of assuming it is 5 characters.
            file_label = os.path.splitext(filename)[0]
            iv_tables.append(labelled_iv(file_df, file_label))

    combined_iv = pd.concat(iv_tables, axis=1)
    return combined_iv[sorted(combined_iv.columns)]

In [168]:
# ISRUC-Sleep, restricted to files whose name contains both "part2" and "s3".
# The table is written twice; the "_bu" file is presumably a backup copy.
iv_isruc_s3 = get_iv_table("isruc-sleep", inclusion_rules=["part2", "s3"])
iv_isruc_s3.to_excel("iv/isruc-sleep/results_s3.xlsx")
iv_isruc_s3.to_excel("iv/isruc-sleep/results_s3_bu.xlsx")

# ISRUC-Sleep, every file whose name contains "part2".
iv_isruc_all = get_iv_table("isruc-sleep", inclusion_rules=["part2"])
iv_isruc_all.to_excel("iv/isruc-sleep/results_all.xlsx")
iv_isruc_all.to_excel("iv/isruc-sleep/results_all_bu.xlsx")

  0%|          | 0/10 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

In [8]:
# Sleep-EDF Expanded, every file whose name contains "part2".
# The table is written twice; the "_bu" file is presumably a backup copy.
iv_sleep_edf = get_iv_table("sleep_edf_database_expanded", inclusion_rules=["part2"])
iv_sleep_edf.to_excel("iv/sleep_edf_database_expanded/results.xlsx")
iv_sleep_edf.to_excel("iv/sleep_edf_database_expanded/results_bu.xlsx")

  0%|          | 0/16 [00:00<?, ?it/s]

In [9]:
# EEGMAT, every file whose name contains "part2".
# The table is written twice; the "_bu" file is presumably a backup copy.
iv_eegmat = get_iv_table("eegmat", inclusion_rules=["part2"])
iv_eegmat.to_excel("iv/eegmat/results.xlsx")
iv_eegmat.to_excel("iv/eegmat/results_bu.xlsx")

  0%|          | 0/36 [00:00<?, ?it/s]

In [10]:
# SPIS, every file whose name contains "part2".
# The table is written twice; the "_bu" file is presumably a backup copy.
iv_spis = get_iv_table("spis", inclusion_rules=["part2"])
iv_spis.to_excel("iv/spis/results.xlsx")
iv_spis.to_excel("iv/spis/results_bu.xlsx")

  0%|          | 0/10 [00:00<?, ?it/s]

In [11]:
# MNIST, every file whose name contains "part2"; per-file IV columns are
# skipped, so only the "_ALL" columns are produced.
# The table is written twice; the "_bu" file is presumably a backup copy.
iv_mnist = get_iv_table("mnist", inclusion_rules=["part2"], ignore_separate_files=True)
iv_mnist.to_excel("iv/mnist/results.xlsx")
iv_mnist.to_excel("iv/mnist/results_bu.xlsx")