In [6]:
import torch
import torchvision
from torch.utils.data import DataLoader

import os
import cv2
from tqdm.notebook import trange

import gym
from gym import spaces

from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.dqn.policies import CnnPolicy

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import csv
import sklearn as sk
from sklearn import datasets
# from stable_baselines3.common.policies import register_policy
# register_policy("myPolicy", CnnPolicy)

In [7]:
# Define the gym environment.
class USPSGym(gym.Env):
    """Gym environment that serves labelled digit images, one sample per step.

    Every step shows the sample at the current dataset index. The action is the
    agent's guess for that sample's label and earns a reward of 1 when it
    matches, 0 otherwise. One episode is a single pass over the whole dataset.

    Args:
        dataset: ``(X, y)`` pair; ``X[i]`` is a sample exposing a shape with at
            least two axes and ``y[i]`` is its label.
        width: Size of axis 0 of every observation.
        height: Size of axis 1 of every observation.
        channels: Size of axis 2 of every observation.
    """

    def __init__(self, dataset,width=128, height=128, channels=1):
        # Training samples and their labels.
        self.X, self.y = dataset

        # Index of the sample currently shown to the agent.
        self.idx = 0

        # Digits 0-9 are valid actions.
        self.action_space = spaces.Discrete(10)

        # Observations are 8-bit images laid out as (width, height, channels).
        self.observation_space = spaces.Box(low=0, high=255, shape=(width, height, channels), dtype=np.uint8)

    def _obs(self):
        """Return the current sample as an array that conforms to the observation space.

        Samples smaller than the canvas along either axis are enlarged with
        bicubic interpolation; samples already at least as large are not resized.
        The result is always clipped to [0, 255] and cast to the observation
        dtype, because bicubic interpolation can overshoot the input range and
        would otherwise produce values outside the declared space.
        """
        width, height, channels = self.observation_space.shape
        frame = np.asarray(self.X[self.idx], dtype=np.float32)

        # NOTE(review): when X holds flattened images (one sample per matrix row),
        # X[idx] has shape (1, n_pixels) and is stretched onto the canvas as a
        # one-row image rather than restored to its 2-D layout - confirm intended.
        if frame.shape[0] < width or frame.shape[1] < height:
            # cv2.resize takes dsize as (columns, rows); asking for (height, width)
            # yields an array of shape (width, height), which is the layout the
            # observation space declares, so the reshape only adds the channel axis.
            frame = cv2.resize(frame, (height, width), interpolation=cv2.INTER_CUBIC)
            frame = frame.reshape(width, height, channels)

        return np.clip(frame, 0, 255).astype(self.observation_space.dtype)

    def step(self, action):
        """Score ``action`` against the current label and advance to the next sample.

        Returns:
            Tuple ``(observation, reward, done, info)``. ``done`` is True when the
            index has wrapped back to 0, i.e. the last sample was just scored; the
            observation returned in that case is the first sample again.
        """
        # The agent earns 1 point for a correct label.
        reward = 1 if action == self.y[self.idx] else 0

        # Advance, wrapping around to the start after the final sample.
        last_index = len(self.X) - 1
        self.idx = self.idx + 1 if self.idx < last_index else 0

        done = self.idx == 0
        return self._obs(), reward, done, {}

    def reset(self):
        """Rewind to the first sample and return its observation."""
        self.idx = 0
        return self._obs()

    def render(self, action='', mode='human', close=False):
        """Plot the current observation, titled with its label.

        When ``action`` is supplied the title is ``'<action>-<label>'``; otherwise
        it is the label alone. ``mode`` and ``close`` are accepted but unused.
        """
        width, height = self.observation_space.shape[0], self.observation_space.shape[1]
        fig, ax = plt.subplots(1)
        ax.imshow(self._obs().reshape(width, height), cmap='Greys')

        # Label with the correct value and action if supplied.
        label = self.y[self.idx]
        title = '{}-{}'.format(action, label) if action != '' else label
        ax.set_title(title)
        plt.show()

In [8]:
from sklearn.datasets import load_svmlight_file
from dqn import *
import numpy as np
from stable_baselines3 import DQN


# Load both splits in a single call. load_svmlight_file infers the number of
# features separately for every file it parses, so loading train and test
# independently can give them different column counts; load_svmlight_files
# constrains all returned matrices to the same number of features.
from sklearn.datasets import load_svmlight_files

_splits = load_svmlight_files(['data/mnist_data', 'data/mnist_data.t'])
train, test = tuple(_splits[:2]), tuple(_splits[2:])

x, y = train
# todense() yields an np.matrix, so x[i] stays 2-D with shape (1, n_features).
x = x.todense()

# Reorder the training samples so they are grouped by ascending label.
sortind = np.argsort(y)
x = x[sortind, :]
y = y[sortind]

testx, testy = test
testx = testx.todense()

In [9]:
#@title Complete data 7291
# Wrap the custom gym in a single-environment vectorized wrapper; the factory
# builds a 64x64, 1-channel environment over the training split.
env = DummyVecEnv([lambda: USPSGym(dataset=(x, y), width=64, height=64, channels=1)])

# The first two observation axes are reused later to build evaluation frames.
width, height = env.observation_space.shape[:2]

In [10]:
#@title complete 60000 - create model
def create_model(pretrained=False, save_model=True, epochs=2):
    """Load a saved DQN agent or train a fresh one on the notebook's ``env``.

    Args:
        pretrained: When True, load the checkpoint for ``epochs`` from disk
            instead of training.
        save_model: When True, write a newly trained agent to its checkpoint.
            Ignored when ``pretrained`` is True.
        epochs: Number of passes over the training data; also part of the
            checkpoint file name.

    Returns:
        The loaded or newly trained DQN model.
    """
    checkpoint = "dqn_cnn_mnist_norm_{}_tarun.zip".format(epochs)

    if not pretrained:
        # Build a DQN agent with a CnnPolicy and train it for `epochs` passes
        # through the full training dataset (one timestep per sample).
        agent = DQN(CnnPolicy, env, verbose=1)
        agent.learn(total_timesteps=len(x) * epochs)

        # Persist the new agent only when requested.
        if save_model:
            agent.save(checkpoint)
        return agent

    return DQN.load(checkpoint)

In [None]:
# #@title eval
# # Evaluate the model by counting the total rewards attained on the test dataset.
# total_rewards = 0
# pred_y_test = []
# count = 0

# for idx in trange(len(testx)):
#     # Generate an evaluation observation frame.
#     obs = cv2.resize(np.array(testx[idx]).astype(np.float32), (width, height), interpolation = cv2.INTER_CUBIC)
#     obs = obs.reshape(width, height, 1)
    
#     # Predict an action based on the observation.
#     action, _states = inter_model.predict(obs)
#     pred_y_test.append(action)


#     # Score the prediction.
#     if (action.size > 1):
#         # print(action)
#         count += 1
#         pass
#     else:
#         reward = 1 if action == testy[idx] else 0
#         total_rewards += reward

# print('Accuracy: {:.2f}%'.format(total_rewards / (len(testy) - count) * 100.0))

In [None]:
# Train one model per epoch budget and report its accuracy on the test split.
epochs = [1, 5, 10, 25, 50, 75, 100, 667]
for epoch in epochs:
    print(epoch, '\n')
    inter_model = create_model(pretrained=False, save_model=True, epochs=epoch)

    #@title eval
    # Evaluate the model by counting the total rewards attained on the test dataset.
    total_rewards = 0
    pred_y_test = []
    count = 0  # predictions skipped because they were not a single action

    for idx in trange(len(testx)):
        # Generate an evaluation observation frame. cv2.resize takes dsize as
        # (columns, rows), so (height, width) gives an array of shape (width, height).
        # NOTE(review): testx[idx] is a (1, n_features) matrix row, so it is
        # stretched as a one-row image rather than restored to 2-D - confirm intended.
        obs = cv2.resize(np.array(testx[idx]).astype(np.float32), (height, width), interpolation=cv2.INTER_CUBIC)
        obs = obs.reshape(width, height, 1)

        # Bicubic interpolation can overshoot the input range; keep the frame
        # inside the uint8 [0, 255] observation space the model was built for.
        obs = np.clip(obs, 0, 255).astype(np.uint8)

        # Predict greedily. With the default deterministic=False, DQN applies
        # epsilon-greedy exploration at prediction time, which injects random
        # actions into the accuracy measurement.
        action, _states = inter_model.predict(obs, deterministic=True)
        pred_y_test.append(action)

        # Score the prediction; anything that is not a single action is excluded.
        if action.size > 1:
            count += 1
        else:
            reward = 1 if action == testy[idx] else 0
            total_rewards += reward

    # Guard against an empty denominator when every prediction was skipped
    # (or the test split is empty).
    scored = len(testy) - count
    if scored > 0:
        print('Accuracy: {:.2f}%'.format(total_rewards / scored * 100.0))
    else:
        print('Accuracy: n/a (no test samples were scored)')

1 

Using cpu device
Wrapping the env in a VecTransposeImage.


  0%|          | 0/10000 [00:00<?, ?it/s]

Accuracy: 11.30%
5 

Using cpu device
Wrapping the env in a VecTransposeImage.
