# LoneSTAR

Baseline from:

I. P. Roberts, S. Vishwanath, and J. G. Andrews, “LoneSTAR: Analog Beamforming Codebooks for Full-Duplex Millimeter Wave Systems,” IEEE Transactions on Wireless Communications, vol. 22, no. 9, pp. 5754–5769, Sep. 2023, [doi: 10.1109/TWC.2023.3236352](https://doi.org/10.1109/TWC.2023.3236352).


In [1]:
import time

import numpy as np
import pandas as pd
from tqdm.notebook import tqdm

import cvxpy as cp

from cissir import params, optimization as opt
from cissir.physics import pow2db, db2power, mag2db, db2mag
from cissir.beamforming import dft_codebook
from cissir.raytracing import si_paths2cir
from cissir.utils import PrintBuffer, res_path


In [2]:
# Global parameters

c = params.c
fc = params.fc_hz
wavelength_m = params.wavelength_m

ula_el_spacing = params.array_electrical_spacing

N_r = params.n_rx
N_t = params.n_tx

# Number of beams
L_r = params.num_beams
L_t = params.num_beams

### DFT beam codebook


In [3]:
dft_tx_codebook, tx_degs = dft_codebook(L_max = L_t , N1 = N_t, full_grid=True,
                                          az_min = -60, az_max = 60, transmit=True)
L_t = len(tx_degs)
dft_rx_codebook, rx_degs = dft_codebook(L_max = L_r , N1 = N_r, full_grid=True,
                                          az_min = -60, az_max = 60, transmit=False)
L_r = len(rx_degs)

In [4]:
si_num_taps = 1

def cb_si_db(tx_codebook, rx_codebook, si_channel):
    return mag2db(opt.codebook_si(tx_codebook, rx_codebook, si_channel))

with np.load(res_path/"channel_impulse_responses.npz") as rt_data:
    t_rt = np.squeeze(rt_data['t_channel_s'])
    ht_si_full = np.squeeze(rt_data['ht_si'])
    h_full = np.sum(ht_si_full, axis=0).transpose(2, 0, 1)

with np.load(res_path/"si_mimo.npz") as rt_data:
    h_si = rt_data['h_si_matrix'][:si_num_taps]
    si_db_ref = cb_si_db(dft_tx_codebook, dft_rx_codebook,
                         si_paths2cir(ht_si_full[:si_num_taps])).max()
    h_si *= db2mag(si_db_ref - cb_si_db(dft_tx_codebook, dft_rx_codebook, h_si).max())
    
    assert h_si.shape == (si_num_taps, N_r, N_t), "Unexpected SI shape"

    h_si = np.squeeze(h_si)


In [5]:
# ||H||_F=1.0
h_si_normalized = h_si/np.linalg.norm(h_si)

In [6]:
# Prob (43)

phased_array = True
max_att = -100

feasible_codebook = opt.feasible_phased_array if phased_array else opt.feasible_tapered

A_tx = cp.Constant(dft_tx_codebook/np.abs(dft_tx_codebook))
A_rx = cp.Constant(dft_rx_codebook/np.abs(dft_rx_codebook))

# Create TX problem

threshold_tx = cp.Parameter(pos=True)
Wt = cp.Constant(dft_rx_codebook)
Ft = cp.Variable(shape=(N_t, L_t), complex=True)
Ft.value = dft_tx_codebook

obj_tx = cp.Minimize(cp.norm(Wt.H @ h_si_normalized @ Ft, p='fro')**2)
similarity_tx = cp.norm(N_t * np.ones(L_t) - cp.diag(A_tx.H @ Ft), p=2)**2 <= threshold_tx
feasible_tx = cp.abs(Ft) <= cp.Constant(1.0)

problem_tx = cp.Problem(obj_tx, constraints=[similarity_tx, feasible_tx])

# Create RX problem

threshold_rx = cp.Parameter(pos=True)

Wr = cp.Variable(shape=(N_r, L_r), complex=True)
Fr = cp.Parameter(shape=(N_t, L_t), complex=True, value=dft_tx_codebook)
Wr.value = dft_rx_codebook

obj_rx = cp.Minimize(cp.norm(Wr.H @ h_si_normalized @ Fr, p='fro')**2)

similarity_rx = cp.norm(N_r * np.ones(L_r) - cp.diag(A_rx.H @ Wr), p=2)**2 <= threshold_rx
feasible_rx = cp.abs(Wr) <= cp.Constant(1.0)

problem_rx = cp.Problem(obj_rx, constraints=[similarity_rx, feasible_rx])

error_db_values = np.arange(-70, -2, 1)

results = []
tx_codebooks = []
rx_codebooks = []
pbuf = PrintBuffer(print_input=False)
for error_db in tqdm(error_db_values):

    threshold_tx.value = db2power(error_db) * L_t * N_t ** 2
    threshold_rx.value = db2power(error_db) * L_r * N_r ** 2
    Ft.value = dft_tx_codebook
    Wr.value = dft_rx_codebook

    start_time = time.perf_counter()
    problem_tx.solve(solver='SCS', warm_start=True)
    F_sol = feasible_codebook(Ft.value) # Make realizable
    Fr.value = F_sol
    problem_rx.solve(solver='SCS', warm_start=True)
    W_sol = feasible_codebook(Wr.value)
    end_time = time.perf_counter()

    opt_time = end_time - start_time

    tx_status = problem_tx.status
    if tx_status != 'optimal':
        opt_time = None
        pbuf.print(f"Tx error for {error_db} dB: {tx_status}")
    rx_status = problem_rx.status
    if rx_status != 'optimal':
        opt_time = None
        pbuf.print(f"Rx error for {error_db} dB: {rx_status}")

    f_codebook = F_sol/np.sqrt(N_t)
    w_codebook = W_sol/np.sqrt(N_r)
    
    si_db_1tap = cb_si_db(f_codebook, w_codebook, h_si).max()
    si_db_full = cb_si_db(f_codebook, w_codebook, h_full).max()

    att_db = -si_db_1tap
    att_full_db = -si_db_full

    if att_db > max_att:
        max_att = att_db
    elif att_db < max_att -2.0:
        print("SI attenuation does not improve. Exiting loop...")
        break

    error_f = pow2db(max(opt.codebook_deviation_power(dft_tx_codebook, f_codebook), 1e-10))
    error_w = pow2db(max(opt.codebook_deviation_power(dft_rx_codebook, w_codebook), 1e-10))
    
    results.append({"loss_tgt": error_db, "opt": "lonestar", "elapsed_time": opt_time,
                    "att_opt": att_db, "tx_loss": error_f, "rx_loss": error_w,
                    "att_full": att_full_db,
                    "phased": phased_array, "si_taps": si_num_taps, "num_beams": L_t})
    
    pbuf.print(f"Optimization for error {error_db:.1f} dB")
    pbuf.print(f"\tTx error:\t{error_f:.1f} dB")
    pbuf.print(f"\tRx error:\t{error_w:.1f} dB")
    pbuf.print(f"\tSI att 1-tap:\t{att_db:.1f} dB")
    pbuf.print(f"\tFull SI att:\t{att_full_db:.1f} dB")
    pbuf.print("\n")
    
    tx_codebooks.append(f_codebook)
    rx_codebooks.append(w_codebook)


  0%|          | 0/68 [00:00<?, ?it/s]



SI attenuation does not improve. Exiting loop...


In [7]:
pbuf.print()

Tx error for -70 dB: optimal_inaccurate
Rx error for -70 dB: optimal_inaccurate
Optimization for error -70.0 dB
	Tx error:	-32.3 dB
	Rx error:	-32.3 dB
	SI att 1-tap:	61.9 dB
	Full SI att:	55.4 dB


Rx error for -69 dB: optimal_inaccurate
Optimization for error -69.0 dB
	Tx error:	-31.9 dB
	Rx error:	-31.8 dB
	SI att 1-tap:	62.0 dB
	Full SI att:	55.4 dB


Optimization for error -68.0 dB
	Tx error:	-31.4 dB
	Rx error:	-31.4 dB
	SI att 1-tap:	62.1 dB
	Full SI att:	55.5 dB


Optimization for error -67.0 dB
	Tx error:	-30.9 dB
	Rx error:	-30.9 dB
	SI att 1-tap:	62.2 dB
	Full SI att:	55.5 dB


Optimization for error -66.0 dB
	Tx error:	-30.4 dB
	Rx error:	-30.4 dB
	SI att 1-tap:	62.3 dB
	Full SI att:	55.6 dB


Optimization for error -65.0 dB
	Tx error:	-29.9 dB
	Rx error:	-29.8 dB
	SI att 1-tap:	62.4 dB
	Full SI att:	55.7 dB


Optimization for error -64.0 dB
	Tx error:	-29.4 dB
	Rx error:	-29.4 dB
	SI att 1-tap:	62.5 dB
	Full SI att:	55.7 dB


Optimization for error -63.0 dB
	Tx error:	-28.

## Saving results

In [8]:
suffix = "phased" if phased_array else "taper"
fname = f"lonestar_cb_results_{suffix}.csv"
res_df = pd.DataFrame(results)
res_df.to_csv(res_path/fname)
print(f"{fname} was saved at {res_path}.")

lonestar_cb_results_phased.csv was saved at /mnt/project/results.


In [9]:
error_array = np.insert(error_db_values.astype(float), 0, -np.inf)
tx_codebooks = np.stack([dft_tx_codebook] + tx_codebooks, axis=0)
rx_codebooks = np.stack([dft_rx_codebook] + rx_codebooks, axis=0)

metadata = np.array({"method": "lonestar", "phased": phased_array, "si_num_taps": si_num_taps})
fname = f"lonestar_codebooks.npz"
np.savez(res_path/fname, error=error_array,
         rx_degrees=rx_degs, tx_degrees=tx_degs,
         rx=rx_codebooks, tx=tx_codebooks,
         metadata=metadata)
print(f"{fname} was saved at {res_path}.")

lonestar_codebooks.npz was saved at /mnt/project/results.
