In [6]:
# region ----------Imports ----------
from SnakeBoard import SnakeBoard
from SnakeGame import SnakeGame
from NeuralNetwork import NeuralNetwork
import numpy as np
import time
import matplotlib.pyplot as plt
import pickle
from concurrent.futures import ThreadPoolExecutor
#endregion

# Run a game instance following the ANN inputs until
# it's "Game Over"
def play_game_instance(game, ann, idx):
    """Drive one game with its network until the game reports "game over".

    Args:
        game: object exposing ``get_game_state()`` and ``step_game(move)``;
            ``step_game`` must return three values
            ``(game_over, w_score, score)``.
        ann: object exposing ``calculate(state)``, which yields the next move.
        idx: caller-supplied identifier, echoed back unchanged in the result.

    Returns:
        ``[game_over, w_score, score, idx]`` taken from the final step.
    """
    finished = False
    while not finished:
        # Ask the network for a move based on the current state, then apply it.
        chosen_move = ann.calculate(game.get_game_state())
        finished, weighted_score, plain_score = game.step_game(chosen_move)

    return [finished, weighted_score, plain_score, idx]

# Run all game instances in parallel
def run_games_parallel(s_games, s_ann, max_workers=None):
    """Play every game to completion, one thread-pool task per game.

    Each task runs ``play_game_instance(s_games[i], s_ann[i], i)``.

    Args:
        s_games: sequence of game objects.
        s_ann: sequence of networks; ``s_ann[i]`` drives ``s_games[i]``.
        max_workers: forwarded to ``ThreadPoolExecutor``; ``None`` keeps the
            executor's default pool size.

    Returns:
        A list with one dict per game, in submission order, holding the keys
        ``"game_over"``, ``"w_score"``, ``"score"`` and ``"idx_game"``.
        An exception raised inside a task is re-raised here by ``result()``.
    """
    # NOTE(review): threads only help if the game/ANN code releases the GIL
    # (e.g. spends its time inside numpy) - confirm before relying on a speed-up.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Play game instance until finishing / game-over
        futures = [executor.submit(play_game_instance, game, ann, idx_game)
                   for idx_game, (game, ann) in enumerate(zip(s_games, s_ann))]

        # Collect the results for each played game instance; result() blocks
        # until that task is done and is unpacked once per future.
        game_status = []
        for future in futures:
            game_over, w_score, score, idx_game = future.result()
            game_status.append({"game_over": game_over,
                                "w_score": w_score,
                                "score": score,
                                "idx_game": idx_game})

    return game_status

# Run all game instances together in series, one step each per pass (like an RTOS scheduler)
def run_games_series(s_games, s_ann, s_board, show_visuals, manual_play):
    """Advance all games round-robin, one step per game per pass, until all are over.

    Args:
        s_games: sequence of game objects exposing ``get_game_state()``,
            ``get_key()`` and ``step_game(move)``.
        s_ann: sequence of networks; ``s_ann[i].calculate(state)`` picks the
            move for ``s_games[i]`` in auto-play mode.
        s_board: board exposing ``clear_board()`` and
            ``update_board_elements()``; redrawn once per pass.
        show_visuals: truthy when the games are displayed; the keyboard is
            then polled on every step.
        manual_play: ``0`` for network auto-play; any other value takes the
            move from the keyboard (``1`` additionally enables pausing).

    Returns:
        The per-game status dicts of the final pass, one per game, with keys
        ``"game_over"``, ``"score"``, ``"w_score"`` and ``"idx_game"``.
        An empty ``s_games`` yields an empty list after a single board redraw.
    """
    while True:
        game_status = []
        # Go into each game to step individually (series)
        for idx_game, game in enumerate(s_games):
            # Get current game state
            state = game.get_game_state()

            # Poll the keyboard when visuals are on (even in auto-play mode the
            # event queue must be queried regularly or the window freezes) and
            # whenever the move comes from the user. Polling in every
            # non-auto mode guarantees next_move is bound below; previously
            # manual play without visuals hit an UnboundLocalError.
            if show_visuals or manual_play != 0:
                next_move = game.get_key()

            # Detect pause behavior
            if manual_play == 1 and next_move == "PAUSE":
                # Block until any key other than "IDLE" arrives; that key is
                # then used as the move for this step.
                while True:
                    next_move = game.get_key()
                    if next_move != "IDLE":
                        break
            elif manual_play == 0:
                # Feed ANN with game state and calc. next move
                next_move = s_ann[idx_game].calculate(state)

            # Step game based on the chosen move.
            # NOTE(review): games that are already over are still stepped on
            # every pass - presumably step_game keeps reporting game over
            # without changing the score; confirm in SnakeGame.
            game_over, w_score, score = game.step_game(next_move)

            # Collect the results for each played game instance
            game_status.append({"game_over": game_over,
                                "score": score,
                                "w_score": w_score,
                                "idx_game": idx_game})

        # Update graphics of all games (visual feedback)
        s_board.clear_board()
        s_board.update_board_elements()

        # If all game instances are over, finish current generation.
        # all() is also well-defined for an empty list, unlike np.min.
        if all(status["game_over"] for status in game_status):
            break

    return game_status
    
    

In [None]:
# region ---------- User defined parameters ----------
# --- General run options ---
# 1: start from the weights stored in ./trainingHistory.bin; 0: start fresh
restore_weights_prev_training = 0
# 1: moves come from the keyboard; 0: moves come from the neural networks
manual_play = 0
# 1: show the games on screen; 0: only play and compute in the back-end
show_visuals = 0
# Pause (seconds) after each generation
t_between_gen = 0
# The weights file is written every this-many generations
n_gens_2_save_weights = 10

# --- Training options ---
# Number of training generations
n_of_gens = 10
# Number of games (one network each) played per generation
n_games_per_gen = 1000
# Best games per generation kept as baselines for the mutations
selected_games_per_gen = 20

# --- Mutation options (forwarded to NeuralNetwork.mutate) ---
# presumably mrate_* is the mutation probability and msize_* its magnitude;
# confirm against NeuralNetwork.mutate
mrate_bias = 0.05
mrate_weights = 0.05
msize_bias = 0.2
msize_weights = 0.2
#endregion

# ---------- Machine Learning main logic ---------- 
# Restore weights from previous training if required.
# NOTE(security): pickle.load can execute arbitrary code while unpickling, so
# only restore history files that this notebook wrote itself.
if restore_weights_prev_training == 1:
    with open('./trainingHistory.bin', 'rb') as file:
        # The with-block closes the file; no explicit close() is needed.
        fileDataLoaded = pickle.load(file)

# Networks of the most recently saved generation (empty when starting fresh)
_saved_anns = (fileDataLoaded["ann_weights_history"][-1]
               if restore_weights_prev_training == 1 else [])

# Create 'N' games + their ANN instances
record_score, record_w_score = 0, 0
s_games, s_ann = [], []
for idx in range(n_games_per_gen):
    s_games.append(SnakeGame())
    s_ann.append(NeuralNetwork())
    # Only restore the networks that exist in the saved generation. If the
    # file holds fewer networks than n_games_per_gen, the remaining ones keep
    # their fresh initialisation instead of failing with an IndexError.
    if idx < len(_saved_anns):
        s_ann[idx].set_weights_biases(_saved_anns[idx].weights,
                                      _saved_anns[idx].biases)
s_board = SnakeBoard(s_games)

# Create history of the best scores and ANN weights
# (continued from the restored file when restoring, otherwise empty)
game_status_history, ann_weights_history = list(), list()
if restore_weights_prev_training == 1:
    game_status_history = fileDataLoaded["game_status_history"]
    ann_weights_history = fileDataLoaded["ann_weights_history"]

s_board.init_board()

# Run number of generations.
# The loop is wrapped in try/finally so the board is always shut down, even
# when a generation raises or the cell is interrupted.
try:
    for idx_gen in range(n_of_gens):
        # 1 Play all games in current generation (until all games are over)
        #game_status = run_games_parallel(s_games, s_ann)
        game_status = run_games_series(s_games, s_ann, s_board, show_visuals, manual_play)

        # 2 Get best scores and ANN weights in prev. generation + save history for the last generation
        game_status.sort(key=lambda x: x["score"], reverse=True)  # Sort from best to worst game
        game_status_history.append([status.copy() for status in game_status])  # Game status history copy
        ann_weights_history.append([ann.copy() for ann in s_ann])  # ANN weights history copy

        # 3 If best score in curr. gen is an all-time record, save it
        #if game_status[0]["w_score"] > record_w_score: # Based on weighed score
        if game_status[0]["score"] > record_score:  # Based on normal score
            record_w_score = game_status[0]["w_score"]
            record_score = game_status[0]["score"]

        # 4 Get the best "selected_games_per_gen" games in the current generation
        # and place them in the first positions.
        # All winners are snapshotted BEFORE any of them is written back:
        # copying straight into s_ann[i] while still reading from s_ann would
        # overwrite a network that a later rank still needs (e.g. when the
        # runner-up was played by the network at index 0).
        elite = [s_ann[status["idx_game"]].copy()
                 for status in game_status[:selected_games_per_gen]]
        s_ann[:len(elite)] = elite

        # 5 Mutate the best ones in the subsequent positions
        for i in range(selected_games_per_gen, n_games_per_gen):
            s_ann[i] = s_ann[np.mod(i, selected_games_per_gen)].copy()
            s_ann[i].mutate(mrate_weights, msize_weights, mrate_bias, msize_bias)  # random mutations

        # 6 Reset all games
        for game in s_games:
            game.reset_game()

        # 7 Save weights in an external file when applicable.
        # The final generation is always saved as well; otherwise a run whose
        # length is not past the next save point (e.g. 10 generations, saving
        # every 10) would only ever store generation 0.
        if np.mod(idx_gen, n_gens_2_save_weights) == 0 or idx_gen == n_of_gens - 1:
            with open('./trainingHistory.bin', 'wb') as file:
                fileData = {"game_status_history": game_status_history, 
                            "ann_weights_history": ann_weights_history}
                pickle.dump(fileData, file)  # the with-block closes the file

        print("GEN ", idx_gen, " --- BEST SCORE: ", game_status[0]["score"] , " / ", 
                game_status[0]["w_score"] , " --- RECORD: ", record_score, " / ", record_w_score)
        time.sleep(t_between_gen)
finally:
    s_board.quit_board()

SnakeBoard instance created.
GEN  0  --- BEST SCORE:  4  /  111  --- RECORD:  4  /  111
GEN  1  --- BEST SCORE:  8  /  231  --- RECORD:  8  /  231
GEN  2  --- BEST SCORE:  11  /  268  --- RECORD:  11  /  268
GEN  3  --- BEST SCORE:  7  /  208  --- RECORD:  11  /  268
GEN  4  --- BEST SCORE:  10  /  311  --- RECORD:  11  /  268
GEN  5  --- BEST SCORE:  15  /  483  --- RECORD:  15  /  483
GEN  6  --- BEST SCORE:  15  /  488  --- RECORD:  15  /  483
