# Upper Confidence Bound (UCB) Algorithm

# In [None]:
import numpy as np

class UCB:
    """UCB1 policy for a multi-armed bandit.

    Keeps, for every arm, the number of times it was pulled and the running
    mean of the rewards observed for it.
    """

    def __init__(self, n_arms):
        """Create a policy with all pull counts and value estimates at zero.

        Args:
            n_arms (int): Number of arms.
        """
        self.n_arms = n_arms
        self.counts = np.zeros(n_arms)  # Count of pulls for each arm
        self.values = np.zeros(n_arms)  # Average reward for each arm

    def select_arm(self):
        """Return the index of the arm to pull next.

        Arms that have never been pulled are returned first, lowest index
        first. Once every arm has at least one pull, the arm maximising
        ``value + sqrt(2 * ln(total_pulls) / pulls_of_arm)`` is returned.

        Returns:
            int: Index of the selected arm.
        """
        # Look at the per-arm counts rather than the total number of pulls:
        # the total only identifies the next untried arm when updates arrive
        # strictly in index order, and a zero count would otherwise end up as
        # a divisor in the confidence term below.
        untried = np.flatnonzero(self.counts == 0)
        if untried.size > 0:
            return int(untried[0])

        total_counts = np.sum(self.counts)
        ucb_values = self.values + np.sqrt((2 * np.log(total_counts)) / self.counts)
        return int(np.argmax(ucb_values))

    def update(self, chosen_arm, reward):
        """Record one observed reward for ``chosen_arm``.

        Args:
            chosen_arm (int): Index of the arm that was pulled.
            reward (float): Reward observed for that pull.
        """
        self.counts[chosen_arm] += 1
        n = self.counts[chosen_arm]
        value = self.values[chosen_arm]
        # Running mean: re-weight the old mean by (n - 1) / n and add the new
        # sample with weight 1 / n.
        new_value = ((n - 1) / n) * value + (1 / n) * reward
        self.values[chosen_arm] = new_value

def ucb_simulation(n_arms, n_rounds, true_means):
    """Run a UCB policy against a simulated Gaussian bandit.

    Args:
        n_arms (int): Number of arms handed to the UCB policy.
        n_rounds (int): Number of pulls to simulate.
        true_means (sequence of float): Mean reward of each arm, indexed by arm.

    Returns:
        tuple: ``(rewards, ucb)`` where ``rewards`` is an array holding the
        reward obtained in every round and ``ucb`` is the policy object after
        the last update.
    """
    policy = UCB(n_arms)
    rewards = np.zeros(n_rounds)

    for step in range(n_rounds):
        arm = policy.select_arm()
        # Standard-normal noise added to the chosen arm's true mean.
        payoff = np.random.randn() + true_means[arm]
        policy.update(arm, payoff)
        rewards[step] = payoff

    return rewards, policy

# Example usage: run the UCB simulation on a 5-armed bandit and print a summary.
n_arms = 5  # Number of arms
n_rounds = 1000  # Number of rounds
true_means = [0.1, 0.2, 0.3, 0.4, 0.5]  # True means of the arms (one entry per arm)

# No random seed is set in this cell, so the printed numbers differ between runs.
rewards, ucb = ucb_simulation(n_arms, n_rounds, true_means)
print("Average Reward:", np.mean(rewards))
print("Counts of each arm:", ucb.counts)
print("Estimated values of each arm:", ucb.values)


# Epsilon-Greedy Algorithm

# In [None]:
import numpy as np

class EpsilonGreedy:
    """Epsilon-greedy policy for a multi-armed bandit.

    With probability ``epsilon`` a uniformly random arm is pulled; otherwise
    the arm with the highest estimated value is pulled.
    """

    def __init__(self, n_arms, epsilon):
        """Set up the policy with zeroed pull counts and value estimates.

        Args:
            n_arms (int): Number of arms.
            epsilon (float): Probability of picking a random arm.
        """
        self.n_arms, self.epsilon = n_arms, epsilon
        self.counts = np.zeros(n_arms)  # pulls per arm
        self.values = np.zeros(n_arms)  # running mean reward per arm

    def select_arm(self):
        """Return the index of the arm to pull next."""
        exploring = np.random.rand() < self.epsilon
        if not exploring:
            # Exploit: arm with the best current estimate.
            return np.argmax(self.values)
        # Explore: uniformly random arm.
        return np.random.randint(0, self.n_arms)

    def update(self, chosen_arm, reward):
        """Fold one observed reward into the running mean of ``chosen_arm``.

        Args:
            chosen_arm (int): Index of the arm that was pulled.
            reward (float): Reward observed for that pull.
        """
        self.counts[chosen_arm] += 1
        pulls = self.counts[chosen_arm]
        keep_weight = (pulls - 1) / pulls
        sample_weight = 1 / pulls
        previous = self.values[chosen_arm]
        self.values[chosen_arm] = keep_weight * previous + sample_weight * reward

def epsilon_greedy_simulation(n_arms, n_rounds, true_means, epsilon):
    """Run an epsilon-greedy policy against a simulated Gaussian bandit.

    Args:
        n_arms (int): Number of arms handed to the policy.
        n_rounds (int): Number of pulls to simulate.
        true_means (sequence of float): Mean reward of each arm, indexed by arm.
        epsilon (float): Exploration probability handed to the policy.

    Returns:
        tuple: ``(rewards, policy)`` where ``rewards`` is an array with the
        reward of every round and ``policy`` is the EpsilonGreedy object after
        the last update.
    """
    policy = EpsilonGreedy(n_arms, epsilon)
    rewards = np.zeros(n_rounds)

    for step in range(n_rounds):
        arm = policy.select_arm()
        # Standard-normal noise added to the chosen arm's true mean.
        payoff = np.random.randn() + true_means[arm]
        policy.update(arm, payoff)
        rewards[step] = payoff

    return rewards, policy

# Example usage: run the epsilon-greedy simulation on a 5-armed bandit and print a summary.
n_arms = 5  # Number of arms
n_rounds = 1000  # Number of rounds
true_means = [0.1, 0.2, 0.3, 0.4, 0.5]  # True means of the arms (one entry per arm)
epsilon = 0.1  # Epsilon value (probability of pulling a random arm)

# No random seed is set in this cell, so the printed numbers differ between runs.
rewards, epsilon_greedy = epsilon_greedy_simulation(n_arms, n_rounds, true_means, epsilon)
print("Average Reward:", np.mean(rewards))
print("Counts of each arm:", epsilon_greedy.counts)
print("Estimated values of each arm:", epsilon_greedy.values)

# Thompson Sampling Algorithm

# In [None]:
import numpy as np

class ThompsonSampling:
    """Thompson-sampling policy with one Beta distribution per arm.

    Each arm keeps a success count and a failure count; arm ``i`` is sampled
    from ``Beta(successes[i] + 1, failures[i] + 1)``.
    """

    def __init__(self, n_arms):
        """Set up the policy with zero successes and failures for every arm.

        Args:
            n_arms (int): Number of arms.
        """
        self.n_arms = n_arms
        self.successes = np.zeros(n_arms)  # successes per arm
        self.failures = np.zeros(n_arms)   # failures per arm

    def select_arm(self):
        """Draw one Beta sample per arm and return the index of the largest."""
        draws = np.array([
            np.random.beta(self.successes[idx] + 1, self.failures[idx] + 1)
            for idx in range(self.n_arms)
        ])
        return np.argmax(draws)

    def update(self, chosen_arm, reward):
        """Record the outcome of pulling ``chosen_arm``.

        Args:
            chosen_arm (int): Index of the arm that was pulled.
            reward: Outcome of the pull; a value equal to 1 counts as a
                success, anything else counts as a failure.
        """
        tally = self.successes if reward == 1 else self.failures
        tally[chosen_arm] += 1

def thompson_sampling_simulation(n_arms, n_rounds, true_means):
    """Run a Thompson-sampling policy against a simulated Bernoulli bandit.

    Args:
        n_arms (int): Number of arms handed to the policy.
        n_rounds (int): Number of pulls to simulate.
        true_means (sequence of float): Success probability of each arm,
            indexed by arm.

    Returns:
        tuple: ``(rewards, policy)`` where ``rewards`` is an array with the
        outcome of every round and ``policy`` is the ThompsonSampling object
        after the last update.
    """
    policy = ThompsonSampling(n_arms)
    rewards = np.zeros(n_rounds)

    for step in range(n_rounds):
        arm = policy.select_arm()
        # Bernoulli outcome: success when a uniform draw falls below the arm's mean.
        outcome = np.random.rand() < true_means[arm]
        policy.update(arm, outcome)
        rewards[step] = outcome

    return rewards, policy

# Example usage: run the Thompson-sampling simulation on a 5-armed bandit and print a summary.
n_arms = 5  # Number of arms
n_rounds = 1000  # Number of rounds
true_means = [0.1, 0.2, 0.3, 0.4, 0.5]  # True means of the arms (used as success probabilities)

# No random seed is set in this cell, so the printed numbers differ between runs.
rewards, thompson_sampling = thompson_sampling_simulation(n_arms, n_rounds, true_means)
print("Average Reward:", np.mean(rewards))
print("Successes of each arm:", thompson_sampling.successes)
print("Failures of each arm:", thompson_sampling.failures)

# Q-learning Algorithm for Reinforcement Learning

# In [None]:
import numpy as np
import random

class QLearning:
    """Tabular Q-learning agent with an epsilon-greedy action choice."""

    def __init__(self, n_states, n_actions, alpha=0.1, gamma=0.99, epsilon=0.1):
        """Create an agent with an all-zero Q-table.

        Args:
            n_states (int): Number of rows in the Q-table.
            n_actions (int): Number of columns in the Q-table.
            alpha (float): Learning rate.
            gamma (float): Discount factor.
            epsilon (float): Probability of choosing a random action.
        """
        self.n_states, self.n_actions = n_states, n_actions
        self.alpha, self.gamma, self.epsilon = alpha, gamma, epsilon
        self.q_table = np.zeros((n_states, n_actions))

    def choose_action(self, state):
        """Return an action for ``state``: random with probability epsilon, else greedy."""
        if random.uniform(0, 1) < self.epsilon:
            # Explore: uniformly random action index.
            return random.randint(0, self.n_actions - 1)
        # Exploit: action with the highest Q-value in this state.
        return np.argmax(self.q_table[state, :])

    def update_q_table(self, state, action, reward, next_state):
        """Move ``Q[state, action]`` towards the one-step bootstrapped target.

        Args:
            state (int): State in which the action was taken.
            action (int): Action that was taken.
            reward (float): Reward received for the transition.
            next_state (int): State reached after the action.
        """
        greedy_next = np.argmax(self.q_table[next_state, :])
        bootstrap = self.q_table[next_state, greedy_next]
        target = reward + self.gamma * bootstrap
        error = target - self.q_table[state, action]
        self.q_table[state, action] += self.alpha * error

def train_q_learning(env, q_learning, n_episodes):
    """Train an agent on an environment for a number of episodes.

    Args:
        env: Environment exposing ``reset()`` (returns the start state) and
            ``step(action)`` (returns ``(next_state, reward, done, info)``).
        q_learning: Agent exposing ``choose_action(state)`` and
            ``update_q_table(state, action, reward, next_state)``.
        n_episodes (int): Number of episodes to run.

    Returns:
        list: Sum of rewards collected in each episode, in episode order.
    """
    episode_returns = []
    for _ in range(n_episodes):
        state = env.reset()
        collected = 0
        finished = False
        # Each episode runs until the environment reports it is done.
        while not finished:
            action = q_learning.choose_action(state)
            successor, reward, finished, _info = env.step(action)
            q_learning.update_q_table(state, action, reward, successor)
            collected += reward
            state = successor
        episode_returns.append(collected)
    return episode_returns

# Example Grid World Environment
class SimpleGridWorld:
    """Square grid world with states numbered row by row.

    The agent starts in state 0 and the episode ends when it reaches the last
    state (``size * size - 1``).
    """

    def __init__(self, size=5):
        """Create a ``size`` x ``size`` grid with the agent in state 0.

        Args:
            size (int): Side length of the grid.
        """
        self.size = size
        self.n_states = size * size
        self.n_actions = 4  # 0 = Up, 1 = Down, 2 = Left, 3 = Right
        self.goal_state = self.n_states - 1
        self.state = 0

    def reset(self):
        """Put the agent back in state 0 and return that state."""
        self.state = 0
        return self.state

    def step(self, action):
        """Apply ``action`` and return ``(state, reward, done, info)``.

        Moves that would leave the grid keep the agent on the edge; an action
        other than 0-3 leaves the position unchanged. The reward is 1 on the
        goal state and -0.01 everywhere else.
        """
        row, col = divmod(self.state, self.size)
        last = self.size - 1

        if action == 0 or action == 1:
            # Vertical move: 0 goes up, 1 goes down.
            row = max(row - 1, 0) if action == 0 else min(row + 1, last)
        elif action == 2 or action == 3:
            # Horizontal move: 2 goes left, 3 goes right.
            col = max(col - 1, 0) if action == 2 else min(col + 1, last)

        self.state = row * self.size + col
        done = self.state == self.goal_state
        reward = 1 if done else -0.01
        return self.state, reward, done, {}

# Example usage: train a tabular Q-learning agent on a 5x5 grid world.
env = SimpleGridWorld(size=5)
# The Q-table dimensions are taken from the environment so the two stay in sync.
q_learning = QLearning(n_states=env.n_states, n_actions=env.n_actions, alpha=0.1, gamma=0.99, epsilon=0.1)
n_episodes = 1000
rewards = train_q_learning(env, q_learning, n_episodes)

# Dump the learned table and the per-episode reward totals.
print("Trained Q-table:")
print(q_learning.q_table)
print("Rewards over episodes:")
print(rewards)

# Epsilon-Greedy strategy

# In [None]:
import numpy as np

def epsilon_greedy(num_actions, epsilon, num_steps, reward_fn=None):
    """
    Implements the epsilon-greedy algorithm for multi-armed bandit problem.

    Args:
        num_actions (int): Number of actions available.
        epsilon (float): Probability of selecting a random action.
        num_steps (int): Number of steps to run the algorithm.
        reward_fn (callable, optional): Function called with the selected
            action index that returns the observed reward. When omitted, the
            module-level ``simulate_environment`` is used.

    Returns:
        numpy.ndarray: Action-value estimates Q(a) for each action a.
    """
    # Resolve the default at call time so callers can plug in their own
    # environment without touching module-level state.
    if reward_fn is None:
        reward_fn = simulate_environment

    # Initialize action-value estimates Q(a) for each action a
    Q = np.zeros(num_actions)
    # Initialize counts of each action N(a) = 0 for each action a
    N = np.zeros(num_actions)

    for _ in range(num_steps):
        # With probability epsilon, select a random action
        if np.random.random() < epsilon:
            action = np.random.randint(num_actions)
        # Otherwise, select argmax_a Q(a)
        else:
            action = np.argmax(Q)

        # Take the selected action and observe the reward
        reward = reward_fn(action)

        # Incremental sample-mean update of Q(a)
        N[action] += 1
        Q[action] += (1 / N[action]) * (reward - Q[action])

    return Q

def simulate_environment(action):
    """
    Draws a noisy reward for the selected action.

    The reward is sampled from a normal distribution whose mean equals the
    action index and whose standard deviation is 1.

    Args:
        action (int): The action selected.

    Returns:
        float: The reward received for the selected action.
    """
    mean_reward = action
    noise_scale = 1
    return np.random.normal(loc=mean_reward, scale=noise_scale)

# Example usage: estimate action values for a 10-action bandit.
num_actions = 10  # Number of actions
epsilon = 0.1     # Probability of selecting a random action
num_steps = 1000  # Number of steps to run the algorithm

# Run epsilon-greedy algorithm (unseeded, so the estimates differ between runs)
action_value_estimates = epsilon_greedy(num_actions, epsilon, num_steps)
print("Action-value estimates:", action_value_estimates)

# Explore-Exploit for Dynamic Pricing

# In [None]:
import numpy as np

def dynamic_pricing(mu, sigma, time_horizon):
    """
    Implements a dynamic pricing strategy over a given time horizon.

    The first 10% of the steps use ``explore_pricing()`` to pick the price;
    the remaining steps use ``exploit_pricing()``. Demand is drawn from a
    normal distribution at every step and floored at zero.

    Args:
        mu (float): Mean demand.
        sigma (float): Standard deviation of demand.
        time_horizon (int): The total time horizon for the pricing strategy.

    Returns:
        float: The total revenue generated over the time horizon.
    """
    revenue = 0
    exploration_period = int(time_horizon * 0.1)  # Exploration lasts 10% of the horizon

    for t in range(time_horizon):
        # Simulate demand using a normal distribution. A normal draw can be
        # negative, which is not a meaningful demand and would subtract
        # revenue, so it is floored at zero.
        demand = max(0.0, np.random.normal(mu, sigma))

        # Choose pricing strategy based on exploration/exploitation phase
        if t < exploration_period:
            price = explore_pricing()
        else:
            price = exploit_pricing()

        # Accumulate revenue for the current time step
        revenue += price * demand

    return revenue

def explore_pricing():
    """
    Pricing rule for the exploration phase.

    Returns:
        float: A price drawn uniformly between 50 and 150.
    """
    lowest_price, highest_price = 50, 150
    return np.random.uniform(lowest_price, highest_price)

def exploit_pricing():
    """
    Pricing rule for the exploitation phase.

    Returns:
        float: The price during the exploitation phase (a fixed placeholder
        value of 100).
    """
    # Placeholder: swap in a model-driven price when one is available.
    fixed_price = 100
    return fixed_price

# Example usage: simulate 1000 pricing steps with normally distributed demand.
mu = 100  # Mean demand
sigma = 20  # Standard deviation of demand
time_horizon = 1000  # Total time horizon

# Run dynamic pricing algorithm (unseeded, so the total differs between runs)
revenue = dynamic_pricing(mu, sigma, time_horizon)
print("Total revenue:", revenue)

# Explore-Exploit for Revenue Management

# In [None]:
def revenue_management(time_horizon):
    """
    Simulates a revenue management strategy over a given time horizon.

    During the first 10% of the steps the strategy comes from
    ``explore_strategy()``; afterwards it comes from ``exploit_strategy()``.
    Each step adds the strategy's allocation factor times the observed demand.

    Args:
        time_horizon (int): The total time horizon for the revenue management strategy.

    Returns:
        float: The total revenue generated over the time horizon.
    """
    total = 0
    explore_until = int(time_horizon * 0.1)

    for step in range(time_horizon):
        # Demand is observed before the strategy is chosen.
        observed = observe_demand()
        chosen = explore_strategy() if step < explore_until else exploit_strategy()
        total += allocate_resources(chosen) * observed

    return total

def observe_demand():
    """
    Simulates one demand observation.

    Returns:
        float: A demand value drawn uniformly between 50 and 150.
    """
    # Placeholder for a real demand source.
    low, high = 50, 150
    return random.uniform(low, high)

def explore_strategy():
    """
    Picks the strategy to use during the exploration phase.

    Returns:
        str: One of "strategy1", "strategy2" or "strategy3", chosen at random.
    """
    candidates = ["strategy1", "strategy2", "strategy3"]
    return random.choice(candidates)

def exploit_strategy():
    """
    Picks the strategy to use during the exploitation phase.

    Returns:
        str: The fixed placeholder name "optimal_strategy".
    """
    best_known = "optimal_strategy"
    return best_known

def allocate_resources(strategy):
    """
    Looks up the allocation factor for a strategy.

    Args:
        strategy (str): The chosen strategy.

    Returns:
        float: The allocation factor for the strategy, or 1.0 when the
        strategy name is not in the lookup table.
    """
    factors = dict(
        strategy1=1.1,
        strategy2=1.2,
        strategy3=1.3,
        optimal_strategy=1.5,
    )
    if strategy in factors:
        return factors[strategy]
    return 1.0

# Example usage: simulate 100 revenue-management steps.
time_horizon = 100  # Total time horizon

# Run revenue management algorithm (unseeded, so the total differs between runs)
total_revenue = revenue_management(time_horizon)
print("Total revenue:", total_revenue)

# Explore-Exploit for Content Recommendation

# In [None]:
def content_recommendation(user_preferences, interactions=None):
    """
    Content recommendation loop that alternates between exploration and exploitation phases.

    For every interaction the function recommends items with the current
    strategy, passes the interaction to ``observe_feedback`` and then calls
    the exploration or exploitation update hook, depending on
    ``exploration_phase()``.

    Args:
        user_preferences (dict): User preferences for content recommendation.
        interactions (iterable, optional): Interactions to process. When
            omitted, the module-level ``user_interactions`` is used.

    Returns:
        list: Items recommended for the last processed interaction, or an
        empty list when there were no interactions.
    """
    # Accept the interactions explicitly instead of reading a module-level
    # global unconditionally; the global remains the default for callers that
    # pass only the preferences.
    if interactions is None:
        interactions = user_interactions

    recommendation_strategy = initialize_strategy()
    recommended_items = []

    for interaction in interactions:
        # Recommend items based on the current strategy
        recommended_items = recommend_items(recommendation_strategy)

        # Observe user feedback from the interaction
        observe_feedback(interaction)

        # Update strategy based on the phase (exploration or exploitation).
        # NOTE(review): the update hooks return nothing, so
        # recommendation_strategy never changes inside this loop.
        if exploration_phase():
            update_strategy_for_exploration()
        else:
            update_strategy_for_exploitation(user_preferences)

    return recommended_items

def initialize_strategy():
    """
    Provides the starting recommendation strategy.

    Returns:
        str: The name of the initial strategy, "initial_strategy".
    """
    starting_strategy = "initial_strategy"
    return starting_strategy

def recommend_items(strategy):
    """
    Recommends items based on the current strategy.

    Args:
        strategy (str): The current recommendation strategy.

    Returns:
        list: List of recommended items; empty when the strategy is not one
        of the known strategy names.
    """
    if strategy == "initial_strategy":
        return ["item1", "item2", "item3"]
    elif strategy == "exploration_strategy":
        return ["item4", "item5", "item6"]
    elif strategy == "exploitation_strategy":
        return ["item7", "item8", "item9"]
    # Unknown strategy: return an empty list instead of falling through to an
    # implicit None, so callers always get the documented list type.
    return []

def observe_feedback(interaction):
    """
    Reports the feedback carried by one interaction.

    Placeholder implementation: it only prints the interaction.

    Args:
        interaction (dict): The interaction data.
    """
    message = f"Observed feedback from interaction: {interaction}"
    print(message)

def exploration_phase():
    """
    Decides whether the current step is an exploration step.

    Returns:
        bool: True or False, each chosen with equal probability.
    """
    # Placeholder rule: a fair coin flip.
    outcomes = [True, False]
    return random.choice(outcomes)

def update_strategy_for_exploration():
    """
    Hook for updating the recommendation strategy during exploration.

    Placeholder implementation: it only prints a status line.
    """
    status = "Updating strategy for exploration phase"
    print(status)

def update_strategy_for_exploitation(user_preferences):
    """
    Hook for updating the recommendation strategy during exploitation.

    Placeholder implementation: it only prints the preferences it was given.

    Args:
        user_preferences (dict): User preferences for content recommendation.
    """
    status = f"Updating strategy for exploitation phase based on user preferences: {user_preferences}"
    print(status)

def initialize_user_preferences():
    """
    Builds the starting set of user preferences.

    Returns:
        dict: Placeholder preferences with the keys "genre", "length" and "format".
    """
    preferences = dict(genre="sci-fi", length="short", format="video")
    return preferences

# Example usage
# content_recommendation() iterates over this module-level list, so it must be
# defined before the function is called.
user_interactions = [
    {"item": "item1", "feedback": "liked"},
    {"item": "item2", "feedback": "disliked"},
    {"item": "item3", "feedback": "liked"},
    # More interactions...
]

# Initialize user preferences
user_preferences = initialize_user_preferences()

# Run content recommendation algorithm
recommended_items = content_recommendation(user_preferences)
print("Recommended items:", recommended_items)

# Explore-Exploit for Adaptive Routing

# In [None]:
def adaptive_routing(network_state, traffic=None):
    """
    Adaptive routing loop that alternates between exploration and exploitation phases.

    For every packet the function routes it with the current policies,
    monitors the network and then calls the exploration or exploitation hook,
    depending on ``exploration_phase()``.

    Args:
        network_state (dict): Current state of the network.
        traffic (iterable, optional): Packets to route. When omitted, the
            module-level ``network_traffic`` is used.

    Returns:
        list: The packets that were processed, in the order they were routed.
    """
    # Accept the packets explicitly instead of reading a module-level global
    # unconditionally; the global remains the default for callers that pass
    # only the network state.
    if traffic is None:
        traffic = network_traffic

    routing_policies = initialize_policies()
    routed_traffic = []

    for incoming_packet in traffic:
        # Route packet based on the current routing policies
        route_packet(incoming_packet, routing_policies)

        # Monitor network performance
        monitor_network_performance(network_state)

        # Update routing policies based on the phase (exploration or exploitation)
        if exploration_phase():
            explore_alternative_paths()
        else:
            exploit_high_performing_paths()

        routed_traffic.append(incoming_packet)

    return routed_traffic

def initialize_policies():
    """
    Builds the starting routing policies.

    Returns:
        dict: A placeholder policy mapping with the single key "policy".
    """
    policies = dict(policy="initial_policy")
    return policies

def route_packet(packet, policies):
    """
    Routes one incoming packet with the given policies.

    Placeholder implementation: it only prints the packet and the policies.

    Args:
        packet (dict): Incoming packet information.
        policies (dict): Current routing policies.
    """
    report = f"Routing packet {packet} using policies {policies}"
    print(report)

def monitor_network_performance(network_state):
    """
    Reports on the performance of the network.

    Placeholder implementation: it only prints the network state.

    Args:
        network_state (dict): Current state of the network.
    """
    report = f"Monitoring network performance: {network_state}"
    print(report)

def exploration_phase():
    """
    Decides whether the current step is an exploration step.

    Returns:
        bool: True or False, each chosen with equal probability.
    """
    # Placeholder rule: a fair coin flip.
    outcomes = [True, False]
    return random.choice(outcomes)

def explore_alternative_paths():
    """
    Hook for trying alternative routing paths.

    Placeholder implementation: it only prints a status line.
    """
    status = "Exploring alternative paths"
    print(status)

def exploit_high_performing_paths():
    """
    Hook for favouring the routing paths that perform well.

    Placeholder implementation: it only prints a status line.
    """
    status = "Exploiting high-performing paths"
    print(status)

def initialize_network_state():
    """
    Builds the starting network state.

    Returns:
        dict: A placeholder state mapping with the single key "state".
    """
    network = dict(state="initial_state")
    return network

# Example usage
# adaptive_routing() iterates over this module-level list, so it must be
# defined before the function is called.
network_traffic = [
    {"packet_id": 1, "source": "A", "destination": "B"},
    {"packet_id": 2, "source": "A", "destination": "C"},
    {"packet_id": 3, "source": "B", "destination": "C"},
    # More packets...
]

# Initialize network state
network_state = initialize_network_state()

# Run adaptive routing algorithm
routed_traffic = adaptive_routing(network_state)
print("Routed traffic:", routed_traffic)

# Bayesian Optimal Interval for Dose-Finding

# In [None]:
import numpy as np

class BayesianOptimalInterval:
    """
    Bayesian Optimal Interval class to manage dose selection and update posterior distribution
    based on observed responses.

    The prior is kept unchanged in ``prior_mean`` / ``prior_std``; the running
    estimate lives in ``posterior_mean`` / ``posterior_std``.
    """
    def __init__(self, prior_mean, prior_std):
        """
        Initialize the BayesianOptimalInterval with prior mean and standard deviation.

        The posterior starts out equal to the prior.

        Args:
            prior_mean (float): Prior mean of the distribution.
            prior_std (float): Prior standard deviation of the distribution.
        """
        self.prior_mean = prior_mean
        self.prior_std = prior_std
        self.posterior_mean = prior_mean
        self.posterior_std = prior_std

    def select_dose(self):
        """
        Select the dose based on the posterior mean.

        Returns:
            float: The selected dose (the current posterior mean).
        """
        return self.posterior_mean

    def update_distribution(self, dose, response):
        """
        Update the posterior distribution based on the observed response.

        The new mean is the average of the current posterior mean and the
        response; the standard deviation is halved, with a floor of 0.1.

        Args:
            dose (float): The dose that was administered (not used by this
                simplified update rule).
            response (float): The observed response.
        """
        # Update from the current posterior rather than the original prior so
        # that successive observations accumulate instead of each call
        # discarding everything learned before it. The first update is the
        # same either way because the posterior starts out equal to the prior.
        self.posterior_mean = (self.posterior_mean + response) / 2
        self.posterior_std = max(self.posterior_std / 2, 0.1)

    def compute_optimal_interval(self):
        """
        Compute the optimal interval based on the posterior distribution.
        This function is a placeholder and should be implemented based on the specific requirements.
        """
        pass

# Example usage: one select-dose / observe / update cycle.
boin = BayesianOptimalInterval(prior_mean=0, prior_std=1)
dose = boin.select_dose()
response = 1  # Assume patient response is observed
boin.update_distribution(dose, response)
print(f"Updated Posterior Mean: {boin.posterior_mean}")
print(f"Updated Posterior Standard Deviation: {boin.posterior_std}")

# Q-Learning

# In [None]:
import numpy as np

class QLearning:
    """
    Tabular Q-learning with the hyperparameters supplied per call.
    """
    def __init__(self, num_states, num_actions):
        """
        Create the agent with an all-zero Q-table.

        Args:
            num_states (int): Number of states in the environment.
            num_actions (int): Number of possible actions.
        """
        table_shape = (num_states, num_actions)
        self.Q = np.zeros(table_shape)

    def select_action(self, state, epsilon):
        """
        Pick an action with an epsilon-greedy rule.

        Args:
            state (int): The current state.
            epsilon (float): Probability of selecting a random action (exploration).

        Returns:
            int: The selected action.
        """
        exploring = np.random.rand() < epsilon
        if not exploring:
            # Exploit: action with the largest Q-value in this state.
            return np.argmax(self.Q[state])
        # Explore: uniformly random action index.
        return np.random.choice(self.Q.shape[1])

    def update_q_value(self, state, action, reward, next_state, alpha, gamma):
        """
        Move Q[state, action] towards the one-step bootstrapped target.

        Args:
            state (int): The current state.
            action (int): The action taken.
            reward (float): The reward received after taking the action.
            next_state (int): The next state after taking the action.
            alpha (float): Learning rate.
            gamma (float): Discount factor.
        """
        best_next = np.max(self.Q[next_state])
        td_error = reward + gamma * best_next - self.Q[state, action]
        self.Q[state, action] += alpha * td_error

# Example usage: a single select-action / update step on a 100-state, 3-action table.
ql = QLearning(num_states=100, num_actions=3)
state = 0
epsilon = 0.1  # Exploration probability passed to select_action
action = ql.select_action(state, epsilon)
reward = 1  # Assume positive reward for the selected action
next_state = 1
alpha = 0.1  # Learning rate
gamma = 0.9  # Discount factor
ql.update_q_value(state, action, reward, next_state, alpha, gamma)
print("Updated Q-Table:", ql.Q)