# Programming Exercise: Function Approximation

## README
We strongly recommend you install `gymnasium>=0.28.1` and `python>=3.10.*`. You can set up a python environment using e.g. [conda](https://docs.conda.io/projects/miniconda/en/latest/) in the terminal
```
conda create -n rl23 python=3.10 gymnasium=0.28.1 notebook -c conda-forge
conda activate rl23
```
The `notebook` package is required for opening and working with this jupyter notebook.
In a terminal with the Python environment active, run
```
jupyter notebook ExerciseD.ipynb
```

If you need help setting up a development environment, please visit the office hour or ask for help in the forum.

In [None]:
# Necessary Imports

from typing import Any, Dict, Callable
from dataclasses import dataclass
from functools import cached_property, partial

import numpy as np
import gymnasium as gym
from gymnasium import spaces

## Submission
**Solve the tasks D1 and D2 below and submit your final `.ipynb` file as a solution to the mCMS.**

During grading, we will clear outputs and then run all cells.
Notebooks that produce runtime errors will be graded with 0 points, unless the errors arise due to gymnasium/numpy backwards compatibility issues.

## Problem statement
The task of the programming exercise is to program an agent that learns to play Tic-Tac-Toe against different opponents. 

## Opponent Policies

We load and initialize the opponent policies of increasing strength from .json-files. They are stored in global variables and can therefore be easily changed at any point of the notebook. The opponent policies are of increasing strength.

For loading the policies, the folder `Opponent_Policies` containing the .json-files of the policies should lie in the same directory as this notebook.  

In [None]:
# Load opponent policy from .json-file. 

import json
from pathlib import Path

opponent_policy_file = Path('Opponent_Policies') # Change filename to play against different policy.

with open(opponent_policy_file / 'policy1.json') as json_file:
    opponent_policy_1 = json.load(json_file)

with open(opponent_policy_file / 'policy2.json') as json_file:
    opponent_policy_2 = json.load(json_file)

with open(opponent_policy_file / 'policy3.json') as json_file:
    opponent_policy_3 = json.load(json_file)

with open(opponent_policy_file / 'policy4.json') as json_file:
    opponent_policy_4 = json.load(json_file)

# Set opponent policy
opponent_policy_dict = opponent_policy_1 # Change to play against different opponent policy.

## Gymnasium Environment for Tic-Tac-Toe

We implement a Gymnasium environment simulating a game of Tic-Tac-Toe. We thereby use
- as possible field values  $V = \{0,1,2\}$, whereby $v = 0$ stands for a 'O'-field, $v = 1$ for an empty field, and $v = 2$ for a 'X'-field.
- as state space $S = V^{3 \times 3}$. A state `s` is stored as a `list[list[int]]`, `s[i][j]` refers then to the value in the i-th row in the j-th column.
- action space $A = V \times V = \{(0,0),(0,1),(0,2),(1,0),(1,1),(1,2), (2,0),(2,1),(2,2)\}$.

### Environment Dynamics

We implement Tic-Tac-Toe as a sequential decision problem. The agent plays against a specified opponent policy (see above). One step of the environment looks as follows: 
1. Perform the move of the agent. The agent marks fields with 'X'.
2. Check whether this has finished the game, i.e. win for the agent or draw. If the game is finished, terminate episode and compute the reward. 
3. Perform the game of the opponent. The opponent marks fields with 'O'.
4. Check whether this has finished the game, i.e. win for the opponent or draw. If the game is finished, terminate episode and compute the reward. 

#### Initial state
We randomize whether the agent or the opponent starts with the first move. Hence, the initial state of the sequential decision problem is either
- a completely empty field, for the case that the agent has the first move, or
- a field with one '0', for the case that the opponent has the first move. 


#### Rewards
Rewards are only gained when the game is finished: 
- Reward of 1, if the agent wins. 
- Reward of 0, if the game ends in a draw. 
- Reward of -1, if the opponent wins. 

#### Executable Actions 
Notice that not all actions are always executable: If a field `s[i][j]` is non-empty, then the action $(i,j)$ is not executable. If the agent tries to perform a non-executable action, the environment raises an Exception. Hence, make sure that the agent only picks executable actions (the opponent policy chooses only executable actions as well).

In [None]:
# Some preliminary and auxiliary definitions 

# Definitions of possible field values
CROSS, EMPTY, CIRCLE = 2, 1, 0  

def get_rows(state: list[list[int]]) -> [list[list[int]], list[list[int]], list[list[int]]]:
    """
    Helper function: Returns list of rows, list of columns, and list of diagonals
    
    """

    # Compute rows
    rows = state

    # Compute columns
    columns = []
    for j in range(3):
        column = []
        for i in range(3):
            column.append(state[i][j])
        columns.append(column)
    
    #Compute diagonals
    diagonal0 = []
    diagonal1 = []
    for i in range(3):
        diagonal0.append(state[i][i])
        diagonal1.append(state[2-i][i])
    
    # Return rows, columns, and diagonals. 
    return rows, columns, [diagonal0, diagonal1]

# Gymnasium environment for Tic-Tac-Toe
class SysadminEnv(gym.Env):

    def __init__(
        self,
    ) -> None:
        
        super().__init__()
        self.action_space = spaces.MultiDiscrete([3,3]) # Action space 
        self.observation_space = spaces.MultiDiscrete([[3,3,3],[3,3,3],[3,3,3]]) # State space
        self.reset_counter = 0


    @property
    def get_reset_counter(self):
        return self.reset_counter
    

    @property
    def occupied_fields(self) -> int | None:
        """
        Returns the number of occupied fields.

        """
        if not hasattr(self, "_state"):
            return None
        
        res = 0
        for l in self._state:
            for v in l:
                if v != EMPTY:
                    res = res + 1
            
        return res
    
    
    @property
    def game_finished(self) -> int | None:
        """
        Returns None if game is not finished.

        Returns 0 if circle wins.
        Returns 1 if it is a draw.
        Returns 2 if crosses wins. 

        """
        rows, columns, diagonals = get_rows(self._state)

        for l in rows + columns + diagonals:
            if all(v == CROSS for v in l):
                return 2
            if all(v == CIRCLE for v in l):
                return 0
            
        if self.occupied_fields == 9:
            return 1
        else:
            return None
        
    
    def opponent_policy (self) -> [int,int]:
        """
            Takes random action from the list of moves of the opponent policy.
        """

        if not hasattr(self, "_state"):
            raise Exception("Unable to find opponent move in uninitialized environment.")
        
        opponent_action_list = opponent_policy_dict[self._state.__str__()]
        return opponent_action_list[np.random.choice(len(opponent_action_list))]

    
    def perform_move(self, move: [int, int], cross: bool):
        """
        Returns the number of occupied fields.

        """
        if not hasattr(self, "_state"):
            raise Exception("Unable to perform move in uninitialized environment.")

        if self._state[move[0]][move[1]] != EMPTY:
            raise Exception("Unable to perform move on occupied field.")
        
        if cross: 
            self._state[move[0]][move[1]] = 2
        else:
            self._state[move[0]][move[1]] = 0

        
    def reset(
        self, *, seed: int | None = None, options: dict[str, Any] | None = None
    ) -> tuple[np.ndarray, dict[str, Any]]:
        """
        Resets the environment to its initial state.
        
        """
        
        super().reset(seed=seed)

        # increment reset_counter
        self.reset_counter += 1

        # All fields are empty initially
        self._state = [[EMPTY,EMPTY,EMPTY],[EMPTY,EMPTY,EMPTY],[EMPTY,EMPTY,EMPTY]]

        # Random choice whether agent or opponent makes the first move. 
        # In case of opponent, first move of opponent is performed.
        if np.random.random() < 0.5:   
            self.perform_move(self.opponent_policy(), False)
        
        return self._state, dict()
   

    def step(self, action: [int,int]) -> tuple[np.ndarray, float, bool, bool, dict[str, Any]]:
        """
        Performs a step in the environment given an action of the agent.

        Return: new_state, reward, done, truncated, information_dictionary (last two return values are irrelevant for our purposes)  

        """
        
        # Perform agent's move
        self.perform_move(action, True)

        # Check whether game is finished and compute the return
        finished = self.game_finished  
        if self.game_finished == 2:
            return self._state, 1, True, False, dict()
        if self.game_finished == 1:
            return self._state, 0, True, False, dict()


        # Perform opponent's move
        self.perform_move(self.opponent_policy(), False)

        # Check whether game is finished and compute the return
        finished = self.game_finished  
        if finished is None:
            return self._state, 0, False, False, dict()
        elif finished == 0:
            return self._state, -1, True, False, dict()
        elif finished == 1:
            return self._state, 0, True, False, dict()    


    def display(self):
        """
        Prints the current state of the field to the command line. 

        """

        if not hasattr(self, "_state"):
           raise Exception("Unable to visualize uninitialized environment.")
       
        res = [["","",""],["","",""],["","",""]] 

        for i in range(3):
            for j in range(3):
                v = self._state[i][j]
                if v == CROSS:
                    res[i][j] = "X"
                elif v == EMPTY:
                    res[i][j] = " "
                elif v == CIRCLE:
                    res[i][j] = "O"
                else: 
                    raise Exception("Invalid value in TicTacToe Field")

        for l in res: 
            print(l)
    
        print("\n")



In [None]:
# Register environment
gym.register("Sysadmin-ED", partial(SysadminEnv))
env = gym.make("Sysadmin-ED")

In [None]:
# Example on how to experiment with the environment. 

env.reset(seed=42)

print(env.step((1,2)))
env.display()


## Exercise D.1 (2 Points)

Briefly describe the learning algorithm you have implemented. When using function approximation, describe in particular the features that you are using. 

TODO: Answer

## Exercise D.2 (2+2+2+2+4* Points)

Implement a learning algorithm that learns to play Tic-Tac-Toe. For function approximation learning, the function `RL_algorithm` should return the learned feature weights.

In the case that you are not using function approximation but a different RL algorithm, the arguments and return types of the functions below are allowed to be changed.

In [None]:
# Given hyperparameter gamma
gamma = 1

# TODO If neeeded, add further hyperparameter that are used by the implemented algorithm. 


def agent_policy(state, weights):
    """
    Policy of the agent: Given the environment state and feature weights, returns the best estimated performable action. 

    """
    
    #TODO
    
    raise NotImplementedError
    

def training_algorithm(num_episodes: int):
    """
    Reinforcement learning algorithm: For function approximation learning, learn the feature weights for the given number of training episodes, i.e. env.reset() is allowed to be called num_episodes many times. 
    
    Returns the learned feature weights. 

    """
    
    #TODO
        
    raise NotImplementedError



## Policy Evaluation 

We evaluate the learned polices multiple times against the different opponent policies using the script below.  

In [None]:
def evaluate_policy(weights, eval_episodes: int):
    """
    Evaluates the agent's policy described by the learned weights by simulating the given number of episodes. 
    Returns the overall number of wins, draws, looses, and the statistical mean of the episode returns.

    """

    returns = []
    wins, draws, looses = 0,0,0

    for episode in range(eval_episodes):
        
        state = env.reset()[0]
        done = False
        
        while not done:
            
            action = agent_policy(state, weights)
            _, reward, done, _,_ = env.step(action)
            if done: 
                if reward == 1: 
                    wins = wins + 1
                elif reward == 0:
                    draws = draws + 1
                elif reward == -1:
                    looses = looses + 1

                returns.append(reward)  
          
    return wins, draws, looses, np.mean(returns)


opponent_policy_dict = opponent_policy_1 # Change to play against different opponent policy.

# Policy testing
training_episodes = 5000 # Number of training episodes
test_episodes = 100 # Number of test episodes
test_runs = 5 # Number of test runs

for i in range (test_runs):
    env = gym.make("Sysadmin-ED") 
    weights = training_algorithm(training_episodes) # learn the weights via function approximation learning
    
    # Check that number of episodes is not exceeded
    if env.get_reset_counter > training_episodes:
            raise RuntimeError(f"Exceeded maximal number of calls of reset function")
    
    wins, draws, looses, average_return = evaluate_policy(weights, test_episodes) # evaluate the learned policy
    print(f"Training iteration {i}: Wins: {wins}, Draws: {draws}, Looses: {looses}, Average Return: {average_return}") # print results of the current test run
