In [76]:
import numpy as np
import pandas as pd
import os
from qiskit.providers.aer import AerSimulator
from qiskit import ClassicalRegister
from qiskit.utils import QuantumInstance
from qiskit import QuantumCircuit, Aer, execute
from qiskit.circuit import Parameter
from qiskit_optimization import QuadraticProgram
from qiskit_optimization.algorithms import MinimumEigenOptimizer
from qiskit.algorithms import QAOA
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score

In [30]:
#Load colored graph data
def load_graph_(filename,folder_g,folder_c):
    """Load one graph from an NPZ archive together with its companion CSV.

    Parameters
    ----------
    filename : str
        Name of the NPZ file inside ``folder_g``. The CSV name is obtained by
        replacing the last three characters of ``filename`` with ``"csv"``.
    folder_g : str
        Folder, relative to the current working directory, holding the NPZ.
    folder_c : str
        Folder, relative to the current working directory, holding the CSV.

    Returns
    -------
    list
        ``[Xcol, Ri, Ro, y]``: the values of the DataFrame read from the CSV,
        two dense float32 ``(n_nodes, n_edges)`` matrices holding ones at the
        stored (row, column) index pairs, and the archive's ``'y'`` array.
    """
    # Pull every array out of the archive before the file handle is closed.
    npz_path = os.path.join(os.getcwd(), folder_g, filename)
    with np.load(npz_path) as archive:
        arrays = {key: archive[key] for key in archive.files}

    wanted = ('X', 'y', 'Ri_rows', 'Ri_cols', 'Ro_rows', 'Ro_cols')
    X, y, ri_rows, ri_cols, ro_rows, ro_cols = (arrays[key] for key in wanted)
    node_count = X.shape[0]
    edge_count = ri_rows.shape[0]

    def _dense(rows, cols):
        # Expand index pairs into a dense 0/1 node-by-edge matrix.
        matrix = np.zeros((node_count, edge_count), np.float32)
        matrix[rows, cols] = 1
        return matrix

    Ri = _dense(ri_rows, ri_cols)
    Ro = _dense(ro_rows, ro_cols)

    # The colored node features live in a CSV sharing the NPZ's base name.
    csv_path = os.path.join(os.getcwd(), folder_c, filename[:-3] + "csv")
    Xcol = pd.read_csv(csv_path).values
    return [Xcol, Ri, Ro, y]
def file_names(folder):
    """Return the sorted names of the usable files directly inside ``folder``.

    ``folder`` is resolved relative to the current working directory.
    Sub-directories are skipped, and so is every entry whose name ends in
    ``'e'``.

    The result is sorted because ``os.listdir`` returns entries in arbitrary
    order; sorting keeps the order in which graphs are loaded reproducible.

    NOTE(review): the trailing-'e' filter presumably exists to drop stray
    files (e.g. ``.DS_Store``) -- confirm which files it is meant to exclude.
    """
    graph_dir = os.path.join(os.getcwd(), folder)
    return sorted(
        name
        for name in os.listdir(graph_dir)
        if os.path.isfile(os.path.join(graph_dir, name)) and not name.endswith('e')
    )

In [None]:
# Assemble the training and test sets: one [Xcol, Ri, Ro, y] entry per graph file.
training_data = [
    load_graph_(graph_file, 'graphs', 'color')
    for graph_file in file_names('graphs')
]
test_data = [
    load_graph_(graph_file, 'graphs2', 'color2')
    for graph_file in file_names('graphs2')
]

In [None]:
# Define evaluation metrics
def evaluate_predictions(y_true, y_pred):
    """Score predictions against ground-truth labels.

    Returns
    -------
    tuple
        ``(auc, accuracy, precision)`` -- in that order -- as computed by
        scikit-learn's ``roc_auc_score``, ``accuracy_score`` and
        ``precision_score`` on ``(y_true, y_pred)``.
    """
    return (
        roc_auc_score(y_true, y_pred),
        accuracy_score(y_true, y_pred),
        precision_score(y_true, y_pred),
    )
# Training loop
#
# NOTE(review): nothing in this loop updates the circuit parameters -- every
# rotation angle is bound to 0.0 for every graph -- so each epoch reports the
# same metrics. The QAOA step at the end of the per-graph loop is kept as an
# API-valid placeholder only: its objective is the constant loss value and its
# result (x_opt) is never fed back into the circuit.
num_epochs = 2


def create_node_circuit(params, n):
    """Build an n-qubit circuit with four parameterized RY rotations per qubit.

    Qubit ``i`` receives ``params[4*i]`` .. ``params[4*i + 3]``, so ``params``
    must hold at least ``4 * n`` entries. The circuit carries ``4 * n``
    classical bits so it can be composed onto the QGNN circuit built below.
    """
    q = QuantumCircuit(n, 4 * n)
    for i in range(n):
        for j in range(4):
            q.ry(params[i * 4 + j], i)
    return q


def binary_cross_entropy_loss(Y_true, Y_pred):
    """Mean binary cross-entropy between labels and predictions.

    Predictions are clipped away from 0 and 1 first: the edge predictions
    produced below are hard 0/1 labels, and log(0) would otherwise turn the
    loss into inf/nan.
    """
    eps = 1e-7
    y_true = np.asarray(Y_true, dtype=float)
    y_pred = np.clip(np.asarray(Y_pred, dtype=float), eps, 1 - eps)
    return float(-np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred)))


def aggregate_predictions(node_predictions, Ri, Ro):
    """Turn node-level labels into edge-level labels.

    An edge takes the maximum label among the nodes flagged for it in ``Ri``
    or ``Ro``; an edge with no flagged node gets 0 instead of raising.
    """
    n_nodes, n_edges = Ri.shape
    edge_predictions = []
    for i in range(n_edges):
        edge_predictions.append(
            max(
                (node_predictions[j] for j in range(n_nodes) if Ri[j, i] == 1 or Ro[j, i] == 1),
                default=0,
            )
        )
    return edge_predictions


def _node_labels_from_counts(counts, n_nodes):
    """Majority-vote a 0/1 label per node from joint measurement counts.

    ``counts`` maps measured bitstrings to shot counts. Node ``j`` is read
    from classical bit ``j``, i.e. the j-th character from the right of each
    bitstring. Ties resolve to 0.
    """
    ones = [0] * n_nodes
    total = 0
    for bitstring, hits in counts.items():
        bits = bitstring.replace(' ', '')
        total += hits
        for j in range(n_nodes):
            if bits[-1 - j] == '1':
                ones[j] += hits
    return [0 if total - ones[j] >= ones[j] else 1 for j in range(n_nodes)]


# The simulator does not depend on the graph, so build it once.
test_simulator = AerSimulator()

for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}:")
    losses = []
    accuracies = []
    aucs = []
    precisions = []
    # Train the QGNN
    for data in training_data:
        Nd, Ri, Ro, Y = data
        NV, NE = len(Nd), len(Ri[0])
        # Four rotation parameters (and four classical bits) per node.
        num_qubits = 4 * NV

        # Define the rotation parameters for the node circuit
        params = [Parameter(f'x_{i}') for i in range(num_qubits)]

        # Create the QGNN circuit and compose the PQC onto it
        qgnn_circuit = QuantumCircuit(NV, num_qubits)
        node_circuit = create_node_circuit(params, NV)
        qgnn_circuit = qgnn_circuit.compose(node_circuit)
        # Measure node qubit j into classical bit j; without a measurement the
        # simulator result carries no counts to read predictions from.
        qgnn_circuit.measure(list(range(NV)), list(range(NV)))
        print("composition completes")

        # Bind every parameter before running: the simulator cannot execute a
        # circuit that still has free parameters.
        parameter_binds = {param: 0.0 for param in params}  # You may need to set the parameter values accordingly
        bound_circuit = qgnn_circuit.assign_parameters(parameter_binds)
        test_job = test_simulator.run(bound_circuit, shots=1000)
        test_result = test_job.result()
        print("Simulator built")

        # A single circuit was run, so there is one counts dictionary whose
        # keys are bitstrings over all classical bits.
        counts = test_result.get_counts()
        predicted_labels = _node_labels_from_counts(counts, NV)
        print("node prediction complete")

        # Use the aggregation function to obtain edge-level predictions
        edge_predictions = aggregate_predictions(predicted_labels, Ri, Ro)
        print("edge prediction complete")

        # Calculate loss using binary cross-entropy loss
        loss = binary_cross_entropy_loss(Y, edge_predictions)
        losses.append(loss)
        print("Loss calculated, ",loss)

        # Track other evaluation metrics. Y is indexed per edge, so it is
        # scored against the edge-level predictions, and the tuple is unpacked
        # in the order evaluate_predictions returns it: (auc, accuracy, precision).
        auc, accuracy, precision = evaluate_predictions(Y, edge_predictions)
        accuracies.append(accuracy)
        aucs.append(auc)
        precisions.append(precision)

        # Create a Quadratic Program with one binary variable per parameter
        qubit_op = QuadraticProgram()
        for i in range(num_qubits):
            qubit_op.binary_var(name=f'x_{i}')
        # NOTE(review): solve() accepts only the program itself, so the loss is
        # attached as a constant objective to keep the call valid. A constant
        # objective makes the optimum trivial -- replace it with a real
        # objective over the x_i variables before relying on x_opt. The QAOA
        # run is over 4 * NV qubits, which may be slow for larger graphs.
        qubit_op.minimize(constant=float(loss))

        # Choose a quantum solver (QAOA in this case)
        quantum_instance = QuantumInstance(backend=Aer.get_backend('statevector_simulator'))
        optimizer = QAOA(quantum_instance=quantum_instance)
        qaoa_optimizer = MinimumEigenOptimizer(optimizer)
        result = qaoa_optimizer.solve(qubit_op)
        x_opt = result.x
    # Calculate and print metrics for the epoch
    avg_loss = np.mean(losses)
    avg_accuracy = np.mean(accuracies)
    avg_auc = np.mean(aucs)
    avg_precision = np.mean(precisions)

    print(f"Loss: {avg_loss}, Accuracy: {avg_accuracy}, AUC: {avg_auc}, Precision: {avg_precision}")
# End of the training loop

In [None]:
# Testing loop
#
# NOTE(review): no trained rotation angles are available in this notebook, so
# every parameter is bound to 0.0 here as well -- replace the binding below
# with the learned values once training produces them.


def create_node_circuit(params, n):
    """Build an n-qubit circuit with four parameterized RY rotations per qubit.

    Qubit ``i`` receives ``params[4*i]`` .. ``params[4*i + 3]``; the circuit
    carries ``4 * n`` classical bits.
    """
    q = QuantumCircuit(n, 4 * n)
    for i in range(n):
        for j in range(4):
            q.ry(params[i * 4 + j], i)
    return q


auc, accuracy, precision = [], [], []
# The backend does not depend on the graph, so fetch it once.
test_simulator = Aer.get_backend('qasm_simulator')
for data in test_data:
    Nd, Ri, Ro, Y = data
    NV, NE = len(Nd), len(Ri[0])
    # Four rotation parameters (and four classical bits) per node.
    num_qubits = 4 * NV

    # Define the rotation parameters for the node circuit
    params = [Parameter(f'x_{i}') for i in range(num_qubits)]

    # Create the QGNN circuit for this test graph. The node circuit is
    # composed once, and node qubit j is measured into classical bit j;
    # measure_all() would add a new classical register, which the target
    # circuit cannot absorb in compose().
    test_qgnn_circuit = QuantumCircuit(NV, num_qubits)
    test_qgnn_circuit = test_qgnn_circuit.compose(create_node_circuit(params, NV))
    test_qgnn_circuit.measure(list(range(NV)), list(range(NV)))
    # A circuit with free parameters cannot be executed; bind them first.
    test_qgnn_circuit = test_qgnn_circuit.assign_parameters({param: 0.0 for param in params})

    # Simulate the quantum circuit and make predictions
    test_job = execute(test_qgnn_circuit, test_simulator, shots=1000)
    test_result = test_job.result()

    # One circuit was run, so there is a single counts dictionary keyed by
    # bitstrings; node j is the j-th character from the right.
    counts = test_result.get_counts()
    total_shots = sum(counts.values())
    predicted_labels = []
    for j in range(NV):
        ones = sum(hits for bits, hits in counts.items() if bits.replace(' ', '')[-1 - j] == '1')
        # Majority vote, ties resolve to 0
        predicted_label = 0 if total_shots - ones >= ones else 1
        predicted_labels.append(predicted_label)

    # Aggregate node labels to edge labels: an edge takes the maximum label of
    # the nodes flagged for it in Ri or Ro (0 if none is flagged).
    edge_predictions = [
        max(
            (predicted_labels[j] for j in range(NV) if Ri[j, e] == 1 or Ro[j, e] == 1),
            default=0,
        )
        for e in range(NE)
    ]

    # Calculate evaluation metrics.
    # NOTE(review): Y is treated as per-edge labels (as in the training cell),
    # hence the comparison against edge-level predictions -- confirm.
    y_true = Y
    auc_epoch, accuracy_epoch, precision_epoch = evaluate_predictions(y_true, edge_predictions)

    # Append the metrics to the lists
    auc.append(auc_epoch)
    accuracy.append(accuracy_epoch)
    precision.append(precision_epoch)

# Calculate and print average metrics for the test data
avg_auc = np.mean(auc)
avg_accuracy = np.mean(accuracy)
avg_precision = np.mean(precision)
print(f"Test AUC: {avg_auc}, Test Accuracy: {avg_accuracy}, Test Precision: {avg_precision}")