# Arbitrary Waveform Generator

This notebook provides an interactive interface to generate arbitrary waveforms with multiple frequency components.

**Features:**
- Support for Sine, Square, Sawtooth, Triangle, Chirp waveforms
- Combine multiple waveforms with different frequencies
- Configurable sample rate (default 4 GS/s)
- Phase offset and rise/fall time control
- Export to binary file (float32) for FPGA compatibility
- Normalized floating-point output (-1.0 to +1.0)

In [3]:
import numpy as np
import scipy.signal as sp_sig
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display, clear_output
import struct
import os

In [4]:
# ============================================================
# WAVEFORM GENERATOR CLASS
# ============================================================

class WaveformGenerator:
    """
    Arbitrary Waveform Generator with support for multiple waveform types
    and combination of multiple frequency components.
    
    Output is normalized floating-point (-1.0 to +1.0) for maximum flexibility.
    """
    
    WAVEFORM_TYPES = ['sine', 'square', 'sawtooth', 'triangle', 'chirp']
    
    def __init__(self, sample_rate=4.0e9, num_samples=16384):
        """
        Initialize the waveform generator.
        
        Args:
            sample_rate: Sample rate in Hz (default 4 GS/s)
            num_samples: Number of samples in the waveform buffer
        """
        self.sample_rate = sample_rate
        self.num_samples = num_samples
        self.components = []  # List of waveform components to combine
        self._time_axis = None
        self._update_time_axis()
    
    def _update_time_axis(self):
        """Recalculate the time axis when sample rate or num_samples changes."""
        self._time_axis = np.arange(self.num_samples) / self.sample_rate
    
    @property
    def time_axis(self):
        """Time axis in seconds."""
        return self._time_axis
    
    @property
    def duration(self):
        """Total waveform duration in seconds."""
        return self.num_samples / self.sample_rate
    
    def set_sample_rate(self, sample_rate):
        """Update the sample rate and recalculate time axis."""
        self.sample_rate = sample_rate
        self._update_time_axis()
    
    def set_num_samples(self, num_samples):
        """Update the number of samples and recalculate time axis."""
        self.num_samples = int(num_samples)
        self._update_time_axis()
    
    def clear_components(self):
        """Remove all waveform components."""
        self.components = []
    
    def add_component(self, waveform_type, frequency, amplitude=1.0, phase=0.0,
                      duty_cycle=0.5, rise_time=0.0, fall_time=0.0,
                      chirp_end_freq=None, chirp_method='linear'):
        """
        Add a waveform component to the generator.
        
        Args:
            waveform_type: 'sine', 'square', 'sawtooth', 'triangle', or 'chirp'
            frequency: Frequency in Hz (or start frequency for chirp)
            amplitude: Relative amplitude (0.0 to 1.0)
            phase: Phase offset in degrees
            duty_cycle: Duty cycle for square wave (0.0 to 1.0)
            rise_time: Rise time in seconds (for square wave edge smoothing)
            fall_time: Fall time in seconds (for square wave edge smoothing)
            chirp_end_freq: End frequency for chirp (Hz)
            chirp_method: Chirp method ('linear', 'quadratic', 'logarithmic', 'hyperbolic')
        """
        if waveform_type not in self.WAVEFORM_TYPES:
            raise ValueError(f"Unknown waveform type: {waveform_type}. "
                           f"Supported types: {self.WAVEFORM_TYPES}")
        
        component = {
            'type': waveform_type,
            'frequency': frequency,
            'amplitude': amplitude,
            'phase': phase,
            'duty_cycle': duty_cycle,
            'rise_time': rise_time,
            'fall_time': fall_time,
            'chirp_end_freq': chirp_end_freq if chirp_end_freq else frequency * 2,
            'chirp_method': chirp_method
        }
        self.components.append(component)
        return len(self.components) - 1  # Return index of added component
    
    def remove_component(self, index):
        """Remove a waveform component by index."""
        if 0 <= index < len(self.components):
            self.components.pop(index)
    
    def _generate_single_waveform(self, component):
        """
        Generate a single waveform component.
        
        Returns:
            numpy array of the waveform (normalized -1 to +1 before amplitude scaling)
        """
        t = self._time_axis
        phase_rad = np.deg2rad(component['phase'])
        freq = component['frequency']
        amp = component['amplitude']
        
        wtype = component['type']
        
        if wtype == 'sine':
            wave = np.sin(2 * np.pi * freq * t + phase_rad)
        
        elif wtype == 'square':
            duty = component['duty_cycle']
            wave = sp_sig.square(2 * np.pi * freq * t + phase_rad, duty=duty)
            
            # Apply rise/fall time smoothing if specified
            rise_time = component['rise_time']
            fall_time = component['fall_time']
            
            if rise_time > 0 or fall_time > 0:
                wave = self._apply_edge_smoothing(wave, rise_time, fall_time)
        
        elif wtype == 'sawtooth':
            wave = sp_sig.sawtooth(2 * np.pi * freq * t + phase_rad)
        
        elif wtype == 'triangle':
            # Triangle is sawtooth with width=0.5
            wave = sp_sig.sawtooth(2 * np.pi * freq * t + phase_rad, width=0.5)
        
        elif wtype == 'chirp':
            f0 = freq
            f1 = component['chirp_end_freq']
            method = component['chirp_method']
            wave = sp_sig.chirp(t, f0=f0, f1=f1, t1=self.duration,
                               phi=component['phase'], method=method)
        
        return wave * amp
    
    def _apply_edge_smoothing(self, wave, rise_time, fall_time):
        """
        Apply rise/fall time smoothing to a square wave using a simple
        linear interpolation approach.
        """
        if rise_time <= 0 and fall_time <= 0:
            return wave
        
        # Convert times to samples
        rise_samples = int(rise_time * self.sample_rate)
        fall_samples = int(fall_time * self.sample_rate)
        
        # Use a simple moving average filter for smoothing
        # The kernel size is the average of rise and fall times
        kernel_size = max(rise_samples, fall_samples, 1)
        if kernel_size > 1:
            kernel = np.ones(kernel_size) / kernel_size
            wave = np.convolve(wave, kernel, mode='same')
        
        return wave
    
    def generate(self, normalize=True):
        """
        Generate the combined waveform from all components.
        
        Args:
            normalize: If True, normalize final output to -1.0 to +1.0
        
        Returns:
            tuple: (time_axis, waveform) where waveform is float64 array
        """
        if not self.components:
            # Return zeros if no components
            return self._time_axis, np.zeros(self.num_samples, dtype=np.float64)
        
        # Sum all components
        combined = np.zeros(self.num_samples, dtype=np.float64)
        for comp in self.components:
            combined += self._generate_single_waveform(comp)
        
        # Normalize to -1.0 to +1.0
        if normalize:
            max_val = np.max(np.abs(combined))
            if max_val > 1e-10:
                combined = combined / max_val
        
        return self._time_axis, combined
    
    def adjust_frequency_to_fit_buffer(self, target_freq):
        """
        Calculate the nearest frequency that fits an integer number of cycles
        in the buffer (prevents clicking/discontinuities when looping).
        
        Args:
            target_freq: Desired frequency in Hz
        
        Returns:
            Adjusted frequency in Hz
        """
        num_cycles = np.round((target_freq * self.num_samples) / self.sample_rate)
        actual_freq = (num_cycles * self.sample_rate) / self.num_samples
        return actual_freq
    
    def get_info(self):
        """Return a dictionary with current generator settings."""
        return {
            'sample_rate': self.sample_rate,
            'num_samples': self.num_samples,
            'duration_s': self.duration,
            'duration_us': self.duration * 1e6,
            'num_components': len(self.components),
            'components': self.components.copy()
        }

In [5]:
# ============================================================
# FILE I/O FUNCTIONS
# ============================================================

def save_waveform_binary(filename, time_axis, waveform, sample_rate, metadata=None):
    """
    Save waveform to binary file with header for FPGA compatibility.
    
    File Format:
        Header (32 bytes):
            - Magic number: 4 bytes ('AWFG')
            - Version: 4 bytes (uint32, currently 1)
            - Sample rate: 8 bytes (float64, Hz)
            - Num samples: 4 bytes (uint32)
            - Data type: 4 bytes (1=float32, 2=float64)
            - Reserved: 8 bytes
        Data:
            - Waveform samples as float32 (for FPGA compatibility)
    
    Args:
        filename: Output file path
        time_axis: Time array (saved for reference but not strictly needed)
        waveform: Waveform amplitude array (float, normalized -1 to +1)
        sample_rate: Sample rate in Hz
        metadata: Optional dict with additional info (saved as separate .meta file)
    """
    # Ensure .bin extension
    if not filename.endswith('.bin'):
        filename += '.bin'
    
    num_samples = len(waveform)
    
    with open(filename, 'wb') as f:
        # Write header
        f.write(b'AWFG')  # Magic number (4 bytes)
        f.write(struct.pack('<I', 1))  # Version (4 bytes, little-endian uint32)
        f.write(struct.pack('<d', sample_rate))  # Sample rate (8 bytes, float64)
        f.write(struct.pack('<I', num_samples))  # Num samples (4 bytes, uint32)
        f.write(struct.pack('<I', 1))  # Data type: 1=float32 (4 bytes)
        f.write(b'\x00' * 8)  # Reserved (8 bytes)
        
        # Write waveform data as float32
        waveform_f32 = waveform.astype(np.float32)
        f.write(waveform_f32.tobytes())
    
    # Optionally save metadata as JSON
    if metadata:
        import json
        meta_filename = filename.replace('.bin', '.meta.json')
        with open(meta_filename, 'w') as f:
            json.dump(metadata, f, indent=2)
    
    print(f"Saved waveform to: {filename}")
    print(f"  - Samples: {num_samples}")
    print(f"  - Sample rate: {sample_rate/1e9:.3f} GS/s")
    print(f"  - Duration: {num_samples/sample_rate*1e6:.3f} µs")
    print(f"  - File size: {os.path.getsize(filename)} bytes")
    
    return filename


def load_waveform_binary(filename):
    """
    Load waveform from binary file.
    
    Returns:
        dict with keys: 'waveform', 'sample_rate', 'num_samples', 'time_axis'
    """
    with open(filename, 'rb') as f:
        # Read header
        magic = f.read(4)
        if magic != b'AWFG':
            raise ValueError(f"Invalid file format. Expected 'AWFG' magic number.")
        
        version = struct.unpack('<I', f.read(4))[0]
        sample_rate = struct.unpack('<d', f.read(8))[0]
        num_samples = struct.unpack('<I', f.read(4))[0]
        data_type = struct.unpack('<I', f.read(4))[0]
        reserved = f.read(8)  # Skip reserved bytes
        
        # Read waveform data
        if data_type == 1:
            waveform = np.frombuffer(f.read(), dtype=np.float32)
        else:
            waveform = np.frombuffer(f.read(), dtype=np.float64)
    
    # Reconstruct time axis
    time_axis = np.arange(num_samples) / sample_rate
    
    print(f"Loaded waveform from: {filename}")
    print(f"  - Version: {version}")
    print(f"  - Samples: {num_samples}")
    print(f"  - Sample rate: {sample_rate/1e9:.3f} GS/s")
    
    return {
        'waveform': waveform,
        'sample_rate': sample_rate,
        'num_samples': num_samples,
        'time_axis': time_axis
    }

In [6]:
# ============================================================
# INTERACTIVE DASHBOARD
# ============================================================

class WaveformDashboard:
    """
    Interactive dashboard for the waveform generator using ipywidgets.
    """
    
    def __init__(self):
        self.generator = WaveformGenerator()
        self.component_widgets = []  # Track widgets for each component
        self._setup_widgets()
        self._setup_layout()
    
    def _setup_widgets(self):
        """Create all the dashboard widgets."""
        
        # --- Global Settings ---
        self.sample_rate_widget = widgets.FloatText(
            value=4.0,
            description='Sample Rate (GS/s):',
            style={'description_width': '150px'},
            layout=widgets.Layout(width='300px')
        )
        
        self.num_samples_widget = widgets.IntText(
            value=16384,
            description='Num Samples:',
            style={'description_width': '150px'},
            layout=widgets.Layout(width='300px')
        )
        
        self.filename_widget = widgets.Text(
            value='waveform_output',
            description='Filename:',
            style={'description_width': '150px'},
            layout=widgets.Layout(width='300px')
        )
        
        # --- Component Controls ---
        self.waveform_type_widget = widgets.Dropdown(
            options=['sine', 'square', 'sawtooth', 'triangle', 'chirp'],
            value='sine',
            description='Waveform Type:',
            style={'description_width': '150px'},
            layout=widgets.Layout(width='300px')
        )
        
        self.frequency_widget = widgets.FloatText(
            value=10.0,
            description='Frequency (MHz):',
            style={'description_width': '150px'},
            layout=widgets.Layout(width='300px')
        )
        
        self.amplitude_widget = widgets.FloatSlider(
            value=1.0,
            min=0.0,
            max=1.0,
            step=0.01,
            description='Amplitude:',
            style={'description_width': '150px'},
            layout=widgets.Layout(width='400px')
        )
        
        self.phase_widget = widgets.FloatSlider(
            value=0.0,
            min=-180.0,
            max=180.0,
            step=1.0,
            description='Phase (deg):',
            style={'description_width': '150px'},
            layout=widgets.Layout(width='400px')
        )
        
        self.duty_cycle_widget = widgets.FloatSlider(
            value=0.5,
            min=0.01,
            max=0.99,
            step=0.01,
            description='Duty Cycle:',
            style={'description_width': '150px'},
            layout=widgets.Layout(width='400px')
        )
        
        self.rise_time_widget = widgets.FloatText(
            value=0.0,
            description='Rise Time (ns):',
            style={'description_width': '150px'},
            layout=widgets.Layout(width='300px')
        )
        
        self.fall_time_widget = widgets.FloatText(
            value=0.0,
            description='Fall Time (ns):',
            style={'description_width': '150px'},
            layout=widgets.Layout(width='300px')
        )
        
        # Chirp-specific
        self.chirp_end_freq_widget = widgets.FloatText(
            value=100.0,
            description='End Freq (MHz):',
            style={'description_width': '150px'},
            layout=widgets.Layout(width='300px')
        )
        
        self.chirp_method_widget = widgets.Dropdown(
            options=['linear', 'quadratic', 'logarithmic', 'hyperbolic'],
            value='linear',
            description='Chirp Method:',
            style={'description_width': '150px'},
            layout=widgets.Layout(width='300px')
        )
        
        self.fit_buffer_checkbox = widgets.Checkbox(
            value=True,
            description='Adjust freq to fit buffer (prevent clicking)',
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='400px')
        )
        
        # --- Buttons ---
        self.add_button = widgets.Button(
            description='Add Component',
            button_style='success',
            layout=widgets.Layout(width='150px')
        )
        self.add_button.on_click(self._on_add_component)
        
        self.clear_button = widgets.Button(
            description='Clear All',
            button_style='danger',
            layout=widgets.Layout(width='150px')
        )
        self.clear_button.on_click(self._on_clear_all)
        
        self.preview_button = widgets.Button(
            description='Preview Waveform',
            button_style='info',
            layout=widgets.Layout(width='150px')
        )
        self.preview_button.on_click(self._on_preview)
        
        self.save_button = widgets.Button(
            description='Save to File',
            button_style='primary',
            layout=widgets.Layout(width='150px')
        )
        self.save_button.on_click(self._on_save)
        
        # --- Output Areas ---
        self.components_output = widgets.Output(
            layout=widgets.Layout(border='1px solid #ddd', padding='10px', min_height='100px')
        )
        
        self.plot_output = widgets.Output(
            layout=widgets.Layout(border='1px solid #ddd', padding='10px', min_height='400px')
        )
        
        self.status_output = widgets.Output(
            layout=widgets.Layout(border='1px solid #ddd', padding='10px', min_height='50px')
        )
        
        # Wire up type change to show/hide relevant widgets
        self.waveform_type_widget.observe(self._on_type_change, names='value')
    
    def _setup_layout(self):
        """Arrange widgets into the dashboard layout."""
        
        # Global settings section
        global_settings = widgets.VBox([
            widgets.HTML('<h3>Global Settings</h3>'),
            widgets.HBox([self.sample_rate_widget, self.num_samples_widget]),
            self.filename_widget
        ])
        
        # Component settings section
        self.component_specific = widgets.VBox([
            self.duty_cycle_widget,
            widgets.HBox([self.rise_time_widget, self.fall_time_widget]),
            widgets.HBox([self.chirp_end_freq_widget, self.chirp_method_widget])
        ])
        
        component_settings = widgets.VBox([
            widgets.HTML('<h3>Add Waveform Component</h3>'),
            widgets.HBox([self.waveform_type_widget, self.frequency_widget]),
            self.amplitude_widget,
            self.phase_widget,
            self.component_specific,
            self.fit_buffer_checkbox,
            widgets.HBox([self.add_button, self.clear_button])
        ])
        
        # Components list
        components_list = widgets.VBox([
            widgets.HTML('<h3>Current Components</h3>'),
            self.components_output
        ])
        
        # Actions
        actions = widgets.VBox([
            widgets.HTML('<h3>Actions</h3>'),
            widgets.HBox([self.preview_button, self.save_button]),
            self.status_output
        ])
        
        # Main layout
        left_panel = widgets.VBox([global_settings, component_settings, components_list, actions])
        right_panel = widgets.VBox([widgets.HTML('<h3>Waveform Preview</h3>'), self.plot_output])
        
        self.layout = widgets.HBox(
            [left_panel, right_panel],
            layout=widgets.Layout(width='100%')
        )
        
        # Initialize visibility
        self._on_type_change({'new': 'sine'})
    
    def _on_type_change(self, change):
        """Show/hide widgets based on waveform type."""
        wtype = change['new']
        
        # Square wave specific
        show_square = wtype == 'square'
        self.duty_cycle_widget.layout.display = 'flex' if show_square else 'none'
        self.rise_time_widget.layout.display = 'flex' if show_square else 'none'
        self.fall_time_widget.layout.display = 'flex' if show_square else 'none'
        
        # Chirp specific
        show_chirp = wtype == 'chirp'
        self.chirp_end_freq_widget.layout.display = 'flex' if show_chirp else 'none'
        self.chirp_method_widget.layout.display = 'flex' if show_chirp else 'none'
    
    def _on_add_component(self, button):
        """Add a waveform component."""
        # Update generator settings
        self.generator.set_sample_rate(self.sample_rate_widget.value * 1e9)
        self.generator.set_num_samples(self.num_samples_widget.value)
        
        # Get frequency (convert MHz to Hz)
        freq_hz = self.frequency_widget.value * 1e6
        
        # Optionally adjust frequency to fit buffer
        if self.fit_buffer_checkbox.value:
            freq_hz = self.generator.adjust_frequency_to_fit_buffer(freq_hz)
        
        # Add component
        self.generator.add_component(
            waveform_type=self.waveform_type_widget.value,
            frequency=freq_hz,
            amplitude=self.amplitude_widget.value,
            phase=self.phase_widget.value,
            duty_cycle=self.duty_cycle_widget.value,
            rise_time=self.rise_time_widget.value * 1e-9,  # ns to s
            fall_time=self.fall_time_widget.value * 1e-9,
            chirp_end_freq=self.chirp_end_freq_widget.value * 1e6,
            chirp_method=self.chirp_method_widget.value
        )
        
        self._update_components_display()
        self._update_status(f"Added {self.waveform_type_widget.value} component at {freq_hz/1e6:.4f} MHz")
    
    def _on_clear_all(self, button):
        """Clear all components."""
        self.generator.clear_components()
        self._update_components_display()
        self._update_status("Cleared all components")
        
        with self.plot_output:
            clear_output(wait=True)
    
    def _on_preview(self, button):
        """Generate and preview the waveform."""
        self.generator.set_sample_rate(self.sample_rate_widget.value * 1e9)
        self.generator.set_num_samples(self.num_samples_widget.value)
        
        time_axis, waveform = self.generator.generate()
        
        with self.plot_output:
            clear_output(wait=True)
            
            fig, axes = plt.subplots(2, 1, figsize=(10, 8))
            
            # Full waveform
            time_us = time_axis * 1e6
            axes[0].plot(time_us, waveform, 'b-', linewidth=0.5)
            axes[0].set_xlabel('Time (µs)')
            axes[0].set_ylabel('Amplitude')
            axes[0].set_title('Full Waveform')
            axes[0].grid(True, alpha=0.3)
            axes[0].set_ylim(-1.1, 1.1)
            
            # Zoomed view (first 500 samples or first 5 cycles of lowest freq)
            zoom_samples = min(500, len(waveform))
            time_ns = time_axis[:zoom_samples] * 1e9
            axes[1].plot(time_ns, waveform[:zoom_samples], 'b-', linewidth=1)
            axes[1].set_xlabel('Time (ns)')
            axes[1].set_ylabel('Amplitude')
            axes[1].set_title('Zoomed View (First 500 samples)')
            axes[1].grid(True, alpha=0.3)
            axes[1].set_ylim(-1.1, 1.1)
            
            plt.tight_layout()
            plt.show()
        
        self._update_status(f"Preview generated: {len(waveform)} samples")
    
    def _on_save(self, button):
        """Save the waveform to file."""
        self.generator.set_sample_rate(self.sample_rate_widget.value * 1e9)
        self.generator.set_num_samples(self.num_samples_widget.value)
        
        time_axis, waveform = self.generator.generate()
        
        filename = self.filename_widget.value
        metadata = self.generator.get_info()
        
        with self.status_output:
            clear_output(wait=True)
            save_waveform_binary(filename, time_axis, waveform, 
                               self.generator.sample_rate, metadata)
    
    def _update_components_display(self):
        """Update the components list display."""
        with self.components_output:
            clear_output(wait=True)
            
            if not self.generator.components:
                print("No components added yet.")
                return
            
            for i, comp in enumerate(self.generator.components):
                freq_mhz = comp['frequency'] / 1e6
                print(f"{i+1}. {comp['type'].upper()}: {freq_mhz:.4f} MHz, "
                      f"Amp={comp['amplitude']:.2f}, Phase={comp['phase']:.1f}°")
    
    def _update_status(self, message):
        """Update the status output."""
        with self.status_output:
            clear_output(wait=True)
            print(message)
    
    def display(self):
        """Display the dashboard."""
        display(self.layout)

In [7]:
# ============================================================
# LAUNCH DASHBOARD
# ============================================================

dashboard = WaveformDashboard()
dashboard.display()

HBox(children=(VBox(children=(VBox(children=(HTML(value='<h3>Global Settings</h3>'), HBox(children=(FloatText(…

---
## Programmatic Usage (Without Dashboard)

You can also use the `WaveformGenerator` class directly in code:

In [8]:
# Example: Create a complex waveform programmatically

# Create generator with 4 GS/s, 16384 samples (same as your DAC buffer)
gen = WaveformGenerator(sample_rate=4.0e9, num_samples=16384)

# Add a 10 MHz square wave (like your existing code)
target_freq = 10.15e6
adjusted_freq = gen.adjust_frequency_to_fit_buffer(target_freq)
print(f"Adjusted frequency: {adjusted_freq/1e6:.4f} MHz")

gen.add_component('square', frequency=adjusted_freq, amplitude=1.0)

# Add a higher harmonic for testing
gen.add_component('sine', frequency=adjusted_freq * 3, amplitude=0.3, phase=45)

# Generate
time_axis, waveform = gen.generate()

# Save to binary file
save_waveform_binary('test_waveform', time_axis, waveform, gen.sample_rate, gen.get_info())

Adjusted frequency: 10.2539 MHz
Saved waveform to: test_waveform.bin
  - Samples: 16384
  - Sample rate: 4.000 GS/s
  - Duration: 4.096 µs
  - File size: 65568 bytes


'test_waveform.bin'

In [9]:
# Example: Load the waveform back

loaded = load_waveform_binary('waveform_output.bin')

# Plot to verify
plt.figure(figsize=(12, 4))
zoom = 500
plt.plot(loaded['time_axis'][:zoom] * 1e9, loaded['waveform'][:zoom])
plt.xlabel('Time (ns)')
plt.ylabel('Amplitude')
plt.title('Loaded Waveform')
plt.grid(True, alpha=0.3)
plt.show()

FileNotFoundError: [Errno 2] No such file or directory: 'waveform_output.bin'