In [None]:
# Enable the autoreload extension in mode 2 so that edits to imported modules are
# picked up live, without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
# Load the dotenv extension and invoke it.
# NOTE(review): presumably this reads a .env file that defines INPUT_DATA_DIR and
# OUTPUT_DATA_DIR, which are read from os.environ further down — confirm the file exists.
%load_ext dotenv
%dotenv

In [None]:
import os
import math
from typing import List, Tuple
from logging import getLogger, StreamHandler, INFO

import pandas as pd
from sklearn.tree import DecisionTreeRegressor
import seaborn as sns
import pygwalker as pyg

In [None]:
# Notebook-level logger that emits INFO-and-above records through a StreamHandler.
logger = getLogger(__name__)
logger.setLevel(INFO)
# getLogger() returns the same logger object on every call, so re-running this cell
# would otherwise attach one more handler per run and print each message repeatedly.
if not logger.handlers:
    logger.addHandler(StreamHandler())
# Do not forward records to ancestor loggers; this logger's own handler is enough.
logger.propagate = False

## environment variables

In [None]:
# CSV file name (resolved inside INPUT_DATA_DIR) that load_data() reads when no name is given.
DEFAULT_INPUT_FILE = 'example_data.csv'
# Fraction of the rows a group must contain. It is multiplied by len(df) to obtain
# min_samples, which is used both as the tree's min_samples_leaf and as the minimum
# row count an aggregated bin needs to be kept.
MIN_COMP_RATIO = 0.05

In [None]:
# Data directories are taken from the process environment.
INPUT_DATA_DIR = os.environ.get('INPUT_DATA_DIR')
OUTPUT_DATA_DIR = os.environ.get('OUTPUT_DATA_DIR')
# Fail fast and name the missing variable, instead of a bare AssertionError here
# or an obscure path error later in load_data().
assert INPUT_DATA_DIR is not None, 'Environment variable INPUT_DATA_DIR is not set'
assert OUTPUT_DATA_DIR is not None, 'Environment variable OUTPUT_DATA_DIR is not set'

## functions

In [None]:
def load_data(file_name: str=DEFAULT_INPUT_FILE):
    """
    Read a CSV file from the input data directory.

    Args:
        file_name: Name of the file inside INPUT_DATA_DIR.

    Returns:
        The DataFrame produced by ``pd.read_csv`` for that file.
    """
    csv_path = os.path.join(INPUT_DATA_DIR, file_name)
    frame = pd.read_csv(csv_path)
    return frame

In [None]:
def _round_to_1_2_5(value):
    """Snap one number to 1, 2 or 5 times a power of ten, keeping its sign (0 stays 0)."""
    if value == 0:
        # log10(0) is undefined; zero is already a "nice" number.
        return 0

    magnitude = abs(value)
    base = 10 ** math.floor(math.log10(magnitude))
    factor = magnitude / base  # leading digit(s), in [1, 10)

    # Determine if it is close to 1, 2, or 5
    if factor < 1.5:
        nice = 1 * base
    elif factor < 3.5:
        nice = 2 * base
    else:
        nice = 5 * base

    return nice if value > 0 else -nice


def round_nice(numbers: List[float], data_min: float, data_max: float, n_digits: int=1) -> List[float]:
    """
    Round cut points to "nice" values: 1, 2 or 5 times a power of ten.

    Each number is reduced to its leading factor in [1, 10): factors below 1.5 become 1,
    factors below 3.5 become 2, and everything else becomes 5 (times the same power of
    ten). Duplicates created by the rounding are removed and the result is sorted.
    Zero is kept as zero and negative numbers are rounded by magnitude with the sign
    restored, so columns containing non-positive values no longer raise a math domain error.

    Args:
        numbers: Cut points to round.
        data_min: Smallest value in the data; prepended when it lies below every rounded point.
        data_max: Largest value in the data; appended when it lies above every rounded point.
        n_digits: Currently unused; kept so existing callers keep working.

    Returns:
        Sorted list of distinct rounded cut points, extended to cover ``data_min`` and
        ``data_max`` where needed. An empty ``numbers`` yields an empty list.
    """
    rounded_numbers = sorted({_round_to_1_2_5(number) for number in numbers})
    if not rounded_numbers:
        return rounded_numbers

    # Rounding can pull the outermost points inside the data range; re-extend the
    # list so the lowest and highest observations still fall inside a bin.
    if data_min < rounded_numbers[0]:
        rounded_numbers.insert(0, data_min)
    if data_max > rounded_numbers[-1]:
        rounded_numbers.append(data_max)

    return rounded_numbers

In [None]:
def calculate_thresholds(df: pd.DataFrame, col: str, nice_round: bool) -> List[float]:
    """
    Compute bin edges for ``df[col]``.

    The number of bins ``k`` follows Sturges' formula on the count of non-null values.
    When the column has at most ``k`` distinct values those values are used as edges;
    otherwise the edges are the ``k`` quantile boundaries (duplicate boundaries dropped).

    Args:
        df: Frame holding the column.
        col: Name of the numeric column to bin.
        nice_round: When True, pass the edges through ``round_nice``.

    Returns:
        Increasing list of edges running from the column minimum to its maximum, or an
        empty list when the column has no non-null values or only one distinct value.
    """
    # Missing values carry no information about the edges and would corrupt sorting.
    values = df[col].dropna()
    n = len(values)
    if n == 0:
        # log2(0) is undefined; there is nothing to bin.
        return list()

    unique_values = values.unique().tolist()
    if len(unique_values) <= 1:
        return list()

    k = int(1 + math.log2(n))  # Sturges' formula

    if len(unique_values) <= k:
        cut_points = sorted(unique_values)
    else:
        # Take the exact quantile edges via retbins. The interval labels of the qcut
        # result are rounded for display, so reading edges from them can leave the
        # last edge slightly below the true maximum and drop the largest rows.
        _, edges = pd.qcut(values, q=k, duplicates='drop', retbins=True)
        cut_points = edges.tolist()

    if nice_round:
        # Round to the nearest 1, 2, or 5 multiples of one significant digit
        cut_points = round_nice(cut_points, data_min=values.min(), data_max=values.max())

    return cut_points

In [None]:
def bin_records(df: pd.DataFrame, col: str, nice_round: bool) -> pd.DataFrame:
    """
    Assign every record to an interval of ``col`` and attach the result as two columns.

    The edges come from ``calculate_thresholds``. Two columns are written to ``df``:
    ``bin_<col>`` holds the interval returned by ``pd.cut`` (lowest edge included) and
    ``bin_<col>_str`` holds a text label of the form ``"<left> < <col> <= <right>"``.

    Note that ``df`` is modified in place; the same object is returned for convenience.

    Args:
        df: Frame containing ``col``.
        col: Name of the column to bin.
        nice_round: Forwarded to ``calculate_thresholds``.

    Returns:
        ``df`` with the two additional columns.
    """
    edges = calculate_thresholds(df, col, nice_round=nice_round)

    def _label(x):
        # Text form of one interval, e.g. "1.0 < height <= 2.0".
        return f'{x.left} < {col} <= {x.right}'

    # Interval membership first, then its readable label.
    intervals = pd.cut(df[col], bins=edges, labels=None, include_lowest=True)
    df[f'bin_{col}'] = intervals
    df[f'bin_{col}_str'] = df[f'bin_{col}'].apply(_label)

    return df

In [None]:
def rename_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Give a per-pair prediction frame generic column names and keep the reporting columns.

    Columns are renamed by position, so ``df`` must have exactly ten columns in the
    order produced by the notebook's main loop: the two feature value columns, the
    target, the predicted target, and then for each feature its interval column, its
    range label and a column holding the feature's name. A frame with a different
    number of columns raises ``ValueError`` from pandas.

    The input frame is left untouched; the renaming is done on a copy.

    Args:
        df: Prediction frame with the ten columns described above.

    Returns:
        New frame with columns ``feature 1``, ``feature 2``, ``feature 1 range``,
        ``feature 2 range``, ``target`` and ``target pred``.
    """
    # Work on a copy so the caller's column labels are not silently rewritten.
    renamed = df.copy()
    renamed.columns = ['feature 1 name', 'feature 2 name', 'target', 'target pred', 'bin 1', 'feature 1 range', 'feature 1', 'bin 2', 'feature 2 range', 'feature 2']
    return renamed[['feature 1', 'feature 2', 'feature 1 range', 'feature 2 range', 'target', 'target pred']]

## main

In [None]:
# main
# Load the default input CSV (DEFAULT_INPUT_FILE inside INPUT_DATA_DIR) as the working frame.
df = load_data()

In [None]:
# Optional exploration (disabled): pairwise plots of the columns in df.
# sns.pairplot(df)

In [None]:
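# Optional exploration (disabled): open the data in pygwalker.
# reset_index() turns the row index into a column so it can be put in the Details field.
# pyg.walk(df.reset_index())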

In [None]:
# The first column is the prediction target; every remaining column is a candidate feature.
target_col = df.columns[0]
feature_cols = df.drop(columns=[target_col]).columns.tolist()

# Smallest group size considered meaningful: MIN_COMP_RATIO of all rows, rounded up.
min_samples = math.ceil(MIN_COMP_RATIO * len(df))

# Build every unordered pair of two distinct features as a two-element list.
feature_col_pairs = [
    [first, second]
    for position, first in enumerate(feature_cols)
    for second in feature_cols[position + 1:]
]

In [None]:
# Fixed seed for the tree builder: features are permuted at random when splits are
# searched, so ties between equally good splits could otherwise differ between runs.
TREE_RANDOM_STATE = 0

# For every feature pair: fit a tree on the two features, bin both features and keep
# the binned predictions. The frames are collected in a list and concatenated once
# after the loop instead of growing a DataFrame inside it.
pair_frames = []
for feature_col_pair in feature_col_pairs:
    X = df[feature_col_pair]
    y = df[target_col]

    # min_samples_leaf forces every leaf to hold at least min_samples rows.
    model = DecisionTreeRegressor(
        min_samples_leaf=min_samples,
        min_impurity_decrease=0,
        random_state=TREE_RANDOM_STATE,
    )
    model.fit(X, y)
    y_pred = model.predict(X)

    df_pred = df[feature_col_pair].copy()
    df_pred[target_col] = y
    df_pred[f'{target_col}_pred'] = y_pred

    # assessment: score() of a regressor is R^2, measured here on the training data itself
    r2_train = model.score(X, y)
    logger.info(f'{feature_col_pair}, {r2_train}')
    df_pred.plot.scatter(x=feature_col_pair[0], y=feature_col_pair[1], c=f'{target_col}_pred', colormap='viridis')

    # Column order matters: rename_columns() relabels the frame by position, so the
    # bin columns and the feature-name column must be appended feature by feature.
    for i, feature_col in enumerate(feature_col_pair, 1):
        df_pred = bin_records(df_pred, feature_col, nice_round=False)
        df_pred[f'feature {i}'] = feature_col

    df_pred = rename_columns(df_pred)

    # display(df_pred)
    pair_frames.append(df_pred)

# With fewer than two features there are no pairs; keep the empty-frame result.
df_master = pd.concat(pair_frames, axis=0, ignore_index=True) if pair_frames else pd.DataFrame()

df_master

In [None]:
# aggregate: one row per combination of feature 1 range and feature 2 range, with the
# number of rows, the mean target and the mean predicted target in that combination.
df_agg = df_master.groupby(['feature 1 range', 'feature 2 range']).agg({'target': ['count', 'mean'], 'target pred': 'mean'}).reset_index()
# Flatten the two-level column labels produced by the aggregation.
df_agg.columns = ['feature 1 range', 'feature 2 range', 'target count', 'target mean', 'target pred mean']
# Keep only combinations with at least min_samples rows and rank them by predicted
# target. Filter and sort are chained (no inplace=True) so the sort never runs on a
# slice of another frame and the cell gives the same result when re-run.
df_agg = (
    df_agg[df_agg['target count'] >= min_samples]
    .sort_values(by=['target pred mean'], ascending=False)
)
df_agg