In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential 
from keras.layers import Dense, Activation
from keras.optimizers import Adam
from keras.metrics import mean_squared_error
from matplotlib import pyplot as plt


In [2]:
# Number of tiles dealt to each player's rack at the start of a game.
RACK_MAX = 7
# Presumably the length of the feature vector ("FV") fed to the DQN; the agent
# configuration cell uses the same value for its state size — TODO confirm.
FV_WEIGHT_NUM = 234

import random
import os
import numpy as np

import scrabbler as sc
from scrabbler.dictionary import Dictionary
import utilities.logger as logger

# Resource locations. os.path.dirname() of a bare file name is "", so
# script_dir is empty and every path below resolves against the current
# working directory (the notebook's directory when run from Jupyter).
script_dir = os.path.dirname("scrabble_dqn.ipynb")
resource_dir = os.path.join(script_dir, "resources")
resource_directory = os.path.join(resource_dir, "scrabble")
tile_list_path = os.path.join(resource_directory, "tile_list.txt")
saved_dictionary_path = os.path.join(resource_directory, "dictionary.p")

# Point value of every tile, read from a text file holding one
# "<letter> <value>" pair per line.
LETTER_VALUE = {}
with open(tile_list_path, encoding="utf-8") as f:
    for line in f:
        fields = line.split()
        if not fields:
            # Tolerate blank lines (e.g. a trailing newline at the end of the
            # file) instead of failing on the tuple unpacking below.
            continue
        (key, val) = fields
        LETTER_VALUE[key] = int(val)

logger.info("loading saved dictionary file...")
# NOTE(review): judging by its name, load_from_pickle unpickles this file.
# Unpickling can execute arbitrary code, so only load dictionary files that
# come from a trusted source.
global_dictionary = Dictionary.load_from_pickle(saved_dictionary_path)
# Full tile bag for a new game: one list entry per physical tile, grouped by
# letter in alphabetical order. Each pair below is (letter, number of copies).
bag_o = [
    letter
    for letter, copies in (
        ("A", 9), ("B", 2), ("C", 2), ("D", 4), ("E", 12), ("F", 2), ("G", 3),
        ("H", 2), ("I", 9), ("J", 1), ("K", 1), ("L", 4), ("M", 2), ("N", 6),
        ("O", 8), ("P", 2), ("Q", 1), ("R", 6), ("S", 4), ("T", 6), ("U", 4),
        ("V", 2), ("W", 2), ("X", 1), ("Y", 2), ("Z", 1),
    )
    for _ in range(copies)
]


print("Scrabble game initialized")


2023-05-02 14:35:08 INFO: loading saved dictionary file...
Scrabble game initialized


In [4]:
# This class contains information and control flow for an agent to make decisions based off experiences using DQN

class DQNAgent:
    """Epsilon-greedy Deep Q-Network agent backed by a small dense network.

    The agent keeps a bounded replay buffer of experiences, picks actions with
    an epsilon-greedy policy and fits its Q-network on random samples drawn
    from that buffer.

    Args:
        state_size: Number of network inputs (length of one state vector).
        action_size: Number of discrete actions, i.e. network outputs.
        lr: Learning rate of the Adam optimizer.
        gamma: Discount factor applied to the estimated future return.
        exploration_proba: Initial probability of picking a random action.
        exploration_proba_decay: Rate of the exponential decay applied by
            update_exploration_probability().
        batch_size: Maximum number of experiences used by one train() call.
        max_memory_buffer: Maximum number of experiences kept in the buffer.
    """

    def __init__(self, state_size, action_size, lr=0.001, gamma=0.99,
                 exploration_proba=1.0, exploration_proba_decay=0.005,
                 batch_size=32, max_memory_buffer=30000):
        self.n_actions = action_size
        # Hyperparameters (defaults reproduce the previously hard-coded values).
        self.lr = lr
        self.gamma = gamma
        self.exploration_proba = exploration_proba
        self.exploration_proba_decay = exploration_proba_decay
        self.batch_size = batch_size

        # Replay buffer: experiences in insertion order, oldest first. Only the
        # most recent `max_memory_buffer` experiences are kept.
        self.memory_buffer = list()
        self.max_memory_buffer = max_memory_buffer

        # Q-network: two hidden layers of 24 ReLU units. The input width equals
        # the state size and the linear output layer has one unit per action.
        self.model = Sequential([
            Dense(units=24, input_dim=state_size, activation='relu'),
            Dense(units=24, activation='relu'),
            Dense(units=action_size, activation='linear')
        ])
        self.model.compile(loss="mse",
                           optimizer=Adam(learning_rate=self.lr))

    def compute_action(self, current_state):
        """Return the index of the action to perform in `current_state`.

        With probability `exploration_proba` a uniformly random action index is
        returned; otherwise the index of the largest predicted Q-value.

        Args:
            current_state: Network input for a single state. Presumably shaped
                (1, state_size), since the first row of the prediction is read
                as the Q-values — confirm with the caller.
        """
        # Epsilon-greedy policy.
        if np.random.uniform(0, 1) < self.exploration_proba:
            return np.random.randint(self.n_actions)
        # verbose=0 keeps Keras from printing a progress bar for every call.
        q_values = self.model.predict(current_state, verbose=0)[0]
        return np.argmax(q_values)

    def update_exploration_probability(self):
        """Decay the exploration probability exponentially and print the new value.

        Intended to be called once an episode has finished.
        """
        self.exploration_proba = self.exploration_proba * np.exp(-self.exploration_proba_decay)
        print(self.exploration_proba)

    def store_episode(self, current_state, action, reward, next_state, done):
        """Append one experience to the replay buffer.

        When the buffer grows beyond `max_memory_buffer`, the oldest experience
        is dropped.
        """
        self.memory_buffer.append({
            "current_state": current_state,
            "action": action,
            "reward": reward,
            "next_state": next_state,
            "done": done
        })
        if len(self.memory_buffer) > self.max_memory_buffer:
            self.memory_buffer.pop(0)

    def train(self):
        """Fit the Q-network on a random sample of stored experiences.

        Up to `batch_size` distinct experiences are drawn and the network is
        fitted on each of them in turn. Does nothing while the buffer is empty.
        """
        if not self.memory_buffer:
            return

        # Sample by index instead of shuffling the buffer in place: shuffling
        # would destroy the insertion order that store_episode() relies on to
        # evict the oldest experience first.
        sample_size = min(self.batch_size, len(self.memory_buffer))
        picked = np.random.choice(len(self.memory_buffer), size=sample_size, replace=False)
        batch_sample = [self.memory_buffer[index] for index in picked]

        for experience in batch_sample:
            # Current Q-value estimates for S_t.
            q_current_state = self.model.predict(experience["current_state"], verbose=0)
            # Q-target from the Bellman optimality equation; terminal
            # experiences use the plain reward.
            q_target = experience["reward"]
            if not experience["done"]:
                next_q_values = self.model.predict(experience["next_state"], verbose=0)[0]
                q_target = q_target + self.gamma * np.max(next_q_values)
            # Only the entry of the action actually taken is moved towards the
            # target; the other entries keep their predicted values.
            q_current_state[0][experience["action"]] = q_target
            self.model.fit(experience["current_state"], q_current_state, verbose=0)

In [10]:
def choose_move(actions, state=None, agent=None):
    """Pick one move from `actions` with an epsilon-greedy rule.

    With probability 0.01 a uniformly random move is returned. Otherwise the
    agent's policy selects the move: `agent.compute_action(state)` yields an
    index into `actions`.

    Args:
        actions: Non-empty sequence of candidate moves.
        state: Encoded game state handed to the agent's policy. Ignored when no
            agent is given.
        agent: Object exposing `compute_action(state)` that returns an action
            index. When omitted there is no policy to consult, so a random
            candidate is returned.

    Returns:
        One element of `actions`.

    Raises:
        ValueError: If `actions` is empty.
    """
    if not actions:
        raise ValueError("choose_move() needs at least one candidate move")

    eps = 0.01
    # Explore with probability eps, or always when no policy is available.
    if agent is None or random.uniform(0, 1) <= eps:
        return random.choice(actions)

    # The parameterized policy selects the move, given the state
    # (board, bag, played, etc.). It answers with an index, not a move.
    action_index = int(agent.compute_action(state))
    if 0 <= action_index < len(actions):
        return actions[action_index]

    # The agent's action space can be larger than the number of candidates
    # actually available this turn; fall back to a random legal candidate.
    return random.choice(actions)

def state_vectorize(board):
    """One-hot encode the letters currently placed on the board.

    Converts the board from its proprietary representation into a nested list
    that can be fed to the DQN for evaluation.

    Args:
        board: Object exposing `square(i, j)` for i, j in 0..14, whose result
            has a `_tile` attribute holding the letter on that square, or a
            falsy value when the square is empty. Letters are assumed to be
            uppercase 'A'..'Z' — TODO confirm how blanks are represented.

    Returns:
        Nested list of shape (15, 15, 26) where entry [i][j][k] is 1 when the
        square (i, j) holds the k-th letter of the alphabet, else 0.
        Example: an 'A' on square (0, 0) sets entry [0][0][0] to 1.
    """
    rows, cols, n = (15, 15, 26)  # (#rows, #cols, #letters)
    vec = [[[0 for _ in range(n)] for _ in range(cols)] for _ in range(rows)]
    for i in range(rows):
        for j in range(cols):
            tile = board.square(i, j)._tile
            if tile:  # This square holds a letter: one-hot encode it.
                vec[i][j][ord(tile) - ord("A")] = 1
    return vec



In [5]:
state_size = FV_WEIGHT_NUM  # Our feature vector is 234 elements
action_size = 20  # We are choosing from 20 actions
n_episodes = 400  # Number of self-play games run by the training loop
# NOTE(review): max_iteration_ep and total_steps are not read by any cell
# visible in this notebook — confirm they are still needed.
max_iteration_ep = 500
agent = DQNAgent(state_size, action_size)
total_steps = 0
# NOTE(review): this value is never handed to the agent, which keeps its own
# batch size of 32 — confirm which of the two is intended.
batch_size = 20

In [23]:
# Saved start position loaded at the beginning of every game. The location can
# be overridden through the SCRABBLER_START_STATE environment variable so the
# notebook is not tied to a single machine; the default is the original path.
START_STATE_PATH = os.environ.get(
    "SCRABBLER_START_STATE",
    "/Users/sbrosh1/Documents/GitHub/scrabbler/games/start_state.p",
)


def _refill_after_play(rack, word, bag):
    """Return `rack` with the tiles spent on `word` replaced by draws from `bag`.

    Each letter of the word removes one matching tile from the rack. While the
    bag still holds tiles the removed tile is replaced by a fresh draw.
    """
    for letter in word:
        if letter not in rack:
            # This letter did not come from the rack (presumably it was already
            # on the board). Drawing here would throw the drawn tile away.
            continue
        replacement = bag.pop() if len(bag) > 0 else ''
        rack = rack.replace(letter, replacement, 1)
    # NOTE(review): a word letter that is on the board AND also on the rack is
    # still removed from the rack; telling the two apart needs board knowledge.
    return rack


def _exchange_high_value_tiles(rack, bag):
    """Swap every rack tile worth more than 4 points for a random bag tile.

    Returns the new rack and whether at least one tile was exchanged.
    """
    exchanged = False
    for position in range(len(rack)):
        letter = rack[position]
        if LETTER_VALUE[letter] > 4:
            # Return the tile, reshuffle, then draw a replacement.
            bag.append(letter)
            random.shuffle(bag)
            rack = rack[:position] + bag.pop() + rack[position + 1:]
            exchanged = True
    return rack, exchanged


for e in range(n_episodes):
    # Reset the bag and the scores for a new game.
    bag = bag_o.copy()
    random.shuffle(bag)
    score1 = 0
    score2 = 0
    game = sc.Game(filename=START_STATE_PATH,
                   global_dictionary=global_dictionary, enable_logger=False)

    # Deal the opening racks, alternating between the two players.
    rack1 = ""
    rack2 = ""
    for _ in range(RACK_MAX):
        rack1 = rack1 + bag.pop()
        rack2 = rack2 + bag.pop()

    while len(bag) > 0:
        round_progressed = False

        # Player 1: the learning side.
        moves = game.find_best_moves(rack1, num=20)
        if moves:
            # NOTE(review): choose_move's signature also has state and agent
            # parameters; they are not supplied here because this loop builds
            # no state encoding that matches the agent's input.
            move = choose_move(moves)
            game.play(move.start_square, move.word, move.direction)
            score1 = score1 + move.score
            if move.score > 100:
                print("SCORE: ", move.score)
                print("MOVE: ", move.word)
            rack1 = _refill_after_play(rack1, move.word, bag)
            round_progressed = True
        else:
            # No playable move: trade in the expensive tiles instead.
            rack1, exchanged = _exchange_high_value_tiles(rack1, bag)
            round_progressed = round_progressed or exchanged

        # Opponents turn: always plays the first move returned.
        moves = game.find_best_moves(rack2, num=20)
        if moves:
            game.play(moves[0].start_square, moves[0].word, moves[0].direction)
            score2 = score2 + moves[0].score
            rack2 = _refill_after_play(rack2, moves[0].word, bag)
            round_progressed = True
        else:
            rack2, exchanged = _exchange_high_value_tiles(rack2, bag)
            round_progressed = round_progressed or exchanged

        if not round_progressed:
            # Neither player could play or exchange, so racks, bag and board
            # are unchanged; the next round would presumably repeat forever.
            break

    # Final score margin of player 1 over the opponent.
    print(score1 - score2)


36
-102
-141
7
76
-113
132
-52
-131
-19
17
-58
-29
-86
-46
-15
24
-47
5
-57
218
16
-107
-82
-156
-57
47
-23
4
172
53
28
-4
-16


KeyboardInterrupt: 