In [5]:
from nes_py.wrappers import JoypadSpace
import gym_tetris
from gym_tetris.actions import MOVEMENT

from collections import namedtuple
import numpy as np
from tensorboardX import SummaryWriter

import torch
import torch.nn as nn
import torch.optim as optim

In [6]:
# The neural network

class Net(nn.Module):
    """Small fully connected network: Linear -> ReLU -> Linear.

    Args:
        obs_size: number of input features of the first linear layer.
        hidden_size: number of units in the hidden layer.
        n_actions: number of output features of the last linear layer.
    """

    def __init__(self, obs_size, hidden_size, n_actions):
        super().__init__()
        # Layers are created in input-to-output order and wrapped in a
        # Sequential stored under the attribute name `net`.
        layers = [
            nn.Linear(obs_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, n_actions),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run `x` through the layer stack and return the raw outputs."""
        return self.net(x)

In [7]:
# Tunable settings.
# NOTE(review): presumably HIDDEN_SIZE is the hidden width passed to Net,
# BATCH_SIZE the episodes per batch for iterate_batches, and PERCENTILE the
# reward cut-off for filter_batch -- confirm against the training cell.
HIDDEN_SIZE = 128
BATCH_SIZE = 16
PERCENTILE = 70

# Lightweight struct-like records.
# Episode: total reward of one episode plus its list of steps.
Episode = namedtuple('Episode', ['reward', 'steps'])
# EpisodeStep: the observation seen and the action taken at one step.
EpisodeStep = namedtuple('EpisodeStep', ['observation', 'action'])


In [8]:
def iterate_batches(env, net, batch_size):
    """Play episodes with `net` forever, yielding them in batches.

    At every step the current observation is fed to `net`, the outputs are
    turned into probabilities with a softmax, and an action is sampled from
    that distribution. Each finished episode is stored as an `Episode`
    (total reward + list of `EpisodeStep`); once `batch_size` episodes have
    been collected the list is yielded and a fresh one is started.

    Args:
        env: environment whose `reset()` returns an observation and whose
            `step(action)` returns `(observation, reward, done, info)`.
        net: callable mapping a batch-of-one observation tensor to one
            score per action.
        batch_size: number of finished episodes per yielded batch.

    Yields:
        list of `Episode` records of length `batch_size`.
    """
    softmax = nn.Softmax(dim=1)
    finished_episodes = []
    current_steps = []
    running_reward = 0
    obs = env.reset()
    while True:
        # Wrap the observation in a list so the network sees a batch of one.
        scores = net(torch.FloatTensor([obs]))
        probs = softmax(scores).data.numpy()[0]
        action = np.random.choice(len(probs), p=probs)
        next_obs, reward, is_done, _ = env.step(action)

        # Record the observation the action was chosen from, not the result.
        current_steps.append(EpisodeStep(observation=obs, action=action))
        running_reward += reward

        if is_done:
            finished_episodes.append(
                Episode(reward=running_reward, steps=current_steps)
            )
            current_steps, running_reward = [], 0
            # Start the next episode from a fresh observation.
            next_obs = env.reset()

        if len(finished_episodes) == batch_size:
            yield finished_episodes
            finished_episodes = []
        obs = next_obs

In [9]:
def filter_batch(batch, percentile):
    """Keep the steps of the best episodes in `batch` as training data.

    Episodes whose total reward is below the `percentile`-th percentile of
    the batch rewards are dropped; the observations and actions of all
    remaining episodes are concatenated in batch order.

    Args:
        batch: iterable of records with a `reward` attribute and a `steps`
            attribute; each step has `observation` and `action` attributes.
        percentile: percentile (0-100) passed to `np.percentile` to compute
            the reward cut-off.

    Returns:
        Tuple `(train_obs_v, train_act_v, reward_boundary, reward_mean)`:
        a float tensor of observations, a long tensor of actions, the reward
        cut-off and the mean reward over the whole batch.
    """
    rewards = [episode.reward for episode in batch]
    reward_boundary = np.percentile(rewards, percentile)
    reward_mean = np.mean(rewards)

    # One consistently named accumulator per output (the original mixed
    # `train_acts` and `train_act`, which raised NameError).
    train_obs = []
    train_act = []
    for episode in batch:
        if episode.reward < reward_boundary:
            continue
        train_obs.extend(step.observation for step in episode.steps)
        train_act.extend(step.action for step in episode.steps)

    # Stack into one float32 array first: building a tensor straight from a
    # list of per-step arrays is very slow.
    train_obs_v = torch.FloatTensor(np.array(train_obs, dtype=np.float32))
    train_act_v = torch.LongTensor(train_act)
    return train_obs_v, train_act_v, reward_boundary, reward_mean

In [None]:
# Smoke test: drive the Tetris environment with uniformly random actions and
# render every frame.
env = gym_tetris.make('TetrisA-v0')
# Restrict the action space to the button combinations listed in MOVEMENT.
env = JoypadSpace(env, MOVEMENT)

done = True  # forces a reset on the first iteration
try:
    for step in range(5000):
        if done:
            state = env.reset()
        state, reward, done, info = env.step(env.action_space.sample())
        env.render()
finally:
    # Always release the environment, even if the loop raises or the cell
    # is interrupted part-way through.
    env.close()