In [None]:
import json
from textwrap import dedent as d
import dash
from dash.dependencies import Input, Output, State
import dash_core_components as dcc
import dash_html_components as html
from jupyter_dash import JupyterDash
import numpy as np
import dash_bootstrap_components as dbc
import random as rnd
from netifaces import AF_INET, AF_INET6, AF_LINK, AF_PACKET, AF_BRIDGE, ifaddresses

## Python deps

  * jupyter_dash
  * dash_bootstrap_components

## Functions

In [None]:
f_init = 20000
data_r_init = 2000
ref_init = 0.7
alpha_init = 1.0
window_init=6
mode_init = 'sin'

fs =  1000000
N  = 10000
N_dots   = 4
dots_pos = [((i+1)*N/fs/(N_dots+1),1) for i in range(N_dots)]
trace_t = [i/fs for i in range(N)]

# Controller functions

def get_envelope_handles(shapes):
    dots = []
    for s in shapes:
        name = s['name'] or ''
        if name.startswith('envelope_'):
            dots.append((s['xanchor'], s['yanchor']))
    return dots

def get_num_envelope_handles(shapes):
    max_index = -1
    for s in shapes:
        name = s['name'] or ''
        if name.startswith('envelope_'):
            cur_index = int(name.strip('envelope_'))
            max_index = max(cur_index,max_index)
    return max_index + 1

def get_envelope_handle(shapes,index):
    for s in shapes:
        if s['name'] == 'envelope_' + str(index):
            return s
    return None


def add_envelope_handle(shapes, xmax):
    # Add a new envelope handle
    # Use same y value and halfway between the furthest x handle and a maximum
    num_handles = get_num_envelope_handles(shapes)
    shape_template = get_envelope_handle(shapes,num_handles-1).copy()
    shape_template.update(
        {   'name'   : 'envelope_'+str(num_handles),
            'xanchor':(shape_template['xanchor']+xmax)/2
        })
    shapes.append(shape_template)

def rm_envelope_handle(shapes, index=None):
    # Remove envelope_handle with supplied index (of with highest index)
    if index==None:
        index = get_num_envelope_handles(shapes)-1
        # Never implicitly delete the last remaining handle
        if index == 0:
            return
    
    for (i,s) in enumerate(shapes):
        if s['name'] == 'envelope_'+str(index):
            shapes.pop(i)

# Model functions

def calc_envelope(dots):
    sorted_dots = dots.copy()
    sorted_dots.sort(key=lambda x: x[0])
    env = np.interp(trace_t, [p[0] for p in sorted_dots],
                             [p[1] for p in sorted_dots])
    return env

random_data = [rnd.randint(0,3) for _ in range(N)]
def calc_ref(mode, fc, data_rate):
    carrier = (
        np.array([np.cos(2*np.pi*fc*t) for t in trace_t]),
        np.array([np.sin(2*np.pi*fc*t) for t in trace_t])
    )
    if mode=='sin':
        return carrier
    if mode=='am':
        data = 0.3*np.array([np.cos(2*np.pi*data_rate*t) for t in trace_t])+0.7
        return (
            carrier[0]*data,
            carrier[1]*data
        )
    if mode=='fm':
        b = 3
        return (
            np.array([np.cos(2*np.pi*fc*t + b * np.cos(2*np.pi*data_rate*t)) for t in trace_t]),
            np.array([np.sin(2*np.pi*fc*t + b * np.cos(2*np.pi*data_rate*t)) for t in trace_t])
        )
    if mode.startswith('qpsk'):
        data_rate = fc/int(fc/data_rate)
        samples_per_sym = fc/(data_rate)
        
        #Generate RRC pulse shaping filter
        num_weights = 1000 # This is totally ridiculous! Only this big so we can handle large differences between data rate and f_c 
                           # Might become an issue when running on the board?
        alpha = 0.5
        x = 0.9999*np.arange(-int(num_weights/2),int(num_weights/2),1)/samples_per_sym
        raised_cos_weights = np.sinc(x)*(np.cos(alpha*np.pi*x)/(1-((2*alpha*x)**2)))
        
        #Zero padded symbols
        qpsk_map = [1+1j, 1-1j, -1+1j, -1-1j]
        symbols = np.zeros((N,), dtype=np.cdouble)
        for i in range(int((N)/samples_per_sym)):
            symbols[int(i*samples_per_sym)] = qpsk_map[random_data[i]]
        
        #Modulate
        modulated = np.convolve(symbols,raised_cos_weights, mode='same')
        max_sig = max([max(modulated.real),max(modulated.imag)])
        if mode=='qpsk_if':
            modulated = modulated * np.array([np.cos(2*np.pi*fc*t) + 1j*np.sin(2*np.pi*fc*t) for t in trace_t])
        modulated = modulated / max_sig
        return (modulated.real, modulated.imag)
    
(ref_i,ref_q) = calc_ref('sin', f_init, data_r_init)

env = calc_envelope(dots_pos)
trace_i = ref_i * env
trace_q = ref_q * env

## Overlay setup

In [None]:
from pynq import Overlay,allocate
ol = Overlay("block_design.bit")

#Avoid overhead by aliasing ips
agc = ol.agc
dma_agc_i = ol.dma_agc_i
dma_agc_q = ol.dma_agc_q
dma_agc_g = ol.dma_agc_g
dma_in_i = ol.dma_in_i
dma_in_q = ol.dma_in_q

In [None]:
ADDR_EN    = 0
ADDR_WIN   = 4
ADDR_REF   = 8
ADDR_ALPHA = 12

def cfg_agc(en, win, ref, alpha):
    agc.write(ADDR_EN, en)
    agc.write(ADDR_WIN, int(win))
    agc.write(ADDR_REF, int(np.log10(ref*2**15)*2**12))
    agc.write(ADDR_ALPHA, int(alpha*2**6))

In [None]:
cfg_agc(1, 5, 0.5, 1.0)

In [None]:
#[buf_in_i, buf_in_q] = [allocate(shape=(N,), dtype=np.int16) for _ in range(2)]
#[buf_agc_i, buf_agc_q] = [allocate(shape=(N,), dtype=np.int16) for _ in range(2)]
#buf_agc_g = allocate(shape=(N,), dtype=np.uint32)

buf_in_i = allocate(shape=(N,), dtype=np.int16)
buf_in_q = allocate(shape=(N,), dtype=np.int16)
buf_agc_i = allocate(shape=(N,), dtype=np.int16)
buf_agc_q = allocate(shape=(N,), dtype=np.int16)
buf_agc_g = allocate(shape=(N,), dtype=np.uint32)

In [None]:
def agc_go(in_i, in_q):
    for i in range(len(buf_in_i)):
        buf_in_i[i] = int(in_i[i]*(2**15-1))
        buf_in_q[i] = int(in_q[i]*(2**15-1))
        
    dma_agc_i.recvchannel.transfer(buf_agc_i)
    dma_agc_q.recvchannel.transfer(buf_agc_q)
    dma_agc_g.recvchannel.transfer(buf_agc_g)
    dma_in_i.sendchannel.transfer(buf_in_i)
    dma_in_q.sendchannel.transfer(buf_in_q)
    
    dma_agc_q.recvchannel.wait()
    dma_agc_i.recvchannel.wait()
    
    dma_agc_g.recvchannel.wait()
    dma_in_i.sendchannel.wait()
    dma_in_q.sendchannel.wait()
    
    outs = (np.array(buf_agc_i), np.array(buf_agc_q))
    
    # Purge to cover up bug...
    for _ in range(2):
        dma_agc_i.recvchannel.transfer(buf_agc_i)
        dma_agc_q.recvchannel.transfer(buf_agc_q)
        dma_agc_g.recvchannel.transfer(buf_agc_g)

        dma_agc_q.recvchannel.wait()
        dma_agc_i.recvchannel.wait()
        dma_agc_g.recvchannel.wait()
    return outs

## View setup

In [None]:
app = JupyterDash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP, 'https://maxcdn.bootstrapcdn.com/font-awesome/4.7.0/css/font-awesome.min.css'])
#app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP, 'https://maxcdn.bootstrapcdn.com/font-awesome/4.7.0/css/font-awesome.min.css'])

in_control_panel = dbc.Container([
    dbc.Row(
        dbc.Col([
            dbc.ListGroup([
                dbc.ListGroupItem("Input Signal", color='secondary'),
                dbc.ListGroupItem([
                    dbc.InputGroup([
                        dbc.InputGroupAddon("Type", addon_type="prepend"),
                        dbc.Select(id='in-sig-type',options=[
                            {"label": "Sinusoid", "value": 'sin'},
                            {"label": "Amplitude Modulated", "value": 'am'},
                            {"label": "Frequency Modulated", "value": 'fm'},
                            {"label": "QPSK Baseband", "value": 'qpsk_bb'},
                            {"label": "QPSK IF", "value": 'qpsk_if'},
                        ], value=mode_init),
                ], className='mt-1'),
                     dbc.InputGroup([
                        dbc.InputGroupAddon("Data", addon_type="prepend"),
                        dcc.Slider(id='in-f-data', min=500,max=10000, value=data_r_init, className="form-control dbc-slider"),
                        dbc.InputGroupAddon(str(data_r_init)+" Hz",id='in-f-data-label', addon_type="append"),
                ], className='mt-1'),
                    dbc.InputGroup([
                        dbc.InputGroupAddon("Carrier", addon_type="prepend"),
                        dcc.Slider(id='in-f-carrier',min=10,max=100,step=1, value=(f_init/1000), className="form-control dbc-slider"),
                        dbc.InputGroupAddon(str(f_init/1000)+" kHz", id='in-f-carrier-label', addon_type="append")
                ], className='mt-1'),
                ]),
             ])
        ])
    ),
    
    dbc.Row(
        dbc.Col([
            dbc.ListGroup([
                dbc.ListGroupItem("Envelope handles", color='secondary'),
                dbc.ListGroupItem(
                    dbc.ButtonGroup([
                        dbc.Button(html.Span('',className='fa fa-plus'), id='btn-add-handle',color="primary", outline=True, className='w-100'),
                        dbc.Button(html.Span('',className='fa fa-minus'), id='btn-rm-handle', color="danger", outline=True, className='w-100')
                    ], className='d-flex')
                )
             ])
        ])
    ,className='mt-3')
])

agc_control_panel = dbc.Container([
    dbc.Row(
        dbc.Col([
            dbc.ListGroup([
                dbc.ListGroupItem("AGC Parameters", color='secondary'),
                dbc.ListGroupItem([
                    dbc.InputGroup([
                        dbc.InputGroupAddon("Ref", addon_type="prepend"),
                        dcc.Slider(id='agc-ref',min=0,max=1,step=0.001, value=(ref_init), className="form-control dbc-slider"),
                        dbc.InputGroupAddon(str(ref_init*100)+"%", id='agc-ref-label', addon_type="append")
                ], className='mt-1'),
                     dbc.InputGroup([
                        dbc.InputGroupAddon("Alpha", addon_type="prepend"),
                        dcc.Slider(id='agc-alpha', min=0,max=2,step=0.002, value=alpha_init, className="form-control dbc-slider"),
                        dbc.InputGroupAddon(str(alpha_init),id='agc-alpha-label', addon_type="append"),
                ], className='mt-1'),
                dbc.InputGroup([
                        dbc.InputGroupAddon("Window", addon_type="prepend"),
                        dcc.Slider(id='agc-window', min=0,max=31,step=1, value=window_init, className="form-control dbc-slider"),
                        dbc.InputGroupAddon(str(2**window_init)+" Samples",id='agc-window-label', addon_type="append"),
                ], className='mt-1'),
                ]),
             ])
        ]))
])

in_graph = dcc.Graph(
    id='graph-inputs',
    className='six columns',
    figure={
        'data': [{
            'x': trace_t,
            'y': trace_i,
            'name': 'I',
            'mode': 'lines'
        }, {
            'x': trace_t,
            'y': trace_q,
            'name': 'Q',
            'mode': 'lines'
        }],
        'layout': {
            'title': 'Input Signal',
            'xaxis': {'title' : 'Time (s)'},
            'yaxis': {'title' : 'Normalised Amplitude'},
            'height': 350,
            'margin': dict(l=50,
                           r=0,
                           b=50,
                           t=50,
                           pad=4),
            'legend':dict(
                        yanchor="bottom",
                        y=1,
                        xanchor="right",
                        x=1,
            ),
            'shapes': [{
                'type': 'circle',
                'name': 'envelope_'+str(i),
                'x0': -5,
                'x1': 5,
                'xref': 'x',
                'xanchor': x,
                'xsizemode':'pixel',

                'y0': -5,
                'y1': 5,
                'yref': 'y',
                'yanchor': y,
                'ysizemode':'pixel',
                'fillcolor': 'rgb(165, 10, 235)',
                'line': {
                    'width': 4,
                    'color': 'rgb(165, 10, 235)'
                }
            } for (i,(x,y)) in enumerate(dots_pos)]
        }
    },
    config={
        'editable': True,
        'edits': {
            'shapePosition': True
        }
    }
)

agc_graph = dcc.Graph(
    id='graph-agc',
    figure={
        'data': [{
            'x': trace_t,
            'y': trace_i,
            'name': 'I',
            'mode': 'lines'
        }, {
            'x': trace_t,
            'y': trace_q,
            'name': 'Q',
            'mode': 'lines'
        }],
        'layout': {
            'title': 'Output Signal',
            'xaxis': {'title' : 'Time (s)'},
            'yaxis': {'title' : 'Normalised Amplitude'},
            'height': 350,
            'margin': dict(l=50,
                           r=0,
                           b=50,
                           t=50,
                           pad=4),
            'legend':dict(
                        yanchor="bottom",
                        y=1,
                        xanchor="right",
                        x=1,
            ),
        }
    }
)
app.layout = html.Div(style={'font-size':'12px'},className='row', children=[
    dbc.Container([
        dbc.ListGroup(dbc.ListGroupItem(
        dbc.Row([
            dbc.Col(in_graph        , width=8),
            dbc.Col(in_control_panel, width=4),
        ], className='align-items-center')),  className='mt-3'),
        dbc.ListGroup(dbc.ListGroupItem(dbc.Row([
            dbc.Col(agc_graph        , width=8),
            dbc.Col(agc_control_panel, width=4),
        ], className='align-items-center')),  className='mt-3')
    ]
    )
])

@app.callback(
    Output('agc-ref-label', 'children'),
    [Input('agc-ref', 'value')])
def update_data_rate_label(f):
    return f'{int(f*100)} %'

@app.callback(
    Output('agc-window-label', 'children'),
    [Input('agc-window', 'value')])
def update_data_rate_label(f):
    return f'{int(2**f)} Samples'

@app.callback(
    Output('agc-alpha-label', 'children'),
    [Input('agc-alpha', 'value')])
def update_data_rate_label(f):
    return str(f)

@app.callback(
    Output('in-f-data-label', 'children'),
    [Input('in-f-data', 'value')])
def update_data_rate_label(f):
    return f'{int(f)} Hz'

@app.callback(
    Output('in-f-carrier-label', 'children'),
    [Input('in-f-carrier', 'value')])
def update_carrier_label(f):
    return f'{int(f)} kHz'

@app.callback(
    [Output('graph-inputs', 'figure'),Output('graph-agc', 'figure')],
    [Input('graph-inputs', 'relayoutData'),
     Input('btn-add-handle', 'n_clicks'),
     Input('btn-rm-handle', 'n_clicks'),
     Input('in-sig-type', 'value'),
     Input('in-f-carrier', 'value'),
     Input('in-f-data', 'value'),
     Input('agc-ref', 'value'),
     Input('agc-alpha', 'value'),
     Input('agc-window', 'value')
    ],
    [State('graph-inputs', 'figure'),State('graph-agc', 'figure')])
def main_callback(_,_btnadd,_btnrm,in_sig_type,in_f_carrier,in_data_rate,agc_ref,agc_alpha,agc_window,fig_in,fig_agc):
    
    changed_id = [p['prop_id'] for p in dash.callback_context.triggered][0]
    if 'btn-add-handle' in changed_id:
        add_envelope_handle(fig_in['layout']['shapes'], max(trace_t))
    if 'btn-rm-handle' in changed_id:
        rm_envelope_handle(fig_in['layout']['shapes'])
    
    cfg_agc(1,agc_window,agc_ref,agc_alpha)
    
    in_f_carrier = in_f_carrier*1000
    (ref_i,ref_q) = calc_ref(in_sig_type, in_f_carrier,in_data_rate)
    
    env = calc_envelope(get_envelope_handles(fig_in['layout']['shapes']))
    trace_i = ref_i * env
    trace_q = ref_q * env
    
    fig_in['data'][0]['x'] = trace_t
    fig_in['data'][0]['y'] = trace_i
    fig_in['data'][1]['x'] = trace_t
    fig_in['data'][1]['y'] = trace_q
    
    (agc_i,agc_q) = agc_go(trace_i,trace_q)
    fig_agc['data'][0]['x'] = trace_t
    fig_agc['data'][0]['y'] = agc_i
    fig_agc['data'][1]['x'] = trace_t
    fig_agc['data'][1]['y'] = agc_q
    return fig_in, fig_agc

In [None]:
app.run_server(mode='inline', host=ifaddresses('eth0')[AF_INET][0]['addr'], height=810)