In [1]:
#second_try_with_knowledge_using PPO algorithm:


In [11]:
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
import random
import tensorflow.keras as keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.optimizers import Adam
import pandas as pd

In [15]:
# read by default 1st sheet of an excel file
df = pd.read_excel('knowledge_base1.xlsx')

# Define Features
FEATURES = ["Hair", "Feathers", "Eggs", "Milk", "Airborne", "Aquatic", "Predator", "Toothed",
            "Backbone", "Breathes", "Venomous", "Fins", "Nlegs_0", "Nlegs_2", "Nlegs_4",
            "Nlegs_5", "Nlegs_6", "Nlegs_8", "Tail", "Domestic", "Catsize","Mammal","Bird","Reptile","Fish","Amphibian","Insect","Invertebrate"
]

animal_features = df[FEATURES].to_numpy()

NUM_ANIMALS = 100  
NUM_FEATURES = len(FEATURES)
MAX_STEPS = 20

# Environment Class
class AnimalQuestionEnv:
    def __init__(self):
        self.animals = list(range(NUM_ANIMALS))
        self.target_animal = random.choice(self.animals)
        self.remaining_animals = set(self.animals)
        self.state = np.ones(NUM_FEATURES)
        self.step_count = 0
        self.done = False

    def reset(self):
        self.target_animal = random.choice(self.animals)
        self.remaining_animals = set(self.animals)
        self.state = np.ones(NUM_FEATURES)
        self.step_count = 0
        self.done = False
        return self.state

    def step(self, action):
        """Handles selecting a question (feature index)."""
        if self.done:
            return self.state, 0, self.done

        feature_idx = np.clip(action.numpy(), 0, NUM_FEATURES - 1)
        correct_answer = animal_features[self.target_animal][feature_idx]

        # Remove animals that do not match the answer
        self.remaining_animals = {
            a for a in self.remaining_animals if animal_features[a][feature_idx] == correct_answer
        }

        # Update state
        self.state[feature_idx] = correct_answer
        self.step_count += 1

        # Reward based on elimination
        reward = len(self.animals) - len(self.remaining_animals)

        if self.step_count >= MAX_STEPS or len(self.remaining_animals) == 1:
            self.done = True
            guessed_animal = next(iter(self.remaining_animals), -1)
            reward = 30 if guessed_animal == self.target_animal else -30

        return self.state, reward, self.done

# PPO Networks
class PolicyNetwork(tf.keras.Model):
    def __init__(self):
        super(PolicyNetwork, self).__init__()
        self.dense1 = tf.keras.layers.Dense(64, activation="relu")
        self.dense2 = tf.keras.layers.Dense(32, activation="relu")
        self.output_layer = tf.keras.layers.Dense(NUM_FEATURES, activation="softmax")

    def call(self, state):
        x = self.dense1(state)
        x = self.dense2(x)
        return self.output_layer(x)

class ValueNetwork(tf.keras.Model):
    def __init__(self):
        super(ValueNetwork, self).__init__()
        self.dense1 = tf.keras.layers.Dense(64, activation="relu")
        self.dense2 = tf.keras.layers.Dense(32, activation="relu")
        self.output_layer = tf.keras.layers.Dense(1, activation="linear")  # Value estimation

    def call(self, state):
        x = self.dense1(state)
        x = self.dense2(x)
        return self.output_layer(x)

# Training Setup
env = AnimalQuestionEnv()
policy_net = PolicyNetwork()
value_net = ValueNetwork()

policy_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
value_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# PPO Hyperparameters
# PPO Hyperparameters
gamma = 0.99
clip_ratio = 0.2  # Clipping range for PPO updates
num_episodes = 500
batch_size = 32  # Number of experiences to process per update
EPOCHS = 10  # Number of PPO training iterations per batch

# Experience storage
trajectories = []

for episode in range(num_episodes):
    state = env.reset()
    log_probs = []
    values = []
    rewards = []
    states = []
    actions = []

    for t in range(MAX_STEPS):
        state_tensor = tf.convert_to_tensor(state, dtype=tf.float32)
        state_tensor = tf.expand_dims(state_tensor, axis=0)
        action_probs = policy_net(state_tensor)

        # Use TensorFlow Probability for PPO action selection
        action_distribution = tfp.distributions.Categorical(probs=action_probs)
        action = action_distribution.sample()

        log_prob = action_distribution.log_prob(action)
        value = value_net(state_tensor)

        log_probs.append(log_prob)
        values.append(value)
        states.append(state)
        actions.append(action)

        next_state, reward, done = env.step(action)
        rewards.append(reward)

        if done:
            break
        state = next_state

    # Compute discounted rewards
    returns = []
    G = 0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.insert(0, G)

    returns = tf.convert_to_tensor(returns, dtype=tf.float32)
    values = tf.convert_to_tensor(values, dtype=tf.float32)
    log_probs = tf.convert_to_tensor(log_probs, dtype=tf.float32)
    states = tf.convert_to_tensor(states, dtype=tf.float32)
    actions = tf.convert_to_tensor(actions, dtype=tf.int32)

    # Compute advantages
    advantages = returns - values

    # PPO Update
    for _ in range(EPOCHS):
        with tf.GradientTape() as policy_tape, tf.GradientTape() as value_tape:
            new_action_probs = policy_net(states)
            new_action_distribution = tfp.distributions.Categorical(probs=new_action_probs)
            new_log_probs = new_action_distribution.log_prob(actions)

            # PPO Clipped Surrogate Loss
            ratio = tf.exp(new_log_probs - log_probs)
            clipped_ratio = tf.clip_by_value(ratio, 1 - clip_ratio, 1 + clip_ratio)
            policy_loss = -tf.reduce_mean(tf.minimum(ratio * advantages, clipped_ratio * advantages))

            # Value Loss (MSE)
            value_loss = tf.reduce_mean((returns - value_net(states)) ** 2)

        # Compute gradients and apply updates
        policy_grads = policy_tape.gradient(policy_loss, policy_net.trainable_variables)
        value_grads = value_tape.gradient(value_loss, value_net.trainable_variables)

        policy_optimizer.apply_gradients(zip(policy_grads, policy_net.trainable_variables))
        value_optimizer.apply_gradients(zip(value_grads, value_net.trainable_variables))

    if episode % 50 == 0:
        print(f"Episode {episode}, Remaining Animals: {len(env.remaining_animals)}, Reward: {sum(rewards)}")

print("Training complete!")

Episode 0, Remaining Animals: 8, Reward: 1543
Episode 50, Remaining Animals: 87, Reward: 217
Episode 100, Remaining Animals: 87, Reward: 217
Episode 150, Remaining Animals: 87, Reward: 217
Episode 200, Remaining Animals: 87, Reward: 217
Episode 250, Remaining Animals: 87, Reward: 217
Episode 300, Remaining Animals: 67, Reward: 577
Episode 350, Remaining Animals: 4, Reward: 1652
Episode 400, Remaining Animals: 6, Reward: 1505
Episode 450, Remaining Animals: 2, Reward: 1600
Training complete!
