In [14]:
#import gym
import torch
import numpy as np
#import matplotlib.pyplot as plt
import collections
import random
import math
import time


def clamp(value, min_value, max_value):
    """Restrict ``value`` to the interval [min_value, max_value].

    The upper bound is applied first and the lower bound second, so if the
    bounds are inverted (``min_value > max_value``) the result is ``min_value``.
    When ``value`` compares equal to a bound, ``value`` itself is returned.
    """
    capped = max_value if max_value < value else value
    return min_value if min_value > capped else capped

class MountainCar:
    """Small mountain-car style environment with three discrete actions.

    The state is the pair ``(px, vx)``: position and velocity of the car.
    Every step yields a reward of -1. An episode terminates when the position
    reaches the upper position bound and is truncated after ``maxStep`` steps.
    ``reset`` must be called before ``step``, because it is what creates
    ``px`` and ``vx``.
    """

    def __init__(self, maxStep=500):
        self.pxbound = (-1.2, 0.5)    # (lowest, highest) position
        self.vxbound = (-0.07, 0.07)  # (lowest, highest) velocity
        self.maxStep = maxStep        # step budget before truncation
        self.curStep = 0              # steps taken in the current episode

    def state_space(self):
        """Return ``(position_bounds, velocity_bounds)``."""
        return self.pxbound, self.vxbound

    def action_shape(self):
        """Return the number of discrete actions (0, 1 and 2)."""
        return 3

    def reset(self):
        """Start a new episode; returns ``((px, vx), info)`` with an empty info string."""
        self.curStep = 0
        self.vx = 0
        # Start position is drawn uniformly from [-0.6, -0.4).
        self.px = random.random()*0.2 - 0.6
        return (self.px, self.vx), ""

    def step(self, action):
        """Advance one time step.

        ``action`` 0 / 1 / 2 accelerates by -0.001 / 0 / +0.001.
        Returns ``((px, vx), reward, terminated, truncated, info)``.
        """
        px_low, px_high = self.pxbound
        vx_low, vx_high = self.vxbound

        push = (action - 1.0)*0.001
        slope_pull = 0.0025*math.cos(3 * self.px)
        self.vx = clamp(self.vx + push - slope_pull, vx_low, vx_high)
        self.px = clamp(self.px + self.vx, px_low, px_high)

        # The car stops dead when it is pinned at the lower position bound.
        if self.px == px_low:
            self.vx = 0

        self.curStep += 1
        terminated = self.px == px_high
        truncated = self.curStep >= self.maxStep
        return (self.px, self.vx), -1, terminated, truncated, ""


class ReplayBuffer:
    """Bounded FIFO store of ``(state, action, reward, next_state, terminated)`` tuples."""

    def __init__(self, capacity):
        # A deque with maxlen drops its oldest entry once `capacity` is exceeded.
        self.buffer = collections.deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, terminated):
        """Append one transition, evicting the oldest one if the buffer is full."""
        transition = (state, action, reward, next_state, terminated)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns five column-wise collections: states and next_states as numpy
        arrays; actions, rewards and terminated flags as tuples.
        """
        batch = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one sequence per field.
        states, actions, rewards, next_states, terminateds = zip(*batch)
        state_array = np.array(states)
        next_state_array = np.array(next_states)
        return state_array, actions, rewards, next_state_array, terminateds

    def size(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)
    

class Qnet(torch.nn.Module):
    """One-hidden-layer MLP that maps a state vector to one Q-value per action."""

    def __init__(self, state_dim, hidden_dim, action_dim):
        super().__init__()
        # The attribute names fc1 / fc4 double as state_dict keys, so they stay.
        self.fc1 = torch.nn.Linear(in_features=state_dim, out_features=hidden_dim)
        self.fc4 = torch.nn.Linear(in_features=hidden_dim, out_features=action_dim)

    def forward(self, x):
        """Return Q-values for ``x``; the last dimension of ``x`` must be ``state_dim``."""
        hidden = torch.relu(self.fc1(x))
        q_values = self.fc4(hidden)
        return q_values
    
class DQN:
    """Deep Q-learning agent with an epsilon-greedy policy and a target network.

    Parameters
    ----------
    state_dim, hidden_dim, action_dim : int
        Sizes passed to ``Qnet`` for both the online and the target network.
    learning_rate : float
        Step size of the Adam optimizer that trains the online network.
    gamma : float
        Discount factor applied to the bootstrapped next-state value.
    epsilon : float
        Probability of taking a uniformly random action in ``take_action``.
    target_update : int
        The target network is re-synchronised with the online network every
        ``target_update`` calls to ``update`` (including the very first call).
    device : torch.device
        Device that holds both networks and all training tensors.
    """

    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma, epsilon, target_update, device):
        self.action_dim = action_dim
        self.q_net = Qnet(state_dim, hidden_dim, action_dim).to(device)
        self.target_net = Qnet(state_dim, hidden_dim, action_dim).to(device)
        # Start the target network as an exact copy of the online network so
        # that the first TD targets are not produced by unrelated random weights.
        self.target_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=learning_rate)
        self.gamma = gamma
        self.epsilon = epsilon
        self.target_update = target_update
        self.count = 0  # number of completed update() calls
        self.device = device

    def take_action(self, state):
        """Return an action index for ``state`` using epsilon-greedy selection.

        ``state`` is a single observation (for example a ``(px, vx)`` tuple).
        """
        if np.random.random() < self.epsilon:
            return np.random.randint(self.action_dim)
        # Wrap the observation in a list to add the batch dimension.
        state = torch.tensor([state], dtype=torch.float, device=self.device)
        # Acting needs no gradients; one forward pass is enough.
        with torch.no_grad():
            return self.q_net(state).argmax().item()

    def update(self, states, actions, rewards, next_states, terminateds):
        """Run one gradient step on a batch of transitions.

        All five arguments are batch-aligned sequences. ``actions``, ``rewards``
        and ``terminateds`` are reshaped to column vectors; ``terminateds``
        masks out the bootstrapped value for terminal transitions.
        """
        states = torch.tensor(states, dtype=torch.float, device=self.device)
        actions = torch.tensor(actions, dtype=torch.int64, device=self.device).view(-1, 1)
        rewards = torch.tensor(rewards, dtype=torch.float, device=self.device).view(-1, 1)
        next_states = torch.tensor(next_states, dtype=torch.float, device=self.device)
        terminateds = torch.tensor(terminateds, dtype=torch.float, device=self.device).view(-1, 1)

        # Q(s, a) of the actions that were actually taken.
        q_values = self.q_net(states).gather(1, actions)

        # The TD target is a constant with respect to the optimisation: keep the
        # target network out of the autograd graph so that no graph is built
        # and no gradients pile up on its parameters.
        with torch.no_grad():
            max_next_q_values = self.target_net(next_states).max(1)[0].view(-1, 1)
            q_targets = rewards + self.gamma * max_next_q_values * (1.0 - terminateds)

        # mse_loss already averages over the batch.
        dqn_loss = torch.nn.functional.mse_loss(q_values, q_targets)
        self.optimizer.zero_grad()
        dqn_loss.backward()
        self.optimizer.step()

        if self.count % self.target_update == 0:
            self.target_net.load_state_dict(self.q_net.state_dict())
        self.count += 1


# --- Network shape -----------------------------------------------------------
state_dim = 2         # an observation is the pair (position, velocity)
action_dim = 3        # number of discrete actions of MountainCar
hidden_dim = 8192     # width of the single hidden layer of Qnet

# --- Learning ----------------------------------------------------------------
lr = 0.001            # Adam learning rate
gamma = 0.98          # discount factor
epsilon = 0.1         # probability of a random action
target_update = 5     # DQN.update() calls between target-network syncs

# --- Replay and update schedule ----------------------------------------------
buffer_size = 10_000  # capacity of the replay buffer
minimal_size = 500    # updates start once the buffer holds more than this
batch_size = 64       # transitions sampled per update
learn_freq = 5        # one update every `learn_freq` environment steps

# Training runs on the CPU. GPU alternative:
#   torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device = torch.device("cpu")

def train(agent, num_episodes):
    """Train ``agent`` on a fresh ``MountainCar`` for ``num_episodes`` episodes.

    The replay and update schedule comes from the module-level settings
    ``buffer_size``, ``minimal_size``, ``learn_freq`` and ``batch_size``:
    once the buffer holds more than ``minimal_size`` transitions, one
    ``agent.update`` is run every ``learn_freq`` environment steps.

    A progress line is printed whenever an episode sets a new best return and
    at every tenth of the run.

    Parameters
    ----------
    agent
        Object exposing ``take_action(state)`` and
        ``update(states, actions, rewards, next_states, terminateds)``.
    num_episodes : int
        Number of episodes to run.

    Returns
    -------
    (int, numpy.ndarray)
        Total number of environment steps and the per-episode total reward.
    """
    replay_buffer = ReplayBuffer(buffer_size)
    reward_stat = np.zeros(num_episodes)
    total_steps = 0
    env = MountainCar()
    # perf_counter is monotonic, so the elapsed time can never be negative.
    start_time = time.perf_counter()
    max_reward = float("-inf")

    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        while True:
            total_steps += 1
            action = agent.take_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            # Only true termination is stored: a truncated episode must still
            # bootstrap from its last next_state.
            replay_buffer.add(state, action, reward, next_state, terminated)
            state = next_state
            total_reward += reward

            if replay_buffer.size() > minimal_size and total_steps % learn_freq == 0:
                states, actions, rewards, next_states, terminateds = replay_buffer.sample(batch_size)
                agent.update(states, actions, rewards, next_states, terminateds)

            if terminated or truncated:
                break

        reward_stat[episode] = total_reward

        is_new_best = total_reward > max_reward
        if is_new_best:
            max_reward = total_reward
        if is_new_best or (episode * 10) % num_episodes == 0:
            duration = time.perf_counter() - start_time
            # Guard against a clock that has not advanced yet.
            steps_per_second = total_steps / duration if duration > 0 else float("inf")
            print("episode:", episode, "total_reward:",total_reward, "step/second:", steps_per_second)

    return total_steps, reward_stat



In [15]:
# Build the agent from the configuration values, then train it for 1000 episodes.
agent = DQN(
    state_dim=state_dim,
    hidden_dim=hidden_dim,
    action_dim=action_dim,
    learning_rate=lr,
    gamma=gamma,
    epsilon=epsilon,
    target_update=target_update,
    device=device,
)
total_steps, reward_stat = train(agent, num_episodes=1000)
print(total_steps)


RuntimeError: PyTorch is not linked with support for vulkan devices