In [21]:
import pandas as pd
import numpy as np

In [184]:
SEASON = 9

In [124]:
# to be done: make this scalable to defenses too (no easy way to verify data)
all = pd.read_csv(f'../s{SEASON}/s{SEASON}.csv')
atk_df = all[all['type'] == 'atk']

# also
striker_df = pd.read_csv('apply_figs/f_striker_attributes.csv')
special_df = pd.read_csv('apply_figs/f_special_attributes.csv')

# and
affinity_map = pd.read_csv('save_figs/terrain.csv')
color_map = pd.read_csv('save_figs/color_map.csv')

# using these dfs
atk_df
striker_df
special_df
affinity_map
color_map

# properties
list_str = ['d1','d2','d3','d4','a1','a2','a3','a4']; list_sp = ['ds1','ds2','as1','as2']

def_slots = ['d1', 'd2', 'd3', 'd4', 'ds1', 'ds2']
atk_slots = ['a1', 'a2', 'a3', 'a4', 'as1', 'as2']

def_slots_strikers = ['d1', 'd2', 'd3', 'd4']
atk_slots_strikers = ['a1', 'a2', 'a3', 'a4']

# partial info
def_slots_partial = ['d1','ds1','ds2']


In [164]:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 50)

In [100]:
# expands original dataset to include char-specific features, defined in respective dfs
def expand_features_wide(df: pd.DataFrame, list_ftrs_to_expand: list[str], ftr_to_merge: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()

    for slot in list_ftrs_to_expand:
        tmp = ftr_to_merge.copy()
        tmp.columns = [f'{slot}_{c}' for c in tmp.columns]
        out = out.merge(tmp, how='left', left_on=slot, right_on=f'{slot}_char', validate='m:1')
        out = out.drop(columns=[f'{slot}_char'])
    
    return out

In [101]:
terrain_drops = {
    "urban": ("_outdoor", "_indoor"),
    "outdoor": ("_urban", "_indoor"),
    "indoor": ("_urban", "_outdoor")}

# removes extraneous terrain attributes on map affinity

def remove_terrains(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    terrain = df['affinity'].iloc[0]
    
    to_drop = terrain_drops.get(terrain)
    cols_dropped = [c for c in out.columns if c.endswith(to_drop)]
    out = out.drop(columns=cols_dropped)

    return out

In [148]:
# convert terrain multipliers into features

def expand_terrain_cols(df: pd.DataFrame, affinity_map: pd.DataFrame) -> pd.DataFrame:

    out = df.copy()
    suffixes = ('dmg_mult', 'block_r')
    terrain = atk_df["affinity"].iloc[0]

    aff_cols = [c for c in out.columns if c.endswith(f'_affinity_{terrain}')]

    maps = {s: dict(zip(affinity_map['affinity'], affinity_map[s])) for s in suffixes}

    for col in aff_cols:
        for s in suffixes:
            out[f'{col}_{s}'] = out[col].map(maps[s])

    out = out.drop(columns=aff_cols)
    
    return out

In [150]:
# add some team classifiers

# add up d1-d4 roles, make a new column, same for a1-a4 roles # make one for each unique role, so two extra sets of columns with role number of columns there
# do the same for position
# aggregate sum hp, mean hp, min hp, max hp, same for atk, same for def, then also do the subtraction

# adds row-wise aggregates for a given stat
def add_team_aggregates_cont(
    df: pd.DataFrame, stat: str, def_slots_f: list[str], def_slots_p: list[str], atk_slots: list[str]) -> pd.DataFrame:
    
    out = df.copy()

    def_cols_f = [f"{s}_{stat}" for s in def_slots_f]
    def_cols_p = [f"{s}_{stat}" for s in def_slots_p]
    atk_cols = [f"{s}_{stat}" for s in atk_slots]

    # aggregate row-wise
    # all units (post)
    out[f"def_{stat}_sum_FULL"]  = out[def_cols_f].sum(axis=1)
    out[f"def_{stat}_mean_FULL"] = out[def_cols_f].mean(axis=1)
    out[f"def_{stat}_std_FULL"] = out[def_cols_f].std(axis=1)
    # out[f"def_{stat}_min_FULL"]  = out[def_cols_f].min(axis=1)
    # out[f"def_{stat}_max_FULL"]  = out[def_cols_f].max(axis=1)

    # seen units (pre)
    out[f"def_{stat}_sum_PARTIAL"]  = out[def_cols_p].sum(axis=1)
    out[f"def_{stat}_mean_PARTIAL"] = out[def_cols_p].mean(axis=1)
    out[f"def_{stat}_std_PARTIAL"] = out[def_cols_p].std(axis=1)
    # out[f"def_{stat}_min_PARTIAL"]  = out[def_cols_p].min(axis=1)
    # out[f"def_{stat}_max_PARTIAL"]  = out[def_cols_p].max(axis=1)

    # always have own units
    out[f"atk_{stat}_sum"]  = out[atk_cols].sum(axis=1)
    out[f"atk_{stat}_mean"] = out[atk_cols].mean(axis=1)
    out[f"atk_{stat}_std"] = out[atk_cols].std(axis=1)
    # out[f"atk_{stat}_min"]  = out[atk_cols].min(axis=1)
    # out[f"atk_{stat}_max"]  = out[atk_cols].max(axis=1)

    # diffs between atk and def teams
    out[f"diff_{stat}_sum_PARTIAL"] = out[f"atk_{stat}_sum"] - out[f"def_{stat}_sum_PARTIAL"]
    out[f"diff_{stat}_mean_PARTIAL"] = out[f"atk_{stat}_mean"] - out[f"def_{stat}_mean_PARTIAL"]
    out[f"diff_{stat}_std_PARTIAL"] = out[f"atk_{stat}_std"] - out[f"def_{stat}_std_PARTIAL"]

    out[f"diff_{stat}_sum_FULL"] = out[f"atk_{stat}_sum"] - out[f"def_{stat}_sum_FULL"]
    out[f"diff_{stat}_mean_FULL"] = out[f"atk_{stat}_mean"] - out[f"def_{stat}_mean_FULL"]
    out[f"diff_{stat}_std_FULL"] = out[f"atk_{stat}_std"] - out[f"def_{stat}_std_FULL"]

    return out

In [151]:
# adds row-wise aggregates for a given categorical stat
def add_team_aggregates_cat(
    df: pd.DataFrame, stat: str, def_slots_f: list[str], atk_slots: list[str]) -> pd.DataFrame:

    out = df.copy()

    def_cols_f = [f"{s}_{stat}" for s in def_slots_f]
    atk_cols = [f"{s}_{stat}" for s in atk_slots]

    def_stacked = out[def_cols_f].stack()
    atk_stacked = out[atk_cols].stack()

    def_counts = pd.get_dummies(def_stacked).groupby(level=0).sum().reindex(out.index)
    atk_counts = pd.get_dummies(atk_stacked).groupby(level=0).sum().reindex(out.index)

    def_props = def_counts/len(def_cols_f)
    atk_props = atk_counts/len(atk_cols)

    def_props.columns = [f"def_{stat}_{c}_prop_FULL" for c in def_props.columns]
    atk_props.columns = [f"atk_{stat}_{c}_prop_FULL" for c in atk_props.columns]

    out = out.join(def_props).join(atk_props)

    return out

In [179]:
# could be extended with detailed ex logs, first deaths, time to win, damage dealt

# compute the expected multiplier of units against defs

def compute_exp_mult(df: pd.DataFrame, type: str, color_map: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()

    atk_slots = ["a1", "a2", "a3", "a4"]
    def_slots = ["d1", "d2", "d3", "d4"]

    M = color_map.set_index("attack_color").copy()
    M_vals = M.values
    atk_idx = {k: i for i, k in enumerate(M.index.tolist())}
    def_idx = {k: j for j, k in enumerate(M.columns.tolist())}

    def _codes(arr2d: np.ndarray, mapping: dict) -> np.ndarray:
        v = np.vectorize(lambda x: mapping.get(x, -1), otypes=[int])
        return v(arr2d)

    def _mult(atk_colors: np.ndarray, def_colors: np.ndarray) -> np.ndarray:
        a = _codes(atk_colors, atk_idx)
        d = _codes(def_colors, def_idx)
        mult = M_vals[a[:, :, None], d[:, None, :]]

        return mult

    def _get(cols: list[str]) -> np.ndarray:
        return out[cols].to_numpy()

    if type == "striker":
        A_atk   = _get([f"{s}_atk" for s in atk_slots])
        A_hp    = _get([f"{s}_max_hp" for s in atk_slots])
        A_atkc  = _get([f"{s}_atk_type" for s in atk_slots])
        A_defc  = _get([f"{s}_def_type" for s in atk_slots])

        D_atk   = _get([f"{s}_atk" for s in def_slots])
        D_hp    = _get([f"{s}_max_hp" for s in def_slots])
        D_atkc  = _get([f"{s}_atk_type" for s in def_slots])
        D_defc  = _get([f"{s}_def_type" for s in def_slots])

        mult_A_to_D = _mult(A_atkc, D_defc)
        mult_D_to_A = _mult(D_atkc, A_defc)

        denom_A_to_D = A_atk[:, :, None] * mult_A_to_D
        denom_D_to_A = D_atk[:, :, None] * mult_D_to_A

        ttk_A_to_D = D_hp[:, None, :] / denom_A_to_D
        ttk_D_to_A = A_hp[:, None, :] / denom_D_to_A

        # store each unit ttk
        ttk_a_to_def = np.nanmean(ttk_A_to_D, axis=2)
        for i, s in enumerate(atk_slots):
            out[f"ttk_{s}_to_def"] = ttk_a_to_def[:, i]

        ttk_d_to_atk = np.nanmean(ttk_D_to_A, axis=2)
        for i, s in enumerate(def_slots):
            out[f"ttk_{s}_to_atk"] = ttk_d_to_atk[:, i]

        # store avg atk
        ttk_avg_A_to_D = np.nanmean(ttk_A_to_D, axis=(1, 2))
        ttk_avg_D_to_A = np.nanmean(ttk_D_to_A, axis=(1, 2))

        out["ttk_avg_atk_to_def"] = ttk_avg_A_to_D
        out["ttk_avg_def_to_atk"] = ttk_avg_D_to_A
        out["ttk_delta_strikers"] = out["ttk_avg_def_to_atk"] - out["ttk_avg_atk_to_def"]

    elif type == "special":
        atk_sp_slots = ["as1", "as2"]
        def_sp_slots = ["ds1", "ds2"]

        # atk specials -> def strikers
        AS_atk = _get([f"{s}_atk" for s in atk_sp_slots])
        AS_atkc = _get([f"{s}_atk_type" for s in atk_sp_slots])
        D_hp = _get([f"{s}_max_hp" for s in def_slots])
        D_defc = _get([f"{s}_def_type" for s in def_slots])

        mult_AS_to_D = _mult(AS_atkc, D_defc)
        denom_AS_to_D = AS_atk[:, :, None] * mult_AS_to_D
        denom_AS_to_D[denom_AS_to_D <= 0] = np.nan

        ttk_AS_to_D = D_hp[:, None, :] / denom_AS_to_D

        AS_deals = _get([f"{s}_deals_dmg" for s in atk_sp_slots])
        AS_can_dmg = np.isin(AS_deals, ["yes"])
        mask = np.broadcast_to((~AS_can_dmg)[:, :, None], ttk_AS_to_D.shape)
        ttk_AS_to_D[mask] = np.nan

        ttk_as_to_def_str = np.nanmean(ttk_AS_to_D, axis=2)
        for i, s in enumerate(atk_sp_slots):
            out[f"ttk_{s}_to_def_str"] = ttk_as_to_def_str[:, i]
        out["ttk_avg_atkS_to_def_str"] = np.nanmean(ttk_AS_to_D, axis=(1, 2))

        # def specials -> atk strikers
        DS_atk  = _get([f"{s}_atk" for s in def_sp_slots])
        DS_atkc = _get([f"{s}_atk_type" for s in def_sp_slots])
        A_hp    = _get([f"{s}_max_hp" for s in atk_slots])
        A_defc  = _get([f"{s}_def_type" for s in atk_slots])

        mult_DS_to_A = _mult(DS_atkc, A_defc)
        denom_DS_to_A = DS_atk[:, :, None] * mult_DS_to_A
        denom_DS_to_A[denom_DS_to_A <= 0] = np.nan

        ttk_DS_to_A = A_hp[:, None, :] / denom_DS_to_A

        DS_deals = _get([f"{s}_deals_dmg" for s in def_sp_slots])
        DS_can_dmg = np.isin(DS_deals, ["yes"])
        mask = np.broadcast_to((~DS_can_dmg)[:, :, None], ttk_DS_to_A.shape)
        ttk_DS_to_A[mask] = np.nan

        ttk_ds_to_atk_str = np.nanmean(ttk_DS_to_A, axis=2)
        for i, s in enumerate(def_sp_slots):
            out[f"ttk_{s}_to_atk_str"] = ttk_ds_to_atk_str[:, i]
        out["ttk_avg_defS_to_atk_str"] = np.nanmean(ttk_DS_to_A, axis=(1, 2))

        out["ttk_delta_specials"] = out["ttk_avg_defS_to_atk_str"] - out["ttk_avg_atkS_to_def_str"]

    else:
        raise ValueError('try type="striker" or type="special"')

    return out

In [186]:
# pipeline

copy = atk_df.copy()

c = expand_features_wide(copy, list_str, striker_df)

c = expand_features_wide(c, list_sp, special_df)

c = remove_terrains(c)

c = expand_terrain_cols(c, affinity_map)

# these can change, speculating that these are most imp
for stat_cont in ['ex_cost', 'ex_length', 'max_hp', 'atk', 'def', 'healing', 'accuracy', 'evasion', 'crit']:
    c = add_team_aggregates_cont(c, stat_cont, def_slots, def_slots_partial, atk_slots)

# total categorical vars, also can add more, in terms of strikers-only, encode roles and positioning info (heuristic)
for stat_cat in ['role', 'position']:
    c = add_team_aggregates_cat(c, stat_cat, def_slots_strikers, atk_slots_strikers)

c = compute_exp_mult(c, 'striker', color_map)
c = compute_exp_mult(c, 'special', color_map)

c.to_csv('../s9/s9_working_dataset.csv', index=False)

  ttk_ds_to_atk_str = np.nanmean(ttk_DS_to_A, axis=2)
