In [None]:
import asyncio
import math
import matplotlib.pyplot as plt
from IPython.display import clear_output
from astropy.coordinates import EarthLocation, AltAz, SkyCoord, GCRS
from astropy import units as u
from astropy.time import Time
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict


axes = None
fig = None
time = None

start_time = datetime(2025, 2, 21, 12, 0, 0)
current_time = start_time

# Let's start with simple example 
# we have a baseline in xyz coordinates
vla = EarthLocation.of_site('vla')
source_ra = 10*u.hourangle
source_dec = 30*u.deg
source = SkyCoord(ra=source_ra, dec=source_dec, frame='icrs')

obs_frequency = 500_000
obs_wavelength = 3.00 * 10 ** 8 / obs_frequency

time_step = 1 / (2 * obs_frequency)


# convert to x,y,z coordinates where z points to the NCP
# we need the latitude of the site
# y will point locally eastward
# z aligns with NCP
# think of projection of NU axes onto z.
# x is y X z
latitude = vla.lat
transform_enu_xyz = np.array([
    [0, -np.sin(latitude), np.cos(latitude)], 
    [1, 0, 0], 
    [0, np.cos(latitude), np.sin(latitude)]
])



class Signal:

    all_signals = []
    

    def __init__(self, frequency: float, amplitude: float, phase_shift: float, source_ra: float, source_dec: float):
        self.frequency = frequency
        self.amplitude = amplitude
        self.phase_shift = phase_shift
        self.source_ra = source_ra
        self.source_dec = source_dec

        Signal.all_signals.append(self)

    def get_signal_value(self, time: float) -> float:
        signal = self.amplitude * math.cos(2 * math.pi * self.frequency * time + self.phase_shift)
        return signal

    @classmethod
    def get_all_instances(cls):
        return cls.all_signals


    
class Antenna:

    all_antennae = []

    color_map = {
        1: 'b',
        2: 'r',
        3: 'g',
        4: 'p',
    }
    
    def __init__(self, antenna_id: int, baseline_enu: np.array):
        self.id = antenna_id
        self.baseline_enu = baseline_enu
        self.baseline_xyz = np.matmul(transform_enu_xyz, baseline_enu)
        print(f"baseline length is {np.linalg.norm(self.baseline_xyz)}")
        Antenna.all_antennae.append(self)
    
    def get_signal_delay(self, transform_xyz_uvw: np.array) -> float:
        baseline_uvw = np.matmul(transform_xyz_uvw, self.baseline_xyz)
        print(f"Baseline uvw has length {np.linalg.norm(baseline_uvw)}")
        # The vector w points towards the source - so B.s = B.w = (Bx, By, Bz) . (0, 0, 1) = Bz
        # w is also a unit vector so to find the angle between these two vectors we just divide by the
        # length of the baseline.
        # We then need to take the negative as the signal is coming from the source so it is traversing the
        # path in reverse
        angle_degrees = np.arccos(baseline_uvw[2] / np.linalg.norm(baseline_uvw)) * 360 / (2 * np.pi)
        print(angle_degrees * 360 / ( 2 * np.pi))
        delay_distance = -baseline_uvw[2]
        print(delay_distance) 
        # geometric delay
        Tg = delay_distance / (3.00 * 10 ** 8)
        return Tg

    @classmethod
    def get_all_instances(cls):
        return cls.all_antennae

antennae = [
    Antenna(1, np.array([0,0,0])),
    Antenna(2, np.array([4000, 200, 0])),
    Antenna(3, np.array([-4000, -200, 10])),
]
signals = [
    Signal(obs_frequency,1, 0, source_ra, source_dec),
    Signal(obs_frequency * 0.8, 4, np.pi/2, source_ra + 10 * u.hourangle, source_dec + 5 * u.deg),
    Signal(obs_frequency * 1.2, 2, np.pi / 4, source_ra - 20 * u.hourangle, source_dec + 10 * u.deg),
]


received_signals = defaultdict(list)

async def telescope_signal_receivers():
    global time
    time = 0
    global current_time
    
    while True:
        all_antennae = Antenna.get_all_instances()
        all_signals = Signal.get_all_instances()

        current_time = start_time + timedelta(seconds=time)    
        obs_time = Time(current_time.isoformat(), location=vla)

        obs_local_sidereal_time = obs_time.sidereal_time(kind='mean')

        for antenna in all_antennae:
            signal_combined = 0
            for signal in all_signals:
                
                hour_angle = obs_local_sidereal_time - signal.source_ra 
                
                transform_xyz_uvw = np.array([
                    [np.sin(hour_angle), np.cos(hour_angle), 0],
                    [-np.sin(signal.source_dec) * np.cos(hour_angle), np.sin(signal.source_dec) * np.sin(hour_angle), np.cos(signal.source_dec)],
                    [np.cos(hour_angle) * np.cos(signal.source_dec), -np.sin(hour_angle) * np.cos(signal.source_dec), np.sin(signal.source_dec)]    
                ])
                
                # replace obs_wavelength with signal.wavelength?
                antenna_delay = antenna.get_signal_delay(transform_xyz_uvw)
                signal_combined += signal.get_signal_value(time - antenna_delay)
            received_signals[antenna.id].append(signal_combined)
            if len(received_signals[antenna.id]) >= 100:
                received_signals[antenna.id].pop(0)

        time += time_step
        await asyncio.sleep(0)
    

def update_plot():
    clear_output(wait=True)
    for antenna in Antenna.get_all_instances():
        plt.plot(received_signals[antenna.id], label=f"Antenna {antenna.id}", color=Antenna.color_map[antenna.id])
    plt.title("Real-Time Signals")
    plt.xlabel("Time (arbitrary units)")
    plt.ylabel("Signal Value")
    plt.xlim(0, 100) 
    plt.ylim(-10.1, 10.1)

    plt.legend()
    plt.show() 
    plt.pause(0.00001)



async def handle_signal_plot():    
    while True:
        update_plot()
        await asyncio.sleep(.01)




In [None]:
async def main():
    print("starting main loop")
    await asyncio.gather(
        telescope_signal_receivers(),
        handle_signal_plot()
    )


await main()

In [None]:
start_time

In [None]:
current_time

# Next Steps

- Fringe patterns
- Do correlation
- Do the Fourier transform to try and get back our point sources
- Add documentation to all of this
- Validations

In [None]:
time_step

In [None]:
time

# Validations

How can we validate this? Need to know more about the geometry. Use astropy to find the angle to the source and then try and calculate the delays analytically in original co-ordinate system.

## Validation 1
The first validation - we will align the baseline with the signal. Then the signal will have to travel the |B|m and we can easily calculate the delay time as B / c.
This should align with the delay calculated by the antenna.

In [None]:
obs_frame = AltAz(obstime=start_time, location=vla)
source_altaz = source.transform_to(obs_frame)
source_altaz

In [None]:
obs_time = Time(start_time.isoformat(), location=vla)
obs_local_sidereal_time = obs_time.sidereal_time(kind='mean')

In [None]:
# This is a unit vector that points towards the source
# The 90 + 360 - d comes from the fact that np.cos measures from the x-axis
# and the bearing is from the y-axis clockwise.
s_enu = np.array([np.cos((90 + 360 - source_altaz.az.degree) * 2 * np.pi/360) * np.cos(source_altaz.alt.degree * 2 * np.pi / 360),  np.sin((90 + 360 - source_altaz.az.degree) * 2 * np.pi / 360) * np.cos(source_altaz.alt.degree * 2 * np.pi / 360), np.sin(source_altaz.alt.degree * 2 * np.pi / 360)])
s_enu = s_enu / np.linalg.norm(s_enu)
s_enu

In [None]:
signal_aligned_baseline = np.array([-np.cos((90 + 360 - source_altaz.az.degree) * 2 * np.pi/360) * 4000 * np.cos(source_altaz.alt.degree * 2 * np.pi / 360), -np.sin((90 + 360 - source_altaz.az.degree) * 2 * np.pi/360) * 4000 * np.cos(source_altaz.alt.degree * 2 * np.pi / 360), -4000 * np.sin(source_altaz.alt.degree * 2 * np.pi / 360)])
signal_aligned_baseline = signal_aligned_baseline * 4000 / np.linalg.norm(signal_aligned_baseline)
antennae_signal_aligned_baseline = Antenna(4, signal_aligned_baseline)

# Take the negative as the signal is coming from the source and s_enu points towards the source
delay_distance = np.dot(-s_enu, antennae_signal_aligned_baseline.baseline_enu)
delay_time = delay_distance / (3.00 * 10 ** 8)

print(delay_distance)
print(delay_time)

assert np.isclose(delay_distance, np.linalg.norm(signal_aligned_baseline))

In [None]:
# Check that signal_aligned_baseline is aligned with the signal
# and that s_enu is aligned with the signal.

# If so, all should pass and there will be no output to this cell.
# Otherwise there will be an assertionerror.
assert all([np.isclose(np.arctan(signal_aligned_baseline[1] / signal_aligned_baseline[0]) * 360 / (2 * np.pi), - (source_altaz.az.degree - 270)),
    np.isclose(np.arctan(s_enu[1] / s_enu[0]) * 360 / (2 * np.pi), -(source_altaz.az.degree - 270)),
     np.isclose(np.arcsin(s_enu[2]) * 360 / (2 * np.pi), source_altaz.alt.degree),
     np.isclose(np.arcsin(signal_aligned_baseline[2] / np.linalg.norm(signal_aligned_baseline)) * 360 / (2 * np.pi), - source_altaz.alt.degree),
    ] 
)


In [None]:
# Code from above that calculates the delay
# This delay should match the delay_time calculated above.

obs_time = Time(start_time.isoformat(), location=vla)
obs_local_sidereal_time = obs_time.sidereal_time(kind='mean')

for antenna in [antennae_signal_aligned_baseline]:
    signal_combined = 0
    for signal in [signals[0]]:
        hour_angle = obs_local_sidereal_time - signal.source_ra
        
        transform_xyz_uvw = np.array([
            [np.sin(hour_angle), np.cos(hour_angle), 0],
            [-np.sin(signal.source_dec) * np.cos(hour_angle), np.sin(signal.source_dec) * np.sin(hour_angle), np.cos(signal.source_dec)],
            [np.cos(hour_angle) * np.cos(signal.source_dec), -np.sin(hour_angle) * np.cos(signal.source_dec), np.sin(signal.source_dec)]    
        ])

        # This should be approximately 1 so that it does not change the baseline length when transforming the coordinates.
        assert np.isclose(np.linalg.det(transform_xyz_uvw), 1)
        
        # replace obs_wavelength with signal.wavelength?
        antenna_delay = antenna.get_signal_delay(transform_xyz_uvw)

antenna_delay

In [None]:
assert np.isclose(delay_time, antenna_delay)