<a href="https://colab.research.google.com/github/lalityadav1980/ml_learnig/blob/main/ibm_quantum_live.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

import os

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

from qiskit import QuantumCircuit
from qiskit.circuit import ParameterVector
from qiskit.primitives import Sampler

from qiskit_ibm_runtime import QiskitRuntimeService, Session, SamplerV2
from qiskit_machine_learning.neural_networks import SamplerQNN
from qiskit_machine_learning.connectors import TorchConnector

import pandas as pd
from qiskit_aer import Aer, AerSimulator
import matplotlib.pyplot as plt
import time
start_time = time.time()
print("GPU Available:", torch.cuda.is_available())
# Add this line to define the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f"Using device: {device}")
API_TOKEN = "4ed9d81c24c1d6fb39f4f076976e930b0a0b57cf633303cafd3216068b4a268283b4a4bb0ca9d3ae6a787caf2b98fca3f4044a9051cb634ffb72f2eabdc6784b"
# === Authenticate and Connect to IBM Quantum Service ===
try:
    service = QiskitRuntimeService(
        channel="ibm_quantum",
        token=API_TOKEN
    )
    print("✅ Successfully authenticated with IBM Quantum.")

except Exception as e:
    print(f"❌ Authentication failed: {e}")
# ================== Create Parametric Circuit ==================
def create_quantum_circuit(num_qubits, num_features):
    input_params = ParameterVector('x', length=num_features)
    weight_params = ParameterVector('θ', length=num_qubits * 8)  # Adjusted length

    qc = QuantumCircuit(num_qubits)

    # 1️⃣ Encode features
    for i in range(num_features):
        qc.rx(input_params[i], i % num_qubits)

    # 2️⃣ Initial Variational Layer
    for i in range(num_qubits):
        qc.ry(weight_params[i], i)
    for i in range(0, num_qubits, 2):
        if i + 1 < num_qubits:
            qc.cz(i, i + 1)
    for i in range(num_qubits):
        qc.rz(weight_params[num_qubits + i], i)

    qc.barrier()  # Add barrier for clarity

    # 3️⃣ Enhanced Variational Layers (Add More Layers for Higher Capacity)
    for layer in range(3):  # Repeat 3 times for deeper layers
        for i in range(num_qubits):
            ry_idx = (2 + layer * 2) * num_qubits + i  # Dynamic index calculation
            rz_idx = (3 + layer * 2) * num_qubits + i

            qc.ry(weight_params[ry_idx], i)
            qc.rz(weight_params[rz_idx], i)

        # Add entanglement (circular CZ connections)
        for i in range(num_qubits):
            qc.cz(i, (i + 1) % num_qubits)  # Circular entanglement

        qc.barrier()  # Optional: Improves circuit visualization

    # 4️⃣ Measurement
    qc.measure_all()

    return qc, input_params, weight_params


def process_last_records(historical_data, window_size=5):
    """
    Process the last N historical records to prepare input for prediction.
    """
    last_records = historical_data.tail(window_size)
    return last_records.values.flatten().reshape(1, -1)


def predict_next_draw(model, scaler, historical_data):
    # Preprocess last N draws
    input_data = process_last_records(historical_data)
    scaled_data = scaler.transform(input_data)

    with torch.no_grad():
        probs = model(torch.tensor(scaled_data, dtype=torch.float32))

    return postprocess_predictions(probs.numpy()[0])


# ================== Build Samplers ==================
def build_local_simulator_sampler():
    """
    Create a local simulator-based Sampler using Qiskit Primitives.
    """
    simulator = AerSimulator(method='statevector')  # or 'qasm'
    print("Available Methods:", simulator.available_methods())
    local_sampler = Sampler()
    return local_sampler


def build_hardware_sampler(num_qubits):
    # Fetch available backends that are operational and meet qubit requirements
    possible_backends = [
        backend for backend in service.backends()
        if (not backend.configuration().simulator and
            backend.status().operational and
            backend.configuration().num_qubits >= num_qubits)
    ]

    if not possible_backends:
        raise ValueError("No operational real device found with enough qubits.")

    # Select the least busy backend
    least_busy_backend = min(possible_backends, key=lambda b: b.status().pending_jobs)
    print(f"🔗 Connecting to backend: {least_busy_backend.name}")



    # ✅ Set shots using `default_shots`
    hardware_sampler = SamplerV2(mode=least_busy_backend, options={"default_shots": 256})

    return hardware_sampler


# ================== Hybrid Model (Accepts Any Sampler) ==================
class HybridQuantumNet(nn.Module):
    def __init__(self, input_dim, num_qubits, sampler):
        """
        Build a hybrid quantum-classical model that uses the given `sampler`,
        which could be local or hardware-based.
        """
        super().__init__()

        # 1. Create parametric circuit
        qc, input_params, weight_params = create_quantum_circuit(num_qubits, input_dim)

        # 2. Build QNN
        self.quantum_nn = TorchConnector(
            SamplerQNN(
                circuit=qc,
                input_params=input_params,
                weight_params=weight_params,
                sampler=sampler,  # local or hardware
                input_gradients=True
            )
        )

        # 3. Classical network
        # Now output 12 real numbers
        self.classical_fc = nn.Sequential(
            nn.Linear(16, 128),
            nn.ReLU(),
            nn.Linear(128, 49),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        x = self.quantum_nn(x)
        return self.classical_fc(x)


def create_sequences(data, window_size=5):
    """Create time-series sequences from historical data"""
    X, y = [], []
    for i in range(window_size, len(data)):
        X.append(data[i - window_size:i].flatten())  # Historical data
        y.append(data[i][:7])  # Next draw's 7 numbers
    return np.array(X), np.array(y)


# ================== Data Processing ==================
def preprocess_data(data_path, window_size=5):
    df = pd.read_csv(data_path)
    numbers = df[[f"winning_number_{i}" for i in range(1, 7)] + ["additional_number"]].values
    stats = df[["Low", "High", "Odd", "Even", "1-10", "11-20", "21-30", "31-40", "41-50"]].values

    X_seq, y_seq = create_sequences(numbers, window_size)
    X_stats = stats[window_size:]

    # Combine sequential data with statistical features
    X = np.hstack([X_seq, X_stats])

    scaler = MinMaxScaler(feature_range=(0, np.pi))
    return train_test_split(scaler.fit_transform(X), y_seq, test_size=0.2)


# ================== Training with Model Checkpoint ==================
def train_model(model, X_train, y_train, X_val, y_val, epochs=20, checkpoint_path='best_model.pth'):
    model = model.to(device)  # Move model to GPU
    print(f"Training on device: {device}")
    BATCH_SIZE = 64
    if os.path.exists(checkpoint_path):
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
        print(f"✅ Loaded pre-trained model from {checkpoint_path}.")
        return model

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)

    X_train_t = torch.tensor(X_train, dtype=torch.float32).to(device)
    y_train_t = torch.tensor(y_train, dtype=torch.long).to(device)
    X_val_t = torch.tensor(X_val, dtype=torch.float32).to(device)
    y_val_t = torch.tensor(y_val, dtype=torch.long).to(device)

    train_losses, val_losses = [], []
    best_val_loss = float('inf')
    patience, counter = 5, 0

    # Training loop with batches
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        for i in range(0, len(X_train_t), BATCH_SIZE):
            batch_X = X_train_t[i:i + BATCH_SIZE]
            batch_y = y_train_t[i:i + BATCH_SIZE]

            outputs = model(batch_X)
            loss = criterion(outputs, batch_y.argmax(dim=1))

            loss.backward()
            optimizer.step()

        model.eval()
        with torch.no_grad():
            val_pred = model(X_val_t)
            val_pred_7 = val_pred[:, :7]
            val_loss = criterion(val_pred_7, y_val_t.argmax(dim=1))

        print(f"📈 Epoch [{epoch + 1}/{epochs}] - Training Loss: {loss.item():.6f}, Validation Loss: {val_loss.item():.6f}")

        train_losses.append(loss.item())
        val_losses.append(val_loss.item())

        if val_loss.item() < best_val_loss:
            best_val_loss = val_loss.item()
            torch.save(model.state_dict(), checkpoint_path)
            print(f"💾 Best model saved with validation loss: {best_val_loss:.6f}")
            counter = 0
        else:
            counter += 1
            if counter >= patience:
                print("⏹️ Early stopping triggered.")
                break

    print("✅ Training completed successfully.")
    print(f"⏱️ Training completed in {time.time() - start_time:.2f} seconds")
    plot_loss(train_losses, val_losses)
    return model



# ================== Plotting Losses ==================
def plot_loss(train_losses, val_losses):
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, len(train_losses) + 1), train_losses, label='Training Loss')
    plt.plot(range(1, len(val_losses) + 1), val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training vs Validation Loss')
    plt.legend()
    plt.grid(True)
    plt.show()  # Use show() instead of savefig()


# ================== Cache Quantum Model ==================
def cache_quantum_model(model, cache_path='quantum_model_cache.pth'):
    torch.save(model.state_dict(), cache_path)
    print(f"Quantum hardware model cached at {cache_path}.")


def load_cached_model(model, cache_path='quantum_model_cache.pth'):
    if os.path.exists(cache_path):
        model.load_state_dict(torch.load(cache_path))
        print(f"Loaded cached quantum model from {cache_path}")
    else:
        print(f"No cached model found at {cache_path}.")
    return model


# ================== Prediction vs Actual Plot ==================
def plot_predictions(predictions, actuals):
    plt.figure(figsize=(10, 6))
    plt.plot(predictions, label='Predicted', marker='o')
    plt.plot(actuals, label='Actual', marker='x')
    plt.xlabel('Sample Index')
    plt.ylabel('Values')
    plt.title('Predictions vs Actuals')
    plt.legend()
    plt.grid(True)
    plt.show()


# ================== Postprocessing ==================
def postprocess_predictions(probs, top_k=7):
    """Select numbers based on model confidence"""
    numbers = np.arange(1, 50)
    sorted_idx = np.argsort(-probs)  # Descending order
    selected = []

    for idx in sorted_idx:
        num = numbers[idx]
        if num not in selected:
            selected.append(num)
        if len(selected) == top_k:
            break

    return sorted(selected)


# ================== Main Execution ==================
def main():
    DATA_PATH = "/content/sample_data/ToTo.csv"
    NUM_QUBITS = 4

    # 1️⃣ Load data
    X_train, X_test, y_train, y_test = preprocess_data(DATA_PATH)

    # 2️⃣ Train with Simulator
    local_sampler = build_local_simulator_sampler()
    model_sim = HybridQuantumNet(
        input_dim=X_train.shape[1],
        num_qubits=NUM_QUBITS,
        sampler=local_sampler
    )
    model_sim = train_model(model_sim, X_train, y_train, X_test, y_test, epochs=20)

    # 3️⃣ Evaluate Predictions (Simulator)
    with torch.no_grad():
        predictions = model_sim(torch.tensor(X_test, dtype=torch.float32))[:, :7].numpy()

    plot_predictions(predictions.flatten(), y_test.flatten())

    # 4️⃣ Quantum Hardware Model (Cache if available)
    hardware_sampler = build_hardware_sampler(NUM_QUBITS)
    model_hw = HybridQuantumNet(
        input_dim=X_train.shape[1],
        num_qubits=NUM_QUBITS,
        sampler=hardware_sampler
    )

    # Load cached model if exists
    model_hw = load_cached_model(model_hw)

    # If no cached model, train and save cache
    if not os.path.exists("/content/drive/MyDrive/quantum_model_cache.pth"):
        model_hw.load_state_dict(model_sim.state_dict())  # Transfer learning
        model_hw = train_model(model_hw, X_train, y_train, X_test, y_test, epochs=3)
        cache_quantum_model(model_hw)

    # 5️⃣ Final Predictions (Hardware)
    with torch.no_grad():
        hardware_predictions = model_hw(torch.tensor(X_test, dtype=torch.float32))[:, :7].numpy()

    plot_predictions(hardware_predictions.flatten(), y_test.flatten())

    # 6️⃣ 🔮 Predict the Next Draw Using Latest Historical Data
    historical_data = pd.read_csv(DATA_PATH)
    scaler = MinMaxScaler(feature_range=(0, np.pi))
    scaler.fit(X_train)  # Fit the scaler with training data for consistency

    next_draw_prediction = predict_next_draw(model_hw, scaler, historical_data)
    print(f"🎯 Predicted Next Draw Numbers: {next_draw_prediction}")

main()

GPU Available: True
Using device: cuda
✅ Successfully authenticated with IBM Quantum.
Available Methods: ('automatic', 'statevector', 'density_matrix', 'stabilizer', 'matrix_product_state', 'extended_stabilizer', 'unitary', 'superop')
Training on device: cuda


  local_sampler = Sampler()
  SamplerQNN(


📈 Epoch [1/20] - Training Loss: 3.031900, Validation Loss: 1.312111
💾 Best model saved with validation loss: 1.312111
📈 Epoch [2/20] - Training Loss: 3.031549, Validation Loss: 1.312050
💾 Best model saved with validation loss: 1.312050
📈 Epoch [3/20] - Training Loss: 3.031549, Validation Loss: 1.312050
