In [None]:
# Imports
import numpy as np
import scipy
import scipy.signal as signal
import scipy.interpolate as interpolate
import matplotlib.pyplot as plt
import plotly.graph_objs as go
from plotly.subplots import make_subplots
from scipy.interpolate import CubicSpline
from scipy.signal import hilbert
import numpy as np
import ipywidgets as ipw
import base64
from random import randint
from pynq import Clocks
import xrfdc
import os
from pynq.lib import Pmod_IO
import time

# Use the RFSoC base overlay
from pynq.overlays.base import BaseOverlay

base = BaseOverlay('base.bit')

# Start RF clocks
base.init_rf_clks()

In [None]:
# ADC initialization
# Channels
DAC_CHANNEL_B = 0 # 'Channel 0': {'Tile': 224, 'Block': 0}
DAC_CHANNEL_A = 1 # 'Channel 1': {'Tile': 230, 'Block': 0}

ADC_CHANNEL_D = 0 # 'Channel 0': {'Tile': 224, 'Block': 0}
ADC_CHANNEL_C = 1 # 'Channel 1': {'Tile': 224, 'Block': 1}
ADC_CHANNEL_B = 2 # 'Channel 2': {'Tile': 226, 'Block': 0}
ADC_CHANNEL_A = 3 # 'Channel 3': {'Tile': 226, 'Block': 1}

antenna_adc = ADC_CHANNEL_D

adc_char_array = ['D']

number_samples = 32768  # Between 16 and 32768
decimation_factor = 1 # 2 is default
sample_frequency = 4915.2e6/decimation_factor  # Hz The default sample frequency is 4915.2e6 Hz which is sufficient for our signal

original_adc_settings = base.radio.receiver.channel[ADC_CHANNEL_D].adc_block.MixerSettings

base.radio.receiver.channel[antenna_adc].adc_block.DecimationFactor = decimation_factor
base.radio.receiver.channel[antenna_adc].adc_block.MixerSettings = {
    'CoarseMixFreq':  xrfdc.COARSE_MIX_BYPASS,
    'EventSource':    xrfdc.EVNT_SRC_TILE, 
    'FineMixerScale': xrfdc.MIXER_SCALE_1P0,
    'Freq':           0.0,
    'MixerMode':      xrfdc.MIXER_MODE_R2C,
    'MixerType':      xrfdc.MIXER_TYPE_COARSE,
    'PhaseOffset':    0.0
}
base.radio.receiver.channel[antenna_adc].adc_block.UpdateEvent(xrfdc.EVENT_MIXER)

In [None]:
lazy_harold_ear = Pmod_IO(base.PMODB,2,'out')
lazy_harold_mouth = Pmod_IO(base.PMODA,0,'in')

lazy_harold_ear.write(0)    # Drive pin low

one = 0
zero = 0
count = 0
while count < 100000:
    if lazy_harold_mouth.read() == 1:
        one += 1
    else:
        zero += 1
    count += 1
print("one:", one)
print("zero:", zero)
print(one/(zero+one)*100)
    

In [None]:
lazy_harold_ear.write(0)

In [None]:
def plot_complex_time(data, n=number_samples, fs=sample_frequency, 
                      title='Complex Time Plot'):
    plt_re_temp = (go.Scatter(x = np.arange(0, n/fs, 1/fs),
                              y = np.real(data), name='Real'))
    plt_im_temp = (go.Scatter(x = np.arange(0, n/fs, 1/fs),
                              y = np.imag(data), name='Imag'))
    return go.FigureWidget(data = [plt_re_temp, plt_im_temp],
                           layout = {'title': title, 
                                     'xaxis': {
                                         'title': 'Seconds (s)',
                                         'autorange' : True},
                                     'yaxis': {
                                         'title': 'Amplitude (V)'}})
rx_amplitude_data = []
rx_power_data = []
    
# Take measurement
rx_amplitude_data.append(base.radio.receiver.channel[antenna_adc].transfer(number_samples))
rx_amplitude_data_real = np.real(rx_amplitude_data[0])
rx_power_data.append(10*np.log10((np.square(rx_amplitude_data_real[np.argmax(rx_amplitude_data_real)]/np.sqrt(2))/100)/.001))
print("Power:", rx_power_data[0])

fig = plot_complex_time(rx_amplitude_data[0])
fig.show()

In [None]:
# Method 2: One way communication WORKING

message = 1
LH_step_size = .9
start_angle = -90 # (degrees)
sweep_degrees = 180 # Choose MUST BE MULTIPLE OF .9 (degrees)
if (int(sweep_degrees%LH_step_size)) != 0:
    print("Invalid sweep_degrees. Choose value divisible by", LH_step_size)
number_of_measurements = (sweep_degrees / LH_step_size) + 1


end_angle = start_angle + sweep_degrees # (degrees)
actual_radians = sweep_degrees*(np.pi/180)
print("Sweeping", sweep_degrees, "degrees to generate radiation pattern using", int(number_of_measurements), "measurements recorded between", start_angle, "degrees and", end_angle, "degrees every", LH_step_size, "degrees...")

rx_amplitude_data = []
rx_power_data = []

for measurement in range(0, int(number_of_measurements)):
    
    # Take measurement
    rx_amplitude_data.append(base.radio.receiver.channel[antenna_adc].transfer(number_samples))
    rx_amplitude_data_real = np.real(rx_amplitude_data[measurement])
    rx_power_data.append(10*np.log10((np.square(rx_amplitude_data_real[np.argmax(rx_amplitude_data_real)]/np.sqrt(2))/100)/.001))
    # if rx_power_data[measurement+90] > 10 or rx_power_data[measurement+90] < 4:
    #     print("FAIL.")
    
    # Tell LH measurement is complete
    if measurement != (number_of_measurements - 1):
        lazy_harold_ear.write(message)
        if message == 1:
            message = 0
        else:
            message = 1

    # Give LH time to complete a twitch
    time.sleep(1.5)

# lazy_harold_ear.write(0)

   
    
    
        
fig, ax = plt.subplots(subplot_kw={'projection': 'polar'})
ax.plot(np.linspace(-np.pi/2, (-np.pi/2)+actual_radians, int(number_of_measurements)), rx_power_data) # MAKE SURE TO USE RADIAN FOR POLAR
ax.set_theta_zero_location('N') # make 0 degrees point up
ax.set_theta_direction(-1) # increase clockwise
ax.set_rlabel_position(55)  # Move grid labels away from other labels
plt.show()

print("Average power:", np.average(rx_power_data), "dBm")
print("Max power:", rx_power_data[np.argmax(rx_power_data)], "dBm")
print("Detected at:", start_angle + (np.argmax(rx_power_data)*LH_step_size), "degrees")

all_fig = make_subplots(specs=[[{"secondary_y": False}]])  # Adjust as necessary
    
# Sample carrier signal
carrier_data = []
for ADC in adc_array: # Show only ADC D
    carrier_data.append(base.radio.receiver.channel[ADC].transfer(number_samples))
    time_data = np.arange(0, number_samples/sample_frequency, 1/sample_frequency)
    # print(carrier_data)
    sampled_signal = np.real(carrier_data[ADC])
    cs_real = CubicSpline(time_data, sampled_signal)

    # Interpolated data
    dense_t = np.linspace(time_data.min(), time_data.max(), len(time_data) * 10)  # Increase density
    interpolated_signal = cs_real(dense_t)
    
    # Create Plotly figure
    fig = make_subplots(specs=[[{"secondary_y": False}]])  # Adjust as necessary

    # Add actual data trace
    fig.add_trace(
    go.Scatter(x=time_data, y=sampled_signal, name=f"Actual Data ADC {adc_char_array[ADC]}"),
    secondary_y=False,
    )

    # Add interpolated data trace
    fig.add_trace(
    go.Scatter(x=dense_t, y=interpolated_signal, name=f"Interpolated Data ADC {adc_char_array[ADC]}"),
    secondary_y=False,
    )
    # all_fig.add_trace(
    # go.Scatter(x=time_data, y=sampled_signal, name=f"Actual Data ADC {ADC}"),
    # secondary_y=False,
    # )

    # Add interpolated data trace
    all_fig.add_trace(
    go.Scatter(x=dense_t, y=interpolated_signal, name=f"Interpolated Data ADC {adc_char_array[ADC]}"),
    secondary_y=False,
    )

    # Update layout
    fig.update_layout(
    title=f"Time Domain Plot of ADC Channel {adc_char_array[ADC]}",
    xaxis_title="Time (s)",
    yaxis_title="Amplitude",
    )
    
    all_fig.update_layout(
    title=f"Time Domain Plot of all ADC's",
    xaxis_title="Time (s)",
    yaxis_title="Amplitude",
    )
    
    # figs.append(fig)
    
    phase_real = calculate_phase(np.real(carrier_data[ADC]))
    print("Measured Data Phase for ADC " + str(adc_char_array[ADC]) + ": " + str(phase_real))
    

    # Calculate phase for interpolated data
    phase_interpolated = calculate_phase(interpolated_signal)
    print("Interpolated Data Phase for ADC " + str(adc_char_array[ADC]) + ": " + str(phase_interpolated))
    
    average_phases[ADC] = phase_interpolated
    
    print(carrier_data[ADC])
    print(interpolated_signal)



print("Measured peak 2 peak amplitude: ? V")
print("Measured frequency: ? MHz")
print("Measured period: ? ns")
print("Phase differences: ????")


figs.append(all_fig)



for figgy in figs:
    figgy.show()
    
    
print()
print("AVERAGE PHASES:")
    
for i in range(0,4):
    adc = ''
    if i == 0:
        adc = 'D'
    if i == 1:
        adc = 'C'
    if i == 2:
        adc = 'B'
    if i == 3:
        adc = 'A'
    print("ADC " + str(adc) + " phase:" + str(average_phases[i]))
    
    
# APPLY BEAMFORMING WEIGHTS

sample_rate = 1e6
N = 10000 # number of samples to simulate
d = 0.5 # half wavelength spacing
Nr = 4
theta_degrees = 30 # direction of arrival (feel free to change this, it's arbitrary)
theta = theta_degrees / 180 * np.pi # convert to radians

# Create 4 tones to simulate signals being seen by each element
t = np.arange(N)/sample_rate # time vector
shift = 50/4
f_tone = 0.02e6
tx = np.exp(2j * np.pi * f_tone * t)

b_0 = np.exp(-2j * np.pi * d * 0 * np.sin(theta)) # array factor
b_1 = np.exp(-2j * np.pi * d * 1 * np.sin(theta)) # array factor
b_2 = np.exp(-2j * np.pi * d * 2 * np.sin(theta)) # array factor
b_3 = np.exp(-2j * np.pi * d * 3 * np.sin(theta)) # array factor

tx_0 = tx * b_0
tx_1 = tx * b_1
tx_2 = tx * b_2
tx_3 = tx * b_3

beamformed_data = [[],[],[],[]]

beamformed_figs = make_subplots(specs=[[{"secondary_y": False}]])

# DELAY
if theta_degrees >= 0 and theta_degrees <= 90:
    for ADC in carrier_data:
        match ADC:
            case 0:
                beamformed_data[ADC] = carrier_data[ADC] * b_0
            case 1:
                beamformed_data[ADC] = carrier_data[ADC] * b_1
            case 2:
                beamformed_data[ADC] = carrier_data[ADC] * b_2
            case 3:
                beamformed_data[ADC] = carrier_data[ADC] * b_3
                # Add interpolated data trace
        beamformed_figs.add_trace(
        go.Scatter(x=dense_t, y=interpolated_signal, name=f"Beamformed Data ADC {beamformed_data[ADC]}"),
        secondary_y=False,
        )
        
        beamformed_figs.update_layout(
        title=f"Time Domain Plot of all Beamformed Data",
        xaxis_title="Time (s)",
        yaxis_title="Amplitude",
        )
elif theta_degrees < 0 and theta_degrees >= -90:
    for ADC in carrier_data:
        match ADC:
            case 0:
                beamformed_data[ADC] = carrier_data[ADC] * b_3
            case 1:
                beamformed_data[ADC] = carrier_data[ADC] * b_2
            case 2:
                beamformed_data[ADC] = carrier_data[ADC] * b_1
            case 3:
                beamformed_data[ADC] = carrier_data[ADC] * b_0
        beamformed_figs.add_trace(
        go.Scatter(x=dense_t, y=interpolated_signal, name=f"Beamformed Data ADC {beamformed_data[ADC]}"),
        secondary_y=False,
        )
        
        beamformed_figs.update_layout(
        title=f"Time Domain Plot of all Beamformed Data",
        xaxis_title="Time (s)",
        yaxis_title="Amplitude",
        )
else:
    print("Input angle out of range. Please input an angle between -90 and 90 degrees.")


for figgy in beamformed_figs:
    figgy.show()
    
summed_data = beamformed_data[0]
for ADC in beamformed_data:
    if ADC in adc_array:
        for index in range(len(beamformed_data[ADC])):
            summed_data[index] = summed_data[index] + beamformed_data[ADC][index]
            

summed_figs = make_subplots(specs=[[{"secondary_y": False}]])
summed_figs.add_trace(
        go.Scatter(x=dense_t, y=interpolated_signal, name=f"Summed Beamforming Data"),
        secondary_y=False,
        )
summed_figs.show()


time_data = np.arange(0, number_samples/sample_frequency, 1/sample_frequency)

summed_sampled_signal = np.real(summed_data)
cs_real = CubicSpline(time_data, summed_sampled_signal)

# Interpolated data
dense_t = np.linspace(time_data.min(), time_data.max(), len(time_data) * 10)  # Increase density
interpolated_signal = cs_real(dense_t)

# Create Plotly figure
interpolated_summed_figs = make_subplots(specs=[[{"secondary_y": False}]])

# Add actual data trace
interpolated_summed_figs.add_trace(
go.Scatter(x=time_data, y=sampled_signal, name=f"Actual Data Shifted and Summed"),
secondary_y=False,
)

# Add interpolated data trace
interpolated_summed_figs.add_trace(
go.Scatter(x=dense_t, y=interpolated_signal, name=f"Interpolated Data Shifted and Summed"),
secondary_y=False,
)



interpolated_summed_figs.show()