In [None]:
# Imports and Constants
import sys
import os
import itertools

from collections.abc import Callable
from collections.abc import MutableMapping

from dataclasses import dataclass

from enum import Enum
from copy import deepcopy
from typing import Optional
from numbers import Number
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt
import matplotlib.colors as mplcolors

import IPython.display as ipd
from ipywidgets import interact, interactive, fixed, interact_manual, Layout
import ipywidgets as widgets


π = np.pi  # half a turn, in radians
π2 = 2 * np.pi  # one full turn, in radians (the 2π factor in 2π·f·t below)
SPEED_OF_LIGHT = 299792458.0 # m / s; converts between frequency and wavelength

In [None]:
# Define Signal Generators

class Func:
    """Callable wrapper around a function that maps sample times to sample values.

    Subclasses either hand a concrete callable to ``__init__`` through the
    keyword-only ``func`` argument or override ``__call__`` themselves.
    """
    _f: Callable[[list[float]], list[Number]]

    def __init__(self, *args, func: Callable[[list[float]], list[Number]]):
        # Positional arguments are accepted and ignored; only `func` is stored.
        self._f = func

    def __call__(self, t:list[float]) -> list[Number]:
        """Evaluate the wrapped function on `t` and return its result."""
        values = self._f(t)
        return values

class EulerFunc(Func):
    """Complex exponential generator: A * exp(j * (2π·f·t + ϕ)).

    Exactly one of `frequency` or `wavelength` must be given; the other is
    derived through SPEED_OF_LIGHT.
    """
    A: float # amplitude
    f: float # frequency = c / wavelength
    λ: float # wavelength = c / frequency
    ϕ: float # phase (radians)

    def __init__(self,
                 amplitude:float,
                 frequency:Optional[float]=None,
                 wavelength:Optional[float]=None,
                 phase:float=0):
        assert ((frequency is None) != (wavelength is None)), 'Must only specify one of: [frequency, wavelength]'

        # Test against None (not truthiness) so that frequency=0 is treated as
        # a supplied value instead of falling through to the wavelength branch.
        if frequency is not None:
            self.f = frequency
            # A zero-frequency (constant) signal has an unbounded wavelength.
            self.λ = SPEED_OF_LIGHT / frequency if frequency else float('inf')
        else:
            self.f = SPEED_OF_LIGHT / wavelength
            self.λ = wavelength

        self.A = amplitude
        self.ϕ = phase
        # The phase belongs inside the imaginary exponent; adding it outside
        # the `1j * (...)` product would scale the amplitude by exp(ϕ) instead
        # of rotating the phasor.
        super().__init__(func=lambda t: self.A * np.exp(1j * (π2 * self.f * t + self.ϕ)))
        
class SineFunc(Func):
    """Real sine generator: A * sin(2π·f·t + ϕ).

    Exactly one of `frequency` or `wavelength` must be given; the other is
    derived through SPEED_OF_LIGHT.
    """
    A: float # amplitude
    f: float # frequency = c / wavelength
    λ: float # wavelength = c / frequency
    ϕ: float # phase (radians)

    def __init__(self,
                 amplitude:float,
                 frequency:Optional[float]=None,
                 wavelength:Optional[float]=None,
                 phase:float=0):
        assert ((frequency is None) != (wavelength is None)), 'Must only specify one of: [frequency, wavelength]'

        # Test against None (not truthiness) so that frequency=0 is treated as
        # a supplied value instead of falling through to the wavelength branch.
        if frequency is not None:
            self.f = frequency
            # A zero-frequency (constant) signal has an unbounded wavelength.
            self.λ = SPEED_OF_LIGHT / frequency if frequency else float('inf')
        else:
            self.f = SPEED_OF_LIGHT / wavelength
            self.λ = wavelength

        self.A = amplitude
        self.ϕ = phase
        super().__init__(func=lambda t: self.A * np.sin((π2 * self.f * t) + self.ϕ))

class CosineFunc(Func):
    """Real cosine generator: A * cos(2π·f·t + ϕ).

    Exactly one of `frequency` or `wavelength` must be given; the other is
    derived through SPEED_OF_LIGHT.
    """
    A: float # amplitude
    f: float # frequency = c / wavelength
    λ: float # wavelength = c / frequency
    ϕ: float # phase (radians)

    def __init__(self,
                 amplitude:float,
                 frequency:Optional[float]=None,
                 wavelength:Optional[float]=None,
                 phase:float=0):
        assert ((frequency is None) != (wavelength is None)), 'Must only specify one of: [frequency, wavelength]'

        # Test against None (not truthiness) so that frequency=0 is treated as
        # a supplied value instead of falling through to the wavelength branch.
        if frequency is not None:
            self.f = frequency
            # A zero-frequency (constant) signal has an unbounded wavelength.
            self.λ = SPEED_OF_LIGHT / frequency if frequency else float('inf')
        else:
            self.f = SPEED_OF_LIGHT / wavelength
            self.λ = wavelength

        self.A = amplitude
        self.ϕ = phase
        super().__init__(func=lambda t: self.A * np.cos(π2 * self.f * t + self.ϕ))



# @dataclass
class BinaryEncoder(Func):
    """Encode a framed bit sequence as a repeating train of one-sample impulses.

    The frame is ``prefix + bits_to_encode + suffix``. Each symbol lasts
    ``1 / baud_rate`` seconds. When called with an array of sample times, the
    encoder emits, on the first sample that falls inside a symbol, +1.0 for a
    true bit and -1.0 for a false bit; ``None`` symbols and all other samples
    are 0.0. Times beyond the frame duration wrap around (the frame repeats).

    Args:
        bits_to_encode: Payload symbols (True / False / None).
        baud_rate: Symbols per second.
        sample_rate: Samples per second of the time axis the encoder will be
            called with; used to detect the first sample of each symbol.
        prefix: Symbols placed before the payload (defaults to none).
        suffix: Symbols placed after the payload (defaults to none).
    """
    # Plain `np.ndarray` on purpose: `np.bool` does not exist on every NumPy
    # release, and a class-level annotation is evaluated at class creation.
    bits_to_encode: np.ndarray  # payload as a boolean array (None entries become False)
    baud_rate: float   # How many times the symbol changes per second.
    sample_rate: float  # Samples per second.
    prefix: list[Optional[bool]]
    suffix: list[Optional[bool]]

    def __init__(self,
        bits_to_encode: list[bool],
        baud_rate: float, # symbols per second
        sample_rate: float, # samples per second
        prefix: Optional[list[Optional[bool]]] = None,
        suffix: Optional[list[Optional[bool]]] = None):
        # None defaults avoid sharing one mutable list between instances.
        prefix = [] if prefix is None else list(prefix)
        suffix = [] if suffix is None else list(suffix)

        self.bits_to_encode = np.array(bits_to_encode, dtype=bool)
        self.baud_rate = baud_rate
        self.sample_rate = sample_rate
        self.prefix = prefix
        self.suffix = suffix

        # list(...) lets tuples and arrays be used as the payload as well.
        framed_bits = prefix + list(bits_to_encode) + suffix
        num_symbols = len(framed_bits)
        symbol_period = 1 / baud_rate
        msg_duration = num_symbols / baud_rate

        def level_of(bit) -> float:
            """Map one frame symbol to its impulse amplitude."""
            if bit is None:
                return 0.0
            if bit == True:
                return 1.0
            if bit == False:
                return -1.0
            return 0.0

        levels = np.array([level_of(bit) for bit in framed_bits], dtype=float)

        def f(ts: list[float]) -> np.ndarray:
            ts = np.asarray(ts, dtype=float)
            if num_symbols == 0:
                # An empty frame has nothing to emit (and a zero duration
                # would otherwise turn every wrapped time into NaN).
                return np.zeros(len(ts))

            # Wrap so the frame repeats, then find the symbol each sample is in.
            wrapped = np.fmod(ts, msg_duration)
            symbol_idx = np.floor_divide(wrapped, symbol_period).astype(int)
            # Symbol the previous sample (one sample period earlier) was in.
            prev_symbol_idx = np.floor_divide(wrapped - 1 / self.sample_rate, symbol_period).astype(int)
            first_sample_for_symbol = symbol_idx != prev_symbol_idx

            # The modulo keeps negative indices (negative times) and a
            # rounding-induced index of `num_symbols` inside the frame.
            return np.where(first_sample_for_symbol, levels[symbol_idx % num_symbols], 0.0)

        super().__init__(func=f)

def test_binary_encoder():
    """Plot the BinaryEncoder output for a short framed message (visual check)."""
    payload = [True, False, False, True, True, False]
    encoder = BinaryEncoder(
        bits_to_encode=payload,
        baud_rate=0.5,
        sample_rate=4,
        prefix=[None] * 3,
        suffix=[None] * 8,
    )
    times = np.arange(0.0, 80.0, step=0.25)
    plt.plot(times, encoder(times))
    plt.show()


# Render the demo plot when the cell runs.
test_binary_encoder()




In [None]:

class ConvFunc(Func):
    """Func that filters its input by convolving it with a fixed set of taps.

    Unlike the generator classes, the argument to ``__call__`` is treated as
    the signal to filter, not as a time axis.
    """
    taps: np.ndarray[float] # Filter coefficients convolved with the input.

    def __init__(self, *args, taps: np.ndarray[float]):
        # Positional arguments are accepted and ignored; only `taps` is stored.
        self.taps = taps

    def __call__(self, t:list[float]) -> list[Number]:
        """Return `t` convolved with the taps using numpy's 'same' mode."""
        filtered = np.convolve(self.taps, t, mode='same')
        return filtered

class RaisedCosineFilter(ConvFunc):
    """Raised-cosine pulse-shaping filter.

    The taps sample
        h(t) = sinc(t/Ts) * cos(π·β·t/Ts) / (1 - (2·β·t/Ts)^2)
    at integer sample offsets ``t``, with index ``(num_taps - 1) // 2`` at t = 0.

    Args:
        num_taps: Number of filter taps.
        beta: Roll-off factor β.
        symbol_period_in_samples: Symbol period Ts, expressed in samples.
    """
    def __init__(self, *args,
                 num_taps: int,
                 beta: float,
                 symbol_period_in_samples: float,
                ):
        t = np.arange(num_taps) - (num_taps-1)//2
        numerator = np.sinc(t/symbol_period_in_samples) * np.cos(π * beta*t/symbol_period_in_samples)
        denominator = 1 - (2*beta*t/symbol_period_in_samples)**2

        # At t = ±Ts/(2β) the expression is 0/0. Divide by a placeholder there
        # and substitute the analytic limit (π/4)·sinc(1/(2β)) afterwards, so
        # the taps never contain NaN or inf.
        singular = np.isclose(denominator, 0.0)
        h = numerator / np.where(singular, 1.0, denominator)
        if singular.any():
            # A singular tap implies β != 0, so this division is safe.
            h[singular] = (π / 4) * np.sinc(1 / (2 * beta))

        super().__init__(taps=h)


def test_raised_cosine_filter():
    """Plot an encoded impulse train and its raised-cosine-shaped version.

    The sample period, the sample rate given to the encoder, the time axis
    step and the filter's symbol period are all derived from the same two
    numbers so that they agree with one another.
    """
    baud_rate = 0.25  # symbols / second
    sample_period = 0.25  # seconds / sample
    sample_rate = 1 / sample_period  # samples / second

    binary_encoder = BinaryEncoder(
        bits_to_encode=[True, False, False, True, True, False],
        baud_rate=baud_rate,
        sample_rate=sample_rate,
        prefix=3 * [None],
        suffix=8 * [None],
    )

    rcf = RaisedCosineFilter(num_taps=101,
                             beta=0.3,
                             # samples per symbol = (samples/sec) / (symbols/sec)
                             symbol_period_in_samples=sample_rate / baud_rate)

    # Step the time axis by the sample period the encoder was configured for.
    t = np.arange(0.0, 300.0, step=sample_period)

    impulses = binary_encoder(t)
    plt.plot(t, impulses)
    x = rcf(impulses)
    plt.plot(t, x[:len(t)])
    plt.show()


# Render the demo plot when the cell runs.
test_raised_cosine_filter()

In [None]:
# t = np.linspace(0,π2,16)

# euler = EulerFunc(amplitude=1.0, frequency=π/2)
# sine = SineFunc(1.0, π/2)
# cosine = CosineFunc(1.0, π/2)
# i_q_approach = Func(func = (lambda t: sine(t)+cosine(t)))

# print(euler(t))
# print(sine(t))
# print(cosine(t))
# print(i_q_approach(t))

In [None]:
import sys
import platform

# Per-OS setup that runs before the RTL-SDR bindings are imported.
os_platform = platform.system()
if os_platform == 'Darwin':
    # Point the dynamic loader at the Homebrew library directory.
    os.environ['DYLD_LIBRARY_PATH'] = '/opt/homebrew/lib'
elif os_platform not in ('Linux', 'Windows'):
    # Linux and Windows need no adjustment; anything else is rejected.
    raise TypeError("Unsupported OS")


In [None]:
# Define RTL-SDR Sampler

from rtlsdr import RtlSdr

# Gain choices offered for the RTL-SDR source; only 'auto' is listed.
rtlsdr_valid_gain_values = ['auto']


def update_valid_gain_values(sdr) -> None:
    """Placeholder hook called with an open SDR device; currently does nothing.

    TODO: extend the gain options with the gains the device reports
    (`sdr.valid_gains_db`) and refresh the gain dropdown accordingly.
    """
    return None

class RtlSdrSampler(Func):
    """Func that reads samples from an RTL-SDR device instead of computing them.

    Calling the instance with a sequence ``t`` opens the device, applies the
    stored settings, reads ``len(t)`` samples and closes the device again. The
    values in ``t`` themselves are not used.

    Args:
        sample_rate: Sample rate in Hz.
        center_freq: Tuner centre frequency in Hz.
        freq_correction: Frequency correction in PPM.
        gain: Value assigned to the device's ``gain`` attribute: 'auto' or a
            number (presumably dB; confirm against the pyrtlsdr docs).
    """
    sample_rate: float # Hz
    center_freq: float # Hz
    freq_correction: float # PPM
    gain: float | str # 'auto' or a numeric gain value

    def __init__(self,
                 sample_rate:float,
                 center_freq:float,
                 freq_correction:float,
                 gain:float | str='auto'):
        self.sample_rate = sample_rate
        self.center_freq = center_freq
        self.freq_correction = freq_correction
        self.gain = gain

        def sample_sdr(num_samples: int) -> list[complex]:
            sdr = RtlSdr()
            # try/finally guarantees the device is released even when applying
            # a setting or reading the samples raises.
            try:
                sdr.sample_rate = self.sample_rate
                sdr.center_freq = self.center_freq
                sdr.freq_correction = self.freq_correction
                sdr.gain = self.gain
                update_valid_gain_values(sdr)
                return sdr.read_samples(num_samples)
            finally:
                sdr.close()

        super().__init__(func=lambda t: sample_sdr(len(t)))




def test_RtlSdrSampler():
    """Capture three bursts of increasing length and plot their I/Q parts.

    Requires an attached RTL-SDR device.
    """
    rtl_sampler = RtlSdrSampler(
        sample_rate=2.048e6,  # Hz
        center_freq=100e6,  # Hz
        freq_correction=60,  # PPM
        gain=49.6,
    )

    for capture_length in (512, 2048, 4096):
        # Only the length of the argument matters to the sampler.
        iq = rtl_sampler(range(capture_length))
        plt.plot(iq.real)
        plt.plot(iq.imag)
        plt.legend(["I", "Q"])
        plt.show()

# test_RtlSdrSampler()

In [None]:
class SingalGeneratorType(Enum):
    """Signal sources selectable in the interactive demo.

    Members are used as keys of `generator_parameters` and as the options of
    the generator-selection dropdown.
    """
    WaveGenerator = 0
    BinarySignalGenerator = 1
    RtlSdrSampler = 2
    

class NoiseType(Enum):
    """Kinds of simulated noise; members key the `noise_parameters` widgets."""
    NoNoise = 0
    AdditiveWhiteGaussianNoise = 1

class PulseShapingApproach(Enum):
    """Pulse-shaping choices offered for the binary signal generator."""
    NoShaping = 0
    RaisedCosine = 1


class WindowingApproach(Enum):
    """Window choices applied to the signal before the FFT."""
    NoWindowing = 0
    Hamming = 1

def flatten_dict(dictionary, parent_key='', separator='_'):
    """Collapse nested mutable mappings into a single-level dict.

    Nested keys are joined as ``<parent><separator><key>``. An Enum parent key
    contributes its member ``name``; keys at the top level (no parent) are kept
    unchanged, whatever their type.
    """
    # The parent does not change inside the loop, so normalise it once.
    prefix = parent_key.name if isinstance(parent_key, Enum) else parent_key

    flattened = {}
    for key, value in dictionary.items():
        if parent_key:
            qualified_key = prefix + separator + str(key)
        else:
            qualified_key = key

        if isinstance(value, MutableMapping):
            flattened.update(flatten_dict(value, qualified_key, separator=separator))
        else:
            flattened[qualified_key] = value
    return flattened

def convert_to_db(value:float, reference_value:float, is_power_or_energy_value: bool = True) -> float:
    '''
    Express `value` relative to `reference_value` in decibels.

    Power/energy quantities use a factor of 10; other (amplitude-like)
    quantities use a factor of 20.

    Reference: https://dspillustrations.com/pages/posts/misc/decibel-conversion-factor-10-or-factor-20.html
    '''
    factor = 10 if is_power_or_energy_value else 20
    return factor*np.log10(value/reference_value)

def cart2pol(x, y):
    """Convert Cartesian (x, y) to polar (rho, phi), with phi in radians."""
    radius = np.sqrt(x**2 + y**2)
    angle = np.arctan2(y, x)
    return radius, angle

def pol2cart(rho, phi):
    """Convert polar (rho, phi in radians) to Cartesian (x, y)."""
    return rho * np.cos(phi), rho * np.sin(phi)

def wrap_plot(ys: list[list[float]],
        x:Optional[list[float]]=None,
        x_ax_label:str = '',
        y_ax_label:str = '',
        y_labels:Optional[list[str]]=None,
        set_y_log_scale:bool = False,
        set_x_log_scale:bool = False,
        x_range: Optional[tuple[float,float]] = None,
        y_range: Optional[tuple[float,float]] = None,
        marker=None,
        linestyle:str='-',
        max_samples_per_series:Optional[int]=None) -> None:
    """Draw one or more series against a shared x axis and show the figure.

    Args:
        ys: The series to plot; each must have the same length as `x`.
        x: Shared x values. Defaults to the sample index of the first series.
        x_ax_label, y_ax_label: Axis labels (skipped when empty).
        y_labels: One legend label per series. A legend is drawn when given.
        set_y_log_scale, set_x_log_scale: Use matplotlib's 'symlog' scale on
            the corresponding axis.
        x_range, y_range: Optional (low, high) axis limits.
        marker, linestyle: Passed through to `plt.plot`.
        max_samples_per_series: If set, only the first N points of each series
            are drawn.

    Raises:
        AssertionError: if the number of labels does not match the number of
            series, or a series' length differs from that of `x`.
    """
    if x is None:
        # Guard the empty case so that `ys[0]` cannot raise IndexError.
        x = np.arange(len(ys[0]) if len(ys) else 0)
    assert (y_labels is None) or (len(ys) == len(y_labels))

    color_cycle = itertools.cycle(mplcolors.TABLEAU_COLORS)

    for i, (y, color) in enumerate(zip(ys, color_cycle)):
        assert(len(x) == len(y)), f"x and y must have same length. Instead x has {len(x)} items and y has {len(y)}."
        line_label = y_labels[i] if y_labels else None
        x_s = x
        y_s = y
        if max_samples_per_series is not None:
            x_s = x_s[:max_samples_per_series]
            y_s = y_s[:max_samples_per_series]

        plt.plot(x_s, y_s, color=color, marker=marker, linestyle=linestyle, label=line_label)

    plt.grid()

    if y_labels:
        # Without this call the labels passed to plt.plot are never displayed.
        plt.legend()

    if x_ax_label:
        plt.xlabel(x_ax_label)
    if y_ax_label:
        plt.ylabel(y_ax_label)

    if set_y_log_scale:
        plt.yscale('symlog')
    if set_x_log_scale:
        plt.xscale('symlog')

    if x_range:
        plt.xlim(x_range[0], x_range[1])
    if y_range:
        plt.ylim(y_range[0], y_range[1])
    plt.show()

def wrap_scatter(
        xy_s:list[tuple[list[float],list[float]]],
        x_ax_label:str = '',
        y_ax_label:str = '',
        set_y_log_scale:bool = False,
        set_x_log_scale:bool = False,
        x_range: Optional[tuple[float,float]] = None,
        y_range: Optional[tuple[float,float]] = None,
        marker=None,
        max_samples_per_series:Optional[int]=None) -> None:
    """Draw one or more (x, y) point sets as a scatter plot and show the figure.

    Args:
        xy_s: The point sets, each an ``(x_values, y_values)`` pair.
        x_ax_label, y_ax_label: Axis labels (skipped when empty).
        set_y_log_scale, set_x_log_scale: Use matplotlib's 'symlog' scale on
            the corresponding axis.
        x_range, y_range: Optional (low, high) axis limits.
        marker: Passed through to `plt.scatter`.
        max_samples_per_series: If set, only the first N points of each set
            are drawn.
    """
    color_cycle = itertools.cycle(mplcolors.TABLEAU_COLORS)

    for (x, y), color in zip(xy_s, color_cycle):
        x_s = x
        y_s = y
        if max_samples_per_series is not None:
            x_s = x_s[:max_samples_per_series]
            y_s = y_s[:max_samples_per_series]
        plt.scatter(x_s, y_s, color=color, marker=marker)

    plt.grid()

    if x_ax_label:
        plt.xlabel(x_ax_label)
    if y_ax_label:
        plt.ylabel(y_ax_label)

    if set_y_log_scale:
        plt.yscale('symlog')
    if set_x_log_scale:
        plt.xscale('symlog')

    if x_range:
        plt.xlim(x_range[0], x_range[1])
    if y_range:
        plt.ylim(y_range[0], y_range[1])
    plt.show()


In [None]:
# Style/layout tweaks added to all widgets.

def default_widget_params():
    """Return the style/layout keyword arguments shared by visible widgets."""
    label_style = {'description_width': 'initial'}
    widget_layout = Layout(width='90%')
    return {'style': label_style, 'layout': widget_layout}

def default_widget_params_starts_hidden():
    """Return the shared style/layout keyword arguments for initially hidden widgets."""
    label_style = {'description_width': 'initial'}
    hidden_layout = Layout(width='90%', visibility='hidden')
    return {'style': label_style, 'layout': hidden_layout}


# Dropdown choosing which signal generator feeds the demo.
sample_generator_selection_widget=widgets.Dropdown(value=SingalGeneratorType.BinarySignalGenerator, options=SingalGeneratorType, **default_widget_params())

# Sampling controls: capture length in seconds and sample rate.
# The sample-rate slider is logarithmic (base 10), so min/max are exponents.
sampling_parameters = {
    # 'num_samples': widgets.IntLogSlider(value=44100*10, min=101, max=2049, step=0.2, **default_widget_params()),
    'sample_length_in_sec': widgets.FloatSlider(value=5, min=0.1, max=120, step=0.2, **default_widget_params()),
    'sample_rate': widgets.FloatLogSlider(value=44100, base=10, min=-1, max=10, step=0.2, **default_widget_params()),
}

# Parameters for various signal generators, keyed by generator type.
# Widgets start hidden; select_sample_generator() shows those of the selected generator.
# NOTE(review): 'pulse_shaping_method' and 'gain' do not use the shared default
# widget params, unlike their siblings — confirm whether that is intentional.
generator_parameters = {
    SingalGeneratorType.WaveGenerator : {
        'amplitude_in_phase' : widgets.FloatSlider(value=1.0, min=-10.0, max=10.0, step=0.1, **default_widget_params_starts_hidden()),
        'frequency' : widgets.FloatLogSlider(value=0.5, base=10, min=-10, max=10, step=0.1, **default_widget_params_starts_hidden()),
        'phase' : widgets.FloatSlider(value=0.0, min=-np.pi, max=np.pi, step=0.0001, **default_widget_params_starts_hidden()),
        'amplitude_quadriture' : widgets.FloatSlider(value=1.0, min=-10.0, max=10.0, step=0.1, **default_widget_params_starts_hidden()),
    },
    SingalGeneratorType.BinarySignalGenerator : {
        # Bit string; generate_and_display_sample accepts the characters '1', '0' and '_'.
        'bits_to_encode' : widgets.Text(value='0100100001100101011011000110110001101111', **default_widget_params_starts_hidden()),
        'baud_rate' : widgets.FloatSlider(value=10, min=1e-3, max=1000, step=0.1, **default_widget_params_starts_hidden()),
        'pulse_shaping_method' : widgets.Dropdown(value=PulseShapingApproach.RaisedCosine, options=PulseShapingApproach),
        'raised_cos_num_taps' : widgets.IntSlider(value=2600, min=101, max=10001, step=2, **default_widget_params_starts_hidden()),
        'raised_cos_beta' : widgets.FloatSlider(value=0.3, min=0, max=1.0, step=0.01, **default_widget_params_starts_hidden()),
    },
    SingalGeneratorType.RtlSdrSampler : {
        'center_freq' : widgets.FloatLogSlider(value=100e6, base=10, min=-10, max=10, step=0.1, **default_widget_params_starts_hidden()),
        'freq_correction' : widgets.IntSlider(value=60, min=0, max=1000, step=1, **default_widget_params_starts_hidden()),
        'gain' : widgets.Dropdown(value='auto', options=rtlsdr_valid_gain_values),
    }
}

# Parameters for controlling simulated noise (mean and standard deviation of
# the additive white Gaussian noise).
# NOTE(review): 'std' is created with value=0, which is below its min=1e-4 —
# presumably the widget clamps it; confirm the intended default.
noise_parameters ={
    NoiseType.AdditiveWhiteGaussianNoise : {
        'mean' : widgets.FloatSlider(value=0, min=-10, max=10, step=0.1, **default_widget_params()),
        'std' : widgets.FloatSlider(value=0, min=1e-4, max=1, step=1e-2, **default_widget_params()),
    }
}

# Parameters for selecting the window applied to the signal before the FFT.
# For more info see: https://pysdr.org/content/frequency_domain.html#windowing
windowing_parameters = {
    'windowing_selector' : widgets.Dropdown(value=WindowingApproach.Hamming, options=WindowingApproach,**default_widget_params())
}

# Squelch threshold, as a percentage of the largest FFT magnitude; bins at or
# below it are left out of the IQ constellation plot.
filtering_parameters = {
    'squelch_floor_percentage' : widgets.FloatSlider(value=10, min=0, max=100, step=0.1, **default_widget_params())
}

def select_sample_generator(selected_generator: SingalGeneratorType) -> None:
    """Show the parameter widgets of the selected generator and hide the rest.

    Generators are matched by enum member name.
    """
    for generator_type, widgets_by_name in generator_parameters.items():
        is_selected = selected_generator.name == generator_type.name
        visibility, display = ('visible', 'flex') if is_selected else ('hidden', 'none')
        for widget in widgets_by_name.values():
            widget.layout.visibility = visibility
            widget.layout.display = display

# interact(select_sample_generator, x=sample_generator_selection_widget);

transmitted_waveform=None

def generate_and_display_sample(**kwargs):
    global transmitted_waveform


    select_sample_generator(sample_generator_selection_widget.value)
    print('sample_generator_selection_widget.value:',sample_generator_selection_widget.value)
    signal_gen_params = generator_parameters[sample_generator_selection_widget.value]

    sample_length_in_sec = sampling_parameters['sample_length_in_sec'].value
    sample_rate = sampling_parameters['sample_rate'].value
    num_samples = int(sample_length_in_sec * sample_rate)

    t = np.linspace(start = 0,
                    stop = sample_length_in_sec,
                    num = num_samples)
    
    s_func = None
    match sample_generator_selection_widget.value:

        case SingalGeneratorType.WaveGenerator:
            amplitude_I = signal_gen_params['amplitude_in_phase'].value
            amplitude_Q = signal_gen_params['amplitude_quadriture'].value
    
            frequency = signal_gen_params['frequency'].value
            phase = signal_gen_params['phase'].value
    
            # For more info see: https://pysdr.org/content/sampling.html#quadrature_sampling
            s_I_component = CosineFunc(amplitude=amplitude_I, frequency=frequency)
            s_Q_component = SineFunc(amplitude=amplitude_Q, frequency=frequency) # 90 degrees out of phase of I-component
            s_func = Func(func = lambda t: s_I_component(t) + s_Q_component(t))
            
            wrap_plot([s_func(t),s_I_component(t),s_Q_component(t)],t,
                      x_ax_label='Time (seconds)',
                      y_labels=['Combined', 'I-component(Cosine)', 'Q-component(Sine)'])
        case SingalGeneratorType.BinarySignalGenerator:
            print('binary')
            bits_to_encode_str = signal_gen_params['bits_to_encode'].value
            baud_rate = signal_gen_params['baud_rate'].value
    
            bits_to_encode = []
            for bit_char in bits_to_encode_str:
                if bit_char == '1':
                    bits_to_encode.append(True)
                elif bit_char == '0':
                    bits_to_encode.append(False)
                elif bit_char == '_':
                    bits_to_encode.append(None)
                else:
                    raise ValueError('Bit must be 1/0/_')
            
            binary_encoder=BinaryEncoder(
                bits_to_encode = bits_to_encode,
                baud_rate=baud_rate,
                sample_rate=sample_rate,
                prefix = 3*[None,],
                suffix = 7*[None,],
            )
            
            # wrap_plot([binary_encoder(t)], t, x_ax_label='t', y_ax_label='binary encoded', set_y_log_scale=False,set_x_log_scale=False)


            match signal_gen_params['pulse_shaping_method'].value:
                case PulseShapingApproach.NoShaping:
                    s_func = Func(func = lambda x: binary_encoder(x))
                case PulseShapingApproach.RaisedCosine:
                    rcf = RaisedCosineFilter(
                                num_taps=signal_gen_params['raised_cos_num_taps'].value,
                                beta=signal_gen_params['raised_cos_beta'].value,
                                symbol_period_in_samples=(sample_rate/baud_rate))
                    s_func = Func(func = lambda x: rcf(binary_encoder(x)))
                case _:
                    raise ValueError('Unsupported Pulse Shaping')
        
        case _:
            raise ValueError('Unsupported Sample Generator')

    s = s_func(t)
    
    # Generate some gaussian white noise to simulate real world conditions.
    awgn_params = noise_parameters[NoiseType.AdditiveWhiteGaussianNoise]
    awgn = np.random.normal(loc=awgn_params['mean'].value,scale=awgn_params['std'].value, size=len(s))
    s = s + awgn

    wrap_plot([s],t,
              x_ax_label='Time (seconds)',
              y_labels=['Final Signal'])
    transmitted_waveform = (s.copy(),t.copy())

    # Select a Windowing
    if(windowing_parameters['windowing_selector'].value == WindowingApproach.Hamming):
        s = s * np.hamming(len(s))
    
    
    # FFT 
    #
    # S = np.fft.fft(s)
    # S = np.fft.fftshift(S)
    # freqs = np.linspace(-np.ceil(sample_rate/2),np.ceil(sample_rate/2), num_samples)

    # FFT: only non-negative frequencies since our input signal is real)
    S = np.fft.rfft(s)
    print('len(S):',len(S))
    freqs = np.linspace(0,np.ceil(sample_rate/2), num_samples//2+1)
    fft_mag = np.abs(S)
    fft_phase = np.angle(S)

    max_fft_mag = np.max(fft_mag)
    print('max_fft_mag:',max_fft_mag)
    squelch_floor_val = max_fft_mag * (filtering_parameters['squelch_floor_percentage'].value/100.0)
    
    filtered_mag = fft_mag[fft_mag > squelch_floor_val]
    filtered_phase = fft_phase[fft_mag > squelch_floor_val]
    
    wrap_plot([fft_mag, np.full_like(fft_mag,squelch_floor_val)], freqs, x_ax_label='Frequency (Hz)', y_ax_label='FFT Magnitude', set_y_log_scale=False,set_x_log_scale=True)
    wrap_plot([fft_phase], freqs, x_ax_label='Frequency (Hz)', y_ax_label='FFT Phase', set_y_log_scale=False,set_x_log_scale=True)

    # Plot IQ Constellations 
    (iq_x, iq_y) = pol2cart(filtered_mag, filtered_phase)
    print(iq_x.shape)
    print(iq_y.shape)
    window_max = np.max(np.sqrt(iq_x**2 + iq_y**2)) * 1.1
    wrap_scatter([(iq_x, iq_y),],
                 x_ax_label='I', y_ax_label='Q',
                 x_range=(-window_max,window_max),
                 y_range=(-window_max,window_max))

    pass

# One flat name -> widget mapping, so every control can be handed to `interact`
# as a keyword argument. Later entries override earlier ones on a name clash.
flattened_params = {
    'select_sample_generator': sample_generator_selection_widget,
    **flatten_dict(sampling_parameters),
    **flatten_dict(generator_parameters),
    **flatten_dict(noise_parameters),
    **flatten_dict(windowing_parameters),
    **flatten_dict(filtering_parameters),
}


interact(generate_and_display_sample, **flattened_params);


# sr = 22050 # sample rate
# T = 2.0    # seconds
# t = numpy.linspace(0, T, int(T*sr), endpoint=False) # time variable
# x = 0.5*numpy.sin(2*numpy.pi*440*t)                # pure sine wave at 440 Hz

# sr = 22050 # sample rate
# T = 2.0    # seconds
# t = np.linspace(0, T, int(T*sr), endpoint=False) # time variable
# x = 0.5*np.sin(π2*440*t)                # pure sine wave at 440 Hz

# ipd.Audio(x, rate=sr) # load a NumPy array

In [None]:
# print(audio_info)
# print(len(audio_info))
# print(sampling_parameters['sample_rate'].value)
# audio_info2 = np.repeat(audio_info,10000)

# ipd.Audio(
#     data=audio_info2,
#     rate=sampling_parameters['sample_rate'].value) # load a NumPy array

In [None]:
# Apply a shift to the transmitted waveform's time axis
# (the frequency shift below is currently disabled).
phase_shift = 0.4 # offset added to every time stamp
# frequency_offset =  0.1

# recieved_waveform = transmitted_waveform[0] * np.exp(1j*2*np.pi*frequency_offset*time_s) # perform freq shift
recieved_waveform = transmitted_waveform[0]
# Build a new array instead of using `+=`: an in-place add would also modify
# the time axis stored inside `transmitted_waveform`, and would shift it again
# every time this cell is re-run.
time_s = transmitted_waveform[1] + phase_shift

In [None]:


# Symbol timing recovery: walks through `samples`, picks one sample per symbol
# and adjusts the fractional timing estimate `mu` from a feedback error term.
# NOTE(review): `mm_val` suggests a Mueller & Muller style detector — confirm.
# NOTE(review): neither `samples` nor `sps` (presumably samples per symbol) is
# defined in the visible part of this notebook; on a fresh kernel this cell
# fails with NameError unless they are defined elsewhere — confirm.
num_samples = len(samples)

mu = 0 # initial estimate of phase of sample
out = np.zeros(num_samples + 10, dtype=np.complex64)
out_rail = np.zeros(num_samples + 10, dtype=np.complex64) # stores values, each iteration we need the previous 2 values plus current value
i_in = 0 # input samples index
i_out = 2 # output index (let first two outputs be 0)




while i_out < num_samples and i_in+16 < num_samples:
    out[i_out] = samples[i_in + int(mu)] # grab what we think is the "best" sample
    # Hard decision on the chosen sample: 1 or 0 per component, by sign.
    out_rail[i_out] = int(np.real(out[i_out]) > 0) + 1j*int(np.imag(out[i_out]) > 0)
    # Error terms built from the current, previous and second-previous outputs.
    x = (out_rail[i_out] - out_rail[i_out-2]) * np.conj(out[i_out-1])
    y = (out[i_out] - out[i_out-2]) * np.conj(out_rail[i_out-1])
    mm_val = np.real(y - x)
    # Advance by one symbol (`sps`) plus a correction scaled by the loop gain 0.3.
    mu += sps + 0.3*mm_val
    i_in += int(np.floor(mu)) # round down to nearest int since we are using it as an index
    mu = mu - np.floor(mu) # remove the integer part of mu
    i_out += 1 # increment output index
out = out[2:i_out] # remove the first two, and anything after i_out (that was never filled out)



# Rebinding `samples` means re-running this cell operates on its own output.
samples = out # only include this line if you want to connect this code snippet with the Costas Loop later on


