Step 0: Imports and coefficients

In [None]:
import numpy as np
from numpy import random
from scipy.optimize import minimize
import cmath
from sklearn.preprocessing import StandardScaler
import cProfile

In [None]:
noise_power_density = 5.0119e-21
p_0 = -0.001
chan_realisations = 100 # number of different channel realisations for one channel
W = 10e6 # bandwidth (unit:Hz)
circuit_power = 1 # constant circuit power of transmitter (unit: Watt)
max_power = 1 # max transmission power (unit:Watt)
num_samples = 10000 # no of rows in dataset

# Define realistic bounds for an urban environment (1 km³ area)
x_range, y_range, z_range = (0, 1000), (0, 1000), (0, 50)   # meters

Step 1: Generate nodes in 3D space

In [None]:
def generate_random_coordinates(n_devices, x_range, y_range, z_range):
    """Generate random coordinates for devices in a 3D space.

    Args:
        n_devices (int): Number of devices to generate coordinates for.
        x_range (tuple): The range (min, max) for the x-coordinate.
        y_range (tuple): The range (min, max) for the y-coordinate.
        z_range (tuple): The range (min, max) for the z-coordinate.

    Returns:
        list: A list of tuples containing the (x, y, z) coordinates.
    """
    coordinates = np.zeros((n_devices, 3))
    for i in range(n_devices):
        x = np.random.uniform(*x_range)
        y = np.random.uniform(*y_range)
        z = np.random.uniform(*z_range)
        coordinates[i] = [x, y, z]

    return coordinates


Step 2: Define channel

In [None]:
def calculate_distance(tx, rx):
  """Calculate the Euclidean distance between two 3D coordinates.

    Args:
        tx: 3D coordinate of the transmitter (x, y, z)
        rx: 3D coordinate of the receiver (x, y, z)

    Returns:
        Distance between the two points.
    """
  return np.linalg.norm(np.array(tx) - np.array(rx))

In [None]:
def generate_channel_model(ref_path_loss, tx, rx):
  """Generate channel model between one tx and its rx

    Args:
        ref_path_loss: reference path loss at 1m
        tx: transmitter coordinates
        rx: list of receiver coordinates

    Returns:
        h: channel model(1 x len(rx) x chan_realisations vector with multiple channel realisations across all tx-rx channel pairs)
    """
  h = np.empty((1, len(rx), chan_realisations), dtype=complex)

  epsilon = 1e-10
  d_rx0 = calculate_distance(tx,rx[0])
  distances_rx0 = np.maximum(d_rx0, epsilon)

  d_rx1 = calculate_distance(tx,rx[1])
  distances_rx1 = np.maximum(d_rx0, epsilon)

  total_large_scale_fading_rx1 = cmath.sqrt(ref_path_loss/(distances_rx0**(3)))
  total_large_scale_fading_rx2 = cmath.sqrt(ref_path_loss/(distances_rx0**(3)))

  # Stack the large-scale fading values into an array
  total_large_scale_fading = np.array([total_large_scale_fading_rx1, total_large_scale_fading_rx2])

  # Generate small-scale fading
  small_scale_fading = np.random.rayleigh(scale=1.0, size=(1, len(rx), chan_realisations))

  # Multiply total large scale fading with small scale fading
  h = small_scale_fading * total_large_scale_fading.reshape(1,2,1)

  return h


In [None]:
def generate_all_channels(ref_path_loss, tx_coords, rx_coords):
  """Generate channel realizations for all transmitter-receiver pairs.

    Args:
        ref_path_loss: reference path loss.
        tx_coords: list of 3D coordinates for transmitters [(x1, y1, z1), (x2, y2, z2)].
        rx_coords: list of 3D coordinates for receivers [(x1, y1, z1), (x2, y2, z2)].

    Returns:
        h_11, h_12, h_21, h_22: Channel model across all tx-rx pairs (shape: 1 x len(rx) x chan_realisations) e.g. h_21 means channel b/w transmitter 2 and receiver 1
    """
  h1 = generate_channel_model(ref_path_loss, tx_coords[0], rx_coords)
  h2 = generate_channel_model(ref_path_loss, tx_coords[1], rx_coords)

  # print(h1)
  # print(h1[:,0,:])
  h_11 = h1[:,0,:]
  h_12 = h1[:,1,:]

  h_21 = h2[:,0,:]
  h_22 = h2[:,1,:]

  return np.array([h_11, h_12, h_21, h_22])


Step 3: Define channel gain

In [None]:
def generate_channel_gains(channels):
  return np.abs(channels)**2

Step 4: Define spectral and energy efficiency functions

In [None]:
def spectral_efficiency(P, h, N0, bandwidth):
    """
    Calculate the total spectral efficiency for two transmitters.
    Parameters:
    - P: Array of transmit powers [P1, P2]
    - h: Array of channel gains [h11, h12, h21, h22] (first number is index of tx, second is index of rx)
    - N0: Noise power density
    - W: Bandwidth

    Returns:
    - Total spectral efficiency
    """
    P1, P2 = P
    # h11, h12, h21, h22 = h
    h11, h12, h21, h22 = h

    SE1 = np.log2(1 + h11 * P1 / (N0 * bandwidth + h21 * P2))
    SE2 = np.log2(1 + h22 * P2 / (N0 * bandwidth + h12 * P1))

    total_SE = SE1 + SE2

    return total_SE

In [None]:
def energy_efficiency(P, h, N0, bandwidth, Pc):
    """
    Calculate the energy efficiency for two transmitters.

    Parameters:
    - P: Array of transmit powers [P1, P2]
    - h: Array of channel gains [h11, h12, h21, h22]
    - N0: Noise power density
    - W: Bandwidth
    - Pc: Constant circuit power of the transmitter

    Returns:
    - Energy efficiency (EE)
    """
    h11, h12, h21, h22 = h
    P1, P2 = P

    # Calculate the spectral efficiencies for the given power configuration
    SE1 = np.log2(1 + h11 * P1 / (N0 * bandwidth + h21 * P2))
    SE2 = np.log2(1 + h22 * P2 / (N0 * bandwidth + h12 * P1))

    P1, P2 = P

    EE = SE1/(P1+Pc) + SE2/(P2+Pc)

    return EE

Step 5: Find optimal transmission powers

In [None]:
def find_optimal_power_on_se(h, N0, W):
    """
    Find the optimal transmit powers for two transmitters using binary power control.

    Parameters:
    - h: Array of channel gains [h11, h12, h21, h22]
    - N0: Noise power density
    - W: Bandwidth

    Returns:
    - Optimal power configuration (P1, P2)
    - Corresponding spectral efficiency
    """
    # Define the possible power configurations: [P1, P2]
    power_configurations = [
        [0, 0],
        [0, 1],
        [1, 0],
        [1, 1]
    ]

    # Initialize variables to store the best configuration and spectral efficiency
    best_power_configuration = None
    best_spectral_efficiency = -float('inf')
    best_channel_gain = None

    h_current = h[:, 0, :]  # Shape (2, 1, num_realizations)

    # Loop over all channel realizations
    for realization in range(h.shape[2]):
      # Iterate over all possible power configurations
      for P in power_configurations:
        se = spectral_efficiency(P, h_current[:, realization], N0, W)

        if se > best_spectral_efficiency:
          best_spectral_efficiency = se
          best_power_configuration = P
          best_channel_gain = h_current[:, realization]


    return best_channel_gain, best_power_configuration

In [None]:
from multiprocessing import Pool, cpu_count


def objective(P, h_curr, N0, W, Pc):
    return -np.sum(energy_efficiency(P, h_curr, N0, W, Pc))


def find_optimal_power_on_ee(h, N0, W, Pc, threshold, num_workers=2):
    # Initial power configuration
    initial_power = [0, 0]

    # Define bounds for each power: (0, max_threshold)
    bounds = [(0.0001, threshold), (0.0001, threshold)]

    # Extract channel gains for all realizations (shape: (2, num_realizations))
    h_current = h[:, 0, :]

    # Set number of workers to use
    num_workers = num_workers or max(1, cpu_count() - 1)  # Leave one core free

    # Prepare data for parallel processing
    realization_data = [(initial_power, bounds, h_current[:, realization], N0, W, Pc) for realization in range(h.shape[2])]
    # print(realization_data)

    results_array = np.empty((h.shape[2],), dtype=[('optimal_power', float, 2), ('max_energy_efficiency', float)])

    initial_powers = [data[0] for data in realization_data]
    channel_gains = [data[2] for data in realization_data]
    N0_list = [data[3] for data in realization_data]
    W_list = [data[4] for data in realization_data]
    Pc_list = [data[5] for data in realization_data]

    with Pool(num_workers) as pool:
        # Vectorize the objective function call using starmap
        results = pool.starmap(objective, zip(
            initial_powers,  # Pass initial powers
            channel_gains,   # Pass channel gains
            N0_list,         # Pass N0 values
            W_list,          # Pass W values
            Pc_list          # Pass Pc values
        ))
        results_array[:] = results
    # Find the index of the maximum energy efficiency
    best_index = np.argmax(results_array['max_energy_efficiency'])

    # Extract the best values using the index
    best_power_configuration = results_array['optimal_power'][best_index]
    best_energy_efficiency = results_array['max_energy_efficiency'][best_index]
    best_channel_gain = h_current[:, best_index]  # Corresponding channel gain for best realization
    return best_channel_gain, best_power_configuration


Generation of data

In [None]:
from functools import partial

# Partial function for handling the operations within a single sample
def process_sample(sample, ref_path_loss, N0, bandwidth, Pc, threshold):
    # Generate coordinates for 2 transmitters and 2 receivers
    transmitters = generate_random_coordinates(2, x_range, y_range, z_range)
    receivers = generate_random_coordinates(2, x_range, y_range, z_range)

    # create channels b/w tx and rx
    channels = generate_all_channels(ref_path_loss, transmitters, receivers)

    # generate channel gains
    h = generate_channel_gains(channels)

    # Calculate optimal power for spectral efficiency
    channel_gains_se, tx_powers_se = find_optimal_power_on_se(h, N0, bandwidth)
    # Calculate optimal power for energy efficiency
    channel_gains_ee, tx_powers_ee = find_optimal_power_on_ee(h, N0, bandwidth, Pc, threshold)

    # Normalize channel gains and multiply by 100, then round to nearest integer
    mean_gain_se = np.mean(channel_gains_se)
    mean_gain_ee = np.mean(channel_gains_ee)

    std_dev_gain_se = np.std(channel_gains_se)
    std_dev_gain_ee = np.std(channel_gains_ee)

    if std_dev_gain_se > 0:
        normalized_channel_gains_se = np.round(((channel_gains_se - mean_gain_se) / std_dev_gain_se) * 100).astype(int)
    else:
        normalized_channel_gains_se = np.zeros_like(channel_gains_se)  # Avoid division by zero

    if std_dev_gain_ee > 0:
        normalized_channel_gains_ee = np.round(((channel_gains_ee - mean_gain_ee) / std_dev_gain_ee) * 100).astype(int)
    else:
        normalized_channel_gains_ee = np.zeros_like(channel_gains_ee) # Avoid division by zero

    # Normalize transmit powers and multiply by 100, then round to nearest integer
    normalized_tx_powers_se = np.round((np.array(tx_powers_se) / threshold) * 100).astype(int)
    normalized_tx_powers_ee = np.round((np.array(tx_powers_ee) / threshold) * 100).astype(int)

    # Prepare output strings
    line_se = f"If A is {', '.join(map(str, normalized_channel_gains_se))}, then B is {normalized_tx_powers_se[0]}, {normalized_tx_powers_se[1]}.\n"
    line_ee = f"If A is {', '.join(map(str, normalized_channel_gains_ee))}, then B is {normalized_tx_powers_ee[0]}, {normalized_tx_powers_ee[1]}.\n"

    return line_se, line_ee

# Use a partial function to fill in the common arguments
process_sample_partial = partial(process_sample, ref_path_loss=p_0, N0=noise_power_density, bandwidth=W, Pc=circuit_power, threshold=max_power)

# Apply the map function to process all samples
results_se, results_ee = zip(*map(process_sample_partial, range(num_samples)))
print()
se_dataset = "se.txt"
ee_dataset = "ee.txt"
# Write to files in one go
with open(se_dataset, 'w') as file_se:
    file_se.writelines(results_se)

with open(ee_dataset, 'w') as file_ee:
    file_ee.writelines(results_ee)
