In [2]:
# Task 4.2

from ple.games.flappybird import FlappyBird
from ple import PLE
import random
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict

class T4_2_FlappyAgent:
    """Tabular Q-learning agent for Flappy Bird with an epsilon-greedy training policy.

    Q-values live in a dictionary keyed by a discretized game state; every
    entry is a two-element list holding one value per action index.
    """

    def __init__(self):
        # Unseen states start with a value of 0.0 for both actions.
        self.q_table = defaultdict(lambda: [0.0, 0.0])
        self.gamma = 1.0    # discount factor
        self.alpha = 0.1    # learning rate
        self.epsilon = 0.1  # exploration probability used while training

    def reward_values(self):
        """Return the reward configuration handed to the environment."""
        return dict(positive=1.0, tick=0.0, loss=-5.0)

    def discretize_state(self, state):
        """Map a raw game-state dict to a hashable Q-table key.

        The three positional features are bucketed into cells of width 15
        (truncating towards zero); the velocity is used unchanged.
        """
        bucketed = tuple(
            int(state[feature] / 15)
            for feature in ('player_y', 'next_pipe_top_y', 'next_pipe_dist_to_player')
        )
        return bucketed + (state['player_vel'],)

    def observe(self, s1, a, r, s2, end):
        """Apply one Q-learning update for the transition (s1, a, r, s2).

        When ``end`` is true the successor contributes no future value and
        its Q-table entry is not touched.
        """
        current_key = self.discretize_state(s1)
        next_key = self.discretize_state(s2)
        if end:
            bootstrap = 0
        else:
            bootstrap = max(self.q_table[next_key])
        action_values = self.q_table[current_key]
        td_error = r + self.gamma * bootstrap - action_values[a]
        action_values[a] += self.alpha * td_error

    def training_policy(self, state):
        """Pick an action index epsilon-greedily for the given raw state."""
        key = self.discretize_state(state)
        exploit = not (random.random() < self.epsilon)
        if exploit:
            return np.argmax(self.q_table[key])
        return random.randint(0, 1)

    def policy(self, state):
        """Pick the greedy action index for the given raw state."""
        return np.argmax(self.q_table[self.discretize_state(state)])

def t4_2_train(nb_episodes, agent):
    """Train ``agent`` on Flappy Bird and plot the reward curve.

    Args:
        nb_episodes: Number of episodes to play.
        agent: Object providing ``reward_values()``, ``training_policy(state)``
            and ``observe(s1, a, r, s2, end)``.

    Returns:
        list: Total reward collected in each episode, in play order, so the
        history can be inspected after training instead of only being plotted.
    """
    reward_values = agent.reward_values()
    env = PLE(FlappyBird(), fps=30, display_screen=False, force_fps=True, rng=None, reward_values=reward_values)
    env.init()
    # The action set is fixed for the environment, so look it up once rather than every frame.
    action_set = env.getActionSet()
    rewards_per_episode = []
    for episode in range(nb_episodes):
        state = env.game.getGameState()
        total_reward = 0
        while not env.game_over():
            action = agent.training_policy(state)
            reward = env.act(action_set[action])
            new_state = env.game.getGameState()
            agent.observe(state, action, reward, new_state, env.game_over())
            state = new_state
            total_reward += reward
        rewards_per_episode.append(total_reward)
        env.reset_game()
    # With no episodes there is nothing to smooth or fit; skip the plot instead of crashing in it.
    if rewards_per_episode:
        t4_2_plot_training_progress(rewards_per_episode)
    return rewards_per_episode

def t4_2_plot_training_progress(rewards, window=10):
    """Plot a moving average of per-episode rewards with a logarithmic trend line.

    Args:
        rewards: Sequence of total rewards, one per episode.
        window: Moving-average width. It is shrunk to the number of rewards
            when fewer are available, so the result is always a real average.

    Returns:
        None. Nothing is drawn when ``rewards`` is empty.
    """
    rewards = np.asarray(rewards, dtype=float)
    if rewards.size == 0:
        # np.convolve rejects empty input; there is nothing to show anyway.
        return
    # With fewer rewards than the window, np.convolve would silently swap its
    # operands and return something that is not a moving average.
    window = max(1, min(int(window), rewards.size))

    sns.set(style="whitegrid")
    smoothed_rewards = np.convolve(rewards, np.ones(window) / window, mode='valid')
    x_vals = np.arange(len(smoothed_rewards))

    plt.figure(figsize=(10, 6))
    sns.lineplot(x=x_vals, y=smoothed_rewards, lw=2, label="Smoothed Reward")
    if len(smoothed_rewards) >= 2:
        # Fit y = a * log(x + 1) + b; the +1 keeps the first point finite.
        # A single point cannot determine a line, so the fit is skipped then.
        log_x = np.log(x_vals + 1)
        coeffs = np.polyfit(log_x, smoothed_rewards, 1)
        fitted_y = coeffs[0] * log_x + coeffs[1]
        plt.plot(x_vals, fitted_y, color="red", label="Logarithmic Best Fit Line")
    plt.xlabel("Episodes")
    plt.ylabel("Total Reward per Episode")
    plt.title("Training Progress of Q-Learning Agent in Flappy Bird")
    plt.legend()
    plt.show()

def t4_2_evaluate_agent(agent, nb_episodes=10):
    """Play ``nb_episodes`` games with the agent's greedy policy.

    The environment is created without custom reward values. Each episode's
    summed reward is printed and the list of those sums is returned.
    """
    env = PLE(FlappyBird(), fps=30, display_screen=False, force_fps=True, rng=None)
    env.init()
    scores = []
    for episode_number in range(1, nb_episodes + 1):
        state = env.game.getGameState()
        episode_return = 0
        while True:
            if env.game_over():
                break
            chosen = agent.policy(state)
            episode_return += env.act(env.getActionSet()[chosen])
            state = env.game.getGameState()
        scores.append(episode_return)
        print(f"Score for episode {episode_number}: {episode_return}")
        env.reset_game()
    return scores

# Task 4.2 driver: train a fresh tabular agent, then play 10 greedy games and report the returns.
t4_2_agent = T4_2_FlappyAgent()
# t4_2_train also draws the training-progress plot once all episodes have been played.
t4_2_train(nb_episodes=100000, agent=t4_2_agent)
t4_2_scores = t4_2_evaluate_agent(t4_2_agent, nb_episodes=10)
print("Scores from 10 simulated games:", t4_2_scores)



KeyboardInterrupt: 

In [None]:
# Task 4.3

import random
import numpy as np
from collections import deque
from sklearn.neural_network import MLPRegressor
from ple.games.flappybird import FlappyBird
from ple import PLE

class T4_3_FlappyAgent:
    """Deep Q-learning agent for Flappy Bird built on scikit-learn MLP regressors.

    An online network (``model``) is trained on minibatches sampled from a
    bounded replay memory. A second network (``target_model``) supplies the
    bootstrap values and is only refreshed occasionally from the online one.
    """

    def __init__(self, state_size=4, action_size=2):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=1000)  # replay buffer; oldest transitions are dropped first
        self.gamma = 0.99
        self.epsilon = 1.0                # start fully exploratory
        self.epsilon_min = 0.1
        self.epsilon_decay = 0.99995      # applied once per call to train_on_episode_end
        self.learning_rate = 0.01
        self.batch_size = 100
        self.update_target_frequency = 100
        self.model = MLPRegressor(hidden_layer_sizes=(100, 10), activation='logistic', learning_rate_init=self.learning_rate)
        self.target_model = MLPRegressor(hidden_layer_sizes=(100, 10), activation='logistic', learning_rate_init=self.learning_rate)
        self.initial_fit()

    def initial_fit(self):
        """Fit both networks on random data so that predict/partial_fit can be used."""
        dummy_X = np.random.rand(10, self.state_size)
        dummy_y = np.random.rand(10, self.action_size)
        self.model.fit(dummy_X, dummy_y)
        self.target_model.fit(dummy_X, dummy_y)

    def reward_values(self):
        """Return the reward configuration handed to the environment."""
        return {"positive": 1.0, "tick": 0.0, "loss": -5.0}

    def normalize_state(self, state):
        """Scale the four state features by fixed constants and shift them.

        A value in ``[0, max]`` is mapped linearly onto ``[-1, 1]``.

        NOTE(review): values outside ``[0, max]`` (for example a negative
        ``player_vel``) fall outside ``[-1, 1]`` — confirm the intended ranges.
        """
        max_values = {'player_y': 512, 'next_pipe_top_y': 512, 'next_pipe_dist_to_player': 288, 'player_vel': 10}
        return [
            (state['player_y'] / max_values['player_y']) * 2 - 1,
            (state['next_pipe_top_y'] / max_values['next_pipe_top_y']) * 2 - 1,
            (state['next_pipe_dist_to_player'] / max_values['next_pipe_dist_to_player']) * 2 - 1,
            (state['player_vel'] / max_values['player_vel']) * 2 - 1,
        ]

    def observe(self, s1, a, r, s2, end):
        """Store the normalized transition and train once enough samples exist."""
        s1 = self.normalize_state(s1)
        s2 = self.normalize_state(s2)
        self.memory.append((s1, a, r, s2, end))
        if len(self.memory) >= self.batch_size:
            self.replay()

    def replay(self):
        """Train the online network on one random minibatch from the replay memory."""
        batch = random.sample(self.memory, self.batch_size)
        states = np.array([item[0] for item in batch])
        actions = np.array([item[1] for item in batch])
        rewards = np.array([item[2] for item in batch], dtype=float)
        next_states = np.array([item[3] for item in batch])
        ends = np.array([item[4] for item in batch], dtype=bool)

        # Terminal transitions use the bare reward; all others bootstrap from the target network.
        q_values_next = self.target_model.predict(next_states).max(axis=1)
        targets = np.where(ends, rewards, rewards + self.gamma * q_values_next)

        # Only the taken action's output is moved towards its target; the other
        # output keeps the network's own prediction and so has zero error.
        q_values = self.model.predict(states)
        q_values[np.arange(len(batch)), actions] = targets

        self.model.partial_fit(states, q_values)

        # The target network is refreshed at random, on average once every
        # ``update_target_frequency`` replay steps.
        if random.randint(1, self.update_target_frequency) == 1:
            self.update_target_network()

    def update_target_network(self):
        """Copy the online network's weights into the target network.

        The arrays are copied rather than shared. If the two models referenced
        the same arrays, any in-place weight update made while training the
        online network would also change the target network, and it would no
        longer be a frozen snapshot.
        """
        self.target_model.coefs_ = [weights.copy() for weights in self.model.coefs_]
        self.target_model.intercepts_ = [biases.copy() for biases in self.model.intercepts_]

    def training_policy(self, state):
        """Pick an action index epsilon-greedily using the online network."""
        state = self.normalize_state(state)
        if random.uniform(0, 1) < self.epsilon:
            return random.randint(0, self.action_size - 1)
        q_values = self.model.predict([state])
        return int(np.argmax(q_values))

    def policy(self, state):
        """Pick the greedy action index according to the online network."""
        state = self.normalize_state(state)
        q_values = self.model.predict([state])
        return int(np.argmax(q_values))

    def train_on_episode_end(self):
        """Decay epsilon by one step, never letting the decay cross ``epsilon_min``."""
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

def t4_3_train(nb_episodes, agent):
    """Train ``agent`` on Flappy Bird, printing a progress line every 1000 episodes.

    Args:
        nb_episodes: Number of episodes to play.
        agent: Object providing ``reward_values()``, ``training_policy(state)``,
            ``observe(s1, a, r, s2, end)`` and ``train_on_episode_end()``.

    Returns:
        list: Summed reward of every episode, in play order.
    """
    reward_values = agent.reward_values()
    env = PLE(FlappyBird(), fps=30, display_screen=False, force_fps=True, reward_values=reward_values, rng=None)
    env.init()
    score = 0
    scores = []
    # Start below any reachable return: an episode's summed reward can be
    # negative, and a start value of 0 would then be reported as the best score.
    max_score = float("-inf")
    print(f"Starting training for {nb_episodes} episodes")

    for episode in range(nb_episodes):
        state = env.game.getGameState()
        done = False

        while not done:
            action = agent.training_policy(state)
            reward = env.act(env.getActionSet()[action])
            new_state = env.game.getGameState()
            done = env.game_over()
            agent.observe(state, action, reward, new_state, done)
            state = new_state
            score += reward

        scores.append(score)
        max_score = max(max_score, score)
        agent.train_on_episode_end()
        env.reset_game()
        score = 0

        # Reports only fire on multiples of 1000, so the slice always holds 1000 scores.
        if (episode + 1) % 1000 == 0:
            avg_score_1000 = sum(scores[-1000:]) / 1000
            print(f"Episode {episode + 1}: Highest Score: {max_score:.2f}, Average Score over last 1000 episodes: {avg_score_1000:.2f}")

    print("Training completed.")
    return scores

def t4_3_evaluate(agent, nb_episodes=5):
    """Play ``nb_episodes`` games with the agent's greedy policy and print each return.

    Args:
        agent: Object providing ``reward_values()`` and ``policy(state)``.
        nb_episodes: Number of evaluation games.

    Returns:
        list: Summed reward of every evaluation episode, in play order.
    """
    reward_values = agent.reward_values()
    env = PLE(FlappyBird(), fps=30, display_screen=False, force_fps=True, reward_values=reward_values, rng=None)
    env.init()
    # Report the number of runs actually requested instead of a fixed "5".
    print(f"Evaluation results for {nb_episodes} runs:")

    scores = []
    for episode in range(nb_episodes):
        score = 0
        state = env.game.getGameState()
        done = False
        while not done:
            action = agent.policy(state)
            reward = env.act(env.getActionSet()[action])
            state = env.game.getGameState()
            score += reward
            done = env.game_over()

        scores.append(score)
        print(f"Score for evaluation episode {episode + 1}: {score:.2f}")
        env.reset_game()
    return scores

# Task 4.3 driver: build the MLP-based agent, train it for 10000 episodes, then run the greedy evaluation.
t4_3_agent = T4_3_FlappyAgent()
t4_3_train(10000, t4_3_agent)
t4_3_evaluate(t4_3_agent)


In [None]:
# Task 4.4

from ple.games.flappybird import FlappyBird
from ple import PLE
import random
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict

class T4_4_FlappyAgent:
    """Tabular Q-learning agent with configurable hyperparameters.

    Exploration rate, learning rate, discount factor, reward configuration
    and state-bucket width are all set through the constructor.
    """

    def __init__(self, epsilon=0.1, alpha=0.1, gamma=1.0, reward_structure=None, resolution=15):
        self.epsilon, self.alpha, self.gamma = epsilon, alpha, gamma
        # Unseen states start with a value of 0.0 for both actions.
        self.q_table = defaultdict(lambda: [0.0, 0.0])
        # A missing or empty reward structure falls back to the default rewards.
        self.reward_structure = reward_structure or {"positive": 1.0, "tick": 0.0, "loss": -5.0}
        self.resolution = resolution

    def reward_values(self):
        """Return the reward configuration this agent was built with."""
        return self.reward_structure

    def discretize_state(self, state):
        """Turn a raw game-state dict into a hashable Q-table key.

        The positional features are divided by ``resolution`` and truncated
        towards zero; the velocity is passed through unchanged.
        """
        cell = self.resolution
        return (
            int(state['player_y'] / cell),
            int(state['next_pipe_top_y'] / cell),
            int(state['next_pipe_dist_to_player'] / cell),
            state['player_vel'],
        )

    def observe(self, s1, a, r, s2, end):
        """Apply one Q-learning update for the transition (s1, a, r, s2).

        A terminal transition (``end`` true) adds no future value and leaves
        the successor's Q-table entry untouched.
        """
        key_now = self.discretize_state(s1)
        key_next = self.discretize_state(s2)
        future_value = 0 if end else max(self.q_table[key_next])
        old_estimate = self.q_table[key_now][a]
        self.q_table[key_now][a] = old_estimate + self.alpha * (r + self.gamma * future_value - old_estimate)

    def training_policy(self, state):
        """Pick an action index epsilon-greedily for the given raw state."""
        key = self.discretize_state(state)
        if random.random() >= self.epsilon:
            return np.argmax(self.q_table[key])
        return random.randint(0, 1)

    def policy(self, state):
        """Pick the greedy action index for the given raw state."""
        key = self.discretize_state(state)
        greedy_action = np.argmax(self.q_table[key])
        return greedy_action

def t4_4_train(nb_episodes, agent):
    """Train ``agent`` on Flappy Bird and record the pipes passed per episode.

    Args:
        nb_episodes: Number of episodes to play.
        agent: Object providing ``reward_values()``, ``training_policy(state)``
            and ``observe(s1, a, r, s2, end)``.

    Returns:
        list: For each episode, the number of steps whose reward indicated a
        passed pipe.
    """
    reward_values = agent.reward_values()
    env = PLE(FlappyBird(), fps=30, display_screen=False, force_fps=True, rng=None, reward_values=reward_values)
    env.init()
    # A step reward is a score difference and may include the per-frame "tick"
    # reward on top of "positive", so an exact ``reward == positive`` test
    # misses every pipe once tick is non-zero. Counting a pipe when the reward
    # is clearly above a plain tick works in both cases.
    # Assumes positive > 0 and tick < positive / 2, which holds for the
    # configurations used in this notebook.
    pipe_reward_threshold = reward_values.get("tick", 0.0) + reward_values["positive"] / 2
    episode_scores = []
    for episode in range(nb_episodes):
        state = env.game.getGameState()
        score = 0
        while not env.game_over():
            action = agent.training_policy(state)
            reward = env.act(env.getActionSet()[action])
            new_state = env.game.getGameState()
            agent.observe(state, action, reward, new_state, env.game_over())
            state = new_state
            if reward > pipe_reward_threshold:
                score += 1
        episode_scores.append(score)
        env.reset_game()
    return episode_scores

def t4_4_simulate_games(agent, nb_games=2):
    """Play ``nb_games`` games with the agent's greedy policy and count pipes passed.

    Args:
        agent: Object providing ``reward_values()`` and ``policy(state)``.
        nb_games: Number of games to simulate.

    Returns:
        list: For each game, the number of steps whose reward indicated a
        passed pipe.
    """
    reward_values = agent.reward_values()
    # The environment must hand out the same rewards the counting below is
    # based on. With the environment's default rewards, an agent configured
    # with a different "positive" value never matches and always scores 0.
    env = PLE(FlappyBird(), fps=30, display_screen=False, force_fps=True, rng=None, reward_values=reward_values)
    env.init()
    # A step reward may include the per-frame "tick" reward on top of
    # "positive", so count a pipe when the reward is clearly above a plain tick.
    # Assumes positive > 0 and tick < positive / 2.
    pipe_reward_threshold = reward_values.get("tick", 0.0) + reward_values["positive"] / 2
    scores = []
    for _ in range(nb_games):
        state = env.game.getGameState()
        score = 0
        while not env.game_over():
            action = agent.policy(state)
            reward = env.act(env.getActionSet()[action])
            state = env.game.getGameState()
            if reward > pipe_reward_threshold:
                score += 1
        scores.append(score)
        env.reset_game()
    return scores

def t4_4_plot_experiment_results(experiment_results):
    """Plot smoothed per-episode scores for every experiment, each with a cubic trend.

    Args:
        experiment_results: Mapping from experiment label to that experiment's
            sequence of per-episode scores.

    Returns:
        None. Nothing is drawn when the mapping is empty; experiments without
        any scores are skipped.
    """
    if not experiment_results:
        return
    sns.set(style="whitegrid")
    plt.figure(figsize=(12, 8))
    for label, scores in experiment_results.items():
        scores = np.asarray(scores, dtype=float)
        if scores.size == 0:
            # np.convolve rejects empty input.
            continue
        # Never average over more episodes than exist; np.convolve would
        # otherwise swap its operands and return a meaningless curve.
        window = min(10, scores.size)
        smoothed_scores = np.convolve(scores, np.ones(window) / window, mode='valid')
        x = np.arange(len(smoothed_scores))
        (series_line,) = plt.plot(x, smoothed_scores, label=label)
        if len(smoothed_scores) > 3:
            # A cubic needs at least four points. Draw the trend in the same
            # colour as its series so the dashed line can be attributed to it;
            # without an explicit colour it takes the next colour in the cycle.
            polynomial = np.poly1d(np.polyfit(x, smoothed_scores, 3))
            plt.plot(x, polynomial(x), linestyle="--", color=series_line.get_color())
    plt.xlabel("Episodes")
    plt.ylabel("Average Score per Episode")
    plt.title("Comparison of Agent Performance with Different Parameters")
    plt.legend()
    plt.show()

def t4_4_run_experiments(nb_episodes=30000, nb_games=5):
    """Train one agent per configuration, plot the training curves and print greedy scores.

    Args:
        nb_episodes: Training episodes per configuration.
        nb_games: Greedy games simulated per configuration after training.
    """
    # Keys missing from an entry fall back to the baseline defaults below.
    # NOTE(review): both reward experiments also raise "tick" from 0.0 to 0.1,
    # so they change more than the quantity named in their label — confirm intent.
    experiments = [
        {"epsilon": 0.1, "alpha": 0.1, "gamma": 1.0, "label": "Baseline"},
        {"epsilon": 0.01, "alpha": 0.1, "gamma": 1.0, "label": "Very Low Epsilon (0.01)"},
        {"epsilon": 0.3, "alpha": 0.1, "gamma": 1.0, "label": "High Epsilon (0.3)"},
        {"epsilon": 0.1, "alpha": 0.05, "gamma": 1.0, "label": "Lower Alpha (0.05)"},
        {"epsilon": 0.1, "alpha": 0.2, "gamma": 1.0, "label": "Higher Alpha (0.2)"},
        # Label now states the gamma that is actually used (it previously read 0.90).
        {"epsilon": 0.1, "alpha": 0.1, "gamma": 0.95, "label": "Lower Gamma (0.95)"},
        {"epsilon": 0.1, "alpha": 0.1, "gamma": 1.0, "reward_structure": {"positive": 2.0, "tick": 0.1, "loss": -5.0}, "label": "Modified Rewards (Positive=2.0)"},
        {"epsilon": 0.1, "alpha": 0.1, "gamma": 1.0, "reward_structure": {"positive": 1.0, "tick": 0.1, "loss": -10.0}, "label": "Harsher Loss (-10)"},
        {"epsilon": 0.1, "alpha": 0.1, "gamma": 1.0, "resolution": 10, "label": "Higher State Resolution (10)"},
        {"epsilon": 0.1, "alpha": 0.1, "gamma": 1.0, "resolution": 20, "label": "Lower State Resolution (20)"}
    ]
    experiment_results = {}
    simulation_scores = {}

    for exp in experiments:
        print(f"Running experiment: {exp['label']}")
        agent = T4_4_FlappyAgent(
            epsilon=exp.get("epsilon", 0.1),
            alpha=exp.get("alpha", 0.1),
            gamma=exp.get("gamma", 1.0),
            reward_structure=exp.get("reward_structure", None),
            resolution=exp.get("resolution", 15)
        )
        episode_scores = t4_4_train(nb_episodes, agent)
        experiment_results[exp["label"]] = episode_scores
        simulation_scores[exp["label"]] = t4_4_simulate_games(agent, nb_games=nb_games)

    t4_4_plot_experiment_results(experiment_results)

    print("Simulation scores for each configuration:")
    for label, scores in simulation_scores.items():
        print(f"{label}: {scores}")

# Task 4.4 driver: trains one agent per configuration, plots the curves and prints the greedy scores.
t4_4_run_experiments()