# Simple example of time domain beamforming

Let's imagine that we have one or more idealised point sound sources with spherical spreading impinging upon an n-element linear microphone array.

The situation is depicted in the figure below.

![Linear array](figures/linear_array.png)

Note that there are two different frames of reference here. 
There is the frame of reference for the location and orientation of the array.
For this frame of reference we use the unit circle, with the centre of the array at
the origin, and the perpendicular to the array (at $0^o$ rotation) pointing along
the x-axis. 
We use θ to denote the angle of the perpendicular of the array relative to
the unit circle.
For the bearing of contacts relative to the array we use a frame of reference
where $0^o$ denotes pointing directly up the array (forward end-fire),
$90^o$ is pointing perpendicular to the array (broadside) and
$180^o$ is directly down the array (aft end-fire).
We use φ to denote the angle of a signal relative to the array bearing.

Note that we are using point sources, not the plane-wave approximation.

We have used the following helpful links in developing the code for this example:

* [Time delay estimation for passive sonar signal processing](https://ieeexplore.ieee.org/document/1163560)
* [Jupyter User guide](https://docs.bokeh.org/en/latest/docs/user_guide/jupyter.html)
* https://en.wikipedia.org/wiki/Wavelength
* [Equations for Plane Waves, Spherical Waves, and Gaussian Beams](https://onlinelibrary.wiley.com/doi/pdf/10.1002/9781118939154.app3)
* https://github.com/bokeh/bokeh/blob/1.3.2/examples/howto/layouts/dashboard.py
* [Beamforming in the Time Domain](https://onlinelibrary.wiley.com/doi/10.1002/9781119293132.ch11)


For a linear array, a delay of 0 implies a beam-steer direction of broadside (perpendicular
to the array).

A delay equivalent to the spacing between two adjacent elements divided by the velocity of
the soundwave implies a beam steering direction of either endfire (parallel to the array),
where the endfire (front or back) is determined by the sign of the delay

More generally, a delay of $d_t = \cos\theta \frac{d}{c}$ corresponds to a beam-steer at
an angle $\theta$ for $\theta \in [0^o, 180^o]$.

Given that we are dealing with discrete time signals, the delay cannot be chosen arbitrarily.

Instead, for sample rate $S$, we have

$$ \cos\theta = \frac{c}{d} d_t = \frac{c}{d} \frac{m}{S} $$

where

$$ -\frac{Sd}{c} < m < \frac{Sd}{c} $$


## Import modules

In [None]:
from ipywidgets import interactive, fixed, GridspecLayout, Layout
from IPython.display import display
import numpy as np
import seaborn as sns
from bokeh.io import push_notebook, show, output_notebook
from bokeh.plotting import figure
from bokeh.models import Label, Span, Legend, LegendItem
from bokeh.layouts import grid, column, gridplot
from bokeh.models import CustomJS, Slider, ColumnDataSource

output_notebook()

## Set up parameters

In order to create interactive Bokeh widgets, it is necessary to separate the plotting and the 
calculating code, so that an update function can be written, which simple updates the data in
the plots.

See:
* https://github.com/bokeh/bokeh/blob/2.3.0/examples/howto/notebook_comms/Jupyter%20Interactors.ipynb

We create a class whose single instance will hold all the information regarding this example.

In [None]:
class Info(object):
    '''Holder for global information'''
    
    def __repr__(self):
        ''' Print a list of the attributes of an instance of the class'''
        print_str = '['
        for attr, value in self.__dict__.items():
            print_str += f"'{attr}',"
        print_str = print_str[:-1] + ']'
        return print_str
    
    def __str__(self):
        ''' Print a list of the attributes and their values for an
        instance of the class'''
        print_str = ''
        for attr, value in self.__dict__.items():
            print_str += f' {attr} : {value}\n'
        return print_str
    
info = Info()

In [None]:
def calculate_everything(X,# Instance of the `Info` class
        # Parameters we wish to vary:
                         num_elements=6,
                         array_angle=20,
                         num_sources=1,
                         src_radius=10,
                         wavelength=0.2,
                         sample_length=0.002,
                         t_0=0,
        # Debug parameters:
                         verbose=False,
                         ):
    '''This function calculates the temporal and spacial solution
    for a simple (idealistic) sound-field when we have a number of
    sound sources and a linear array of receivers.'''
    
    #TODO: make src_radius, wavelength, src_angle, src_amp into lists of
    # length num_sources

#    pass
#if True:

    #-------------------------------------------------------------------
    # Array characteristics
    #-------------------------------------------------------------------

    X.array_angle = array_angle
    X.spacing = 0.1 # metres
    X.x_location = 0 # metres
    Arr = []
    X.array_length = X.spacing*(num_elements-1)
    for k in range(num_elements):
        x_pos = -(X.array_length/2 - 
                  k*X.spacing)*np.sin(np.deg2rad(array_angle))
        y_pos = (X.array_length/2 - 
                 k*X.spacing)*np.cos(np.deg2rad(array_angle))
        Arr.append([x_pos,y_pos])
    #Arr = np.array(Arr)
    X.Arr = Arr
    X.col = sns.color_palette(None, num_elements).as_hex()

    #-------------------------------------------------------------------
    # Source characteristics
    #-------------------------------------------------------------------

    amp_src = 1
    delta_angle = np.pi/6
    Src = {}
    Src['position'] = []
    Src['absolute_direction'] = []
    Src['amplitude'] = []
    Src['wavelength'] = []
    for n in range(num_sources):
        src_angle = (n+1)*delta_angle
        x = src_radius*np.cos(src_angle)
        y = src_radius*np.sin(src_angle)
        Src['position'].append([x,y]) # Source position
        Src['absolute_direction'].append(np.rad2deg(src_angle))
        Src['amplitude'].append(amp_src) # Source amplitude
        Src['wavelength'].append(wavelength) # Source wavelength
    X.Src = Src

    π = np.pi
    c = 340 # m/s (Speed of sound)
    X.c = c

    #-------------------------------------------------------------------
    # Spacial characteristics
    #-------------------------------------------------------------------

    N = 500
    X.x_min = -1
    X.x_max = 1
    x = np.linspace(X.x_min, X.x_max, N)
    y = np.linspace(X.x_min, X.x_max, N)    
    xx, yy = np.meshgrid(x, y)
    
    #-------------------------------------------------------------------
    # Temporal characteristics
    #-------------------------------------------------------------------

    X.sample_rate = 48000
    X.sample_length = sample_length
    # Discrete beam-steer directions
    m = np.arange(int(-np.floor(X.spacing*X.sample_rate/c)),
                  int(np.ceil(X.spacing*X.sample_rate/c)),1)
    X.m = m
    cos_phi = c/X.spacing*m/X.sample_rate
    X.phi = np.rad2deg(np.arccos(cos_phi))
    # We need to get enough time samples, so that we can shift
    # by num_elements * (m[-1] - m[0]) without wrapping around
    t = np.arange(t_0 + m[0]*num_elements/X.sample_rate,
            t_0 + m[-1]*num_elements/X.sample_rate + sample_length,
            1/X.sample_rate)
    X.t_0 = t_0
    X.t = t
    X.beam_widths = np.stack([
        np.concatenate((X.phi[:-1] + np.diff(X.phi)/2,0), axis=None),
        np.concatenate((180, X.phi[:-1] + np.diff(X.phi)/2), axis=None)
    ])

    #-------------------------------------------------------------------
    # Sound-pressure level over all space at time, t_0
    #-------------------------------------------------------------------
    
    Z = 0*xx
    for n in range(num_sources):
        [s_x, s_y] = Src['position'][n] #location of source
        A = Src['amplitude'][n]
        λ = Src['wavelength'][n]
        f = c/λ # Hz
        rr = np.sqrt((xx-s_x)*(xx-s_x) + (yy-s_y)*(yy-s_y))
        Z = Z + A/rr*np.sin(2*π/λ*(rr - c*t_0))
        if verbose:
            print(f'Source {n+1}: A = {A}, ',
                  f'f = {f} Hz, λ = {λ} m , T = {1/f:0.2e} s') 
    X.Z = Z

    #-------------------------------------------------------------------
    # Sound-pressure level at each array element over all time, t
    #-------------------------------------------------------------------

    Y = []
    for k in range(num_elements):
        F = 0*t
        for n in range(num_sources):
            [s_x, s_y] = Src['position'][n] #location of source
            A = Src['amplitude'][n]
            λ = Src['wavelength'][n] # metres
            f = c/λ # Hz
            r = np.sqrt((Arr[k][0]-s_x)*(Arr[k][0]-s_x) + 
                        (Arr[k][1]-s_y)*(Arr[k][1]-s_y))
            F = F + A/r*np.sin(2*π/λ*(r - c*t))
        Y.append(F)
    X.Y = Y
        
    #-------------------------------------------------------------------
    # Time-domain beamforming
    #-------------------------------------------------------------------

    beams = 0*X.phi
    
    start_index = np.argmin(np.abs(t-t_0))
    stop_index = np.argmin(np.abs(t-t_0-sample_length))
    # We create a delay-sum time series for each look direction by
    # shifting the time-series for each element by an amount, m,
    # padding with zeros
    sum_F = {}
    for idx,j in enumerate(m):
        sum_F[j] = 0*Y[0]
        for k in range(num_elements):
            sum_F[j] = sum_F[j] + pad(Y[k],j*k)
        sum_F[j] = sum_F[j]/num_elements
        # We only sum the section of the beam which doesn't contain
        # any padding
        beams[idx] = np.sum(np.abs(
            sum_F[j][start_index:stop_index]))/(stop_index-start_index)
    X.sum_F = sum_F
    X.beams = beams

    return X

def pad(F,n):
    '''Return an array of the same size as F, where the elements of
    F are shifted by n, padding with zeros.'''
    if n == 0:
        return F
    if n > 0:
        return np.concatenate([np.zeros(n), F[:-n]])
    if n < 0:
        return np.concatenate([F[-n:], np.zeros(-n)])


In [None]:
def plot_everything(D, look_direction=90, verbose=False):

#    pass
#if True:

    num_elements = len(D.Arr)
    num_sources = len(D.Src['absolute_direction'])
    
    look_index = np.argmin(np.abs(D.phi-look_direction))
    bs = D.phi[look_index] # beam steer
    
    if verbose:
        print(f'Look direction : {look_direction}')
        print(f'Look index : {look_index}')
        print(f'beam-steer : {bs}') 
        
    title_str = (f'Look directions : ({look_direction}°,' +
                   f'{look_direction-180}°), ' +
                   f'Array angle : {D.array_angle}°, ' +
                   f'λ : {D.Src["wavelength"][0]:0.2f} m, ' +
                   f'f : {D.c/D.Src["wavelength"][0]:0.2f} Hz'
                  )
    
    #-------------------------------------------------------------------
    # Spacial Window
    #-------------------------------------------------------------------

    p = figure(tooltips=[("x", "$x"), ("y", "$y"), ("value", "@image")],
            title=title_str)
    p.xaxis.axis_label = p.yaxis.axis_label = 'Distance (m)'
    p.x_range.range_padding = p.y_range.range_padding = 0

    # Plot the sound field
    D.spacial = p.image(image=[D.Z],
            x=D.x_min, y=D.x_min,
            dw=(D.x_max-D.x_min), dh=(D.x_max-D.x_min), 
            palette="Spectral11", level="image")

    # Add the sensors to the plot
    D.sensor = []
    for k in range(num_elements):
        D.sensor.append(p.circle([D.Arr[k][0]],[D.Arr[k][1]],
                 size=10,
                 fill_color=D.col[k],
                 legend_label=f'Sensor {k}',
                 ))

    # Add the look-directions to the plot
    ang = np.deg2rad(look_direction + 90 + D.array_angle)
    x = [0,2*np.cos(ang)]
    y = [0,2*np.sin(ang)]
    D.look_beam_port = p.line(x,y,color='red',
        legend_label='Port Look Direction',
        line_width=3)
    ang = np.deg2rad(look_direction - 90 + D.array_angle)
    x = [0,2*np.cos(ang)]
    y = [0,2*np.sin(ang)]
    D.look_beam_starboard = p.line(x,y,color='green',
                legend_label='Starboard Look Direction', line_width=3)
    
    # Add the beamwidths to the plot
    ang1 = np.deg2rad(D.beam_widths[0,look_index] + 90 + D.array_angle)
    ang2 = np.deg2rad(D.beam_widths[1,look_index] + 90 + D.array_angle)
    x = [0,2*np.cos(ang1),2*np.cos(ang2)]
    y = [0,2*np.sin(ang1),2*np.sin(ang2)]
    D.beam_shade_port = p.patch(x,y,color='red',alpha=0.4,
        legend_label='Port Look Direction',
        line_width=0)
    ang1 = np.deg2rad(D.beam_widths[0,look_index] - 90 + D.array_angle)
    ang2 = np.deg2rad(D.beam_widths[1,look_index] - 90 + D.array_angle)
    x = [0,2*np.cos(ang1),2*np.cos(ang2)]
    y = [0,2*np.sin(ang1),2*np.sin(ang2)]
    D.beam_shade_starboard = p.patch(x,y,color='green',alpha=0.4,
        legend_label='Starboard Look Direction',line_width=0)

    # Add the source directions to the plot
    D.source_line = []
    for n in range(num_sources):
        D.source_line.append(p.line([0, D.Src['position'][n][0]],
                [0,D.Src['position'][n][1]],color='black',
                legend_label='Source direction(s)', line_width=2))
    
    p.grid.grid_line_width = 0.5
    p.legend.click_policy="hide"
    p.x_range.start = D.x_min
    p.x_range.end = D.x_max
    p.y_range.start = D.x_min
    p.y_range.end = D.x_max
    D.p = p

    #-------------------------------------------------------------------
    # Time Series Window
    #-------------------------------------------------------------------

    q = figure(title=title_str)
        
    q.xaxis.axis_label = 'Time (s)'
    q.yaxis.axis_label = 'Pressure'

    # plot the time series for each element
    D.time_series = []
    plot_max = 0
    for k in range(num_elements):
        D.time_series.append(q.line(D.t,pad(D.Y[k],D.m[look_index]*k),
                    color=D.col[k],legend_label=f'Sensor {k}'))
        plot_max = np.max([plot_max,np.max(D.Y[k])])

    # Plot the delay-sum for the look direction
    D.delay_sum = q.line(D.t,D.sum_F[D.m[look_index]],color='black',
            legend_label='Delay-sum')

    # Add a shading for the region used to calculate the beams
    x = [D.t_0, D.t_0,D.t_0+D.sample_length,D.t_0+D.sample_length]
    y = plot_max*np.array([-1,1,1,-1])
    D.sample_patch = q.patch(x,y,color='yellow',alpha=0.2,line_width=0)
    
    q.legend.click_policy="hide"
    D.q = q

    #-------------------------------------------------------------------
    # Beamforming Window
    #-------------------------------------------------------------------

    b = figure(title=title_str)
    
    b.xaxis.axis_label = 'Steering Angle (degrees)'
    b.yaxis.axis_label = 'Pressure'
    leg = 'Beam Pattern'
    
    # We duplicate the beam pattern for both the left and right of
    # the array
    x = np.concatenate((D.phi,D.phi-180))
    y = np.concatenate((D.beams,D.beams))
    D.beampattern_points = b.circle(x,y, legend_label=leg)
    D.beampattern_line =  b.line(x,y, legend_label=leg)
    D.look_line_port = Span(location=bs-180,dimension='height',
                     line_color='red', line_width=3)
    D.look_line_starboard = Span(location=bs,dimension='height',
                     line_color='green', line_width=3)

    D.source_line = []
    for n in range(num_sources):
        D.source_line.append(Span(
            location=90-D.array_angle+D.Src['absolute_direction'][n],
            dimension='height', line_color='black', line_width=2))
    b.line([[], []], legend_label='Port Look direction', 
           line_color="red", line_width=2)
    b.line([[], []], legend_label='Starboard Look direction', 
           line_color="green",line_width=2)
    b.line([[], []], legend_label='Source direction(s)', 
           line_color="black", line_width=2)
    b.renderers.extend([D.look_line_port,D.look_line_starboard, 
                        *D.source_line])

    D.b = b
    
    return D

In [None]:
info = calculate_everything(info,verbose=True)

In [None]:
info = plot_everything(info,verbose=True)

In [None]:
show(gridplot([info.p, info.q, info.b], ncols=3))

In [None]:
def update(D,θ,look_direction,
           num_elements,
           Source_distance,wavelength,sample_length,t_0):
    
    num_sources = 1
    
    #print(num_elements)
    #print(len(D.Arr))
    
    D = calculate_everything(D,
                             num_elements=num_elements,
                             array_angle=θ,
                             num_sources=num_sources,
                             src_radius=Source_distance,
                             wavelength=wavelength,
                             sample_length=sample_length,
                             t_0=t_0,
                            )

    look_index = np.argmin(np.abs(D.phi-look_direction))
    bs = D.phi[look_index] # beam steer
    
    title_str = (f'Look directions : ({look_direction}°,' +
                   f'{look_direction-180}°), ' +
                   f'Array angle : {D.array_angle}°, ' +
                   f'λ : {D.Src["wavelength"][0]:0.2f} m, ' +
                   f'f : {D.c/D.Src["wavelength"][0]:0.2f} Hz'
                  )
    
    #-------------------------------------------------------------------
    # Update the spacial window
    #-------------------------------------------------------------------
    D.spacial.data_source.data['image'] = [D.Z]
    
    for k in range(num_elements):
        x = [D.Arr[k][0]]
        y = [D.Arr[k][1]]
        D.sensor[k].data_source.data = {'x': x, 'y': y}
    
    ang = np.deg2rad(look_direction + 90 + D.array_angle)
    x = [0,2*np.cos(ang)]
    y = [0,2*np.sin(ang)]
    D.look_beam_port.data_source.data = {'x': x, 'y': y}
    ang = np.deg2rad(look_direction - 90 + D.array_angle)
    x = [0,2*np.cos(ang)]
    y = [0,2*np.sin(ang)]
    D.look_beam_starboard.data_source.data = {'x': x, 'y': y}

    # Update the beamwidths to the plot
    ang1 = np.deg2rad(D.beam_widths[0,look_index] + 90 + D.array_angle)
    ang2 = np.deg2rad(D.beam_widths[1,look_index] + 90 + D.array_angle)
    x = [0,2*np.cos(ang1),2*np.cos(ang2)]
    y = [0,2*np.sin(ang1),2*np.sin(ang2)]
    D.beam_shade_port.data_source.data = {'x': x, 'y': y}
    ang1 = np.deg2rad(D.beam_widths[0,look_index] - 90 + D.array_angle)
    ang2 = np.deg2rad(D.beam_widths[1,look_index] - 90 + D.array_angle)
    x = [0,2*np.cos(ang1),2*np.cos(ang2)]
    y = [0,2*np.sin(ang1),2*np.sin(ang2)]
    D.beam_shade_starboard.data_source.data = {'x': x, 'y': y}
    D.p.title.text = title_str
    
    #-------------------------------------------------------------------
    # Update the temporal window
    #-------------------------------------------------------------------
    plot_max = 0    
    for k in range(num_elements):
        D.time_series[k].data_source.data = {'x': D.t,
                    'y': pad(D.Y[k],D.m[look_index]*k)}
        plot_max = np.max([plot_max,np.max(D.Y[k])])

    D.delay_sum.data_source.data = {'x': D.t,
                    'y': D.sum_F[D.m[look_index]]}

    # Add a shading for the region used to calculate the beams
    x = [D.t_0, D.t_0,D.t_0+D.sample_length,D.t_0+D.sample_length]
    y = plot_max*np.array([-1,1,1,-1])
    D.sample_patch.data_source.data = {'x': x, 'y': y}

    D.q.title.text = title_str

    #-------------------------------------------------------------------
    # Update the Beamforming window
    #-------------------------------------------------------------------   
    x = np.concatenate([D.phi,D.phi-180])
    y = np.concatenate([D.beams,D.beams])
    D.beampattern_points.data_source.data = {'x': x, 'y': y}
    D.beampattern_line.data_source.data = {'x': x, 'y': y}
    D.look_line_port.location = bs - 180
    D.look_line_starboard.location = bs
    for n in range(num_sources):
        D.source_line[n].location = (D.Src['absolute_direction'][n] + 
                                     90 - D.array_angle)

    D.b.title.text = title_str

    push_notebook()


In [None]:
w = interactive(update, D=fixed(info), 
         θ=(-90, 90, 1),
         look_direction = (0,180,1),
         num_elements=(1,12,1),
         Source_distance=(1,100), 
         wavelength=(0.01,0.2,0.01),
         sample_length=(0.001,0.01,0.001),
         t_0=(0,0.01,0.0001),
        )

for q in w.children:
    q.style = {'description_width': 'initial'}

grid = GridspecLayout(2, 4)

for i in range(2):
    for j in range(4):
        if i*4+j <= 7:
            grid[i, j] = w.children[i*4+j]

In [None]:
show(gridplot([info.p, info.q, info.b], ncols=3), notebook_handle=True)
grid[1,0].value = 0.2
grid
