In [None]:
# Print the version of the `python` executable found on the shell PATH.
# NOTE(review): `!python` runs in a subshell and may not be the interpreter backing this kernel - confirm.
!python --version

In [None]:
!pip install --upgrade luxai-s3

In [None]:
import os
import sys
import numpy as np
import random

In [None]:
# Show the contents of the Lux AI Season 3 data directory (path is relative to the working directory).
os.listdir("../../Data/lux-ai-season-3/")

In [None]:
!mkdir Test

In [None]:
# Recursively copy everything from the data directory into Test/.
!cp -r ../../Data/lux-ai-season-3/* Test/

In [None]:
# 1-D practice array used by the boolean-mask cells that follow.
prac_array = np.array([1, 2, 3, 2])
prac_array

In [None]:
# Element-wise comparison: produces a boolean mask with the same shape as prac_array.
prac_condition = prac_array <= 2
prac_condition

In [None]:
# Boolean-mask indexing keeps only the elements where the mask is True.
prac_result = prac_array[prac_condition]
prac_result

In [None]:
# Rebinds prac_array to a 3x2 array; re-running the mask cells above after this
# cell would operate on this 2-D data instead of the original 1-D array.
prac_array = np.array([[1, 2], [3, 4], [5, 6]])
prac_array

In [None]:
# `in` on an ndarray is True when any element equals the value; 5 is present.
5 in prac_array

In [None]:
# 7 does not occur in the array, so this evaluates to False.
7 in prac_array

In [None]:
# Start from an empty Python list.
empty_list = []

In [None]:
# Display the list defined in the previous cell.
empty_list

In [None]:
# np.zeros takes the shape as a tuple; .shape echoes it back as (4, 4, 4).
np.zeros((4, 4, 4)).shape

In [None]:
# Draw one float between 0 and 1; no seed is set here, so the value changes on every run.
random.uniform(0, 1)

In [None]:
# 4x4x4 zero array; the Q-learning cell further down rebuilds Q_table with the
# same shape (grid rows, grid columns, actions).
Q_table = np.zeros((4, 4, 4))
Q_table.shape

In [None]:
# Without an axis argument np.argmax returns an index into the flattened array;
# for an all-zero table the first position (0) wins the tie.
np.argmax(Q_table)

In [None]:
import numpy as np
import random
import matplotlib.pyplot as plt
import seaborn as sns

# Define the environment (4x4 Grid)
GRID_SIZE = 4
ACTIONS = ["UP", "DOWN", "LEFT", "RIGHT"]
# (row, column) offset applied by each action index; index i corresponds to ACTIONS[i].
ACTION_MAP = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}

# Q-learning parameters
alpha = 0.1  # Learning rate
gamma = 0.9  # Discount factor
epsilon = 0.2  # Exploration probability
num_episodes = 5000  # Training episodes

# Seed the RNG that drives the epsilon-greedy exploration so that a fresh
# Restart & Run All reproduces the same learned Q-table.
SEED = 42
random.seed(SEED)

# Initialize Q-table (4x4 grid, 4 possible actions)
Q_table = np.zeros((GRID_SIZE, GRID_SIZE, len(ACTIONS)))
Q_table[GRID_SIZE - 1, GRID_SIZE - 1, :] = 100  # Set all actions at (3,3) to 100

# Reward function
def get_reward(state):
    """Return the reward for landing on ``state``.

    The bottom-right cell ``(GRID_SIZE - 1, GRID_SIZE - 1)`` is the goal and
    pays 100; every other cell costs -1.
    """
    goal_cell = (GRID_SIZE - 1, GRID_SIZE - 1)
    if state == goal_cell:
        return 100
    return -1

# Epsilon-greedy action selection
def choose_action(state):
    """Pick an action index for ``state`` using an epsilon-greedy rule.

    With probability ``epsilon`` a random action index is returned; otherwise
    the index of the largest Q-value stored for the cell is returned
    (``np.argmax`` resolves ties in favour of the lowest index).
    """
    exploring = random.uniform(0, 1) < epsilon
    if not exploring:
        # Exploit: best known action for this cell
        row, col = state[0], state[1]
        return np.argmax(Q_table[row, col])
    # Explore: any action index
    return random.choice(range(len(ACTIONS)))

# Training loop
# Every episode starts in the top-left cell and runs until the agent steps onto
# the bottom-right goal cell; there is no cap on the number of steps per episode.
for episode in range(num_episodes):
    state = (0, 0)  # Start position
    while state != (GRID_SIZE - 1, GRID_SIZE - 1):
        action = choose_action(state)
        move = ACTION_MAP[action]
        # Clamp both coordinates to the grid, so a move into a wall leaves the agent where it is.
        next_state = (max(0, min(GRID_SIZE - 1, state[0] + move[0])),
                      max(0, min(GRID_SIZE - 1, state[1] + move[1])))

        reward = get_reward(next_state)

        # If next state is goal, set its Q-value directly
        # (the learning rate and discounting are bypassed for this terminal transition).
        if next_state == (GRID_SIZE - 1, GRID_SIZE - 1):
            Q_table[state[0], state[1], action] = reward  # Directly assign goal reward
        else:
            # Standard Q-learning update rule:
            # Q(s, a) += alpha * (reward + gamma * max_a' Q(s', a') - Q(s, a))
            Q_table[state[0], state[1], action] += alpha * (
                reward + gamma * np.max(Q_table[next_state[0], next_state[1]]) - Q_table[state[0], state[1], action]
            )

        state = next_state  # Move to next state


# Collapse the action axis: keep the best Q-value available in every grid cell
best_q_values = Q_table.max(axis=2)

# Draw the per-cell maxima as an annotated heatmap on an explicit Axes
fig, ax = plt.subplots(figsize=(6, 6))
sns.heatmap(best_q_values, annot=True, fmt=".1f", cmap="coolwarm", square=True, cbar=True, ax=ax)

ax.set_title("Learned Q-values (Max Over Actions)")
ax.set_xlabel("Column Index")
ax.set_ylabel("Row Index")
plt.show()

# My Agent

In [None]:
%%writefile Test/agent.py
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym
import heapq
from lux.utils import direction_to
from stable_baselines3 import PPO

# ------------------------
# Neural network components
# ------------------------
class WorldModel(nn.Module):
    """Three-layer fully connected network from ``input_dim`` to ``output_dim``.

    ReLU follows the first two linear layers; the last layer's output is
    returned without an activation.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Run ``x`` through fc1 -> ReLU -> fc2 -> ReLU -> fc3."""
        hidden = self.relu(self.fc1(x))
        hidden = self.relu(self.fc2(hidden))
        return self.fc3(hidden)

class MetaLearner(nn.Module):
    """Three-layer MLP whose output is a softmax distribution over ``output_dim`` entries."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return softmax-normalised weights (along the last dimension) for ``x``."""
        hidden = self.relu(self.fc1(x))
        hidden = self.relu(self.fc2(hidden))
        logits = self.fc3(hidden)
        return logits.softmax(dim=-1)

    def fine_tune(self, state, target_weights, optimizer):
        """Take one optimizer step towards ``target_weights`` and return the MSE loss.

        Args:
            state: Input tensor fed to ``forward``.
            target_weights: Tensor the predicted weights are regressed onto.
            optimizer: Optimizer whose gradients are cleared, then stepped once.

        Returns:
            The loss value (Python float) measured before the parameter update.
        """
        optimizer.zero_grad()
        criterion = nn.MSELoss()
        loss = criterion(self.forward(state), target_weights)
        loss.backward()
        optimizer.step()
        return loss.item()

# ------------------------
# Policy and evaluation modules
# ------------------------
class SelfImprovingPolicy:
    """PPO-backed policy wrapper with a hand-rolled policy-gradient update step."""

    def __init__(self, learning_rate=0.0003):
        """Build the gym environment, the PPO model and an Adam optimizer over its policy.

        Args:
            learning_rate: Learning rate of the Adam optimizer used by ``update_policy``.
        """
        # Create a PPO agent using a simple gym environment for demonstration.
        # NOTE(review): "LuxAI-v3" must already be registered with gym for
        # gym.make to succeed; nothing in this file registers it - confirm.
        self.env = gym.make("LuxAI-v3")
        self.model = PPO("MlpPolicy", self.env, verbose=1)
        self.optimizer = optim.Adam(self.model.policy.parameters(), lr=learning_rate)

    def update_policy(self, state, reward, action):
        """Apply one reward-weighted log-likelihood gradient step.

        Args:
            state: Observation(s) passed to the policy's ``evaluate_actions``.
                NOTE(review): presumably must already be tensors in the layout
                the policy expects - verify against the caller.
            reward: Scalar weight applied to the log-probability of ``action``.
            action: Action(s) whose log-probability is evaluated.
        """
        self.optimizer.zero_grad()
        # evaluate_actions returns (values, log_prob, entropy). The policy-gradient
        # objective needs the log-probability of the taken action; index 0 would
        # be the critic's value estimate, which is the wrong quantity here.
        _values, log_prob, _entropy = self.model.policy.evaluate_actions(state, action)
        loss = -reward * log_prob.mean()
        loss.backward()
        self.optimizer.step()

    def select_action(self, observation):
        """Sample an action from the PPO policy.

        Returns:
            The ``(action, state)`` tuple produced by ``PPO.predict``.
        """
        # The PPO predict returns a tuple (action, _)
        return self.model.predict(observation, deterministic=False)

class RiskRewardEvaluator:
    """Adjusts a raw reward for uncertainty, aggression and energy inefficiency."""

    def __init__(self, risk_tolerance=0.5, aggression_factor=1.2, efficiency_factor=0.8):
        self.risk_tolerance = risk_tolerance
        self.aggression_factor = aggression_factor
        self.efficiency_factor = efficiency_factor

    def evaluate(self, reward, uncertainty, energy_efficiency, aggression_level):
        """Return ``reward`` minus the risk penalty, plus the aggression bonus, minus the inefficiency penalty.

        Each term is scaled by the matching factor set in ``__init__``;
        inefficiency is measured as ``1 - energy_efficiency``.
        """
        risk_penalty = self.risk_tolerance * uncertainty
        aggression_bonus = self.aggression_factor * aggression_level
        inefficiency_penalty = self.efficiency_factor * (1 - energy_efficiency)
        return (reward - risk_penalty) + (aggression_bonus - inefficiency_penalty)

class ExplorationExploitationBalancer:
    """Chooses between a random index and the best-known index of a reward list."""

    def __init__(self, exploration_weight=0.85):
        self.exploration_weight = exploration_weight

    def balance(self, unit_state, known_rewards):
        """Return an index into ``known_rewards``.

        A uniform draw below ``exploration_weight`` selects a random index
        (explore); otherwise the index of the largest reward is returned
        (exploit). ``unit_state`` is accepted but not used.
        """
        roll = np.random.uniform(0, 1)
        explore = roll < self.exploration_weight
        if not explore:
            return np.argmax(known_rewards)  # Exploit
        return np.random.choice(range(len(known_rewards)))  # Explore

# ------------------------
# Prediction and planning modules
# ------------------------
class TemporalOpponentPredictor(nn.Module):
    """LSTM followed by a linear head applied to the last time step's output."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        # batch_first=True: inputs are laid out as (batch, sequence, feature)
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Encode the sequence ``x`` and project the final step to ``output_dim`` values."""
        sequence_out, _final_state = self.lstm(x)
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)

class AIPathPlanner:
    """Shortest-path planner on a 4-connected grid using A* with a Manhattan heuristic."""

    def __init__(self, grid_size=(24, 24)):
        """Store the grid extent as ``(size along axis 0, size along axis 1)``."""
        self.grid_size = grid_size

    def heuristic(self, a, b):
        """Return the Manhattan distance between cells ``a`` and ``b``."""
        return abs(a[0] - b[0]) + abs(a[1] - b[1])

    def a_star_search(self, start, goal, obstacles):
        """Find a shortest path from ``start`` to ``goal`` that avoids ``obstacles``.

        Args:
            start: Two-element position (tuple, list or array-like).
            goal: Two-element position (tuple, list or array-like).
            obstacles: Iterable of two-element positions that may not be entered.

        Returns:
            List of ``(x, y)`` tuples from the first step after ``start`` up to
            and including ``goal``. Empty when ``start == goal`` or when the
            goal cannot be reached.
        """
        # Normalise every position to a tuple: list/array positions are neither
        # hashable nor comparable with the tuple neighbours generated below, so
        # un-normalised obstacles would be ignored (lists) or raise (arrays).
        start, goal = tuple(start), tuple(goal)
        blocked = {tuple(cell) for cell in obstacles}  # set gives O(1) membership tests
        size_0, size_1 = self.grid_size[0], self.grid_size[1]

        open_set = []
        heapq.heappush(open_set, (0, start))
        came_from = {}
        cost_so_far = {start: 0}

        while open_set:
            _, current = heapq.heappop(open_set)
            if current == goal:
                break
            for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
                neighbor = (current[0] + dx, current[1] + dy)
                in_bounds = 0 <= neighbor[0] < size_0 and 0 <= neighbor[1] < size_1
                if not in_bounds or neighbor in blocked:
                    continue
                new_cost = cost_so_far[current] + 1
                if neighbor not in cost_so_far or new_cost < cost_so_far[neighbor]:
                    cost_so_far[neighbor] = new_cost
                    priority = new_cost + self.heuristic(goal, neighbor)
                    heapq.heappush(open_set, (priority, neighbor))
                    came_from[neighbor] = current

        # Walk the parent links back from the goal; the start has no parent,
        # so it terminates the walk and is not part of the returned path.
        path = []
        current = goal
        while current in came_from:
            path.append(current)
            current = came_from[current]
        path.reverse()
        return path

# ------------------------
# Opponent modeling and adaptive strategy
# ------------------------
class OpponentMemory:
    """Bounded FIFO log of opponent observations with a simple aggression classifier."""

    def __init__(self, max_size=300):
        """Create an empty memory that keeps at most ``max_size`` entries."""
        self.max_size = max_size
        self.memory = []

    def add(self, data):
        """Append ``data`` and drop the oldest entries beyond ``max_size``."""
        self.memory.append(data)
        overflow = len(self.memory) - self.max_size
        if overflow > 0:
            # Trim from the front after appending; this also copes with
            # max_size == 0, where popping before the append would hit an empty list.
            del self.memory[:overflow]

    def get_recent(self, num=15):
        """Return the last ``num`` entries (fewer if the memory is shorter).

        A non-positive ``num`` yields an empty list; a bare ``[-0:]`` slice
        would otherwise return the whole memory.
        """
        if num <= 0:
            return []
        return self.memory[-num:]

    def analyze_opponent(self):
        """Classify the opponent from the mean 'aggression' of the last 15 entries.

        Returns:
            "unknown" while fewer than 15 entries are stored, otherwise
            "hyper-aggressive" (> 0.8), "aggressive" (> 0.6),
            "defensive" (< 0.3) or "balanced".
        """
        if len(self.memory) < 15:
            return "unknown"
        # Only the 'aggression' field is needed for the classification.
        aggression_levels = [entry['aggression'] for entry in self.memory[-15:]]

        avg_aggression = np.mean(aggression_levels)
        if avg_aggression > 0.8:
            return "hyper-aggressive"
        elif avg_aggression > 0.6:
            return "aggressive"
        elif avg_aggression < 0.3:
            return "defensive"
        return "balanced"

class AdaptiveOpponentStrategy:
    """Lookup table from an opponent style label to the counter strategy to play."""

    def __init__(self):
        self.counter_strategies = dict([
            ("hyper-aggressive", "trap"),
            ("aggressive", "defensive"),
            ("defensive", "resource-hogging"),
            ("balanced", "opportunistic"),
            ("unknown", "cautious"),
        ])

    def get_counter_strategy(self, opponent_style):
        """Return the counter strategy for ``opponent_style``; unrecognised styles map to "cautious"."""
        if opponent_style in self.counter_strategies:
            return self.counter_strategies[opponent_style]
        return "cautious"

class EnergyOptimizer:
    """Caps an energy request at what is available after scaling it by an efficiency factor."""

    def __init__(self, efficiency_factor=0.93):
        self.efficiency_factor = efficiency_factor

    def optimize(self, energy_available, energy_needed):
        """Return the smaller of ``energy_available`` and the scaled ``energy_needed``."""
        scaled_need = energy_needed * self.efficiency_factor
        if scaled_need < energy_available:
            return scaled_need
        return energy_available

class HRLController:
    """Rule-based assignment of a high-level task to a unit."""

    def __init__(self):
        self.high_level_tasks = ["scout", "harvest", "combat", "defend", "ambush", "trap"]

    def assign_task(self, unit_state, opponent_style):
        """Pick a task from the unit's energy and the opponent's style.

        Rules are checked in order and the first match wins:
        hyper-aggressive opponent with energy > 80 -> "trap";
        aggressive opponent with energy > 70 -> "defend";
        energy > 100 -> "combat"; energy > 60 -> "harvest";
        defensive opponent -> "ambush"; otherwise "scout".
        """
        energy = unit_state['energy']
        if opponent_style == "hyper-aggressive" and energy > 80:
            return "trap"
        if opponent_style == "aggressive" and energy > 70:
            return "defend"
        if energy > 100:
            return "combat"
        if energy > 60:
            return "harvest"
        if opponent_style == "defensive":
            return "ambush"
        return "scout"

class CommunicationModule:
    """First-in, first-out message queue shared by the units."""

    def __init__(self):
        self.messages = []

    def broadcast(self, message):
        """Queue ``message`` behind the ones already waiting."""
        self.messages.append(message)

    def receive(self):
        """Remove and return the oldest message, or ``None`` when the queue is empty."""
        if not self.messages:
            return None
        return self.messages.pop(0)

class AdaptiveCombatStrategy:
    """Maps the opponent's behaviour and a unit's energy to a combat stance."""

    def __init__(self):
        self.aggressiveness_threshold = 0.85

    def adjust_combat(self, opponent_behavior, unit_state):
        """Return the stance to adopt against ``opponent_behavior``.

        ``unit_state['energy']`` is only consulted for the three recognised
        behaviours; any other behaviour yields "balanced".
        """
        if opponent_behavior == "hyper-aggressive":
            if unit_state['energy'] > 90:
                return "set-trap"
            return "evade"
        if opponent_behavior == "aggressive":
            if unit_state['energy'] < 50:
                return "evade"
            return "counterattack"
        if opponent_behavior == "defensive":
            if unit_state['energy'] > 75:
                return "press"
            return "hold"
        return "balanced"

# ------------------------
# Agent Class: Integrating all modules
# ------------------------
class Agent():
    """Agent that wires every helper module of this file into one per-step decision loop.

    Many module inputs are simulated with random numbers (opponent moves, unit
    energy, uncertainty, ...), so the decisions demonstrate the plumbing
    between the modules rather than a tuned strategy.
    """

    def __init__(self, player: str, env_cfg) -> None:
        """Cache environment settings and build all decision-making modules.

        Args:
            player: Player name; "player_0" maps to team 0, anything else to team 1.
            env_cfg: Mapping providing 'unit_move_cost', 'unit_sap_cost',
                'unit_sap_range', 'unit_sensor_range', 'map_height' and 'map_width'.
        """
        self.player = player
        self.enemy_player = "player_1" if self.player == "player_0" else "player_0"
        self.team_id = 0 if self.player == "player_0" else 1
        self.enemy_team_id = 1 if self.team_id == 0 else 0
        self.env_cfg = env_cfg
        self.unit_move_cost = env_cfg['unit_move_cost']
        self.unit_sap_cost = env_cfg['unit_sap_cost']
        self.unit_sap_range = env_cfg['unit_sap_range']
        self.unit_sensor_range = env_cfg['unit_sensor_range']
        self.map_height = env_cfg['map_height']
        self.map_width = env_cfg['map_width']
        self.first_spawn = False

        self.map_explored_status = np.zeros((self.map_height, self.map_width), dtype=int)

        # Modules used in decision making:
        self.opponent_memory = OpponentMemory(max_size=100)
        self.tokenizer = AutoTokenizer.from_pretrained("deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B")
        self.model = AutoModelForCausalLM.from_pretrained("deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B")
        self.world_model = WorldModel(input_dim=20, hidden_dim=128, output_dim=20)
        self.meta_learner = MetaLearner(input_dim=10, hidden_dim=64, output_dim=3)
        # One long-lived optimizer: rebuilding Adam on every call would discard
        # its running moment estimates before they are ever used.
        self.meta_optimizer = optim.Adam(self.meta_learner.parameters(), lr=0.001)
        self.self_improving_policy = SelfImprovingPolicy()
        self.risk_reward_evaluator = RiskRewardEvaluator()
        self.exploration_exploitation_balancer = ExplorationExploitationBalancer()
        self.temporal_predictor = TemporalOpponentPredictor(input_dim=5, hidden_dim=64, output_dim=5)
        self.ai_path_planner = AIPathPlanner(grid_size=(self.map_height, self.map_width))
        self.adaptive_opponent_strategy = AdaptiveOpponentStrategy()
        self.energy_optimizer = EnergyOptimizer()
        self.hrl_controller = HRLController()
        self.communication_module = CommunicationModule()
        self.adaptive_combat_strategy = AdaptiveCombatStrategy()

    def act(self, step: int, obs, remainingOverageTime: int = 60):
        """Produce one decision record per unit of this agent's team.

        Args:
            step: Current step index (not used by the decision logic).
            obs: Observation mapping; reads ``obs['units']['position'][team_id]``
                (iterable of unit positions) and ``obs['team_points']``
                (indexable by team id).
            remainingOverageTime: Not used by the decision logic.

        Returns:
            Dict mapping the unit index to a dict with the keys 'action',
            'task', 'path', 'combat_strategy', 'optimized_energy',
            'exploration_choice', 'language_response',
            'predicted_opponent_move', 'meta_loss' and 'predicted_state'.
            NOTE(review): confirm the match runner accepts this dict shape.
        """
        # ------------------------
        # 1. Opponent Modeling and Temporal Prediction
        # ------------------------
        # (Simulate adding an opponent move to memory)
        self.opponent_memory.add({'move': np.random.rand(), 'aggression': np.random.rand()})
        opponent_style = self.opponent_memory.analyze_opponent()

        # Prepare a sequence for the temporal predictor.
        recent_memory = self.opponent_memory.get_recent(5)
        # Extract 'move' values; pad if needed.
        moves = [entry.get('move', 0) for entry in recent_memory]
        if len(moves) < 5:
            moves += [0] * (5 - len(moves))
        # TemporalOpponentPredictor expects input shape (batch, sequence, feature_dim)
        # Here, we repeat each move value 5 times to form a dummy 5-dimensional vector.
        state_sequence = torch.tensor([[[m] * 5 for m in moves]], dtype=torch.float32)
        with torch.no_grad():  # pure inference, no autograd graph needed
            predicted_opponent_move = self.temporal_predictor(state_sequence).argmax().item()

        # Values that do not depend on the individual unit are computed once per step.
        counter_strategy = self.adaptive_opponent_strategy.get_counter_strategy(opponent_style)
        team_points = obs['team_points'][self.team_id]
        enemy_points = obs['team_points'][self.enemy_team_id]
        reward_diff = team_points - enemy_points
        # For demonstration, the path goal is the center of the map.
        goal = (self.map_height // 2, self.map_width // 2)

        unit_actions = {}

        # ------------------------
        # 2. Process Each Unit's Decision
        # ------------------------
        for idx, pos in enumerate(obs['units']['position'][self.team_id]):
            # Simulate unit state (here, energy is randomly set for demonstration)
            unit_energy = np.random.randint(50, 150)
            unit_state = {'energy': unit_energy, 'position': pos}

            # High-Level Task Assignment (via HRL Controller)
            task = self.hrl_controller.assign_task(unit_state, opponent_style)

            # Path Planning: For tasks that involve movement, plan a path.
            if task in ["harvest", "scout", "combat"]:
                obstacles = []  # In a real scenario, populate this with known obstacles.
                path = self.ai_path_planner.a_star_search(tuple(pos), goal, obstacles)
            else:
                path = []

            # Adaptive Combat: If the task is combat, adjust combat strategy.
            if task == "combat":
                combat_strategy = self.adaptive_combat_strategy.adjust_combat(opponent_style, unit_state)
            else:
                combat_strategy = None

            # Energy Optimization: Decide how much energy to allocate.
            energy_needed = np.random.randint(30, 100)
            optimized_energy = self.energy_optimizer.optimize(unit_energy, energy_needed)

            # Action Selection: Use the self-improving policy to choose an action.
            action, _ = self.self_improving_policy.select_action(obs)

            # Risk-Reward Evaluation: Evaluate the situation.
            risk_adjusted_reward = self.risk_reward_evaluator.evaluate(
                reward_diff,
                np.random.rand(),                 # uncertainty (simulated)
                np.random.uniform(0.7, 1.0),        # energy efficiency (simulated)
                np.random.rand()                  # aggression level (simulated)
            )

            # Update the policy based on the risk-adjusted reward.
            self.self_improving_policy.update_policy(obs, risk_adjusted_reward, action)

            # Meta Learning: Fine-tune meta parameters with the shared optimizer.
            dummy_state = torch.rand(10)
            target_weights = torch.rand(3)
            meta_loss = self.meta_learner.fine_tune(dummy_state, target_weights, self.meta_optimizer)

            # World Model: Predict future state based on a dummy input.
            dummy_input = torch.rand(20)
            with torch.no_grad():  # pure inference; result converts to numpy directly
                predicted_state = self.world_model(dummy_input)

            # Exploration vs Exploitation: Choose a mode based on known rewards.
            known_rewards = [risk_adjusted_reward, np.random.rand(), np.random.rand()]
            exploration_choice = self.exploration_exploitation_balancer.balance(unit_state, known_rewards)

            # Language Generation: Use the transformer model for a status report.
            prompt = f"Unit {idx} assigned task {task} with counter strategy {counter_strategy}."
            encoded = self.tokenizer(prompt, return_tensors="pt")
            # max_new_tokens bounds only the continuation; max_length also counts
            # the prompt tokens, so a long prompt could exhaust or exceed it.
            # Unpacking the encoding also forwards the attention mask.
            generated_output = self.model.generate(**encoded, max_new_tokens=20)
            language_response = self.tokenizer.decode(generated_output[0], skip_special_tokens=True)

            # Communication: Broadcast the decision.
            message = (
                f"Unit {idx}: Task {task}, Path: {path}, Combat: {combat_strategy}, "
                f"Optimized Energy: {optimized_energy}, Exploration Choice: {exploration_choice}, "
                f"Language Response: {language_response}"
            )
            self.communication_module.broadcast(message)

            # Combine all decisions into the final unit action.
            unit_actions[idx] = {
                'action': action,
                'task': task,
                'path': path,
                'combat_strategy': combat_strategy,
                'optimized_energy': optimized_energy,
                'exploration_choice': exploration_choice,
                'language_response': language_response,
                'predicted_opponent_move': predicted_opponent_move,
                'meta_loss': meta_loss,
                'predicted_state': predicted_state.numpy()
            }

        # Drain the message queue so it does not grow across steps;
        # receive() returns None once the queue is empty.
        comms = list(iter(self.communication_module.receive, None))
        # (Here you could log or otherwise process the communications)

        return unit_actions

In [None]:
# Run a match in which the same entry point (Test/main.py) is passed for both
# players; the replay is written to replay_my_agent.html.
!luxai-s3 Test/main.py Test/main.py --output=replay_my_agent.html