In [None]:
!pip install pennylane

Collecting pennylane
  Downloading PennyLane-0.38.1-py3-none-any.whl.metadata (9.3 kB)


In [None]:
import pennylane as qml
from pennylane import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# Load the R matrix from SINR_data.csv (no header row) as a plain array.
# It is summed as a whole in calculate_qassign.
R_matrix = pd.read_csv('/content/SINR_data.csv', header=None).values

# Constants
NAP = 4  # Number of access points
Nuser = 4  # Number of users
Ncloud = 4  # Number of cloud qubits
sigma = 0.1  # presumably the noise standard deviation -- TODO confirm
pt = 0.5  # presumably the transmit power -- TODO confirm
rho = pt / sigma**2  # ratio pt / sigma^2, used in calculate_phi_assign
# NOTE(review): `shift` and `epsilon` are not referenced by any code visible
# in this file.
shift = np.pi / 4
epsilon = 1e-6  # Small value for finite differences

# Load and prepare the data
data_df = pd.read_csv('/content/H_Matrix_data.csv', header=None)
data = data_df.values
# Both reshapes require the file to hold exactly 4 * 4 * 1000 values.
data = data.reshape((4, 4, 1000))
# Flatten to (16, 1000) and transpose: one row per sample, 16 features each.
X_data = data.reshape((4*4, 1000)).T
# Unseeded random targets: they differ on every run.
Y_data = np.random.rand(1000, 16)  # Dummy labels

# Split data into training and validation sets (80% / 20%, fixed seed for the split)
X_train, X_val, Y_train, Y_val = train_test_split(X_data, Y_data, test_size=0.2, random_state=42)

# Define the number of qubits and other parameters for the quantum circuit
n_qubits = Ncloud  # using Ncloud as the number of qubits
n_layers = 2  # number of StronglyEntanglingLayers repetitions

# Simulator device with n_qubits wires, used by the QNode defined below
dev = qml.device('default.qubit', wires=n_qubits)

# Quantum neural network (QNN) circuit, bound to `dev` as a QNode
@qml.qnode(dev)
def quantum_circuit(weights, x):
    """Run one QNN pass and return a Pauli-Z expectation value per wire.

    Args:
        weights: Parameters handed to ``qml.StronglyEntanglingLayers``.
        x: Features handed to ``qml.AngleEmbedding`` (the channel-matrix
            entries in this script).

    Returns:
        A list of ``n_qubits`` measurements, ``expval(PauliZ(i))`` for each
        wire ``i``. These are expectation values, not probabilities.
    """
    wires = range(n_qubits)
    qml.AngleEmbedding(x, wires=wires)  # encode the inputs as rotation angles
    qml.StronglyEntanglingLayers(weights, wires=wires)  # trainable layers
    return [qml.expval(qml.PauliZ(wire)) for wire in wires]

def qnn_layer(weights, x):
    """Evaluate ``quantum_circuit`` for one input ``x`` and return its output."""
    expectations = quantum_circuit(weights, x)
    return expectations

# Dense (affine) classical layer
def classical_layer(inputs, weights, biases):
    """Return ``inputs . weights^T + biases``.

    Args:
        inputs: Array whose last axis matches the second axis of ``weights``.
        weights: Weight matrix; it is transposed before the product.
        biases: Added to the product (broadcast by numpy).
    """
    projected = np.dot(inputs, weights.T)
    return projected + biases

# Classical head parameters, keyed by the width of the concatenated QNN
# output. They are created once so that every forward pass (training steps
# and validation alike) uses the same dense layers instead of freshly
# sampled random ones.
_CLASSICAL_HEAD_CACHE = {}

def _default_classical_head(n_features):
    """Return the cached ``(W1, b1, W2, b2)`` for ``n_features`` inputs."""
    if n_features not in _CLASSICAL_HEAD_CACHE:
        _CLASSICAL_HEAD_CACHE[n_features] = (
            np.random.random((64, n_features)),
            np.zeros((64,)),
            np.random.random((16, 64)),
            np.zeros((16,)),
        )
    return _CLASSICAL_HEAD_CACHE[n_features]

# Hybrid model combining quantum and classical layers
def hybrid_model(weights, X, classical_params=None):
    """Run the quantum feature extractor followed by a two-layer dense head.

    The columns of ``X`` are cut into consecutive chunks of ``n_qubits``
    features. Each chunk of each sample is passed through ``qnn_layer``, and
    the per-chunk outputs are concatenated along axis 1. The result goes
    through a 64-unit ``tanh`` layer and a 16-unit linear layer.

    Args:
        weights: Quantum-layer parameters forwarded to ``qnn_layer``.
        X: 2-D array, one sample per row.
        classical_params: Optional ``(W1, b1, W2, b2)`` tuple for the dense
            head. When omitted, a randomly initialised head is created on
            first use and reused on every later call with the same feature
            width.

    Returns:
        Array with one row per sample and 16 columns.
    """
    qnn_outputs = []
    for start in range(0, X.shape[1], n_qubits):
        x_subset = X[:, start:start + n_qubits]
        qnn_outputs.append(np.array([qnn_layer(weights, x) for x in x_subset]))

    qnn_output_combined = np.concatenate(qnn_outputs, axis=1)

    if classical_params is None:
        classical_params = _default_classical_head(qnn_output_combined.shape[1])
    W1, b1, W2, b2 = classical_params

    hidden_output = np.tanh(classical_layer(qnn_output_combined, W1, b1))
    return classical_layer(hidden_output, W2, b2)

def loss(weights, X, Y):
    """Mean squared error between ``hybrid_model(weights, X)`` and ``Y``."""
    residual = hybrid_model(weights, X) - Y
    return np.mean(residual ** 2)

# Initialize weights for quantum layers
weights_shape = qml.StronglyEntanglingLayers.shape(n_layers=n_layers, n_wires=n_qubits)
weights = np.random.random(weights_shape)

# Use Pennylane's gradient descent optimizer
opt = qml.GradientDescentOptimizer(stepsize=0.1)

num_epochs = 50
# Reporting interval. It has to be smaller than num_epochs, otherwise only
# epoch 0 is ever printed.
log_every = 10

try:
    for epoch in range(num_epochs):
        # One gradient-descent step on the training loss; `cost` is the loss
        # value before the update.
        weights, cost = opt.step_and_cost(lambda w: loss(w, X_train, Y_train), weights)

        if epoch % log_every == 0:
            print(f"\n--- Intermediate Results at Epoch {epoch} ---")
            print(f"Cost: {cost}")

    # Validation
    val_cost = loss(weights, X_val, Y_val)
    print(f"\nFinal Validation Cost: {val_cost}")

except KeyboardInterrupt:
    print("Training interrupted.")
else:
    # Only report completion when training and validation actually finished;
    # an interrupt or an error must not be followed by a success message.
    print("Training completed.")

def calculate_qassign(h):
    """Return the sum of all entries of the pre-loaded ``R_matrix``.

    The argument ``h`` is not read; the result depends only on the
    module-level ``R_matrix``. The value is also printed.
    """
    total = np.sum(R_matrix)
    print("Qassign:", total)
    return total

def calculate_phi_assign(h):
    """Return ``-sum(log2(1 + s * rho))`` over the singular values ``s`` of ``h``.

    Singular values below ``-1 / rho`` are replaced by NaN and therefore
    left out of the sum by ``np.nansum``. The result is also printed.
    """
    _left, singular_values, _right = np.linalg.svd(h)
    lower_bound = -1 / rho
    Phi = np.ones(len(singular_values))
    for idx, sv in enumerate(singular_values):
        Phi[idx] = float('nan') if sv < lower_bound else np.log2(1 + (sv * rho))
    Phi_assign = -np.nansum(Phi)
    print("phi assign:", Phi_assign)
    return Phi_assign

def calculate_lassign(h):
    """Return ``|Qassign - Phi_assign|`` for the channel matrix ``h``.

    Args:
        h: Matrix forwarded to ``calculate_qassign`` and
            ``calculate_phi_assign``.

    Returns:
        The absolute difference between the two quantities.
    """
    # calculate_qassign returns a single scalar, so it must not be unpacked
    # as a 3-tuple.
    Qassign = calculate_qassign(h)
    Phi_assign = calculate_phi_assign(h)
    return np.abs(Qassign - Phi_assign)

# Define the loss function
# NOTE: this rebinds the module-level name `loss`, replacing the plain MSE
# version defined earlier in the file for all code that runs afterwards.
def loss(weights, X, Y):
    """Per-sample MSE plus the assignment penalty, averaged over samples.

    Args:
        weights: Quantum-layer parameters forwarded to ``hybrid_model``.
        X: 2-D array, one sample per row; each row must hold 16 values so
            that it can be viewed as a 4x4 matrix.
        Y: Targets, indexed row by row alongside the predictions.

    Returns:
        Mean over samples of ``mean((prediction - target)**2) + Lassign``.
    """
    predictions = hybrid_model(weights, X)
    n_samples = X.shape[0]
    total = 0
    for i in range(n_samples):
        # Each row is one flattened 4x4 matrix. Slicing four rows at once
        # (X[i:i + 4]) yields 64 values and cannot be reshaped to (4, 4).
        h_matrix = X[i].reshape((4, 4))
        mse = np.mean((predictions[i] - Y[i]) ** 2)
        total += mse + calculate_lassign(h_matrix)
    return total / n_samples

# def loss_resource_allocation(weights, X_train):
#     Lassign_total = 0
#     num_batches = 0
#     batch_size = 10
#     for index in range(0, len(X_train), batch_size):
#         if index + batch_size > len(X_train):
#             continue
#         h_block = X_train[index:index + batch_size, :]
#         results = hybrid_model(weights, h_block)
#         Lassign = calculate_lassign(h_block)
#         Lassign_total += Lassign
#         num_batches += 1

#     if num_batches > 0:
#         Lassign_total /= num_batches

#     return Lassign_total

def optimize(initial_params, X_train, epochs=100, lr=0.01, Y=None):
    """Minimise ``loss`` with plain gradient descent.

    Args:
        initial_params: Starting quantum-layer parameters.
        X_train: Training inputs passed to ``loss``.
        epochs: Number of gradient steps.
        lr: Step size.
        Y: Targets passed to ``loss``. When omitted, the module-level
            ``Y_train`` is used, so existing calls that pass only the inputs
            keep working.

    Returns:
        ``(params, loss_history)`` where ``loss_history`` holds the loss
        after each update.
    """
    # `loss` needs targets as its third argument; calling it with only
    # (theta, X_train) raises TypeError.
    targets = Y_train if Y is None else Y

    def objective(theta):
        return loss(theta, X_train, targets)

    # The gradient function does not change between epochs, so build it once.
    grad_fn = qml.grad(objective)
    # Must stay below `epochs`, otherwise only epoch 0 is reported.
    log_every = 10

    params = initial_params
    loss_history = []

    for epoch in range(epochs):
        params = params - lr * grad_fn(params)
        current_loss = objective(params)
        loss_history.append(current_loss)

        if epoch % log_every == 0:
            print(f"Epoch {epoch} | Loss: {current_loss}")

    return params, loss_history

# The parameters end up in qml.StronglyEntanglingLayers, which expects a
# weight tensor of shape (n_layers, n_wires, 3). A flat vector of length
# Ncloud is rejected by the template.
theta_cloud_initial = np.random.random(
    qml.StronglyEntanglingLayers.shape(n_layers=n_layers, n_wires=n_qubits)
)
theta_cloud_optimal, loss_history = optimize(theta_cloud_initial, X_train, epochs=100)

# Per-block validation metrics, filled by the loop below.
Qassign_list = []
Lassign_list = []

# Walk the validation set in blocks of 10 rows; a trailing partial block is skipped.
for index in range(0, len(X_val), 10):
    if index + 10 > len(X_val):
        continue
    h_block = X_val[index:index + 10, :]
    # NOTE(review): predictions use `weights` from the first training loop,
    # not `theta_cloud_optimal` returned by optimize() -- confirm this is intended.
    results = hybrid_model(weights, h_block)
    print("Results:", results)
    # calculate_qassign does not read its argument, so this value is the same
    # for every block.
    Qassign = calculate_qassign(h_block)
    # NOTE(review): here the whole 10-row block is passed as the matrix,
    # whereas loss() passes a 4x4 reshape -- confirm which is intended.
    Lassign = calculate_lassign(h_block)

    Qassign_list.append(Qassign)
    Lassign_list.append(Lassign)

# Print the collected metrics, one entry per processed block (1-based numbering).
print("Validation Results:")
for i, (Qassign, Lassign) in enumerate(zip(Qassign_list, Lassign_list)):
    print(f"Block {i+1}:")
    print(f"Lassign: {Lassign}")
    print(f"Qassign: {Qassign}")
