In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Input, Dense, BatchNormalization, ReLU, Dropout
from tensorflow.keras.models import Model, clone_model
from itertools import product

def make_states(n):
    states = np.zeros((n,8))
    for i in np.arange(n):
        idx = np.random.choice(8, size=3, replace=False)
        states[i, idx] = np.random.uniform(0.0, 1.0, size=3)
    return states
    
def run(state, action):
    if state[action] != 0:
        return state[action]
    else:
        before, after = state[(action - 1) % 8], state[(action + 1) % 8]
        if before != 0 and after != 0:
            return before + after
        else:
            return 0

def get_optimal_action_and_value(state):
    values = np.array([run(state, action) for action in np.arange(8)])
    optimal_action = values.argmax()
    return optimal_action, values[optimal_action]

def get_optimal_actions_and_values(state_list):
    res = [get_optimal_action_and_value(state) for state in state_list]
    actions = np.array([pairs[0] for pairs in res])
    values = np.array([pairs[1] for pairs in res])
    return actions, values
    
def get_optimal_statics(n_rounds):
    state_list = make_states(n_rounds)
    _, values = get_optimal_actions_and_values(state_list)
    return np.mean(values), np.std(values)

def get_random_policy_statics(n_rounds):
    values = [run(state, np.random.choice(8)) for state in make_states(n_rounds)]
    return np.mean(values), np.std(values)

def get_model_actions(model, state_list):
    return model(np.array(state_list)).numpy().argmax(axis = 1)

def test_model(model, n_test_rounds):
    state_list = make_states(n_test_rounds)
    actions = get_model_actions(model, state_list)
    values = [run(state, action) for state, action in zip(state_list, actions)]
    optimal_actions, _ = get_optimal_actions_and_values(state_list)
    accuracy = np.mean(actions == optimal_actions)
    return np.mean(values), np.std(values), accuracy

def one_batch(model, optimizer, batch_size=128, n_test_rounds=10000):
    states = make_states(batch_size)
    _, optimal_values = get_optimal_actions_and_values(states)
    with tf.GradientTape() as tape:
        probs = model(states)

        actions = np.array([np.random.choice(8, p=prob) for prob in probs.numpy()])
        rewards = np.array([run(state, action) for state, action in zip(states, actions)])
        rewards = rewards - optimal_values
        rewards = (rewards - np.mean(rewards)) / (np.std(rewards) + 1e-8)

        action_probs = tf.gather_nd(probs, [[i, a] for i, a in enumerate(actions)])
        log_probs = tf.math.log(action_probs)
        loss = -tf.reduce_mean(log_probs * rewards)
    
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

def train(model, optimizer, one_batch, max_batch=200, batch_size=32, test_lapse = 10, n_test_rounds=10000, verbose = 0):
    best_weights = []
    best_idx = 0
    best_score = 0
    for i in np.arange(max_batch):
        one_batch(model, optimizer, batch_size)
        if (i+1) % test_lapse == 0 or i + 1 == max_batch:
            score, _, accuracy = test_model(model, n_test_rounds)
            if best_score < score:
                best_score = score
                best_idx = i
                best_weights = model.get_weights()
            if verbose == 1:
                print(i+1, score, accuracy)
    return best_idx, best_score, best_weights

def create_model(n_hidden_layers, n_dense_units, ratio_dropout):
    input_shape = (8,) 
    inputs = Input(shape=input_shape)

    x = inputs
    for i in np.arange(n_hidden_layers):
        x = Dense(n_dense_units)(x) 
        x = BatchNormalization()(x)
        x = ReLU()(x)
        x = Dropout(ratio_dropout)(x)
    
    outputs = Dense(8, activation='softmax')(x)
    model = Model(inputs, outputs)
    return model




In [2]:
model = create_model(n_hidden_layers=2, n_dense_units=256, ratio_dropout=0.2)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

In [3]:
best_idx, best_score, best_weights = train(model, optimizer, one_batch, max_batch=1000, batch_size=128, test_lapse = 20, n_test_rounds=10000, verbose = 1)

20 0.7339654840427151 0.4197
40 0.7318226986132617 0.4122
60 0.7334543882296721 0.4015
80 0.7397083137278935 0.4244
100 0.7631010915801264 0.4549
120 0.7760599521337408 0.4729
140 0.7788991718142902 0.4997
160 0.7851579011504027 0.5178
180 0.7943545117174379 0.5216
200 0.7979974383209517 0.5188
220 0.8090956493070987 0.5481
240 0.8157016234142183 0.5532
260 0.8245638989394385 0.5661
280 0.8305260406706828 0.5805
300 0.8328790437151999 0.5956
320 0.8318754286374253 0.5906
340 0.8353415596053414 0.5839
360 0.835951840604878 0.6094
380 0.8483075019872139 0.6142
400 0.8500449676547116 0.6338
420 0.8361121828732999 0.6034
440 0.8477111224868346 0.6217
460 0.8568629147846292 0.656
480 0.861327568371441 0.645
500 0.8613537610016272 0.6523
520 0.8634251765085188 0.657
540 0.8647760004490452 0.6642
560 0.872109582741807 0.6708
580 0.8664021290651801 0.672
600 0.8669640810531977 0.6798
620 0.874615604978736 0.6841
640 0.8651986231690231 0.688
660 0.8759120958599056 0.7067
680 0.8741307525899726 

In [4]:
test_model(model, 10000)

(np.float64(0.8831582976087564),
 np.float64(0.37827467223514566),
 np.float64(0.7421))

In [5]:
best_model = clone_model(model)
best_model.set_weights(best_weights)
test_model(best_model, 10000)

(np.float64(0.8885815101340466),
 np.float64(0.3783373587545388),
 np.float64(0.7435))