# Pokémon competition

In this notebook you have to provide the best pipeline that you have found to predict Pokémon battles.

At the end you will have to generate a set of predictions over the unlabeled data `data.hidden` and `data_inverse.hidden`. In these unlabeled datasets you will find all the Pokémon battles that we will be performing in some *fictional* Pokémon competition, so we do not know the outcome of these battles right now!

Remember to use all the tools that we have seen in class to evaluate and fine-tune your pipeline.

*Gotta Predict 'Em All!*

Paste here your pipeline:

In [7]:
# global imports
from pathlib import Path
import pandas
import numpy as np
import json

In [8]:
"""Custom functions"""
def add_opposite_labels(labels):
    """Return the given labels followed by their opposite-side labels.

    The result is a new list: every element of ``labels`` in order, then
    ``opposite_label(label)`` for each of them in the same order.
    """
    combined = list(labels)
    combined.extend(opposite_label(label) for label in labels)
    return combined

def get_opposite_labels(labels):
    """Return a list with ``opposite_label(label)`` for every label, in order."""
    return list(map(opposite_label, labels))

def double_opposite_labels(labels):
    """Return the pair ``(labels, opposite labels)`` so both can be iterated.

    The first element is ``labels`` itself (not a copy); the second is the
    list produced by ``get_opposite_labels(labels)``.
    """
    mirrored = get_opposite_labels(labels)
    return labels, mirrored

def opposite_label(label):
    """Return the label of the same attribute on the other side.

    A label ending in ``'__other'`` loses that suffix; any other label
    gets the suffix appended.
    """
    suffix = '__other'
    has_suffix = label.endswith(suffix)
    return label[:-len(suffix)] if has_suffix else label + suffix

def remove_prefix(text, prefix):
    """Return ``text`` without a leading ``prefix``; unchanged if absent."""
    if not text.startswith(prefix):
        return text
    return text[len(prefix):]

# Seed passed as ``random_state`` to the models and to the randomized search.
SEED = 42

In [9]:
"""Custom classes"""
from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.base import BaseEstimator, TransformerMixin

# Per-pokemon columns that the both-sides imputers fit on and fill in.
predict_columns = [
    'HP', 'Attack', 'Defense', 'Sp. Atk', 'Sp. Def', 'Speed',
    'Generation', 'Legendary',
]
# Same columns preceded by the pokemon name.
name_predict_columns = ['Name', *predict_columns]

# Opposite-side spelling of every column in ``name_predict_columns``.
name_columns_other = get_opposite_labels(name_predict_columns)
# Rename map from the opposite-side label back to the plain label.
columns_other = dict(zip(name_columns_other, name_predict_columns))

class DeduceFeaturesFromNameTransformer:
    """Fill missing attributes using values seen for the same pokemon name.

    ``fit`` records, for every attribute in ``deduce_from_name``, a mapping
    from pokemon name to an observed attribute value, reading both sides of
    each row. ``transform`` uses those mappings to fill missing values of
    the same attributes on both sides.
    """

    # Attributes whose value is looked up from the pokemon name.
    deduce_from_name = ['Type 1', 'Legendary', 'Generation']

    def __init__(self):
        # column -> {pokemon name -> value observed for that name}
        self.fitted_data = {}

    def fit(self, X, y=None):
        """Learn the name -> value mapping of every deducible attribute."""
        for col in self.deduce_from_name:
            self.fit_column(X, col)
        return self

    def fit_column(self, X, column):
        """Build and store the name -> value mapping for ``column``.

        Each side is filtered on its own, so a row whose other side has a
        missing name or value still contributes the pair that is complete.
        When a name appears with several values the last one seen wins,
        with the plain side processed before the ``__other`` side.
        """
        column_data = {}
        for n, t in double_opposite_labels(['Name', column]):
            known = X[[n, t]].dropna()
            column_data.update(zip(known[n], known[t]))

        # A name starting with 'Mega ' also provides a value for the same
        # name without that prefix, unless that name was observed directly.
        inferred = {}
        for name, value in column_data.items():
            base_name = remove_prefix(name, 'Mega ')
            if base_name not in column_data:
                inferred[base_name] = value
        column_data.update(inferred)
        self.fitted_data[column] = column_data

    def transform_column(self, X, column):
        """Fill missing values of ``column`` on both sides of ``X`` in place."""
        lookup = self.fitted_data[column]
        for n, t in double_opposite_labels(['Name', column]):
            # Names without a learned value map to NaN and fill nothing.
            X[t] = X[t].fillna(X[n].map(lookup))

    def transform(self, X, y=None):
        """Return a copy of ``X`` with the deducible attributes filled in."""
        # Work on a copy so the caller's DataFrame is left untouched.
        X = X.copy()
        for col in self.deduce_from_name:
            self.transform_column(X, col)
        return X


class FillEmptyNonPredictableTransformer:
    """Replace missing names and types on both sides with an empty string."""

    def fit(self, X, y=None):
        """Nothing to learn; return the transformer itself."""
        return self

    def transform(self, X, y=None):
        """Fill the text columns of ``X`` in place and return ``X``."""
        text_columns = add_opposite_labels(['Name', 'Type 2', 'Type 1'])
        X[text_columns] = X[text_columns].fillna(value='')
        return X


class BasePokemonDataCorrectorTransformer:
    """Base class for transformers that correct attributes per pokemon name.

    ``fit`` pools both sides of every row, groups the rows by pokemon name
    and stores, for each name and attribute, whatever
    ``generate_correction_data`` returns. ``transform`` then passes every
    value of those attributes, on both sides, through
    ``process_correction`` together with the data stored for its name.

    Subclasses must override ``generate_correction_data`` and
    ``process_correction``.
    """

    def __init__(self, attr):
        """Store the columns to process.

        ``attr`` is a list whose first element is the name column
        (``'Name'``); the remaining elements are the attributes to correct.
        """
        # pokemon name -> {attribute -> correction data}
        self.trained_attrs = {}
        self.attr_to_process = attr

    def generate_correction_data(self, data):
        """Return the data needed later to correct one attribute.

        Receives the non-missing values of one attribute for one pokemon.
        Must be overridden by subclasses.
        """
        raise NotImplementedError

    def process_correction(self, value, data):
        """Return ``value``, corrected if needed, using ``data``.

        ``data`` is what ``generate_correction_data`` returned for the same
        pokemon and attribute. Must be overridden by subclasses.
        """
        raise NotImplementedError

    def combine_pokemons(self, X, y=None):
        """Stack both sides of ``X`` into one frame with plain column names.

        The opposite-side columns are renamed to their plain labels so every
        attribute of a pokemon ends up in a single column.
        """
        own_side = X[self.attr_to_process]
        other_side = X[get_opposite_labels(self.attr_to_process)]
        other_side = other_side.rename(
            columns={opposite_label(a): a for a in self.attr_to_process}
        )
        return pandas.concat([own_side, other_side])

    def fit(self, X, y=None):
        """Learn the correction data of every named pokemon."""
        # Start from scratch so nothing survives from a previous fit.
        self.trained_attrs = {}
        unified_pokemons = self.combine_pokemons(X, y)
        attrs = self.attr_to_process[1:]
        # groupby leaves out rows whose name is missing (NaN).
        for name, rows in unified_pokemons.groupby('Name', sort=False):
            if name == '':
                # An empty name stands for an unknown pokemon; pooling those
                # rows would treat unrelated pokemon as the same one.
                continue
            corrections = {}
            for attr in attrs:
                known = rows[attr].dropna()
                if known.empty:
                    continue
                corrections[attr] = self.generate_correction_data(known)
            self.trained_attrs[name] = corrections
        return self

    def _correct(self, name, attr, value):
        """Correct one value, or return it as is when nothing was learned."""
        corrections = self.trained_attrs.get(name)
        if corrections is None or attr not in corrections:
            return value
        return self.process_correction(value, corrections[attr])

    def transform(self, X, y=None):
        """Return a copy of ``X`` with the attributes of both sides corrected."""
        # Work on a copy so the caller's DataFrame is left untouched.
        X = X.copy()
        attrs = self.attr_to_process[1:]
        sides = (
            ('Name', attrs),
            (opposite_label('Name'), get_opposite_labels(attrs)),
        )
        # fit learns from both sides, so both sides are corrected too.
        for name_column, side_columns in sides:
            names = X[name_column]
            for attr, column in zip(attrs, side_columns):
                X[column] = [
                    self._correct(name, attr, value)
                    for name, value in zip(names, X[column])
                ]
        return X


class CategoricalConsensusTransformer(BasePokemonDataCorrectorTransformer):
    """Replace categorical attributes by the most frequent value per pokemon."""

    def generate_correction_data(self, data):
        """Return the mode of ``data`` (the first one when there are ties)."""
        return data.mode()[0]

    def process_correction(self, value, data):
        """Return ``value`` if it matches the stored mode, else the mode.

        Missing values (``None`` or NaN) never equal the mode, so they are
        replaced by it as well.
        """
        mode = data
        if value is None or value != mode:
            return mode
        return value


class NumericConsensusTransformer(BasePokemonDataCorrectorTransformer):
    """Replace missing or outlying numeric attributes by the per-pokemon mean."""

    def generate_correction_data(self, data):
        """Return ``(lower, mean, upper)`` for the observed values.

        ``lower`` and ``upper`` are the quartiles moved outwards by 1.5 times
        the interquartile range.
        """
        q1 = data.quantile(0.25)
        q3 = data.quantile(0.75)
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr
        return (lower, data.mean(), upper)

    def process_correction(self, value, data):
        """Return ``value`` when it lies within the bounds, else the mean.

        Missing values are replaced by the mean too.
        """
        lower, mean, upper = data
        # pandas.isna also accepts None, on which np.isnan would raise.
        if pandas.isna(value) or value < lower or value > upper:
            return mean
        return value


class BothSidesTransformer:
    """Apply one imputer to the ``predict_columns`` of both sides of a row."""

    def __init__(self, imputer):
        self.imputer = imputer

    def get_halfs(self, X):
        """Return both sides of ``X`` restricted to ``predict_columns``.

        The opposite side is renamed to the plain labels so both frames
        share the same column names.
        """
        own_side = X[predict_columns]
        other_side = X[get_opposite_labels(predict_columns)].rename(
            columns=columns_other
        )
        return own_side, other_side

    def fit(self, X, y=None):
        """Fit the imputer on both sides stacked on top of each other."""
        stacked = pandas.concat(list(self.get_halfs(X)))
        self.imputer.fit(stacked)
        return self

    def transform(self, X, y=None):
        """Impute both sides, write them back into ``X`` and return ``X``."""
        own_side, other_side = self.get_halfs(X)
        imputed_own = self.imputer.transform(own_side)
        imputed_other = self.imputer.transform(other_side)
        X[predict_columns] = imputed_own
        X[get_opposite_labels(predict_columns)] = imputed_other
        return X


class KNNImputerBothSidesTransformer(BothSidesTransformer):
    """Both-sides imputer backed by a distance-weighted 2-neighbour KNNImputer."""

    def __init__(self):
        knn_imputer = KNNImputer(n_neighbors=2, weights='distance')
        super().__init__(knn_imputer)

class SimpleImputterBothSidesTransformer(BothSidesTransformer):
    """Both-sides imputer backed by a mean-strategy SimpleImputer."""

    def __init__(self):
        mean_imputer = SimpleImputer(strategy='mean')
        super().__init__(mean_imputer)


class AddPokemonRatiosTransformer:
    """Add features derived from the stats, types and names of both pokemon.

    Columns added by ``transform``: ``best_attack_ratio``,
    ``attack_multiplier`` and ``HP_attack`` (each for both sides), plus
    ``HP_diff``, ``attack_multipler_diff``, ``HP_attack_multiplier_diff``,
    ``mega_diff``, ``strong_pokemon``, ``sum_stats_diff``,
    ``HP_attack_diff``, ``velocity_diff`` and ``velocity_diff_binary``.
    """

    def __init__(self, type_chart_path='type-chart.json'):
        """Load the type chart.

        The JSON file is used as a nested mapping indexed by lower-cased
        type names: ``type_matrix[attack_type][defense_type]`` is the
        multiplier of an attack of one type against a defender of the other.
        """
        with open(type_chart_path, encoding='utf-8') as f:
            self.type_matrix = json.load(f)

    def map_row_attack_multipler(self, row, opposite):
        """Return the best type multiplier one pokemon has over the other.

        With ``opposite=False`` the plain-side pokemon attacks the
        ``__other`` one; with ``opposite=True`` the roles are swapped. For
        each attacker type the multipliers against every defender type are
        multiplied together and the largest product is kept, never going
        below 0.25. Returns 1 when no attacker/defender type pair exists.
        Empty strings count as "no type".
        """
        own_types = ['Type 1', 'Type 2']
        other_types = get_opposite_labels(own_types)
        if opposite:
            attack_columns, defense_columns = other_types, own_types
        else:
            attack_columns, defense_columns = own_types, other_types

        # The defender's types do not depend on the attacking type.
        defense_types = [
            defense_type
            for defense_type in (row[col].lower() for col in defense_columns)
            if defense_type
        ]

        # Floor of 0.25 keeps the result strictly positive, since it is
        # later used as a divisor.
        best_attack_multiplier = 0.25
        at_least_one_type = False
        for column in attack_columns:
            attack_type = row[column].lower()
            if not attack_type:
                continue
            multiplier = 1
            for defense_type in defense_types:
                multiplier *= self.type_matrix[attack_type][defense_type]
                at_least_one_type = True
            best_attack_multiplier = max(best_attack_multiplier, multiplier)
        if not at_least_one_type:
            return 1
        return best_attack_multiplier

    def fit(self, X, y=None):
        """Nothing to learn; return the transformer itself."""
        return self

    def transform(self, X, y=None):
        """Return a copy of ``X`` extended with the derived features."""
        # Work on a copy so the caller's DataFrame is left untouched.
        X = X.copy()
        other_suffix = opposite_label('')

        # Defensive stats used as divisors: a 0 is replaced by 1 so the
        # ratios below never divide by zero.
        defenses = X[add_opposite_labels(['Defense', 'Sp. Def'])].replace(0, 1)

        stat_names = ['HP', 'Attack', 'Defense', 'Sp. Atk', 'Sp. Def', 'Speed']
        tmp_rows = {}
        for own, rival in (('', other_suffix), (other_suffix, '')):
            physical_ratio = X[f'Attack{own}'] / defenses[f'Defense{rival}']
            special_ratio = X[f'Sp. Atk{own}'] / defenses[f'Sp. Def{rival}']
            X[f'best_attack_ratio{own}'] = pandas.concat(
                [physical_ratio, special_ratio], axis=1
            ).max(axis=1)

            # Is the pokemon a mega evolution?
            tmp_rows[f'is_mega{own}'] = (
                X[f'Name{own}'].str.contains('Mega ', regex=False).astype(int)
            )
            # The sum of stats is a very good indicator of how strong a
            # pokemon is.
            tmp_rows[f'sum_stats{own}'] = X[
                [f'{stat}{own}' for stat in stat_names]
            ].sum(axis=1)

        # Best attack multiplier based on types, for each side.
        X['attack_multiplier'] = X.apply(
            lambda row: self.map_row_attack_multipler(row, opposite=False),
            axis=1,
        )
        X[opposite_label('attack_multiplier')] = X.apply(
            lambda row: self.map_row_attack_multipler(row, opposite=True),
            axis=1,
        )
        # This indicator should be correlated with the number of attacks
        # required to defeat a pokemon.
        X['HP_attack'] = X['HP'] / X[opposite_label('attack_multiplier')]
        X[opposite_label('HP_attack')] = (
            X[opposite_label('HP')] / X['attack_multiplier']
        )
        # Differences between both sides help the classification.
        X['HP_diff'] = X['HP'] - X[opposite_label('HP')]
        X['attack_multipler_diff'] = (
            X['attack_multiplier'] - X[opposite_label('attack_multiplier')]
        )
        X['HP_attack_multiplier_diff'] = X['attack_multipler_diff'] * X['HP_diff']
        X['mega_diff'] = tmp_rows['is_mega'] - tmp_rows[opposite_label('is_mega')]
        # Mega and legendary difference combined.
        X['strong_pokemon'] = (
            X['mega_diff'] + X['Legendary'] - X[opposite_label('Legendary')]
        )
        X['sum_stats_diff'] = (
            tmp_rows['sum_stats'] - tmp_rows[opposite_label('sum_stats')]
        )
        X['HP_attack_diff'] = X['HP_attack'] - X[opposite_label('HP_attack')]
        # Speed difference (very important), also as a binary attribute.
        X['velocity_diff'] = X['Speed'] - X[opposite_label('Speed')]
        X['velocity_diff_binary'] = X['velocity_diff'] < 0
        return X


In [10]:
# Models
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier


# Single decision tree.
dtc_model = DecisionTreeClassifier(
    criterion="entropy",
    max_depth=6,
    min_samples_leaf=5,
    random_state=SEED,
)

# Random forest.
rfc_model = RandomForestClassifier(
    criterion="entropy",
    max_depth=9,
    min_samples_leaf=5,
    random_state=SEED,
)

# Multi-layer perceptron with three hidden layers of 6, 4 and 2 units.
mlp_model = MLPClassifier(
    hidden_layer_sizes=(6, 4, 2),
    max_iter=1000,
    random_state=SEED,
)

In [11]:
# Ensemble
from sklearn.ensemble import VotingClassifier

# Soft-voting ensemble of the three models; the forest and the MLP weigh
# twice as much as the single tree.
eclf = VotingClassifier(
    estimators=[
        ('dtc', dtc_model),
        ('rfc', rfc_model),
        ('mlp', mlp_model),
    ],
    voting='soft',
    weights=[1, 2, 2],
)

In [12]:
# Fine-tuning
from sklearn.model_selection import RandomizedSearchCV

# Candidate values for the randomized search; each key is the name of an
# estimator inside the voting ensemble, "__", then the parameter name.
parameters = {
    'rfc__n_estimators': [*range(1, 200)],
    'rfc__max_depth': [*range(5, 25)],
    'dtc__max_depth': [*range(5, 25)],
    'mlp__max_iter': [*range(500, 1200)],
}

# Five sampled candidates, each evaluated with 5-fold cross-validation.
clf = RandomizedSearchCV(
    eclf,
    parameters,
    cv=5,
    n_jobs=-1,
    n_iter=5,
    random_state=SEED,
)

In [13]:
# Preprocessing pipeline
from sklearn.preprocessing import StandardScaler, KBinsDiscretizer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import OneHotEncoder

# Columns routed to each branch of the ColumnTransformer below.
CATEGORICAL = add_opposite_labels(['Type 1', 'Type 2', 'Legendary'])
# 'strong_pokemon' used to be listed twice here, which fed the same column
# to the scaler twice and duplicated the feature.
NUMERIC = add_opposite_labels(['HP', 'Attack', 'Defense', 'Sp. Atk',
                               'Sp. Def', 'Speed', 'Generation', 'best_attack_ratio',
                               'attack_multiplier', 'HP_attack']) + \
    ['attack_multipler_diff', 'sum_stats_diff', 'HP_diff', 'mega_diff',
     'HP_attack_diff', 'HP_attack_multiplier_diff',
     'strong_pokemon', 'velocity_diff']
BINARY = ['mega_diff', 'velocity_diff_binary'] + \
    add_opposite_labels(['Legendary'])

pre_pipeline = Pipeline(
    [
        ("data-correction", make_pipeline(
            DeduceFeaturesFromNameTransformer(),
            FillEmptyNonPredictableTransformer(),
            # adjust known values, as there may be outliers that add noise
            # to the generally expected behaviour
            NumericConsensusTransformer(
                ['Name', 'HP', 'Attack', 'Defense', 'Sp. Atk', 'Sp. Def', 'Speed']),
            CategoricalConsensusTransformer(
                ['Name', 'Legendary']),
            # KNN imputer is better than the simple imputer, but way slower;
            # it deduces missing data from the adjusted values
            KNNImputerBothSidesTransformer(),
            # SimpleImputterBothSidesTransformer(),
        )),
        ("data-enchancement", Pipeline(
            [
                # create ratio values that take in-game mechanics into
                # account from the raw data
                ("generate_ratios", AddPokemonRatiosTransformer())
            ]
        )),
        ("column-transform", ColumnTransformer(
            [
                (
                    "cat",
                    OneHotEncoder(sparse_output=True,
                                  handle_unknown='ignore'),
                    CATEGORICAL
                ),
                ("scaler", StandardScaler(), NUMERIC),
                ("binary", KBinsDiscretizer(n_bins=2,
                                            encode='onehot-dense', strategy='kmeans'), BINARY)
            ],
            remainder="drop",
        ))
    ]
)

In [15]:
from pathlib import Path

# Directory that the relative path "__file__" resolves into.
__wd__ = Path("__file__").resolve().parent
datasets_path = __wd__ / "datasets"

data = pandas.read_csv(datasets_path / "data.train", index_col=0)
# NOTE(review): inverse_data is loaded here but not used in this cell.
inverse_data = pandas.read_csv(datasets_path / "data_inverse.train", index_col=0)


def get_Xy(dataset):
    """Split a labelled dataset into its features and its ``Wins`` column."""
    features = dataset.drop(columns="Wins")
    target = dataset["Wins"]
    return features, target


X, y = get_Xy(data)

# Preprocessing followed by the randomized search over the ensemble.
pipeline = Pipeline(
    [
        ("preprocessing", pre_pipeline),
        ("ensamble", clf),
    ]
)

# Train the pipeline
pipeline.fit(X, y)

In [None]:
# NOTE: WE HAD TO MODIFY THE "DO NOT CHANGE THIS CODE" CELL BELOW BECAUSE
# IT WAS NOT IDENTICAL TO THE ONE IN THE BASIC NOTEBOOK:
# WE HAD TO ADD index_col=0 TO THE read_csv CALLS

In [None]:
# !!!!!!!!!!!!!!!!!!!!!!!!
# Do not change this code
# !!!!!!!!!!!!!!!!!!!!!!!!
from pathlib import Path

# Directory that the relative path "__file__" resolves into.
__wd__ = Path("__file__").resolve().parent
datasets_path = __wd__ / "datasets"

# Unlabeled battles; the first CSV column is used as the index.
tournament = pandas.read_csv(datasets_path / "data.hidden", index_col=0)
tournament_inverse = pandas.read_csv(datasets_path / "data_inverse.hidden", index_col=0)

# Predict both files with the fitted pipeline.
y_predicted = pipeline.predict(tournament)
y_inverse_predicted = pipeline.predict(tournament_inverse)

# Write each prediction array as comma-separated text.
y_predicted.tofile("predicted.csv", sep=",")
y_inverse_predicted.tofile("predicted_inverse.csv", sep=",")
# !!!!!!!!!!!!!!!!!!!!!!!!
# Do not change this code
# !!!!!!!!!!!!!!!!!!!!!!!!