In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# testing_new_features.py
######################### TO-DO #########################
# 1. User-ID based sessions?
# 2. Redis database when in prod
# 3. Add dark/light themes
# 4. Add all models status page (JSON to store all model metrics on GitLab)

#########################################################

from dash import Dash, dcc, html, Input, Output, State
from flask_caching import Cache
import dash_bootstrap_components as dbc
# import dash_core_components as dcc
# import dash_html_components as html
# from dash.dependencies import Input, Output, State
from dash import Dash, dcc, html, Input, Output, ctx
import dash_bootstrap_components as dbc
import plotly.express as px

from plotting_functions import plot_hist_dist, plot_feature_importances, create_card, psi_plot_ly, plot_auc_roc, \
    plot_ks, psi_variable_plot, new_create_card
from helper_functions import col_dropper, fix_dtypes, DataFrameImputer, AutoPrepareScoreCard
# import plotly.graph_objects as go
import pandas as pd
# from datetime import date
import sqlite3
from sklearn.metrics import roc_curve, roc_auc_score, log_loss
from sklearn.model_selection import train_test_split
import json
import helper_functions
import joblib

from optbinning import BinningProcess
from optbinning.scorecard import ScorecardMonitoring
import logging
import sys
import io
from contextlib import redirect_stdout
import time
from docxtpl import DocxTemplate, InlineImage

import credit_py_validation as cpv

from plotly.tools import mpl_to_plotly

from page_content import generate_search_bar, generate_navbar, generate_sidebar, generate_footer
from styles import get_content_style, get_footer_style, get_sidebar_style, get_sidebar_hidden_style, get_modal_style
import pages.accuracy
import pages.stability

import logging

logger = logging.getLogger(__name__)

class DashLogger(logging.StreamHandler):
    def __init__(self, stream=None):
        super().__init__(stream=stream)
        self.logs = list()

    def emit(self, record):
        try:
            msg = self.format(record)
            self.logs.append(msg)
            self.logs = self.logs[-1000:]
            self.flush()
        except Exception:
            self.handleError(record)

dash_logger = DashLogger(stream=sys.stdout)
logger.addHandler(dash_logger)

# Global variables
HALYK_LOGO = 'assets/halyk_logo.png'
# PRIVATE_TOKEN = ''
model_list = helper_functions.get_all_models()

app = Dash(external_stylesheets=[dbc.themes.CYBORG])

navbar = generate_navbar(model_list, HALYK_LOGO)

# the style arguments for the sidebar. We use position:fixed and a fixed width
CONTENT_STYLE, CONTENT_STYLE1 = get_content_style()
FOOTER_STYLE = get_footer_style()
SIDEBAR_STYLE = get_sidebar_style()
SIDEBAR_HIDDEN = get_sidebar_hidden_style()
MODAL_STYLE = get_modal_style()

sidebar = generate_sidebar(SIDEBAR_STYLE)

content = html.Div(
    id="page-content",
    style=CONTENT_STYLE)

footer = generate_footer(FOOTER_STYLE)

generate_button = html.Div(
    html.Button('Generate report',
                id='generate-report-button',
                n_clicks=0))

MODAL_CONTENT = {
    "margin": "90px",
    "padding": "30px",
    "background-color": "white",
    'textAlign': 'center'
}

modal = html.Div([
    html.Div([
        html.Div([
            'Please, choose a model from the dropdown on top, retard',
        ]),

        html.Hr(),
        html.Button('Close', id='modal-close-button')
    ],
        style=MODAL_CONTENT,
        className='modal-content',
    ),
],
    id='modal',
    className='modal',
    style=MODAL_STYLE
)

app.layout = html.Div(
    [
        dcc.Store(id='intermediate-value'),
        dcc.Store(id='side_click'),
        dcc.Location(id="url"),
        navbar,
        sidebar,
        modal,

        dcc.Loading(
            id='loading-1',
            type='default',
            # fullscreen=True,
            # children=html.Div(id='loading-output-1'),
            # children=['intermediate-value', content]
            children=content
        ),
        # content,

        dcc.Interval(
            id='log-update2',
            interval=1 * 1000  # in milliseconds
        ),
        html.Div(id='log2'),

        generate_button,
        footer
    ]
)

@app.callback(
    Output('log2', 'children'),
    [Input('log-update2', 'n_intervals')])
def update_logs(interval):
    return [html.Div(log) for log in dash_logger.logs]


# @app.callback(Output('my-div', 'children'), [Input('button', 'n_clicks')])
# def add_log(click):
#     logger.warning("Important Message")





@app.callback(Output('modal', 'style'),
              [Input('modal-close-button', 'n_clicks')])
def close_modal(n):
    if n is None:
        return get_modal_style()
    elif (n is not None) and (n > 0):
        return {"display": "none"}


@app.callback(
    [
        Output("sidebar", "style"),
        Output("page-content", "style"),
        Output("side_click", "data"),
    ],

    [Input("btn_sidebar", "n_clicks")],
    [
        State("side_click", "data"),
    ]
)
def toggle_sidebar(n, nclick):
    if n:
        if nclick == "SHOW":
            sidebar_style = SIDEBAR_HIDDEN
            content_style = CONTENT_STYLE1
            cur_nclick = "HIDDEN"
        else:
            sidebar_style = SIDEBAR_STYLE
            content_style = CONTENT_STYLE
            cur_nclick = "SHOW"
    else:
        sidebar_style = SIDEBAR_STYLE
        content_style = CONTENT_STYLE
        cur_nclick = 'SHOW'

    return sidebar_style, content_style, cur_nclick


# this callback uses the current pathname to set the active state of the
# corresponding nav link to true, allowing users to tell see page they are on
@app.callback(
    [Output(f"page-{i}-link", "active") for i in range(1, 4)],
    [Input("url", "pathname")]
)
def toggle_active_links(pathname):
    if pathname == "/":
        # Treat page 1 as the homepage / index
        return True, False, False
    return [pathname == f"/page-{i}" for i in range(1, 4)]


@app.callback(Output('intermediate-value', 'data'), Input('dropdown-models', 'value'))
def make_calculations(value):
    if value is not None:
        incoming_batch_results, test_results, psi_table, psi_var_table_sum, psi_var_table_det, monitoring, scorecard, y_test, pd_X_test, stat_tests_report = helper_functions.calculate_data()
    else:
        # incoming_batch_results, test_results, psi_table, psi_var_table_sum, psi_var_table_det, y_test, pd_X_test = helper_functions.initialize_dash_vars()
        incoming_batch_results = {'binomial': 'N/A',
                                  'brier': 'N/A',
                                  'herfindahl': 'N/A',
                                  'hosmer': 'N/A',
                                  'spiegelhalter': 'N/A',
                                  'jeffreys': 'N/A',
                                  'roc_auc': 'N/A',
                                  'ber': 'N/A',
                                  'log_loss': 'N/A',
                                  'ks': 'N/A'}
        test_results = {'binomial': 'N/A',
                        'brier': 'N/A',
                        'herfindahl': 'N/A',
                        'hosmer': 'N/A',
                        'spiegelhalter': 'N/A',
                        'jeffreys': 'N/A',
                        'roc_auc': 'N/A',
                        'ber': 'N/A',
                        'log_loss': 'N/A',
                        'ks': 'N/A'}
        psi_table = None
        psi_var_table_sum = None
        psi_var_table_det = None
        y_test = None
        pd_X_test = None
        stat_tests_report = None

    incoming_batch_results = json.dumps(incoming_batch_results)
    test_results = json.dumps(test_results)

    if isinstance(psi_table, pd.DataFrame):
        psi_table = psi_table.to_json(date_format='iso', orient='split')
    else:
        psi_table = json.dumps(psi_table)

    if isinstance(psi_var_table_sum, pd.DataFrame):
        psi_var_table_sum = psi_var_table_sum.to_json(date_format='iso', orient='split')
    else:
        psi_var_table_sum = json.dumps(psi_var_table_sum)

    if isinstance(psi_var_table_det, pd.DataFrame):
        psi_var_table_det = psi_var_table_det.to_json(date_format='iso', orient='split')
    else:
        psi_var_table_det = json.dumps(psi_var_table_det)

    if y_test is None:
        y_test = json.dumps(y_test)
    else:
        y_test = json.dumps(y_test.tolist())

    if pd_X_test is None:
        pd_X_test = json.dumps(pd_X_test)
    else:
        pd_X_test = json.dumps(pd_X_test.tolist())

    stat_tests_report = json.dumps(stat_tests_report)

    datasets = {'incoming_batch_results': incoming_batch_results,
                'test_results': test_results,
                'psi_table': psi_table,
                'psi_var_table_sum': psi_var_table_sum,
                'psi_var_table_det': psi_var_table_det,
                'y_test': y_test,
                'pd_X_test': pd_X_test,
                'stat_tests_report': stat_tests_report}
    return json.dumps(datasets)


@app.callback(
    Output("page-content", "children"),
    Input("url", "pathname"),
    Input("intermediate-value", 'data')
)
def render_page_content(pathname, data):
    datasets = json.loads(data)

    incoming_batch_results = json.loads(datasets['incoming_batch_results'])
    test_results = json.loads(datasets['test_results'])

    if datasets['psi_table'] == 'null':
        psi_table = json.loads(datasets['psi_table'])
    else:
        psi_table = pd.read_json(datasets['psi_table'], orient='split')

    if datasets['psi_var_table_sum'] == 'null':
        psi_var_table_sum = json.loads(datasets['psi_var_table_sum'])
    else:
        psi_var_table_sum = pd.read_json(datasets['psi_var_table_sum'], orient='split')

    if datasets['psi_var_table_det'] == 'null':
        psi_var_table_det = json.loads(datasets['psi_var_table_det'])
    else:
        psi_var_table_det = pd.read_json(datasets['psi_var_table_det'], orient='split')

    y_test = json.loads(datasets['y_test'])
    pd_X_test = json.loads(datasets['pd_X_test'])
    stat_tests_report = json.loads(datasets['stat_tests_report'])

    if pathname in ["/", "/page-1"]:
        page_content = pages.accuracy.generate_accuracy_page(incoming_batch_results, test_results, y_test, pd_X_test)
        return page_content

    elif pathname == "/page-2":
        page_content = pages.stability.generate_stability_page(incoming_batch_results, test_results, psi_table,
                                                               psi_var_table_sum)

        return page_content
    elif pathname == "/page-3":
        page_content = html.H4(stat_tests_report)
        return page_content

    elif pathname == "/summary":
        page_content = html.Div([
            html.H2('Overall quality score: GOOD'),
            html.H3('AUC')
        ])
        return page_content

    elif pathname == '/report-generator':
        page_content = html.Div([

            html.H2('Generate report here')

        ])
        return page_content

    return dbc.Container(
        [
            html.H1("404: Not found", className="text-danger"),
            html.Hr(),
            html.P(f"The pathname {pathname} was not recognised..."),
        ]
    )


if __name__ == "__main__":
    app.run_server(debug=True, port=8086)


In [None]:
# styles.py
def get_sidebar_style():
    SIDEBAR_STYLE = {
        "position": "fixed",
        "top": 75.0,
        "left": 0,
        "bottom": 0,
        "width": "16rem",
        "height": "100%",
        "z-index": 1,
        "overflow-x": "hidden",
        "transition": "all 0.5s",
        "padding": "0.5rem 1rem",
        "background-color": "#111111",
    }
    return SIDEBAR_STYLE

def get_sidebar_hidden_style():
    SIDEBAR_HIDDEN = {
        "position": "fixed",
        "top": 62.5,
        "left": "-16rem",
        "bottom": 0,
        "width": "16rem",
        "height": "100%",
        "z-index": 1,
        "overflow-x": "hidden",
        "transition": "all 0.5s",
        "padding": "0rem 0rem",
        "background-color": "#111111",
    }
    return SIDEBAR_HIDDEN

def get_content_style():
    # the styles for the main content position it to the right of the sidebar and
    # add some padding.
    CONTENT_STYLE = {
        "transition": "margin-left .5s",
        "margin-left": "18rem",
        "margin-right": "2rem",
        "padding": "2rem 1rem",
        "background-color": "#111111",
    }

    CONTENT_STYLE1 = {
        "transition": "margin-left .5s",
        "margin-left": "2rem",
        "margin-right": "2rem",
        "padding": "2rem 1rem",
        "background-color": "#111111",
    }
    return CONTENT_STYLE, CONTENT_STYLE1

def get_footer_style():
    FOOTER_STYLE = {
        "position": "absolute",
        "bottom": "0",
        "width": "100%",
        "height": "60px",  # /* Set the fixed height of the footer here */
        "line-height": "60px",  # /* Vertically center the text there */
        "background-color": "#111111"
    }
    return FOOTER_STYLE

def get_modal_style():
    MODAL = {
        "position": "fixed",
        "z-index": "1002",  # /* Sit on top, including modebar which has z=1001 */
        "left": "0",
        "top": "0",
        "width": "100%",  # /* Full width */
        "height": "100%",  # /* Full height */
        "background-color": "rgba(0, 0, 0, 0.6)",
        "display": "block"  # /* Black w/ opacity */
    }
    return MODAL

In [None]:
# accuracy.py
from dash import dcc, html
import dash_bootstrap_components as dbc
from plotting_functions import new_create_card, plot_auc_roc, plot_ks, no_figure_plot


def generate_accuracy_page(incoming_batch_results, test_results, y_test, pd_X_test):
    page_content = html.Div([
        dbc.Row(
            [
                dbc.Col(
                    new_create_card('Log Loss', '12412', incoming_batch_results['log_loss'],
                                    test_results['log_loss'])
                ),

                dbc.Col(
                    new_create_card('Brier', '12413', incoming_batch_results['brier'],
                                    test_results['brier'])
                ),

                dbc.Col(
                    new_create_card('K-S', '12414', incoming_batch_results['ks'],
                                    test_results['ks'])
                ),

                dbc.Col(
                    new_create_card('AUROC', '12415', incoming_batch_results['roc_auc'],
                                    test_results['roc_auc'])
                ),

                dbc.Col(
                    new_create_card('BER', '12416', incoming_batch_results['ber'],
                                    test_results['ber'])
                )
            ]),
        # dbc.Row([html.Div(dcc.Dropdown(X_test.columns, 'age', id='dropdown'),
        #                   style={'width': '50%'}
        #                   ),
        #          html.Div(dcc.Dropdown(X_test.columns, 'age', id='dropdown_2'),
        #                   style={'width': '50%'}
        #                   )
        #          ]
        #         ),
        dbc.Row([

            dbc.Col(
                dcc.Graph(id='logloss-plot',
                          figure=no_figure_plot()
                          ),
                width=5
            ),
            dbc.Col(
                dcc.Graph(id='graph-with-slider',
                          figure=plot_auc_roc(y_test, pd_X_test)
                          ),
                width=3
            ),
            dbc.Col(
                dcc.Graph(id='graph-distributions',
                          figure=plot_ks(y_test, pd_X_test)
                          ),
                width=3
            )
        ])
    ])
    return page_content


In [None]:
# plotting_functions.py
import plotly.graph_objects as go
import plotly.express as px
import dash_bootstrap_components as dbc
from dash import Dash, dcc, html
import numpy as np
import pandas as pd
import plotly.io as pio

from sklearn.metrics import roc_curve, roc_auc_score
pio.templates.default = "plotly_dark"


def no_figure_plot():
    return go.Figure().add_annotation(x=2, y=2, text="No Data",
                                      font=dict(family="sans serif", size=25, color="crimson"),
                                      showarrow=False, yshift=10)


def psi_single_variable_plot(psi_var_table):
    # psi_table = monitoring.psi_variable_table(style=style)
    fig = go.Figure()
    if style == 'detailed':
        var_table = psi_var_table[psi_var_table['Variable'] == var]
        fig.add_trace(go.Bar(x=var_table.index, y=var_table['Count A (%)']))
        fig.add_trace(go.Bar(x=var_table.index, y=var_table['Count E (%)']))
    return fig


def psi_variable_plot(psi_variable_table):
    if psi_variable_table is None:
        return no_figure_plot()
    # CHECK STYLE ONLY SUMMARY OR DETAILED
    fig = go.Figure()
    # if style == 'summary':
    fig.add_trace(go.Bar(x=psi_variable_table['Variable'], y=psi_variable_table['PSI']))
    # elif style == 'detailed':
    #     fig = go.Figure()
    #     fig
    return fig


### TAKE DATA FROM monitoring.psi_table(). same data as here
def psi_plot_ly(psi_table):
    if psi_table is None:
        return no_figure_plot()
    fig = go.Figure()
    fig.add_trace(go.Bar(x=psi_table.index, y=psi_table['Count A (%)']))
    fig.add_trace(go.Bar(x=psi_table.index, y=psi_table['Count E (%)']))
    return fig


# def psi_plot_ly(psi_table):
#     fig = go.Figure()
#     fig.add_trace(go.Bar(x=psi_table.index, y=psi_table['Count A (%)']))
#     fig.add_trace(go.Bar(x=psi_table.index, y=psi_table['Count E (%)']))
#     return fig


def plot_ks(y, y_pred, title=None, xlabel=None, ylabel=None):
    if y is None or y_pred is None:
        return go.Figure().add_annotation(x=2, y=2, text="No Data to Display",
                                          font=dict(family="sans serif", size=25, color="crimson"),
                                          showarrow=False, yshift=10)


    import plotly.express as px

    # pio.templates.default = "plotly_dark"

    y = pd.Series(y)
    y_pred = pd.Series(y_pred)
    n_samples = y.shape[0]
    # n_samples = len(y)
    n_event = np.sum(y)
    n_nonevent = n_samples - n_event

    idx = y_pred.argsort()
    yy = y[idx]
    pp = y_pred[idx]

    cum_event = np.cumsum(yy)
    cum_population = np.arange(0, n_samples)
    cum_nonevent = cum_population - cum_event

    p_event = cum_event / n_event
    p_nonevent = cum_nonevent / n_nonevent

    p_diff = p_nonevent - p_event

    ks_score = np.max(p_diff)
    ks_max_idx = np.argmax(p_diff)
    # Define the plot settings
    print('plot')
    print(p_diff)
    if title is None:
        title = "Kolmogorov-Smirnov"
    if xlabel is None:
        xlabel = "Threshold"
    if ylabel is None:
        ylabel = "Cumulative probability"

    # plt.title(title, fontdict={'fontsize': 14})
    # plt.xlabel(xlabel, fontdict={'fontsize': 12})
    # plt.ylabel(ylabel, fontdict={'fontsize': 12})

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=pp, y=p_event))
    fig.add_trace(go.Scatter(x=pp, y=p_nonevent))
    return fig


def plot_auc_roc(y, y_pred, title=None, xlabel=None, ylabel=None):
    # savefig = False, fname = None, ** kwargs)
    if y is None or y_pred is None:
        return no_figure_plot()




    print('THIS IS Y: ', y[0:10])
    print('THIS IS Y_PRED: ', y_pred[0:10])
    fpr, tpr, _ = roc_curve(y, y_pred)
    auc_roc = roc_auc_score(y, y_pred)

    # # Define the plot settings
    # if title is None:
    #     title = "ROC curve"
    # if xlabel is None:
    #     xlabel = "False Positive Rate"
    # if ylabel is None:
    #     ylabel = "True Positive Rate"

    layout = dict(
        title='ROC-AUC',
        xaxis_title='False Positive Rate',
        yaxis_title='True Positive Rate',
        legend_title='Legend Title',
        font=dict(
            family='Courier New, monospace',
            size=18,
            color='red'
        )
    )

    fig = go.Figure(layout=layout)
    fig.add_trace(go.Scatter(x=fpr, y=fpr, name='Stupid model'))
    fig.add_trace(go.Scatter(x=fpr, y=tpr, name='Your model'))


    return fig


def plot_hist_dist(factor1, factor2):
    fig = go.Figure()
    fig.add_trace(go.Histogram(x=factor1, histnorm='probability density'))
    fig.add_trace(go.Histogram(x=factor2, histnorm='probability density', opacity=0.5))

    fig.update_layout(barmode='overlay')
    # Reduce opacity to see both histograms
    # fig.update_traces(opacity=0.3)
    return fig


# Plots feature importances
def plot_feature_importances(feat_imp_df):
    fig = px.bar(feat_imp_df.sort_values(['Importance']), x="Importance", y="Feature", orientation='h')
    fig.update_layout(xaxis=dict(rangeslider=dict(visible=True),
                                 type="linear"))
    return fig


# Creates card with information on the dashboard
def create_card(value, card_id, title, description):
    return dbc.Card(
        dbc.CardBody(
            [
                html.H4(title, id=f"{card_id}-title"),
                html.H2(f'{value:.3f}', id=f"{card_id}-value"),
                html.P(description, id=f"{card_id}-description")
            ]
        ),
        body=True,
        color='dark',
        outline=True
    )


def new_create_card(title, card_id, new_value, prev_value, description=None):
    def f(v):
        if v is None:
            return v
        elif v == 'N/A':
            return v
        return f"{v:.3f}"

    card = dbc.Card(
        dbc.CardBody(
            [
                html.H4(title, id=f"{card_id}-title"),
                html.H2(f(new_value), id=f"{card_id}-value"),
                html.H6(f'Initial: {f(prev_value)}', id=f"{card_id}-prev-value"),
                html.P(description, id=f"{card_id}-description")
            ]
        ),
        body=True,
        color='dark',
        outline=True
    )
    return card

In [None]:
# mpp.py
# Model Performance Predictor
# predicts the model's metrics

class ModelPerformancePredictor:
    def __init__(self, x_test=None, y_test=None):
        self.x_test = x_test
        self.y_test = y_test


    def fit(self):

    def _fit(self):

    def predict(self):

    def _predict(self):



In [None]:
# helper_functions.py
from optbinning.binning import binning_process
from optbinning.scorecard.scorecard import _check_parameters
from optbinning.scorecard.scorecard import _compute_scorecard_points
from sklearn.base import TransformerMixin
import joblib
import pandas as pd
import numpy as np
import credit_py_validation as cpv

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import log_loss
from sklearn.model_selection import train_test_split
from optbinning.scorecard import Scorecard
from optbinning.logging import Logger

import time
from sklearn.base import clone
from sklearn.utils.multiclass import type_of_target
import gitlab
import sqlite3
import json
import io
from contextlib import redirect_stdout

from optbinning import BinningProcess
from optbinning.scorecard import ScorecardMonitoring

logger = Logger(__name__).logger


def capture_output(output_stream):
    f = io.StringIO()
    with redirect_stdout(f):
        output_stream
        # output_stream.system_stability_report()
    output = f.getvalue()
    return output


def get_all_models():
    gl = gitlab.Gitlab('http://gitlab.cloud.halykbank.nb', private_token='yBKJTSeuciukPxTfWJ-Z',
                       ssl_verify=False, api_version=4)
    project = gl.projects.get(2370)
    repo = project.repository_tree(ref='master')
    directories = [item["path"] for item in repo if item["type"] == "tree"]
    return directories


def conduct_tests(table_for_tests):  # __REWORK - add options to choose model and different tests
    # if self.table_for_tests is None:
    #     raise Exception('Dataset for tests is not ready')
    binomial_result = cpv.binomial_test(table_for_tests, 'Credit Rating', 'Default Flag',
                                        'Default Probability').to_dict()
    brier_score_result = cpv.brier_score(table_for_tests, 'Credit Rating', 'Default Flag',
                                         'Default Probability')
    herfindahl_result = cpv.herfindahl_test(table_for_tests, 'Credit Rating')
    hosmer_result = cpv.hosmer_test(table_for_tests, 'Credit Rating', 'Default Flag', 'Default Probability')
    spiegelhalter_result = cpv.spiegelhalter_test(table_for_tests, 'Credit Rating', 'Default Flag',
                                                  'Default Probability')
    jeffreys_result = cpv.jeffreys_test(table_for_tests, 'Credit Rating', 'Default Flag',
                                        'Default Probability').to_dict()
    roc_auc_result = cpv.roc_auc(table_for_tests, 'Default Flag', 'Default Probability')  # FIX - outputs 1.0 AUC
    ber_result = cpv.bayesian_error_rate(table_for_tests['Default Flag'],
                                         table_for_tests['Default Probability'])
    log_loss_result = log_loss(table_for_tests['Default Flag'], table_for_tests['Default Probability'])

    ks_result = cpv.kolmogorov_smirnov(table_for_tests['Default Flag'].reset_index(drop=True),
                                       table_for_tests['Default Probability'].reset_index(drop=True))
    # cpv.psi(loans_test, 'Credit Rating', 'PD')

    test_results = {'binomial': binomial_result,
                    'brier': brier_score_result,
                    'herfindahl': herfindahl_result,
                    'hosmer': hosmer_result,
                    'spiegelhalter': spiegelhalter_result,
                    'jeffreys': jeffreys_result,
                    'roc_auc': roc_auc_result,
                    'ber': ber_result,
                    'log_loss': log_loss_result,
                    'ks': ks_result}
    return test_results


def initialize_dash_vars():
    init_cols = ['Bin', 'Count A', 'Count E', 'Count A (%)', 'Count E (%)', 'PSI']
    stat_test_results = {'binomial': 'N/A',
                         'brier': 'N/A',
                         'herfindahl': 'N/A',
                         'hosmer': 'N/A',
                         'spiegelhalter': 'N/A',
                         'jeffreys': 'N/A',
                         'roc_auc': 'N/A',
                         'ber': 'N/A',
                         'log_loss': 'N/A'}
    psi_table = pd.DataFrame(columns=init_cols)
    psi_var_table_sum = pd.DataFrame(columns=init_cols)
    psi_var_table_det = pd.DataFrame(columns=init_cols)
    y_test = pd.Series()
    pd_X_test = pd.Series()
    return stat_test_results, stat_test_results, psi_table, psi_var_table_sum, psi_var_table_det, y_test, pd_X_test


def load_full_simulation_df():  # TEMPORARY FUNCTION FOR SIMULATION
    con = sqlite3.connect('new_credit_data.db')
    query_X = """SELECT *
    FROM X_train_log
    """
    query_y = """SELECT *
    FROM y_train_log
    """
    X = pd.read_sql(query_X, con, index_col=None)
    y = pd.read_sql(query_y, con, index_col=None)
    y = y.values.ravel()
    X_train, X_unseen, y_train, y_unseen = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_unseen, y_train, y_unseen


def load_model():
    with open('binning_fit_params.json') as json_file:
        binning_fit_params = json.load(json_file)
    # Loading the model
    lr_model = joblib.load('lr_model.pkl')
    return lr_model, binning_fit_params


def calculate_data():  # RE -FUCKING- DO IT
    X_train, X_unseen, y_train, y_unseen = load_full_simulation_df()
    lr_model, binning_fit_params = load_model()

    variable_names = list(X_train.columns)
    binning_process = BinningProcess(variable_names,  # selection_criteria=selection_criteria,
                                     binning_fit_params=binning_fit_params)
    scorecard = AutoPrepareScoreCard(binning_process=binning_process,
                                     estimator=lr_model, scaling_method="min_max",
                                     scaling_method_params={"min": 300, "max": 850}, verbose=True)

    scorecard.fit(X_train, y_train)
    monitoring = ScorecardMonitoring(scorecard=scorecard, psi_method="cart",
                                     psi_n_bins=10, verbose=True)

    ff = io.StringIO()
    with redirect_stdout(ff):
        monitoring.fit(X_unseen, y_unseen, X_train, y_train)
    asd2 = ff.getvalue()

    stat_tests_report = capture_output(monitoring.system_stability_report())

    # Calculate train
    pd_X_train = scorecard.predict_proba(X_train)[:, 1]
    default_flag_X_train = scorecard.predict(X_train)
    score_X_train = scorecard.score(X_train)
    rating_X_train = scorecard.get_credit_ratings(score_X_train)

    X_train['Default Probability'] = pd_X_train
    X_train['Credit Rating'] = rating_X_train
    X_train['Default Flag'] = default_flag_X_train

    # Calculate test
    pd_X_test = scorecard.predict_proba(X_unseen)[:, 1]
    default_flag_X_test = scorecard.predict(X_unseen)
    score_X_test = scorecard.score(X_unseen)
    rating_X_test = scorecard.get_credit_ratings(score_X_test)

    print('pd_X_Test: ', pd_X_test)
    print('def_X_test: ', default_flag_X_test)
    unique, counts = np.unique(default_flag_X_test, return_counts=True)
    print(np.asarray((unique, counts)).T)

    X_unseen['Default Probability'] = pd_X_test
    X_unseen['Credit Rating'] = rating_X_test
    X_unseen['Default Flag'] = default_flag_X_test

    incoming_batch_results = conduct_tests(X_unseen)
    test_results = conduct_tests(X_train)
    psi_var_table_det = monitoring.psi_variable_table(style='detailed')
    psi_var_table_sum = monitoring.psi_variable_table(style='summary')
    psi_table = monitoring.psi_table()

    return incoming_batch_results, test_results, psi_table, psi_var_table_sum, psi_var_table_det, monitoring, scorecard, y_unseen, pd_X_test, stat_tests_report


# CHANGE ALL
class AutoPrepareScoreCard(Scorecard):
    def __init__(self, binning_process, estimator, scaling_method=None, scaling_method_params=None,
                 intercept_based=False, reverse_scorecard=False, rounding=False, verbose=False, db_connection=None,
                 target_name='target'):  # ADD PARAMETERS
        super().__init__(binning_process, estimator, scaling_method=None, scaling_method_params=None,
                         intercept_based=False, reverse_scorecard=False, rounding=False, verbose=False)

        self.binning_process = binning_process
        self.estimator = estimator
        self.scaling_method = scaling_method
        self.scaling_method_params = scaling_method_params
        self.intercept_based = intercept_based
        self.reverse_scorecard = reverse_scorecard
        self.rounding = rounding
        self.verbose = verbose

        self.incoming_labels = None
        self.test_data_tests = None
        self.incoming_data_tests = None
        self.incoming_data = None
        self.db_connection = db_connection
        self.target_name = target_name

        self.train_set = None
        self.test_set = None
        self.pipeline = None

        self.test_predictions = None

        self.choices = list(reversed(range(1, 11)))
        self.loan_type = 'УЗП'

        self.credit_scores = None
        self.credit_ratings = None

        # attributes
        self.binning_process_ = None
        self.estimator_ = None
        self.intercept_ = 0

        self._metric_special = None
        self._metric_missing = None

        # auxiliary
        self._target_dtype = None

        # timing
        self._time_total = None
        self._time_binning_process = None
        self._time_estimator = None
        self._time_build_scorecard = None
        self._time_rounding = None

        self._is_fitted = False

    def fit(self, X, y, sample_weight=None, metric_special=0, metric_missing=0,
            show_digits=2, check_input=False, fitted=True):
        """Fit scorecard.
        Parameters
        ----------
        X : pandas.DataFrame (n_samples, n_features)
            Training vector, where n_samples is the number of samples.
        y : array-like of shape (n_samples,)
            Target vector relative to x.
        sample_weight : array-like of shape (n_samples,) (default=None)
            Array of weights that are assigned to individual samples.
            If not provided, then each sample is given unit weight.
            This option is only available for a binary target.
        metric_special : float or str (default=0)
            The metric value to transform special codes in the input vector.
            Supported metrics are "empirical" to use the empirical WoE or
            event rate, and any numerical value.
        metric_missing : float or str (default=0)
            The metric value to transform missing values in the input vector.
            Supported metrics are "empirical" to use the empirical WoE or
            event rate and any numerical value.
        check_input : bool (default=False)
            Whether to check input arrays.
        show_digits : int, optional (default=2)
            The number of significant digits of the bin column.
        fitted : boolean, optional (default=True)
            If the estimator is fitted or not
        Returns
        -------
        self : Scorecard
            Fitted scorecard.
        """
        return self._fit(X, y, sample_weight, metric_special, metric_missing,
                         show_digits, check_input, fitted)

    def _fit(self, X, y, sample_weight, metric_special, metric_missing,
             show_digits, check_input, fitted=True):

        # Store the metrics for missing and special bins for predictions
        self._metric_special = metric_special
        self._metric_missing = metric_missing

        time_init = time.perf_counter()

        # if self.verbose:
        #     logger.info("Scorecard building process started.")
        #     logger.info("Options: check parameters.")

        # _check_parameters(**self.get_params(deep=False))

        # Check X dtype
        if not isinstance(X, pd.DataFrame):
            raise TypeError("X must be a pandas.DataFrame.")

        # Target type and metric
        self._target_dtype = type_of_target(y)

        if self._target_dtype not in ("binary", "continuous"):
            raise ValueError("Target type {} is not supported."
                             .format(self._target_dtype))

        # _check_scorecard_scaling(self.scaling_method,
        #                          self.scaling_method_params,
        #                          self.rounding,
        #                          self._target_dtype)

        # # Check sample weight
        # if sample_weight is not None and self._target_dtype != "binary":
        #     raise ValueError("Target type {} does not support sample weight."
        #                      .format(self._target_dtype))
        metric = 'woe'
        bt_metric = 'WoE'
        if self._target_dtype == "binary":
            metric = "woe"
            bt_metric = "WoE"
        elif self._target_dtype == "continuous":
            metric = "mean"
            bt_metric = "Mean"
        # if self.verbose:
        #     logger.info("Dataset: {} target.".format(self._target_dtype))

        # Fit binning process
        if self.verbose:
            logger.info("Binning process started.")

        time_binning_process = time.perf_counter()
        self.binning_process_ = clone(self.binning_process)

        # Suppress binning process verbosity
        self.binning_process_.set_params(verbose=False)

        X_t = self.binning_process_.fit_transform(
            X[self.binning_process.variable_names], y, sample_weight, metric,
            metric_special, metric_missing, show_digits, check_input)

        self._time_binning_process = time.perf_counter() - time_binning_process

        # if self.verbose:
        #     logger.info("Binning process terminated. Time: {:.4f}s"
        #                 .format(self._time_binning_process))

        if not fitted:
            # Fit estimator
            time_estimator = time.perf_counter()
            if self.verbose:
                logger.info("Fitting estimator.")

            self.estimator_ = clone(self.estimator)
            self.estimator_.fit(X_t, y, sample_weight)

            self._time_estimator = time.perf_counter() - time_estimator

            if self.verbose:
                logger.info("Fitting terminated. Time {:.4f}s"
                            .format(self._time_estimator))
        else:
            from copy import deepcopy
            time_estimator = time.perf_counter()
            self.estimator_ = deepcopy(self.estimator)
            self._time_estimator = time.perf_counter() - time_estimator

        # Get coefs
        intercept = 0
        if hasattr(self.estimator_, 'coef_'):
            coefs = self.estimator_.coef_.flatten()
            if hasattr(self.estimator_, 'intercept_'):
                intercept = self.estimator_.intercept_
        else:
            raise RuntimeError('The classifier does not expose '
                               '"coef_" attribute.')

        # Build scorecard
        time_build_scorecard = time.perf_counter()

        if self.verbose:
            logger.info("Scorecard table building started.")

        selected_variables = self.binning_process_.get_support(names=True)
        binning_tables = []
        for i, variable in enumerate(selected_variables):
            optb = self.binning_process_.get_binned_variable(variable)
            binning_table = optb.binning_table.build(
                show_digits=show_digits, add_totals=False)

            c = coefs[i]
            binning_table.loc[:, "Variable"] = variable
            binning_table.loc[:, "Coefficient"] = c
            binning_table.loc[:, "Points"] = binning_table[bt_metric] * c

            nt = len(binning_table)
            if metric_special != 'empirical':
                if isinstance(optb.special_codes, dict):
                    n_specials = len(optb.special_codes)
                else:
                    n_specials = 1

                binning_table.loc[
                nt - 1 - n_specials:nt - 2, "Points"] = metric_special * c
            elif metric_missing != 'empirical':
                binning_table.loc[nt - 1, "Points"] = metric_missing * c

            binning_table.index.names = ['Bin id']
            binning_table.reset_index(level=0, inplace=True)
            binning_tables.append(binning_table)

        df_scorecard = pd.concat(binning_tables)
        df_scorecard.reset_index()

        # Apply score points
        if self.scaling_method is not None:
            points = df_scorecard["Points"]
            scaled_points = _compute_scorecard_points(
                points, binning_tables, self.scaling_method,
                self.scaling_method_params, intercept, self.reverse_scorecard)

            df_scorecard.loc[:, "Points"] = scaled_points

        if self.intercept_based:
            scaled_points, self.intercept_ = _compute_intercept_based(
                df_scorecard)
            df_scorecard.loc[:, "Points"] = scaled_points

        time_rounding = time.perf_counter()
        if self.rounding:
            points = df_scorecard["Points"]
            if self.scaling_method in ("pdo_odds", None):
                round_points = np.rint(points)

                if self.intercept_based:
                    self.intercept_ = np.rint(self.intercept_)
            elif self.scaling_method == "min_max":
                round_mip = RoundingMIP()
                round_mip.build_model(df_scorecard)
                status, round_points = round_mip.solve()

                if status not in ("OPTIMAL", "FEASIBLE"):
                    if self.verbose:
                        logger.warning("MIP rounding failed, method nearest "
                                       "integer used instead.")
                    # Back-up method
                    round_points = np.rint(points)

                if self.intercept_based:
                    self.intercept_ = np.rint(self.intercept_)

            df_scorecard.loc[:, "Points"] = round_points
        self._time_rounding = time.perf_counter() - time_rounding

        self._df_scorecard = df_scorecard

        self._time_build_scorecard = time.perf_counter() - time_build_scorecard
        self._time_total = time.perf_counter() - time_init

        if self.verbose:
            logger.info("Scorecard table terminated. Time: {:.4f}s"
                        .format(self._time_build_scorecard))
            logger.info("Scorecard building process terminated. Time: {:.4f}s"
                        .format(self._time_total))

        # Completed successfully
        self._is_fitted = True

        return self

    def read_pipeline(self, path, from_file=True, from_db=False):  # __REDO - for different model formats
        if from_file:
            self.pipeline = joblib.load(path)
            return self.pipeline
        return None

    def _read_data_db(self, query):
        return pd.read_sql(query, self.db_connection, index_col=None)

    def read_data(self, train_query, test_query):
        self.train_set = self._read_data_db(train_query)
        self.test_set = self._read_data_db(test_query)
        return self.train_set, self.test_set

    def read_incoming_data(self, incoming_query, incoming_labels_query=None):
        self.incoming_data = self._read_data_db(incoming_query)
        if incoming_labels_query is not None:
            self.incoming_labels = self._read_data_db(incoming_labels_query)
            self.incoming_data[self.target_name] = self._read_data_db(incoming_labels_query)[self.target_name].copy()
        return self.incoming_data

    def make_predictions(self, df_set, proba=True):
        if self.pipeline is None:
            raise Exception('No pipeline is set to predict')
        if proba:
            return self.pipeline.predict_proba(df_set.drop(self.target_name, axis=1, errors='ignore'))[:, 1]
        return self.pipeline.predict(df_set.drop(self.target_name, axis=1, errors='ignore'))

    def set_score_card(self, choices, loan_type):  # __REWORK - add actual scorecard
        self.choices = choices
        self.loan_type = loan_type

    def get_credit_scores(self, df):  # __REWORK - add scoring card construction options (can be non-linear)
        if df is None:
            raise Exception('Data set is not specified')
        default_probs = self.make_predictions(df, proba=True)
        minmax_sc = MinMaxScaler(feature_range=(300, 850))
        minmax_sc.fit(default_probs.reshape(-1, 1))
        credit_scores = minmax_sc.transform(default_probs.reshape(-1, 1)).ravel().round()
        self.credit_scores = credit_scores
        return credit_scores

    def get_credit_ratings(self, credit_scores):
        if self.choices is None:
            raise Exception('Need to set score card first')
        if self.loan_type is None:
            raise Exception('Please specify loan type first')
        choices = self.choices.copy()
        if not isinstance(credit_scores, pd.Series):
            credit_scores = pd.Series(credit_scores)
        if self.loan_type == 'УЗП' or self.loan_type == 'НЗП':
            cond_list = [
                credit_scores.lt(576),
                credit_scores.between(576, 585),
                credit_scores.between(586, 594),
                credit_scores.between(595, 603),
                credit_scores.between(604, 610),
                credit_scores.between(611, 617),
                credit_scores.between(618, 625),
                credit_scores.between(626, 633),
                credit_scores.between(634, 642),
                credit_scores.gt(642)]
        #     else if loan_type == 'Пенсионная':
        #         cond_list = [
        #             df_series.lt(1),
        #             df_series.between(1, 30),
        #             df_series.between(31, 60),
        #             df_series.between(61, 90),
        #             df_series.between(91, 180),
        #             df_series.gt(180)]
        credit_ratings = np.select(cond_list, choices)

        self.credit_ratings = credit_ratings
        return credit_ratings

    def prepare_test_tests(self):
        if self.test_set is None:
            raise Exception('Test set is missing')
        else:
            test_data_tests = self.test_set.copy()
        test_data_tests['Credit Rating'] = self.credit_ratings.copy()
        test_data_tests.rename(columns={self.target_name: 'Default Flag'}, inplace=True)
        test_data_tests['Default Probability'] = self.make_predictions(self.test_set, proba=True)
        self.test_data_tests = test_data_tests.copy()
        return test_data_tests

    def prepare_incoming_tests(self):
        if self.incoming_data is None:
            raise Exception('Incoming data is missing')
        else:
            incoming_data_tests = self.incoming_data.copy()
        incoming_data_tests['Credit Rating'] = self.credit_ratings.copy()
        incoming_data_tests.rename(columns={'target': 'Default Flag'}, inplace=True)
        incoming_data_tests['Default Probability'] = self.make_predictions(self.incoming_data, proba=True)
        self.incoming_data_tests = incoming_data_tests.copy()
        return incoming_data_tests


# --------------------------------------------------------------------------CUSTOM TRANSFORMERS FOR PICKLE FILE

def col_dropper(df):
    df = df.drop(['INCOME_CALC_METHOD', 'SUM_PROCENTY_TODAY_Sum'], axis=1)
    return df


def fix_dtypes(df):
    df['USTUPKA_DEISTV'] = df['USTUPKA_DEISTV'].astype(int)
    df['USTUPKA_ZAKRYTYE'] = df['USTUPKA_ZAKRYTYE'].astype(int)
    df['GENDER'] = df['GENDER'].astype(int)
    df['CREDIT_HISTORY'] = df['CREDIT_HISTORY'].astype(int)
    df['SUBJ_STATUS'] = df['SUBJ_STATUS'].astype(int)
    df['age'] = df['age'].astype(int)
    return df


class DataFrameImputer(TransformerMixin):

    def __init__(self):
        """Impute missing values.

        Columns of dtype object are imputed with the most frequent value
        in column.

        Columns of other types are imputed with mean of column.

        """

    def fit(self, X, y=None):
        self.fill = pd.Series([X[c].value_counts().index[0]
                               if X[c].dtype == np.dtype('O') else X[c].mean() for c in X],
                              index=X.columns)

        return self

    def transform(self, X, y=None):
        return X.fillna(self.fill)
# --------------------------------------------------------------------------CUSTOM TRANSFORMERS FOR PICKLE FILE


In [None]:
# stat_tests_report.py

In [None]:
# stability.py
from dash import dcc, html
import dash_bootstrap_components as dbc
from plotting_functions import new_create_card, psi_plot_ly, psi_variable_plot


def generate_stability_page(incoming_batch_results, test_results, psi_table, psi_var_table_sum):
    page_content = html.Div([
        dbc.Row(
            [
                dbc.Col(
                    new_create_card('Log Loss', '12412', incoming_batch_results['log_loss'],
                                    test_results['log_loss'])
                ),

                dbc.Col(
                    new_create_card('Brier', '12413', incoming_batch_results['brier'],
                                    test_results['brier'])
                ),

                dbc.Col(
                    new_create_card('Log Loss', '12414', incoming_batch_results['log_loss'],
                                    test_results['log_loss'])
                ),

                dbc.Col(
                    new_create_card('AUROC', '12415', incoming_batch_results['roc_auc'],
                                    test_results['roc_auc'])
                ),

                dbc.Col(
                    new_create_card('BER', '12416', incoming_batch_results['ber'],
                                    test_results['ber'])
                )
            ]),

        # dbc.Row([html.Div(dcc.Dropdown(X_test.columns, 'age', id='dropdown'),
        #                   style={'width': '50%'}
        #                   ),
        #          html.Div(dcc.Dropdown(X_test.columns, 'age', id='dropdown_2'),
        #                   style={'width': '50%'}
        #                   )
        #          ]
        #         ),

        dbc.Row([
            dbc.Col(
                dcc.Graph(id='graph-with-slider', figure=psi_variable_plot(psi_var_table_sum))
            ),
            dbc.Col(
                dcc.Graph(id='graph-distributions',
                          figure=psi_plot_ly(psi_table)
                          )
            )
        ])
    ])
    return page_content


In [None]:
# page_content.py

from dash import Dash, dcc, html, Input, Output
import dash_bootstrap_components as dbc


def generate_search_bar():
    search_bar = dbc.Row(
        [
            dbc.Col(dbc.Input(type="search", placeholder="Search")),
            dbc.Col(
                dbc.Button(
                    "Search", color="primary", className="ms-2", n_clicks=0
                ),
                width="auto",
            ),
        ],
        className="g-0 ms-auto flex-nowrap mt-3 mt-md-0",
        align="center",
    )
    return search_bar


def generate_navbar(model_list, logo):
    navbar = dbc.Navbar(
        dbc.Container(
            [
                html.A(
                    # Use row and col to control vertical alignment of logo / brand
                    dbc.Row(
                        [
                            dbc.Col(html.Img(src=logo, height="30px")),
                            dbc.Col(dbc.NavbarBrand("Credit Risk Model Monitoring System", className="ms-2")),
                        ],
                        align="left",
                        className="g-0",
                    ),
                    href="https://halykbank.kz/",
                    style={"textDecoration": "none"},
                ),
                dbc.NavbarToggler(id="navbar-toggler", n_clicks=0),
                html.Div(dcc.Dropdown(model_list, id='dropdown-models', placeholder='Select a model'),
                         style={'width': '20%'}),
                dbc.Button("Sidebar", outline=True, color="secondary", className="mr-1", id="btn_sidebar"),
                dbc.Collapse(
                    generate_search_bar(),
                    id="navbar-collapse",
                    is_open=False,
                    navbar=True,
                ),
            ]
        ),
        color="dark",
        dark=True,
    )
    return navbar


def generate_sidebar(SIDEBAR_STYLE):
    sidebar = html.Div(
        [
            html.H2("CREMOSYS", className="display-4"),
            html.Hr(),
            html.P(
                "CREdit Risk Model MOnitoring SYStem is for a live observation of model's quality", className="lead"
            ),
            dbc.Nav(
                [
                    dbc.NavLink("Summary", href="/summary", id="summary-link"),
                    dbc.NavLink("Assess accuracy", href="/page-1", id="page-1-link"),
                    dbc.NavLink("Assess stability", href="/page-2", id="page-2-link"),
                    dbc.NavLink("Statistical tests report", href="/page-3", id="page-3-link"),
                    dbc.NavLink("Generate report", href="/report-generator", id="report-generator-link"),
                ],
                vertical=True,
                pills=True,
            ),
        ],
        id="sidebar",
        style=SIDEBAR_STYLE,
    )
    return sidebar
def generate_footer(FOOTER_STYLE):
    footer = html.Footer(
        id='footer',
        children=[
            html.H6(
                "Copyright " + u"\u00A9" + " 2023 Halyk Bank. All rights reserved. Developed by Janysbek Kusmangaliyev")
        ],
        style=FOOTER_STYLE
    )
    return footer


In [None]:
# credit_py_validation.py
from scipy.stats import binom
from scipy.stats import norm
import pandas as pd
import numpy as np
from scipy.stats import chi2
from scipy.stats import beta
from sklearn.metrics import roc_auc_score
from sklearn.metrics import auc
from sklearn import metrics
from scipy.stats import t
from scipy import stats


# def plot_ks(y, y_pred, title=None, xlabel=None, ylabel=None):
#     if y is None or y_pred is None:
#         return go.Figure().add_annotation(x=2, y=2, text="No Data to Display",
#                                           font=dict(family="sans serif", size=25, color="crimson"),
#                                           showarrow=False, yshift=10)
#     y = pd.Series(y)
#     y_pred = pd.Series(y_pred)
#     n_samples = y.shape[0]
#     # n_samples = len(y)
#     n_event = np.sum(y)
#     n_nonevent = n_samples - n_event
#
#     idx = y_pred.argsort()
#     yy = y[idx]
#     pp = y_pred[idx]
#
#     cum_event = np.cumsum(yy)
#     cum_population = np.arange(0, n_samples)
#     cum_nonevent = cum_population - cum_event
#
#     p_event = cum_event / n_event
#     p_nonevent = cum_nonevent / n_nonevent
#
#     p_diff = p_nonevent - p_event
#     print(np.max(abs(p_diff)))
#     ks_score = np.max(p_diff)
#     ks_max_idx = np.argmax(p_diff)
#     # Define the plot settings
#     print('plot ks: ', ks_score)
#     if title is None:
#         title = "Kolmogorov-Smirnov"
#     if xlabel is None:
#         xlabel = "Threshold"
#     if ylabel is None:
#         ylabel = "Cumulative probability"
#
#     # plt.title(title, fontdict={'fontsize': 14})
#     # plt.xlabel(xlabel, fontdict={'fontsize': 12})
#     # plt.ylabel(ylabel, fontdict={'fontsize': 12})
#
#     fig = go.Figure()
#     fig.add_trace(go.Scatter(x=pp, y=p_event))
#     fig.add_trace(go.Scatter(x=pp, y=p_nonevent))
#     return fig


def kolmogorov_smirnov(default_actual, default_predicted):
    """

     Parameters
     ----------
     p : estimated default probability
     d : number of defaults
     n : number of obligors

     Returns
     -------
     p_value : Binomial test p-value

     Notes
     -----
     If the defaults are modeled as iid Bernoulli trials with success probability p,
     then the number of defaults d is a draw from Binomial(n, p).
     The one-sided p-value is the probability that such a draw
     would be at least as large as d.
     """


    n_samples = default_actual.shape[0]
    n_event = np.sum(default_actual)
    n_nonevent = n_samples - n_event

    idx = default_predicted.argsort()
    yy = default_actual[idx]
    pp = default_predicted[idx]

    cum_event = np.cumsum(yy)
    cum_population = np.arange(0, n_samples)
    cum_nonevent = cum_population - cum_event

    p_event = cum_event / n_event
    p_nonevent = cum_nonevent / n_nonevent

    p_diff = p_nonevent - p_event

    ks_score = np.max(p_diff)
    print('cpv')
    print(p_diff)
    ks_max_idx = np.argmax(p_diff)
    return ks_score






def _binomial(p, d, n):
    """

    Parameters
    ----------
    p : estimated default probability
    d : number of defaults
    n : number of obligors

    Returns
    -------
    p_value : Binomial test p-value

    Notes
    -----
    If the defaults are modeled as iid Bernoulli trials with success probability p,
    then the number of defaults d is a draw from Binomial(n, p).
    The one-sided p-value is the probability that such a draw
    would be at least as large as d.
    """

    p_value = 1 - binom.cdf(d - 1, n, p)

    return p_value


def binomial_test(data, ratings, default_flag, predicted_pd, alpha_level=0.05):
    """Calculate the Binomial test for a given probability of defaults buckets

    Parameters
    ----------
    data : Pandas DataFrame with at least three columns
            ratings : PD rating class of obligor
            default_flag : 1 (or True) for defaulted and 0 (or False) for good obligors
            probs_default : predicted probability of default of an obligor

    ratings : column label for ratings
    default_flag : column label for default_flag
    probs_default : column label for probs_default
    alpha_level : false positive rate of hypothesis test (default .05)

    Returns
    -------
    Pandas DataFrame with the following columns :
        Rating (Index) : Contains the ratings of each class/group
        PD : predicted default rate in each group
        N : number of obligors in each group
        D : number of defaults in each group
        Default Rate : realised default rate per each group
        p_value : Binomial test p-value
        reject : whether to reject the null hypothesis at alpha_level


    Notes
    -----
    The Binomial test compares forecasted defaults with observed defaults in a binomial
    model with independent observations under the null hypothesis that the PD applied
    in the portfolio/rating grade at the beginning of the relevant observation period is
    greater than the true one (one-sided hypothesis test). The test statistic is the
    observed number of defaults.

    .. [1] "Studies on the Validation of Internal Rating Systems,"
            Basel Committee on Banking Supervision,
            p. 47, revised May 2005.


    Examples
    --------

    >>> import random, numpy as np
    >>> buckets = ['A', 'B', 'C']
    >>> ratings = random.choices(buckets,  [0.4, 0.5, 0.1], k=1000)
    >>> bucket_pds = {'A': .1, 'B': .15, 'C': .2}
    >>> probs_default = [bucket_pds[r] for r in ratings]
    >>> default_flag = [random.uniform(0, 1) < bucket_pds[r] for r in ratings]
    >>> test_data = pd.DataFrame({'ratings': ratings,
                                  'default_flag': default_flag,
                                  'predicted_pd' : probs_default})
    >>> from meliora import binomial_test
    >>> binomial_test(test_data, 'ratings', 'default_flag', 'probs_default')

               PD    N   D  Default Rate   p_value  reject
    ratings
    A        0.10  401  36      0.089776  0.775347   False
    B        0.15  489  73      0.149284  0.537039   False
    C        0.20  110  23      0.209091  0.443273   False

    """

    # Perform plausibility checks
    assert all(x in data.columns for x in [ratings, default_flag, predicted_pd]), "Missing columns"
    assert all(x in [0, False, 1, True] for x in data[default_flag]), "Default flag can have only value 0 and 1"
    assert len(data[ratings].unique()) < 40, "Number of PD ratings is excessive"
    assert all(0 <= x <= 1 for x in data[predicted_pd]), "Predicted PDs must be between 0% and 100%"

    # Transform input data into the required format
    df = data.groupby(ratings).agg({predicted_pd: "mean", default_flag: ["count", "sum", "mean"]}).reset_index()
    df.columns = ["Rating class", "Predicted PD", "Total count", "Defaults", "Actual Default Rate"]

    # Calculate Binomial test outcome for each rating
    df["p_value"] = _binomial(df["Predicted PD"], df["Defaults"], df["Total count"])
    df["Reject H0"] = df["p_value"] < alpha_level

    return df


def _brier(predicted_values, realised_values):
    """

    Parameters
    ----------
    predicted_values : Pandas Series of predicted PD outcomes
    realised_values : Pandas Series of realised PD outcomes

    Returns
    -------
    mse : Brier score for the dataset

    Notes
    -----
    Calculates the mean squared error (MSE) between the outcomes
    and their hypothesized PDs. In this context, the MSE is
    also called the "Brier score" of the dataset.
    """

    # Calculate mean squared error
    errors = realised_values - predicted_values
    mse = (errors**2).sum()

    return mse


def brier_score(data, ratings, default_flag, predicted_pd):
    """Calculate the Brier score for a given probability of defaults buckets

    Parameters
    ----------
    data : Pandas DataFrame with at least three columns
            ratings : PD rating class of obligor
            default_flag : 1 (or True) for defaulted and 0 (or False) for good obligors
            probs_default : predicted probability of default of an obligor

    ratings : column label for ratings
    default_flag : column label for default_flag
    probs_default : column label for probs_default

    Returns
    -------
    Pandas DataFrame with the following columns :
        Rating (Index) : Contains the ratings of each class/group
        PD : predicted default rate in each group and total
        N : number of obligors in each group and total
        D : number of defaults in each group and total
        Default Rate : realised default rate per each group and total
        brier_score : overall Brier score


    Notes
    -----
    The Brier score is the mean squared error when each default outcome
    is predicted by its PD rating. Larger values of the Brier score
    indicate a poorer performance of the rating system.

    .. [1] "Studies on the Validation of Internal Rating Systems,"
            Basel Committee on Banking Supervision,
            pp. 46-47, revised May 2005.


    Examples
    --------

    >>> import random, numpy as np
    >>> buckets = ['A', 'B', 'C']
    >>> ratings = random.choices(buckets,  [0.4, 0.5, 0.1], k=1000)
    >>> bucket_pds = {'A': .1, 'B': .15, 'C': .2}
    >>> probs_default = [bucket_pds[r] for r in ratings]
    >>> default_flag = [random.uniform(0, 1) < bucket_pds[r] for r in ratings]
    >>> test_data = pd.DataFrame({'ratings': ratings,
                                  'default_flag': default_flag,
                                  'predicted_pd' : probs_default})
    >>> from meliora import brier_score
    >>> brier_score(test_data, 'ratings', 'default_flag', 'probs_default')

                  PD       N      D  Default Rate brier_score
    ratings
    A        0.10000   401.0   36.0      0.089776        None
    B        0.15000   489.0   73.0      0.149284        None
    C        0.20000   110.0   23.0      0.209091        None
    total    0.13545  1000.0  132.0      0.132000    0.113112

    """

    # Perform plausibility checks
    assert all(x in data.columns for x in [ratings, default_flag, predicted_pd]), "Not all columns are present"
    assert all(x in [0, False, 1, True] for x in data[default_flag]), "Default flag can have only value 0 and 1"
    assert len(data[ratings].unique()) < 40, "Number of PD ratings is excessive"
    assert all(x >= 0 and x <= 1 for x in data[predicted_pd]), "Predicted PDs must be between 0% and 100%"

    # Transform input data into the required format
    df = data.groupby(ratings).agg({predicted_pd: "mean", default_flag: ["count", "sum", "mean"]})
    df.columns = ["PD", "N", "D", "Default Rate"]

    # Calculate Brier score for the dataset
    b_score = _brier(df["PD"], df["Default Rate"])

    return b_score


def _herfindahl(df):
    """

    Parameters
    ----------
    df : Pandas DataFrame with first column providing the number
         of obligors and row index corresponding to rating labels

    Returns
    -------
    cv : coefficient of variation
    h : Herfindahl index

    Notes
    -----
    Calculates the coefficient of variation and the Herfindahl index,
    as defined in the paper [1] referenced in herfindahl_test's docstring.
    These quantities measure the dispersion of rating grades in the data.
    """

    k = df.shape[0]
    counts = df.iloc[:, 0]
    n_tot = counts.sum()
    terms = (counts / n_tot - 1 / k) ** 2
    cv = (k * terms.sum()) ** 0.5
    h = (counts**2).sum() / n_tot**2

    return cv, h


def herfindahl_multiple_period_test(data1, data2, ratings, alpha_level=0.05):
    """Calculate the Herfindahl test for a given probability of defaults buckets

    Parameters
    ----------
    data1 : Pandas DataFrame with at least one column
            ratings : PD rating class of obligor
    data2 : Pandas DataFrame with at least one column
            ratings : PD rating class of obligor

    ratings : column label for ratings
    alpha_level : false positive rate of hypothesis test (default .05)

    Returns
    -------
    Pandas DataFrame with the following columns :
        Rating (Index) : Contains the ratings of each class/group
        N_initial : number of obligors in each group and total
        h_initial : Herfindahl index for initial dataset
        N_current : number of obligors in each group and total
        h_current : Herfindahl index for current dataset
        p_value : overall Herfindahl test p-value
        reject : whether to reject the null hypothesis at alpha_level


    Notes
    -----
    The Herfindahl test looks for an increase in the
    dispersion of the rating grades over time.
    The (one-sided) null hypothesis is that the current Herfindahl
    index is no greater than the initial Herfindahl index.
    The test statistic is a suitably standardized difference
    in the coefficient of variation, which is monotonically
    related to the Herfindahl index.
    If the Herfindahl index has not changed, then the
    test statistic has the standard Normal distribution.
    Large values of this test statistic
    provide evidence against the null hypothesis.
    (Note that the reference [1] has an uncommon defintion
    of Herfindahl index, whereas we return the common definition)

    .. [1] "Instructions for reporting the validation results
            of internal models - IRB Pillar I models for credit risks," ECB,
            pp. 26-27, 2019.


    Examples
    --------

    >>> import random, numpy as np
    >>> buckets = ['A', 'B', 'C']
    >>> ratings1 = random.choices(buckets,  [0.4, 0.5, 0.1], k=1000)
    >>> test_data1 = pd.DataFrame({'ratings': ratings1})
    >>> ratings2 = random.choices(buckets,  [0.4, 0.5, 0.1], k=1000)
    >>> test_data2 = pd.DataFrame({'ratings': ratings2})
    >>> from meliora import herfindahl_test
    >>> herfindahl_test(test_data1, test_data2, "ratings")

           N_initial h_initial  N_current h_current   p_value reject
    B            489      None        487      None      None   None
    A            401      None        414      None      None   None
    C            110      None         99      None      None   None
    total       1000   0.19291       1000  0.206819  0.475327  False

    """

    # Perform plausibility checks
    assert ratings in data1.columns and ratings in data2.columns, f"Ratings column {ratings} not found"
    assert max(len(data1[ratings].unique()), len(data2[ratings].unique())) < 40, "Number of PD ratings is excessive"

    # Transform input data into the required format
    df1 = pd.DataFrame({"N_initial": data1[ratings].value_counts()})
    df2 = pd.DataFrame({"N_current": data2[ratings].value_counts()})

    # Calculate the Herfindahl index for each dataset
    c1, h1 = _herfindahl(df1)
    c2, h2 = _herfindahl(df2)

    # Add a row of totals along with Herfindahl indices
    df1.loc["total"] = [df1["N_initial"].sum()]
    df1["h_initial"] = None
    df1.loc["total", "h_initial"] = h1
    df2.loc["total"] = [df2["N_current"].sum()]
    df2["h_current"] = None
    df2.loc["total", "h_current"] = h2

    # Put the results together into a single dataframe
    df = df1.join(df2)

    # Calculate Herfindahl test's p-value for the dataset
    k = df.shape[0] - 1
    z_stat = (k - 1) ** 0.5 * (c2 - c1) / (c2**2 * (0.5 + c2**2)) ** 0.5
    p_value = 1 - norm.cdf(z_stat)

    # Put the p-value and test result into the output
    df["p_value"] = None
    df.loc["total", "p_value"] = p_value
    if alpha_level:
        df["reject"] = None
        df.loc["total", "reject"] = p_value < alpha_level

    return df


def herfindahl_test(data1, ratings, alpha_level=0.05):
    """Calculate the Herfindahl test for a given probability of defaults buckets

    Parameters
    ----------
    data1 : Pandas DataFrame with at least one column
            ratings : PD rating class of obligor
    data2 : Pandas DataFrame with at least one column
            ratings : PD rating class of obligor

    ratings : column label for ratings
    alpha_level : false positive rate of hypothesis test (default .05)

    Returns
    -------
    Pandas DataFrame with the following columns :
        Rating (Index) : Contains the ratings of each class/group
        N_initial : number of obligors in each group and total
        h_initial : Herfindahl index for initial dataset
        N_current : number of obligors in each group and total
        h_current : Herfindahl index for current dataset
        p_value : overall Herfindahl test p-value
        reject : whether to reject the null hypothesis at alpha_level


    Notes
    -----
    The Herfindahl test looks for an increase in the
    dispersion of the rating grades over time.
    The (one-sided) null hypothesis is that the current Herfindahl
    index is no greater than the initial Herfindahl index.
    The test statistic is a suitably standardized difference
    in the coefficient of variation, which is monotonically
    related to the Herfindahl index.
    If the Herfindahl index has not changed, then the
    test statistic has the standard Normal distribution.
    Large values of this test statistic
    provide evidence against the null hypothesis.
    (Note that the reference [1] has an uncommon defintion
    of Herfindahl index, whereas we return the common definition)

    .. [1] "Instructions for reporting the validation results
            of internal models - IRB Pillar I models for credit risks," ECB,
            pp. 26-27, 2019.


    Examples
    --------

    >>> import random, numpy as np
    >>> buckets = ['A', 'B', 'C']
    >>> ratings1 = random.choices(buckets,  [0.4, 0.5, 0.1], k=1000)
    >>> test_data1 = pd.DataFrame({'ratings': ratings1})
    >>> ratings2 = random.choices(buckets,  [0.4, 0.5, 0.1], k=1000)
    >>> test_data2 = pd.DataFrame({'ratings': ratings2})
    >>> from meliora import herfindahl_test
    >>> herfindahl_test(test_data1, test_data2, "ratings")

           N_initial h_initial  N_current h_current   p_value reject
    B            489      None        487      None      None   None
    A            401      None        414      None      None   None
    C            110      None         99      None      None   None
    total       1000   0.19291       1000  0.206819  0.475327  False

    """

    # Perform plausibility checks
    assert ratings in data1.columns, f"Ratings column {ratings} not found"
    assert len(data1[ratings].unique()) < 40, "Number of PD ratings is excessive"

    # Transform input data into the required format
    df1 = pd.DataFrame({"N_initial": data1[ratings].value_counts()})

    # Calculate the Herfindahl index for each dataset
    c1, h1 = _herfindahl(df1)

    return {'c1': c1, 'h1':h1}


def _hosmer(p, d, n):
    """

    Parameters
    ----------
    p : Pandas Series of estimated default probabilities
    d : Pandas Series of number of defaults
    n : Pandas Series of number of obligors

    Returns
    -------
    p_value : Hosmer-Lemeshow Chi-squared test p-value

    Notes
    -----
    Calculates the Hosmer-Lemeshow test statistic, as defined in
    the paper [1] referenced in hosmer_test's docstring.
    If the hypothesized PDs are accurate and defaults are independent,
    this test statisitc is approximately Chi-squared distributed
    with degrees of freedom equal to the number of rating groups minus two.
    The p-value is the probability of such a draw being
    at least as large as the observed value of the statistic.
    """

    assert len(p) > 2, "Hosmer-Lemeshow test requires at least three groups"

    # expected_def = n * p
    # expected_nodef = n * (1 - p)
    # if any(expected_def < 10) or any(expected_nodef < 10):
    #     print("Warning: a group has fewer than 10 expected defaults or non-defaults.")
    #     print("--> Chi-squared approximation is questionable.")

    # terms = (expected_def - d) ** 2 / (p * expected_nodef)
    # chisq_stat = terms.sum()
    # p_value = 1 - chi2.cdf(chisq_stat, len(p) - 2)

    kr = sum((d - p * n) ** 2 / (n * p * (1 - p)))  # todo: treatment of missing values
    p_value = 1 - chi2.cdf(kr, len(p))  # todo: p.val <- pchisq(q = hl, df = k, lower.tail = FALSE)

    return p_value


def hosmer_test(data, ratings, default_flag, predicted_pd, alpha_level=0.05):
    """Calculate the Hosmer-Lemeshow Chi-squared test for a given probability of defaults buckets

    Parameters
    ----------
    data : Pandas DataFrame with at least three columns
            ratings : PD rating class of obligor
            default_flag : 1 (or True) for defaulted and 0 (or False) for good obligors
            probs_default : predicted probability of default of an obligor

    ratings : column label for ratings
    default_flag : column label for default_flag
    probs_default : column label for probs_default
    alpha_level : false positive rate of hypothesis test (default .05)

    Returns
    -------
    Pandas DataFrame with the following columns :
        Rating (Index) : Contains the ratings of each class/group
        PD : predicted default rate in each group and total
        N : number of obligors in each group and total
        D : number of defaults in each group and total
        Default Rate : realised default rate per each group and total
        p_value : overall Hosmer-Lemeshow test p-value
        reject : whether to reject the null hypothesis at alpha_level


    Notes
    -----
    The Hosmer-Lemeshow Chi-squared test calculates a standardized sum
    of squared differences between the number of defaults and
    the expected number of defaults within each rating group.
    Under the null hypothesis that the PDs applied
    in the portfolio/rating grade at the beginning of the relevant observation period are
    equal to the true ones, the test statistic has an approximate Chi-squared distribution.
    Large values of this test statistic
    provide evidence against the null hypothesis.

    .. [1] "Backtesting Framework for PD, EAD and LGD - Public Version,"
            Bauke Maarse, Rabobank International,
            p. 43, 2012.


    Examples
    --------

    >>> import random, numpy as np
    >>> buckets = ['A', 'B', 'C']
    >>> ratings = random.choices(buckets,  [0.4, 0.5, 0.1], k=1000)
    >>> bucket_pds = {'A': .1, 'B': .15, 'C': .2}
    >>> probs_default = [bucket_pds[r] for r in ratings]
    >>> default_flag = [random.uniform(0, 1) < bucket_pds[r] for r in ratings]
    >>> test_data = pd.DataFrame({'ratings': ratings,
                                  'default_flag': default_flag,
                                  'predicted_pd' : probs_default})
    >>> from meliora import hosmer_test
    >>> hosmer_test(test_data, 'ratings', 'default_flag', 'probs_default')

                  PD       N      D  Default Rate   p_value reject
    ratings
    A        0.10000   401.0   36.0      0.089776      None   None
    B        0.15000   489.0   73.0      0.149284      None   None
    C        0.20000   110.0   23.0      0.209091      None   None
    total    0.13545  1000.0  132.0      0.132000  0.468902  False

    """

    # Perform plausibility checks
    assert all(x in data.columns for x in [ratings, default_flag, predicted_pd]), "Not all columns are present"
    assert all(x in [0, False, 1, True] for x in data[default_flag]), "Default flag can have only value 0 and 1"
    assert len(data[ratings].unique()) < 40, "Number of PD ratings is excessive"
    assert all(x >= 0 and x <= 1 for x in data[predicted_pd]), "Predicted PDs must be between 0% and 100%"

    # Transform input data into the required format
    df = data.groupby(ratings).agg({predicted_pd: "mean", default_flag: ["count", "sum", "mean"]})
    df.columns = ["PD", "N", "D", "Default Rate"]

    # Calculate Hosmer-Lemeshow test's p-value for the dataset
    p_value = _hosmer(df["PD"], df["D"], df["N"])

    return {'p_value': p_value, 'p_val < a': bool(p_value < alpha_level)}


def _spiegelhalter(realised_values, predicted_values, alpha_level=0.05):
    """
    todo: https://github.com/andrija-djurovic/PDtoolkit/blob/main/R/12_PREDICTIVE_POWER.R

    Parameters
    ----------
    ratings : Pandas Series of rating categories
    default_flag : Pandas Series of default outcomes (0/1 or False/True)
    df : Pandas DataFrame with ratings as rownames and a column of hypothesized 'PD' values

    Returns
    -------
    p_value : Spiegelhalter test p-value

    Notes
    -----
    Calculates the mean squared error (MSE) between the outcomes
    and their hypothesized PDs, which is approximately Normal.
    If the hypothesized PDs equal the true PDs, then the mean
    and standard deviation of that statistic are provided in
    the paper [1] referenced in spiegelhalter_test's docstring.
    The standardized statistic is approximately standard Normal.
    and leads to a "one-sided" p-value via the Normal cdf.
    """

    # Calculate mean squared error
    errors = realised_values - predicted_values
    mse = (errors**2).sum() / len(errors)

    # # Calculate null expectation and variance of MSE
    expectations = sum(predicted_values * (1 - predicted_values)) / len(realised_values)
    variances = (
        sum(predicted_values * (1 - 2 * predicted_values) ** 2 * (1 - predicted_values)) / len(realised_values) ** 2
    )

    # Calculate standardized statistic
    z_score = (mse - expectations) / np.sqrt(variances)  # todo: check formula

    # Calculate standardized MSE as test statistic, then its p-value
    outcome = z_score > norm.ppf(1 - alpha_level / 2)

    return {'z_score': z_score, 'outcome':bool(outcome)}


def spiegelhalter_test(data, ratings, default_flag, predicted_pd, alpha_level=0.05):
    """Calculate the Spiegelhalter test for a given probability of defaults buckets

    Parameters
    ----------
    data : Pandas DataFrame with at least three columns
            ratings : PD rating class of obligor
            default_flag : 1 (or True) for defaulted and 0 (or False) for good obligors
            probs_default : predicted probability of default of an obligor

    ratings : column label for ratings
    default_flag : column label for default_flag
    probs_default : column label for probs_default
    alpha_level : false positive rate of hypothesis test (default .05)

    Returns
    -------
    Pandas DataFrame with the following columns :
        Rating (Index) : Contains the ratings of each class/group
        PD : predicted default rate in each group and total
        N : number of obligors in each group and total
        D : number of defaults in each group and total
        Default Rate : realised default rate per each group and total
        p_value : overall Spiegelhalter test p-value
        reject : whether to reject the null hypothesis at alpha_level


    Notes
    -----
    The Spiegelhalter test compares forecasted defaults with observed defaults by analyzing
    the prediction errors. Under the null hypothesis that the PDs applied
    in the portfolio/rating grade at the beginning of the relevant observation period are
    equal to the true ones, the mean squared error can be standardized into
    an approximately standard Normal test statistic. Large values of this test statistic
    provide evidence against the null hypothesis.

    .. [1] "Backtesting Framework for PD, EAD and LGD - Public Version,"
            Bauke Maarse, Rabobank International,
            pp. 43-44, 2012.


    Examples
    --------

    >>> import random, numpy as np
    >>> buckets = ['A', 'B', 'C']
    >>> ratings = random.choices(buckets,  [0.4, 0.5, 0.1], k=1000)
    >>> bucket_pds = {'A': .1, 'B': .15, 'C': .2}
    >>> probs_default = [bucket_pds[r] for r in ratings]
    >>> default_flag = [random.uniform(0, 1) < bucket_pds[r] for r in ratings]
    >>> test_data = pd.DataFrame({'ratings': ratings,
                                  'default_flag': default_flag,
                                  'predicted_pd' : probs_default})
    >>> from meliora import spiegelhalter_test
    >>> spiegelhalter_test(test_data, 'ratings', 'default_flag', 'probs_default')

                  PD       N      D  Default Rate   p_value reject
    ratings
    A        0.10000   401.0   36.0      0.089776      None   None
    B        0.15000   489.0   73.0      0.149284      None   None
    C        0.20000   110.0   23.0      0.209091      None   None
    total    0.13545  1000.0  132.0      0.132000  0.647161  False

    """

    # Perform plausibility checks
    assert all(x in data.columns for x in [ratings, default_flag, predicted_pd]), "Not all columns are present"
    assert all(x in [0, False, 1, True] for x in data[default_flag]), "Default flag can have only value 0 and 1"
    assert len(data[ratings].unique()) < 40, "Number of PD ratings is excessive"
    assert all(x >= 0 and x <= 1 for x in data[predicted_pd]), "Predicted PDs must be between 0% and 100%"

    # Transform input data into the required format
    df = data.groupby(ratings).agg({predicted_pd: "mean", default_flag: ["count", "sum", "mean"]})
    df.columns = ["PD", "N", "D", "Default Rate"]

    # Calculate Spiegelhalter test's p-value for the dataset
    result = _spiegelhalter(df["PD"], df["Default Rate"])

    return result


def _jeffreys(p, d, n):
    """

    Parameters
    ----------
    p : estimated default probability
    d : number of defaults
    n : number of obligors

    Returns
    -------
    p_value : Jeffrey's "p-value" (The posterior probability of the null hypothesis)

    Notes
    -----
    Given the Jeffreys prior for the binomial proportion, the
    posterior distribution is a beta distribution with shape parameters a = D + 1/2 and
    b = N − D + 1/2. Here, N is the number of customers in the portfolio/rating grade and
    D is the number of those customers that have defaulted within that observation
    period. The p-value (i.e. the cumulative distribution function of the aforementioned
    beta distribution evaluated at the PD of the portfolio/rating grade) serves as a
    measure of the adequacy of estimated PD.
    """

    a = d + 0.5
    b = n - d + 0.5
    p_value = beta.cdf(p, a, b)

    return p_value


def jeffreys_test(data, ratings, default_flag, predicted_pd, alpha_level=0.05):
    """Calculate the Jeffrey's test for a given probability of defaults buckets

    Parameters
    ----------
    data : Pandas DataFrame with at least three columns
            ratings : PD rating class of obligor
            default_flag : 1 (or True) for defaulted and 0 (or False) for good obligors
            probs_default : predicted probability of default of an obligor

    ratings : column label for ratings
    default_flag : column label for default_flag
    probs_default : column label for probs_default
    alpha_level : false positive rate of hypothesis test (default .05)

    Returns
    -------
    Pandas DataFrame with the following columns :
        Rating (Index) : Contains the ratings of each class/group
        PD : predicted default rate in each group
        N : number of obligors in each group
        D : number of defaults in each group
        Default Rate : realised default rate per each group
        p_value : Jeffreys p-value
        reject : whether to reject the null hypothesis at alpha_level


    Notes
    -----
    The Jeffreys test compares forecasted defaults with observed defaults in a binomial
    model with independent observations under the null hypothesis that the PD applied
    in the portfolio/rating grade at the beginning of the relevant observation period is
    greater than the true one (one-sided hypothesis test). The test updates a Beta distribution
    (with Jeffrey's prior) in light of the number of defaults and non-defaults,
    then reports the posterior probability of the null hypothesis.

    .. [1] "Instructions for reporting the validation results
            of internal models - IRB Pillar I models for credit risks," ECB,
            pp. 20-21, 2019.


    Examples
    --------

    >>> import random, numpy as np
    >>> buckets = ['A', 'B', 'C']
    >>> ratings = random.choices(buckets,  [0.4, 0.5, 0.1], k=1000)
    >>> bucket_pds = {'A': .1, 'B': .15, 'C': .2}
    >>> probs_default = [bucket_pds[r] for r in ratings]
    >>> default_flag = [random.uniform(0, 1) < bucket_pds[r] for r in ratings]
    >>> test_data = pd.DataFrame({'ratings': ratings,
                                  'default_flag': default_flag,
                                  'predicted_pd' : probs_default})
    >>> from meliora import jeffreys_test
    >>> jeffreys_test(test_data, 'ratings', 'default_flag', 'probs_default')

               PD    N   D  Default Rate   p_value  reject
    ratings
    A        0.10  401  36      0.089776  0.748739   False
    B        0.15  489  73      0.149284  0.511781   False
    C        0.20  110  23      0.209091  0.397158   False

    """

    # Perform plausibility checks
    assert all(x in data.columns for x in [ratings, default_flag, predicted_pd]), "Not all columns are present"
    assert all(x in [0, False, 1, True] for x in data[default_flag]), "Default flag can have only value 0 and 1"
    assert len(data[ratings].unique()) < 40, "Number of PD ratings is excessive"
    assert all(x >= 0 and x <= 1 for x in data[predicted_pd]), "Predicted PDs must be between 0% and 100%"

    # Transform input data into the required format
    df = data.groupby(ratings).agg({predicted_pd: "mean", default_flag: ["count", "sum", "mean"]}).reset_index()
    df.columns = ["Rating class", "Predicted PD", "Total count", "Defaults", "Actual Default Rate"]

    # Calculate Binomial test outcome for each rating
    df["p_value"] = _jeffreys(df["Predicted PD"], df["Defaults"], df["Total count"])
    df["Reject H0"] = df["p_value"] < alpha_level

    return df


def roc_auc(df, target, prediction):
    """Compute Area ROC AUC from prediction scores.

    Note: this implementation can be used with binary, multiclass and
    multilabel classification, but some restrictions apply (see Parameters).
    Read more in the :ref:`User Guide <roc_metrics>`.
    Parameters
    ----------
    y_true : array-like of shape (n_samples,) or (n_samples, n_classes)
        True labels or binary label indicators. The binary and multiclass cases
        expect labels with shape (n_samples,) while the multilabel case expects
        binary label indicators with shape (n_samples, n_classes).
    y_score : array-like of shape (n_samples,) or (n_samples, n_classes)
        Target scores.

    Returns
    -------
    auc : float
        Area Under the Curve score.

    See Also
    --------
    https://github.com/scikit-learn/scikit-learn/blob/36958fb24/sklearn/metrics/_ranking.py#L47
    """

    # Perform plausibility checks
    assert all(x >= 0 and x <= 1 for x in df[target]), "Predicted PDs must be between 0% and 100%"
    assert all(x >= 0 and x <= 1 for x in df[prediction]), "Predicted PDs must be between 0% and 100%"

    return roc_auc_score(df[target], df[prediction])


def clar(df, predicted_ratings, realised_outcomes):
    """
    CLAR serves as a measure of ranking ability against LGD risk
    The cumulative LGD accuracy ratio (CLAR) curve can be treated as
    the equivalent of the Cumulative Accuracy Profile (CAP) curve. This
    test compares the cumulative percentage of correctly assigned realized
    LGD and the cumulative rate of observations in the predicted LGD bands.
    Parameters
    ----------
    predicted_ratings: pandas Series
        predicted LGD, can be ordinal or continuous
    realised_outcomes: pandas Series
        realised LGD, can be ordinal or continuous
    Returns
    -------
    clar: scalar
        Cumulative LGD Accuracy Ratio
    References
    --------------
    [1] Ozdemir, B., Miu, P., 2009. Basel II Implementation.
    A Guide to Developing and Validating a Compliant Internal Risk Rating
    System. McGraw-Hill, USA.
    [2] See also: https://rdrr.io/cran/VUROCS/man/clar.html
    Examples
    --------
        >>> res = clar(predicted_ratings, realised_outcomes)
        >>> print(res)
    """

    # Calculate CLAR
    x_s = [0]
    x_values = [0]
    y_values = [0]

    for i, j in enumerate(list(set(df[predicted_ratings]))[::-1]):
        x = (df[predicted_ratings] == j).sum()
        x_bucket = df.sort_values(by=realised_outcomes, ascending=False)[x_s[i]: x_s[i] + x]
        x_value = x / len(df)
        y_value = (x_bucket[realised_outcomes] == j).sum() / len((x_bucket[realised_outcomes] == j))
        x_values.append(x_value)
        y_values.append(y_value)
        x_s.append(x + 1)

    new_xvalues = list(np.cumsum(x_values))
    new_yvalues = list(np.cumsum(y_values))

    model_auc = auc(new_xvalues, new_yvalues)
    clar = 2 * model_auc

    return clar


def loss_capture_ratio(ead, predicted_ratings, realised_outcomes):
    """
    The loss_capture_ratio measures how well a model is able to
    rank LGDs when compared to the observed losses.
    For this approach three plots are relevant: the model loss
    capture curve, ideal loss capture curve and the random loss
    capture curve. These curves are constructed in the same way
    as the curves for the CAP. The main difference is the data,
    which is for LGDs and the LR a (continuous) percentage of the EAD,
    while for the CAP it is binary.
    The LC can be percentage weighted, which simply uses the LGD and
    LR percentages as input, while it can also be EAD weighted, which
    uses the LGD and LR multiplied with the respective EAD as input.
    The results between the two approaches can differ  if the portfolio
    is not-well balanced.
    Parameters
    ----------
    ead: pandas Series
        Exposure at Default
    predicted_ratings: pandas Series
        predicted LGD, can be ordinal or continuous
    realised_outcomes: pandas Series
        realised LGD, can be ordinal or continuous
    Returns
    -------
    LCR: scalar
        Loss Capture Ratio
    References
    ----------------
    Li, D., Bhariok, R., Keenan, S., & Santilli, S. (2009). Validation techniques
    and performance metrics for loss given default models.
    The Journal of Risk Model Validation, 33, 3-26.
    Examples
    --------
        >>> res = loss_capture_ratio(ead, predicted_ratings, realised_outcomes)
        >>> print(res)
    """

    # Create a dataframe
    frame = {"ead": ead, "predicted_ratings": predicted_ratings, "realised_outcomes": realised_outcomes}
    df = pd.DataFrame(frame)

    # Prepare data
    df["loss"] = df["ead"] * df["realised_outcomes"]

    # Model loss capture curve
    df2 = df.sort_values(by="predicted_ratings", ascending=False)
    df2["cumulative_loss"] = df2.cumsum()["loss"]
    df2["cumulative_loss_capture_percentage"] = df2.cumsum()["loss"] / df2.loss.sum()
    auc_curve1 = auc([i for i in range(len(df2))], df2.cumulative_loss_capture_percentage)
    random_auc1 = 0.5 * len(df2) * 1

    # Ideal loss capture curve
    df3 = df.sort_values(by="realised_outcomes", ascending=False)
    df3["cumulative_loss"] = df3.cumsum()["loss"]
    df3["cumulative_loss_capture_percentage"] = df3.cumsum()["loss"] / df3.loss.sum()
    auc_curve2 = auc([i for i in range(len(df3))], df3.cumulative_loss_capture_percentage)
    random_auc2 = 0.5 * len(df3) * 1

    loss_capture_ratio = (auc_curve1 - random_auc1) / (auc_curve2 - random_auc2)

    return loss_capture_ratio


def bayesian_error_rate(default_flag, prob_default):
    """
    BER is the proportion of the whole sample that is misclassified
    when the rating system is in optimal use. For a perfect rating model,
    the BER has a value of zero. A model's BER depends on the probability
    of default. The lower the BER, and the lower the classification error,
    the better the model.
    The Bayesian error rate specifies the minimum probability of error if
    the rating system or score function under consideration is used for a
    yes/no decision whether a borrower will default or not. The error can
    be estimated parametrically, e.g. assuming normal score distributions,
    or non-parametrically, for instance with kernel density estimation methods.
    If parametric estimation is applied, the distributional assumptions have
    to be carefully checked. Non-parametric estimation will be critical if
    sample sizes are small. In its general form, the error rate depends on
    the total portfolio probability of default. As a consequence, in many
    cases its magnitude is influenced much more by the probability of
    erroneously identifying a non-defaulter as a defaulter than by the
    probability of not detecting a defaulter.
    In practice, therefore, the error rate is often applied
    with a fictitious 50% probability of default. In this case, the error
    rate is equivalent to the Kolmogorov-Smirnov statistic and to the Pietra index.
    Parameters
    ----------
    default_flag : pandas series
        Boolean flag indicating whether the borrower has actually defaulted
    prob_default : pandas series
        Predicted default probability, as returned by a classifier.
    Returns
    ---------
    score : float
        Bayesian Error Rate.

    Examples
    --------
    >>> from scipy import stats
    >>> default_flag = [1, 0, 0, 1, 1]
    >>> prob_default = [0.01, 0.04, 0.07, 0.11, 0]
    >>> bayesian_error_rate(default_flag, prob_default)
    -0.47140452079103173
    """

    frame = {"default_flag": default_flag, "prob_default": prob_default}

    df = pd.DataFrame(frame)

    fpr, tpr, thresholds = metrics.roc_curve(df["default_flag"], df["prob_default"])
    roc_curve_df = pd.DataFrame({"c": thresholds, "hit_rate": tpr, "false_alarm_rate": fpr})

    p_d = df.default_flag.sum() / len(df)

    roc_curve_df["ber"] = p_d * (1 - roc_curve_df.hit_rate) + (1 - p_d) * roc_curve_df.false_alarm_rate

    return round(min(roc_curve_df["ber"]), 3)


def calc_iv(df, feature, target, pr=0):
    """
    A numerical value that quantifies the predictive power of an independent
    variable in capturing the binary dependent variable.
    Weight of evidence (WOE) is a measure of how much the evidence supports or
    undermines a hypothesis. WOE measures the relative risk of an attribute of
    binning level. The value depends on whether the value of the target variable
    is a nonevent or an event.
    The information value (IV) is a weighted sum of the WOE of the
    characteristic's attributes. The weight is the difference between the
    conditional probability of an attribute for an event and the conditional
    probability of that attribute for a nonevent.
    An information value can be any real number. Generally speaking, the higher
    the information value, the more predictive an attribute is likely to be.
    Parameters
    ----------
    df : Pandas dataframe
        Contains information on the the feature and target variable
    feature : string
        independent variable
    feature : string
        dependent variable
    Returns
    -------
    iv : float
       Information Value.
    References
    --------------
    -  https://www.lexjansen.com/mwsug/2013/AA/MWSUG-2013-AA14.pdf.
    -  https://documentation.sas.com/doc/en/vdmmlcdc/8.1/casstat/viyastat_binning_details02.htm.
    Examples
    --------
    >>> iv = calc_iv(df, feature, target, pr=0)
    >>> iv
    -0.47140452079103173

    """

    lst = []

    for i in range(df[feature].nunique()):
        val = list(df[feature].unique())[i]
        lst.append(
            [
                feature,
                val,
                df[df[feature] == val].count()[feature],
                df[(df[feature] == val) & (df[target] == 1)].count()[feature],
            ]
        )

    data = pd.DataFrame(lst, columns=["Variable", "Value", "All", "Bad"])
    data = data[data["Bad"] > 0]

    data["Share"] = data["All"] / data["All"].sum()
    data["Bad Rate"] = data["Bad"] / data["All"]
    data["Distribution Good"] = (data["All"] - data["Bad"]) / (data["All"].sum() - data["Bad"].sum())
    data["Distribution Bad"] = data["Bad"] / data["Bad"].sum()
    data["WoE"] = np.log(data["Distribution Good"] / data["Distribution Bad"])
    data["IV"] = data["WoE"] * (data["Distribution Good"] - data["Distribution Bad"])

    data = data.sort_values(by=["Variable", "Value"], ascending=True)
    iv = data["IV"].sum()

    return data, iv


def lgd_t_test(df, observed_LGD_col, expected_LGD_col, verbose=False):
    """t-test for the Null hypothesis that estimated LGD is greater than true LGD
    Parameters
    ----------
    df: array-like, at least 2D
        data
    observed_LGD_col: string
        name of column with observed LGD values
    expected_LGD_col: string
        name of column with expected LGD values
    verbose: boolean
        if true, results and interpretation are printed
    Returns
    -------
    N: integer
        Number of customers
    LGD.mean: float
        Mean value of observed LGD values
    pred_LGD.mean: float
        Mean value of predicted LGD values
    t_stat: float
        test statistic
    lgd_s2: float
        denominator of test statistic
    p_value: float
        p-value of the test
    Notes
    -----------
    Observations are assumed to be independent.
    This fundtion can be used for both performing and non-performing LGDs.
    Examples
    --------
    .. code-block:: python
        >>> res = lgd_t_test(df=df, observed_LGD_col='LGD', expected_LGD_col='PRED_LGD', verbose=True)
        >>> print(res)
    """
    # Checking for any missing data
    if df.empty:
        raise TypeError("No data provided!")
    if observed_LGD_col is None:
        raise TypeError("No column name for observed LGDs provided")
    if expected_LGD_col is None:
        raise TypeError("No column name for expected LGDs provided")

    # Checking that the correct datatype
    if not isinstance(observed_LGD_col, str):
        raise TypeError("observed_LGD_col not of type string")
    if not isinstance(expected_LGD_col, str):
        raise TypeError("expected_LGD_col not of type string")

    # Check if the correct column names have been provided
    if observed_LGD_col not in df.columns:
        raise ValueError("{} not a column in the df".format(observed_LGD_col))
    if expected_LGD_col not in df.columns:
        raise ValueError("{} not a column in the df".format(expected_LGD_col))

    # Check the data for missing values
    if df[observed_LGD_col].hasnans:
        raise ValueError("Missing values in {}".format(observed_LGD_col))
    if df[expected_LGD_col].hasnans:
        raise ValueError("Missing values in {}".format(expected_LGD_col))

    N = len(df)
    LGD = df[observed_LGD_col]
    pred_LGD = df[expected_LGD_col]
    error = LGD - pred_LGD
    mean_error = error.mean()
    num = np.sqrt(N) * mean_error
    lgd_s2 = ((error - mean_error) ** 2).sum() / (N - 1)
    t_stat = num / np.sqrt(lgd_s2)
    p_value = 1 - t.cdf(t_stat, df=N - 1)

    if verbose is True:
        # print the results
        print(
            "t_stat=%.3f, LGD.mean=%.3f,pred_LGD.mean=%.3f,N=%d, s2=%.3f, p=%.3f"
            % (t_stat, pred_LGD.mean(), LGD.mean(), N, lgd_s2, p_value)
        )
        if p_value <= 0.05:
            print("P-value <= 5%, therefore, H0 is rejected.")
        elif p_value > 0.05:
            print("P-value > 5%, therefore, H0 fails to be rejected.")

    return N, LGD.mean(), pred_LGD.mean(), t_stat, lgd_s2, p_value


def migration_matrix_stability(df, initial_ratings_col, final_ratings_col):
    """z-tests to verify stability of transition matrices
    Parameters
    ----------
    df: array-like, at least 2D
        data
    initial_ratings_col: string
        name of column with initial ratings values
    final_ratings_col: string
        name of column with final ratings values
    Returns
    -------
    z_df: array-like
        z statistic for each ratings pair
    phi_df: array-like
        p-values for each ratings pair
    Notes
    -----------
    The Null hypothesis is that p_ij >= p_ij-1 or p_ij-1 >= p_ij
    depending on whether the (ij) entry is below or above main diagonal
    Examples
    --------
    .. code-block:: python
        >>> res = migration_matrix_stability(df=df, initial_ratings_col='ratings', final_ratings_col='ratings2')
        >>> print(res)
    """
    a = df[initial_ratings_col]
    b = df[final_ratings_col]
    N_ij = pd.crosstab(a, b)
    p_ij = pd.crosstab(a, b, normalize="index")
    K = len(set(a))
    z_df = p_ij.copy()
    for i in range(1, K + 1):
        for j in range(1, K + 1):
            if i == j:

                z_ij = np.nan

            if i > j:
                Ni = N_ij.sum(axis=1).values[i - 1]

                num = p_ij.iloc[i - 1, j - 1 + 1] - p_ij.iloc[i - 1, j - 1]
                den_a = p_ij.iloc[i - 1, j - 1] * (1 - p_ij.iloc[i - 1, j - 1]) / Ni
                den_b = p_ij.iloc[i - 1, j - 1 + 1] * (1 - p_ij.iloc[i - 1, j - 1 + 1]) / Ni
                den_c = 2 * p_ij.iloc[i - 1, j - 1] * p_ij.iloc[i - 1, j - 1 + 1] / Ni

                z_ij = num / np.sqrt(den_a + den_b + den_c)

            elif i < j:
                Ni = N_ij.sum(axis=1).values[i - 1]

                num = p_ij.iloc[i - 1, j - 1 - 1] - p_ij.iloc[i - 1, j - 1]
                den_a = p_ij.iloc[i - 1, j - 1] * (1 - p_ij.iloc[i - 1, j - 1]) / Ni
                den_b = p_ij.iloc[i - 1, j - 1 - 1] * (1 - p_ij.iloc[i - 1, j - 1 - 1]) / Ni
                den_c = 2 * p_ij.iloc[i - 1, j - 1] * p_ij.iloc[i - 1, j - 1 - 1] / Ni

                z_ij = num / np.sqrt(den_a + den_b + den_c)

            else:

                z_ij = np.nan

            z_df.iloc[i - 1, j - 1] = z_ij
    phi_df = z_df.apply(lambda x: x.apply(lambda y: norm.cdf(y)))
    return z_df, phi_df


def psi(actual, actual_bins, expected, expected_bins, crosstab=False):  # todo: expected vs actual
    """Calculate the PSI for a single variable
    Args:
        expected_array: numpy array of original values
        actual_array: numpy array of new values, same size as expected
        buckets: number of percentile ranges to bucket the values into
    Returns:
        psi_value: calculated PSI value

    """

    if crosstab:
        actual = pd.crosstab(actual, actual_bins).sum()
        expected = pd.crosstab(expected, expected_bins).sum()
    actual = actual/actual.sum()
    expected = expected/expected.sum()
    psi = (actual - expected) * np.log(actual/expected)
    psi = np.sum(psi)
    # df.columns = ["actual", "expected"]

    # df["expected"] = np.where(df["expected"] == 0, 0.0001, df["expected"])

    # # Calculating PSI
    # df["PSI"] = (df["actual"] - df["expected"]) * np.log(df["actual"] / df["expected"])

    # psi = np.sum(df["PSI"])

    return psi


def kendall_tau(x, y, variant="b"):
    """
    Calculate Kendall's tau, a correlation measure for ordinal data.
    This is a wrapper around SciPy kendalltau function.
    Kendall's tau is a measure of the correspondence between two rankings.
    Values close to 1 indicate strong agreement, and values close to -1
    indicate strong disagreement. This implements two variants of Kendall's
    tau: tau-b (the default) and tau-c (also known as Stuart's tau-c). These
    differ only in how they are normalized to lie within the range -1 to 1;
    the hypothesis tests (their p-values) are identical. Kendall's original
    tau-a is not implemented separately because both tau-b and tau-c reduce
    to tau-a in the absence of ties.
    Parameters
    ----------
    x, y : array_like
        Arrays of rankings, of the same shape. If arrays are not 1-D, they
        will be flattened to 1-D.
    variant: {'b', 'c'}, optional
        Defines which variant of Kendall's tau is returned. Default is 'b'.
    Returns
    -------
    correlation : float
       The tau statistic.
    pvalue : float
       The p-value for a hypothesis test whose null hypothesis is
       an absence of association, tau = 0.
    References
    --------------
    [1] Maurice G. Kendall, "A New Measure of Rank Correlation", Biometrika
           Vol. 30, No. 1/2, pp. 81-93, 1938.
    [2] Maurice G. Kendall, "The treatment of ties in ranking problems",
           Biometrika Vol. 33, No. 3, pp. 239-251. 1945.
    [3] Gottfried E. Noether, "Elements of Nonparametric Statistics",
        John Wiley & Sons, 1967.
    [4] Peter M. Fenwick, "A new data structure for cumulative frequency tables",
        Software: Practice and Experience, Vol. 24, No. 3, pp. 327-336, 1994.
    [5] Maurice G. Kendall, "Rank Correlation Methods" (4th Edition),
           Charles Griffin & Co., 1970.
    Scipy: https://github.com/scipy/scipy/blob/v1.8.1/scipy/stats/_stats_py.py#L4666-L4875
    Examples
    --------
    >>> from scipy import stats
    >>> x1 = [12, 2, 1, 12, 2]
    >>> x2 = [1, 4, 7, 1, 0]
    >>> tau, p_value = kendall_tau(x1, x2)
    >>> tau
    -0.47140452079103173
    >>> p_value
    0.2827454599327748
    """

    tau, pvalue = stats.kendalltau(x, y, initial_lexsort=None, variant="b")

    return tau, pvalue


def somersd(array_1, array_2, alternative="two-sided"):
    """
    Calculates Somers' D, an asymmetric measure of ordinal association.
    This is a wrapper around scipy.stats.somersd function.
    Somers' :math:`D` is a measure of the correspondence between two rankings.
    It considers the difference between the number of concordant
    and discordant pairs in two rankings and is  normalized such that values
    close  to 1 indicate strong agreement and values close to -1 indicate
    strong disagreement.
    Parameters
    ----------
    x: array_like
        1D array of rankings, treated as the (row) independent variable.
        Alternatively, a 2D contingency table.
    y: array_like, optional
        If `x` is a 1D array of rankings, `y` is a 1D array of rankings of the
        same length, treated as the (column) dependent variable.
        If `x` is 2D, `y` is ignored.
    alternative : {'two-sided', 'less', 'greater'}, optional
        Defines the alternative hypothesis. Default is 'two-sided'.
        The following options are available:
        * 'two-sided': the rank correlation is nonzero
        * 'less': the rank correlation is negative (less than zero)
        * 'greater':  the rank correlation is positive (greater than zero)
    Returns
    -------
    res : SomersDResult
        A `SomersDResult` object with the following fields:
            correlation : float
               The Somers' :math:`D` statistic.
            pvalue : float
               The p-value for a hypothesis test whose null
               hypothesis is an absence of association, :math:`D=0`.
               See notes for more information.
            table : 2D array
               The contingency table formed from rankings `x` and `y` (or the
               provided contingency table, if `x` is a 2D array)
    References
    ----------
    [1] Robert H. Somers, "A New Asymmetric Measure of Association for
           Ordinal Variables", *American Sociological Review*, Vol. 27, No. 6,
           pp. 799--811, 1962.
    [2] Morton B. Brown and Jacqueline K. Benedetti, "Sampling Behavior of
           Tests for Correlation in Two-Way Contingency Tables", *Journal of
           the American Statistical Association* Vol. 72, No. 358, pp.
           309--315, 1977.
    [3] SAS Institute, Inc., "The FREQ Procedure (Book Excerpt)",
           *SAS/STAT 9.2 User's Guide, Second Edition*, SAS Publishing, 2009.
    [4] Laerd Statistics, "Somers' d using SPSS Statistics", *SPSS
           Statistics Tutorials and Statistical Guides*,
           https://statistics.laerd.com/spss-tutorials/somers-d-using-spss-statistics.php,
           Accessed July 31, 2020.
    Examples
    --------
    >>> table = [[27, 25, 14, 7, 0], [7, 14, 18, 35, 12], [1, 3, 2, 7, 17]]
    >>> res = somersd(table)
    >>> res.statistic
    0.6032766111513396
    >>> res.pvalue
    1.0007091191074533e-27

    """

    return stats.somersd(array_1, array_2, alternative="two-sided")


def spearman_corr(array_1, array_2, alternative="two-sided"):
    """
    Calculate a Spearman correlation coefficient with associated p-value.
    This is a wrapper around scipy.stats.spearmanr function.
    The Spearman rank-order correlation coefficient is a nonparametric
    measure of the monotonicity of the relationship between two datasets.
    Unlike the Pearson correlation, the Spearman correlation does not
    assume that both datasets are normally distributed. Like other
    correlation coefficients, this one varies between -1 and +1 with 0
    implying no correlation. Correlations of -1 or +1 imply an exact
    monotonic relationship. Positive correlations imply that as x
    increases, so does y. Negative correlations imply that as x increases,
    y decreases.
    The p-value roughly indicates the probability of an uncorrelated
    system producing datasets that have a Spearman correlation at least
    as extreme as the one computed from these datasets. The p-values
    are not entirely reliable but are probably reasonable for datasets
    larger than 500 or so.

    Parameters
    ----------
    array_1 : pandas series
        Series containing multiple observations
    array_2 : pandas series
        Series containing multiple observations
    alternative : {'two-sided', 'less', 'greater'}, optional
        Defines the alternative hypothesis. Default is 'two-sided'.
        The following options are available:
        * 'two-sided': the correlation is nonzero
        * 'less': the correlation is negative (less than zero)
        * 'greater':  the correlation is positive (greater than zero)

    Returns
    -------
    correlation : float or ndarray (2-D square)
        Spearman correlation matrix or correlation coefficient (if only 2
        variables are given as parameters. Correlation matrix is square with
        length equal to total number of variables (columns or rows) in ``a``
        and ``b`` combined.
    pvalue : float
        The p-value for a hypothesis test whose null hypotheisis
        is that two sets of data are uncorrelated. See `alternative` above
        for alternative hypotheses. `pvalue` has the same
        shape as `correlation`.

    References
    -------------
        [1] Zwillinger, D. and Kokoska, S. (2000). CRC Standard
        Probability and Statistics Tables and Formulae. Chapman & Hall: New
        York. 2000.
        Section  14.7

    Examples
    --------
    >>> spearmanr([1,2,3,4,5], [5,6,7,8,7])
    SpearmanrResult(correlation=0.82078..., pvalue=0.08858...)
    """

    return stats.spearmanr(array_1, array_2, alternative="two-sided")


In [None]:
# report_generator.py

from docxtpl import DocxTemplate, InlineImage


def get_context():
    return {
        'invoice_no': 12345,
        'date': '30 Mar',
        'due_date': '30 Apr',
        'name': 'Jane Doe',
        'address': '123 Quiet Lane',
        'subtotal': 335,
        'tax_amt': 10,
        'total': 345,
        'amt_paid': 100,
        'amt_due': 245,
        'row_contents': [
            {
                'description': 'Eggs',
                'quantity': 30,
                'rate': 5,
                'amount': 150
            }, {
                'description': 'All Purpose Flour',
                'quantity': 10,
                'rate': 15,
                'amount': 150
            }, {
                'description': 'Eggs',
                'quantity': 5,
                'rate': 7,
                'amount': 35
            }
        ]
    }


doc = DocxTemplate('monitoring_report_template.docx')
context = get_context()

doc.render(context)
doc.save("generated_doc.docx")


In [None]:
# monitoring.py

"""
Scorecard monitoring (System stability report)
"""

import numbers
import time

import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import pandas as pd

from scipy import stats
from sklearn.base import BaseEstimator
from sklearn.exceptions import NotFittedError
from sklearn.utils.multiclass import type_of_target

from ..binning.binning_statistics import bin_str_format
from ..binning.metrics import jeffrey
from ..binning.prebinning import PreBinning
from ..formatting import dataframe_to_string
from ..logging import Logger
from ..metrics.classification import gini
from ..metrics.classification import imbalanced_classification_metrics
from ..metrics.regression import regression_metrics
from .monitoring_information import print_monitoring_information
from .scorecard import Scorecard


logger = Logger(__name__).logger


PSI_VERDICT_MSG = {0: "No significant change",
                   1: "Requires investigation",
                   2: "Significance change"}


def _check_parameters(scorecard, psi_method, psi_n_bins,
                      psi_min_bin_size, show_digits, verbose):

    if not isinstance(scorecard, Scorecard):
        raise TypeError("scorecard must be a Scorecard instance.")

    if psi_method not in ("uniform", "quantile", "cart"):
        raise ValueError('Invalid value for prebinning_method. Allowed '
                         'string values are "cart", "quantile" and '
                         '"uniform".')

    if psi_n_bins is not None:
        if (not isinstance(psi_n_bins, numbers.Integral) or
                psi_n_bins <= 0):
            raise ValueError("psi_n_bins must be a positive integer; got {}."
                             .format(psi_n_bins))

    if psi_min_bin_size is not None:
        if (not isinstance(psi_min_bin_size, numbers.Number) or
                not 0. < psi_min_bin_size <= 0.5):
            raise ValueError("psi_min_bin_size must be in (0, 0.5]; got {}."
                             .format(psi_min_bin_size))

    if (not isinstance(show_digits, numbers.Integral) or
            not 0 <= show_digits <= 8):
        raise ValueError("show_digits must be an integer in [0, 8]; "
                         "got {}.".format(show_digits))

    if not isinstance(verbose, bool):
        raise TypeError("verbose must be a boolean; got {}.".format(verbose))


def print_psi_report(df_psi):
    t_psi = df_psi.PSI.values[-1]
    psi = df_psi.PSI.values[:-1]

    splits = [0.1, 0.25]
    n_bins = len(splits) + 1
    indices = np.digitize(psi, splits, right=True)

    psi_bins = np.empty(n_bins).astype(np.int64)
    for i in range(n_bins):
        mask = (indices == i)
        psi_bins[i] = len(psi[mask])

    p_psi_bins = psi_bins / psi_bins.sum()
    psi_verdict = PSI_VERDICT_MSG[np.digitize(t_psi, splits)]

    df_psi_string = dataframe_to_string(pd.DataFrame({
        "PSI bin": ["[0.00, 0.10)", "[0.10, 0.25)", "[0.25, Inf+)"],
        "Count": psi_bins,
        "Count (%)": p_psi_bins
        }), tab=4)

    psi_stats = (
        "  Population Stability Index (PSI)\n\n"
        ""
        "\n"
        "    PSI total:     {:>7.4f} ({})\n\n"
        "{}\n").format(t_psi, psi_verdict, df_psi_string)

    print(psi_stats)


def print_tests_report(df_tests):
    pvalues = df_tests["p-value"].values

    splits = [0.05, 0.1, 0.5]
    n_bins = len(splits) + 1
    indices = np.digitize(pvalues, splits, right=True)

    pvalue_bins = np.empty(n_bins).astype(np.int64)
    for i in range(n_bins):
        mask = (indices == i)
        pvalue_bins[i] = len(pvalues[mask])

    p_pvalue_bins = pvalue_bins / pvalue_bins.sum()

    df_tests_string = dataframe_to_string(pd.DataFrame({
        "p-value bin":  ["[0.00, 0.05)", "[0.05, 0.10)", "[0.10, 0.50)",
                         "[0.50, 1.00)"],
        "Count": pvalue_bins,
        "Count (%)": p_pvalue_bins,
        }), tab=4)

    tests_stats = (
        "  Significance tests (H0: actual == expected)\n\n"
        "{}\n").format(df_tests_string)

    print(tests_stats)


def print_target_report(df_target):
    df_target_string = dataframe_to_string(df_target, tab=4)

    target_stats = (
        "  Target analysis\n\n"
        "{}\n").format(df_target_string)

    print(target_stats)


def print_performance_report(df_performance):
    df_performance_string = dataframe_to_string(df_performance, tab=4)

    performance_stats = (
        "  Performance metrics\n\n"
        "{}\n".format(df_performance_string)
        )

    print(performance_stats)


def print_system_report(df_psi, df_tests, df_target_analysis, df_performance):

    print("-----------------------------------\n"
          "Monitoring: System Stability Report\n"
          "-----------------------------------\n")

    print_psi_report(df_psi)
    print_tests_report(df_tests)
    print_target_report(df_target_analysis)
    print_performance_report(df_performance)


class ScorecardMonitoring(BaseEstimator):
    """Scorecard monitoring.
    Parameters
    ----------
    scorecard : object
        A ``Scorecard`` fitted instance.
    psi_method : str, optional (default="cart")
        The binning method to compute the Population Stability Index (PSI).
        Supported methods are "cart" for a CART
        decision tree, "quantile" to generate prebins with approximately same
        frequency and "uniform" to generate prebins with equal width. Method
        "cart" uses `sklearn.tree.DecistionTreeClassifier
        <https://scikit-learn.org/stable/modules/generated/sklearn.tree.
        DecisionTreeClassifier.html>`_.
    psi_n_bins : int (default=20)
        The maximum number of bins to compute PSI.
    psi_min_bin_size : float (default=0.05)
        The fraction of mininum number of records for PSI bin.
    show_digits : int, optional (default=2)
        The number of significant digits of the bin column.
    verbose : bool (default=False)
        Enable verbose output.
    """
    def __init__(self, scorecard, psi_method="cart", psi_n_bins=20,
                 psi_min_bin_size=0.05, show_digits=2, verbose=False):

        self.scorecard = scorecard

        self.psi_method = psi_method
        self.psi_n_bins = psi_n_bins
        self.psi_min_bin_size = psi_min_bin_size

        self.show_digits = show_digits
        self.verbose = verbose

        # auxiliary data
        self._splits = None
        self._df_psi = None
        self._df_tests = None
        self._target_dtype = None
        self._n_records_a = None
        self._n_records_e = None
        self._metric_a = None
        self._metric_e = None

        # time
        self._time_total = None
        self._time_system = None
        self._time_variables = None

        # flags
        self._is_fitted = False

    def fit(self, X_actual, y_actual, X_expected, y_expected):
        """Fit monitoring with actual and expected data.
        Parameters
        ----------
        X_actual : pandas.DataFrame
            New/actual/test data input samples.
        y_actual : array-like of shape (n_samples,)
            Target vector relative to X actual.
        X_expected : pandas.DataFrame
            Trainning data used for fitting the scorecard.
        y_expected : array-like of shape (n_samples,)
            Target vector relative to X expected.
        Returns
        -------
        self : ScorecardMonitoring
            Fitted monitoring.
        """
        time_init = time.perf_counter()

        if self.verbose:
            logger.info("Monitoring started.")
            logger.info("Options: check parameters.")

        # Check parameters
        _check_parameters(**self.get_params(deep=False))

        # Check if scorecard is fitted
        self.scorecard._check_is_fitted()

        target_dtype = type_of_target(y_actual)
        target_dtype_e = type_of_target(y_expected)

        if target_dtype not in ("binary", "continuous"):
            raise ValueError("Target type (actual) {} is not supported."
                             .format(target_dtype))

        if target_dtype_e not in ("binary", "continuous"):
            raise ValueError("Target type (expected) {} is not supported."
                             .format(target_dtype_e))

        if target_dtype != target_dtype_e:
            raise ValueError("Target types must coincide; {} != {}."
                             .format(target_dtype, target_dtype_e))

        self._target_dtype = target_dtype

        # Check variable names
        if list(X_actual.columns) != list(X_expected.columns):
            raise ValueError("Dataframes X_actual and X_expected must "
                             "have the same columns.")

        # Statistics at system level
        if self.verbose:
            logger.info("System stability analysis started.")

        time_system = time.perf_counter()
        self._fit_system(X_actual, y_actual, X_expected, y_expected)
        self._time_system = time.perf_counter() - time_system

        if self.verbose:
            logger.info("System stability analysis terminated. Time: {:.4f}s"
                        .format(self._time_system))

        # Statistics at variable level
        if self.verbose:
            logger.info("Variable analysis started.")

        time_variable = time.perf_counter()
        self._fit_variables(X_actual, X_expected)
        self._time_variable = time.perf_counter() - time_variable

        if self.verbose:
            logger.info("Variable analysis terminated. Time: {:.4f}s"
                        .format(self._time_variable))

        self._time_total = time.perf_counter() - time_init

        if self.verbose:
            logger.info("Monitoring terminated. Time: {:.4f}s"
                        .format(self._time_total))

        # Completed successfully
        self._is_fitted = True

        return self

    def information(self, print_level=1):
        """Print overview information about the options settings and
        statistics.
        Parameters
        ----------
        print_level : int (default=1)
            Level of details.
        """
        self._check_is_fitted()

        if not isinstance(print_level, numbers.Integral) or print_level < 0:
            raise ValueError("print_level must be an integer >= 0; got {}."
                             .format(print_level))

        n_vars = np.count_nonzero(self.scorecard.binning_process_._support)
        dict_user_options = self.get_params(deep=False)

        print_monitoring_information(print_level, self._n_records_a.sum(),
                                     self._n_records_e.sum(), n_vars,
                                     self._target_dtype, self._time_total,
                                     self._time_system,
                                     self._time_variable,
                                     dict_user_options)

    def system_stability_report(self):
        """Print overview information and statistics about system stability.
        It includes qualitative suggestions regarding the necessity of
        scorecard updates.
        """
        self._check_is_fitted()

        print_system_report(self._df_psi, self._df_tests,
                            self._df_target_analysis, self._df_performance)

    def psi_table(self):
        """System Population Stability Index (PSI) table.
        Returns
        -------
        psi_table : pandas.DataFrame
        """
        self._check_is_fitted()

        return self._df_psi

    def psi_variable_table(self, name=None, style="summary"):
        """Population Stability Index (PSI) at variable level.
        Parameters
        ----------
        name : str or None (default=None)
            The variable name. If name is None, a table with all variables
            is returned.
        style : str, optional (default="summary")
            Supported styles are "summary" and "detailed". Summary only
            includes the total PSI for each variable. Detailed includes the
            PSI for each variable at bin level.
        Returns
        -------
        psi_table : pandas.DataFrame
        """
        self._check_is_fitted()

        if style not in ("summary", "detailed"):
            raise ValueError('Invalid value for style. Allowed string '
                             'values are "summary" and "detailed".')

        if name is not None:
            variables = self.scorecard.binning_process_.get_support(names=True)

            if name not in variables:
                raise ValueError("name {} does not match a binned variable "
                                 "included in the provided scorecard."
                                 .format(name))

            dv = self._df_psi_variable[self._df_psi_variable.Variable == name]

            if style == "summary":
                return dv.groupby(["Variable"])["PSI"].sum()
            else:
                return dv

        if style == "summary":
            return pd.DataFrame(
                self._df_psi_variable.groupby(['Variable'])['PSI'].sum()
                ).reset_index()
        elif style == "detailed":
            return self._df_psi_variable

    def tests_table(self):
        """Compute statistical tests to determine if event rate (Chi-square
        test - binary target) or mean (Student's t-test - continuous target)
        are significantly different. Null hypothesis (actual == expected).
        Returns
        -------
        tests_table : pandas.DataFrame
        """
        self._check_is_fitted()

        return self._df_tests

    def psi_plot(self, savefig=None):
        """Plot Population Stability Index (PSI).
        Parameters
        ----------
        savefig : str or None (default=None)
            Path to save the plot figure.
        """
        self._check_is_fitted()

        fig, ax1 = plt.subplots()

        n_bins = len(self._n_records_a)
        indices = np.arange(n_bins)
        width = np.min(np.diff(indices))/3

        p_records_a = self._n_records_a / self._n_records_a.sum() * 100.0
        p_records_e = self._n_records_e / self._n_records_e.sum() * 100.0

        p1 = ax1.bar(indices-width, p_records_a, width, color='tab:red',
                     label="Records Actual", alpha=0.75)
        p2 = ax1.bar(indices, p_records_e, width, color='tab:blue',
                     label="Records Expected", alpha=0.75)

        handles = [p1[0], p2[0]]
        labels = ['Actual', 'Expected']

        ax1.set_xlabel("Bin ID", fontsize=12)
        ax1.set_ylabel("Population distribution", fontsize=13)
        ax1.yaxis.set_major_formatter(mtick.PercentFormatter())

        ax2 = ax1.twinx()

        if self._target_dtype == "binary":
            metric_label = "Event rate"
        elif self._target_dtype == "continuous":
            metric_label = "Mean"

        ax2.plot(indices, self._metric_a, linestyle="solid", marker="o",
                 color='tab:red')
        ax2.plot(indices, self._metric_e,  linestyle="solid", marker="o",
                 color='tab:blue')

        ax2.set_ylabel(metric_label, fontsize=13)
        ax2.xaxis.set_major_locator(mtick.MultipleLocator(1))

        ax2.set_xlim(-width * 2, n_bins - width * 2)

        plt.legend(handles, labels, loc="upper center",
                   bbox_to_anchor=(0.5, -0.2), ncol=2, fontsize=12)

        if savefig is None:
            plt.show()
        else:
            if not isinstance(savefig, str):
                raise TypeError("savefig must be a string path; got {}."
                                .format(savefig))
            plt.savefig(savefig)
            plt.close()

    def _fit_system(self, X_actual, y_actual, X_expected, y_expected):
        if self._target_dtype == "binary":
            problem_type = "classification"
        else:
            problem_type = "regression"

        score_actual = self.scorecard.score(X_actual)
        score_expected = self.scorecard.score(X_expected)

        prebinning = PreBinning(problem_type=problem_type,
                                method=self.psi_method,
                                n_bins=self.psi_n_bins,
                                min_bin_size=self.psi_min_bin_size
                                ).fit(score_expected, y_expected)

        splits = prebinning.splits
        n_bins = len(splits) + 1

        # Compute basic metrics
        indices_a = np.digitize(score_actual, splits, right=True)
        indices_e = np.digitize(score_expected, splits, right=True)

        if self._target_dtype == "binary":
            n_nonevent_a = np.empty(n_bins).astype(np.int64)
            n_event_a = np.empty(n_bins).astype(np.int64)
            n_nonevent_e = np.empty(n_bins).astype(np.int64)
            n_event_e = np.empty(n_bins).astype(np.int64)

            y0_a = (y_actual == 0)
            y1_a = ~ y0_a

            y0_e = (y_expected == 0)
            y1_e = ~ y0_e

            for i in range(n_bins):
                mask_a = (indices_a == i)
                n_nonevent_a[i] = np.count_nonzero(y0_a & mask_a)
                n_event_a[i] = np.count_nonzero(y1_a & mask_a)

                mask_e = (indices_e == i)
                n_nonevent_e[i] = np.count_nonzero(y0_e & mask_e)
                n_event_e[i] = np.count_nonzero(y1_e & mask_e)

            n_records_a = n_nonevent_a + n_event_a
            n_records_e = n_nonevent_e + n_event_e
        else:
            n_records_a = np.empty(n_bins).astype(np.int64)
            n_records_e = np.empty(n_bins).astype(np.int64)
            mean_a = np.empty(n_bins)
            mean_e = np.empty(n_bins)
            std_a = np.empty(n_bins)
            std_e = np.empty(n_bins)

            for i in range(n_bins):
                mask_a = (indices_a == i)
                n_records_a[i] = np.count_nonzero(mask_a)
                mean_a[i] = y_actual[mask_a].mean()
                std_a[i] = y_actual[mask_a].std()

                mask_e = (indices_e == i)
                n_records_e[i] = np.count_nonzero(mask_e)
                mean_e[i] = y_expected[mask_e].mean()
                std_e[i] = y_expected[mask_e].std()

        bins = np.concatenate([[-np.inf], splits, [np.inf]])
        bin_str = bin_str_format(bins, self.show_digits)

        # Target analysis
        if self._target_dtype == "binary":
            self._system_target_binary(n_records_a, n_event_a, n_nonevent_a,
                                       n_records_e, n_event_e, n_nonevent_e)
        else:
            self._system_target_continuous(y_actual, y_expected)

        # Population Stability Information (PSI)
        self._system_psi(bin_str, n_records_a, n_records_e)

        # Significance tests
        if self._target_dtype == "binary":
            self._system_tests_binary(
                bin_str, n_records_a, n_event_a, n_nonevent_a,
                n_records_e, n_event_e, n_nonevent_e)
        else:
            self._system_tests_continuous(
                bin_str, n_records_a, mean_a, std_a,
                n_records_e, mean_e, std_e)

        # Performance analysis
        if self._target_dtype == "binary":
            self._system_performance_binary(
                X_actual, y_actual, X_expected, y_expected)
        else:
            self._system_performance_continuous(
                X_actual, y_actual, X_expected, y_expected)

        self._splits = splits
        self._n_records_a = n_records_a
        self._n_records_e = n_records_e

    def _system_psi(self, bin_str, n_records_a, n_records_e):
        t_n_records_a = n_records_a.sum()
        t_n_records_e = n_records_e.sum()
        p_records_a = n_records_a / t_n_records_a
        p_records_e = n_records_e / t_n_records_e

        psi = jeffrey(p_records_a, p_records_e, return_sum=False)

        df_psi = pd.DataFrame({
            "Bin": bin_str,
            "Count A": n_records_a,
            "Count E": n_records_e,
            "Count A (%)": p_records_a,
            "Count E (%)": p_records_e,
            "PSI": psi
            })

        totals = ["", t_n_records_a, t_n_records_e, 1, 1, psi.sum()]
        df_psi.loc["Totals"] = totals

        self._df_psi = df_psi

    def _system_tests_binary(self, bin_str, n_records_a, n_event_a,
                             n_nonevent_a, n_records_e, n_event_e,
                             n_nonevent_e):
        t_statistics = []
        p_values = []

        n_bins = len(bin_str)
        event_rate_a = n_event_a / n_records_a
        event_rate_e = n_event_e / n_records_e

        self._metric_a = event_rate_a
        self._metric_e = event_rate_e

        for i in range(n_bins):
            obs = np.array([
                [n_nonevent_a[i], n_nonevent_e[i]],
                [n_event_a[i], n_event_e[i]]])

            t, p, _, _ = stats.chi2_contingency(obs, correction=False)

            t_statistics.append(t)
            p_values.append(p)

        df_tests = pd.DataFrame({
            "Bin": bin_str,
            "Count A": n_records_a,
            "Count E": n_records_e,
            "Event rate A": event_rate_a,
            "Event rate E": event_rate_e,
            "statistic": t_statistics,
            "p-value": p_values
            })

        self._df_tests = df_tests

    def _system_tests_continuous(self, bin_str, n_records_a, mean_a, std_a,
                                 n_records_e, mean_e, std_e):

        self._metric_a = mean_a
        self._metric_e = mean_e

        t_statistics = []
        p_values = []

        n_bins = len(bin_str)
        for i in range(n_bins):
            t, p = stats.ttest_ind_from_stats(
                mean_a[i], std_a[i], n_records_a[i],
                mean_e[i], std_e[i], n_records_e[i], False)

            t_statistics.append(t)
            p_values.append(p)

        df_tests = pd.DataFrame({
            "Bin": bin_str,
            "Count A": n_records_a,
            "Count E": n_records_e,
            "Mean A": mean_a,
            "Mean E": mean_e,
            "Std A": std_a,
            "Std E": std_e,
            "statistic": t_statistics,
            "p-value": p_values
            })

        self._df_tests = df_tests

    def _system_target_binary(self, n_records_a, n_event_a, n_nonevent_a,
                              n_records_e, n_event_e, n_nonevent_e):

        t_n_records_a = n_records_a.sum()
        t_n_event_a = n_event_a.sum()
        t_n_nonevent_a = n_nonevent_a.sum()

        t_n_records_e = n_records_e.sum()
        t_n_event_e = n_event_e.sum()
        t_n_nonevent_e = n_nonevent_e.sum()

        event_rate_a = t_n_event_a / t_n_records_a
        event_rate_e = t_n_event_e / t_n_records_e

        df_target = pd.DataFrame({
            "Metric": ["Number of records", "Event records",
                       "Non-event records"],
            "Actual": [t_n_records_a, t_n_event_a, t_n_nonevent_a],
            "Actual (%)": ["-", event_rate_a, 1 - event_rate_a],
            "Expected": [t_n_records_e, t_n_event_e, t_n_nonevent_e],
            "Expected (%)": ["-", event_rate_e, 1 - event_rate_e]
            })

        self._df_target_analysis = df_target

    def _system_target_continuous(self, y_actual, y_expected):

        mean_a = y_actual.mean()
        mean_e = y_expected.mean()
        std_a = y_actual.std()
        std_e = y_expected.std()

        p25_a, median_a, p75_a = np.percentile(y_actual, [25, 50, 75])
        p25_e, median_e, p75_e = np.percentile(y_expected, [25, 50, 75])

        df_target = pd.DataFrame({
            "Metric": ["Mean", "Std", "p25", "Median", "p75"],
            "Actual": [mean_a, std_a, p25_a, median_a, p75_a],
            "Expected": [mean_e, std_e, p25_e, median_e, p75_e]
            })

        self._df_target_analysis = df_target

    def _system_performance_binary(self, X_actual, y_actual, X_expected,
                                   y_expected):
        # Metrics derived from confusion matrix
        y_true_a = y_actual
        y_pred_a = self.scorecard.predict(X_actual)
        d_metrics_a = imbalanced_classification_metrics(y_true_a, y_pred_a)

        y_true_e = y_expected
        y_pred_e = self.scorecard.predict(X_expected)
        d_metrics_e = imbalanced_classification_metrics(y_true_e, y_pred_e)

        metric_names = list(d_metrics_a.keys())
        metrics_a = list(d_metrics_a.values())
        metrics_e = list(d_metrics_e.values())

        # Gini
        y_pred_proba_a = self.scorecard.predict_proba(X_actual)[:, 1]
        gini_a = gini(y_true_a, y_pred_proba_a)

        y_pred_proba_e = self.scorecard.predict_proba(X_expected)[:, 1]
        gini_e = gini(y_true_e, y_pred_proba_e)

        metric_names.append("Gini")
        metrics_a.append(gini_a)
        metrics_e.append(gini_e)

        diff = np.array(metrics_a) - np.array(metrics_e)

        df_performance = pd.DataFrame({
            "Metric": metric_names,
            "Actual": metrics_a,
            "Expected": metrics_e,
            "Diff A - E": diff,
            })

        self._df_performance = df_performance

    def _system_performance_continuous(self, X_actual, y_actual, X_expected,
                                       y_expected):
        y_true_a = y_actual
        y_pred_a = self.scorecard.predict(X_actual)
        d_metrics_a = regression_metrics(y_true_a, y_pred_a)

        y_true_e = y_expected
        y_pred_e = self.scorecard.predict(X_expected)
        d_metrics_e = regression_metrics(y_true_e, y_pred_e)

        metric_names = list(d_metrics_a.keys())
        metrics_a = list(d_metrics_a.values())
        metrics_e = list(d_metrics_e.values())

        diff = np.array(metrics_a) - np.array(metrics_e)

        df_performance = pd.DataFrame({
            "Metric": metric_names,
            "Actual": metrics_a,
            "Expected": metrics_e,
            "Diff A - E": diff,
            })

        self._df_performance = df_performance

    def _fit_variables(self, X_actual, X_expected):
        variables = self.scorecard.binning_process_.get_support(names=True)
        sc_table = self.scorecard.table()

        l_df_psi = []

        for name in variables:
            optb = self.scorecard.binning_process_.get_binned_variable(name)
            ta = optb.transform(X_actual[name], metric="indices")
            te = optb.transform(X_expected[name], metric="indices")

            n_bins = te.max() + 1

            n_records_a = np.empty(n_bins).astype(np.int64)
            n_records_e = np.empty(n_bins).astype(np.int64)

            for i in range(n_bins):
                n_records_a[i] = np.count_nonzero(ta == i)
                n_records_e[i] = np.count_nonzero(te == i)

            t_n_records_a = n_records_a.sum()
            t_n_records_e = n_records_e.sum()
            p_records_a = n_records_a / t_n_records_a
            p_records_e = n_records_e / t_n_records_e

            psi = jeffrey(p_records_a, p_records_e, return_sum=False)
            bins = sc_table[sc_table.Variable == name]["Bin"].values[:n_bins]

            df_psi = pd.DataFrame({
                "Variable": [name] * n_bins,
                "Bin": bins,
                "Count A": n_records_a,
                "Count E": n_records_e,
                "Count A (%)": p_records_a,
                "Count E (%)": p_records_e,
                "PSI": psi
                })

            l_df_psi.append(df_psi)

        self._df_psi_variable = pd.concat(l_df_psi)
        self._df_psi_variable.reset_index()

    def _check_is_fitted(self):
        if not self._is_fitted:
            raise NotFittedError("This {} instance is not fitted yet. Call "
                                 "'fit' with appropriate arguments."
                                 .format(self.__class__.__name__))

    @property
    def psi_splits(self):
        """List of splits points used to compute system PSI.
        Returns
        -------
        splits : numpy.ndarray
        """
        self._check_is_fitted()

        return self._splits

In [None]:
# main.py -- old version

# # # Database
# # import oracledb
# # import sqlite3
# # import sqlalchemy
# # # Data manipulation
# # import pandas as pd
# # from helper_functions import col_dropper, fix_dtypes, DataFrameImputer, AutoPrepareScoreCard
# # import numpy as np
# # from optbinning import BinningProcess
# # from optbinning import Scorecard
# # from optbinning.scorecard import ScorecardMonitoring
# #
# # import joblib
# #
# # con = sqlite3.connect('credit_data.db')
# # cur = con.cursor()
# #
# #
# # def read_pipeline(path):  # __REDO - for different model formats
# #     pipeline = joblib.load(path)
# #     return pipeline
# #
# #
# # estimator = read_pipeline('clf_pipeline.pkl')
# #
# #
# #
# # def list_all_table():
# #     query = """SELECT name FROM sqlite_master WHERE type='table';"""
# #     print(pd.read_sql(query, con, index_col=None))
# #     return None
# #
# #
# # query_inc = """SELECT *
# # FROM test_set
# # """
# # test_set = pd.read_sql(query_inc, con, index_col=None)
# #
# # scorecard = Scorecard()
#
#
# import datetime
# import dash_bootstrap_components as dbc
# from dash import html, Dash
#
# app = Dash(__name__, external_stylesheets=[dbc.themes.CYBORG])
#
# # app.layout = html.H1('The time is: ' + str(datetime.datetime.now()))
# #
# # if __name__ == '__main__':
# #     app.run_server(debug=True)
#
# def serve_layout():
#     return html.H1('The time is: ' + str(datetime.datetime.now()))
#
# app.layout = serve_layout
#
# if __name__ == '__main__':
#     app.run_server(debug=True)

In [None]:
# dashboard.py --old version

from dash import Dash, dcc, html, Input, Output
import dash_bootstrap_components as dbc
import plotly.express as px

from plotting_functions import plot_hist_dist, plot_feature_importances, create_card, psi_plot_ly, plot_auc_roc, plot_ks, psi_variable_plot
from helper_functions import col_dropper, fix_dtypes, DataFrameImputer, AutoPrepareScoreCard
# import plotly.graph_objects as go
import pandas as pd
# from datetime import date
import sqlite3
from sklearn.metrics import roc_curve, roc_auc_score, log_loss
from sklearn.model_selection import train_test_split
import json
import helper_functions
import joblib

from optbinning import BinningProcess
from optbinning.scorecard import ScorecardMonitoring

import credit_py_validation as cpv

from plotly.tools import mpl_to_plotly

# Global variables
LOGO_PATH = 'assets/halyk_logo.png'

# Database connection
con = sqlite3.connect('new_credit_data.db')
cur = con.cursor()
query_X = """SELECT *
FROM X_train_log
"""
X_train_log = pd.read_sql(query_X, con, index_col=None)

query_y = """SELECT *
FROM y_train_log
"""
y_train_log = pd.read_sql(query_y, con, index_col=None)
X_train, X_test, y_train, y_test = train_test_split(X_train_log, y_train_log, test_size=0.2, random_state=42)
def do_calculations():
    # Read binning fit params
    with open('binning_fit_params.json') as json_file:
        binning_fit_params = json.load(json_file)

    # Loading the model
    lr_model = joblib.load('lr_model.pkl')

    # selection_criteria = {
    #     "iv": {"min": 0.02, "max": 1},
    #     "quality_score": {"min": 0.01}
    # }
    variable_names = list(X_train.columns)

    binning_process = BinningProcess(variable_names,  # selection_criteria=selection_criteria,
                                     binning_fit_params=binning_fit_params)

    scorecard = helper_functions.AutoPrepareScoreCard(binning_process=binning_process,
                                                      estimator=lr_model, scaling_method="min_max",
                                                      scaling_method_params={"min": 300, "max": 850}, verbose=True)

    scorecard.fit(X_train, y_train.values.ravel())

    monitoring = ScorecardMonitoring(scorecard=scorecard, psi_method="cart",
                                     psi_n_bins=10, verbose=True)

    monitoring.fit(X_test, y_test.values.ravel(), X_train, y_train.values.ravel())

    monitoring.psi_table()

    # Calculate train
    pd_X_train = scorecard.predict_proba(X_train)[:, 1]
    default_flag_X_train = scorecard.predict(X_train)
    score_X_train = scorecard.score(X_train)
    rating_X_train = scorecard.get_credit_ratings(score_X_train)

    X_train['Default Probability'] = pd_X_train
    X_train['Credit Rating'] = rating_X_train
    X_train['Default Flag'] = default_flag_X_train

    # Calculate test
    pd_X_test = scorecard.predict_proba(X_test)[:, 1]
    default_flag_X_test = scorecard.predict(X_test)
    score_X_test = scorecard.score(X_test)
    rating_X_test = scorecard.get_credit_ratings(score_X_test)

    X_test['Default Probability'] = pd_X_test
    X_test['Credit Rating'] = rating_X_test
    X_test['Default Flag'] = default_flag_X_test

    incoming_batch_results = helper_functions.conduct_tests(X_test)
    test_results = helper_functions.conduct_tests(X_train)
    # card_tests = ['log_loss', 'brier', 'roc_auc', 'ber']
    return incoming_batch_results, test_results, scorecard, monitoring

# # plot_auc_roc(y_train, scorecard.predict_proba(X_train)[:, 0])))
# def get_change(current, previous):
#     if current == previous:
#         return 0
#     try:
#         return (abs(current - previous) / previous) * 100.0
#     except ZeroDivisionError:
#         return float('inf')
# tests_percent_change = {k + '_change': get_change(test_results[k], incoming_batch_results[k]) for k in card_tests}



app = Dash(__name__, external_stylesheets=[dbc.themes.CYBORG])

# app.layout = html.Div([
#     html.H2('Welcome to HB Credit Risk Model Monitoring System!'),
#     html.H3('Please, choose a model from the dropdown menu below: '),
#     # html.Div(dcc.dropdown)
#     html.Button(id='calculate-button-state', n_clicks=0, children='Submit')
# ])

###### RE-WORK - initilize arrays and values for empty DASHBOARD!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

incoming_batch_results, test_results, scorecard, monitoring = do_calculations() # CHANGE TO PRODUCE PLACEHOLDERS

# @app.callback(
#     Output('graph-distributions', 'figure'),
#     Input('dropdown', 'value')
# )
# def update_layout(value):
#     fig = plot_hist_dist(train[value], test[value])
#     return fig


app.layout = html.Div([
    # date_range(),
    html.Br(),
    html.Div(id='output-container-date-picker-range'),
    dbc.Row([html.Img(src=LOGO_PATH, style={'height': '10%', 'width': '10%'})]),
    html.H2('Welcome to HB Credit Risk Model Monitoring System!'),
    html.Br(),

    dcc.Tabs([
        dcc.Tab(label='Assess accuracy', children=[
            dbc.Row(
                [dbc.Col(create_card(incoming_batch_results['log_loss'], '1', 'Log Loss',
                                     f'Initial: {test_results["log_loss"]:.3f}')),
                 dbc.Col(
                     create_card(incoming_batch_results['brier'], '2', 'Brier',
                                 f'Initial: {test_results["brier"]:.3f}')),
                 dbc.Col(
                     create_card(incoming_batch_results['roc_auc'], '3', 'AUROC',
                                 f'Initial: {test_results["roc_auc"]:.3f}')),
                 dbc.Col(create_card(incoming_batch_results['ber'], '4', 'BER', f'Initial: {test_results["ber"]:.3f}')),
                 dbc.Col(create_card(11, '5', 'K-S', 12))
                 ]),

            dbc.Row([html.Div(dcc.Dropdown(X_test.columns, 'age', id='dropdown'),
                              style={'width': '50%'}
                              ),
                     html.Div(dcc.Dropdown(X_test.columns, 'age', id='dropdown_2'),
                              style={'width': '50%'}
                              )
                     ]
                    ),
            dbc.Row([
                dbc.Col(
                    dcc.Graph(id='graph-with-slider', figure=psi_variable_plot(monitoring)
                              )
                ),
                dbc.Col(
                    dcc.Graph(id='graph-distributions',
                              figure=plot_ks(y_train.values.ravel(), scorecard.predict_proba(X_train)[:, 0])
                              )
                )
            ])
        ]),

        dcc.Tab(label='Assess stability', children=[
            dbc.Row(html.Br())
        ])
    ]),


    html.Br(),
    html.Br(),
    html.Div(html.Button('Generate report', id='generate-report-button', n_clicks=0))
])

if __name__ == '__main__':
    app.run_server(debug=True)


# @app.callback(
#     Output('graph-distributions', 'figure'),
#     Input('dropdown', 'value')
# )
# def update_hist_dist(value):
#     fig = plot_hist_dist(train[value], test[value])
#     return fig


# fig_roc_auc = plot_auc_roc(y_train, scorecard.predict_proba(X_train)[:, 0])




In [None]:
# summary.py