In [2]:
import urllib.request
import json
import numpy as np
import collections
import datetime
import os
import pprint
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Input, Dense, Dropout, Concatenate, Average, BatchNormalization
from tensorflow.keras.models import Model, Sequential
import tensorflow.keras.layers as layers
from tensorflow.keras import regularizers
from sklearn.model_selection import train_test_split
import random
import re
import datetime, os
import game_constants
import trial_counter
import importlib
import itertools
import copy
import math
from attr import attrs, attrib
import pickle
import pandas as pd
from IPython.core.display import HTML


import matplotlib.pyplot as plt

%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [3]:
CARD_INFO = json.load(open('cards_2020_11_14.json', 'r'))
RELIC_INFO = json.load(open('relics_2020_11_14.json', 'r'))

# pprint.pprint(CARD_INFO)

def fix_cost(card_name):
    CARD_INFO[card_name + "+1"] = copy.deepcopy(CARD_INFO[card_name])
    CARD_INFO[card_name]['cost'] += 1

for card in ['Dualcast', 'Zap', 'Redo', 
             'Double Energy', 'Fusion', 'Recycle', 'White Noise', 'Creative AI']:
    fix_cost(card)


In [24]:
cached_data = False

if cached_data:
    loaded_data = np.load('cached_comp_data.npz')
    X = loaded_data['X']
    Y = loaded_data['Y']

else:
    json_game_data = list()
    for root, dirs, files in os.walk('processed_logs/a20_act1_defect'):
        for fname in files:
            path = os.path.join(root, fname)
            json_game_data.extend(json.load(open(path)))
    #pprint.pprint(json_game_data[0])
    print(f'Data has {len(json_game_data)} samples.')

{'fights': [{'act_boss': 'Slime Boss',
             'ascension': 20,
             'cards': ['Strike_B',
                       'Strike_B',
                       'Strike_B',
                       'Strike_B',
                       'Defend_B',
                       'Defend_B',
                       'Defend_B',
                       'Defend_B',
                       'Zap',
                       'Dualcast',
                       'AscendersBane',
                       'Undo',
                       'Gash'],
             'character': 'DEFECT',
             'damage_taken': 0,
             'enemies': 'Jaw Worm',
             'entering_hp': 62,
             'floor': 2,
             'last_elite': None,
             'max_hp': 71,
             'not_picked': ['Conserve Battery', 'Beam Cell'],
             'picked': 'Glacier',
             'potion_used': False,
             'relics': ['Cracked Core']},
            {'act_boss': 'Slime Boss',
             'ascension': 20,
             'cards'

In [10]:
def upgrade_card(c):
    return c + "+1"

def dedup(l):
    return sorted(set(l))

# Categories for one hot encoder. Categories are in alphabetical order and is the order used by OneHotEncoder
DEFECT_CARD_CHOICES = [c for c in CARD_INFO if CARD_INFO[c]['color'] == 'BLUE'] 
DEFECT_CARD_CHOICES = dedup(DEFECT_CARD_CHOICES + 
                               [upgrade_card(c) for c in DEFECT_CARD_CHOICES if not c.endswith('+1')])

ALL_CARDS = [c for c in CARD_INFO if CARD_INFO[c]['color'] in ['BLUE', 'COLORLESS', 'CURSE']] + ['Strike', 'Defend']
ALL_CARDS = dedup(ALL_CARDS + [upgrade_card(c) for c in ALL_CARDS if not c.endswith('+1')])
ALL_RELICS = dedup(RELIC_INFO.keys())
ALL_ENCOUNTERS = game_constants.BASE_GAME_ENEMIES
ALL_CHARACTERS = ['DEFECT', 'IRONCLAD', 'THE_SILENT', 'WATCHER']
ALL_CARD_CHOICES = DEFECT_CARD_CHOICES + ['Singing Bowl', 'SKIP']
ALL_ELITES = game_constants.ELITES
ALL_BOSSES = game_constants.BOSSES

def make_key(l):
    indexes = {}
    
    assert len(l) == len(set(l))
    for index, item in enumerate(l):
        assert not item.endswith('+1+1'), item
        indexes[item] = index
    return indexes
    
ALL_CARDS_KEY = make_key(ALL_CARDS)
ALL_RELICS_KEY = make_key(ALL_RELICS)
ALL_CHARACTERS_KEY = make_key(ALL_CHARACTERS)
ALL_ENCOUNTERS_KEY = make_key(ALL_ENCOUNTERS)
ALL_CARD_CHOICES_KEY = make_key(ALL_CARD_CHOICES)
ALL_ELITES_KEY = make_key(ALL_ELITES)
ALL_BOSSES_KEY = make_key(ALL_BOSSES)
CARD_TYPES_KEY = {'SKILL': 0, 'POWER': 1, 'ATTACK': 2, 'CURSE': 3}

In [6]:
def generalize_strikes_and_defends(cards):
  """
  Modifies any character specific Strikes and Defends (eg. Strike_R) into
  general Strikes and Defends(Strike)
  """
  for i, s in enumerate(cards):
    if s.startswith('Strike_') or s.startswith('Defend_'):
      cards[i] = re.sub('_.', '', s)
  return cards

def encode_list2(list_to_encode, encoder_key):
    ret = np.zeros(len(encoder_key))
    for item in list_to_encode:
        ret[encoder_key[item]] += 1
    return ret

def encode_single2(value, encoder_key):
    return encode_list2([value], encoder_key)

def encode_single_or_none(value, encoder_key):
    l = []
    if value:
        l.append(value)
    return encode_list2(l, encoder_key)

class CardEncoder:
    _NUM_DISTINCT_COSTS = 6
    _UPGRADED_OR_NOT_COUNT = 2
    ENCODER_SIZE = 2 * (len(ALL_CARDS_KEY) + len(CARD_TYPES_KEY) + _NUM_DISTINCT_COSTS + _UPGRADED_OR_NOT_COUNT)
      
    def encode(self, cards):
        encoded_cards = encode_list2(generalize_strikes_and_defends(cards), ALL_CARDS_KEY)
        card_counts_by_type = np.zeros(len(CARD_TYPES_KEY))
        card_counts_by_cost = np.zeros(self._NUM_DISTINCT_COSTS)
        card_counts_by_upgrade = np.zeros(self._UPGRADED_OR_NOT_COUNT)
        # TODO: energy proportion by type feature?
        for card in cards:
            remap_hack = {'Strike': 'Strike_B',
                         'Defend': 'Defend_B',
                         'Strike+1': 'Strike_B+1',
                         'Defend+1': 'Defend_B+1'}
            card = remap_hack.get(card, card)
                
            if card in CARD_INFO:
                card_info = CARD_INFO[card]
            elif card[:-2] in CARD_INFO:
                card_info = CARD_INFO[card[:-2]]
            else:
                assert False, 'could not find info for ' + card
            card_counts_by_type[CARD_TYPES_KEY[card_info['card_type']]] += 1
            card_counts_by_cost[card_info['cost']] += 1
            card_counts_by_upgrade[card.endswith('+1')] += 1
        count_features = np.concatenate((
                                      encoded_cards, 
                                      card_counts_by_type, 
                                      card_counts_by_cost, 
                                      card_counts_by_upgrade))                                      
        # return count_features
        proportion_features = count_features / len(cards)
        return np.concatenate((count_features, proportion_features))

#pprint.pprint(json_game_data[0]['fights'][3]['cards'])
#print(CardEncoder().encode(json_game_data[0]['fights'][3]['cards']))
        
def encode_relics(relics):
    return encode_list2(relics, ALL_RELICS_KEY)

def encode_encounter(encounter):
    return encode_single2(encounter, ALL_ENCOUNTERS_KEY)

def encode_character(character):
    return encode_single2(character, ALL_CHARACTERS_KEY)

def picked_and_not_picked(sample):
    not_picked = list(sample['not_picked'])
    if sample['picked'] != 'SKIP':
        not_picked.append('SKIP')
    return sample['picked'], not_picked

def encode_card_choice(sample):
    picked, not_picked = picked_and_not_picked(sample)
    #return (encode_single2(picked, ALL_CARD_CHOICES_KEY) 
    #        - encode_list2(not_picked, ALL_CARD_CHOICES_KEY))
    return encode_single2(picked, ALL_CARD_CHOICES_KEY)

def encode_last_elite(sample):
    return encode_single_or_none(sample['last_elite'], ALL_ELITES_KEY)

def encode_boss(sample):
    # note, because of the log structure, this is none if the player did
    # not reach the act boss, which leaks info from the future.
    return encode_single_or_none(sample['act_boss'], ALL_BOSSES_KEY)

def encode_available(sample):
    picked, not_picked = picked_and_not_picked(sample)
    return encode_list2([picked] + not_picked, ALL_CARD_CHOICES_KEY)

#encode_sample_with_loop(json_data[0])

In [7]:
def encode_fight(fight):
    cards = CardEncoder().encode(fight['cards'])
    relics = encode_relics(fight['relics'])
    available = encode_available(fight)

    encounter = encode_encounter(fight['enemies'])
    last_elite = encode_last_elite(fight)
    boss = encode_boss(fight)
    #enc_floor = np.zeros(6)
    #enc_floor[int(fight['floor']) / 10] = 1
    num_and_bool_data = np.array([fight['max_hp'] / 100.0, fight['entering_hp'] / 100.0, fight['ascension'], int(fight['potion_used'] == 'true')])  
    return np.concatenate((cards, relics, available, encounter, last_elite, boss, num_and_bool_data))


In [8]:
def decode_encoded_fight(row):
    cards = []
    relics = []
    choices = []
    len_all_cards = len(ALL_CARDS)
    encoded_card_features = CardEncoder.ENCODER_SIZE
    len_all_relics = len(ALL_RELICS)
    len_all_choices = len(ALL_CARD_CHOICES)
    
    for index, quantity in enumerate(row):
        if quantity == 0:
            continue
        if index < len_all_cards:
            cards.extend([ALL_CARDS[index]] * int(quantity))
        elif index < encoded_card_features + len_all_relics:
            relics.append(ALL_RELICS[index - encoded_card_features])
        elif index <  encoded_card_features + len_all_relics + len_all_choices:
            choices.append(ALL_CARD_CHOICES[index - encoded_card_features - len_all_relics])
    return cards, relics, choices

#pprint.pprint(json_data[0])
#pprint.pprint(decode_encoded_fight(encode_fight(json_data[0])))

In [9]:
def fight_is_valid_for_learning(fight):
    if 'not_picked' not in fight or len(fight['not_picked']) == 0:
        return False        
    if fight['picked'] == 'Singing Bowl':
        return False        
    if 'PrismaticShard' in fight['relics']:
        return False        
    if fight['max_hp'] > 200:
        return False
    return True


def fight_enumerator(runs_data, num_runs=None, log_freq=None):
    skipped = 0
    fight_number = 0
    for run_number, run_data in enumerate(runs_data):
        for fight in run_data['fights']:
            fight_number += 1
            if not fight_is_valid_for_learning(fight):
                skipped += 1
                continue
            yield fight

        if log_freq and run_number % log_freq == 0:
            print(f"on run {run_number} of {num_runs} {run_number / num_runs:.2%} "
                  f"skipped {skipped} of {fight_number} {skipped / fight_number:.2%} fights")

    
def featureize_fights(runs_data, num_runs=None, log_freq=None):
    encoded_fights = []
    y = []
    for fight in fight_enumerator(runs_data, num_runs, log_freq):
        encoded_fights.append(encode_fight(fight))
        y.append(encode_card_choice(fight))
    X = np.vstack(encoded_fights)
    Y = np.array(y, dtype='float32')
    return X, Y

In [10]:
def get_train_test_split():
    random.shuffle(json_game_data)
    split_point = int(len(json_game_data) * .8)
    train_slice = itertools.islice(json_game_data, split_point)
    test_slice =  itertools.islice(json_game_data, split_point, len(json_game_data))
    X_train, Y_train = featureize_fights(train_slice, split_point, log_freq=2000)
    X_test, Y_test = featuerize_fights(test_slice, len(json_game_data) - split_point, log_freq=2000)
    print('train shape is', X_train.shape, Y_train.shape)
    print('test shape is', X_test.shape, Y_test.shape)
    return X_train, Y_train, X_test, Y_test
# X_train, Y_train, X_test, Y_test = get_train_test_split()

In [11]:
def train_model(x, y, validation_split=.05):
    tf.keras.backend.clear_session()

    input_layer = Input(shape=(x.shape[1]),)

    last_layer = input_layer

    l1_reg = 5e-6
    l2_reg = 5e-5

    for i in range(1):
        l = Dense(200, activation='relu',
                 kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg))(last_layer)
        ld = Dropout(.15)(l)
    #     l2 = Dense(200,  activation='relu')(ld)
    #     l2d = Dropout(.2)(l2)
    #     l3 = Dense(200,  activation='relu')(l2d)
    #     l3d = Dropout(.3)(l3)
        next_l = Concatenate()([ld, input_layer])
        last_layer = next_l

    output = tf.keras.layers.Dense(len(ALL_CARD_CHOICES), activation='softmax',
                                  kernel_regularizer=regularizers.l1_l2(l1=l1_reg, l2=l2_reg))(last_layer)
    model = keras.Model(input_layer, output)
    model.summary()

    cce = tf.keras.losses.CategoricalCrossentropy()

    # model.summary()

    model.compile(
        loss=cce,
        optimizer='adam',
        metrics=['accuracy']
       )

    # Tensorboard
    logdir = os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
    tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)

    history = model.fit(x, y, 
                        batch_size=1024, epochs=20, validation_split=validation_split, 
                        callbacks=[tensorboard_callback])
    return model

In [12]:
def eval_model(model, x_test, y_test, name):
    test_scores = model.evaluate(x_test, y_test, verbose=2)
    out = model.predict(x_test)
    print(name, "test loss:", test_scores[0])
    print(name, "test accuracy:", test_scores[1])

In [13]:
# def k_fold_splits_featurized(k):
#     #sample_game_data = random.sample(json_game_data, 2000)
#     sample_game_data = json_game_data
#     holdout_size = int(len(sample_game_data) / k) # this is gonna lose a couple things bc truncation, oh well
#     holdout_chunks = []
#     for i in range(k):
#         start_holdout_ind = holdout_size * i
#         end_holdout_ind = holdout_size * (i + 1)
#         holdout_slice = itertools.islice(sample_game_data, start_holdout_ind, end_holdout_ind)
#         holdout_chunk = featureize_fights(holdout_slice, end_holdout_ind - start_holdout_ind, log_freq=2000)
#         holdout_chunks.append(holdout_chunk)
#     print(holdout_chunks[0][0].shape)
#     for i in range(k):
#         train_x_chunks = [holdout_chunks[j][0] for j in range(k) if i != j]
#         train_y_chunks = [holdout_chunks[j][1] for j in range(k) if i != j]
#         combined_train_x = np.concatenate(train_x_chunks, axis=0)
#         combined_train_y = np.concatenate(train_y_chunks, axis=0)
#         yield combined_train_x, combined_train_y, holdout_chunks[i][0], holdout_chunks[i][1]

# for fold_no, (x_train, y_train, x_test, y_test) in enumerate(k_fold_splits_featurized(2)):
#     print(x_train.shape, y_train.shape, x_test.shape, y_test.shape)
#     model = train_model(x_train, y_train)
#     eval_model(model, x_test, y_test, f"fold_no {fold_no}")


In [14]:
def k_fold_splits_models_with_holdout(k, json_game_data, sample_size = None):
    sample_game_data = json_game_data
    if sample_size:
        sample_game_data = random.sample(json_game_data, sample_size)
                
    holdout_size = int(len(sample_game_data) / k) # this is gonna lose a couple things bc truncation, oh well
    for i in range(k):
        start_holdout_ind = holdout_size * i
        end_holdout_ind = holdout_size * (i + 1)
        holdout_chunk = sample_game_data[start_holdout_ind: end_holdout_ind]
        training_chunk = sample_game_data[:start_holdout_ind] + sample_game_data[end_holdout_ind:]
        
        train_x, train_y = featureize_fights(training_chunk, len(training_chunk))
        model = train_model(train_x, train_y)
        # m.save(f'model_split_{i}')
        yield model, holdout_chunk
        #test_x, test_y = featureize_fights(holdout_chunk, len(holdout_chunk))
        #eval_model(m, test_x, test_y, f"split {i}")


In [4]:
def leave_one_card_out(fight):
    handled_cards = set(['AscendersBane'])
    for card in fight['cards']:
        if card not in handled_cards:
            new_output = copy.deepcopy(fight)
            new_output['cards'].remove(card)
            yield card, new_output
            handled_cards.add(card)

@attrs
class LeaveOneOutStat:
    times_observed = attrib(default = 0)
    became_top_pick = attrib(default = 0)
    lost_top_pick = attrib(default = 0)
    score_delta = attrib(default = 0.0)
    score_delta_sq = attrib(default = 0.0)
        
    def aggregate(self, other):
        self.times_observed += other.times_observed
        self.became_top_pick += other.became_top_pick
        self.lost_top_pick += other.lost_top_pick
        self.score_delta += other.score_delta
        self.score_delta_sq += other.score_delta_sq
        
    def score_delta_std_error(self):
        n = self.times_observed
        var = (self.score_delta_sq - self.score_delta * self.score_delta / n) / (n - 1)
        std_dev = math.sqrt(var) 
        return std_dev / math.sqrt(n)
        
    def average_score_delta(self):
        return self.score_delta / self.times_observed
    
    def average_pick_change_delta(self):
        return (self.became_top_pick - self.lost_top_pick) / self.times_observed
        
        
class CardPredictionResult:
    def __init__(self, fight, prediction):
        self.probs_by_card = {}
        picked, not_picked = picked_and_not_picked(fight)
        
        for available_card in not_picked + [picked]:
            ind = ALL_CARD_CHOICES_KEY[available_card]
            self.probs_by_card[ind] = prediction[ind]
        
        # model assigns non-zero prob to unavailable cards, normalize it out
        total_prob = sum(self.probs_by_card.values()) 
        max_prob = -1
        self.best_choice = None
        for card_ind in self.probs_by_card:
            self.probs_by_card[card_ind] /= total_prob
            if self.probs_by_card[card_ind] > max_prob:
                max_prob = self.probs_by_card[card_ind] 
                self.best_choice = card_ind
        
def compute_leave_one_out_stat(loo_pred, original_pred, card_ind):
    ret = LeaveOneOutStat()
    ret.times_observed = 1
    ret.became_top_pick = (original_pred.best_choice != card_ind and
                           loo_pred.best_choice == card_ind)
    ret.lost_top_pick = (original_pred.best_choice == card_ind and
                           loo_pred.best_choice != card_ind)
    score_delta = (original_pred.probs_by_card[card_ind] - 
         loo_pred.probs_by_card[card_ind])
    ret.score_delta = score_delta
    ret.score_delta_sq = score_delta * score_delta
    return ret

In [28]:
def update_leave_one_out_matrix_for_fight(model, fight, leave_one_out_matrix):
    encoded_fights_list = [encode_fight(fight)]
    removed_cards = [None]
    for removed_card, mod_fight in leave_one_card_out(fight):
        encoded_fights_list.append(encode_fight(mod_fight))
        removed_cards.append(removed_card)
    leave_one_out_encoded = np.vstack(encoded_fights_list)
    preds = model.predict(leave_one_out_encoded)

    original_pred = CardPredictionResult(fight, preds[0])

    for removed_card, pred in zip(removed_cards[1:], preds[1:]):  
        removed_card_ind = ALL_CARDS_KEY[removed_card]
        loo_pred = CardPredictionResult(fight, pred)    

        for card_ind in loo_pred.probs_by_card:
            if card_ind not in leave_one_out_matrix:
                leave_one_out_matrix[card_ind] = {}
            loo_row = leave_one_out_matrix[card_ind]
            
            if not removed_card_ind in loo_row:
                loo_row[removed_card_ind] = LeaveOneOutStat()
            loo_item = loo_row[removed_card_ind]

            loo_item.aggregate(compute_leave_one_out_stat(loo_pred, original_pred, card_ind))
                       

def update_leave_one_out_matrix_for_games(model, games, leave_one_out_matrix):
    for ind, game in enumerate(games):
        if ind % 1000 == 0:
            print(f'at {ind} of {len(games)}')
        for fight in game['fights']:
            if not fight_is_valid_for_learning(fight):
                continue
            update_leave_one_out_matrix_for_fight(model, fight, leave_one_out_matrix)

leave_one_out_matrix = {}
for model, holdout_chunk in k_fold_splits_models_with_holdout(5, json_game_data, None):
    update_leave_one_out_matrix_for_games(model, holdout_chunk, leave_one_out_matrix)

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 1038)]       0                                            
__________________________________________________________________________________________________
dense (Dense)                   (None, 200)          207800      input_1[0][0]                    
__________________________________________________________________________________________________
dropout (Dropout)               (None, 200)          0           dense[0][0]                      
__________________________________________________________________________________________________
concatenate (Concatenate)       (None, 1238)         0           dropout[0][0]                    
                                                                 input_1[0][0]                

Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
at 0 of 2500
at 1000 of 2500
at 2000 of 2500


In [16]:
pickle.dump(leave_one_out_matrix, open('leave_one_out_matrix.pickle', 'wb'))

In [5]:
loo2 = pickle.load(open('leave_one_out_matrix.pickle', 'rb'))

In [33]:
#plt.hist(all_preds[(ALL_CARD_CHOICES_KEY['Biased Cognition'], ALL_CARDS_KEY['Glacier'])])
#plt.hist(all_preds[(ALL_CARD_CHOICES_KEY['Turbo'], ALL_CARDS_KEY['Compile Driver'])])
# plt.hist(all_preds[(ALL_CARD_CHOICES_KEY['Hologram'], ALL_CARDS_KEY['All For One'])])
#pickle.dump(all_preds, open('all_loo_preds_by_key_on_5k_sample.pickle', 'wb'))

In [7]:
localization_dir = '/home/ubuntu/.local/share/Steam/steamapps/common/SlayTheSpire/desktop-1.0/localization'
localizers = {}

class Localizer:
    def __init__(self, localization_dict):
        self.localization_dict = localization_dict
        
    def localize_name(self, key):
        fix_plus_one = ''
        if key.endswith('+1'):
            key = key[:-2]
            fix_plus_one = '+1'
        return self.localization_dict[key]['NAME'] + fix_plus_one

for lang_code in [d for d in os.listdir(localization_dir) if len(d) == 3 and d != 'www']:
    directory = localization_dir + '/' + lang_code
    cards_dict = json.load(open(directory + '/cards.json'))
    relics_dict = json.load(open(directory + '/relics.json'))
    cards_dict.update(relics_dict)
    cards_dict['Strike'] = cards_dict['Strike_G']
    cards_dict['Defend'] = cards_dict['Defend_G']
    localizers[lang_code] = Localizer(cards_dict)
    #print(localization_dicts['dut']['Astrolabe'])



In [36]:
def render_card_page(card_name, loo_stats_matrix, localizer=localizers['eng']):
    dict_of_loo_stats = loo_stats_matrix[ALL_CARD_CHOICES_KEY[card_name]]
    
    sorted_stats = sorted(dict_of_loo_stats.items(), 
                          key = lambda card_with_stat: -card_with_stat[1].average_score_delta())
    title_card = localizer.localize_name(card_name)
    output = (f'<html><head><title>Analysis of {title_card}</title>'
              '<link rel="stylesheet" href="../style.css"'
               '</head>\n<body>')
    output += (f'<table><tr><th>listed card</th><th>number of times {title_card} offered in card ' 
               'rewards while listed card in deck</th>'
        '<th>mean model score change when listed card is removed</td>'
             f'<th># times removing listed card causes model to skip {title_card}</th>'
             f'<th># times removing listed card causes model to take {title_card}</th></tr>\n')
    for card_ind, loo_stat in sorted_stats:
        if loo_stat.times_observed < 50:
            continue
        output += (f'<tr>'
                  f'<td>{localizer.localize_name(ALL_CARDS[card_ind])}</td> '
                  f'<td>{loo_stat.times_observed}</td> '
                  f'<td>{loo_stat.average_score_delta()*100:.1f} ± {loo_stat.score_delta_std_error() * 200:.1f}</td>'
                  f'<td>{loo_stat.lost_top_pick}</td>'
                  f'<td>{loo_stat.became_top_pick}<td>'
                  f'</tr>\n')
    output += '</table>'
    
    output += '</body></html>'
    return output

display(HTML(render_card_page('Ball Lightning', loo2)))


listed card,number of times Ball Lightning offered in card rewards while listed card in deck,mean model score change when listed card is removed,# times removing listed card causes model to skip Ball Lightning,# times removing listed card causes model to take Ball Lightning,Unnamed: 5
Thunder Strike,534,10.7 ± 0.5,95,0,
Bullseye+1,340,6.8 ± 0.4,34,0,
Bullseye,1133,5.4 ± 0.2,68,1,
Barrage,1619,5.0 ± 0.2,94,4,
Rip and Tear+1,284,3.7 ± 0.4,14,0,
Melter,1871,3.6 ± 0.1,89,8,
Dualcast,27713,3.3 ± 0.0,903,169,
Charge Battery,5023,3.3 ± 0.1,185,4,
Strike,31129,3.1 ± 0.0,1115,23,
Melter+1,220,3.0 ± 0.4,7,0,


In [37]:
def render_site():
    for lang_code, localizer in localizers.items():
        for card_index in loo2:
            logged_card_name = ALL_CARD_CHOICES[card_index]
            if logged_card_name == 'SKIP':
                continue
            localized_name = localizer.localize_name(logged_card_name)
            card_page_contents = render_card_page(logged_card_name, loo2, localizer=localizer)
            open(f'site_contents/{lang_code}/{localized_name}.html', 'w').write(card_page_contents)
render_site()

In [20]:
print(len(loo2))
af1_synergy_dict = loo2[ALL_CARD_CHOICES_KEY['All For One']]
print(len())
row = loo2[ALL_CARD_CHOICES_KEY['All For One']][ALL_CARDS_KEY['Gash']]
x = pd.Series()

143


TypeError: len() takes exactly one argument (0 given)

In [None]:
def cn(index):
    return ALL_CARD_CHOICES[index]

ct = 0

confusion_matrix = collections.defaultdict(collections.Counter)
success_rate_by_card = collections.defaultdict(trial_counter.TrialCounter)

for example, predictions, actuals in zip(X_test, out, Y_test):
    #model_pick = np.argmax(np.multiply(predictions, np.absolute(actuals)))
    model_pick = np.argmax(predictions)
    real_label = np.argmax(actuals)
#     if model_pick == real_label: 
#         print('correct')
#     else: 
#         print('wrong')
        
    confusion_matrix[cn(real_label)][cn(model_pick)] += 1
    success_rate_by_card[cn(real_label)].record_outcome(real_label == model_pick)
    
#     if (cn(real_label) != 'Steam Power'):
#         continue
#     print(f'model: {cn(model_pick)} actual: {cn(real_label)}')
#     print(decode_encoded_sample(example))
#     ct += 1
#     print()
#     if ct > 10:
#         break


In [None]:
cards_by_success_rate = sorted(success_rate_by_card.items(), key=lambda x: x[1].success - x[1].total)

for card, rate in cards_by_success_rate:
    print(card, rate, confusion_matrix[card].most_common(5))

In [None]:
def train_xgb_model():
  # I got this working with the sklearn wrapper, but didn't figure out the DMatrix format
  xgb.set_config(verbosity=1)
  X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, test_size=0.33, shuffle=False)
  dmatrix = xgb.DMatrix(data=X_train, label=Y_train)
  params = {'verbose_eval': True, 'learning_rate':.3, 'n_estimators':10, 'subsample':.2, 'silent':False,
         'objective':'multi:softmax', 'num_class': len(ALL_CARD_CHOICES)}
  bst = xgb.train(params, dmatrix, 2)

  Y_pred = model.predict(xgb.DMatrix(X_test))
  print(Y_pred)
  print(np.sum(Y_pred == Y_test) / len(Y_test))

# train_xgb_model()

In [None]:
def score_fight(fight, fight_predictions, real_label):
    #print(fight)
    ranked_choices = list(reversed(np.argsort(fight_predictions)))
#             formatted_scores = ""
#             for choice_index in ranked_choices[:4]:
#                 choice = cn(choice_index)
#                 score = fight_predictions[choice_index]
#                 formatted_scores += f"{choice} {score:.2f} "
    #print(formatted_scores)
    return fight_predictions[ranked_choices[0]] - fight_predictions[real_label] 
    #print('num_correct', correct, 'margin loss', margin_loss)
         
def score_run(run, num_fights=5):
    encoded_run, encoded_labels = preprocess_with_loop([run], 1, log_freq=0)
        
    predicted_labels = model.predict(encoded_run)
    
    margin_loss = 0
    num_correct = 0
    fight_index = 0
    for fight in test_run['fights']:
        if fight_is_valid_for_learning(fight):
            fight_predictions = predicted_labels[fight_index]
            real_label = np.argmax(encoded_labels[fight_index])
            this_margin = score_fight(fight, fight_predictions, real_label) 
            margin_loss += this_margin
            if this_margin <= .02:
                num_correct += 1
            fight_index += 1
        if fight_index == num_fights:
            break
    return margin_loss, num_correct


scored_play_and_floor_reached = []
for test_run in itertools.islice(json_game_data, split_point, len(json_game_data)):
    num_valid_fights = sum(fight_is_valid_for_learning(fight) for fight in test_run['fights'])
    
    NUM_FIGHTS_TO_SCORE = 5
    if num_valid_fights >= NUM_FIGHTS_TO_SCORE:
        margin, num_correct = score_run(test_run, NUM_FIGHTS_TO_SCORE)
        scored_play_and_floor_reached.append((margin, num_correct, test_run['floor_reached']))
    
        
    

In [None]:
floor_reached_sum = np.zeros(6)
floor_reached_count = np.zeros(6)
for margin, num_correct, floor_reached in scored_play_and_floor_reached:
    floor_reached_sum[num_correct] += floor_reached
    floor_reached_count[num_correct] += 1
    
plt.ylabel('average floor reached')
plt.xlabel('number of model agreements with player in first 5 fights')
plt.plot(range(6), floor_reached_sum / floor_reached_count)

plt.savefig('model_agreement_vs_outcome.png')

In [None]:
def plot_floor_reached_cumulative_graph():
    floors_reached = [d['floor_reached'] for d in json_game_data]
    n, bins, patches = plt.hist(floors_reached, 100, density=True, histtype='step',
                          cumulative=-1)

### Tensorboard

Inspect training and validation results with tenorboard and upload logs to tensorboard.dev

In [None]:
%tensorboard --logdir logs

In [None]:
# !tensorboard dev upload --logdir ./logs \
#   --name "Slay the Spire fight predictions" \
#   --description "Training results predicting health loss in a Slay the Spire fight"

### Manual Inspection

## Tune the Model

Let's try to find the best hyper parameters

In [None]:
## This is old code from the STS predictor.

def build_model(hp):
  model = tf.keras.models.Sequential()
  for i in range(hp.Int('num_layers_big', 1, 2)):
    model.add(tf.keras.layers.Dense(
        units=hp.Int('units_' + str(i), min_value=100, max_value=500, step=50, default=400), activation='relu'))
    tf.keras.layers.Dropout(
      hp.Float('dropout', 0.1, 0.4, step=0.1, default=0.2))
  for i in range(hp.Int('num_layers_small', 1, 2)):
    model.add(tf.keras.layers.Dense(
        units=hp.Int('units_' + str(i), min_value=16, max_value=128, step=16, default=32), activation='relu'))
    tf.keras.layers.Dropout(
      hp.Float('dropout', 0, 0.3, step=0.1, default=0.2))
  model.add(tf.keras.layers.Dense(1))
  model.compile(
      optimizer=keras.optimizers.RMSprop(hp.Choice('learning_rate', [1e-1, 1e-2, 1e-3, 1e-4])),
      loss='mean_absolute_error',
      metrics=['mean_absolute_error', 'mean_squared_error'])
  return model

def tune_model():
  tuner = kt.Hyperband(
      build_model,
      objective='val_loss',
      max_epochs=20,
      hyperband_iterations=4)
  tuner.search(X_train, Y_train,
              epochs=5,
              validation_data=(X_test, Y_test))
  return tuner

# tuner = tune_model()
# tuner.results_summary()

## Save the Model

Save the model to be loaded into the game to be used in a mod!

Cache the data to help speed up development

In [None]:
model.save("STSFightPredictor") # Saved Model Format
# model.save("STSFP.h5")

np.savez_compressed('cached_comp_data.npz', X=X, Y=Y)

In [None]:
# For testing in the mod that the model in the mod is predicting the save values
for i in range(3):
  case = X_test[i]
  sb = ''
  sb = 'float[] testCase = {'
  for num in case:
    sb += str(num)
    sb += '0f, '
  sb += '};'
  print(sb)

## Embedding Experiments

The current model has a good training loss curve but the validation curve looks more like a straight line than a curve. This problem is often related to overfit. Rather than one hot encoding cards, relics, and encounter, an experiment with Embedding layers was run to try to reduce overfit.

An Embedding layer can be used to learn relations between cards. The general idea is to encode cards, relics, and enemies as vectors instead of a single numbers.

Because there are a variable number of cards and relics a player can have, the average of card vectors and the average of relic vectors was taken. These averages can be passed into a Dense layer.

In [None]:
# Custom average function to ignore masked layers
def avg_labmda_fun(x, mask):
  mask_cast = keras.backend.cast(mask, 'float32')
  expanded = keras.backend.expand_dims(mask_cast)
  count = tf.keras.backend.sum(mask_cast)
  sum = keras.backend.sum(expanded * x, axis=1)
  return sum / count

In [None]:
def train_embedding_model():
  # Embed cards and average output vectors
  card_input = Input(shape=(NUM_CARDS_FOR_EMB, ), name='cards_input')
  card_embedding = Embedding(len(ALL_CARDS) + 1, 26, mask_zero=True)(card_input)
  card_average = Lambda(avg_labmda_fun, output_shape=(26, ), mask=None)(card_embedding)

  # Embed relics and average output vectors
  relic_input = Input(shape=(NUM_RELICS_FOR_EMB, ), name='relics_input')
  relic_embedding = Embedding(len(ALL_RELICS) + 1, 13, mask_zero=True)(relic_input)
  relic_average = Lambda(avg_labmda_fun, output_shape=(13, ), mask=None)(relic_embedding)

  # Embed encounter. There is only a single encounter but the lambda is used to reshape the vector
  encounter_input = Input(shape=(1, ), name='encounter_input')
  encounter_embedding = Embedding(len(ALL_ENCOUNTERS), 8)(encounter_input)
  encounter_layer_reshape = Lambda(lambda x: keras.backend.mean(x, axis=1), output_shape=(8, ))(encounter_embedding)

  numbers_input = Input(shape=(4, ), name='num_and_bool_input')

  # Concatenate before sending to Dense layers
  merged = concatenate([card_average, relic_average, encounter_layer_reshape, numbers_input])

  dense_1 = Dense(40, activation='relu')(merged)
  drop_out_1 = Dropout(.1)(dense_1)
  dense_out = Dense(1)(drop_out_1)


  emb_model = Model(inputs=[card_input, relic_input, encounter_input, numbers_input], output=dense_out)
  emb_model.summary()

  emb_model.compile(
      optimizer=keras.optimizers.RMSprop(learning_rate=.0001),
      loss='mse',
      metrics=['mae'])

  logdir = os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
  tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)

  Y_scaled = scale_Y(Y)
  X_train, X_test, Y_train, Y_test = train_test_split(X, Y_scaled, test_size=0.33, shuffle=False)

  cards_col = X_train[:, 0:NUM_CARDS_FOR_EMB]
  relic_index = NUM_CARDS_FOR_EMB + NUM_RELICS_FOR_EMB
  relics_col = X_train[:, NUM_CARDS_FOR_EMB:relic_index]
  encounter_index = relic_index + 1
  encounter_col = X_train[:, relic_index:encounter_index]
  num_and_bool_col = X_train[:, encounter_index:]
  max_abs_scaler = MaxAbsScaler()
  num_and_bool_col = max_abs_scaler.fit_transform(num_and_bool_col)

  history = emb_model.fit(x={'cards_input': cards_col, 'relics_input': relics_col, 'encounter_input': encounter_col, 'num_and_bool_input': num_and_bool_col}, y=Y_train, batch_size=32, epochs=20, validation_split=0.2)
  return emb_model

if USE_EMBEDDING:
  emb_model = train_embedding_model()

In [None]:
def inspect_embeddings():
  embeddings = emb_model.layers[3].get_weights()[0]
  embeddings.shape
  weights = dict()
  for i, name in enumerate(ALL_CARDS):
    weights[name] = embeddings[i]
    # print(f'{name}:\t{embeddings[i]}')

  print(weights['Pommel Strike'])
  print(weights['Sucker Punch'])

if USE_EMBEDDING:
  inspect_embeddings()