In [1]:
# Import necessary libraries
import os
import sys
import json
import time
import pyvisa
import numpy as np
from scipy import signal
from pathlib import Path
from dataclasses import dataclass
from typing import List, Optional, Tuple, Dict
from enum import Enum
import requests
import matplotlib.pyplot as plt
from matplotlib.widgets import Slider
import ipywidgets as widgets
from IPython.display import display, clear_output

print("All dependency libraries imported successfully!")


All dependency libraries imported successfully!


In [2]:
# Get current file directory and construct absolute paths
current_dir = Path.cwd()
tools_dir = current_dir / "../tools"

awg_create_wave_path = tools_dir / "AWG_createWave.ipynb"
bvc_tools_path = tools_dir / "BVC_Tools.ipynb"

%run $awg_create_wave_path
%run $bvc_tools_path

print("Tool classes imported successfully!")

All dependency libraries imported successfully!
Signal parameter class definition completed!
Jupyter signal generator class definition completed!
Signal analyzer created successfully!
All dependency libraries imported successfully!
Jupyter signal generator class definition completed!
Tool classes imported successfully!


In [3]:
# Device parameter configuration class
@dataclass
class AWGParams:
    # Basic parameters
    device_ip: str = "127.0.0.1"
    module_name: str = "S2_M4901"
    board_name: str = "S0_M1210"
    
    # Sample rate
    sample_rate: int = 4  # 4GHz
    
    # Channel enable
    channel_en: Dict[str, bool] = None
    
    # Trigger parameters
    trigger_source: str = "Internal"  # Internal/External
    in_trigger_repeat: int = 4294967295
    in_trigger_cycle: float = 0.001
    
    # Waveform type
    waveform_type: str = None  # sine/cose/square/triangle/pulse/chirp/multi_tone/iq
    
    # Waveform parameters
    wave_len: int = 163840
    duration: float = 0.0001024  # Duration
    amplitude: float = 0.8  # Amplitude
    frequency: float = 100e6  # Frequency
    phase: float = 0.0  # Phase
    dc_offset: float = 0.0  # DC offset
    
    # Square wave parameters
    duty_cycle: float = 0.5
    
    # Multi-tone signal parameters
    frequencies: List[float] = None
    amplitudes: List[float] = None
    phases: List[float] = None
    
    # Pulse signal parameters
    pulse_width: float = 0.1
    pulse_period: float = 1.0
    
    # Linear chirp signal parameters
    f0: float = 100.0
    f1: float = 1000.0
    
    # DUC parameters
    duc_enable: bool = False
    internal_multiple: int = 1 # Interpolation multiple
    duc_frequency: float = 0.0  # DUC frequency

    def __post_init__(self):
        if self.channel_en is None:
            self.channel_en = {"CH1": True, "CH2": True}
        if self.frequencies is None:
            self.frequencies = [100e6, 300e6, 400e6]
        if self.amplitudes is None:
            self.amplitudes = [0.6, 0.4, 0.2]
        if self.phases is None:
            self.phases = [0.0, 0.0, 0.0]

print("AWG parameter class definition completed!")


AWG parameter class definition completed!


In [None]:
class AWGController:
    """AWG comprehensive controller"""
    
    def __init__(self):
        self.generator = JupyterSignalGenerator()
        self.analyzer = JupyterSignalAnalyzer(self.generator)
        self.bvc_tools = BVCTools()
        self.scpi_query = None
        self.scpi_write = None
        self.scpi_read = None
        self.wave_list = "seg1"
        self.nswave_name = "sequence1"
        
    def connect_device(self, params: AWGParams):
        """Connect to device"""
        try:
            visa_resource = f'TCPIP::{params.device_ip}::5555::SOCKET'
            rm = pyvisa.ResourceManager('@py')
            session = rm.open_resource(visa_resource)
            session.write_termination = '\n'
            session.read_termination = '\n'
            session.timeout = 50000
            
            self.scpi_query = session.query
            self.scpi_write = session.write
            self.scpi_read = session.read
            
            print(f"Successfully connected to device: {params.device_ip}")
            return True
        except Exception as e:
            print(f"Failed to connect to device: {e}")
            return False
    
    def get_device_info(self, params: AWGParams):
        """Get device information"""
        try:
            device_name = self.scpi_query(':SYS:DEVice:NAMe?').replace('\n', '').split(',')
            module_names = self.scpi_query(':SYS:DEVice:MODUles?').replace('\n', '').split(',')
            
            model_info = {}
            for module_name in module_names:
                module_dict = {}
                funcs = self.scpi_query(f':SYS:DEVice:MODUles:FUNC? {module_name}').replace('\n', '').split(',')
                in_chnls = []
                out_chnls = []
                for func in funcs:
                    if func == 'AWG':
                        in_chnls = self.scpi_query(f':SYS:DEVice:MODUles:CHNLs:OUT? {module_name}').replace('\n', '').split(',')
                    if func == 'Digitizer':
                        out_chnls = self.scpi_query(f':SYS:DEVice:MODUles:CHNLs:IN? {module_name}').replace('\n', '').split(',')
                module_dict["AWG"] = in_chnls
                module_dict["Digitizer"] = out_chnls
                model_info[module_name] = module_dict
            
            print(f"Device name: {device_name}")
            print(f"Module information: {model_info}")
            return device_name, model_info
        except Exception as e:
            print(f"Failed to get device information: {e}")
            return None, None
    
    def configure_sample_rate(self, params: AWGParams):
        """Configure sample rate"""
        try:
            self.scpi_query(f':AWG:SOURce:DAC:SRATe {params.module_name},{params.sample_rate}')
            self.bvc_tools.sync(params.device_ip, params.module_name, params.board_name)
            return True
        except Exception as e:
            print(f"Failed to configure sample rate: {e}")
            return False
    
    def configure_trigger(self, params: AWGParams):
        """Configure trigger"""
        try:
            if params.trigger_source == "Internal":
                self.scpi_query(f':SAT:TRIGger:SOURce {params.board_name},Internal')
                self.scpi_query(f':SAT:TRIGger:INTernal:REPeat {params.board_name},{params.in_trigger_repeat}')
                self.scpi_query(f':SAT:TRIGger:INTernal:CYCle {params.board_name},{params.in_trigger_cycle}')
                
                self.scpi_query(f':AWG:TRIGger:SOURce {params.module_name},PXISTARTrig')
                self.scpi_query(f':AWG:TRIGger:INTernal:REPeat {params.module_name},{params.in_trigger_repeat}')
                self.scpi_query(f':AWG:TRIGger:INTernal:CYCle {params.module_name},{params.in_trigger_cycle}')
            elif params.trigger_source == "External":
                self.scpi_query(f':SAT:TRIGger:SOURce {params.board_name},External')
                self.scpi_query(f':AWG:TRIGger:SOURce {params.module_name},PXISTARTrig')
            else:
                print(f"Error: {params.trigger_source} does not exist / 错误：{params.trigger_source} 不存在")
            return True
        except Exception as e:
            print(f"Failed to configure trigger: {e}")
            return False
    
    def configure_duc(self, params: AWGParams):
        """Configure DUC"""
        try:
            self.scpi_query(f':AWG:SOURce:WAVE:InternalMultiple {params.module_name},{params.internal_multiple}')

            for key, value in params.channel_en.items():
                if value:
                    self.scpi_query(f':AWG:OUTPut:DUC:EN {key},{params.duc_enable}')
                    if params.duc_enable:
                        self.scpi_query(f':AWG:OUTPut:DUC:NCOFrequence {key},{params.duc_frequency}')
            return True
        except Exception as e:
            print(f"Failed to configure DUC: {e}")
            return False
    
    def generate_waveform(self, params: AWGParams):
        """Generate waveform"""
        try:
            sample_rate = params.sample_rate * 10**9
            duration = ((int(sample_rate * params.duration) - 1) // 16384 + 1) * 16384
            
            wave_params = SignalParams(
                waveform_type=params.waveform_type,
                duration=duration,
                sample_rate=sample_rate,
                amplitude=params.amplitude,
                frequency=params.frequency,
                phase=params.phase,
                dc_offset=params.dc_offset,
                duty_cycle=params.duty_cycle,
                frequencies=params.frequencies,
                amplitudes=params.amplitudes,
                phases=params.phases,
                pulse_width=params.pulse_width,
                pulse_period=params.pulse_period,
                f0=params.f0,
                f1=params.f1
            )
            
            t, signal_data = self.generator.generate_signal(wave_params)
            
            # Display waveform
            show_pts = min(int(sample_rate / params.frequency) * 20, len(signal_data))
            self.analyzer.plot_signal_inline(t[:show_pts], signal_data[:show_pts], wave_params)
            
            # Statistical analysis
            stats = self.analyzer.analyze_signal_statistics(signal_data, wave_params)
            
            # Save waveform file
            wave_file_path = current_dir / 'wave_file'
            if not os.path.exists(wave_file_path):
                os.makedirs(wave_file_path)
            file_path = wave_file_path / f'wave_{params.waveform_type}_{len(signal_data)}_{int(sample_rate)}_{params.amplitude}_{int(params.frequency)}_{params.phase}_{params.dc_offset}.bin'
            signal_data.astype(np.float64).tofile(file_path)
            print(f"Waveform file saved successfully: {file_path}")
            
            return file_path, len(signal_data)
        except Exception as e:
            print(f"Failed to generate waveform: {e}")
            return None, None

    """AWG controller upload and playback functions"""
    
    def upload_waveform(self, params: AWGParams, file_path: str, wave_length: int):
        """Upload waveform to device"""
        try:
            # Create wave table
            self.scpi_query(f':AWG:WAVList:ADDList {params.module_name},{self.wave_list},{wave_length},Double')
            
            # Upload waveform file
            data = {
                "scpi": f":AWG:WAVLIST:WAVeform:RTTRACE {params.module_name},{self.wave_list},0,{wave_length},\n"
            }
            with open(file_path, 'rb') as f:
                files = {"file": f}
                response = requests.post(f'http://{params.device_ip}:8000/scpi', data=data, files=files)
                if response.status_code == 200:
                    print("Waveform uploaded successfully")
                    return True
                else:
                    print(f"Waveform upload failed: {response.status_code}")
                    return False
        except Exception as e:
            print(f"Failed to upload waveform: {e}")
            return False
    
    def configure_nsqc(self, params: AWGParams):
        """Configure NSWave sequence"""
        try:
            # Create NSWave
            self.scpi_query(f':AWG:NSQC:ADD {self.nswave_name}')
            
            # Generate NSWave program
            nswave_code = f"""@nw.kernel
def program(wlist: dict[str, np.ndarray]):
    {self.wave_list}: nw.ArbWave = nw.init_arbwave(wlist, '{self.wave_list}')
    while True:
        nw.play_arb({self.wave_list})
    return nw.Kernel()"""
            
            nswave_data = {
                "scpi": f":AWG:NSQC:UPload {self.nswave_name},\n",
                "nsqc": nswave_code
            }
            
            response = requests.post(f'http://{params.device_ip}:8000/scpi', json=nswave_data)
            if response.status_code == 200:
                print("NSWave program uploaded successfully")
                
                # Compile and configure channels
                for key, value in params.channel_en.items():
                    if value:
                        self.scpi_query(f':AWG:NSQC:COMPile {key},{self.nswave_name}')
                
                # Deploy
                self.scpi_query(f':AWG:WAVList:NSQC:COMPile:Send {params.module_name}')
                print("NSWave configuration completed")
                return True
            else:
                print(f"NSWave upload failed: {response.status_code}")
                return False
        except Exception as e:
            print(f"Failed to configure NSWave: {e}")
            return False
    
    
    
    def start_output(self, params: AWGParams):
        """Start output"""
        try:
            # Configure output channels
            for key, value in params.channel_en.items():
                self.scpi_query(f':AWG:OUTPut:EN {key},{value}')
            
            # Start system
            self.scpi_query(f':SYS:Control:RUN {params.module_name}')
            self.scpi_query(f':SAT:TRIGger:INTernal:Run {params.board_name}')
            
            print("AWG output started")
            return True
        except Exception as e:
            print(f"Failed to start output: {e}")
            return False
    
    def stop_output(self, params: AWGParams):
        """Stop output"""
        try:
            self.scpi_query(f':SAT:TRIGger:INTernal:Stop {params.board_name}')
            self.scpi_query(f':SYS:Control:STOP {params.module_name}')
            print("AWG output stopped")
            return True
        except Exception as e:
            print(f"Failed to stop output: {e}")
            return False

print("AWG controller class definition completed!")

AWG controller class definition completed!


In [5]:
# Create AWG controller instance
awg_controller = AWGController()
print("AWG controller instance created successfully!")

AWG controller instance created successfully!


In [6]:
# Example 1: Sine wave generation (waveform generation only, no device connection)）
print("=== Example 1: Sine Wave Generation ===")
sine_params = AWGParams(
    device_ip="127.0.0.1",
    module_name="S2_M4901",
    board_name="S0_M1210",
    waveform_type="sine",
    frequency=100e6,
    amplitude=0.8,
    duration=0.00004096
)

# 1. Connect to device
awg_controller.connect_device(sine_params)

# 2. Get device information
device_name, model_info = awg_controller.get_device_info(sine_params)

# 8. Configure DUC
awg_controller.configure_duc(sine_params)

# 3. Configure sample rate
awg_controller.configure_sample_rate(sine_params)

# 4. Configure trigger
awg_controller.configure_trigger(sine_params)

# 5. Generate waveform
file_path, wave_length = awg_controller.generate_waveform(sine_params)

# 6. Upload waveform
awg_controller.upload_waveform(sine_params, file_path, wave_length)

# 7. Configure NSWave
awg_controller.configure_nsqc(sine_params)


=== Example 1: Sine Wave Generation ===
Successfully connected to device: 127.0.0.1
Device name: ['TestDevice']
Module information: {'S0_M1210': {'AWG': [], 'Digitizer': []}, 'S0_M2301': {'AWG': [], 'Digitizer': []}, 'S2_M4901': {'AWG': ['CH1', 'CH2'], 'Digitizer': ['CH1', 'CH2']}}

Failed to generate waveform: Unable to allocate 4.66 PiB for an array with shape (655360000000000,) and data type float64
Failed to upload waveform: VI_ERROR_TMO (-1073807339): Timeout expired before operation completed.
NSWave program uploaded successfully
NSWave configuration completed


True

In [7]:
awg_controller.start_output(sine_params)

AWG output started


True

In [8]:
awg_controller.stop_output(sine_params)

AWG output stopped


True