In [26]:
import tensorflow as tf
from tensorflow import keras
from collections import deque
import numpy as np
import random
import os

class DQNAgent:
    """Value-network agent that scores candidate board states.

    The network maps a feature vector of length ``state_size`` to a single
    scalar. ``best_state`` picks the candidate with the highest predicted
    value, and ``train`` fits the network on a random batch of remembered
    transitions using ``reward + discount * V(next_state)`` as the target.

    NOTE(review): ``epsilon`` is decayed by ``train`` but is not consulted by
    ``best_state``, so action selection is always greedy. If epsilon-greedy
    exploration is intended during training it has to be added explicitly.
    """

    # File the network is loaded from (when it exists) and saved to.
    MODEL_PATH = 'tetris-model.h5'

    def __init__(self):
        self.state_size = 4                     # length of one state feature vector
        self.memory = deque(maxlen=20000)       # replay buffer of transitions
        self.discount = 0.95                    # weight of the next state's value
        self.epsilon = 1
        self.replay_start_size = 2000           # minimum buffer size before training
        self.batch_size = 512
        self.epsilon_min = 0
        # Linear schedule: epsilon reaches epsilon_min after 1500 training calls.
        self.epsilon_decay = (self.epsilon-self.epsilon_min)/1500
        self.model = self._build_model()

    def _build_model(self):
        """Load the saved model if ``MODEL_PATH`` exists, otherwise build a new one.

        Returns:
            A compiled Keras model taking ``state_size`` inputs and producing
            one linear output.
        """
        if os.path.isfile(self.MODEL_PATH):
            print("Model Imported")
            return keras.models.load_model(self.MODEL_PATH, compile=True)
        model = keras.Sequential()
        # The input width follows state_size so the two cannot drift apart.
        model.add(keras.layers.Dense(64, input_dim=self.state_size, activation='relu'))
        model.add(keras.layers.Dense(64, activation='relu'))
        model.add(keras.layers.Dense(1, activation='linear'))
        model.compile(loss='mse', optimizer='adam')
        return model

    def add_to_memory(self, current_state, next_state, reward, done):
        """Append one ``(current_state, next_state, reward, done)`` transition."""
        self.memory.append((current_state, next_state, reward, done))

    def best_state(self, states):
        """Return the candidate state with the highest predicted value.

        Args:
            states: iterable of state feature vectors (e.g. ``dict.values()``).

        Returns:
            The element of ``states`` with the largest prediction (the first
            one on ties), or ``None`` when ``states`` is empty. The returned
            object is one of the objects passed in, not a copy.
        """
        candidates = list(states)
        if not candidates:
            return None
        # One batched prediction instead of one predict() call per candidate.
        inputs = np.reshape(np.array(candidates), [len(candidates), self.state_size])
        values = np.asarray(self.model.predict(inputs, verbose=0)).reshape(-1)
        # argmax compares the values themselves, so a predicted value of 0 is
        # handled like any other number (a truthiness test would treat it as
        # "no maximum yet").
        return candidates[int(np.argmax(values))]

    def train(self):
        """Fit the network on one random batch drawn from the replay buffer.

        Does nothing until the buffer holds at least ``replay_start_size`` and
        ``batch_size`` transitions. After fitting, epsilon is decreased by
        ``epsilon_decay`` but never below ``epsilon_min``.
        """
        n = len(self.memory)
        if n < self.replay_start_size or n < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        next_states = np.array([transition[1] for transition in batch])
        next_qs = np.asarray(self.model.predict(next_states, verbose=0)).reshape(-1)

        x = []
        y = []
        for i, (state, _, reward, done) in enumerate(batch):
            # Terminal transitions have no future value to bootstrap from.
            new_q = reward if done else reward + self.discount*next_qs[i]
            x.append(state)
            y.append(new_q)
        self.model.fit(np.array(x), np.array(y), batch_size=self.batch_size, epochs=3, verbose=0)

        if self.epsilon > self.epsilon_min:
            # Clamp so accumulated float error cannot push epsilon below the minimum.
            self.epsilon = max(self.epsilon_min, self.epsilon - self.epsilon_decay)

    def model_save(self):
        """Write the current network to ``MODEL_PATH``, replacing any existing file."""
        self.model.save(self.MODEL_PATH, overwrite=True)

In [27]:
class Tetris:
    """Tetris simulator driven by whole placements rather than key presses.

    The board is a list of ``BOARD_HEIGHT`` rows, each a list of
    ``BOARD_WIDTH`` cells, with row 0 at the top. ``get_next_states`` lists
    every legal ``(x, rotation)`` drop for the current piece together with the
    features of the resulting board, and ``play`` performs one such drop.
    """
    MAP_EMPTY = 0
    MAP_BLOCK = 1
    MAP_PLAYER = 2
    BOARD_WIDTH = 15
    BOARD_HEIGHT = 30

    # piece id -> rotation in degrees -> list of (x, y) cell offsets
    TETROMINOS = {
        0: {0: [(0,0), (1,0), (2,0), (3,0)], 90: [(1,0), (1,1), (1,2), (1,3)], 180: [(3,0), (2,0), (1,0), (0,0)], 270: [(1,3), (1,2), (1,1), (1,0)]},
        1: {0: [(1,0), (0,1), (1,1), (2,1)], 90: [(0,1), (1,2), (1,1), (1,0)], 180: [(1,2), (2,1), (1,1), (0,1)], 270: [(2,1), (1,0), (1,1), (1,2)]},
        2: {0: [(1,0), (1,1), (1,2), (2,2)], 90: [(0,1), (1,1), (2,1), (2,0)], 180: [(1,2), (1,1), (1,0), (0,0)], 270: [(2,1), (1,1), (0,1), (0,2)]},
        3: {0: [(1,0), (1,1), (1,2), (0,2)], 90: [(0,1), (1,1), (2,1), (2,2)], 180: [(1,2), (1,1), (1,0), (2,0)], 270: [(2,1), (1,1), (0,1), (0,0)]},
        4: {0: [(0,0), (1,0), (1,1), (2,1)], 90: [(0,2), (0,1), (1,1), (1,0)], 180: [(2,1), (1,1), (1,0), (0,0)], 270: [(1,0), (1,1), (0,1), (0,2)]},
        5: {0: [(2,0), (1,0), (1,1), (0,1)], 90: [(0,0), (0,1), (1,1), (1,2)], 180: [(0,1), (1,1), (1,0), (2,0)], 270: [(1,2), (1,1), (0,1), (0,0)]},
        6: {0: [(1,0), (2,0), (1,1), (2,1)], 90: [(1,0), (2,0), (1,1), (2,1)], 180: [(1,0), (2,0), (1,1), (2,1)],270: [(1,0), (2,0), (1,1), (2,1)]}
    }

    def __init__(self):
        self.reset()

    def reset(self):
        """Start a new game and return the features of the empty board."""
        self.board = [[0] * Tetris.BOARD_WIDTH for _ in range(Tetris.BOARD_HEIGHT)]
        self.game_over = False
        self.bag = list(range(len(Tetris.TETROMINOS)))
        random.shuffle(self.bag)
        self.next_piece = self.bag.pop()
        self._new_round()
        self.score = 0
        return self._get_board_props(self.board)

    def _get_rotated_piece(self):
        """Return the cell offsets of the current piece in its current rotation."""
        return Tetris.TETROMINOS[self.current_piece][self.current_rotation]

    def _get_complete_board(self):
        """Return a copy of the board with the current piece drawn as MAP_PLAYER."""
        pos_x, pos_y = self.current_pos
        board = [row[:] for row in self.board]
        for x, y in self._get_rotated_piece():
            board[y + pos_y][x + pos_x] = Tetris.MAP_PLAYER
        return board

    def get_game_score(self):
        """Return the score accumulated since the last reset."""
        return self.score

    def _new_round(self):
        """Promote the queued piece to current and draw the next one from the bag.

        The bag is refilled with one of each tetromino and reshuffled whenever
        it runs empty, so every piece type keeps appearing throughout a game.
        Sets ``game_over`` when the new piece already collides at its spawn
        position.
        """
        self.current_piece = self.next_piece
        if not self.bag:
            self.bag = list(range(len(Tetris.TETROMINOS)))
            random.shuffle(self.bag)
        self.next_piece = self.bag.pop()
        self.current_pos = [3, 0]
        self.current_rotation = random.choice([0, 90, 180, 270])
        if self._check_collision(self._get_rotated_piece(), self.current_pos):
            self.game_over = True

    def _check_collision(self, piece, pos):
        """Return True if ``piece`` at ``pos`` leaves the board or overlaps a block."""
        for x, y in piece:
            x += pos[0]
            y += pos[1]
            # Bounds are tested first so the board lookup is never out of range.
            if x < 0 or x >= Tetris.BOARD_WIDTH \
                    or y < 0 or y >= Tetris.BOARD_HEIGHT \
                    or self.board[y][x] == Tetris.MAP_BLOCK:
                return True
        return False

    def _rotate(self, angle):
        """Add ``angle`` degrees to the current rotation, wrapped into [0, 360)."""
        # Python's % is non-negative for a positive modulus, so this also
        # covers negative angles and angles beyond a full turn.
        self.current_rotation = (self.current_rotation + angle) % 360

    def _add_piece_to_board(self, piece, pos):
        """Return a copy of the board with ``piece`` fixed at ``pos`` as blocks."""
        board = [row[:] for row in self.board]
        for x, y in piece:
            board[y + pos[1]][x + pos[0]] = Tetris.MAP_BLOCK
        return board

    def _clear_lines(self, board):
        """Remove completely filled rows and pad the top with empty rows.

        Returns:
            ``(lines_cleared, board)``. When nothing is cleared the input
            board object itself is returned.
        """
        # Compare cells explicitly: a sum test could be fooled by cells whose
        # value is not MAP_BLOCK.
        remaining = [row for row in board
                     if not all(cell == Tetris.MAP_BLOCK for cell in row)]
        cleared = len(board) - len(remaining)
        if cleared:
            padding = [[Tetris.MAP_EMPTY] * Tetris.BOARD_WIDTH for _ in range(cleared)]
            board = padding + remaining
        return cleared, board

    def _number_of_holes(self, board):
        """Count empty cells lying below the topmost block of their column."""
        holes = 0
        for col in zip(*board):
            top = 0
            while top < Tetris.BOARD_HEIGHT and col[top] != Tetris.MAP_BLOCK:
                top += 1
            holes += sum(1 for cell in col[top + 1:] if cell == Tetris.MAP_EMPTY)
        return holes

    def _bumpiness(self, board):
        """Return ``(total, maximum)`` height difference between adjacent columns."""
        tops = []
        for col in zip(*board):
            top = 0
            while top < Tetris.BOARD_HEIGHT and col[top] != Tetris.MAP_BLOCK:
                top += 1
            tops.append(top)
        diffs = [abs(a - b) for a, b in zip(tops, tops[1:])]
        return sum(diffs), max(diffs, default=0)

    def _height(self, board):
        """Return ``(sum, max, min)`` of the column heights.

        A column's height is ``BOARD_HEIGHT`` minus the index of its first
        non-empty cell. For a board without columns the result is
        ``(0, 0, BOARD_HEIGHT)``.
        """
        heights = []
        for col in zip(*board):
            top = 0
            while top < Tetris.BOARD_HEIGHT and col[top] == Tetris.MAP_EMPTY:
                top += 1
            heights.append(Tetris.BOARD_HEIGHT - top)
        # max and min are computed independently; chaining them with elif
        # would skip the minimum update for any column that raises the maximum.
        return (sum(heights),
                max(heights, default=0),
                min(heights, default=Tetris.BOARD_HEIGHT))

    def _get_board_props(self, board):
        """Return ``[lines, holes, total_bumpiness, sum_height]`` for ``board``.

        Full lines are cleared first; the other features describe the board
        after clearing.
        """
        lines, board = self._clear_lines(board)
        holes = self._number_of_holes(board)
        total_bumpiness, _ = self._bumpiness(board)
        sum_height, _, _ = self._height(board)
        return [lines, holes, total_bumpiness, sum_height]

    def get_next_states(self):
        """Map every legal ``(x, rotation)`` drop to the resulting board features."""
        states = {}
        piece_id = self.current_piece
        # Skip rotations that occupy the same cells as one already listed.
        if piece_id == 6: 
            rotations = [0]
        elif piece_id == 0:
            rotations = [0, 90]
        else:
            rotations = [0, 90, 180, 270]

        for rotation in rotations:
            piece = Tetris.TETROMINOS[piece_id][rotation]
            min_x = min([p[0] for p in piece])
            max_x = max([p[0] for p in piece])
            # x positions for which every cell of the piece stays on the board.
            for x in range(-min_x, Tetris.BOARD_WIDTH - max_x):
                pos = [x, 0]
                while not self._check_collision(piece, pos):
                    pos[1] += 1
                pos[1] -= 1
                # pos[1] == -1 means the piece does not fit even in the top row.
                if pos[1] >= 0:
                    board = self._add_piece_to_board(piece, pos)
                    states[(x, rotation)] = self._get_board_props(board)
        return states

    def play(self, x, rotation):
        """Drop the current piece at column offset ``x`` with ``rotation``.

        Returns:
            ``(score, game_over)`` where ``score`` is
            ``1 + lines_cleared**2 * BOARD_WIDTH``, reduced by 2 when the drop
            ends the game.

        Raises:
            ValueError: if the piece does not fit at ``(x, 0)``. Without this
                check the landing row would be -1 and the piece would be
                written into the bottom rows through negative indexing.
        """
        piece = Tetris.TETROMINOS[self.current_piece][rotation]
        if self._check_collision(piece, [x, 0]):
            raise ValueError(
                "piece does not fit at x=%r, rotation=%r" % (x, rotation))

        self.current_pos = [x, 0]
        self.current_rotation = rotation
        while not self._check_collision(piece, self.current_pos):
            self.current_pos[1] += 1
        self.current_pos[1] -= 1

        self.board = self._add_piece_to_board(piece, self.current_pos)
        lines_cleared, self.board = self._clear_lines(self.board)
        score = 1 + (lines_cleared ** 2) * Tetris.BOARD_WIDTH
        self.score += score

        self._new_round()
        if self.game_over:
            score -= 2
        return score, self.game_over

In [None]:
# Play `episodes` games with the agent choosing every placement greedily,
# record each transition, train once per finished game and checkpoint the
# model every tenth episode as well as at the very end.
env = Tetris()
agent = DQNAgent()

m = 0            # best episode score seen so far
episodes = 1
scores = []
for episode in range(episodes):
    current_state = env.reset()
    done = False
    while not done:
        next_states = env.get_next_states()
        best_state = agent.best_state(next_states.values())
        # Recover the first action whose resulting state equals the chosen one.
        best_action = next(
            (action for action, state in next_states.items() if state == best_state),
            None,
        )
        chosen_state = next_states[best_action]
        reward, done = env.play(best_action[0], best_action[1])
        agent.add_to_memory(current_state, chosen_state, reward, done)
        current_state = chosen_state
        print(env.get_game_score())
    score = env.get_game_score()
    scores.append(score)
    m = max(m, score)
    print("Episode:  ", episode, "    ", score, "    ", m)
    agent.train()
    if episode % 10 == 0:
        agent.model_save()
agent.model_save()

Model Imported
1
2
3
4
5
6
