# Demo parameter sweeps of channel measurements
This notebook demonstrates how to build larger datasets by looping over measurements of different channels.

In [1]:
from channel_analysis import FilteredCapture
from channel_analysis.source import simulated_awgn
import xarray as xr

# Capture settings reused by every simulated acquisition in this notebook.
capture = FilteredCapture(sample_rate=15.36e6, analysis_bandwidth=10e6, duration=0.2)

# Maps each analysis name to the keyword settings for that analysis; the whole
# dict is handed to analyze_by_spec through its `spec=` argument.
# NOTE(review): periods/times are presumably in seconds and powers in dB —
# confirm against the channel_analysis documentation.
analysis_spec = {
    'power_time_series': {
        'detector_period': 10e-3,
        'detectors': ('rms', 'peak')
    },
    'cyclic_channel_power': {
        'cyclic_period': 10e-3,
        'detector_period': 1e-3 / 15 / 4,
        'detectors': ('rms', 'peak'),
        'cyclic_statistics': ('min', 'mean', 'max'),
    },
    'persistence_spectrum': {
        'window': 'hamming',
        'fractional_overlap': 0.5,
        'resolution': 15e3,
        'quantiles': [0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.999, 1],
    },
    'amplitude_probability_distribution': {
        'power_low': -40,
        'power_high': 15,
        'power_count': 221,  # 0.25 dB resolution
    },
    'iq_waveform': {
        'start_time_sec': 0,
        'stop_time_sec': 100e-3
    }
}

# filter_spec = {
#     'fft_size': 1024,
#     'window': 'hamming',  # 'hamming', 'blackman', or 'blackmanharris'
# }

# acquisition_spec = {
#     'duration': 0.2,
#     'sample_rate': source.sample_rate
# }

### Single acquisition

In [2]:
# from __future__ import annotations
# import msgspec
# from typing import Literal, Optional
# from channel_analysis import waveform
# from pathlib import Path

# class Capture(msgspec.Struct):
#     # RF and leveling
#     center_frequency: float = 3710e6
#     channel: int = 0
#     gain: float = -10
#     calibrated: bool = True

#     # acquisition
#     duration: float = 0.1
#     sample_rate: float = 15.36e6

#     # filtering and resampling
#     analysis_bandwidth: float = 10e6
#     if_frequency: Optional[float] = None # Hz (or none, for no IF frontend)
#     lo_shift: Optional['left'|'right'] = 'left' # shift the LO outside the acquisition band
#     window: 'hamming'|'blackman'|'blackmanharris' = 'hamming' # the COLA spectral window to use


# class System(msgspec.Struct):
#     location: Optional[tuple[str,str,str]] = None
#     timebase: Literal['builtin']|Literal['gps'] = 'builtin'
#     cyclic_trigger: bool|float = False
#     calibration_path: Optional[str] = None
#     defaults: Capture = msgspec.field(default_factory=Capture)

# class Run(msgspec.Struct, omit_defaults=True):
#     acquisition: System = msgspec.field(default_factory=System)
#     sweep: list[Capture] = msgspec.field(default_factory=lambda: [Capture()])
#     channel_analysis: waveform._ConfigStruct = \
#         msgspec.field(default_factory=lambda: waveform._registry.tostruct()())

# runner = Run()
# runner.acquisition.calibration_path

In [3]:
from channel_analysis.waveform import analyze_by_spec

# Generate one simulated waveform from the capture settings.
iq = simulated_awgn(capture)

# Run the analyses listed in analysis_spec on that waveform; as the last
# expression in the cell, the result is displayed by the notebook.
analyze_by_spec(iq, capture, spec=analysis_spec)

(6001, 1024) (6001, 1024)


## RF parameter sweep
### Single parameter

In [4]:
# Analyze one simulated waveform per center frequency and tag each result with
# a length-1 'center_frequency' coordinate so the pieces can be combined.
# NOTE(review): `fc` is only used as the coordinate label here; `capture` is
# not modified between iterations.
center_frequencies = [3705e6, 3715e6, 3725e6]
results = []

for fc in center_frequencies:
    iq = simulated_awgn(capture)
    ret = analyze_by_spec(iq, capture, spec=analysis_spec)

    tagged = ret.assign_coords({'center_frequency': [fc]})
    results.append(tagged)

# merge the per-frequency results into a single dataset
data = xr.combine_by_coords(results)
data

(6001, 1024) (6001, 1024)
(6001, 1024) (6001, 1024)
(6001, 1024) (6001, 1024)


In [5]:
# data = []

# for atten in [0.0, 10.0]:
#     for fc in [3705e6, 3715e6, 3725e6]:
#         iq = simulated_awgn(capture)

#         ret = (
#             dataset.from_spec(
#                     iq,
#                     source,
#                     analysis_spec=analysis_spec,
#             ).assign_coords(
#                 {'center_frequency': [fc], 'attenuation': [atten]}
#             )
#         )

#         data.append(ret)

# ds = xr.combine_by_coords(data)
# ds.center_frequency.attrs = {'label': 'RF center frequency', 'units': 'Hz'}
# ds.attenuation.attrs = {'label': 'External attenuation setting', 'units': 'dB'}

In [6]:
# (
#     ds
#     .chunk({'iq_sample': round(source.sample_rate*10e-3)})
#     .to_zarr('dataset.zarr', mode='w')
# )

# ds

In [7]:
# from channel_analysis import figures
# import matplotlib.pyplot as plt
# import iqwaveform # needed for the ieee double column style
# plt.style.use('iqwaveform.ieee_double_column')

# fig, ax = plt.subplots()
# apd = ds.amplitude_probability_distribution.sel(center_frequency=3705e6)
# apd.plot.line(x='channel_power', ax=ax)
# ax.set_yscale('gamma-qq', k=1, db_ordinal=True)
# figures.label_axis(ax.xaxis, apd, 'channel_power', tick_units=False)
# figures.label_axis(ax.yaxis, apd)

In [8]:
from pathlib import Path
import zarr
import numcodecs
from channel_analysis import source

def directory_size(path):
    """Return the on-disk size of `path` in bytes.

    Args:
        path: a file or directory, given as a string or path object.

    Returns:
        The size of the file itself when `path` is a regular file; otherwise
        the summed size of every regular file found recursively beneath it
        (0 when nothing is found).
    """
    root = Path(path)

    # a single file (e.g. a zipped archive) is measured directly
    if root.is_file():
        return root.stat().st_size

    # otherwise walk the whole tree and add up the regular files
    total = 0
    for entry in root.glob('**/*'):
        if entry.is_file():
            total += entry.stat().st_size
    return total

# assume we will want to read 10 ms at a time for performance reasons
# NOTE(review): no chunk size is passed in this cell; the 10 ms figure only
# appears in the commented-out to_zarr cell above — confirm whether
# source.dump applies it internally.
path = 'xarray-sweep.zarr.zip'

# write the combined sweep dataset to disk ('w' is presumably the write mode — confirm)
source.dump(path, data, 'w')

# report the size of what was written, converted from bytes to MB (1e6 bytes)
print(f'data size: {directory_size(path)/1e6:0.1f} MB')

data size: 34.6 MB


In [10]:
%%timeit -n1 -r1
# read back the archive written earlier; timed as one run of a single loop
out = source.load(path)

26.8 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [1]:
from iqwaveform import fourier

# Equivalent noise bandwidth of a size-2048 DPSS window, scaled by 30.72e6/2048.
# NOTE(review): the scale factor is presumably sample_rate / fft_size (the bin
# spacing in Hz), which would make the result an ENBW in Hz — confirm.
fourier.equivalent_noise_bandwidth(('dpss', 3.87638403), 2048)*30.72e6/2048
# fourier.equivalent_noise_bandwidth(('dpss', 11.010), 1024)*15.36e6/1024

30014.845401182785

In [None]:
from channel_analysis import waveform

import typing

import msgspec
import inspect

class ChannelAnalysisSpec(msgspec.Struct):
    """Field-less base class for the struct types built by `_struct_from_func`."""

def _msgspec_param(name, parameter):
    """Translate one function parameter into a `msgspec.defstruct` field entry.

    Args:
        name: field name to use in the generated struct.
        parameter: the `inspect.Parameter` describing the argument.

    Returns:
        `(name, type)` when the parameter has no default, otherwise
        `(name, type, default)`. Parameters without an annotation are typed
        as `typing.Any`.
    """
    # inspect.Parameter.empty is the public sentinel that marks both a missing
    # annotation and a missing default
    empty = inspect.Parameter.empty

    type_ = typing.Any if parameter.annotation is empty else parameter.annotation

    if parameter.default is empty:
        return (name, type_)
    return (name, type_, parameter.default)

def _struct_from_func(func) -> type[msgspec.Struct]:
    """Build a `ChannelAnalysisSpec` subclass from a function's keyword-only parameters.

    Args:
        func: callable whose signature is inspected; its `__name__` becomes
            the name of the generated struct.

    Returns:
        A new struct class (not an instance) with one field per keyword-only
        parameter of `func`, carrying over each parameter's annotation and
        default where present.
    """
    params = inspect.signature(func).parameters

    # only keyword-only parameters become fields; positional and variadic
    # arguments are left out
    fields = [
        _msgspec_param(name, param)
        for name, param in params.items()
        if param.kind is inspect.Parameter.KEYWORD_ONLY
    ]

    return msgspec.defstruct(
        func.__name__,
        fields,
        bases=(ChannelAnalysisSpec,)
    )

# Build a struct type from the keyword-only parameters of
# waveform.persistence_spectrum, instantiate it, and convert the instance to
# builtin Python types for display.
Struct = _struct_from_func(waveform.persistence_spectrum)
s = Struct(quantiles=[1,2,3], resolution=15e3, window='flattop')
msgspec.to_builtins(s)