In [55]:
import os, random, math, time, warnings
warnings.filterwarnings("ignore")
import numpy as np
import torch, torch.nn as nn
from torch.utils.data import DataLoader, Subset,Dataset
from torchvision import transforms
import gymnasium as gym
from gymnasium import spaces
import pennylane as qml
from pennylane import numpy as pnp
from stable_baselines3 import PPO
from medmnist import ChestMNIST

In [56]:
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
SEED   = 42
torch.manual_seed(SEED)  
np.random.seed(SEED)
random.seed(SEED)

In [57]:
BATCH_SZ = 64
IMG_SIZE  = 32
NUM_CLASSES = 3

In [58]:
transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
])

In [59]:
train = ChestMNIST(split="train", download=True, as_rgb=True, transform=transform)
test  = ChestMNIST(split="test", download=True, as_rgb=True, transform=transform)

In [None]:
#Apenas 3 classes classificadas nesse momento
#train_idx = [i for i,(x,y) in enumerate(train) if y[0] < NUM_CLASSES]
#test_idx  = [i for i,(x,y) in enumerate(test)  if y[0] < NUM_CLASSES]

In [60]:
class MappedDataset(Dataset):
    def __init__(self, base_dataset, class_map):
        self.data = [(x, class_map[int(y[0])]) for x, y in base_dataset if int(y[0]) in class_map]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

In [61]:
all_labels = [int(y[0]) for _, y in train]
class_list = sorted(set(all_labels))[:NUM_CLASSES]
class_map = {orig: i for i, orig in enumerate(class_list)}

In [66]:
train_dataset = MappedDataset(train, class_map)
test_dataset = MappedDataset(test, class_map)

subset_size = int(0.1 * len(train_dataset))
train_subset = Subset(train_dataset, list(range(subset_size)))

train_loader = DataLoader(train_subset, batch_size=BATCH_SZ, shuffle=True)

subset_size = int(0.1 * len(test_dataset))
test_subset = Subset(test_dataset, list(range(subset_size)))
test_loader = DataLoader(test_subset, batch_size=BATCH_SZ, shuffle=False)

In [None]:
#train_loader = DataLoader(Subset(train, train_idx), batch_size=BATCH_SZ, shuffle=True)
#test_loader  = DataLoader(Subset(test,  test_idx),  batch_size=BATCH_SZ, shuffle=False)

In [63]:
# ---------- 3. Hiper‑parâmetros de RL / PQC ----------
N_QUBITS   = 4
MAX_DEPTH  = 8          # nº máximo de gates que o agente pode adicionar
ACTION_SET = (['rx','ry','rz'] + ['cnot'])   # gates disponíveis
N_ACTIONS  = len(ACTION_SET) * N_QUBITS

dev = qml.device("default.qubit", wires=N_QUBITS)

In [None]:
import torch
import torch.nn as nn
import pennylane as qml

class CircuitBuilderEnv2(gym.Env):
    

In [27]:
import torch
import torch.nn as nn
import pennylane as qml

class Quantumnet(nn.Module):
    def __init__(self, circuit, n_qubits=4, n_classes=3):
        super(Quantumnet, self).__init__()
        self.n_qubits = n_qubits
        self.circuit = circuit
        self.q_params = nn.Parameter(0.01 * torch.randn(len(circuit)))
        self.conv = nn.Sequential(
            nn.Conv2d(3, 8, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),     # Reduz as dimensões pela metade
            nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)  # Reduz as dimensões pela metade
        )

        with torch.no_grad():
            dummy_input = torch.zeros(1, 3, 32, 32)
            dummy_output = self.conv(dummy_input)
            flatten_dim = dummy_output.view(1, -1).shape[1]
            
        self.pre_net = nn.Linear(flatten_dim, n_qubits)
        self.post_net = nn.Linear(n_qubits, n_classes)

        # Define QNode
        self.dev = qml.device("default.qubit", wires=n_qubits)

        @qml.qnode(self.dev, interface="torch")
        def qnode(inputs, weights):
            for i in range(n_qubits):
                qml.RY(inputs[i], wires=i)
            for i, (gate, wire) in enumerate(circuit):
                angle = weights[i]
                if gate == 'rx':
                    qml.RX(angle, wires=wire)
                elif gate == 'ry':
                    qml.RY(angle, wires=wire)
                elif gate == 'rz':
                    qml.RZ(angle, wires=wire)
                elif gate == 'cnot':
                    qml.CNOT(wires=[wire, (wire + 1) % n_qubits])
            return [qml.expval(qml.PauliZ(i)) for i in range(self.n_qubits)]

        self.qnode = qnode
    def forward(self, x):
        # x shape: [batch_size, input_dim]
        x = x.view(x.size(0), 3, 32, 32)
        
        x = self.conv(x)                          # [batch, 16, 8, 8]
        x = x.view(x.size(0), -1)                 # [batch, 16*8*8]
        x = self.pre_net(x)                        # [batch, n_qubits]
        x = torch.tanh(x) * (torch.pi / 3.0)

        q_out = []
        for xi in x:
            q_result = self.qnode(xi, self.q_params)
            q_out.append(torch.stack(q_result).to(DEVICE))   # <- converte aqui

        q_out = torch.stack(q_out)
        return self.post_net(q_out)

In [68]:
import gym
import numpy as np
from gym import spaces

class CircuitBuilderEnv(gym.Env):
    def __init__(self, n_qubits=4, max_depth=8):
        super().__init__()
        self.n_qubits = n_qubits
        self.max_depth = max_depth

        # Define conjunto de ações: gate (rx, ry, rz, cnot) em cada qubit
        self.action_set = ['rx', 'ry', 'rz', 'cnot']
        self.n_actions = len(self.action_set) * n_qubits

        # Espaços do Gym
        self.action_space = spaces.Discrete(self.n_actions)
        self.observation_space = spaces.Box(
            low=0.0,
            high=1.0,
            shape=(self.max_depth * 2,),
            dtype=np.float32
        )

        self.reset()
    def seed(self, seed=None):
        self.np_random, seed = gym.utils.seeding.np_random(seed)
        return [seed]
        
    def _add_gate(self, gate, wire):
        print(f"Adicionando gate {gate} no qubit {wire}")
        self.circuit.append((gate, wire))

    def _encode_state(self):
        """Retorna o vetor de estado normalizado com dtype float32"""
        vec = np.zeros((self.max_depth * 2,), dtype=np.float32)
        for i, (g, w) in enumerate(self.circuit):
            g_id = self.action_set.index(g)
            vec[i * 2] = g_id / len(ACTION_SET)
            vec[i * 2 + 1] = w / self.n_qubits
        return vec

    def reset(self):
        self.circuit = []
        self.step_count = 0
        return self._encode_state()

    def step(self, action):
        gate_id = action // self.n_qubits
        wire = action % self.n_qubits
        gate = self.action_set[gate_id]

        if len(self.circuit) < self.max_depth:
            self._add_gate(gate, wire)

        self.step_count += 1

        # Aqui você pode usar seu modelo de avaliação híbrido
        reward = self._evaluate_current_circuit()
        done = self.step_count >= self.max_depth
        obs = self._encode_state()

        return obs, reward, done, {}

    def _evaluate_current_circuit(self):
        """
        Define a lógica para calcular recompensa com base no circuito atual.
        Aqui está um placeholder (exemplo fixo).
        """
        # Em produção, chame um modelo de avaliação com o circuito atual
        return 1.0 if len(self.circuit) == self.max_depth else 0.0

    @property
    def circuit(self):
        return self._circuit

    @circuit.setter
    def circuit(self, value):
        self._circuit = value


In [69]:
class CircuitBuilderEnv(gym.Env):
    def __init__(self):
        super(CircuitBuilderEnv, self).__init__()
        self.action_space = spaces.Discrete(N_ACTIONS)
        self.observation_space = spaces.Box(low=0.0, high=1.0, shape=(MAX_DEPTH * 2,), dtype=np.float32)
        self.train_iter = iter(train_loader)
        #elf.reset()
    
    def seed(self, seed=None):
        self.np_random, seed = gym.utils.seeding.np_random(seed)
        return [seed]
        
    def reset(self, *, seed=None, options=None):
        if seed is not None:
            self.seed(seed)
        self.circuit = []
        self.steps = 0
        self.state = np.zeros(MAX_DEPTH * 2, dtype=np.float32)
        return self.state,{}



    def step(self, action):
        gate_idx = action // N_QUBITS
        wire = action % N_QUBITS
        gate = ACTION_SET[gate_idx]

        if self.steps < MAX_DEPTH:
            self.circuit.append((gate, wire))
            self.steps += 1
            self._update_state(gate_idx, wire)

        reward = self._evaluate_circuit()
        terminated = self.steps == MAX_DEPTH
        truncated = False  # Você pode definir lógica de truncamento se quiser
        info = {}

        return self.state, reward, terminated, truncated, info

    def _update_state(self, gate_idx, wire):
        self.state[2 * (self.steps - 1)]     = gate_idx / (len(ACTION_SET) - 1)
        self.state[2 * (self.steps - 1) + 1] = wire / (N_QUBITS - 1)

    def _evaluate_circuit(self):
        if len(self.circuit) == 0:
            return 0.0  # ou um pequeno valor de recompensa neutra

        try:
            x, y = next(self.train_iter)
        except StopIteration:
            self.train_iter = iter(train_loader)
            x, y = next(self.train_iter)

        x = x.to(DEVICE)
        y = y.argmax(dim=1)
        x = x.view(x.size(0), -1)

        model = Quantumnet(self.circuit, n_qubits=N_QUBITS).to(DEVICE)
        model.eval()
        with torch.no_grad():
            logits = model(x)
            pred = logits.argmax(dim=1)
            acc = (pred == y).float().mean().item()
        return acc


In [70]:
#-- Train RL agent --
def train_agent():
    env = CircuitBuilderEnv()

    model_rl = PPO("MlpPolicy", env, 
                    learning_rate=3e-4,n_steps=2,
                    batch_size=64,gamma=0.95,
                    verbose=1,seed=SEED)
    
    print("Training RL agent...")
    model_rl.learn(total_timesteps=1)
    model_rl.save("rl_agent")
    print("RL agent trained!")
    return model_rl, env

#-- Test RL agent --
def train_final_agent():
    model_rl, best_env = train_agent()  # ← recebe os dois agora
    best_circuit = best_env.circuit     # ← acessa o circuito do ambiente

    final_model = Quantumnet(best_circuit).to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(final_model.parameters(), lr=1e-3)

    EPOCHS_SUP = 3
    for epoch in range(EPOCHS_SUP):
        final_model.train()
        for image, labels in train_loader:
            image = image.to(DEVICE)
            labels = labels.squeeze().long().to(DEVICE)

            optimizer.zero_grad()
            logits = final_model(image)
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()

        #--Sample evaluation--
        final_model.eval()
        correct = total = 0
        with torch.no_grad():
            for image, labels in test_loader:
                image = image.to(DEVICE)
                labels = labels.squeeze().long().to(DEVICE)
                logits = final_model(image)
                preds = logits.argmax(dim=1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
        acc = correct / total
        print(f"Epoch {epoch+1}/{EPOCHS_SUP}, Test Accuracy: {acc:.4f}")
    print("Final model trained!")


if __name__ == "__main__":
    #-- Train RL agent --
    train_agent()

    #-- Train final model --
    train_final_agent()


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Training RL agent...


IndexError: Dimension out of range (expected to be in range of [-1, 0], but got 1)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import gym
import numpy as np
import pennylane as qml
from stable_baselines3 import PPO
from gym import spaces

# Configs
BATCH_SZ = 64
IMG_SIZE = 32
NUM_CLASSES = 3
SEED = 42
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Quantum-aware CNN model
class Quantumnet(nn.Module):
    def __init__(self, circuit, n_qubits=4, n_classes=NUM_CLASSES):
        super().__init__()
        self.n_qubits = n_qubits
        self.circuit = circuit
        self.q_params = nn.Parameter(0.01 * torch.randn(len(circuit)))

        self.conv = nn.Sequential(
            nn.Conv2d(3, 8, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        with torch.no_grad():
            dummy_input = torch.zeros(1, 3, IMG_SIZE, IMG_SIZE)
            dummy_output = self.conv(dummy_input)
            flatten_dim = dummy_output.view(1, -1).shape[1]

        self.pre_net = nn.Linear(flatten_dim, n_qubits)
        self.post_net = nn.Linear(n_qubits, n_classes)

        self.dev = qml.device("default.qubit", wires=n_qubits)

        @qml.qnode(self.dev, interface="torch")
        def qnode(inputs, weights):
            for i in range(n_qubits):
                qml.RY(inputs[i], wires=i)
            for i, (gate, wire) in enumerate(circuit):
                angle = weights[i]
                if gate == 'rx':
                    qml.RX(angle, wires=wire)
                elif gate == 'ry':
                    qml.RY(angle, wires=wire)
                elif gate == 'rz':
                    qml.RZ(angle, wires=wire)
                elif gate == 'cnot':
                    qml.CNOT(wires=[wire, (wire + 1) % n_qubits])
            return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

        self.qnode = qnode

    def forward(self, x):
        x = x.view(x.size(0), 3, IMG_SIZE, IMG_SIZE)
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        x = self.pre_net(x)
        x = torch.tanh(x) * (torch.pi / 3.0)
        q_out = [torch.tensor(self.qnode(xi, self.q_params), dtype=torch.float32, device=DEVICE) for xi in x]
        q_out = torch.stack(q_out)
        return self.post_net(q_out)


def draw_circuit(circuit, n_qubits=4):
    dev = qml.device("default.qubit", wires=n_qubits)

    @qml.qnode(dev)
    def dummy_qnode():
        for i in range(n_qubits):
            qml.RY(0.0, wires=i)
        for gate, wire in circuit:
            if gate == 'rx':
                qml.RX(0.1, wires=wire)
            elif gate == 'ry':
                qml.RY(0.1, wires=wire)
            elif gate == 'rz':
                qml.RZ(0.1, wires=wire)
            elif gate == 'cnot':
                qml.CNOT(wires=[wire, (wire + 1) % n_qubits])
        return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

    print(qml.draw(dummy_qnode)())

# Circuit builder environment
class CircuitBuilderEnv(gym.Env):
    def __init__(self, n_qubits=4, max_depth=8):
        super().__init__()
        self.n_qubits = n_qubits
        self.max_depth = max_depth
        self.action_set = ['rx', 'ry', 'rz', 'cnot']
        self.n_actions = len(self.action_set) * n_qubits

        self.action_space = spaces.Discrete(self.n_actions)
        self.observation_space = spaces.Box(
            low=0.0, high=1.0, shape=(self.max_depth * 2,), dtype=np.float32
        )
        self.reset()

    def _add_gate(self, gate, wire):
        self._circuit.append((gate, wire))

    def _encode_state(self):
        vec = np.zeros((self.max_depth * 2,), dtype=np.float32)
        for i, (g, w) in enumerate(self._circuit):
            g_id = self.action_set.index(g)
            vec[i * 2] = g_id / len(self.action_set)
            vec[i * 2 + 1] = w / self.n_qubits
        return vec

    def reset(self):
        self._circuit = []
        self.step_count = 0
        return self._encode_state()

    def step(self, action):
        gate_id = action // self.n_qubits
        wire = action % self.n_qubits
        gate = self.action_set[gate_id]

        if len(self._circuit) < self.max_depth:
            self._add_gate(gate, wire)

        self.step_count += 1
        reward = self._evaluate_current_circuit()
        done = self.step_count >= self.max_depth
        return self._encode_state(), reward, done, {}

    def _evaluate_current_circuit(self):
        return 1.0 if len(self._circuit) == self.max_depth else 0.0
    """    def _evaluate_current_circuit(self):
        # Avaliação real do circuito usando Quantumnet e um mini-treino
        model = Quantumnet(self._circuit).to(DEVICE)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

        model.train()
        for images, labels in train_loader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            logits = model(images)
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()
            break  # usa apenas um minibatch

        model.eval()
        with torch.no_grad():
            logits = model(images)
            preds = logits.argmax(dim=1)
            acc = (preds == labels).float().mean().item()

        return acc  # recompensa com base na acurácia"""
    @property
    def circuit(self):
        return self._circuit

    def seed(self, seed=None):
        self.np_random, seed = gym.utils.seeding.np_random(seed)
        return [seed]
# Training RL agent
def train_agent():
    env = CircuitBuilderEnv()
    model_rl = PPO("MlpPolicy", env, learning_rate=3e-4, n_steps=64,
                   batch_size=BATCH_SZ, gamma=0.95, verbose=1, seed=SEED)
    model_rl.learn(total_timesteps=1000)
    return model_rl, env

# Supervised training
def train_final_agent():
    _, best_env = train_agent()
    best_circuit = best_env.circuit
    draw_circuit(best_circuit)
    final_model = Quantumnet(best_circuit).to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(final_model.parameters(), lr=1e-3)

    for epoch in range(3):
        final_model.train()
        for images, labels in train_loader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            logits = final_model(images)
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()

        final_model.eval()
        correct = total = 0
        with torch.no_grad():
            for images, labels in test_loader:
                images, labels = images.to(DEVICE),labels.to(DEVICE)
                logits = final_model(images)
                preds = logits.argmax(dim=1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
        acc = correct / total
        print(f"Epoch {epoch+1}/3 - Test Accuracy: {acc:.4f}")


if __name__ == "__main__":
    #-- Train RL agent --
    #train_agent()

    #-- Train final model --
    train_final_agent()


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 8        |
|    ep_rew_mean     | 2.16     |
| time/              |          |
|    fps             | 1        |
|    iterations      | 1        |
|    time_elapsed    | 43       |
|    total_timesteps | 64       |
---------------------------------
-------------------------------------------
| rollout/                |               |
|    ep_len_mean          | 8             |
|    ep_rew_mean          | 2.86          |
| time/                   |               |
|    fps                  | 1             |
|    iterations           | 2             |
|    time_elapsed         | 86            |
|    total_timesteps      | 128           |
| train/                  |               |
|    approx_kl            | 0.00033001974 |
|    clip_fraction        | 0             |
|    clip_range           | 0.2       