In [None]:
# System
import os
import datetime
from itertools import product

# Data Analysis
import re
import numpy as np
import pandas as pd

# Machine Learning
from sklearn.pipeline import make_pipeline
from sklearn.feature_selection import RFE
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV,train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.decomposition import PCA
from sklearn.metrics import mean_squared_error
from sklearn.tree import DecisionTreeClassifier

In [None]:
class Preprocessor(object):
    """Prepare the beer data set for classification.

    On construction the file ``beer_data_set.csv`` is read from the current
    working directory, the target and feature columns are extracted, pairwise
    interaction features are added and the result is standardized.  Call
    ``select`` and then ``split`` to populate the fit/score partitions.

    Attributes set by the pipeline:
        y, X, features: target series, feature frame and its column index.
        scaler, X_standardized: fitted ``StandardScaler`` and its output.
        selector, estimator, selected_features, X_selected: RFE artefacts.
        X_fit, y_fit, X_score, y_score: train/score partitions from ``split``.
    """

    def __init__(self,target,stage):
        """Load the CSV, then clean and standardize it.

        Args:
            target: Name of the CSV column to predict; it is renamed to
                ``'Target'`` internally.
            stage: One of ``'dev'``, ``'test'`` or ``'prod'``.

        Raises:
            ValueError: If ``stage`` is not an accepted value
                (message ``'Unknown stage'``).
        """
        # Explicit check rather than ``assert``: assertions are stripped when
        # Python runs with -O, which would silently disable the validation.
        if stage not in ('dev','test','prod'):
            raise ValueError('Unknown stage')
        self.stage = stage

        cwd = os.getcwd()

        self.target = target
        self.rename = {target: 'Target','key':'Key'}
        self.fields = [
            'ABV','Ave Rating','Min IBU',
            'Astringency','Body','Alcohol','Bitter',
            'Sweet','Sour','Salty','Fruits','Hoppy',
            'Spices','Malty'
        ]
        self.input_df = pd.read_csv(os.path.join(cwd,'beer_data_set.csv'))

        self.y = None
        self.X = None
        self.features = None

        self.scaler = None
        self.X_standardized = None

        self.selector = None
        self.estimator= None
        self.selected_features = None
        self.X_selected = None

        self.X_fit = None
        self.y_fit = None
        self.X_score = None
        self.y_score = None

        self._clean()
        self._standardize()

    def _clean(self):
        """Build ``self.y``, ``self.X`` and ``self.features`` from the raw frame.

        The raw columns are renamed, indexed by ``'Key'`` and reduced to the
        target plus ``self.fields``.  For every unordered pair of fields
        (a field paired with itself included) a product column ``'a * b'``
        and a sum column ``'a + b'`` are appended.
        """
        df = (
            self.input_df
            .rename(self.rename,axis=1)
            .set_index('Key')
            .loc[:,['Target']+self.fields]
        )
        self.y = df['Target']
        X = df.drop('Target',axis=1)

        # Sort the de-duplicated pairs: iterating the bare set would give a
        # different column order from one interpreter run to the next.
        pairs = sorted({
            tuple(sorted((c1,c2)))
            for c1 in self.fields
            for c2 in self.fields
        })

        # NOTE(review): the self-pair sum 'a + a' is just 2*a; kept because the
        # original feature set contains it -- confirm whether it is wanted.
        extra = []
        for first,second in pairs:
            extra.append((X[first]*X[second]).rename(first+' * '+second))
            extra.append((X[first]+X[second]).rename(first+' + '+second))

        X = pd.concat([X]+extra,axis=1)
        self.X = X
        self.features = X.columns

    def _standardize(self):
        """Fit a ``StandardScaler`` on ``self.X`` and store the scaled frame."""
        self.scaler = StandardScaler()
        self.scaler.fit(self.X)

        self.X_standardized = pd.DataFrame(
            self.scaler.transform(self.X),
            columns=self.features,index=self.X.index
        )

    def select(self,num_columns=50,random_state=42):
        """Keep ``num_columns`` features using recursive feature elimination.

        Args:
            num_columns: Number of features RFE should retain.
            random_state: Seed for the decision tree that ranks the features,
                so that the same features are selected on every run.
        """
        self.estimator = DecisionTreeClassifier(random_state=random_state)
        self.selector = RFE(self.estimator, n_features_to_select=num_columns, step=1)
        self.selector.fit(self.X_standardized,self.y)
        self.selected_features = self.selector.get_feature_names_out(self.selector.feature_names_in_)
        self.X_selected = self.X_standardized[self.selected_features]

    def split(self,test_size=0.1,random_state=42):
        """Partition the selected features and target into fit/score sets.

        Args:
            test_size: Fraction of rows held out for scoring.
            random_state: Seed passed to ``train_test_split``.

        Raises:
            RuntimeError: If ``select`` has not been called yet.
        """
        if self.X_selected is None:
            raise RuntimeError('select() must be called before split()')
        (self.X_fit,self.X_score,
        self.y_fit,self.y_score) = train_test_split(
            self.X_selected,self.y,
            test_size=test_size,random_state=random_state
        )

    def audit(self):
        """Placeholder; currently does nothing and returns ``None``."""
        pass

In [None]:
class Model(object):
    """Wrap a scikit-learn classifier together with its tuning procedure.

    Two model types are supported: ``'gbc'`` (``GradientBoostingClassifier``)
    and ``'knn'`` (``KNeighborsClassifier``).  In the ``'dev'`` stage a single
    fixed parameter set is applied; in ``'test'``/``'prod'`` the parameters
    are lists that ``tune`` hands to ``GridSearchCV``.
    """
    def __init__(self,model_type,stage,X_fit,y_fit,X_score,y_score):
        """Create the untrained estimator and its parameter set.

        Args:
            model_type: ``'gbc'`` or ``'knn'``.
            stage: ``'dev'``, ``'test'`` or ``'prod'``.
            X_fit, y_fit: Data the model is fitted on.
            X_score, y_score: Held-out data used by ``tune`` for scoring.

        Raises:
            ValueError: ``'Unknown model type'`` or ``'Unknown stage'`` when
                the corresponding argument is not an accepted value.
        """
        # Explicit checks rather than ``assert``: assertions are stripped when
        # Python runs with -O, which would leave ``self.model`` undefined for
        # bad input instead of failing here.
        if model_type not in ('gbc','knn'):
            raise ValueError('Unknown model type')
        self.model_type = model_type

        if stage not in ('dev','test','prod'):
            raise ValueError('Unknown stage')
        self.stage = stage

        if self.model_type == 'gbc':
            self.model = GradientBoostingClassifier()
            if self.stage == 'dev':
                # Single fixed configuration, applied via set_params in tune().
                self.parameters = {
                    'n_estimators': 1000,
                    'max_depth': 5,
                    'min_samples_split': 5,
                    'learning_rate': 0.3,
                    'criterion': 'friedman_mse',
                }
            else:
                # Search grid for GridSearchCV (stage is 'test' or 'prod').
                self.parameters = {
                    'n_estimators': [500*i for i in range(1,4)],
                    'max_depth': [2+i for i in range(5)],
                    'min_samples_split': [3+i for i in range(5)],
                    'learning_rate': [0.2+(0.1*i) for i in range(1,10)],
                    'criterion': ['friedman_mse'],
                }
        else:
            self.model = KNeighborsClassifier()
            if self.stage == 'dev':
                # Single fixed configuration, applied via set_params in tune().
                self.parameters = {
                    'n_neighbors': 15,
                    'weights': 'uniform',
                    'algorithm': 'auto',
                    'leaf_size': 10,
                    'p': 1,
                    'metric': 'minkowski',
                }
            else:
                # Search grid for GridSearchCV (stage is 'test' or 'prod').
                self.parameters = {
                    'n_neighbors': [1+i for i in range(10)],
                    'weights': ['uniform'],
                    'algorithm': ['auto'],
                    'leaf_size': [5+i for i in range(10)],
                    'p': [1],
                    'metric': ['minkowski'],
                }

        self.X_fit = X_fit
        self.y_fit = y_fit
        self.X_score = X_score
        self.y_score = y_score

        self.grid = None
        # Set by tune(): the estimator's score() on the score split, rounded
        # to four decimals.
        self.corr = None
        # Never assigned anywhere in this class.
        self.mse = None

        self.final = None
        self.prediction = None

    def tune(self):
        """Fit the model and record its score on the held-out data.

        In ``'dev'`` the fixed parameters are applied and the model is fitted
        directly.  Otherwise a ``GridSearchCV`` over ``self.parameters`` is
        fitted and ``self.model`` is replaced by its best estimator.  The
        resulting ``score`` on ``X_score``/``y_score`` is stored in
        ``self.corr``.
        """
        if self.stage == 'dev':
            self.model.set_params(**self.parameters)
            self.model.fit(X=self.X_fit,y=self.y_fit)
        elif self.stage in ('test','prod'):
            self.grid = GridSearchCV(self.model,self.parameters)
            self.grid.fit(X=self.X_fit,y=self.y_fit)
            self.model = self.grid.best_estimator_

        self.corr = round(self.model.score(X=self.X_score,y=self.y_score),4)

    def refresh(self,X_fit,y_fit,X_score,y_score):
        """Replace the stored fit/score data without touching the model."""
        self.X_fit = X_fit
        self.y_fit = y_fit
        self.X_score = X_score
        self.y_score = y_score

    def audit(self):
        """Placeholder; currently does nothing and returns ``None``."""
        pass

In [None]:
# Pipeline stage shared by the cells below. Model fits one fixed parameter
# set when this is 'dev' and runs a grid search when it is 'test' or 'prod'.
stage = 'prod' # dev, test, prod

In [None]:
# Load and standardize the data with 'Style' as the target, keep 100 features
# via recursive feature elimination, then create the fit/score partitions.
# select() must run before split(): split() partitions the selected features.
preprocessor = Preprocessor(target='Style',stage=stage)
preprocessor.select(num_columns=100)
preprocessor.split()

In [None]:
# Fit/score partitions passed as keyword arguments to every Model below.
model_data = {
    'X_fit': preprocessor.X_fit,
    'y_fit': preprocessor.y_fit,
    'X_score': preprocessor.X_score,
    'y_score': preprocessor.y_score,
}

# gradient boosted classifier (currently disabled)
# start_gbc = datetime.datetime.now()
# gbc = Model('gbc',stage=stage,**model_data)
# gbc.tune()
# print('GBDT classifier accuracy:',gbc.corr)
# print('Runtime:',round((datetime.datetime.now() - start_gbc).total_seconds(), 2))

# k nearest neighbors classifier
start_knn = datetime.datetime.now()
knn = Model('knn',stage=stage,**model_data)
knn.tune()
# Model.corr is the classifier's score() on the held-out split, which for
# scikit-learn classifiers is mean accuracy rather than a correlation.
print('KNN classifier accuracy:',knn.corr)
print('Runtime:',round((datetime.datetime.now() - start_knn).total_seconds(), 2))

In [None]:
class Analysis(object):
    """Exploratory summaries and plots for a training/test frame pair.

    ``features`` is indexed by the keys ``'target'``, ``'int'`` and ``'cat'``,
    each giving the column(s) of ``train_df`` to summarize.

    NOTE(review): the columns 'MSZoning', 'SalePrice' and 'OverallQual' are
    hard-coded below; they look carried over from a different data set --
    confirm whether they should be replaced.
    """
    # Show every column when frames are displayed. This runs once, when the
    # class is defined, and changes the global pandas option.
    pd.set_option('display.max_columns', None)

    def __init__(self,train_df,test_df,features):
        """Store the frames and the feature-group mapping."""
        self.train_df = train_df
        self.test_df = test_df
        self.features = features

    def eda(self):
        """Display summary statistics of the training frame.

        Shows ``describe()`` for the target and the ``'int'`` columns, then
        for each ``'cat'`` column prints its name, its missing-value count
        (only when non-zero), its number of distinct values and the first
        five of them.  Relies on ``display`` being available as a builtin,
        as it is inside IPython/Jupyter.
        """
        nulls = self.train_df.isna().sum()

        display(self.train_df[self.features['target']].describe())
        display(self.train_df[self.features['int']].describe())
        for column in self.features['cat']:
            values = self.train_df[column].unique()
            missing = nulls.loc[column]
            print(column)
            if missing>0:
                print('Missing values:',missing)
            print(len(values))
            print(values[:5])
            print('\n')
        # Only summarize the hard-coded columns when the frame has them, and
        # select 'SalePrice' before averaging so that non-numeric columns do
        # not make the groupby mean raise.
        if {'MSZoning','SalePrice'}.issubset(self.train_df.columns):
            display(self.train_df.groupby('MSZoning')['SalePrice'].mean())
        print('\n')

    def plot(self,version):
        """Draw one of the predefined plots.

        Args:
            version: ``'target'`` for a histogram of the ``Target`` column,
                ``'quality'`` for a box plot of 'SalePrice' by 'OverallQual'.

        Raises:
            ValueError: ``'Unknown plot version'`` for any other value.
        """
        if version not in ('target','quality'):
            raise ValueError('Unknown plot version')

        # Imported here because names imported in a class body become class
        # attributes and are not visible as globals inside the methods.
        import matplotlib.pyplot as plt
        import seaborn as sns

        if version == 'target':
            sns.histplot(self.train_df.Target)
        else:
            data = self.train_df[['OverallQual','SalePrice']]
            plt.figure(figsize=(8,6))
            sns.boxplot(x='OverallQual',y='SalePrice',data=data)

In [None]:
# Preprocessor defines no train_df/test_df attributes, and Analysis.eda
# indexes `features` by the keys 'target', 'int' and 'cat', so the inputs are
# assembled from the fit/score partitions instead.
# NOTE(review): assumed intent -- confirm these are the frames to analyse.
analysis = Analysis(
    # to_numpy() attaches the target by position; X_* and y_* come from the
    # same train_test_split call, so their rows are in the same order.
    train_df=preprocessor.X_fit.assign(Target=preprocessor.y_fit.to_numpy()),
    test_df=preprocessor.X_score.assign(Target=preprocessor.y_score.to_numpy()),
    features={
        'target': ['Target'],
        'int': list(preprocessor.selected_features),
        'cat': [],
    },
)

In [None]:
# Histogram of the Target column of the training frame.
analysis.plot('target')

In [None]:
# Summary statistics for the target and numeric columns, followed by a
# per-column overview of the categorical columns of the training frame.
analysis.eda()