In [1]:
# 引入需要的套件 (import the required packages)

import pyvisa
import pandas as pd
import time
import math

### AFG31101

===============Function List=================

* Set waveform type
* Set frequency
* Set amplitude
* Set output state
* Set amplitude limit
  
若要串接其他儀器 一樣使用class 定義其他儀器的function

In [2]:
class AFG_31101():
    """
    VISA wrapper for the AFG31101 signal generator (source/output 1 only).

    Every command method raises ``Exception`` when no session is open,
    including after ``close_instrument`` has been called.
    """

    def __init__(self, resource_address, visa_dll=None):
        """
        Create the resource manager and open the instrument session.

        :param resource_address: VISA resource address of the instrument.
        :param visa_dll: Path of the VISA library to use. ``None`` is passed on
            to ``pyvisa.ResourceManager`` as an empty string.
        """
        self.resource_address = resource_address
        self.visa_dll = visa_dll if visa_dll is not None else ''
        self.rm = pyvisa.ResourceManager(self.visa_dll)
        self.instrument = None
        self.connect_to_instrument()

    def connect_to_instrument(self):
        """Open the VISA session and print the instrument identity."""
        self.instrument = self.rm.open_resource(self.resource_address)
        print("Connected to AFG_31101.")
        print(self.query_identity())

    def _require_instrument(self):
        """Return the open session, or raise if the instrument is not connected."""
        if not self.instrument:
            raise Exception("Instrument not connected. Please connect first.")
        return self.instrument

    def _send(self, command):
        """Write one command and pause 50 ms before returning."""
        instrument = self._require_instrument()
        instrument.write(command)
        time.sleep(0.05)

    def _ask(self, command):
        """Write a query, pause 50 ms, then return the raw reply string."""
        instrument = self._require_instrument()
        instrument.write(command)
        time.sleep(0.05)
        return instrument.read()

    def query_identity(self):
        """Return the reply to ``*IDN?``."""
        return self._require_instrument().query("*IDN?")

    def close_instrument(self):
        """Close the session (if any) and forget the handle."""
        if self.instrument:
            self.instrument.close()
            # Drop the closed handle so later calls hit the "not connected"
            # guard instead of talking to a dead session.
            self.instrument = None
            print("AFG_31101 connection closed.")
        else:
            print("No instrument to close.")

    def set_waveform(self, shape):
        """Select the output waveform shape (sent verbatim after ``SOURce1:FUNCtion``)."""
        self._send(f"SOURce1:FUNCtion {shape}")
        print(f"set signal form be {shape}")

    def set_frequency(self, freq_amplitude, freq_unit):
        """
        Set the fixed output frequency.

        :param freq_amplitude: numeric part of the frequency.
        :param freq_unit: unit suffix sent after the number.
        """
        self._send(f"SOURce1:FREQuency:FIXed {freq_amplitude} {freq_unit}")
        print(f"set signal frequency be {freq_amplitude}{freq_unit}")

    def query_frequency(self):
        """Return the instrument's raw reply to the fixed-frequency query."""
        return self._ask("SOURce1:FREQuency:FIXed?")

    def set_amplitude(self, amp_magnitude, amp_unit):
        """
        Set the output amplitude.

        :param amp_magnitude: numeric part of the amplitude.
        :param amp_unit: unit suffix appended directly to the number.
        """
        self._send(f"SOURce1:VOLTage:LEVel:IMMediate:AMPLitude {amp_magnitude}{amp_unit}")
        print(f"set signal amplitude be {amp_magnitude}{amp_unit}")

    def query_amplitude(self):
        """Return the instrument's raw reply to the amplitude query."""
        return self._ask("SOURce1:VOLTage:LEVel:IMMediate:AMPLitude?")

    def output_on(self):
        """Switch output 1 on."""
        self._send("OUTPut1:STATe ON")
        print("turn AFG31101 output on")

    def output_off(self):
        """Switch output 1 off."""
        self._send("OUTPut1:STATe OFF")
        print("turn AFG31101 output off")

In [3]:
def check_output_state(afg, output_state):
    """
    Switch the generator output according to ``output_state``.

    Exactly ``'ON'`` turns the output on; any other value turns it off.
    """
    switch = afg.output_on if output_state == 'ON' else afg.output_off
    switch()

In [4]:
def check_amp_limit(afg, amplitude_limit, amplitude):
    """
    Turn the generator output off when the requested amplitude reaches the limit.

    :param afg: generator wrapper providing ``output_off``.
    :param amplitude_limit: numeric limit the magnitude is compared against.
    :param amplitude: dict whose ``'magnitude'`` entry is parsed with ``float``.
    """
    requested = float(amplitude['magnitude'])
    # Values at or above the limit are rejected; everything else passes silently.
    if not requested >= amplitude_limit:
        return
    print(f'Magnitude of amplitude cannot exceed {amplitude_limit} voltage')
    afg.output_off()

### RF-150A_100D

===============Function List=================

* Set power state
* Set gain

若要串接其他儀器 一樣使用class 定義其他儀器的function

In [5]:
class RF_150A100D():
    """
    VISA wrapper for the RF-150A100D amplifier.

    Every command method raises ``Exception`` when no session is open,
    including after ``close_instrument`` has been called.
    """

    def __init__(self, resource_address, visa_dll=None):
        """
        Create the resource manager and open the instrument session.

        :param resource_address: VISA resource address of the instrument.
        :param visa_dll: Path of the VISA library to use. ``None`` is passed on
            to ``pyvisa.ResourceManager`` as an empty string.
        """
        self.resource_address = resource_address
        self.visa_dll = visa_dll if visa_dll is not None else ''
        self.rm = pyvisa.ResourceManager(self.visa_dll)
        self.instrument = None
        self.connect_to_instrument()

    def connect_to_instrument(self):
        """Open the VISA session and print the instrument identity."""
        self.instrument = self.rm.open_resource(self.resource_address)
        print("Connected to RF_150A100D.")
        print(self.query_identity())

    def _require_instrument(self):
        """Return the open session, or raise if the instrument is not connected."""
        if not self.instrument:
            raise Exception("Instrument not connected. Please connect first.")
        return self.instrument

    def query_identity(self):
        """Return the reply to ``*IDN?``."""
        return self._require_instrument().query("*IDN?")

    def close_instrument(self):
        """Close the session (if any) and forget the handle."""
        if self.instrument:
            self.instrument.close()
            # Drop the closed handle so later calls hit the "not connected"
            # guard instead of talking to a dead session.
            self.instrument = None
            print("RF_150A100D connection closed.")
        else:
            print("No instrument to close.")

    def power_on(self):
        """Send the ``P1`` (power on) command."""
        self._require_instrument().write("P1")
        print("turn RF150A100D power on")

    def power_off(self):
        """Send the ``P0`` (power off) command."""
        self._require_instrument().write("P0")
        print("turn RF150A100D power off")

    def set_gain(self, gain, gain_set):
        """
        Send the gain command.

        :param gain: human-readable gain, used only in the log message.
        :param gain_set: value appended to ``G`` in the command that is sent.
        """
        self._require_instrument().write(f"G{gain_set}")
        print(f"set power gain be {gain}")

    def query_gain(self):
        """Send ``G?``, pause 50 ms, and return the raw reply string."""
        # The first parameter was misspelled ``slef``, so every call raised
        # NameError on ``self``.
        instrument = self._require_instrument()
        instrument.write("G?")
        time.sleep(0.05)
        return instrument.read()

    def reset(self):
        """Send the ``R`` command (logged as clearing faults)."""
        self._require_instrument().write("R")
        print("clear faults")

In [6]:
def check_power_state(rf, power_state):
    """
    Switch the amplifier power according to ``power_state``.

    Exactly ``'ON'`` powers the amplifier on; any other value powers it off.
    """
    action = rf.power_on if power_state == 'ON' else rf.power_off
    action()

In [7]:
def check_power_gain(rf, gain):
    """Power the amplifier off and report it when ``gain`` is negative."""
    if not gain < 0:
        return
    rf.power_off()
    print(f"gain you set {gain} can't < 0")

In [8]:
def calculate_gain_set(gain):
    """
    Convert a gain on a 0-100 scale into the 4-character value sent to the amplifier.

    The gain is scaled by 4095/100, rounded up, clamped to 0..4095 and
    zero-padded to four characters.

    :param gain: requested gain on a 0-100 scale.
    :return: four-character string, ``"0000"`` to ``"4095"``.
    """
    ratio = 4095 / 100  # instrument counts per unit of gain
    gain_set = math.ceil(ratio * gain)  # round up to the next whole count
    # Clamp both ends: without the lower bound a negative gain produced
    # strings such as "-040", which is not a valid gain command argument.
    gain_set = max(0, min(gain_set, 4095))
    return str(gain_set).zfill(4)

### MSO64B

===============Function List=================

* Set channel
* Auto set horizontal scale
* Auto set Gating type
* Auto set cursor position
* Auto set bandwidth limit
* Auto set Acquisition mode & numbers of waveform
* Set measurements[AMP、MEAN、DELAY]
* Set math function

若要串接其他儀器 一樣使用class 定義其他儀器的function

In [9]:
class MSO_64B():
    """
    VISA wrapper for the MSO64B oscilloscope.

    Every command method raises ``Exception`` when no session is open,
    including after ``close_instrument`` has been called.
    """

    def __init__(self, resource_address, visa_dll=None):
        """
        Create the resource manager and open the instrument session.

        :param resource_address: VISA resource address of the instrument.
        :param visa_dll: Path of the VISA library to use. ``None`` is passed on
            to ``pyvisa.ResourceManager`` as an empty string.
        """
        self.resource_address = resource_address
        self.visa_dll = visa_dll if visa_dll is not None else ''
        self.rm = pyvisa.ResourceManager(self.visa_dll)
        self.instrument = None
        self.connect_to_instrument()

    def connect_to_instrument(self):
        """Open the VISA session and print the instrument identity."""
        self.instrument = self.rm.open_resource(self.resource_address)
        print("Connected to MSO64B.")
        print(self.query_identity())

    def _require_instrument(self):
        """Return the open session, or raise if the instrument is not connected."""
        if not self.instrument:
            raise Exception("Instrument not connected. Please connect first.")
        return self.instrument

    def _send(self, command, delay=0.05):
        """Write one command and pause ``delay`` seconds before returning."""
        instrument = self._require_instrument()
        instrument.write(command)
        time.sleep(delay)

    def _ask(self, command):
        """Write a query, pause 50 ms, then return the raw reply string."""
        instrument = self._require_instrument()
        instrument.write(command)
        time.sleep(0.05)
        return instrument.read()

    def query_identity(self):
        """Return the reply to ``*IDN?``."""
        return self._require_instrument().query("*IDN?")

    def close_instrument(self):
        """Close the session (if any) and forget the handle."""
        if self.instrument:
            self.instrument.close()
            # Drop the closed handle so later calls hit the "not connected"
            # guard instead of talking to a dead session.
            self.instrument = None
            print("MSO64B connection closed.")
        else:
            print("No instrument to close.")

    def clear(self):
        """Send ``:CLEAR`` so calculations and measurements restart, then pause 0.3 s."""
        self._send(':CLEAR', delay=0.3)
        print("clear acquistions, measurements, and waveforms")

    def set_gating_type(self):
        """Set the measurement gating type to CURSor."""
        self._send('MEASUrement:GATing CURSor')
        print("take measurements on the portion of the waveform between the cursor")

    def cursor_control(self, cursor_A_position, cursor_B_position):
        """
        Enable cursor 1 in waveform view 1 and place its A and B positions.

        :param cursor_A_position: value sent as the A position.
        :param cursor_B_position: value sent as the B position.
        """
        instrument = self._require_instrument()
        instrument.write(':DISplay:WAVEView1:CURSor:CURSOR1:STATE ON')
        instrument.write(f':DISplay:WAVEView1:CURSor:CURSOR1:WAVEform:APOSition {cursor_A_position}')
        instrument.write(f':DISplay:WAVEView1:CURSor:CURSOR1:WAVEform:BPOSition {cursor_B_position}')
        time.sleep(0.05)
        print(f"set cursor range be {cursor_A_position} to {cursor_B_position}")

    def result_table(self):
        """Add the measurement result table ``TABLE1``."""
        self._send(':MEASTABle:ADDNew "TABLE1"')
        print("add measurent result table")

    def add_measurement(self, meas, source):
        """
        Configure measurement slot ``meas`` as a MEAN measurement of ``source``.

        :param meas: measurement slot name inserted into the command (e.g. ``MEAS6``).
        :param source: signal source name (e.g. ``CH1``).
        """
        type_CMD = f":MEASUrement:{meas}:TYPE MEAN"
        source_CMD = f":MEASUrement:{meas}:SOURCE {source}"
        # Both settings travel in one message, separated by ';'.
        self._send(f"{type_CMD};{source_CMD}")
        print(f"add measurent {meas} of {source}")

    def set_horizon_scale(self, horizon_scale):
        """Set the horizontal scale (logged as seconds per division)."""
        self._send(f':HORIZONTAL:MODE:SCALE {horizon_scale}')
        print(f"set Horizontal Scale to {horizon_scale}s/div")

    def detect_clip(self, clip_check):
        """Return the raw reply to the clipping query for channel ``clip_check``."""
        # The original had a sleep after the return statement; it could never run.
        return self._require_instrument().query(f":{clip_check}:CLIPping?")

    def query_vertical_scale(self, clip_check):
        """Return the raw reply to the vertical-scale query for channel ``clip_check``."""
        # The original had a sleep after the return statement; it could never run.
        return self._require_instrument().query(f":{clip_check}:SCAle?")

    def set_vertical_scale(self, clip_channel, new_vertical_value):
        """Set the vertical scale of ``clip_channel`` to ``new_vertical_value``."""
        self._send(f":{clip_channel}:SCAle {new_vertical_value}")
        print(f"set {clip_channel} vertical scale be {new_vertical_value}")

    def math_add(self, math_num, math_function):
        """
        Create math waveform ``math_num`` and define its expression.

        :param math_num: math waveform name (e.g. ``MATH1``).
        :param math_function: expression string placed inside the DEFine quotes.
        """
        self._send(f':MATH:ADDNew "{math_num}"')
        self._send(f':MATH:{math_num}:DEFine "{math_function}"')
        print(f"Add {math_num} to calculate {math_function}")

    def set_math_autoscale(self):
        """Enable autoscale for MATH4 and MATH5 in waveform view 1."""
        self._send(':DISplay:WAVEView1:MATH:MATH4:AUTOScale ON')
        print("enables the autoscaling the math4 in the specified Waveform View")
        self._send(':DISplay:WAVEView1:MATH:MATH5:AUTOScale ON')
        print("enables the autoscaling the math5 in the specified Waveform View")

    def set_math_label(self, math_num, label_name):
        """Set the label of math waveform ``math_num`` to ``label_name``."""
        self._send(f':MATH:{math_num}:LABel:NAMe "{label_name}" ')
        print(f"set label of {math_num} be {label_name} ")

    def off_display(self, off_channel, off_math):
        """
        Hide one channel and one math waveform in waveform view 1.

        :param off_channel: channel name inserted into the channel STATE command.
        :param off_math: math waveform name inserted into the math STATE command.
        """
        self._send(f':DIS:WAVEVIEW1:{off_channel}:STATE OFF')
        print(f"turn {off_channel} display off")
        self._send(f':DISplay:WAVEView1:MATH:{off_math}:STATE OFF')
        print(f"turn {off_math} display off")

    def measure_delta_B(self):
        """Configure MEAS9 as the MAXIMUM and MEAS10 as the MINIMUM of MATH5."""
        self._send(':MEASUREMENT:MEAS9:TYPE MAXIMUM; SOUrce MATH5')
        self._send(':MEASUREMENT:MEAS10:TYPE MINIMUM; SOUrce MATH5')
        print("add measurent maximum of math5")
        print("add measurent minimum of math5")

    def power_function(self, num, voltage, current):
        """
        Assign voltage and current sources to power-quality measurement ``num``.

        :param num: power measurement number appended to ``POWer``.
        :param voltage: source name sent as VSOURce.
        :param current: source name sent as ISOURce.
        """
        self._send(f':POWer:POWer{num}:POWERQUALITY:VSOURce {voltage}')
        self._send(f':POWer:POWer{num}:POWERQUALITY:ISOURce {current}')
        # The old log lines were copied from measure_delta_B and reported
        # "math5" measurements that this method never configures.
        print(f"set power{num} voltage source to {voltage}")
        print(f"set power{num} current source to {current}")

    def set_deskew(self, deskew_ch1):
        """Set the CH1 deskew to ``deskew_ch1``."""
        self._require_instrument().write(f'CH1:DESKEW {deskew_ch1}')
        print(f"CH1 deskew set to {deskew_ch1}")
        time.sleep(0.05)

    def set_deskew_zero(self):
        """Reset the CH1 deskew to 0."""
        self._require_instrument().write('CH1:DESKEW 0')
        print("CH1 deskew set to 0")
        time.sleep(0.05)

    def query_mean_value(self, meas):
        """Return the current-acquisition MEAN result of measurement ``meas`` as a float."""
        return float(self._ask(f'MEASUrement:{meas}:RESUlts:CURRentacq:MEAN?'))

    def query_true_power(self, power_num):
        """Return the current-acquisition mean "TruePWR" of power measurement ``power_num`` as a float."""
        return float(self._ask(f'POWer:POWer{power_num}:RESUlts:CURRentacq:MEAN? "TruePWR"'))

    def query_population(self):
        """Return the all-acquisitions population count of MEAS3 as an int."""
        return int(self._ask('MEASUrement:MEAS3:RESUlts:ALLAcqs:POPUlation?'))

In [10]:
def wait_for_population(mso, threshold=25, poll_interval=1, timeout=None):
    """
    Poll the scope until the measurement population reaches ``threshold``.

    :param mso: scope wrapper providing ``query_population``.
    :param threshold: population count at which polling stops.
    :param poll_interval: seconds to sleep between two queries.
    :param timeout: maximum seconds to wait; ``None`` (default) waits forever,
        which is the original behaviour.
    :raises TimeoutError: if ``timeout`` elapses before the threshold is reached.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        pop_num = mso.query_population()
        print(f"Current population: {pop_num}")
        if pop_num >= threshold:
            print(f"Poplution採樣數量已超過 {threshold}")
            break
        # Without a deadline, a scope that stops acquiring hangs the run forever.
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"population {pop_num} did not reach {threshold} within {timeout} s"
            )
        time.sleep(poll_interval)  # throttle the polling rate

In [11]:
def set_cursor_range(afg, mso):
    """
    Place the scope cursors two excitation periods either side of zero.

    The frequency is read back from the signal generator rather than measured
    on the scope, because a badly set horizontal scale makes the scope's own
    frequency reading unreliable.
    """
    excitation_hz = float(afg.query_frequency())
    print(f"the frequency of excitaton is {excitation_hz} Hz")

    period_s = 1 / excitation_hz
    # Cursor A sits two periods before zero, cursor B two periods after.
    mso.cursor_control(-2 * period_s, 2 * period_s)

In [12]:
def set_horizon_scale(afg, mso):
    """
    Set the scope horizontal scale to 4/10 of one signal period.

    The frequency is read back from the signal generator rather than measured
    on the scope, because a badly set horizontal scale makes the scope's own
    frequency reading unreliable.
    """
    signal_hz = float(afg.query_frequency())
    print(f"the frequency of signal is {signal_hz}Hz")

    period_s = 1 / signal_hz
    mso.set_horizon_scale(period_s * (4 / 10))

In [13]:
def set_vertical_scale(mso):
    """
    Adjust the vertical scale of CH1, CH2 and CH3 until none of them clips.

    For each channel in turn:

    * while the scope reports clipping, the vertical scale is raised by 40e-3
      per pass;
    * once it no longer clips, the scale is set to the channel's measurement
      reading divided by 5.5, and clipping is checked again. If that new scale
      clips, the whole procedure repeats for the channel.

    :param mso: scope wrapper providing ``detect_clip``, ``query_vertical_scale``,
        ``query_mean_value`` and ``set_vertical_scale``.
    """
    # Measurement slot read for each channel.
    # NOTE(review): presumably these slots hold the channel amplitude - confirm
    # against the measurement setup.
    channel_meas = {'CH1': 'MEAS3', 'CH2': 'MEAS4', 'CH3': 'MEAS5'}

    for clip_channel in ('CH1', 'CH2', 'CH3'):
        while True:
            # The scope replies 1 when the channel clips and 0 when it does not.
            # NOTE(review): any other reply makes this loop re-query without a pause.
            clip_result = float(mso.detect_clip(clip_channel))
            if clip_result == 1:
                clip_vertical_scale = float(mso.query_vertical_scale(clip_channel))
                new_clip_vertical_scale = clip_vertical_scale + 40e-3  # widen by one 40e-3 step
                print(f"{clip_channel} is clipping")
                mso.set_vertical_scale(clip_channel, new_clip_vertical_scale)
                time.sleep(0.05)
            elif clip_result == 0:
                meas = channel_meas[clip_channel]
                time.sleep(2)  # pause before reading the measurement
                print(f"{clip_channel} no clipping")
                clip_amplitude = float(mso.query_mean_value(meas))
                print(f"amplitude of {clip_channel} is {clip_amplitude}")
                # Choose the scale so the reading spans 5.5 divisions.
                new_vertical_scale = clip_amplitude / 5.5
                mso.set_vertical_scale(clip_channel, new_vertical_scale)
                time.sleep(0.2)
                clip_result = float(mso.detect_clip(clip_channel))
                if clip_result == 1:
                    # The tighter scale clips again: go round the loop once more.
                    time.sleep(0.5)
                    print(f'{clip_channel} is clipping')
                    print('readjust the vertical scale')
                else:
                    print(f'vertical state of {clip_channel} is good')
                    break

In [14]:
def calculate_core_parameters(core_He, core_OD, core_ID):
    """
    Compute the effective path length, cross-section and volume of a core.

    :param core_He: core height.
    :param core_OD: core outer diameter.
    :param core_ID: core inner diameter.
    :return: tuple ``(le, Ae, Ve)``; the log lines label them mm, mm^2 and mm^3.
    """
    log_ratio = math.log(core_OD / core_ID)

    # Core constants C1 and C2 from the geometry.
    C1 = (2 * math.pi) / (core_He * log_ratio)
    C2 = (4 * math.pi * (1/core_ID - 1/core_OD)) / (core_He**2 * log_ratio**3)

    # Effective dimensions derived from the two constants.
    le = (C1**2) / C2
    Ae = C1 / C2
    Ve = le * Ae

    for report in (
        f"有效路徑長度 (le): {le} mm",
        f"有效截面積 (Ae): {Ae} mm^2",
        f"有效體積 (Ve): {Ve} mm^3",
    ):
        print(report)

    return le, Ae, Ve

In [15]:
def save_mso_query_values(mso, sequence, core_effective_area, turn):
    """
    Read MEAS9 and MEAS10 from the scope and derive delta B.

    :param mso: scope wrapper providing ``query_mean_value``.
    :param sequence: accepted for interface compatibility; not used here.
    :param core_effective_area: effective core area.
        NOTE(review): the 0.000001 factor presumably converts mm^2 to m^2 - confirm.
    :param turn: number of turns.
    :return: tuple ``(B_max, B_min, calculate_delta_B)``.
    """
    upper, lower = [mso.query_mean_value(slot) for slot in ('MEAS9', 'MEAS10')]
    swing = (upper - lower) / (core_effective_area * turn * 0.000001)

    time.sleep(0.05)
    for line in (
        f"The maximum B value is {upper} T",
        f"The minimum B value is {lower} T",
        f"The delta of B is {swing} T",
    ):
        print(line)

    return upper, lower, swing

In [16]:
def save_mso_power_values(mso, sequence):
    """
    Read the true power of power measurements 1 and 2 from the scope.

    :param mso: scope wrapper providing ``query_true_power``.
    :param sequence: accepted for interface compatibility; not used here.
    :return: tuple ``(power1_value, power2_value)`` as floats.
    """
    first, second = [float(mso.query_true_power(slot)) for slot in ('1', '2')]

    time.sleep(0.05)
    print(f"The true power of 1 is {first} W")
    print(f"The true power of 2 is {second} W")

    return first, second

In [17]:
def set_vref_deskew(afg, mso, deskew_vref):
    """
    Convert a deskew angle into seconds and apply it to the scope.

    The angle is divided by ``360 * frequency``, with the frequency read back
    from the signal generator.

    :param afg: generator wrapper providing ``query_frequency``.
    :param mso: scope wrapper providing ``set_deskew``.
    :param deskew_vref: deskew angle in degrees.
    """
    excitation_hz = float(afg.query_frequency())
    degrees_per_second = 360 * excitation_hz
    mso.set_deskew(deskew_vref / degrees_per_second)

In [18]:
def acquire_measurement(mso, i, core_effective_area, turn, max_B, min_B, delta_B, Pcore_true_power, PL_true_power, deskew=False):
    """
    Take one flux and one power reading and append them to the result dicts.

    :param mso: scope wrapper, forwarded to the two ``save_mso_*`` helpers.
    :param i: measurement index stored alongside every reading.
    :param core_effective_area: forwarded to ``save_mso_query_values``.
    :param turn: forwarded to ``save_mso_query_values``.
    :param max_B: dict; ``(i, B_max)`` is appended under ``'MEAS9'``.
    :param min_B: dict; ``(i, B_min)`` is appended under ``'MEAS10'``.
    :param delta_B: dict; the delta B value is stored under key ``i``.
    :param Pcore_true_power: dict; ``(i, power1)`` is appended under ``'1'``.
    :param PL_true_power: dict; ``(i, power2)`` is appended under ``'2'``.
    :param deskew: kept for interface compatibility. The original had separate
        branches for ``True`` and ``False`` that were identical, so the flag
        never changed the result; callers select the deskew data set by
        passing different dicts.
    """
    B_max, B_min, calculate_delta_B = save_mso_query_values(mso, i, core_effective_area, turn)

    # setdefault creates the list on first use, replacing the duplicated
    # "if key not in dict" blocks.
    max_B.setdefault('MEAS9', []).append((i, B_max))
    min_B.setdefault('MEAS10', []).append((i, B_min))
    delta_B[i] = calculate_delta_B

    # Power loss readings.
    power1_value, power2_value = save_mso_power_values(mso, i)
    Pcore_true_power.setdefault('1', []).append((i, power1_value))
    PL_true_power.setdefault('2', []).append((i, power2_value))

In [19]:
def calculate_final_power_loss(sequence, Pcore_true_power, PL_true_power, Pcore_true_power_deskew, PL_true_power_deskew, core_effective_volume):
    """
    Compute the k factor, the core power loss and the loss per unit volume.

    Each dict holds lists of ``(index, value)`` tuples; the entry at position
    ``sequence - 1`` is used.

    :param sequence: 1-based position of the measurement to evaluate.
    :param Pcore_true_power: readings under key ``'1'`` (without deskew).
    :param PL_true_power: readings under key ``'2'`` (without deskew).
    :param Pcore_true_power_deskew: readings under key ``'1'`` (with deskew).
    :param PL_true_power_deskew: readings under key ``'2'`` (with deskew).
    :param core_effective_volume: effective core volume used for normalisation.
    :return: tuple ``(k_factor, Pcore, Pcore_cv)``.
    """
    slot = sequence - 1
    pl_deskew = PL_true_power_deskew['2'][slot][1]
    pl_plain = PL_true_power['2'][slot][1]
    pcore_deskew = Pcore_true_power_deskew['1'][slot][1]
    pcore_plain = Pcore_true_power['1'][slot][1]

    # Ratio of the deskew-induced change in PL to the change in Pcore.
    k_factor = (pl_deskew - pl_plain) / (pcore_deskew - pcore_plain)
    print(f"k factor:{k_factor}")

    Pcore = (pcore_plain - (k_factor * pl_plain))
    print(f"Core Real power loss:{Pcore}")

    Pcore_cv = (Pcore / core_effective_volume) * 1000000
    print(f"Every volume of Core Real power loss:{Pcore_cv}")

    return k_factor, Pcore, Pcore_cv

In [20]:
def adjust_rf_afg(mso, rf, afg, i, core_effective_area, turn, target_deltaB, gain, amplitude, amplitude_limit):
    """
    Iteratively adjust the RF amplifier gain and the AFG amplitude until the
    measured delta B is within tolerance of ``target_deltaB``.

    Each pass picks step sizes from the current delta B, moves ``gain`` and
    ``amplitude['magnitude']`` up or down depending on the error, applies both
    settings, re-runs the vertical-scale adjustment and measures delta B again.

    :param mso: scope wrapper; passed to ``save_mso_query_values`` and ``set_vertical_scale``.
    :param rf: RF amplifier wrapper; receives the new gain through ``set_gain``.
    :param afg: signal generator wrapper; receives the new amplitude through ``set_amplitude``.
    :param i: measurement index, forwarded to ``save_mso_query_values``.
    :param core_effective_area: forwarded to ``save_mso_query_values``.
    :param turn: forwarded to ``save_mso_query_values``.
    :param target_deltaB: delta B value to converge to.
    :param gain: starting gain; the adjusted value is returned.
    :param amplitude: dict with ``'magnitude'`` and ``'unit'`` keys; ``'magnitude'`` is
        rewritten in place. NOTE(review): the parsing below assumes ``'magnitude'`` is a
        string of the form "<number>E-3" - confirm against the caller.
    :param amplitude_limit: forwarded to ``check_amp_limit``.
    :return: tuple ``(amplitude, gain)`` holding the final settings.

    NOTE(review): the loop has no iteration cap; it runs until the error is inside
    ``tolerance_final``.
    """
    tolerance_top = 0.015  # NOTE(review): never read in this function
    tolerance_max = 0.008
    tolerance_mid = 0.005
    tolerance_min = 0.003
    tolerance_final = 0.001  # convergence band; loosened below for higher flux densities

    time.sleep(0.5)
    # Initial delta B reading.
    B_max, B_min, calculate_delta_B = save_mso_query_values(mso, i, core_effective_area, turn)

    while abs(target_deltaB - calculate_delta_B) > tolerance_final:
        # Pick step sizes (and possibly a new convergence band) from the current delta B.
        # The loop condition above was evaluated with the previous tolerance_final.
        if calculate_delta_B > 0.155:
            print("delta B 大於155 mT")
            tolerance_final = 0.0018  # higher flux density: looser convergence band
            step_rf_max = 1
            step_rf_mid = 1
            step_rf_mean = 1
            step_rf_min = 1
            step_rf_decrease = 1
            step_afg_increase_max = 0
            step_afg_increase_mid = 0
            step_afg_increase_min = 0
            step_afg_decrease = 7
        elif calculate_delta_B > 0.11:
            print("delta B 大於110 mT")
            tolerance_final = 0.0018  # higher flux density: looser convergence band
            step_rf_max = 2
            step_rf_mid = 1
            step_rf_mean = 1
            step_rf_min = 1
            step_rf_decrease = 1
            step_afg_increase_max = 2
            step_afg_increase_mid = 1
            step_afg_increase_min = 0
            step_afg_decrease = 5
        elif calculate_delta_B > 0.07:
            print("delta B 大於70 mT")
            tolerance_final = 0.0015  # higher flux density: looser convergence band
            step_rf_max = 2
            step_rf_mid = 2
            step_rf_mean = 1
            step_rf_min = 1
            step_rf_decrease = 2
            step_afg_increase_max = 2
            step_afg_increase_mid = 1
            step_afg_increase_min = 1
            step_afg_decrease = 3
        elif calculate_delta_B > 0.04:
            # NOTE(review): the threshold is 0.04 but the message says 50 mT.
            print("delta B 大於50 mT")
            tolerance_final = 0.001
            step_rf_max = 3
            step_rf_mid = 2
            step_rf_mean = 2
            step_rf_min = 1
            step_rf_decrease = 2
            step_afg_increase_max = 8
            step_afg_increase_mid = 4
            step_afg_increase_min = 2
            step_afg_decrease = 3
        else:
            # This branch leaves tolerance_final at whatever an earlier pass set it to.
            print("delta B 小於50 mT")
            step_rf_max = 4
            step_rf_mid = 3
            step_rf_mean = 2
            step_rf_min = 1
            step_rf_decrease = 1
            step_afg_increase_max = 10
            step_afg_increase_mid = 7
            step_afg_increase_min = 4
            step_afg_decrease = 2

        # Signed error: positive means the measured delta B is still below the target.
        tolerance = target_deltaB - calculate_delta_B
        print(f"實際 delta B: {calculate_delta_B} 與預期 delta B: {target_deltaB} 差了 {tolerance}T")

        # NOTE(review): the amplitude steps are applied to the number in front of "E-3",
        # but the log messages below label them "V" - confirm the intended unit.
        if tolerance < -tolerance_final:
            # Overshoot: lower both the AFG amplitude and the RF gain.
            print(f"實際deltaB大於預期deltaB {tolerance_final}")
            amplitude_value = float(amplitude['magnitude'].replace('E-3', '')) - step_afg_decrease  # strip 'E-3', then subtract the step
            amplitude['magnitude'] = f"{amplitude_value}E-3"  # store the new amplitude string
            print(f"AFG 振幅已減少 {step_afg_decrease}V")
            gain -= step_rf_decrease
            print(f"增益減少 {step_rf_decrease}")
        elif tolerance > tolerance_max:
            # Far below target: largest increase steps.
            print(f"delta B 誤差大於 {tolerance_max}")
            amplitude_value = float(amplitude['magnitude'].replace('E-3', '')) + step_afg_increase_max  # strip 'E-3', then add the step
            amplitude['magnitude'] = f"{amplitude_value}E-3"  # store the new amplitude string
            print(f"AFG 振幅已增加 {step_afg_increase_max}V")
            gain += step_rf_max
            print(f"增益增加 {step_rf_max}")
        elif tolerance > tolerance_mid:
            # Between tolerance_mid and tolerance_max: medium increase steps.
            print(f"delta B 誤差介於 {tolerance_mid} ~ {tolerance_max}")
            amplitude_value = float(amplitude['magnitude'].replace('E-3', '')) + step_afg_increase_mid  # strip 'E-3', then add the step
            amplitude['magnitude'] = f"{amplitude_value}E-3"  # store the new amplitude string
            print(f"AFG 振幅已增加 {step_afg_increase_mid}V")
            gain += step_rf_mid
            print(f"增益增加 {step_rf_mid}")
        elif tolerance > tolerance_min:
            # Between tolerance_min and tolerance_mid: small increase steps.
            print(f"delta B 誤差介於 {tolerance_min} ~ {tolerance_mid}")
            amplitude_value = float(amplitude['magnitude'].replace('E-3', '')) + step_afg_increase_min  # strip 'E-3', then add the step
            amplitude['magnitude'] = f"{amplitude_value}E-3"  # store the new amplitude string
            print(f"AFG 振幅已增加 {step_afg_increase_min}V")
            gain += step_rf_mean
            print(f"增益增加 {step_rf_mean}")
        else:
            # Remaining cases: only the gain is raised, by the smallest step.
            # NOTE(review): a small overshoot that falls inside a tolerance_final widened
            # during this pass also lands here and still increases the gain.
            print(f"delta B 誤差介於 {tolerance_final} ~ {tolerance_min}")
            gain += step_rf_min
            print(f"增益增加 {step_rf_min}")

        # check_amp_limit only turns the output off; the amplitude is still sent afterwards.
        check_amp_limit(afg, amplitude_limit, amplitude)
        afg.set_amplitude(amplitude['magnitude'], amplitude['unit'])

        # Same pattern for the gain: the check only powers off, the gain is still sent.
        check_power_gain(rf, gain)
        gain_set = calculate_gain_set(gain)
        rf.set_gain(gain, gain_set)

        time.sleep(0.5)
        set_vertical_scale(mso)

        time.sleep(1)

        time.sleep(1.5)
        # Re-measure delta B for the next loop test.
        B_max, B_min, calculate_delta_B = save_mso_query_values(mso, i, core_effective_area, turn)

    print(f"delta B已達預期的{target_deltaB}")
    return amplitude, gain

#### 從較小的激勵源開始

* 設定expect deltaB：[0.02,0.04,0.06](T)
* 查看calculate deltaB
* 計算兩者誤差tolerance

* tolerance為負的:RF的Gain:不變/AFG振幅:-5mV

* 相差0.01以上：RF的Gain:+10/AFG振幅：不變

* 相差0.005~0.01：RF的:Gain+5/AFG振幅:不變

* 相差0.003~0.005：RF的:Gain+3/AFG振幅：不變

* 相差0.001~0.003：RF的:Gain+1/AFG振幅：不變

* 相差0.001內:不變/AFG振幅：不變

In [21]:
def format_time(seconds):
    """
    Format a duration in seconds as ``minutes:seconds``.

    The seconds field is zero-padded to two digits, so 65 s reads "1:05"
    rather than the ambiguous "1:5".

    :param seconds: duration in seconds (int or float).
    :return: string of the form ``"M:SS"``.
    """
    minutes = int(seconds // 60)
    remaining_seconds = int(seconds % 60)
    return f"{minutes}:{remaining_seconds:02d}"

In [22]:
def every_measurement_time(i, start_time, measurement_start_time, measurement_times, accumulated_times):
    """
    Record and print how long measurement ``i`` took and the total elapsed time.

    :param i: measurement number shown in the log line.
    :param start_time: ``time.time()`` stamp taken when the program started.
    :param measurement_start_time: ``time.time()`` stamp taken when this measurement started.
    :param measurement_times: list; the duration of this measurement is appended.
    :param accumulated_times: list; the elapsed time since ``start_time`` is appended.
    :return: the end timestamp of this measurement.
    """
    finished_at = time.time()

    lap = finished_at - measurement_start_time
    measurement_times.append(lap)

    elapsed = finished_at - start_time
    accumulated_times.append(elapsed)

    lap_text, elapsed_text = format_time(lap), format_time(elapsed)
    print(f"[第{i}次測量]花費時間: {lap_text}")
    print(f"程式執行累計時間: {elapsed_text}")

    return finished_at

In [None]:
def main():
    """Run the automated core-loss measurement sweep and save the results to CSV.

    Steps, in order:
      1. Define the settings for the oscilloscope (MSO64B), the function
         generator (AFG31101) and the RF power amplifier (150A100D).
      2. Connect to the three instruments.
      3. Configure the oscilloscope measurements / math / power functions, the
         amplifier gain and the generator output, then set the horizontal and
         vertical scales.
      4. For every target delta B: adjust the excitation, acquire the
         measurements without and with Vref deskew, and compute the final loss.
      5. Set the amplifier gain to zero, switch the outputs off, collect all
         results in a DataFrame and write it to a CSV file.

    Every instrument that was successfully opened is closed on exit, even when
    a step raises. Note that on such a failure the generator/amplifier outputs
    are NOT switched off automatically; only the connections are released.
    """

    # ========================= Parameters ===============================
    # ===== MSO64B parameters =====
    # channel: {CH1 | CH2 | CH3 | CH4} [page: 2-541]
    # measure type: {AMPLITUDE | FREQUENCY | MAXIMUM | MEAN | MINIMUM | PERIOD | PHASE
    #   | PK2PK | RMS} [page: 2-717]
    # math_num: {MATH1 | MATH2 | MATH3 | MATH4 | ...} [page: 2-672]
    # math_function: {first letter should be capital then calculate} [page: 2-677]
    # gating_type: {NONE | SCREEN | CURSor | LOGic | SEARch | TIMe} [page: 2-754]

    # Measurements to add for the displayed channels.
    measurements = [
        {'measurement': 'MEAS6','channel': 'CH1'},
        {'measurement': 'MEAS7','channel': 'CH2'},
        {'measurement': 'MEAS8','channel': 'CH3'}
    ]

    # Math waveforms to add and their formulas.
    mso_maths = [
        {'math_num': 'MATH1', 'math_function': 'Ch1-Meas6'},
        {'math_num': 'MATH2', 'math_function': 'Ch2-Meas7'},
        {'math_num': 'MATH3', 'math_function': 'Ch3-Meas8'},
        {'math_num': 'MATH4', 'math_function': 'Math1/10'},
        {'math_num': 'MATH5', 'math_function': 'Intg(Math2)'}
    ]

    # Labels shown for selected math waveforms.
    math_labels = [
        {'math_num' : 'MATH4', 'label_name': 'A'},
        {'math_num' : 'MATH5', 'label_name': 'T'},
    ]

    # Power-function definitions (voltage / current source per power measurement).
    powers = [
        {'Power_num': '1','Voltage_Source': 'MATH2', 'Current_Source': 'MATH4'},
        {'Power_num': '2','Voltage_Source': 'MATH3', 'Current_Source': 'MATH4'}
    ]

    # Channels / math waveforms whose display is switched off.
    off_displays = [
        {'channel': 'NONE','math': 'MATH1'},
        {'channel': 'NONE','math': 'MATH2'},
        {'channel': 'NONE','math': 'MATH3'},
        {'channel': 'NONE','math': 'MATH6'},
        {'channel': 'NONE','math': 'MATH7'},
        {'channel': 'NONE','math': 'MATH8'},
        {'channel': 'NONE','math': 'MATH9'}
    ]

    # ===== AFG_31101 parameters =====
    # shape: {SINusoid|SQUare|PULSe|RAMP|PRNoise|DC|SINC|GAUSsian|LORentz|ERISe|EDECay|HAVersine
    #   |EMEMory[1]|EMEMory2|EFILe}
    # amplitude:
    #     magnitude: output amplitude
    #     <units>::=[VPP | VRMS | DBM]
    # frequency:
    #     magnitude: output frequency
    #     <units>::=[Hz | kHz | MHz]
    # output_state: {ON|OFF}

    # Excitation signal settings.
    shape = 'SINusoid'
    frequency = {'magnitude': '500','unit': 'KHz'}
    amplitude = {'magnitude': '300E-3','unit': 'VPP'}
    output_state = "ON"
    amplitude_limit = 0.5

    # ===== RF_150A_100D parameters =====
    # Gain: 0~100
    # Power state: {ON | OFF}

    # RF power amplifier settings.
    gain = 15
    power_state = 'ON'

    # VISA resource addresses of the three instruments.
    MSO_instrument_address = 'USB0::0x0699::0x0530::C048992::INSTR'
    AFG_instrument_address = 'USB0::0x0699::0x0359::C016504::INSTR'
    RF_instrument_address = 'USB0::0x0547::0x1B58::0358752::INSTR'

    # Handles start as None so the cleanup below only closes what was opened.
    mso = afg = rf = None
    try:
        # ==================== Instrument Connect Test ===========================
        # ===== MSO64B connection =====
        mso = MSO_64B(MSO_instrument_address)

        # ===== AFG_31101 connection =====
        afg = AFG_31101(AFG_instrument_address)

        # ===== RF_150A_100D connection =====
        rf = RF_150A100D(RF_instrument_address)
        # Additional instruments can be connected here...

        # ============================ Auto Test =================================
        # Initialise the time-tracking lists.
        start_time = time.time()
        measurement_times = []
        accumulated_times = []

        # TODO: area reserved for the automated measurement for-loop.

        # ===== MSO64B control ===== (can be skipped if the oscilloscope is already configured)
        mso.result_table()  # add the measurement result table

        # === Add measurement items and power measurements ===
        for measurement in measurements:
            meas = measurement['measurement']
            source = measurement['channel']
            mso.add_measurement(meas,source)

        set_cursor_range(afg, mso)

        # Add the math waveforms with their formulas.
        for mso_math in mso_maths:
            math_num = mso_math['math_num']
            math_function = mso_math['math_function']
            mso.math_add(math_num, math_function)

        mso.set_math_autoscale()

        for math_label in math_labels:
            math_num = math_label['math_num']
            label_name = math_label['label_name']
            mso.set_math_label(math_num,label_name)

        mso.set_gating_type()  # adjust the gating type of the measurements
        mso.measure_delta_B()  # set up the Bmax / Bmin measurements

        for power in powers:
            num = power['Power_num']
            voltage = power['Voltage_Source']
            current = power['Current_Source']
            mso.power_function(num,voltage, current)

        for off_display in off_displays:
            off_channel = off_display['channel']
            off_math = off_display['math']
            mso.off_display(off_channel,off_math)

        # ===== RF_150A_100D control =====
        gain_set = calculate_gain_set(gain)  # derive the gain setting value
        rf.set_gain(gain, gain_set)

        check_power_state(rf, power_state)  # apply the requested amplifier power state (ON/OFF)

        # ===== AFG_31101 control =====
        afg.set_waveform(shape)  # waveform type
        afg.set_frequency(frequency['magnitude'],frequency['unit'])  # signal frequency
        afg.set_amplitude(amplitude['magnitude'],amplitude['unit'])  # signal amplitude

        check_output_state(afg, output_state)  # apply the requested output state (ON/OFF)
        check_amp_limit(afg, amplitude_limit, amplitude)  # check the amplitude against the limit

        # === Autoset horizontal & vertical scale ===
        mso.clear()
        wait_for_population(mso, threshold=2)  # wait for the population threshold before reading values
        set_horizon_scale(afg, mso)

        mso.clear()
        wait_for_population(mso, threshold=2)
        set_vertical_scale(mso)

        # ===== Start measuring & save data through pandas =====
        # ===== Core parameters =====
        # Parameters of the core under test.
        core_type = 'ML95S'
        core_He = 5 #unit:mm
        core_OD = 14 #unit:mm
        core_ID = 7 #unit:mm
        turn = 3 #unit:turn
        core_effective_length, core_effective_area, core_effective_volume = calculate_core_parameters(core_He, core_OD, core_ID)

        # === Measurement plan ===
        # Target delta B values, the Vref deskew setting and the 1-based sequence numbers.
        expect_deltaB = [0.04,0.08, 0.1, 0.12, 0.14, 0.16, 0.18, 0.2]
        deskew_vref = 1
        sequences = list(range(1, len(expect_deltaB) + 1))

        # Result containers, reset before the sweep.
        vin_amplitude = {}
        power_gain ={}
        max_B = {}
        min_B = {}
        delta_B ={}
        Pcore_true_power = {}
        PL_true_power = {}
        max_B_deskew = {}
        min_B_deskew = {}
        delta_B_deskew ={}
        Pcore_true_power_deskew = {}
        PL_true_power_deskew = {}
        k_factor = {}
        Pcore = {}
        Pcore_cv = {}

        # === Measurement sweep ===
        for i in sequences:
            print(f"[第{i}次測量]")

            # Start time of this measurement.
            measurement_start_time = time.time()

            # === Automatically adjust the excitation ===
            target_deltaB = expect_deltaB[i-1]  # target delta B for this sequence number
            wait_for_population(mso, threshold=5)  # let the scope settle before the first delta B reading
            amplitude, gain = adjust_rf_afg(mso, rf, afg, i, core_effective_area, turn, target_deltaB, gain, amplitude, amplitude_limit)

            # Record the excitation that was finally used for this target.
            vin_amplitude[i] = amplitude['magnitude']
            power_gain[i] = gain

            # === Acquire values without deskew ===
            mso.set_deskew_zero()  # reset the deskew of channel 1 (Vref) to zero

            wait_for_population(mso)  # wait for enough acquisitions before reading

            acquire_measurement(mso, i, core_effective_area, turn, max_B, min_B, delta_B, Pcore_true_power, PL_true_power)

            # === Acquire values with Vref deskew ===
            print("對Vref Deskew")
            set_vref_deskew(afg, mso, deskew_vref)  # apply the Vref deskew

            mso.clear()
            wait_for_population(mso)  # wait for enough acquisitions before reading

            acquire_measurement(mso, i, core_effective_area, turn, max_B_deskew, min_B_deskew, delta_B_deskew, Pcore_true_power_deskew, PL_true_power_deskew, deskew=True)

            # Compute k_factor, Pcore and Pcore_cv for this sequence number.
            k_factor[i], Pcore[i], Pcore_cv[i] = calculate_final_power_loss(i, Pcore_true_power, PL_true_power, Pcore_true_power_deskew, PL_true_power_deskew, core_effective_volume)

            # Record and print the time spent on this measurement (return value not needed here).
            every_measurement_time(i, start_time, measurement_start_time, measurement_times, accumulated_times)

        # ===== Measurement finished: switch the outputs off =====
        print("已經測量完畢")
        # Set the amplifier gain to zero first. The gain value and its derived
        # setting must both describe 0; previously the last sweep gain was
        # passed together with the setting computed for 0.
        gain = 0
        gain_set = calculate_gain_set(gain=gain)
        rf.set_gain(gain, gain_set)
        time.sleep(0.5)
        afg.output_off()  # switch the AFG output off
        rf.power_off()  # switch the RF amplifier off

        # ===== Collect everything into one DataFrame =====
        n_rows = len(sequences)
        # NOTE(review): the two time columns are labelled "(sec)" but hold
        # format_time() strings (minutes:seconds) - confirm the intended unit.
        data = {
            ("core parameter","Type"): [core_type] * n_rows,
            ("core parameter","Le(mm)"): [core_effective_length] * n_rows,
            ("core parameter","Ae(mm^2)"): [core_effective_area] * n_rows,
            ("core parameter","Ve(mm^3)"): [core_effective_volume] * n_rows,
            ("core parameter","Turns"): [turn] * n_rows,
            ("Test environment (R=10)", "Sequence"): sequences,
            ("Test environment (R=10)", "Function"): [shape] * n_rows,
            ("Test environment (R=10)", "freq(kHz)"): [frequency['magnitude']] * n_rows,
            ("Test environment (R=10)", "deltaB(T)"): expect_deltaB,
            ("Test environment (R=10)", "Vin(Vpp)"): [vin_amplitude[seq] for seq in sequences],  # recorded after every adjustment
            ("Test environment (R=10)", "Gain"): [power_gain[seq] for seq in sequences],  # recorded after every adjustment
            #("flux density (DS=0)", "Max(T)"): [v for seq, v in max_B['MEAS9']],
            #("flux density (DS=0)", "Min(T)"): [v for seq, v in min_B['MEAS10']],
            ("flux density (DS=0)", "delta B(T)"): [delta_B[seq] for seq in sequences],
            ("DS=0", "Pcore(W)"): [v for seq, v in Pcore_true_power['1']],
            ("DS=0", "Pair(W)"): [v for seq, v in PL_true_power['2']],
            #("flux density (DS=1)", "Max(T)"): [v for seq, v in max_B_deskew['MEAS9']],
            #("flux density (DS=1)", "Min(T)"): [v for seq, v in min_B_deskew['MEAS10']],
            ("flux density (DS=1)", "delta B(T)"): [delta_B_deskew[seq] for seq in sequences],
            ("DS=1", "Pcore(W)"): [v for seq, v in Pcore_true_power_deskew['1']],
            ("DS=1", "Pair(W)"): [v for seq, v in PL_true_power_deskew['2']],
            ("Final calculate","K"):[k_factor[seq] for seq in sequences],
            ("Final calculate","Real Pcore(W)"):[Pcore[seq] for seq in sequences],
            ("Final calculate","Real Pcv(kW/m^3)"):[Pcore_cv[seq] for seq in sequences],
            ("Final calculate", "Spend time(sec)"): [format_time(duration) for duration in measurement_times],  # time spent per measurement
            ("Final calculate", "Accumulated time(sec)"): [format_time(accumulated_time) for accumulated_time in accumulated_times],  # cumulative time
        }

        # Two-level column header built from the (group, name) tuple keys.
        index = pd.MultiIndex.from_tuples(data.keys())

        # Build the DataFrame holding the whole result table.
        df = pd.DataFrame(data)
        df.columns = index

        # Show the result table.
        print(df)

        # Save as a CSV file.
        # NOTE(review): the file name mentions 1MHz and 20-200mT while the
        # settings above use 500 KHz and 0.04-0.2 T targets - confirm the name.
        df.to_csv('(1MHz)ML95s_SIN_20-200mT(0903).csv', index=False)

    finally:
        # ============================ Close Instrument =================================
        # Release every connection that was opened (AFG, MSO, RF order as before),
        # so a failed run does not leave VISA sessions open.
        for instrument in (afg, mso, rf):
            if instrument is not None:
                instrument.close_instrument()
    
# Run the measurement only when this code is executed directly, not when imported.
if __name__ == "__main__":
    main()

Connected to MSO64B.
TEKTRONIX,MSO64B,C048992,CF:91.1CT FV:1.44.3.433

Connected to AFG_31101.
TEKTRONIX,AFG31101,C016504,SCPI:99.0 FV:1.6.1

Connected to RF_150A100D.
AR-RF/MICROWAVE-INST,150A100D,1.0

add measurent result table
add measurent MEAS6 of CH1
add measurent MEAS7 of CH2
add measurent MEAS8 of CH3
the frequency of excitaton is 500000.0 Hz
set cursor range be -4e-06 to 4e-06
Add MATH1 to calculate Ch1-Meas6
Add MATH2 to calculate Ch2-Meas7
Add MATH3 to calculate Ch3-Meas8
Add MATH4 to calculate Math1/10
Add MATH5 to calculate Intg(Math2)
enables the autoscaling the math4 in the specified Waveform View
enables the autoscaling the math5 in the specified Waveform View
set label of MATH4 be A 
set label of MATH5 be T 
take measurements on the portion of the waveform between the cursor
add measurent maximum of math5
add measurent minimum of math5
add measurent maximum of math5
add measurent minimum of math5
add measurent maximum of math5
add measurent minimum of math5
turn NONE d