In [None]:
import sys
import os
import matplotlib.pyplot as plt
import tensorflow as tf
import math
from typing import Optional, Union
import numpy as np
import gym
from gym import logger, spaces
from gym.envs.classic_control import utils
from gym.error import DependencyNotInstalled
import csv
from datetime import datetime

# Cartpole environment and physical model
class CartPoleEnv(gym.Env[np.ndarray, Union[int, np.ndarray]]):
    """Cart-pole environment that starts with theta = pi and ends at a goal.

    The state is ``(x, x_dot, theta, theta_dot)``. An episode terminates when
    the cart leaves ``[-x_threshold, x_threshold]`` or when the module-level
    ``goal_check`` reports that the goal state was reached. Rewards for
    ordinary steps come from the module-level ``reward_function``.

    The equations of motion can be integrated with explicit Euler
    (``"euler"``), classic fourth-order Runge-Kutta (``"rk4"``, the default)
    or semi-implicit Euler (any other value of ``kinematics_integrator``).
    """

    metadata = {
        "render_modes": ["human", "rgb_array"],
        "render_fps": 50,
    }

    def __init__(self, render_mode: Optional[str] = None):
        """Set up physical constants, spaces and rendering state.

        Args:
            render_mode: ``"human"``, ``"rgb_array"`` or ``None`` (no rendering).
        """
        # Physical variables
        self.gravity = 9.81
        self.masscart = 1.0
        self.masspole = 0.5
        self.length = 0.5      # actually half the pole's length
        self.force_mag = 20.0  # magnitude of the force; the action picks its sign
        self.tau = 0.02        # timeframe (dt), seconds between state updates

        self.polemass_length = self.masspole * self.length
        self.total_mass = self.masspole + self.masscart
        self.kinematics_integrator = "rk4"

        # The episode terminates once |x| exceeds this value.
        self.x_threshold = 10

        # Observation bound: twice the termination threshold for x, and
        # effectively unbounded for the remaining three components.
        high = np.array(
            [
                self.x_threshold * 2,
                np.finfo(np.float32).max,
                np.finfo(np.float32).max,
                np.finfo(np.float32).max,
            ],
            dtype=np.float32,
        )

        self.observation_space = spaces.Box(-high, high, dtype=np.float32)

        # Two discrete actions: 1 pushes with +force_mag, 0 with -force_mag.
        self.action_space = spaces.Discrete(2)
        self.render_mode = render_mode

        # Screen variables
        self.screen_width = 800
        self.screen_height = 600
        self.screen = None
        self.clock = None
        self.isopen = True
        self.state = None

        self.steps_beyond_terminated = None

    def _derivatives(self, state, force):
        """Return ``(x_dot, x_acc, theta_dot, theta_acc)`` at ``state``.

        The derivative is evaluated at the ``state`` argument (not at
        ``self.state``), which is what multi-stage integrators such as RK4
        require for their intermediate stages.
        """
        _, x_dot, theta, theta_dot = state

        costheta = math.cos(theta)
        sintheta = math.sin(theta)

        temp = (
            force + self.polemass_length * theta_dot**2 * sintheta
        ) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0 / 3.0 - self.masspole * costheta**2 / self.total_mass)
        )
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        return x_dot, xacc, theta_dot, thetaacc

    def _euler_step(self, state, force):
        """Advance ``state`` by ``tau`` with explicit Euler."""
        x, x_dot, theta, theta_dot = state
        _, xacc, _, thetaacc = self._derivatives(state, force)

        x = x + self.tau * x_dot
        x_dot = x_dot + self.tau * xacc
        theta = theta + self.tau * theta_dot
        theta_dot = theta_dot + self.tau * thetaacc

        return x, x_dot, theta, theta_dot

    def _semi_implicit_step(self, state, force):
        """Advance ``state`` by ``tau`` with semi-implicit Euler.

        Velocities are updated first and the updated velocities are then used
        to advance the positions.
        """
        x, x_dot, theta, theta_dot = state
        _, xacc, _, thetaacc = self._derivatives(state, force)

        x_dot = x_dot + self.tau * xacc
        x = x + self.tau * x_dot
        theta_dot = theta_dot + self.tau * thetaacc
        theta = theta + self.tau * theta_dot

        return x, x_dot, theta, theta_dot

    def _rk4_step(self, state, force):
        """Advance ``state`` by ``tau`` with classic fourth-order Runge-Kutta."""
        dt = self.tau
        y = np.asarray(state, dtype=np.float64)

        def increment(point):
            # dt * f(point): each stage is evaluated at its own point.
            return dt * np.array(self._derivatives(point, force), dtype=np.float64)

        k1 = increment(y)
        k2 = increment(y + k1 / 2)
        k3 = increment(y + k2 / 2)
        k4 = increment(y + k3)

        y_next = y + (k1 + 2 * k2 + 2 * k3 + k4) / 6
        return y_next[0], y_next[1], y_next[2], y_next[3]

    def _integrate(self, state, force):
        """Dispatch to the integrator selected by ``kinematics_integrator``."""
        if self.kinematics_integrator == "euler":
            return self._euler_step(state, force)
        if self.kinematics_integrator == "rk4":
            return self._rk4_step(state, force)
        return self._semi_implicit_step(state, force)

    def step(self, action):
        """Apply ``action`` for one time step.

        Args:
            action: element of ``action_space``; 1 applies ``+force_mag``,
                anything else applies ``-force_mag``.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where
            ``observation`` is the new state as a float32 array, ``truncated``
            is always ``False`` and ``info`` is an empty dict.
        """
        err_msg = f"{action!r} ({type(action)}) invalid"
        assert self.action_space.contains(action), err_msg
        assert self.state is not None, "Call reset before using step method."
        force = self.force_mag if action == 1 else -self.force_mag

        self.state = self._integrate(self.state, force)
        x = self.state[0]

        ## Terminated Condition
        # Case 1. Out of bound = Terminated
        terminated = bool(
            x < -self.x_threshold
            or x > self.x_threshold
        )

        # Case 2. Reach the goal = Finished
        finished = goal_check(self.state)
        goal_reward = 15000

        if not terminated:

            if finished:      # Finished, Not Terminated
                reward = goal_reward
                terminated = True
                self.steps_beyond_terminated = 0

            else:             # Not Finished, Not Terminated, Normal case
                reward = reward_function(self.state, self.masspole, self.gravity,
                                         self.length, self.x_threshold)

        else:
            if self.steps_beyond_terminated is None:
                self.steps_beyond_terminated = 0

                if finished:   # Finished And Terminated
                    reward = goal_reward

                else:          # Not Finished, but Terminated = Out of boundary
                    reward = -10000

            else:
                if self.steps_beyond_terminated == 0:
                    logger.warn(
                        "You are calling 'step()' even though this "
                        "environment has already returned terminated = True. You "
                        "should always call 'reset()' once you receive 'terminated = "
                        "True' -- any further steps are undefined behavior."
                    )
                self.steps_beyond_terminated += 1
                reward = 0.0

        if self.render_mode == "human":
            self.render()
        return np.array(self.state, dtype=np.float32), reward, terminated, False, {}

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ):
        """Reset to the fixed initial state ``(0, 0, pi, 0)``.

        Args:
            seed: forwarded to ``gym.Env.reset``.
            options: accepted for API compatibility; not used.

        Returns:
            ``(observation, info)`` with the initial state as a float32 array
            and an empty info dict.
        """
        super().reset(seed=seed)

        # Initial State
        self.state = (0, 0, math.pi, 0)
        self.steps_beyond_terminated = None

        if self.render_mode == "human":
            self.render()
        return np.array(self.state, dtype=np.float32), {}

    def render(self):
        """Draw the cart and pole with pygame.

        Returns:
            An RGB array in ``"rgb_array"`` mode, otherwise ``None``.

        Raises:
            DependencyNotInstalled: if pygame cannot be imported.
        """
        if self.render_mode is None:
            gym.logger.warn(
                "You are calling render method without specifying any render mode. "
                "You can specify the render_mode at initialization, "
                f'e.g. gym("{self.spec.id}", render_mode="rgb_array")'
            )
            return

        try:
            import pygame
            from pygame import gfxdraw
        except ImportError:
            raise DependencyNotInstalled(
                "pygame is not installed, run `pip install gym[classic_control]`"
            )

        # Lazily create the drawing surface and clock on first use.
        if self.screen is None:
            pygame.init()
            if self.render_mode == "human":
                pygame.display.init()
                self.screen = pygame.display.set_mode(
                    (self.screen_width, self.screen_height)
                )
            else:  # mode == "rgb_array"
                self.screen = pygame.Surface((self.screen_width, self.screen_height))
        if self.clock is None:
            self.clock = pygame.time.Clock()

        world_width = self.x_threshold * 2
        scale = self.screen_width / world_width
        polewidth = 5.0
        polelen = scale * (2 * self.length)
        cartwidth = 40.0
        cartheight = 20.0

        if self.state is None:
            return None

        x = self.state

        self.surf = pygame.Surface((self.screen_width, self.screen_height))
        self.surf.fill((255, 255, 255))

        # Cart rectangle, centred horizontally on the cart position.
        l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
        axleoffset = cartheight / 4.0
        cartx = x[0] * scale + self.screen_width / 2.0  # MIDDLE OF CART
        carty = 250                                     # TOP OF CART
        cart_coords = [(l, b), (l, t), (r, t), (r, b)]
        cart_coords = [(c[0] + cartx, c[1] + carty) for c in cart_coords]
        gfxdraw.aapolygon(self.surf, cart_coords, (0, 0, 0))
        gfxdraw.filled_polygon(self.surf, cart_coords, (0, 0, 0))

        # Pole rectangle, rotated by the pole angle around the axle.
        l, r, t, b = (
            -polewidth / 2,
            polewidth / 2,
            polelen - polewidth / 2,
            -polewidth / 2,
        )

        pole_coords = []
        for coord in [(l, b), (l, t), (r, t), (r, b)]:
            coord = pygame.math.Vector2(coord).rotate_rad(-x[2])
            coord = (coord[0] + cartx, coord[1] + carty + axleoffset)
            pole_coords.append(coord)
        gfxdraw.aapolygon(self.surf, pole_coords, (202, 152, 101))
        gfxdraw.filled_polygon(self.surf, pole_coords, (202, 152, 101))

        # Axle
        gfxdraw.aacircle(
            self.surf,
            int(cartx),
            int(carty + axleoffset),
            int(polewidth / 2),
            (129, 132, 203),
        )
        gfxdraw.filled_circle(
            self.surf,
            int(cartx),
            int(carty + axleoffset),
            int(polewidth / 2),
            (129, 132, 203),
        )

        # Track line
        gfxdraw.hline(self.surf, 0, self.screen_width, carty, (0, 0, 0))

        # The scene is drawn with y pointing up; flip it for the screen.
        self.surf = pygame.transform.flip(self.surf, False, True)
        self.screen.blit(self.surf, (0, 0))
        if self.render_mode == "human":
            pygame.event.pump()
            self.clock.tick(self.metadata["render_fps"])
            pygame.display.flip()

        elif self.render_mode == "rgb_array":
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(self.screen)), axes=(1, 0, 2)
            )

    def close(self):
        """Shut down pygame if a screen was created and mark the env closed."""
        if self.screen is not None:
            import pygame

            pygame.display.quit()
            pygame.quit()
            self.isopen = False

In [None]:
# Goal check function
def goal_check(state):
    """Return True when the pole is within the goal window around theta = 0.

    The angle is wrapped to the nearest equivalent in [-pi, pi] first, so a
    pole that reaches the goal orientation after one or more full revolutions
    (theta close to a non-zero multiple of 2*pi) is recognised as well.

    Args:
        state: sequence ``(x, x_dot, theta, theta_dot)`` with theta in radians.

    Returns:
        bool: True if the wrapped angle is less than 5 degrees from zero and
        the magnitude of ``theta_dot`` is below 1.0.
    """
    _, _, theta, theta_dot = state

    theta_threshold = 5.0      # degrees
    theta_dot_threshold = 1.0  # omega, same units as theta_dot

    # math.remainder maps theta to the closest representative around 0.
    theta_deg = math.degrees(math.remainder(theta, 2.0 * math.pi))

    return bool(
        abs(theta_deg) < theta_threshold
        and abs(theta_dot) < theta_dot_threshold
    )

In [None]:
# Reward function
def reward_function(state, m, g, h, x_max):
    """Compute the per-step reward: an energy reward minus a position penalty.

    Args:
        state: sequence ``(x, x_dot, theta, theta_dot)`` with theta in radians.
        m: pole mass.
        g: gravitational acceleration.
        h: half of the pole length.
        x_max: cart position limit; the penalty starts 3.0 before it.

    Returns:
        float: energy reward (0 to 10000) minus position penalty.
    """
    x, _, theta, theta_dot = state
    theta = theta % (2.0 * math.pi)

    # Energy Reward 0 to 10000
    # Target is the potential energy at theta = 0 with the pole at rest.
    target_energy = 2.0 * m * g * h
    rate_100 = 100 / target_energy

    inertia = (1.0 / 3.0) * m * (2.0 * h)**2
    kinetic_energy = 0.5 * inertia * theta_dot**2
    potential_energy = m * g * h * (1.0 + math.cos(theta))
    current_energy = kinetic_energy + potential_energy

    # Score is 100 at the target energy and falls off linearly on both sides.
    # Clamp at zero: beyond twice the target the raw score turns negative and
    # squaring it would otherwise reward excessive energy.
    energy_difference_100 = max(
        0.0, 100.0 - abs(100.0 - current_energy * rate_100)
    )
    energy_reward = energy_difference_100 ** 2

    # Position Penalty 0 to 10000
    # Grows quadratically over the last 3.0 units before x_max.
    warning_border = x_max - 3.0
    position_penalty = 0.0
    if abs(x) > warning_border:
        out_of_bound = (abs(x) - warning_border) / 3.0
        position_penalty = (100.0 * out_of_bound) ** 2

    return energy_reward - position_penalty

In [None]:
# Export functions for csv and diagrams

def ang2abs(theta):
    """Return how far theta (radians) is from pi after wrapping into one turn."""
    full_turn = 2 * math.pi
    offset_from_pi = (theta % full_turn) - math.pi
    return abs(offset_from_pi)

# save the datapoints and plot in the end
class DataPlotter:
    """Collect per-run training statistics and export them as CSV and plots.

    During a run, ``add_reward_angle`` records every step; ``append`` then
    condenses the recorded steps into one entry per run. ``toCSV`` and
    ``plot`` write into the folder named by ``f_name``.
    """

    def __init__(self, run):
        """Create empty series for ``run`` planned runs."""
        self.min_reward = []
        self.max_reward = []
        self.explore_data = []
        self.max_angle = []
        self.scores = []
        self.cur_reward = []
        self.cur_angle = []
        self.final_reward = []
        self.runs = list(range(run))
        self.f_name = "a"

    def add_reward_angle(self, reward, angle):
        """Record the reward and the ``ang2abs`` value of the angle for one step."""
        self.cur_reward.append(reward)
        self.cur_angle.append(ang2abs(angle))

    def append(self, exp, score, rf):
        """Close the current run and store its summary.

        Args:
            exp: exploration rate at the end of the run.
            score: score of the run.
            rf: final reward of the run.

        Raises:
            ValueError: if no step was recorded since the previous call.
        """
        # append
        self.min_reward.append(min(self.cur_reward))
        self.max_reward.append(max(self.cur_reward))
        self.max_angle.append(max(self.cur_angle))
        self.explore_data.append(exp)
        self.scores.append(score)
        self.final_reward.append(rf)

        # clear
        self.cur_reward.clear()
        self.cur_angle.clear()

    def plot(self):
        """Draw the four summary graphs and save them as ``result`` in ``f_name``."""
        fig, ax = plt.subplots(2, 2, figsize=(10, 8))

        # Stored angles are in radians; convert so the axis label is correct.
        max_angle_deg = [math.degrees(angle) for angle in self.max_angle]

        panels = (
            (ax[0, 0], self.explore_data, 'blue', 'Percentage', 'Exploration rate'),
            (ax[0, 1], self.max_reward, 'green', 'Reward', 'Max Reward'),
            (ax[1, 0], max_angle_deg, 'red', 'Degree', 'Max Angle'),
            (ax[1, 1], self.scores, 'purple', 'Score', 'Score'),
        )
        for axis, series, color, ylabel, title in panels:
            # Index by the runs actually recorded so that a partially
            # completed training can still be plotted.
            axis.plot(range(len(series)), series, color=color)
            axis.set_ylabel(ylabel)
            axis.set_xlabel('Run')
            axis.set_title(title)

        # Adjust the spacing between subplots
        plt.subplots_adjust(hspace=0.5)

        # Make sure the target folder exists even if folder() was not called.
        os.makedirs(self.f_name, exist_ok=True)
        figPath = os.path.join(self.f_name, "result")
        plt.savefig(figPath)

    # create the folder
    def folder(self):
        """Create a fresh output folder and remember its name in ``f_name``."""
        self.f_name = create_folder_and_csv()

    # create the CSV file in the folder
    def toCSV(self):
        """Write one CSV row per recorded run to ``data.csv`` in ``f_name``."""
        # Make sure the target folder exists even if folder() was not called.
        os.makedirs(self.f_name, exist_ok=True)
        filename = os.path.join(self.f_name, "data.csv")
        with open(filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['Reward', 'Exploration rate', 'Max Angle',
                             'Scores', 'Final reward'])  # Write header row
            for row in zip(self.max_reward, self.explore_data, self.max_angle,
                           self.scores, self.final_reward):
                writer.writerow(row)
                
def create_folder_and_csv():
    """Create a timestamped result folder in the working directory.

    Despite the name, no CSV file is created here; only the folder is.

    Returns:
        str: the folder name, ``result_<YYYY-MM-DD_HH-MM-SS>``.
    """
    # Get current time
    current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    # Create the folder; exist_ok avoids a FileExistsError when the function
    # is called twice within the same second.
    folder_name = f"result_{current_time}"
    os.makedirs(folder_name, exist_ok=True)

    return folder_name

In [None]:
# DQN Training Model

import random
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

# Label for the environment (not referenced elsewhere in the visible code).
ENV_NAME = "CartPole"

# Maximum number of transitions kept in the DQN replay memory (deque maxlen).
MEMORY_SIZE = 1000000

# Exploration rate: starts at EXPLORATION_MAX and is never decayed below
# EXPLORATION_MIN.
EXPLORATION_MAX = 1.0
EXPLORATION_MIN = 0.01

class DQNSolver():
    """Epsilon-greedy DQN agent with a small dense Q-network and replay memory."""

    def __init__(self, observation_space, action_space, exploration_decay,
                 batch_size, learing_rate, gamma):
        """Build the Q-network and the replay memory.

        Args:
            observation_space: number of observation components (network input size).
            action_space: number of discrete actions (network output size).
            exploration_decay: factor applied to the exploration rate after
                every replay.
            batch_size: number of transitions sampled per replay.
            learing_rate: learning rate for the Adam optimizer.
            gamma: discount factor for future rewards.
        """
        self.exploration_rate = EXPLORATION_MAX
        self.exploration_decay = exploration_decay
        self.batch_size = batch_size
        self.gamma = gamma

        self.action_space = action_space
        self.memory = deque(maxlen=MEMORY_SIZE)

        self.model = Sequential()
        self.model.add(Dense(24, input_shape=(observation_space,), activation="relu"))
        self.model.add(Dense(24, activation="relu"))
        self.model.add(Dense(self.action_space, activation="linear"))
        # `lr` is a deprecated alias that newer Keras versions reject;
        # `learning_rate` is the supported argument name.
        self.model.compile(loss="mse", optimizer=Adam(learning_rate=learing_rate))

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Pick an action: random with probability ``exploration_rate``, else greedy."""
        if np.random.rand() < self.exploration_rate:
            return random.randrange(self.action_space)
        # verbose=0 keeps Keras from printing a progress bar on every call.
        q_values = self.model.predict(state, verbose=0)
        return np.argmax(q_values[0])

    def experience_replay(self):
        """Fit the network on a random batch of stored transitions.

        Does nothing until the memory holds at least ``batch_size``
        transitions. After the batch, the exploration rate is decayed and
        floored at ``EXPLORATION_MIN``.
        """
        if len(self.memory) < self.batch_size:
            return
        batch = random.sample(self.memory, self.batch_size)
        for state, action, reward, state_next, terminal in batch:
            # Target is the reward alone for terminal transitions, otherwise
            # the reward plus the discounted best predicted next-state value.
            q_update = reward
            if not terminal:
                next_q = self.model.predict(state_next, verbose=0)[0]
                q_update = (reward + self.gamma * np.amax(next_q))
            q_values = self.model.predict(state, verbose=0)
            q_values[0][action] = q_update
            self.model.fit(state, q_values, verbose=0)
        self.exploration_rate *= self.exploration_decay
        self.exploration_rate = max(EXPLORATION_MIN, self.exploration_rate)

        
class CartPole():
    """Runs DQN training on ``CartPoleEnv`` and records statistics per run."""

    def __init__ (self, run):
        """Prepare a ``DataPlotter`` sized for ``run`` training runs."""
        self.dp = DataPlotter(run)

    def train(self, exploration_decay, batch_size, learing_rate, gamma, iterr):
        """Train a fresh ``DQNSolver`` for every run held by the data plotter.

        Each run ends when the environment reports termination or after
        ``iterr`` steps, whichever comes first; either way one summary entry
        is stored in ``self.dp``.
        """
        print("Training Started")
        env = CartPoleEnv()
        n_observations = env.observation_space.shape[0]
        n_actions = env.action_space.n
        agent = DQNSolver(n_observations, n_actions, exploration_decay,
                          batch_size, learing_rate, gamma)

        for run in self.dp.runs:
            first_observation = env.reset()[0]
            state = np.reshape(first_observation, [1, n_observations])

            for step in range(1, iterr + 1):
                action = agent.act(state)
                step_result = env.step(action)
                state_next, reward, terminal = step_result[0], step_result[1], step_result[2]
                if step % 30 == 0:
                    print("Reward: {}".format(reward))

                state_next = np.reshape(state_next, [1, n_observations])
                agent.remember(state, action, reward, state_next, terminal)
                state = state_next
                self.dp.add_reward_angle(reward, state[0][2])

                limit_reached = (not terminal) and step == iterr
                if terminal or limit_reached:
                    if limit_reached:
                        print("Iteration limit reached...")
                    print("Run: {}, exploration: {}, score: {}"
                          .format(str(run), str(agent.exploration_rate), str(step)))
                    print("Final Reward: {}".format(reward))
                    self.dp.append(agent.exploration_rate, step, reward)
                    if terminal:
                        # A terminated run stops without another replay.
                        break

                agent.experience_replay()

In [None]:
# Training parameters
batch_size = 20            # transitions sampled per experience replay
exploration_decay = 0.999  # factor applied to the exploration rate after each replay
learing_rate = 0.001       # learning rate handed to the Adam optimizer
gamma = 0.95               # discount factor for future rewards
episodes = 100             # number of training runs
iterr = 200                # maximum number of steps per run

# Initialize the Cartpole system
cp = CartPole(episodes)

# Start training
cp.train(exploration_decay, batch_size, learing_rate, gamma, iterr)

In [None]:
# Export csv file
# folder() must run first: it creates the timestamped output folder that
# both toCSV() and plot() write into.
cp.dp.folder()
cp.dp.toCSV()

# Plotting
# Saves the 2x2 summary figure as "result" inside the same folder.
cp.dp.plot()