In [1]:
import numpy as np
import torch
from env import Env, RandomEnv
from reward import random_reward
from _types import Reward
from utils import timed
from canon import epic_canon

  from .autonotebook import tqdm as notebook_tqdm


In [10]:
#randomly generating 100 transition functions
import numpy as np

class Env():
    def __init__(
        self,
        n_s: int,
        n_a: int,
        discount: float,
        init_dist: np.ndarray,
        transition_dist: np.ndarray,
    ):
        self.n_s = n_s
        self.states = np.arange(n_s)
        self.n_a = n_a
        self.actions = np.arange(n_a)
        self.discount = discount
        self.init_dist = init_dist
        self.transition_dist = transition_dist

class RandomEnv(Env):
    def __init__(self, n_s: int = 128, n_a: int = 16, discount: int = 0.9):
        init_dist = np.ones(n_s) / n_s
        thresh = 1 if n_s < 50 else (1.5 if n_s < 100 else 1.8)
        transition_dist = np.random.randn(n_s, n_a, n_s)
        transition_dist = np.where(transition_dist > thresh,
                             transition_dist, np.zeros_like(transition_dist) - 20)
        transition_dist = softmax(transition_dist)
        super().__init__(n_s, n_a, discount, init_dist, transition_dist)
    def modified_reward_matrix(self, original_reward_matrix: np.ndarray) -> np.ndarray:
        n_s, n_a, _ = original_reward_matrix.shape
        modified_reward_matrix = np.zeros((n_s, n_a, n_s))

        for s in range(n_s):
            for a in range(n_a):
                expected_reward = np.sum(self.transition_dist[s, a] * original_reward_matrix[s, a])
                modified_reward_matrix[s, a] = expected_reward

        return modified_reward_matrix

def random_reward(env: Env) -> np.ndarray:
    r = np.random.randn(env.n_s, env.n_a, env.n_s)
    if np.random.random() > 0.8:
        thresh = 3 if env.n_s < 50 else (3.5 if env.n_s < 100 else 3.8)
        r = np.where(r > thresh, r, np.zeros_like(r))
    if np.random.random() > 0.3:
        r *= 10 * np.random.random()
    if np.random.random() > 0.7:
        r += 10 * np.random.random()
    if np.random.random() > 0.5:
        potential = np.random.randn(env.n_s)
        potential *= 10 * np.random.random()
        potential += np.random.random()
        r += env.discount * potential[None, None, :] - potential[:, None, None]
    return r

def softmax(x):
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum(axis=-1, keepdims=True)
    #Cause why not

def epic(r1: np.ndarray, r2: np.ndarray, env: Env) -> float:
    r1_can = epic_canon(r1, env)
    r2_can = epic_canon(r2, env)

    r1_norm = r1_can / np.linalg.norm(r1_can.flatten(), 2)
    r2_norm = r2_can / np.linalg.norm(r2_can.flatten(), 2)

    return np.linalg.norm((r1_norm - r2_norm).flatten(), 2)

def random_transition(env: Env) -> np.ndarray:
    random_factor = np.random.uniform(0.8, 1.2)
    thresh = random_factor * (1 if env.n_s < 50 else (1.5 if env.n_s < 100 else 1.8))
    
    mean = np.random.uniform(-1, 1)
    std_dev = np.random.uniform(0.5, 2)
    transition_dist = mean + std_dev * np.random.randn(env.n_s, env.n_a, env.n_s)
    
    sparsity = np.random.uniform(0.1, 0.9)
    transition_dist = np.where(transition_dist > thresh, transition_dist, np.zeros_like(transition_dist) - 20)
    
    transition_dist = softmax(transition_dist)
    
    if np.random.random() > 0.5:
        permute_states = np.random.permutation(env.n_s)
        permute_actions = np.random.permutation(env.n_a)
        transition_dist = transition_dist[permute_states, :, :]
        transition_dist = transition_dist[:, permute_actions, :]
    
    return transition_dist

import pandas as pd
results = pd.DataFrame(columns=["d1", "d2"])

for i in range(10):
    env = RandomEnv(n_s=128, n_a=16, discount=0.9)
    r1 = random_reward(env)
    r2 = random_reward(env)
    r1_tau = env.modified_reward_matrix(r1)
    r2_tau = env.modified_reward_matrix(r2)
    transition_dist = random_transition(env)
    env.transition_dist = transition_dist
    d1 = epic(r1, r2, env)
    num_transition_functions = 1000
    distances = []
    for _ in range(num_transition_functions):
        transition_dist = random_transition(env)
        env.transition_dist = transition_dist

        distance = epic(r1_tau, r2_tau, env)
        distances.append(distance)

    d2 = np.max(distances)
    results.loc[i] = [d1, d2]
print(results)



         d1        d2
0  1.414691  1.416386
1  1.415620  1.447523
2  1.413433  1.404739
3  1.413949  1.375424
4  1.415768  1.413673
5  1.412839  1.403442
6  1.414413  1.478716
7  1.412464  1.442633
8  1.413156  1.427106
9  1.413786  1.414610


In [21]:
#Genetic Algorithm to find max for the transition function
def genetic_algorithm(
    env: Env,
    r1: np.ndarray,
    r2: np.ndarray,
    epic_function,
    population_size: int = 100,
    n_generations: int = 100,
    mutation_rate: float = 0.1,
    crossover_rate: float = 0.8,
    selection_method: str = "tournament",
    tournament_size: int = 5,
) -> np.ndarray:

    def crossover(parent1: np.ndarray, parent2: np.ndarray) -> np.ndarray:
        n_s, n_a, _ = parent1.shape
        crossover_point = np.random.randint(1, n_s * n_a)
        
        parent1_flat = parent1.reshape(-1)
        parent2_flat = parent2.reshape(-1)
        
        offspring_flat = np.concatenate((parent1_flat[:crossover_point], parent2_flat[crossover_point:]))
        offspring = offspring_flat.reshape((n_s, n_a, n_s))
        
        offspring = softmax(offspring)
        return offspring

    def mutate(individual: np.ndarray, mutation_strength: float = 0.05) -> np.ndarray:
        mutation = (np.random.randn(*individual.shape) * mutation_strength)
        mutated_individual = individual + mutation
        
        mutated_individual = softmax(mutated_individual)
        return mutated_individual

    def tournament_selection(population: List[np.ndarray], fitnesses: List[float]) -> List[np.ndarray]:
        selected = []
        for _ in range(tournament_size):
            selected.append(random.choice(population))

        selected_fitnesses = [fitnesses[next(i for i, p in enumerate(population) if np.array_equal(individual, p))] for individual in selected]
        winner = selected[np.argmax(selected_fitnesses)]
        return winner

    def create_population() -> List[np.ndarray]:
        return [random_transition(env) for _ in range(population_size)]

    def evaluate_fitness(population: List[np.ndarray]) -> List[float]:
        return [
            epic_function(r1, r2, Env(n_s=env.n_s, n_a=env.n_a, discount=env.discount, init_dist=env.init_dist, transition_dist=individual))
            for individual in population
        ]

    population = create_population()
    best_individual = None
    best_fitness = -np.inf

    for generation in range(n_generations):
        fitnesses = evaluate_fitness(population)

        new_population = []
        for _ in range(population_size):
            if selection_method == "tournament":
                parent1 = tournament_selection(population, fitnesses)
                parent2 = tournament_selection(population, fitnesses)

            if random.random() < crossover_rate:
                offspring = crossover(parent1, parent2)
            else:
                offspring = random.choice([parent1, parent2])

            if random.random() < mutation_rate:
                offspring = mutate(offspring)

            new_population.append(offspring)

        population = new_population
        best_in_generation = population[np.argmax(fitnesses)]
        best_fitness_in_generation = max(fitnesses)

        if best_fitness_in_generation > best_fitness:
            best_fitness = best_fitness_in_generation
            best_individual = best_in_generation

    return best_individual


In [30]:
def epic(r1: Reward, r2: Reward, env: Env) -> float:
  r1_can = epic_canon(r1, env)
  r2_can = epic_canon(r2, env)

  r1_norm = r1_can / np.linalg.norm(r1_can.flatten(), 2)
  r2_norm = r2_can / np.linalg.norm(r2_can.flatten(), 2)

  return np.linalg.norm((r1_norm - r2_norm).flatten(), 2)

env = RandomEnv(n_s=128, n_a=16, discount=0.9)


r1 = random_reward(env)
r2 = random_reward(env)
r1_tau = env.modified_reward_matrix(r1)
r2_tau = env.modified_reward_matrix(r2)


best_transition = genetic_algorithm(
    env=env,
    r1=r1_tau,
    r2=r2_tau,
    epic_function=epic,  
    population_size=100,
    n_generations=100,
    mutation_rate=0.1,
    crossover_rate=0.8,
    selection_method="tournament",
    tournament_size=5,
)

print("Best transition matrix found:")

env.transition_dist = best_transition
distance = epic(r1_tau, r2_tau, env)
print(distance)


Best transition matrix found:
1.3916893546664557


In [34]:
results = pd.DataFrame(columns=["d1", "d3"])

for i in range(10):
    env = RandomEnv(n_s=128, n_a=16, discount=0.9)
    r1 = random_reward(env)
    r2 = random_reward(env)
    r1_tau = env.modified_reward_matrix(r1)
    r2_tau = env.modified_reward_matrix(r2)
    transition_dist = random_transition(env)
    env.transition_dist = transition_dist
    d1 = epic(r1, r2, env)
    num_transition_functions = 1000
    distances = []
    for _ in range(num_transition_functions):
        transition_dist = random_transition(env)
        env.transition_dist = transition_dist

        distance = epic(r1_tau, r2_tau, env)
        distances.append(distance)

    d2 = np.max(distances)

    x = genetic_algorithm(
    env=env,
    r1=r1_tau,
    r2=r2_tau,
    epic_function=epic,  
    population_size=10,
    n_generations=10,
    mutation_rate=0.1,
    crossover_rate=0.8,
    selection_method="tournament",
    tournament_size=5,
    )
    env.transition_dist = x
    d3 = epic(r1_tau, r2_tau, env)
    results.loc[i] = [d1, d3]
print(results)

#d1 is epic(r1, r2, env) and d3 is epic(r1_tau, r2_tau, env) for d^\omega(R1, R2) = max_{\tau} d^\tau(R1, R2)

         d1        d3
0  1.416153  1.399301
1  1.413445  1.462412
2  1.413541  1.365799
3  1.415791  1.470546
4  1.414791  1.416462
5  1.416196  1.425550
6  1.412823  1.421715
7  1.412610  1.403398
8  1.413838  1.405704
9  1.414402  1.388592
