# Blackjack player based on fuzzy logic

Adapted from the Kaggle micro-challenges, so the rules implemented here are the ones used there.

#### Rules

"We'll use a slightly simplified version of blackjack (aka twenty-one). In this version, there is one player (who you'll control) and a dealer. Play proceeds as follows:

- The player is dealt two face-up cards. The dealer is dealt one face-up card.
- The player may ask to be dealt another card ('hit') as many times as they wish. If the sum of their cards exceeds 21, they lose the round immediately.
- The dealer then deals additional cards to himself until either:
    - The sum of the dealer's cards exceeds 21, in which case the player wins the round, or
    - The sum of the dealer's cards is greater than or equal to 17. If the player's total is greater than the dealer's, the player wins. Otherwise, the dealer wins (even in case of a tie).

When calculating the sum of cards, Jack, Queen, and King count for 10. Aces can count as 1 or 11. (When referring to a player's "total" above, we mean the largest total that can be made without exceeding 21. So A+8 = 19, A+8+8 = 17.)"

The task is to write the `should_hit` function, so that the player's winning rate is as high as possible.

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import skfuzzy as fuzz
import pickle
import copy

np.random.seed(1234)

def save_data(filename, data):
    """Pickle ``data`` into ``saved_data/<filename>.pickle``.

    The ``saved_data`` directory is not created here, so it must already
    exist relative to the current working directory.
    """
    target = "saved_data/{}.pickle".format(filename)
    with open(target, mode='wb') as handle:
        pickle.dump(data, handle)

def load_data(filename):
    """Return the object stored in ``saved_data/<filename>.pickle``.

    NOTE: unpickling can run arbitrary code, so only load files that were
    written by a trusted source.
    """
    source = "saved_data/{}.pickle".format(filename)
    with open(source, mode='rb') as handle:
        return pickle.load(handle)

Gameplay details

In [2]:
class Deck:
    """A shuffled 52-card deck that is dealt from the front.

    Card coding: 2-9 are stored as their face value, 10/J/Q/K are all stored
    as 10, and an ace is stored as the special value -1.
    """

    def __init__(self):
        self.cards_left = None
        self.pointer = -1
        self.reset_deck()

    def reset_deck(self):
        """Reshuffle all 52 cards and rewind the draw position."""
        one_suit = [2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10, -1]
        self.cards_left = np.random.permutation(one_suit * 4)
        self.pointer = -1

    def draw_card(self):
        """Advance the draw position by one and return the card found there."""
        position = self.pointer + 1
        self.pointer = position
        return self.cards_left[position]

def evaluate(base_points, num_of_aces):
    """Return the hand total, counting one ace as 11 when that fits.

    Args:
        base_points: sum of all non-ace cards.
        num_of_aces: number of aces held.

    Returns:
        ``base_points`` when there are no aces. Otherwise every ace counts
        as 1, and one of them is promoted to 11 if the total stays at or
        below 21.
    """
    if num_of_aces == 0:
        return base_points

    # "Soft" total: all aces as 1, plus 10 for promoting one of them to 11.
    soft_total = base_points + num_of_aces + 10
    return soft_total if soft_total <= 21 else soft_total - 10

def play_game(deck, should_hit, verbose=False):
    """Play one round of simplified blackjack and report whether the player won.

    Args:
        deck: object providing ``reset_deck()`` and ``draw_card()``; a drawn
            value of -1 is treated as an ace.
        should_hit: callable ``(player_base, player_aces, dealer_score)``
            returning a truthy value while the player wants another card.
        verbose: when true, print "You win" / "You lose" for the outcome.

    Returns:
        True if the player wins, False otherwise (ties go to the dealer).
    """
    def finish(player_won):
        # Single exit point so the verbose message always matches the result.
        if verbose:
            print("You win" if player_won else "You lose")
        return player_won

    def deal(base, aces):
        # Draw one card and fold it into a (base points, ace count) pair.
        card = deck.draw_card()
        if card == -1:
            return base, aces + 1
        return base + card, aces

    deck.reset_deck()

    # Player gets two cards, then the dealer gets one.
    player_base, player_aces = deal(*deal(0, 0))
    dealer_base, dealer_aces = deal(0, 0)

    player_score = evaluate(player_base, player_aces)
    dealer_score = evaluate(dealer_base, dealer_aces)

    # Player's turn: keep drawing while the policy asks for it; bust loses at once.
    while should_hit(player_base, player_aces, dealer_score):
        player_base, player_aces = deal(player_base, player_aces)
        player_score = evaluate(player_base, player_aces)
        if player_score > 21:
            return finish(False)

    # Dealer's turn: draw until busting or reaching at least 17.
    while True:
        dealer_base, dealer_aces = deal(dealer_base, dealer_aces)
        dealer_score = evaluate(dealer_base, dealer_aces)
        if dealer_score > 21:
            return finish(True)
        if dealer_score >= 17:
            break

    if player_score > dealer_score:
        return finish(True)
    return finish(False)

#### Fuzzy classifier

In [3]:
class FuzzyClasifier:
    """Fuzzy-rule estimator of how risky it is to take another card.

    Three inputs are fuzzified into low / medium / high triangular sets:
    the player's non-ace points (universe 0-21), the player's ace count
    (universe 0-4) and the dealer's initial score (universe 0-11).  Each rule
    combines two of those memberships with AND (minimum) or OR (maximum),
    clips one of the low / medium / high risk sets (universe 0-10) at that
    level, and the clipped sets are merged with an element-wise maximum
    before centroid defuzzification.

    Args:
        md_locs: three peak positions for the "medium" sets of, in order,
            player base, player aces and dealer initial score.  They also
            bound the "low" and "high" triangles.
        risk_md_loc: peak position of the "medium" risk set.
        rules: iterable of dicts with keys ``'prem_1'``, ``'prem_2'``
            (premise codes such as ``'BL'``), ``'op'`` (``'AND'`` / ``'OR'``)
            and ``'target'`` (``'RL'`` / ``'RM'`` / ``'RH'``).

    Raises:
        ValueError: if ``md_locs`` does not contain exactly three values.
    """

    def __init__(self, md_locs=None, risk_md_loc=5, rules=None):
        # universe
        self.x_player_base = np.arange(0, 22)
        self.x_player_aces = np.arange(0, 5)
        self.x_dealer_init = np.arange(0, 12)
        self.x_risk = np.arange(0, 11)
        # None instead of a shared mutable default list.
        self.rules = [] if rules is None else rules

        # fuzzy membership functions
        try:
            player_base_md_loc, player_aces_md_loc, dealer_init_md_loc = (
                () if md_locs is None else md_locs
            )
        except ValueError as err:
            raise ValueError(
                "md_locs must contain exactly three peak locations "
                "(player base, player aces, dealer init)"
            ) from err

        self.player_base_lo = fuzz.trimf(self.x_player_base, [0,  0,  player_base_md_loc])
        self.player_base_md = fuzz.trimf(self.x_player_base, [0,  player_base_md_loc, 21])
        self.player_base_hi = fuzz.trimf(self.x_player_base, [player_base_md_loc, 21, 21])
        self.player_aces_lo = fuzz.trimf(self.x_player_aces, [0,  0,  player_aces_md_loc])
        self.player_aces_md = fuzz.trimf(self.x_player_aces, [0,  player_aces_md_loc,  4])
        self.player_aces_hi = fuzz.trimf(self.x_player_aces, [player_aces_md_loc,  4,  4])
        self.dealer_init_lo = fuzz.trimf(self.x_dealer_init, [0,  0,  dealer_init_md_loc])
        self.dealer_init_md = fuzz.trimf(self.x_dealer_init, [0,  dealer_init_md_loc, 11])
        self.dealer_init_hi = fuzz.trimf(self.x_dealer_init, [dealer_init_md_loc, 11, 11])

        self.risk_lo = fuzz.trimf(self.x_risk, [0,  0,  risk_md_loc])
        self.risk_md = fuzz.trimf(self.x_risk, [0,  risk_md_loc, 10])
        self.risk_hi = fuzz.trimf(self.x_risk, [risk_md_loc, 10, 10])

    def assess_risk(self, player_base, player_aces, dealer_init):
        """Return the defuzzified risk of hitting, scaled to the range 0-1.

        Args:
            player_base: the player's points from non-ace cards.
            player_aces: the number of aces the player holds.
            dealer_init: the dealer's score from the initial card.

        Returns:
            Centroid of the aggregated risk set divided by 10, or 0.0 when
            ``fuzz.defuzz`` raises ``AssertionError``.

        Raises:
            KeyError: if a rule names an unknown premise code.
            ValueError: if a rule has an unknown operator or target code.
        """
        membership_level = {
            'BL': fuzz.interp_membership(self.x_player_base, self.player_base_lo, player_base),
            'BM': fuzz.interp_membership(self.x_player_base, self.player_base_md, player_base),
            'BH': fuzz.interp_membership(self.x_player_base, self.player_base_hi, player_base),
            'AL': fuzz.interp_membership(self.x_player_aces, self.player_aces_lo, player_aces),
            'AM': fuzz.interp_membership(self.x_player_aces, self.player_aces_md, player_aces),
            'AH': fuzz.interp_membership(self.x_player_aces, self.player_aces_hi, player_aces),
            'IL': fuzz.interp_membership(self.x_dealer_init, self.dealer_init_lo, dealer_init),
            'IM': fuzz.interp_membership(self.x_dealer_init, self.dealer_init_md, dealer_init),
            'IH': fuzz.interp_membership(self.x_dealer_init, self.dealer_init_hi, dealer_init)
        }
        # One accumulator cell per point of the risk universe.
        aggregate = np.zeros(len(self.x_risk))

        for rule in self.rules:
            level_1 = membership_level[rule['prem_1']]
            level_2 = membership_level[rule['prem_2']]

            operator = rule['op']
            if operator == 'AND':
                activate_rule = np.fmin(level_1, level_2)
            elif operator == 'OR':
                activate_rule = np.fmax(level_1, level_2)
            else:
                # Without this branch an unknown operator would silently
                # reuse the previous rule's activation level.
                raise ValueError("Unknown rule operator: {!r}".format(operator))

            target_risk = self.__get_target(rule['target'])
            if target_risk is None:
                raise ValueError("Unknown rule target: {!r}".format(rule['target']))

            # Clip the consequent at the rule's firing strength, then merge.
            risk_activation = np.fmin(activate_rule, target_risk)
            aggregate = np.fmax(risk_activation, aggregate)

        try:
            risk_score = fuzz.defuzz(self.x_risk, aggregate, 'centroid')
        except AssertionError:
            # Presumably raised when no rule fired and the aggregate has zero
            # area - confirm against the installed scikit-fuzzy version.
            risk_score = 0.
        return risk_score / 10

    def __get_premise(self, rule):
        """Return the input membership array for a premise code, or None.

        Not used by ``assess_risk``, which looks up interpolated membership
        levels directly.
        """
        premises = {
            'BL': self.player_base_lo,
            'BM': self.player_base_md,
            'BH': self.player_base_hi,
            'AL': self.player_aces_lo,
            'AM': self.player_aces_md,
            'AH': self.player_aces_hi,
            'IL': self.dealer_init_lo,
            'IM': self.dealer_init_md,
            'IH': self.dealer_init_hi,
        }
        return premises.get(rule)

    def __get_target(self, rule):
        """Return the risk membership array for a target code, or None."""
        targets = {
            'RL': self.risk_lo,
            'RM': self.risk_md,
            'RH': self.risk_hi,
        }
        return targets.get(rule)

Basic decision functions and performance testing utility

In [4]:
def should_hit_always_False(player_base, player_aces, dealer_points):
    """Baseline policy: stand on every hand, whatever the cards are."""
    return False

def should_hit_random_50_50(player_base, player_aces, dealer_points):
    """Baseline policy: hit when a fresh uniform random draw exceeds 0.5.

    The hand and the dealer's card are ignored.
    """
    draw = np.random.random()
    return draw > 0.5

def should_hit_generate_fuzzy(md_locs=[], risk_md_loc=5, rules=[]):
    """Build a hit/stand policy backed by a ``FuzzyClasifier``.

    The arguments are forwarded unchanged to ``FuzzyClasifier``.  The
    returned function hits exactly when the assessed risk is below 0.5.
    """
    classifier = FuzzyClasifier(md_locs, risk_md_loc, rules)

    def should_hit(player_base, player_aces, dealer_init):
        risk = classifier.assess_risk(player_base, player_aces, dealer_init)
        return risk < 0.5

    return should_hit

def performance_testing(decision_function, n=10000):
    """Play ``n`` games with ``decision_function`` and summarise the results.

    Returns:
        A dict with ``'fraction'`` (share of games won) and ``'score'``.
        The score maps a win fraction of 0.28 to 0 and 0.44 to 1, squares
        the result, and is 0 for any fraction below 0.28.
    """
    deck = Deck()
    win_count = sum(1 for _ in range(n) if play_game(deck, decision_function))

    fraction = win_count / n
    if fraction < 0.28:
        score = 0.
    else:
        score = ((fraction - 0.28) / (0.44 - 0.28))**2

    return {'fraction': fraction, 'score': score}

#### Rule creation:

Feature 1: premise1 (player_base_lo (as `BL`\*), etc.)<br>
Feature 2: premise2 (as above; duplicates allowed)<br>
Feature 3: operation (OR/AND)<br>
Feature 4: target - risk level\**

\* the first letter is B (player's base), A (player's aces) or I (dealer's init), and the second letter is L (low), M (medium) or H (high);<br>
\** almost the same as the above, but here the first letter is R, standing for the risk.

#### Performance testing note

A random (50/50) decision function wins about 28% of the rounds. The best mathematically achievable win rate is slightly below 44% (I once saw an estimate of ~43.15%, but cannot find the source now). The risk-averse approach of never hitting (and therefore never busting) wins about 38% of the rounds.

Thus, the score is calculated in the following way:
* the range 28%-44% is turned into 0-1 range*,
* the result above is squared so that the bad results do not make up a significant part of the available score range.

\* score over 1.0 is not mathematically possible assuming iteration number is large enough, but may easily occur when n is rather small.

In [5]:
# Hand-written two-rule baseline, using the rule coding described above:
#   1. low player base AND high dealer init  -> low risk
#   2. high player base OR low dealer init   -> high risk
rules = [
    {'prem_1': 'BL', 'prem_2': 'IH', 'op': 'AND', 'target': 'RL'},
    {'prem_1': 'BH', 'prem_2': 'IL', 'op': 'OR', 'target': 'RH'}
]
# Positional arguments: md_locs=[15, 1, 6], risk_md_loc=5.
should_hit_fuzzy = should_hit_generate_fuzzy([15, 1, 6], 5, rules)

# Last expression of the cell, so its result is what the notebook displays.
performance_testing(should_hit_fuzzy, n=5000)

{'fraction': 0.3958, 'score': 0.5238140624999998}

In [6]:
#random_choice = performance_testing(should_hit_random_50_50, n=100000)
#print(random_choice)
#always_stand = performance_testing(should_hit_always_False, n=100000)
#print(always_stand)

#### Monte Carlo parameter searching

In [7]:
def random_rule_generator():
    """Return one random rule dict.

    Both premises are drawn independently (so they may coincide) from the
    nine premise codes, the operator from AND/OR, and the target from the
    three risk codes.
    """
    premise_codes = ['BL', 'BM', 'BH', 'AL', 'AM', 'AH', 'IL', 'IM', 'IH']
    operators = ['AND', 'OR']
    risk_codes = ['RL', 'RM', 'RH']

    # Draw order matters for reproducibility under a fixed seed.
    rule = {}
    rule['prem_1'] = premise_codes[np.random.randint(len(premise_codes))]
    rule['prem_2'] = premise_codes[np.random.randint(len(premise_codes))]
    rule['op'] = operators[np.random.randint(len(operators))]
    rule['target'] = risk_codes[np.random.randint(len(risk_codes))]
    return rule

def md_locs_generator():
    """Return random peak locations as ``([base, aces, dealer], risk)``.

    Each value is drawn uniformly from its universe: 0-21 for player base,
    0-4 for player aces, 0-11 for dealer init and 0-10 for risk.
    """
    player_base_loc = np.random.randint(22)
    player_aces_loc = np.random.randint(5)
    dealer_init_loc = np.random.randint(12)
    risk_loc = np.random.randint(11)
    return [player_base_loc, player_aces_loc, dealer_init_loc], risk_loc

def fuzzy_logic_kwargs_generator(n_rules=5):
    """Return random keyword arguments for ``should_hit_generate_fuzzy``.

    The peak locations are drawn first, followed by ``n_rules`` random rules.
    """
    md_locs, risk_md_loc = md_locs_generator()

    rules = []
    for _ in range(n_rules):
        rules.append(random_rule_generator())

    return dict(md_locs=md_locs, risk_md_loc=risk_md_loc, rules=rules)

In [8]:
# Sanity check: score one completely random fuzzy configuration.
kwargs = fuzzy_logic_kwargs_generator()
should_hit_random_fuzzy = should_hit_generate_fuzzy(**kwargs)
# Last expression of the cell, so its result is what the notebook displays.
performance_testing(should_hit_random_fuzzy, n=5000)

{'fraction': 0.0, 'score': 0.0}

In [9]:
def monte_carlo_rule_finding(n=1):
    """Random search for good fuzzy configurations via a narrowing funnel.

    Each of the ``n`` rounds:
      1. samples random configurations until 500 of them score above zero
         over 15 games,
      2. re-scores those over 100 games and keeps the best 50,
      3. re-scores the 50 over 500 games and adds them to a shortlist.

    The 10 best shortlist entries are finally re-scored over 2500 games.

    Returns:
        Up to 10 ``(kwargs, score)`` tuples, best score first.
    """
    def score_of(candidate, games):
        # Build the policy from scratch and score it over `games` rounds.
        policy = should_hit_generate_fuzzy(**candidate)
        return performance_testing(policy, n=games)['score']

    def by_score(entry):
        return entry[1]

    shortlist = []

    for _ in range(n):
        # Stage 1: cheap filter that throws away configurations scoring zero.
        survivors = []
        while len(survivors) < 500:
            candidate = fuzzy_logic_kwargs_generator()
            if score_of(candidate, 15) > 0.:
                survivors.append(candidate)

        # Stage 2: medium-cost ranking, keep the top 50.
        funnel = [(candidate, score_of(candidate, 100)) for candidate in survivors]
        funnel = sorted(funnel, key=by_score, reverse=True)[:50]

        # Stage 3: more reliable score for the ones that got through.
        shortlist.extend(
            (candidate, score_of(candidate, 500)) for candidate, _ in funnel
        )

    finalists = sorted(shortlist, key=by_score, reverse=True)[:10]

    best = [(candidate, score_of(candidate, 2500)) for candidate, _ in finalists]
    best.sort(key=by_score, reverse=True)
    return best[:10]

def model_tournament(winners=None, epochs=1, n_winners=10, winners_per_epoch=4):
    """Run Monte Carlo searches and keep a running top list of models.

    Args:
        winners: optional existing ``(kwargs, score)`` tuples to compete
            against.  The list is copied, so the caller's list is not
            modified.
        epochs: number of search rounds; raised if needed so that
            ``n_winners`` entries can be collected.
        n_winners: maximum number of entries kept after each epoch.
        winners_per_epoch: how many top results of each search are re-scored
            over 10000 games and entered into the tournament.

    Returns:
        A new list of ``(kwargs, score)`` tuples, best score first.
    """
    # Copy instead of growing the argument in place: with a mutable default,
    # `+=` would leak results between calls and into the caller's list.
    winners = [] if winners is None else list(winners)
    epochs = max(epochs, int(np.ceil(n_winners / winners_per_epoch)))

    for i in range(epochs):
        top = monte_carlo_rule_finding(1)[:winners_per_epoch]
        for (kwargs, _) in top:
            should_hit_random_fuzzy = should_hit_generate_fuzzy(**kwargs)
            score = performance_testing(should_hit_random_fuzzy, n=10000)['score']
            winners.append((kwargs, score))
        winners.sort(key=lambda x: x[1], reverse=True)
        if len(winners) > n_winners:
            winners = winners[:n_winners]
        print("Epoch {}:".format(i))
        print("> Scores: ", end='')
        for _, score in winners:
            print("{:.4f}, ".format(score), end='')
        print()
    return winners

In [10]:
# Load the previously saved tournament winners and summarise them.
best_models = load_data("models")

# One line with every score, in stored order, then the first model in full.
print("> Scores: " + "".join("{:.4f}, ".format(score) for _, score in best_models))
display(best_models[0])

> Scores: 0.8384, 0.7733, 0.7689, 0.7645, 0.7482, 0.7482, 0.7418, 0.7418, 0.7375, 0.7130, 


({'md_locs': [4, 0, 1],
  'risk_md_loc': 1,
  'rules': [{'prem_1': 'IH', 'prem_2': 'AM', 'op': 'OR', 'target': 'RH'},
   {'prem_1': 'AL', 'prem_2': 'AM', 'op': 'AND', 'target': 'RM'},
   {'prem_1': 'AL', 'prem_2': 'IL', 'op': 'AND', 'target': 'RL'},
   {'prem_1': 'BM', 'prem_2': 'AM', 'op': 'AND', 'target': 'RL'},
   {'prem_1': 'AM', 'prem_2': 'AH', 'op': 'OR', 'target': 'RH'}]},
 0.8383691406249998)

In [11]:
#best_models = model_tournament(best_models, epochs=5)
#save_data("models", best_models)

#### Ensemble classifier

In [12]:
def ensemble_classifier(top_classifiers):
    """Combine several fuzzy policies into one weighted-vote policy.

    Args:
        top_classifiers: sequence of ``(kwargs, weight)`` entries. Each
            ``kwargs`` is passed to ``should_hit_generate_fuzzy`` and the
            weight is that policy's voting power.

    Returns:
        A ``should_hit``-style function that hits only when the total weight
        voting "hit" is strictly greater than the weight voting "stand".
    """
    voters = []
    for entry in top_classifiers:
        voters.append((should_hit_generate_fuzzy(**entry[0]), entry[1]))

    def should_hit_ensemble(player_base, player_aces, dealer_points):
        # tally[True] collects "hit" weight, tally[False] "stand" weight.
        tally = {True: 0., False: 0.}
        for policy, weight in voters:
            wants_hit = bool(policy(player_base, player_aces, dealer_points))
            tally[wants_hit] += weight
        return tally[True] > tally[False]

    return should_hit_ensemble

In [13]:
# Classifiers are mostly uncorrelated, so we can create an ensemble model
# from a score-weighted vote of their decisions, which should reduce variability.

should_hit_ensemble = ensemble_classifier(best_models[:10])
performance_testing(should_hit_ensemble, n=25000)

{'fraction': 0.41696, 'score': 0.7327359999999999}

#### Evolutionary algorithm

In [14]:
def between(x, low, high):
    """Clamp ``x`` to the closed interval ``[low, high]``."""
    if x < low:
        return low
    return high if x > high else x

def shift():
    """Return a random signed integer step.

    A standard normal sample is rounded away from zero, so a non-zero sample
    always gives a step of at least one in either direction.
    """
    sample = np.random.normal()
    magnitude = np.ceil(np.abs(sample))
    return int(magnitude * np.sign(sample))
    
def mutate(kwargs):
    """Return a mutated deep copy of ``kwargs``; the input is left untouched.

    With probability 0.75 one of the three ``md_locs`` entries (chosen
    uniformly) is moved by a random signed step and clamped to its universe:
    0-21, 0-4 or 0-11.  Otherwise ``risk_md_loc`` is moved and clamped to
    0-10.  The rules are never changed.
    """
    new_kwargs = copy.deepcopy(kwargs)
    r = np.random.random()
    if r < 0.75:
        # Upper bounds of the player-base, player-aces and dealer-init universes.
        upper_bounds = (21, 4, 11)
        i = np.random.randint(0, 3)
        # Edit the copy's list: editing kwargs['md_locs'] would change the
        # parent (leaving its stored score stale) and leave the child unmutated.
        md_locs = new_kwargs['md_locs']
        md_locs[i] = between(md_locs[i] + shift(), 0, upper_bounds[i])
    else:
        new_kwargs['risk_md_loc'] = between(new_kwargs['risk_md_loc'] + shift(), 0, 10)
    return new_kwargs

def cross(kwargs_1, kwargs_2):
    """Return a child configuration mixing the rules of two parents.

    The child is a deep copy of ``kwargs_1`` (so it keeps that parent's
    ``md_locs`` and ``risk_md_loc``).  For every rule position the rule is
    taken from either parent at random, and the resulting rules are then
    shuffled.  The child's ``'rules'`` is a plain list of copied dicts, so it
    shares no state with either parent.

    Raises:
        ValueError: if the parents have different numbers of rules.
    """
    rules_1 = kwargs_1['rules']
    rules_2 = kwargs_2['rules']
    n_rules = len(rules_1)
    if n_rules != len(rules_2):
        raise ValueError(
            "cannot cross models with {} and {} rules".format(n_rules, len(rules_2))
        )

    new_kwargs = copy.deepcopy(kwargs_1)

    # 1 -> take the rule from parent 1, 0 -> take it from parent 2.
    take_first = np.random.randint(2, size=n_rules)
    mixed = [
        rule_1 if pick else rule_2
        for pick, rule_1, rule_2 in zip(take_first, rules_1, rules_2)
    ]

    # Shuffling indices keeps the rules as plain Python objects, not an ndarray.
    order = np.random.permutation(n_rules)
    new_kwargs['rules'] = [copy.deepcopy(mixed[i]) for i in order]
    return new_kwargs

def evolution_one_epoch(winners, epoch):
    """Breed 15 children from the current winners and keep the best 10.

    Each child is, with equal probability, a mutation of one random parent
    or a cross of two random parents (drawn with replacement, so a model may
    be crossed with itself).  Parents are always chosen among the first 10
    entries of ``winners``.  Every child is scored over 40000 games.

    Args:
        winners: non-empty sequence of ``(kwargs, score)`` tuples.  It is
            copied, so the caller's list is not modified.
        epoch: epoch number, used only in the printed progress report.

    Returns:
        A new list with the (at most) 10 best ``(kwargs, score)`` tuples.

    Raises:
        ValueError: if ``winners`` is empty.
    """
    population = list(winners)
    # Only the incoming models can be parents; children are appended after
    # them and so never fall inside the first `pool_size` positions.
    pool_size = min(10, len(population))
    if pool_size == 0:
        raise ValueError("evolution_one_epoch needs at least one parent model")

    for _ in range(15):
        r = np.random.random()
        if r < 0.5:
            k = np.random.randint(pool_size)
            new_kwargs = mutate(population[k][0])
        else:
            k_1, k_2 = np.random.choice(pool_size, 2)
            new_kwargs = cross(population[k_1][0], population[k_2][0])

        should_hit_random_fuzzy = should_hit_generate_fuzzy(**new_kwargs)
        score = performance_testing(should_hit_random_fuzzy, n=40000)['score']
        population.append((new_kwargs, score))

    population.sort(key=lambda x: x[1], reverse=True)
    population = population[:10]
    print("Epoch {}:".format(epoch))
    print("> Scores: ", end='')
    for _, score in population:
        print("{:.4f}, ".format(score), end='')
    print()
    return population

def model_evolution(winners, epochs=5):
    """Run ``epochs`` rounds of evolution on a deep copy of ``winners``.

    Returns:
        The population produced by the last epoch, or just the deep copy
        when ``epochs`` is 0.
    """
    population = copy.deepcopy(winners)
    for epoch in range(epochs):
        population = evolution_one_epoch(population, epoch)
    return population

In [15]:
# Load the previously saved evolved population and summarise it.
evolved_models = load_data("evolved")

# One line with every score, in stored order, then the first model in full.
print("> Scores: " + "".join("{:.4f}, ".format(score) for _, score in evolved_models))
display(evolved_models[0])

> Scores: 0.8777, 0.8684, 0.8585, 0.8571, 0.8530, 0.8513, 0.8441, 0.8435, 0.8427, 0.8424, 


({'md_locs': [11, 3, 10],
  'risk_md_loc': 6,
  'rules': array([{'prem_1': 'BL', 'prem_2': 'IM', 'op': 'AND', 'target': 'RH'},
         {'prem_1': 'AH', 'prem_2': 'BL', 'op': 'OR', 'target': 'RL'},
         {'prem_1': 'AH', 'prem_2': 'BL', 'op': 'OR', 'target': 'RL'},
         {'prem_1': 'AH', 'prem_2': 'AM', 'op': 'OR', 'target': 'RH'},
         {'prem_1': 'AM', 'prem_2': 'BH', 'op': 'OR', 'target': 'RM'}],
        dtype=object)},
 0.8777347656250001)

In [16]:
#evolved_models = model_evolution(best_models, epochs=5)
#evolved_models = model_evolution(evolved_models, epochs=10)
#save_data("evolved", evolved_models)

In [17]:
# An "ensemble" of just the first evolved model, i.e. that model on its own.
should_hit_ensemble = ensemble_classifier(evolved_models[:1])
# Last expression of the cell, so its result is what the notebook displays.
performance_testing(should_hit_ensemble, n=50000)

{'fraction': 0.42256, 'score': 0.7938809999999998}