In [21]:
import os
import time
import numpy as np
import torch
import rasterio
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from qiskit import QuantumCircuit, transpile
from qiskit_aer import AerSimulator
from math import ceil
import warnings
warnings.filterwarnings('ignore')

# === Device Configuration ===
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# === Unified QHDC Classifier ===
class DualInputQuantumHDC:
    def __init__(self, D=1024, device=None):
        self.D = D
        self.device = device or torch.device("cpu")
        self.class_hv = None
        self.proj_matrix_6 = None
        self.proj_matrix_12 = None

    def _generate_quantum_hypervector(self, num_qubits):
        num_shots = ceil(self.D / num_qubits)
        qc = QuantumCircuit(num_qubits, num_qubits)
        qc.h(range(num_qubits))
        qc.measure(range(num_qubits), range(num_qubits))
        backend = AerSimulator()
        transpiled = transpile(qc, backend)
        result = backend.run(transpiled, shots=num_shots).result()
        counts = result.get_counts()

        all_bits = []
        for bitstring, count in counts.items():
            bits = [1 if b == '1' else -1 for b in reversed(bitstring)]
            all_bits.extend(bits * count)
        all_bits = np.array(all_bits[:self.D]) if len(all_bits) >= self.D else np.pad(all_bits, (0, self.D - len(all_bits)), constant_values=-1)
        return all_bits

    def _init_projection_matrix(self, dim):
        num_qubits = 6 if dim == 6 else 8
        return torch.tensor(np.stack([self._generate_quantum_hypervector(num_qubits) for _ in range(dim)]),
                            dtype=torch.float32, device=self.device)

    def _encode_batchwise(self, x, proj_matrix, batch_size=16):
        encoded = []
        for i in range(0, x.shape[0], batch_size):
            batch = x[i:i+batch_size]
            proj = torch.matmul(batch, proj_matrix)
            encoded.append(torch.nn.functional.normalize(proj, dim=1))
        return torch.cat(encoded, dim=0)

    def encode(self, features1=None, features2=None):
        if features1 is not None:
            if self.proj_matrix_6 is None:
                self.proj_matrix_6 = self._init_projection_matrix(6)
            encoded1 = self._encode_batchwise(features1, self.proj_matrix_6)
        else:
            encoded1 = None

        if features2 is not None:
            if self.proj_matrix_12 is None:
                self.proj_matrix_12 = self._init_projection_matrix(12)
            encoded2 = self._encode_batchwise(features2, self.proj_matrix_12)
        else:
            encoded2 = None

        if encoded1 is not None and encoded2 is not None:
            return torch.cat([encoded1, encoded2], dim=0)
        elif encoded1 is not None:
            return encoded1
        elif encoded2 is not None:
            return encoded2
        else:
            raise ValueError("At least one of features1 or features2 must be provided.")

    def retrain(self, encoded, labels, epochs=3, lr=0.05):
        num_classes = int(labels.max().item()) + 1
        self.class_hv = torch.zeros((num_classes, self.D), dtype=torch.float32, device=self.device)
        for i in range(num_classes):
            mask = (labels == i)
            if mask.sum() > 0:
                self.class_hv[i] = encoded[mask].mean(dim=0)
        self.class_hv = torch.nn.functional.normalize(self.class_hv, dim=1)

        for _ in range(epochs):
            for i in range(encoded.shape[0]):
                x = encoded[i].unsqueeze(0)
                true_cls = labels[i].item()
                pred_cls = self.classify(x)[0].item()
                if pred_cls != true_cls:
                    self.class_hv[pred_cls] -= lr * x.squeeze(0)
                    self.class_hv[true_cls] += lr * x.squeeze(0)
            self.class_hv = torch.nn.functional.normalize(self.class_hv, dim=1)

    def classify(self, encoded):
        samples = torch.nn.functional.normalize(encoded, dim=1)
        return torch.argmax(torch.matmul(samples, self.class_hv.T), dim=1)

# === Data Loader ===
def load_track_data(track):
    folder = f"Track{track}/train/images/"
    feature_img_path = sorted([f for f in os.listdir(folder) if f.endswith(".tif")])[0]
    feature_img_path = os.path.join(folder, feature_img_path)
    label_img_path = feature_img_path.replace("images", "labels").replace(".tif", ".png")

    features = rasterio.open(feature_img_path).read()
    features = np.moveaxis(features, 0, -1).reshape(-1, features.shape[0])
    labels = rasterio.open(label_img_path).read().reshape(-1)

    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)

    return torch.tensor(features_scaled, dtype=torch.float32, device=device), \
           torch.tensor(labels, dtype=torch.long, device=device)

# === Run Unified Model ===
def run_fused_model():
    print("\n--- Unified QHDC on Track1 + Track2 ---")
    start = time.time()

    features1, labels1 = load_track_data(track=1)
    features2, labels2 = load_track_data(track=2)

    model = DualInputQuantumHDC(D=1024, device=device)

    encode_start = time.time()
    encoded = model.encode(features1=features1, features2=features2)
    labels = torch.cat([labels1, labels2], dim=0)
    encode_time = time.time() - encode_start

    train_time = 0.0 
    model.retrain(encoded, labels)

    infer_start = time.time()
    preds = model.classify(encoded)
    infer_time = time.time() - infer_start

    y_true = labels.cpu().numpy()
    y_pred = preds.cpu().numpy()

    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average='weighted')
    prec = precision_score(y_true, y_pred, average='weighted')
    rec = recall_score(y_true, y_pred, average='weighted')
    total_time = time.time() - start

    print(f"Accuracy : {acc:.4f}")
    print(f"F1 Score : {f1:.4f}")
    print(f"Precision: {prec:.4f}")
    print(f"Recall   : {rec:.4f}")
    print(f"Encoding Time : {encode_time:.4f} s")
    print(f"Training Time : {train_time:.4f} s")
    print(f"Inference Time: {infer_time:.4f} s")
    print(f"Total Time    : {total_time:.4f} s")

# === Run ===
run_fused_model()

Using device: cuda

--- Unified QHDC on Track1 + Track2 ---
Accuracy : 0.7053
F1 Score : 0.8204
Precision: 0.9910
Recall   : 0.7053
Encoding Time : 4.2794 s
Training Time : 0.0000 s
Inference Time: 0.0000 s
Total Time    : 181.6124 s
