In [2]:
# imports

#neural net imports
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

#math imports
import numpy as np
import random
import cv2
import matplotlib.pyplot as plt
from collections import deque
import sklearn as skl


#import elpv-dataset-1.0
from elpvdataset.utils.elpv_reader import load_dataset

#quality of life imports
import keyboard
import pickle

In [3]:
# create train and test sets

# the dataset is split into 3:
# test: 25% of the dataset, used only for the final evaluation of the model.
#
# train: 75% of the dataset, used for training the model.
# this is further split into 2:
# agent train: 75% of the train set, used for training the agent.
# agent test: 25% of the train set, used for evaluating the agent during training and generating a reward.
import sklearn.model_selection

# create a define for style when accessing image or type
IMAGE_IDX = 0
TYPE_IDX = 1

PROBS_IDX = 0      # this line really doesn't have to exist but it can stay.

images, probs, types = load_dataset()

# Pair every image with its type in an explicit (N, 2) object array.
# Letting np.array() infer the layout from (image, type) tuples relies on
# implicit ragged-array creation, which newer NumPy versions reject and older
# ones only accept with a deprecation warning.
dataset = np.empty((len(images), 2), dtype=object)
for row, (image, cell_type) in enumerate(zip(images, types)):
    dataset[row, IMAGE_IDX] = image
    dataset[row, TYPE_IDX] = cell_type

# randomly split into train and test sets.
x_train, x_test, y_train, y_test = skl.model_selection.train_test_split(dataset, probs, random_state = 42, test_size = (25/100), train_size = 75/100, shuffle = True)
print("test set size: ", y_test.shape)
# further split the train set (it was already shuffled by the split above, so this split keeps its order).
x_agent_train, x_agent_test, y_agent_train, y_agent_test = skl.model_selection.train_test_split(x_train, y_train, random_state = 42, test_size = (25/100), train_size = 75/100, shuffle = False)
print("agent train set size: ", y_agent_train.shape)
print("agent test set size: ", y_agent_test.shape)

# creating a few more defines to suit neural net requirements.

# one agent action per sample in the agent train set
NUM_TRAIN_OUTPUTS = x_agent_train.shape[0]

test set size:  (656,)
agent train set size:  (1476,)
agent test set size:  (492,)


  del sys.path[0]


The agent is a deep Q learning agent that utilises a neural net to estimate the value of feeding the CNN an image given a certain state.

It will "play" a game of sequentially feeding the CNN images selectively with the goal of finding the optimal performance of the CNN while preventing overfitting.

The Q-learning agent will also utilise an LSTM to remember its previous actions and base its decisions on them.

In [40]:
#initialize agent neural net

import torch
import torch.nn as nn
import torch.optim as optim

# seed torch's random number generator so runs of this notebook are repeatable
torch.manual_seed(1)

class Network(nn.Module):
    """Recurrent Q-network: a single-layer LSTM followed by a small MLP.

    The LSTM consumes one 3-feature state per call and keeps its hidden and
    cell state on the instance (``self.h`` / ``self.c``) between calls. The
    MLP maps the LSTM output to one value per action.

    Args:
        lr (float): Learning rate for the Adam optimizer.
        n_actions (int, optional): Number of outputs (actions). Defaults to
            the module-level ``NUM_TRAIN_OUTPUTS`` when omitted.
    """

    def __init__(self, lr, n_actions=None):
        super(Network, self).__init__()

        self.lstm_input = 3 # 1 for image, 1 for type, 1 for reward ().
        self.lstm_output = 64
        self.inputs = self.lstm_output
        self.hid_1 = 500
        self.hid_2 = 500
        # only fall back to the notebook global when no size is given
        self.n_actions = NUM_TRAIN_OUTPUTS if n_actions is None else n_actions

        #create LSTM module
        self.lstm = nn.LSTM(self.lstm_input, self.lstm_output, 1, batch_first=True)

        # create neural net
        self.model = nn.Sequential(
            nn.Linear(self.inputs, self.hid_1),
            nn.ReLU(),
            nn.Linear(self.hid_1, self.hid_2),
            nn.ReLU(),
            nn.Linear(self.hid_2, self.n_actions),
        )

        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.SmoothL1Loss()
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.to(self.device)

        # hidden / cell state for a batch of one, shaped (num_layers, batch, hidden)
        self.h = torch.zeros(1, 1, self.lstm_output, device=self.device)
        self.c = torch.zeros(1, 1, self.lstm_output, device=self.device)

    def forward(self, x, h_0=42, c_0=42,  zero=False):
        """Run one step of the LSTM and the MLP head.

        Args:
            x (Tensor): States with one row of 3 features per batch element.
            h_0 (Tensor, optional): Hidden state to start from. The default
                ``42`` is a "not supplied" sentinel: the stored ``self.h`` is
                used.
            c_0 (Tensor, optional): Cell state to start from; same sentinel.
            zero (bool): If True, reset the stored state to zeros (sized for
                ``x``) before the step. This overrides ``h_0`` / ``c_0``.

        Returns:
            tuple: ``(output, (h, c))`` with ``output`` of shape
            ``(batch, n_actions)`` and the updated LSTM state, which is also
            stored on the instance.
        """
        # Comparing a tensor with the sentinel (`h_0 != 42`) gives an
        # element-wise tensor whose truth value is ambiguous, so detect a
        # supplied state by type instead.
        if torch.is_tensor(h_0):
            self.h = h_0.to(self.device)
        if torch.is_tensor(c_0):
            self.c = c_0.to(self.device)
        if zero:
            self.zero(x)

        x = x.to(self.device, dtype=torch.float32)
        batch = x.size(0)
        # strict lstm input of [[image, type, reward]]: (batch, seq_len=1, features)
        lstm_output, (self.h, self.c) = self.lstm(x.view(batch, 1, self.lstm_input), (self.h, self.c))
        output = self.model(lstm_output.reshape(batch, -1))
        return output, (self.h, self.c)

    def zero(self, x):
        """Reset the stored hidden and cell state to zeros for a batch the size of ``x``."""
        self.h = torch.zeros(1, x.size(0), self.lstm_output).to(self.device)
        self.c = torch.zeros(1, x.size(0), self.lstm_output).to(self.device)


In [57]:
# test neural net functionality: push one dummy [image, type, reward] state
# through a fresh network and show the best-valued action.
lstm = Network(0.001)
with torch.no_grad():
    # named `inputs` only, so the builtin `input` is not shadowed in the kernel
    inputs = torch.tensor([[1, 2, 3]])
    output, (_, _) = lstm(inputs, zero=True)
    print(torch.max(output, dim = 1))

input:  tensor([[[1., 2., 3.]]])
torch.return_types.max(
values=tensor([0.0986]),
indices=tensor([984]))


In [49]:
#initialize the agent

class Agent(object):
    """Epsilon-greedy deep Q-learning agent backed by the recurrent `Network`.

    Experiences are stored in a replay buffer as
    ``[state, h_0, c_0, action, reward, next_state, terminal]`` where a state
    consists of ``[previous_image, previous_type, previous_reward]``.
    """

    # When the replay buffer is almost full, `max_mem_size - PURGE_MARGIN`
    # of the oldest entries are dropped in one go.
    PURGE_MARGIN = 3000

    def __init__(self):
        """
        Properties:
            gamma (float): Future reward discount rate.
            epsilon (float): Probability for choosing random policy.
            epsilon_decay (float): Rate at which epsilon decays toward zero.
            learning_rate (float): Learning rate for Adam optimizer.

        Returns:
            Agent
        """
        # constant parameters
        self.gamma = 0.95
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.996
        self.lr = 0.00005
        self.batch_size = 1000
        self.max_mem_size = 50000

        #variable parameters
        # NOTE: epsilon starts at epsilon_min, so updateEpsilon() cannot lower
        # it further; only updateEpsilonScore() can raise it.
        self.epsilon = 0.01
        self.mem_cntr = 0
        self.mem_cntr_successful = 0

        # initializing memory
        self.memory = deque(maxlen=self.max_mem_size)
        self.memory_successful = deque(maxlen=1000)
        self.episodic_memory = []

        #initialize networks
        self.network = Network(self.lr)

    def getMemory(self):
        """Return the replay buffer (a deque of experience lists)."""
        return self.memory

    def nextEpisode(self):
        """Decay epsilon once; same effect as `updateEpsilon`."""
        self.updateEpsilon()

    def getepsilon(self):
        """Return the current exploration probability."""
        return self.epsilon

    def remember(self, state, h_0, c_0, action, reward, next_state, terminal):
        """Append one experience to the replay buffer.

        Tensor-valued hidden states are detached before storage so that the
        buffer does not keep autograd graphs alive.
        """
        if self.mem_cntr >= self.max_mem_size - 2:
            # buffer is nearly full: drop a large block of the oldest entries
            n_drop = min(len(self.memory), max(0, self.max_mem_size - self.PURGE_MARGIN))
            for _ in range(n_drop):
                self.memory.popleft()

        h_0 = h_0.detach() if torch.is_tensor(h_0) else h_0
        c_0 = c_0.detach() if torch.is_tensor(c_0) else c_0
        self.memory.append([state, h_0, c_0, action, reward, next_state, terminal])

        # keep the counter equal to the number of stored experiences
        self.mem_cntr = len(self.memory)

    #state consists of [previous_image, previous_type, previous_reward]
    def select_action(self, state, zero=False):
        """Pick an action epsilon-greedily and advance the network's LSTM state.

        Args:
            state: ``[previous_image, previous_type, previous_reward]``.
            zero (bool): Forwarded to the network to reset its LSTM state first.

        Returns:
            tuple: ``(action, h, c)`` where ``h`` / ``c`` are the network's
            hidden and cell state after this step.
        """
        state = torch.tensor([state])
        # acting needs no gradients; learn() recomputes the values it trains on
        with torch.no_grad():
            output, (h_0, c_0) = self.network.forward(state, zero=zero)
        action = torch.argmax(output).item()
        if np.random.rand() <= self.epsilon:
            # exploration: randint's upper bound is exclusive, so pass the
            # full number of actions to make every action reachable
            action = int(np.random.randint(0, output.shape[-1]))
        return action, h_0, c_0

    def updateEpsilon(self):
        """Multiply epsilon by `epsilon_decay`, never going below `epsilon_min`."""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def updateEpsilonScore(self, score):
        """Rescale epsilon by ``-0.01 * score + 1.08``, clamped to ``[epsilon_min, 0.7]``."""
        modifier = -0.01*score + 1.08
        episolon_new = min(self.epsilon * modifier, 0.7 )
        self.epsilon = max(self.epsilon_min, episolon_new)

    def learn(self):
        """Run one Q-learning update per experience in a random replay batch.

        Does nothing until at least `batch_size` experiences are stored.
        """
        available = min(self.mem_cntr, len(self.memory))
        if available < self.batch_size:
            return

        #select a random set of states
        batch = np.random.choice(available, self.batch_size, replace=False)
        device = self.network.device

        # memory = [state, h_0, c_0, action, reward, next_state, terminal]
        #state consists of [previous_image, previous_type, previous_reward]

        for i in batch:
            state, h_0, c_0, action, reward, next_state, terminal = self.memory[int(i)]

            state = torch.as_tensor([state], dtype=torch.float32, device=device)
            next_state = torch.as_tensor([next_state], dtype=torch.float32, device=device)
            reward = torch.as_tensor(reward, dtype=torch.float32, device=device)
            # stored hidden states are already tensors of the right shape
            h_0 = torch.as_tensor(h_0, dtype=torch.float32, device=device).detach()
            c_0 = torch.as_tensor(c_0, dtype=torch.float32, device=device).detach()

            # forward() returns (output, (h, c)); the Q-values are the first item
            q_values, _ = self.network.forward(state, h_0, c_0)
            q_current = q_values[0, int(action)]

            # the target is a constant for this update: no gradient through it
            with torch.no_grad():
                if terminal:
                    q_next_max = torch.zeros((), dtype=torch.float32, device=device)
                else:
                    # continues from the LSTM state left by the forward pass above
                    q_next, _ = self.network.forward(next_state)
                    q_next_max = torch.max(q_next)
                q_target = reward + self.gamma * q_next_max

            self.network.optimizer.zero_grad()
            loss = self.network.loss(q_current, q_target)
            loss.backward()
            torch.nn.utils.clip_grad_value_(self.network.parameters(), 100)
            self.network.optimizer.step()

    def update_episodic_memory(self, state, action, reward, next_state, done, current_step):
        """Append ``[state, action, reward, next_state, done, 0]`` to the episodic memory.

        `current_step` is accepted but not used.
        """
        self.episodic_memory.append([state, action, reward, next_state, done, 0])

In [50]:
#main training loop:
def train(n_games):
    """Train the agent for up to `n_games` games and plot the final score of each.

    In every game a fresh image classifier is created and the agent picks,
    step by step, which agent-train sample the classifier is trained on next.
    The reward for a step is the change in the classifier's score on the
    train set. After each game the agent learns from its replay memory and
    epsilon is decayed. Holding the backtick key stops training early.

    Args:
        n_games (int): Maximum number of games to play.
    """
    agent = Agent()
    avg_scores = []
    steps_per_game = NUM_TRAIN_OUTPUTS * 10

    for game in range(n_games):
        # initialize new CNN
        image_classifier = ImageClassifier()
        # score is portion of things the image classifier got right on the train set.
        score = image_classifier.run(x_train, y_train)
        #state consists of [previous_image, previous_type, previous_reward]
        state = [0, 0, 0]
        # zero hidden network state for consistent results.
        agent.network.zero(torch.tensor([state]))
        h_0, c_0 = agent.network.h, agent.network.c
        # memory = [state, h_0, c_0, action, reward, next_state, terminal]
        # Per-step histories get their own names; reusing `score` / `reward`
        # for the lists would overwrite the scalar score needed for the reward.
        score_history, reward_history = [], []
        for step in range(steps_per_game):
            terminal = step == steps_per_game - 1
            #during training, the agent is only able to select an action from the agent_train set.
            action, h_1, c_1 = agent.select_action(state)
            # train the image classifier on the action
            image_classifier.train(x_agent_train[action], y_agent_train[action])
            # get the reward
            new_score = image_classifier.run(x_train, y_train)       #x_train and y_train include agent_train set as well as an unseen agent_test set. This is separate from x_test and y_test which is not used in agent learning at all.
            reward = new_score - score
            score = new_score
            # record the "next state"
            # NOTE(review): select_action converts the state with torch.tensor and
            # the network expects 3 numeric features -- confirm that the image and
            # type entries are numeric scalars at this point.
            next_state = [x_agent_train[action][0], x_agent_train[action][1], reward]
            # add to memory
            agent.remember(state, h_0, c_0, action, reward, next_state, terminal)
            # update states
            state = next_state
            h_0, c_0 = h_1, c_1
            # add stats to score and reward array for plotting
            score_history.append(score)
            reward_history.append(reward)
        #after each "game", learn from a random batch of experiences.
        agent.learn()
        #reduce chance of random action.
        agent.updateEpsilon()
        # `score` now holds the score after the last step of this game
        avg_scores.append(score)
        avg_score = np.mean(avg_scores[-10:])

        print('episode: ', game,'score: %.2f' % score,
                ' avg score %.2f' % avg_score, 'epsilon %.2f' % agent.epsilon)
        if (keyboard.is_pressed("`")):
            break

    plt.plot(avg_scores)
    plt.show()