## Note:
#### Make sure all packages listed in requirements.txt are installed in advance.
#### To train the Agent, you may skip sections 0,1,2.
Sections 0, 1 and 2 only demonstrate how the virtual test environment was implemented in the package 'gymnasium-custom'.\
In section 2, the import and functionality of the package can be tested by means of a simulation.

Section 3 can be run independently of sections 1 and 2. It includes the reinforcement learning model and the training process of the agent.

# 0. Import Dependencies

In [None]:
import gymnasium as gym # 0.27.1
from gymnasium import logger, spaces
import pygame # 2.3.0
import numpy as np # 1.22.4

# 1. Setup Custom Environment

In [None]:
import math
from gymnasium.error import DependencyNotInstalled
import pandas as pd
import pyreadr as pr
from dotenv import dotenv_values
from os import path

In [None]:
import gymnasium as gym # 0.27.1
from gymnasium import logger, spaces
import pygame # 2.3.0
import numpy as np # 1.22.4
import math
from gymnasium.error import DependencyNotInstalled
import pandas as pd
import pyreadr as pr
from dotenv import dotenv_values
from os import path


class IcTestEnvironment(gym.Env):
    """
    Gymnasium environment that replays recorded IC test data, one DUT per episode.

    The data file is read as a ';'-separated CSV. Column 0 holds the condition
    label of each device under test (1: good device, anything else: bad device)
    and the columns from index 8 onwards hold the test results.

    Observation: float32 vector with one entry per test. After reset() entry 0
        holds the result of test 0 and all other entries are 1. A "continue"
        step overwrites the entry of the current test with its recorded result,
        an "abort" step writes 0 into it.
    Actions: 0 "abort good", 1 "abort bad", 2 "continue".
    Episode end: an abort action, or reaching the last test without a decision.
    """
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    def __init__(self, render_mode=None, data=False):
        """
        Load the data set and define the observation and action spaces.

        Args:
            render_mode: stored on the instance; render() is a no-op.
            data: path to the CSV file. If falsy, the path is built from the
                DATA_PATH and TRAINING_FILE entries of the '.env' file.
        """
        if data:
            self.data_path = data
        else:
            env_variables = dotenv_values('.env')
            file_name = env_variables['TRAINING_FILE']
            file_path = env_variables['DATA_PATH']
            self.data_path = path.join(file_path, file_name)

        # read .csv file, convert pd.DataFrame to np.array
        self.data = pd.read_csv(self.data_path, delimiter=";").to_numpy()
        self.test_data = self.data[0:, 8:]  # relevant test results
        self.cond_label = self.data[0:, 0]  # IC condition labels (1: good device, other: bad device)

        self.no_of_tests = np.shape(self.test_data)[1]  # number of tests in data
        self.no_of_duts = len(self.cond_label)  # number of DUTs in data
        self.test_no = 0  # initial test number
        self.dut_id = -1  # initial DUT ID; the first reset() advances it to 0

        self.dut_cond = None  # agent's decision, True: good device, False: bad device
        self.true_dut_cond = None  # labelled condition, True: good device, False: bad device

        # Observation space: one unbounded float32 entry per test
        bound = np.finfo(np.float32).max
        low = np.array([-bound] * self.no_of_tests, dtype=np.float32)
        high = np.array([bound] * self.no_of_tests, dtype=np.float32)
        self.observation_space = spaces.Box(low=low, high=high, dtype=np.float32)

        self.state = None

        # 3 actions corresponding to 0: "abort good", 1: "abort bad", 2: "continue"
        self.action_space = spaces.Discrete(3)

        # instant reward (kept for use in reward functions)
        self.inst_reward = 0

        # attributes reserved for process visualization
        self.render_mode = render_mode
        self.screen_width = 600
        self.screen_height = 400
        self.screen = None
        self.clock = None
        self.isopen = True

        # number of steps taken after a terminating state (None while the episode is running)
        self.steps_beyond_terminated = None

    def step(self, action):
        """
        Apply an action to the current DUT and compute the reward.

        Returns:
            (observation, reward, terminated, truncated, info), where truncated
            is always False and info is {'PC': predicted condition,
            'TC': true condition}. Both conditions stay None until the episode
            terminates; 'PC' also stays None if the tests run out without a
            decision.
        """
        err_msg = f"{action!r} ({type(action)}) invalid"
        assert self.action_space.contains(action), err_msg
        assert self.state is not None, "Call reset before using step method."

        # The episode is already over: warn once and return a neutral result
        # instead of advancing the test counter past the end of the data.
        if self.steps_beyond_terminated is not None:
            if self.steps_beyond_terminated == 0:
                logger.warn(
                    "You are calling 'step()' even though this "
                    "environment has already returned terminated = True. You "
                    "should always call 'reset()' once you receive 'terminated = "
                    "True' -- any further steps are undefined behavior."
                )
            self.steps_beyond_terminated += 1
            info = {'PC': self.dut_cond, 'TC': self.true_dut_cond}
            return np.array(self.state, dtype=np.float32), 0.0, True, False, info

        self.test_no += 1

        if action == 2:  # continue: reveal the recorded result of the next test
            test_result = self.test_data[self.dut_id, self.test_no]
        elif action == 1:  # abort bad
            test_result = 0
            self.dut_cond = False
        else:  # action == 0, abort good
            test_result = 0
            self.dut_cond = True

        # overwrite state
        self.state[self.test_no] = test_result

        # the last test has been reached without a decision
        overdue = (self.test_no == (self.no_of_tests - 1))
        terminated = bool(action == 1 or action == 0 or overdue)

        # constant time-based reward per executed test
        tbr = -1e-1

        if not terminated:
            reward = tbr
        else:
            self.steps_beyond_terminated = 0
            self.true_dut_cond = bool(self.cond_label[self.dut_id] == 1)

            print("DUT No.: {}".format(self.dut_id), end='\r', flush=True)

            if self.dut_cond is None:
                # ran out of tests without classifying the DUT
                reward = -800
            elif self.dut_cond == self.true_dut_cond:
                # correct classification
                reward = 250 * np.tanh(2e-3 * self.test_no - 1.5) + 250
            else:
                # wrong classification
                reward = -300 * np.tanh(2e-3 * self.test_no - 1.5) - 400

        info = {'PC': self.dut_cond, 'TC': self.true_dut_cond}
        return np.array(self.state, dtype=np.float32), reward, terminated, False, info

    def reset(self, seed=None, options=None):
        """
        Start the episode for the next DUT (wrapping around to DUT 0 after the last one).

        The initial state holds the result of test 0 in entry 0 and the value 1
        in all remaining entries.

        Returns:
            (observation, info) with an empty info dict.
        """
        # call reset of parent class Env (seeds the RNG)
        super().reset(seed=seed)

        self.dut_cond = None  # set condition to unknown
        self.true_dut_cond = None  # set true condition to unknown

        if self.dut_id == (self.no_of_duts - 1):  # out of data (end of epoch), restart at DUT 0
            self.dut_id = 0
        else:
            self.dut_id += 1  # next DUT

        self.test_no = 0  # reset to test 0

        self.state = [self.test_data[self.dut_id, 0]] + [1] * (self.no_of_tests - 1)
        self.steps_beyond_terminated = None

        return np.array(self.state, dtype=np.float32), {}

    def render(self):
        """
        Mandatory, yet unused in the case of IC testing.
        """
        pass

## 1.1 Test Functionality of Environment

Make sure to add the DATA_PATH and a TRAINING_FILE name to your .env file. The .env file has to be located in the same folder as this Notebook.\
If the environment variables are not to be used, a data path to the training data set can be specified directly via the argument "data".

In [None]:
# Instantiate the environment class defined above. Pass data='training_data_path'
# to read a specific file instead of the one configured in the .env file.
ICenv = IcTestEnvironment()

In [None]:
# Start an episode for the next DUT; returns the initial observation and an empty info dict.
ICenv.reset()

In [None]:
# Take the "continue" action (2) repeatedly. The count is capped so that the
# loop stops before the last test, leaving room for the final decision step
# on data sets with fewer than 2354 tests.
for _ in range(min(2352, ICenv.no_of_tests - 2)):
    ICenv.step(2)

In [None]:
# Action 0 ("abort good") ends the episode by classifying the DUT as a good device.
ICenv.step(0)

# 2. Import Custom Environment and Simulation

#### Requirement:
The package 'gymnasium-custom' is installed locally, and the path and file information has been added to the .env file located in the same folder as this notebook.\
Alternatively, a path can be specified with the argument 'data' in gym.make()

In [None]:
import gymnasium_custom
import random
import time

In [None]:
# Create the registered environment. If environment variables are not to be used,
# add the argument data = 'training_data_path'.
env = gym.make("ICTesting-v0")
states = env.observation_space.shape[0]  # length of the observation vector
actions = env.action_space.n  # number of discrete actions

In [None]:
# Simulate a few episodes with a random policy to check that the environment works.
episodes = 5
decision_names = {0: 'good device', 1: 'bad device', 2: 'no decision'}

for episode in range(1, episodes + 1):
    # reset() returns (observation, info)
    state, info = env.reset()
    done = False
    score = 0
    n_tests = 0  # not named 'count', which would clobber itertools.count used by the training loop
    decision = decision_names[2]
    time.sleep(1)

    while not done:
        env.render()

        # random policy: 90 % "continue", 5 % each for the two abort actions
        action = int(np.random.choice(np.arange(0, 3), p=[0.05, 0.05, 0.9]))

        # step() returns: next state, reward, terminated, truncated, info
        n_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        # accumulate the episode return
        score += reward
        n_tests += 1
        if done:
            decision = decision_names[action]

    print('Episode: {} # of Tests: {} Decision: {} Score: {}'.format(episode, n_tests, decision, score))
    

# 3. Reinforcement Learning Model

#### Requirement:
The packages listed in requirements.txt are installed.\
The package 'gymnasium-custom' is installed locally, and the path and file information has been added to your .env file. The .env file has to be located in the same folder as this notebook.\
Alternatively, a path to your training data can be specified with the argument 'data' in gym.make()

## 3.1 Import Dependencies

In [None]:
import gymnasium as gym
import gymnasium_custom
import math
import random
import matplotlib
import matplotlib.pyplot as plt
import time
import numpy as np
from collections import namedtuple, deque
from itertools import count


import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [None]:
# Load the custom environment. If environment variables are not to be used,
# add the argument data = 'training_data_path'.
env = gym.make("ICTesting-v0")

# Set up matplotlib: when an inline backend is active, IPython's display
# module is imported so the plot functions can refresh the cell output.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

# interactive plotting mode
plt.ion()

# Use the GPU if CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## 3.2 Replay Memory

In [None]:
# One observed environment transition; next_state is None for terminal transitions.
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])


class ReplayMemory(object):
    """
    Bounded buffer of the transitions observed by the agent.

    The stored transitions are reused for training; drawing them at random
    decorrelates the elements of a batch. Once the capacity is reached, the
    oldest transitions are dropped first.
    """

    def __init__(self, capacity):
        # a deque with maxlen discards the oldest entry when a new one is appended
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Store one transition given as (state, action, next_state, reward)."""
        transition = Transition(*args)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return batch_size distinct stored transitions chosen at random."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

## 3.3 DQN Agent - Neural Network

In [None]:
class DQN(nn.Module):
    """
    Fully connected Q-network: observation vector in, one Q-value per action out.

    Args:
        n_observations: length of the observation vector.
        n_actions: number of discrete actions.
        hidden1: width of the first hidden layer (default 2356).
        hidden2: width of the second hidden layer (default 1180).

    The layer attribute names are part of the saved state dict, so agents
    stored with the default sizes remain loadable.
    """

    def __init__(self, n_observations, n_actions, hidden1=2356, hidden2=1180):
        super().__init__()
        self.layer1 = nn.Linear(n_observations, hidden1)
        self.layer2 = nn.Linear(hidden1, hidden2)
        self.layer3 = nn.Linear(hidden2, n_actions)

    def forward(self, x):
        """
        Compute the Q-values for a single state or for a batch of states.

        Returns a tensor of shape (batch, n_actions); the columns follow the
        action indices, e.g. [[abort good, abort bad, continue], ...].
        """
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

# Illustrate the input/output of a DQN: one random state with 10 observations
# goes in, one row with 3 Q-values (one per action) comes out.
net = DQN(10, 3)
sample_input = torch.randn(1, 10)  # not named 'input', which would shadow the builtin
print(sample_input)
out = net(sample_input)
print(out)

## 3.4 Training

In [None]:
BATCH_SIZE = 128 # number of transitions sampled from the replay memory per optimization step
GAMMA = 0.99 # discount factor applied to the value of the next state

# epsilon-greedy parameters (see select_action):
EPS_START = 0.05 # starting epsilon (decreasing exponentially)
EPS_END = 0.0 # final epsilon
EPS_DECAY = 6e05 # controls the rate of exponential decay of epsilon; higher means a slower decay

TAU = 0.005 # soft-update rate of the target network
LR = 1e-04 # learning rate of the AdamW optimizer
lmbd = 0.9 # weight of the optional L2 regularization term in optimize_model

n_actions = env.action_space.n # number of actions: 3
state, info = env.reset() # initial state
n_observations = len(state) # number of state observations: depends on test data
min_test_ammount = 10 # while t <= this value, select_action always returns "continue"

# The target network starts as an exact copy of the policy network.
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True) # AdamW optimizer with the AMSGrad variant enabled
memory = ReplayMemory(30000) # replay memory holding at most 30000 transitions


# global counter of select_action calls; drives the epsilon decay
steps_done = 0

# Epsilon-greedy action selection:

def select_action(state, t):
    '''
    Choose the next action for the given state.

    Returns a tuple (epsilon, action). The action is a tensor of shape (1, 1),
    e.g. [[1]]. While t <= min_test_ammount the action is always 2 ("continue")
    and epsilon is reported as None. Every call increments the global step
    counter that drives the epsilon decay.
    '''
    global steps_done
    steps_done += 1

    # forced "continue" phase at the start of every episode
    if t <= min_test_ammount:
        return None, torch.tensor([[2]], device=device, dtype=torch.long)

    draw = random.random()  # Unif([0,1])
    decay = math.exp(-1. * steps_done / EPS_DECAY)
    eps_threshold = EPS_END + (EPS_START - EPS_END) * decay
    if eps_threshold < 1.5e-03:
        # negligible exploration rate: act purely greedily from here on
        eps_threshold = 0

    if draw <= eps_threshold:
        # explore: pick an action uniformly at random among all actions
        random_action = env.action_space.sample()
        return eps_threshold, torch.tensor([[random_action]], device=device, dtype=torch.long)

    # exploit: max(1) yields (values, indices) per row; the index of the
    # largest Q-value is the action with the highest expected reward
    with torch.no_grad():
        greedy_action = policy_net(state).max(1)[1].view(1, 1)
    return eps_threshold, greedy_action



# Number of tests executed in each finished episode (filled by the training loop).
episode_durations = []


def plot_durations(show_result=False):
    """
    Plot the episode durations on figure 1, plus their 100-episode moving
    average once at least 100 episodes are available.

    With show_result=False the figure is cleared first and titled
    'Training...'; otherwise it is titled 'Result'.
    """
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)

    if not show_result:
        plt.clf()
        plt.title('Training...')
    else:
        plt.title('Result')

    plt.xlabel('Episode')
    plt.ylabel('Duration (# of tests)')
    lowest, highest = min(episode_durations), max(episode_durations)
    plt.ylim(lowest, highest)
    plt.plot(durations_t.numpy())

    # 100-episode moving average, left-padded with zeros to align with the raw curve
    if len(durations_t) >= 100:
        windows = durations_t.unfold(0, 100, 1)
        means = torch.cat((torch.zeros(99), windows.mean(1).view(-1)))
        plt.plot(means.numpy(), 'tab:orange')

    plt.pause(0.001)  # pause to update plots
    if is_ipython:
        display.display(plt.gcf())
        if not show_result:
            display.clear_output(wait=True)


# Total reward of each finished episode (one-element tensors, filled by the training loop).
episode_reward = []
# Epsilon reported by select_action for the last step of each episode (may be None).
episode_epsilon = []


def plot_reward(show_result=False):
    """
    Plot the episode rewards on figure 1, their 100-episode moving average
    (once at least 100 episodes are available) and the epsilon curve scaled to
    the upper axis limit.

    The axis limits are derived from the recorded rewards, with a headroom of
    100 above the maximum, independent of whether CUDA is available. Does
    nothing if no episode has been recorded yet.
    """
    if not episode_reward:
        return

    plt.figure(1)
    # float() accepts one-element tensors on any device as well as plain numbers
    reward_t = torch.tensor([float(r) for r in episode_reward], dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Reward')

    ylim_low = reward_t.min().item()
    ylim_high = reward_t.max().item() + 100

    plt.ylim(ylim_low, ylim_high)
    plt.plot(reward_t.numpy(), 'g')
    # Take 100 episode averages and plot them too
    if len(reward_t) >= 100:
        means = reward_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy(), 'tab:orange')

    # Plot epsilon scaled to the reward axis; episodes without a recorded
    # epsilon (None) become gaps in the curve.
    eps_list = [float('nan') if eps is None else eps * ylim_high for eps in episode_epsilon]
    plt.plot(eps_list, 'red')

    plt.pause(0.001)  # pause to update plots
    if is_ipython:
        display.display(plt.gcf())
        if not show_result:
            display.clear_output(wait=True)

# Mean optimization loss of each finished episode (filled by the training loop).
episode_loss = []


def plot_loss(show_result=False):
    """
    Plot the mean loss per episode and its 100-episode moving average on
    figure 1. Nothing is drawn while fewer than 100 episodes are recorded;
    the current figure is still refreshed and displayed in that case.
    """
    loss_t = torch.tensor(episode_loss, dtype=torch.float)
    enough_episodes = len(loss_t) >= 100

    if enough_episodes:
        plt.figure(1)
        if not show_result:
            plt.clf()
            plt.title('Training...')
        else:
            plt.title('Result')
        plt.xlabel('Episode')
        plt.ylabel('Mean Loss')
        # the upper limit ignores the first 30 episodes
        lower, upper = min(episode_loss), max(episode_loss[30:])
        plt.ylim(lower, upper)
        plt.plot(loss_t.numpy(), 'm')
        # 100-episode moving average, left-padded with zeros
        windows = loss_t.unfold(0, 100, 1)
        means = torch.cat((torch.zeros(99), windows.mean(1).view(-1)))
        plt.plot(means.numpy(), 'tab:orange')

    plt.pause(0.001)  # pause to update plots
    if is_ipython:
        display.display(plt.gcf())
        if not show_result:
            display.clear_output(wait=True)



def _moving_average(values, window):
    """Return the trailing moving average of values, NaN-padded at the front to align with the input."""
    n_windows = max(len(values) - window + 1, 0)
    averages = [np.mean(values[start:start + window]) for start in range(n_windows)]
    # never pad beyond the length of the data (matters when len(values) < window)
    padding = [np.nan] * min(window - 1, len(values))
    return padding + averages


def _show_and_save(path, prefix):
    """Show the current figure and, if path is set, save it as '<path>/<prefix>_<timestamp>.pdf'."""
    fig = plt.gcf()  # grab the figure before show(), as the original code did
    plt.show()
    if path:
        target = path + '/' + prefix + '_' + time.strftime("%Y%m%d-%H%M%S") + '.pdf'
        fig.savefig(target, dpi=100)


def plot_results(path='add_default_path', window=100):
    """
    Plot duration, reward (with epsilon on a second axis) and loss per episode,
    each with a moving average, and optionally save the figures as PDF.

    Args:
        path: directory the PDFs are written to; a falsy value disables saving.
        window: width of the moving average in episodes.

    The reward axis limits are derived from the recorded rewards, with a
    headroom of 100 above the maximum, independent of whether CUDA is
    available. Prints a message and returns if no episode has been recorded.
    """
    if not episode_durations:
        print('Nothing to plot: no episode has been recorded yet')
        return

    # --- Duration ---
    plt.figure(figsize=(10, 5))
    plt.xlabel('Episode')
    plt.ylabel('Duration (# of Tests)')
    plt.ylim(min(episode_durations), max(episode_durations))
    plt.plot(episode_durations, 'b')
    plt.plot(_moving_average(episode_durations, window), 'tab:orange', label='Moving average')
    plt.grid(linestyle=':')
    plt.legend()
    _show_and_save(path, 'Duration')

    # --- Reward and epsilon ---
    # float() accepts one-element tensors on any device as well as plain numbers
    rewards = np.array([float(r) for r in episode_reward], dtype=np.float32)

    fig, ax1 = plt.subplots()

    color = 'g'
    ax1.set_xlabel('Episode')
    ax1.set_ylabel('Reward', color=color)
    ax1.set_ylim(float(rewards.min()), float(rewards.max()) + 100)
    ax1.plot(rewards, color=color)
    ax1.plot(_moving_average(rewards, window), color='tab:orange', label='Moving Average')
    ax1.grid(linestyle=':')
    ax1.tick_params(axis='y', labelcolor=color)

    ax2 = ax1.twinx()  # instantiate a second axes that shares the same x-axis

    color = 'tab:red'
    ax2.set_ylabel('Epsilon', color=color)
    ax2.set_ylim(EPS_END, EPS_START + 0.05)
    ax2.plot(episode_epsilon, color=color)
    ax2.tick_params(axis='y', labelcolor=color)

    fig.tight_layout()
    _show_and_save(path, 'Reward')

    # --- Loss ---
    plt.figure(figsize=(10, 5))
    plt.xlabel('Episode')
    plt.ylabel('Loss')
    # The upper limit ignores the first 50 episodes; fall back to the full
    # history when 50 or fewer episodes exist.
    late_losses = episode_loss[50:] or episode_loss
    plt.ylim(min(episode_loss), max(late_losses))
    plt.plot(episode_loss, 'm')
    plt.plot(_moving_average(episode_loss, window), 'tab:orange', label='Moving average')
    plt.grid(linestyle=':')
    plt.legend()
    _show_and_save(path, 'Loss')

# Confusion matrix counters for training (updated by the training loop):
ND = 0 # No Decision
TP = 0 # True Positive
FP = 0 # False Positive
FN = 0 # False Negative
TN = 0 # True Negative

def performance():
    """
    Print the confusion matrix [[TP, FP], [FN, TN]] and the accuracy of the
    decisions counted so far.

    Episodes without a decision (ND) are not part of the accuracy. If no DUT
    has been classified yet, a message is printed instead.
    """
    classified = TP + TN + FP + FN
    if classified == 0:
        print("An error occured: No DUT has been classified")
        return

    cm = np.array([[TP, FP], [FN, TN]])
    acc = (TP + TN) / classified
    print('Confusion Matrix: \n')
    print(cm, '\n')
    print('Accuracy: ', acc)

def save_agent(path):
    """
    Write the weights (state dict) of the current policy_net to
    '<path>/Agent_<timestamp>.pth'.
    """
    stamp = time.strftime("%Y%m%d-%H%M%S")
    target = path + '/Agent_' + stamp + '.pth'
    weights = policy_net.state_dict()
    torch.save(weights, target)

### 3.4.1 Optimization Model

In [None]:
def optimize_model(reg=False):
    """
    Perform one optimization step of policy_net on a random batch from the replay memory.

    Args:
        reg: if True, add an L2 penalty (lmbd * 0.5 * sum of squared entries of
            all policy_net parameters) to the loss.

    Returns:
        The loss value of this step as a float, or None if the memory does not
        yet hold BATCH_SIZE transitions.
    """
    if len(memory) < BATCH_SIZE:
        return None  # memory not yet large enough

    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch: list of Transitions -> Transition of tuples.
    batch = Transition(*zip(*transitions))

    # Mask of non-final transitions (a final transition has next_state None)
    non_final_mask = torch.tensor([s is not None for s in batch.next_state],
                                  device=device, dtype=torch.bool)
    non_final_next_states = [s for s in batch.next_state if s is not None]

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Q(s_t, a): the model computes Q(s_t) for all actions; gather selects the
    # column of the action that was actually taken.
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # V(s_{t+1}) from the "older" target_net: best Q-value for non-final next
    # states, 0 for final ones. torch.cat fails on an empty list, so the
    # lookup is skipped when every sampled transition is final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    if non_final_next_states:
        with torch.no_grad():
            next_q = target_net(torch.cat(non_final_next_states))
            next_state_values[non_final_mask] = next_q.max(1)[0]

    # Expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # L2 regularization, accumulated over ALL parameter tensors
    if reg:
        reg_loss = sum(0.5 * torch.sum(param ** 2) for param in policy_net.parameters())
        loss = loss + lmbd * reg_loss

    loss_val = loss.item()

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

    return loss_val

### 3.4.2 Training Loop

In [None]:
def _record_decision(condition):
    """Add the predicted ('PC') and true ('TC') condition of a finished episode to the global confusion-matrix counters."""
    global ND, TP, FP, FN, TN
    predicted, actual = condition['PC'], condition['TC']
    if predicted is None:
        ND += 1
    elif predicted and actual:
        TP += 1
    elif predicted:
        FP += 1
    elif actual:
        FN += 1
    else:
        TN += 1


def _soft_update_target():
    """Soft update of the target network's weights: θ′ ← τ θ + (1 − τ) θ′."""
    target_net_state_dict = target_net.state_dict()
    policy_net_state_dict = policy_net.state_dict()
    for key in policy_net_state_dict:
        target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
    target_net.load_state_dict(target_net_state_dict)


def training(episodes=50, epochs=1):
    """
    Train policy_net with DQN on the global environment.

    Args:
        episodes: number of episodes per epoch; every episode starts with
            env.reset().
        epochs: number of epochs.

    Per finished episode the duration, total reward, mean loss and last
    epsilon are appended to the global history lists and the confusion-matrix
    counters are updated. After the last epoch the training curves are plotted.
    """
    for i_epoch in range(epochs):

        for _ in range(episodes):
            # Initialize the environment and get its state
            state, info = env.reset()
            state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            total_reward = 0
            total_loss = 0

            for t in count():
                E_t, action = select_action(state, t)
                observation, reward, terminated, truncated, condition = env.step(action.item())
                # Fix the dtype explicitly so it does not depend on whether the
                # environment returned a Python or a NumPy float.
                reward = torch.tensor([reward], dtype=torch.float32, device=device)
                done = terminated or truncated

                total_reward = total_reward + reward

                if terminated:
                    next_state = None
                else:
                    next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

                # Store the transition in memory
                memory.push(state, action, next_state, reward)

                # Move to the next state
                state = next_state

                # One optimization step on the policy network; returns None
                # while the replay memory is still too small
                step_loss = optimize_model()
                if step_loss is not None:
                    total_loss += step_loss

                _soft_update_target()

                if done:
                    _record_decision(condition)
                    episode_durations.append(t+1) # number of tests of the episode
                    episode_reward.append(total_reward) # total reward of the episode
                    episode_loss.append(total_loss/(t+1)) # mean loss of the episode
                    episode_epsilon.append(E_t)
                    break

        print('Epoch ', i_epoch+1, '/', epochs, 'Complete')
        if i_epoch == (epochs-1):
            print('Training complete')
            plot_durations(show_result=True)
            plot_reward(show_result=True)
            plot_loss(show_result=True)
            plt.ioff()
            plt.show()

Attention: A high number of episodes and epochs can lead to high computing time! Especially if GPU is not in use.

In [None]:
# Run the training: set the number of episodes per epoch and the number of epochs.
training(episodes=90, epochs=1)

In [None]:
# Print the confusion matrix and accuracy of the decisions made during training.
performance()

Note: plot_results() only works after more than 100 episodes have been recorded.

In [None]:
# Plot the training curves with a 200-episode moving average. The path is a
# placeholder: add your own path or define a default path in the function.
plot_results(path='C:/Users/username/Results/', window=200)

### 3.4.3 Save Agent

In [None]:
# Save the weights of the trained policy network. The path is a placeholder:
# add your own path.
save_agent(path='C:/Users/username/Agents/')

### 3.4.4 Save Data

In [None]:
def save_data(path):
    """
    Save durations, rewards, epsilons and loss as .txt files, one value per line.

    The files are written to '<path>/<name>_<timestamp>.txt' with the names
    durations, rewards, epsilons and loss. All four files share one timestamp,
    so they can be matched afterwards. Can be used for further data analysis.
    """
    stamp = time.strftime("%Y%m%d-%H%M%S")

    series = {
        'durations': episode_durations,
        # rewards are stored as one-element tensors; write their plain values
        'rewards': [r.item() for r in episode_reward],
        'epsilons': episode_epsilon,
        'loss': episode_loss,
    }

    for name, values in series.items():
        target = path + '/' + name + '_' + stamp + '.txt'
        with open(target, 'w') as f:
            f.writelines("%s\n" % value for value in values)

In [None]:
# Write the recorded training histories to text files. The path is a
# placeholder: add your own path.
save_data(path='C:/Users/username/training_results/')

## 3.5 Testing

#### Requirement:
Agent was trained and saved as .pth file.

### 3.5.1 Load Agent

Note: Insert the path to the saved agent and the path to the test data.

In [None]:
# Rebuild a network with the same input/output sizes as used for training and
# load the saved weights into it.
policy = DQN(n_observations, n_actions).to(device)
# NOTE(review): torch.load unpickles the file — only load agent files from a trusted source.
policy.load_state_dict(torch.load("C:/add_path/Agent_example.pth", map_location=torch.device('cpu')))
policy.eval()  # switch the module to evaluation mode

# Separate environment instance that reads the test data set (placeholder path).
env_test = gym.make("ICTesting-v0", data = 'C:/Users/username/Data/test.csv')


def testing():
    """
    Run the loaded policy greedily over every DUT of the test data set.

    One episode is run per row of the test data. The predicted and true
    condition of every finished episode are added to the global T_* counters,
    which are not reset between calls; test_performance() is called afterwards.
    """
    global T_ND, T_TP, T_FP, T_FN, T_TN

    # gym.make() wraps the environment; read the data from the base environment.
    lot_size = np.shape(env_test.unwrapped.data)[0]

    for _ in range(lot_size):
        # Initialize the environment and get its state
        state, info = env_test.reset()
        state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

        while True:
            # greedy action: index of the largest Q-value
            with torch.no_grad():
                action = policy(state).max(1)[1].view(1, 1)
            observation, reward, terminated, truncated, condition = env_test.step(action.item())

            if terminated or truncated:
                # Add predicted ('PC') and true ('TC') condition of the DUT to the global count
                predicted, actual = condition['PC'], condition['TC']
                if predicted is None:
                    T_ND += 1
                elif predicted and actual:
                    T_TP += 1
                elif predicted:
                    T_FP += 1
                elif actual:
                    T_FN += 1
                else:
                    T_TN += 1
                break

            # Move to the next state
            state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

    print('Testing Complete:\n')
    test_performance()

# Confusion matrix counters for testing (updated by testing()):
T_ND = 0 # No Decision
T_TP = 0 # True Positive
T_FP = 0 # False Positive
T_FN = 0 # False Negative
T_TN = 0 # True Negative

def test_performance():
    """
    Print the confusion matrix [[T_TP, T_FP], [T_FN, T_TN]] and the accuracy
    of the decisions counted during testing.

    Episodes without a decision (T_ND) are not part of the accuracy. If no DUT
    has been classified, a message is printed instead.
    """
    classified = T_TP + T_TN + T_FP + T_FN
    if classified == 0:
        print("Lazy Agent Error: No DUT has been classified")
        return

    cm = np.array([[T_TP, T_FP], [T_FN, T_TN]])
    acc = (T_TP + T_TN) / classified
    print('Confusion Matrix: \n')
    print(cm, '\n')
    print('Accuracy: ', acc)

In [None]:
# Evaluate the loaded agent on the test data set and print its performance.
testing()