<a href="https://colab.research.google.com/github/sanjithganesa/WSD-Financial-NLP-Pipeline/blob/main/Quantum.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
# Colab friendly

import os
import re
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, f1_score
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from tqdm import tqdm

# Optional SBERT
try:
    from sentence_transformers import SentenceTransformer
    SBERT_AVAILABLE = True
except Exception:
    SBERT_AVAILABLE = False

# Qiskit imports
QISKIT_AVAILABLE = False
AER_AVAILABLE = False
USE_STATEVECTOR = False
try:
    from qiskit import QuantumCircuit, transpile
    from qiskit.circuit import ParameterVector
    try:
        from qiskit_aer import AerSimulator
        AER_AVAILABLE = True
    except Exception:
        try:
            from qiskit.providers.aer import AerSimulator
            AER_AVAILABLE = True
        except Exception:
            AER_AVAILABLE = False
    from qiskit.quantum_info import Statevector
    QISKIT_AVAILABLE = True
    if not AER_AVAILABLE:
        USE_STATEVECTOR = True
except Exception as e:
    QISKIT_AVAILABLE = False
    print("[WARN] Qiskit not available:", e)

if not QISKIT_AVAILABLE:
    raise SystemExit("Qiskit not available. Install qiskit and restart runtime.")

# -------------------------
# Utilities
# -------------------------
def clean_text(s: str) -> str:
    if s is None:
        return ""
    s = re.sub(r"\s+", " ", s.replace("\n", " ").replace("\r", " ")).strip()
    s = re.sub(r"http\S+", "", s)
    s = re.sub(r"[^A-Za-z0-9\s\.\,\-\$%€£:;()\/\#\@]", " ", s)
    s = re.sub(r"\s{2,}", " ", s)
    return s.strip()

# -------------------------
# Embedding helpers
# -------------------------
def get_sentence_embeddings(texts: List[str], model_name: str = "sentence-transformers/all-MiniLM-L6-v2") -> np.ndarray:
    if SBERT_AVAILABLE:
        try:
            model = SentenceTransformer(model_name)
            vecs = model.encode(texts, convert_to_numpy=True, show_progress_bar=True)
            return np.array(vecs, dtype=np.float32)
        except Exception as e:
            print("[WARN] SBERT encoding failed:", e)
    tf = TfidfVectorizer(max_features=512)
    X = tf.fit_transform(texts).toarray()
    return X.astype(np.float32)

# -------------------------
# Quantum circuit utilities
# -------------------------
def build_feature_map(n_qubits: int, feature_angles: np.ndarray) -> QuantumCircuit:
    qc = QuantumCircuit(n_qubits)
    for i in range(n_qubits):
        qc.ry(float(feature_angles[i]), i)
    return qc

def build_variational_circuit(n_qubits: int, reps: int = 2) -> Tuple[QuantumCircuit, ParameterVector]:
    param_len = n_qubits * reps * 2
    params = ParameterVector("theta", length=param_len)
    qc = QuantumCircuit(n_qubits)
    idx = 0
    for r in range(reps):
        for q in range(n_qubits):
            qc.ry(params[idx], q); idx += 1
            qc.rz(params[idx], q); idx += 1
        for q in range(n_qubits - 1):
            qc.cx(q, q + 1)
        qc.cx(n_qubits - 1, 0)
    return qc, params

def expectation_from_counts(counts: Dict[str, int]) -> float:
    total = sum(counts.values()) if len(counts) else 1
    prob0 = 0.0
    for bitstr, c in counts.items():
        if len(bitstr) == 0:
            continue
        if bitstr[-1] == "0":
            prob0 += c
    return prob0 / total

# -------------------------
# Simulator wrapper
# -------------------------
class SimulatorRunner:
    def __init__(self, n_qubits: int, shots: int = 512):
        self.n_qubits = n_qubits
        self.shots = shots
        self.use_aer = AER_AVAILABLE
        if self.use_aer:
            self.backend = AerSimulator()
        else:
            self.backend = None

    def run_and_get_prob0(self, qc: QuantumCircuit) -> float:
        if self.use_aer:
            tqc = transpile(qc, self.backend)
            job = self.backend.run(tqc, shots=self.shots)
            result = job.result()
            counts = result.get_counts()
            return expectation_from_counts(counts)
        else:
            qc_no_meas = qc.remove_final_measurements(inplace=False)
            sv = Statevector.from_instruction(qc_no_meas)
            prob0 = sum(p for bitstr, p in sv.probabilities_dict().items() if bitstr[-1]=='0')
            return float(prob0)

# -------------------------
# Quantum classifier
# -------------------------
class QuantumClassifier:
    def __init__(self, n_qubits=6, reps=2, shots=512):
        self.n_qubits = n_qubits
        self.reps = reps
        self.shots = shots
        self.vqc_template, self.params = build_variational_circuit(n_qubits, reps)
        self.runner = SimulatorRunner(n_qubits, shots)
        try:
            from qiskit.utils import algorithm_globals
            algorithm_globals.random_seed = 42
        except:
            pass

    def _construct_circuit(self, angles, theta_values):
        fmap = build_feature_map(self.n_qubits, angles)
        qc = QuantumCircuit(self.n_qubits, self.n_qubits)
        qc = qc.compose(fmap)
        qc = qc.compose(self.vqc_template)
        bind_dict = {self.params[i]: float(theta_values[i]) for i in range(len(self.params))}
        try:
          # newer Qiskit
          qc = qc.bind_parameters(bind_dict)
        except AttributeError:
          # older Qiskit
          qc = qc.assign_parameters(bind_dict)

        qc.measure(range(self.n_qubits), range(self.n_qubits))
        return qc

    def predict_proba_single(self, angles, theta_values):
        qc = self._construct_circuit(angles, theta_values)
        return self.runner.run_and_get_prob0(qc)

    def batch_predict_proba(self, angles_batch, theta_values):
        return np.array([self.predict_proba_single(angles, theta_values) for angles in angles_batch])

# -------------------------
# Loss + training
# -------------------------
def binary_cross_entropy(probs, labels, eps=1e-9):
    probs = np.clip(probs, eps, 1-eps)
    labels = labels.astype(np.float32)
    return -np.mean(labels*np.log(probs) + (1-labels)*np.log(1-probs))

def train_quantum_classifier(qc, X_train, y_train, X_val, y_val, epochs=30, optimizer_name="COBYLA"):
    from scipy.optimize import minimize
    n_params = len(qc.params)
    theta0 = np.random.RandomState(42).normal(scale=0.1, size=n_params)
    def objective(theta):
        probs = qc.batch_predict_proba(X_train, theta)
        return float(binary_cross_entropy(probs, y_train))
    method = "COBYLA" if optimizer_name.upper()=="COBYLA" else "Powell"
    res = minimize(objective, theta0, method=method, options={"maxiter":epochs, "disp":True})
    theta_opt = res.x
    val_probs = qc.batch_predict_proba(X_val, theta_opt)
    val_preds = (val_probs>=0.5).astype(int)
    acc = accuracy_score(y_val, val_preds)
    f1 = f1_score(y_val, val_preds, average="macro")
    return theta_opt, acc, f1, val_probs

# -------------------------
# Helper functions
# -------------------------
def angles_from_features(X, n_qubits):
    N, d = X.shape
    if d > n_qubits:
        X = X[:, :n_qubits]
        d = n_qubits
    angles = np.zeros((N, n_qubits), dtype=float)
    mins = X.min(axis=0, keepdims=True)
    maxs = X.max(axis=0, keepdims=True)
    denom = maxs - mins
    denom[denom==0] = 1.0
    X_norm = (X - mins)/denom
    angles[:, :d] = (X_norm - 0.5)*np.pi
    return angles

# -------------------------
# Demo runner
# -------------------------
def demo_quantum_wsd_pipeline(n_qubits=6, n_pca=None, n_samples=120, epochs=30, reps=1):
    print("[DATA] Loading 20 Newsgroups subset...")
    categories = ['rec.sport.hockey', 'sci.space']
    dataset = fetch_20newsgroups(subset='train', categories=categories, remove=('headers','footers','quotes'))
    texts = [clean_text(t) for t in dataset.data[:n_samples]]
    labels = np.array(dataset.target[:n_samples])

    print(f"[DATA] {len(texts)} samples loaded.")

    print("[EMB] Getting embeddings...")
    emb = get_sentence_embeddings(texts)

    if n_pca is None:
        n_pca = min(emb.shape[1], n_qubits)
    n_pca = min(n_pca, n_qubits)

    print(f"[PCA] Reducing embeddings to {n_pca} dims (for {n_qubits} qubits)")
    pca = PCA(n_components=n_pca)
    X_reduced = pca.fit_transform(emb)

    X_train, X_val, y_train, y_val = train_test_split(X_reduced, labels, test_size=0.2, random_state=42, stratify=labels)
    X_angles_train = angles_from_features(X_train, n_qubits)
    X_angles_val = angles_from_features(X_val, n_qubits)

    print("[Q] Building quantum classifier...")
    qc = QuantumClassifier(n_qubits=n_qubits, reps=reps, shots=512)

    print("[TRAIN] Starting hybrid training...")
    theta_opt, acc_val, f1_val, val_probs = train_quantum_classifier(qc, X_angles_train, y_train, X_angles_val, y_val, epochs=epochs)

    print(f"[RESULT] Val Acc={acc_val:.4f}  Val F1={f1_val:.4f}")
    preds = (val_probs >= 0.5).astype(int)
    for i in range(min(8, len(preds))):
        print(f"Sample {i}: label={y_val[i]} prob={val_probs[i]:.3f} pred={preds[i]}")

    return {"theta": theta_opt, "val_acc": acc_val, "val_f1": f1_val, "val_probs": val_probs}

# -------------------------
# Example usage
# -------------------------
result = demo_quantum_wsd_pipeline(n_qubits=4, n_samples=80, epochs=15, reps=1)
print(result)


[DATA] Loading 20 Newsgroups subset...
[DATA] 80 samples loaded.
[EMB] Getting embeddings...


Batches:   0%|          | 0/3 [00:00<?, ?it/s]

[PCA] Reducing embeddings to 4 dims (for 4 qubits)
[Q] Building quantum classifier...
[TRAIN] Starting hybrid training...
Return from COBYLA because the objective function has been evaluated MAXFUN times.
Number of function values = 15   Least value of F = 0.678279728065877
The corresponding X is:
[ 0.04967142 -0.01382643  1.06476885  1.15230299 -0.02341534 -0.0234137
  1.15792128  0.07674347]

[RESULT] Val Acc=0.4375  Val F1=0.4353
Sample 0: label=0 prob=0.459 pred=0
Sample 1: label=1 prob=0.525 pred=1
Sample 2: label=1 prob=0.889 pred=1
Sample 3: label=1 prob=0.496 pred=0
Sample 4: label=0 prob=0.502 pred=1
Sample 5: label=0 prob=0.477 pred=0
Sample 6: label=1 prob=0.455 pred=0
Sample 7: label=0 prob=0.496 pred=0
{'theta': array([ 0.04967142, -0.01382643,  1.06476885,  1.15230299, -0.02341534,
       -0.0234137 ,  1.15792128,  0.07674347]), 'val_acc': 0.4375, 'val_f1': 0.43529411764705883, 'val_probs': array([0.45898438, 0.52539062, 0.88867188, 0.49609375, 0.50195312,
       0.476562

In [2]:
# Colab install cell (run this first)
!pip install --quiet qiskit qiskit-aer scipy sentence-transformers tqdm
# If qiskit-aer fails to build on Python 3.12 this will still install qiskit core; code falls back to Statevector.


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.0/8.0 MB[0m [31m36.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.4/12.4 MB[0m [31m72.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m58.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.5/49.5 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [1]:
import kagglehub
import shutil
import os

# Download dataset
path = kagglehub.dataset_download("nltkdata/reuters")
print("KaggleHub downloaded to:", path)

# Target folder in Google Drive
target_path = "/content/drive/MyDrive/reuters"

# Copy downloaded dataset into Drive
if os.path.exists(target_path):
    shutil.rmtree(target_path)

shutil.copytree(path, target_path)

print("Dataset copied to:", target_path)


Downloading from https://www.kaggle.com/api/v1/datasets/download/nltkdata/reuters?dataset_version_number=2...


100%|██████████| 12.8M/12.8M [00:00<00:00, 14.5MB/s]

Extracting files...





KaggleHub downloaded to: /root/.cache/kagglehub/datasets/nltkdata/reuters/versions/2
Dataset copied to: /content/drive/MyDrive/reuters


In [6]:
import os
import re
from bs4 import BeautifulSoup
import shutil

SOURCE_PATH = "/content/drive/MyDrive/reuters/reuters/reuters/reuters/training"
TARGET_PATH = "/content/drive/MyDrive/reuters_extracted"

# Reset output folder
if os.path.exists(TARGET_PATH):
    shutil.rmtree(TARGET_PATH)
os.makedirs(TARGET_PATH)

def clean_text(t):
    return re.sub(r"\s+", " ", t).strip()

for fname in os.listdir(SOURCE_PATH):
    if not fname.endswith(".sgm"):
        continue

    with open(os.path.join(SOURCE_PATH, fname), "r", encoding="latin-1") as f:
        data = f.read()

    soup = BeautifulSoup(data, "html.parser")
    articles = soup.find_all("reuters")

    for art in articles:
        body = art.find("body")
        topics = art.find("topics")

        if body is None or topics is None:
            continue

        labels = [t.text for t in topics.find_all("d")]
        if len(labels) == 0:
            continue

        label = labels[0]
        text = clean_text(body.text)

        # Create label folder
        label_folder = os.path.join(TARGET_PATH, label)
        os.makedirs(label_folder, exist_ok=True)

        # Save text file
        file_id = art["newid"]
        with open(os.path.join(label_folder, f"{file_id}.txt"), "w", encoding="utf-8") as fw:
            fw.write(text)

print("Extraction completed →", TARGET_PATH)


Extraction completed → /content/drive/MyDrive/reuters_extracted
