In [None]:
import os
import time
import numpy as np

# Make sure both DB path variables exist in the environment.
# When the exact name is missing, copy the value of the first environment
# variable whose name starts with it; report when no such variable exists.
for env_var in ("PYLAB_DB_LOCAL", "PYLAB_DB_OUT"):
    if env_var in os.environ:
        continue
    fallback_key = next((name for name in os.environ if name.startswith(env_var)), None)
    if fallback_key is None:
        print(f"{env_var} not found in environment variables")
    else:
        os.environ[env_var] = os.environ[fallback_key]
        print(f"set with {fallback_key}")

import pylab_dk
import pylab_dk.pltconfig.color_preset as colors
from pylab_dk.constants import cm_to_inch, next_lst_gen, constant_generator
from pylab_dk.file_organizer import FileOrganizer
from pylab_dk.data_process import DataProcess
from pylab_dk.data_plot import DataPlot
from pylab_dk.measure_manager import MeasureManager

In [None]:
# Project identifier; it is passed to FileOrganizer here and to MeasureManager in the next cell.
project_name = "Date-Material" # placeholder name used for debugging
# If the DB paths are not correctly set in the environment variables, reset them here
# by uncommenting the call below and filling in both paths:
#FileOrganizer.reload_paths(local_db_path=, out_db_path=)
folder = FileOrganizer(project_name)

In [None]:
# Create the measurement manager for this project and register the instruments.
# load_meter is called with a meter name followed by its VISA address(es); the loaded
# wrappers are later accessed through measurement.instrs[<name>].
# Uncomment only the lines for instruments that are connected and adjust the addresses.
measurement = MeasureManager(project_name)
#measurement.get_visa_resources()   # list all VISA resources
measurement.load_meter("6221","GPIB0::12::INSTR")
measurement.load_meter("2182","GPIB0::7::INSTR")
#measurement.load_meter("2400","GPIB0::23::INSTR")
#measurement.load_meter("6430","GPIB0::24::INSTR")
#measurement.load_meter("sr830","GPIB0::8::INSTR","GPIB0::9::INSTR")
#measurement.load_mercury_ips("TCPIP0::10.97.24.237::7020::SOCKET")
#measurement.load_mercury_ipc("TCPIP0::10.101.28.24::7020::SOCKET")
#measurement.load_ITC503("GPIB0::23::INSTR","GPIB0::24::INSTR")
#measurement.load_rotator()

## Wrapper Check (repeat for every meter)

In [None]:
# key into measurement.instrs of the meter exercised by the wrapper-check cells below
meter_name = "6221"

In [None]:
# check the get_output_status function:
# set up the meter, output 1uA in "curr" mode, then read the output status back
# (the last expression is shown as the cell output)
measurement.instrs[meter_name][0].setup()
measurement.instrs[meter_name][0].uni_output("1uA", type_str="curr")
measurement.instrs[meter_name][0].get_output_status()

In [None]:
# check the switch-off function (should return the output value to 0):
# output 1uA, print the first element of the output status, switch the output off
# and print it again, then switch the output back on and print it a third time
measurement.instrs[meter_name][0].setup()
measurement.instrs[meter_name][0].uni_output("1uA", type_str="curr")
print("before switch off: ", measurement.instrs[meter_name][0].get_output_status()[0])
measurement.instrs[meter_name][0].output_switch("off")
print("after switch off: ", measurement.instrs[meter_name][0].get_output_status()[0])
measurement.instrs[meter_name][0].output_switch("on")
# this reading is taken after switching the output back ON, so label it accordingly
print("after switch on: ", measurement.instrs[meter_name][0].get_output_status()[0])

In [None]:
# check the ramp function (from current and from 0)
# NOTE(review): only the from_curr=True case is run here; presumably from_curr=False
# ramps starting from 0 - rerun with that value to cover the second case (confirm)
measurement.instrs[meter_name][0].setup()
measurement.instrs[meter_name][0].ramp_output("curr", "10uA", interval="1uA",sleep=0.5, from_curr=True)

In [None]:
# check source sweep apply method:
# a "curr" / "dc" sweep on meter_name with max 10uA, step 1uA, compliance 1V, mode "0-max-0"
measurement.source_sweep_apply("curr", "dc", meter_name, max_value="10uA", step_value="1uA", compliance="1V", sweepmode="0-max-0")

In [None]:
# check the step threshold for every source meter (not implemented)
# Two outputs (1, then 3) are requested back-to-back with type_str="volt".
# NOTE(review): presumably the jump between them is what a step-threshold check
# would have to catch - confirm once the threshold is implemented
measurement.instrs[meter_name][0].setup()
measurement.instrs[meter_name][0].uni_output(1, type_str="volt")
measurement.instrs[meter_name][0].uni_output(3, type_str="volt")

## Normal Single Sweep/Vary

In [None]:
# set up the two meters passed as wrapper_lst to get_measure_dict in the next cell
measurement.instrs["6221"][0].setup()
measurement.instrs["2182"][0].setup()

In [None]:
step_time = 1 # s, wait time between each measurement step

# Keep the measurement modules and their parameters under explicit names so that
# later cells (e.g. the plot_df_cols cell) can reuse exactly what was measured here
# instead of depending on names bound in a later section of the notebook.
mea_mods = ("I_source_sweep_dc", "V_sense")
vars_tup = (1E-3, 5E-5, 1, 1, "0-max-0", "", 0, 0)

mea_dict = measurement.get_measure_dict(mea_mods, *vars_tup,
                                        wrapper_lst=[measurement.instrs["6221"][0],
                                                     measurement.instrs["2182"][0]],
                                        compliance_lst=["10mA"])

# "file_path" is the key the record_update calls in this notebook read;
# single quotes inside the f-strings keep the cell valid on Python < 3.12
print(f"filepath: {mea_dict['file_path']}")
print(f"no of columns(with time column): {mea_dict['record_num']}")
print(f"vary modules: {mea_dict['vary_mod']}")

In [None]:
# modify the plot configuration
# note i[0] is timer
measurement.live_plot_init(1,2,1)
measurement.start_saving(mea_dict["plot_record_path"],30)
try:
    for i in mea_dict["gen_lst"]:
        # store the record first, then push the new points to the two live plots
        measurement.record_update(mea_dict["file_path"], mea_dict["record_num"], i)
        measurement.live_plot_update([0,0],
                                     [0,1],
                                     [0,0],
                                     [i[1],i[0]],
                                     [i[2],i[1]],
                                     incremental=True)
        time.sleep(step_time)
finally:
    # always pair start_saving with stop_saving, even when the loop raises
    # or the cell is interrupted from the notebook
    measurement.stop_saving()

In [None]:
# plot all vars wrt time
# NOTE(review): mea_mods and vars_tup must be defined before this cell runs and should
# describe the measurement to plot; on a fresh kernel this raises NameError otherwise
measurement.plot_df_cols(mea_mods, *vars_tup)

## Rotate Probe

In [None]:
# fetch the rotator wrapper and print its info
# NOTE(review): the "rotator" entry presumably only exists after measurement.load_rotator()
# has been called (that call is commented out in the instrument-loading cell) - confirm
rot_dev = measurement.instrs["rotator"]
rot_dev.print_info()

In [None]:
# set the rotator speed, then ramp the angle to 360
# NOTE(review): units of set_spd / ramp_angle are not visible here; progress=True and
# wait=True presumably show progress and block until the ramp is done - confirm
rot_dev.set_spd(3)
rot_dev.ramp_angle(360, progress=True, wait=True)

## Vary
(it is recommended to apply only one vary at a time)

In [None]:
# For every varying module listed in mea_dict["vary_mod"], collect the function that
# starts the vary (vary_lst) and the matching current-value getter (curr_val_lst).
# Lookups stay inside the branches so only the instruments actually needed are touched.
vary_lst = []
curr_val_lst = []
for vary_name in mea_dict["vary_mod"]:
    if vary_name == "T":
        vary_lst.append(mea_dict["tmp_vary"])
        curr_val_lst.append(lambda: measurement.instrs["itc"].temperature)
    elif vary_name == "B":
        vary_lst.append(mea_dict["mag_vary"])
        curr_val_lst.append(lambda: measurement.instrs["ips"].field)
    elif vary_name == "Theta":
        vary_lst.append(mea_dict["angle_vary"])
        curr_val_lst.append(measurement.instrs["rotator"].curr_angle)

# exactly one varying parameter is supported by the loops that consume these lists
assert len(curr_val_lst) == 1, "vary_lst and curr_val_lst have lengths as 1"
assert len(vary_lst) == 1, "only one varying parameter is allowed"

In [None]:
# modify the plot configuration
##TODO::##
wait_before_vary = 30 # s, wait time before starting the varying process
measurement.live_plot_init(1,1,1)
##::TODO##
measurement.start_saving(mea_dict["plot_record_path"],3)
begin_vary = False
counter = 0 # s, time accumulated (in units of step_time) before the vary is started
try:
    for i in mea_dict["gen_lst"]:
        measurement.record_update(mea_dict["file_path"], mea_dict["record_num"], i)
        ##TODO:: add vary observing plotting##
        measurement.live_plot_update(0,0,0,i[1],i[2], incremental=True)
        ##::TODO##
        time.sleep(step_time)
        if not begin_vary:
            if counter >= wait_before_vary:
                # waited long enough: trigger every registered vary exactly once
                for funci in vary_lst:
                    funci()
                begin_vary = True
            else:
                counter += step_time
finally:
    # always pair start_saving with stop_saving, even when the loop raises
    # or the cell is interrupted from the notebook
    measurement.stop_saving()

## Constrained dual-Sweep

In [None]:
# set up the meters before the constrained dual-sweep
# NOTE(review): the get_measure_dict call in the next cell uses the "2400" and "6430"
# wrappers, while this cell sets up "6221" and "2182" - confirm which meters are intended
measurement.instrs["6221"][0].setup()
measurement.instrs["2182"][0].setup()

In [None]:
# use manual sweep list to achieve constrained multi-sweep
# define a main sweep which is the independent variables
# use list instead of ndarray or generator
max_val = 1 # 1V
npts = 51 # 51 points, i.e. 50 steps, in a single direction (must be >= 2)
# evenly spaced values from 0 to max_val; the point count follows npts
swp_lst_main = [idx * max_val/(npts - 1) for idx in range(npts)]
# define a sub sweep which is the dependent variables
def dependent_func(x):
    """Return the sub-sweep value tied to the main-sweep value x (here 2 - x)."""
    return 2-x
swp_lst_sub = [dependent_func(main_val) for main_val in swp_lst_main]

# the two sweep tables are handed over in the same order as the two sweep modules
mea_dict = measurement.get_measure_dict(("V_source_sweep_dc","V_source_sweep_dc", "I_sense"),
                                        "-h",
                                        wrapper_lst=[measurement.instrs["2400"][0],
                                                     measurement.instrs["6430"][0],
                                                     measurement.instrs["2400"][0]],
                                        compliance_lst=["10mA","10mA"],
                                        sweep_tables=[swp_lst_main, swp_lst_sub])


In [None]:
# Run the constrained sweep: every record is stored, then its entry at index 2 is
# plotted against the entry of the first swept column.
assert len(mea_dict["swp_idx"]) > 0, "no sweep found"
swp_idxs = mea_dict["swp_idx"]
measurement.live_plot_init(1, 1, 1)
for record in mea_dict["gen_lst"]:
    measurement.record_update(
        mea_dict["file_path"],
        mea_dict["record_num"],
        record,
    )
    measurement.live_plot_update(0, 0, 0, record[swp_idxs[0]], record[2])

## Mapping

In [None]:
##TODO::##
# Parameters
step_time = 1 # s, wait time between each measurement step
mapping = True
wait_before_vary = 30 # s, wait time before starting the varying process

# setup related meters IF SPECIAL PARAMETERS ARE NEEDED
measurement.instrs["6221"][0].setup(low_grounded=False)

# setup mapping IF NEEDED
if mapping:
    def map_func(x):
        """
        the correspondence between mapping variables, default is equal
        """
        return x

    # more mapping variables can be included
    # note the order ((1,2) x (3,4) -> 1,3; 1,4; 2,3; 2,4)
    map1_lst = np.arange(1,100,1)*1E-4 # achieve flexible mapping instead of evenly spaced

    map2_lst = map_func(map1_lst)  # or directly assign the mapping variables
    map_lsts = measurement.create_mapping(map1_lst, map2_lst, idxs=(0,1))
else:
    map_lsts = None

# Core configuration
# Generate the measurement generator
# (mea_mods and vars_tup are bound here so that later cells can reuse them)
mea_dict = measurement.get_measure_dict(
    mea_mods := ("V_source_sweep_dc","I_sense"),
    *(vars_tup := (1E-3, 5E-5, 1, 1, "0-max-0", "",1,1)), # refer to the last cell for the meaning of each element
    wrapper_lst=[measurement.instrs["6221"][0],
             measurement.instrs["2182"][0]],
    compliance_lst=["10V"],
    sr830_current_resistor = None, # only float
    if_combine_gen = True, # False for coexistence of vary and mapping
    special_name = None,
    sweep_tables = map_lsts
    )
##::TODO##

# "file_path" is the key the record_update calls in this notebook read;
# single quotes inside the f-strings keep the cell valid on Python < 3.12
print(f"filepath: {mea_dict['file_path']}")
print(f"no of columns(with time column): {mea_dict['record_num']}")
print(f"vary modules: {mea_dict['vary_mod']}")

In [None]:
# modify the plot configuration
# three stacked plots: two scatter plots and one contour plot
measurement.live_plot_init(3,1,1,plot_types=[["scatter"],["scatter"],["contour"]])
measurement.start_saving(mea_dict["plot_record_path"],30)
try:
    for i in mea_dict["gen_lst"]:
        measurement.record_update(mea_dict["file_path"], mea_dict["record_num"], i)
        measurement.live_plot_update([0,1,2],[0]*3,[0]*3,
                                     [i[1],i[1],i[0]],
                                     [i[2],i[3],i[1]],
                                     [i[2]], incremental=True)
        time.sleep(step_time)
finally:
    # always pair start_saving with stop_saving, even when the loop raises
    # or the cell is interrupted from the notebook
    measurement.stop_saving()

## Mapping with Vary

In [None]:
##TODO::##
# Parameters
step_time = 1 # s, wait time between each measurement step
mapping = True
wait_before_vary = 17 # s, wait time before starting the varying process

# setup related meters IF SPECIAL PARAMETERS ARE NEEDED
#measurement.instrs["6221"][0].setup(low_grounded=False)

# setup mapping IF NEEDED
if mapping:
    def map_func(x):
        """
        the correspondance between mapping variables, default is equal
        """
        return x

    # more mapping variables can be included
    # note the order ((1,2) x (3,4) -> 1,3; 1,4; 2,3; 2,4)
    map1_lst = np.concatenate([np.arange(-10, -5, 0.2),np.arange(-5, 5, 0.1),np.arange(5, 10.001, 0.2)]) # achieve flexible mapping instead of evenly spaced

    map2_lst = map_func(map1_lst)  # or directly assign the mapping variables
    map_lsts = measurement.create_mapping(map1_lst, map2_lst, idxs=(0,1))
else:
    map_lsts = None

# Core configuration
# Generate the measurement generator
mea_dict = measurement.get_measure_dict(
    mea_mods := ("V_source_sweep_dc","I_sense","B_vary"),
    *(vars_tup := (1E-3, 5E-5, 1, 1, "0-max-0", "",1,1, vary_start := -1, vary_stop:= 1)), # refer to the last cell for the meaning of each element
    wrapper_lst=[measurement.instrs["6221"][0],
             measurement.instrs["2182"][0]],
    compliance_lst=["10V"],
    sr830_current_resistor = None, # only float
    if_combine_gen = False, # False for coexistence of vary and mapping
    special_name = None,
    sweep_tables = map_lsts
    )
##::TODO##

print(f"filepath: {mea_dict['filepath']}")
print(f"no of columns(with time column): {mea_dict['record_num']}")
print(f"vary modules: {mea_dict["vary_mod"]}")

# integrate vary functions and current value getters
vary_lst = []
curr_val_lst = []
for i in mea_dict["vary_mod"]:
    match i:
        case "T":
            vary_lst.append(mea_dict["tmp_vary"])
            curr_val_lst.append(lambda: measurement.instrs["itc"].temperature)
        case "B":
            vary_lst.append(mea_dict["mag_vary"])
            curr_val_lst.append(lambda: measurement.instrs["ips"].field)
        case "Theta":
            vary_lst.append(mea_dict["angle_vary"])
            curr_val_lst.append(measurement.instrs["rotator"].curr_angle)

assert len(curr_val_lst) == 1, "vary_lst and curr_val_lst have lengths as 1"
assert len(vary_lst) == 1, "only one varying parameter is allowed"

In [None]:
# modify the plot configuration
##TODO::##
measurement.live_plot_init(1,1,1)
##::TODO##
measurement.start_saving(mea_dict["plot_record_path"],30)

vary_tol = 0.03 # tolerance used to decide that the varied quantity has reached vary_start / vary_stop

def read_vary_value():
    """Return the current value of the single varied quantity.

    curr_val_lst[0] is normally a zero-argument getter, so it has to be called
    before it can be compared with vary_start / vary_stop. A plain, non-callable
    value is returned unchanged.
    """
    getter = curr_val_lst[0]
    return getter() if callable(getter) else getter

# each row in list is a sweep step, so should be interrupted each step
# (slots are reassigned, never mutated in place; None marks "nothing stashed")
tmp_lst_swp = [None] * len(mea_dict["swp_idx"])
try:
    while True:
        measured_lst = next_lst_gen(mea_dict["gen_lst"])
        if measured_lst is None:
            break
        measurement.record_update(mea_dict["file_path"], mea_dict["record_num"], measured_lst)
        # substitute the swp to constant to do varies
        for n, i in enumerate(mea_dict["swp_idx"]):
            tmp_lst_swp[n] = mea_dict["gen_lst"][i]
            mea_dict["gen_lst"][i] = constant_generator(measured_lst[i])
        # begin a circular vary under each sweep step
        time.sleep(wait_before_vary)
        # NOTE(review): the forward vary is only triggered when the current value is within
        # vary_tol of vary_start; otherwise the loop below waits for vary_stop without a vary
        # having been started - confirm this is the intended precondition
        if abs(read_vary_value() - vary_start) < vary_tol:
            vary_lst[0]()
        # NOTE(review): the two inner loops only plot; they do not call record_update - confirm
        while abs(read_vary_value() - vary_stop) > vary_tol:
            measured_lst = next_lst_gen(mea_dict["gen_lst"])
            ##TODO:: add vary observing plotting##
            measurement.live_plot_update(0,0,0,measured_lst[1],measured_lst[2], incremental=True)
            ##::TODO##
            time.sleep(step_time)

        if abs(read_vary_value() - vary_stop) < vary_tol:
            vary_lst[0](reverse=True)
        while abs(read_vary_value() - vary_start) > vary_tol:
            measured_lst = next_lst_gen(mea_dict["gen_lst"])
            ##TODO:: add vary observing plotting##
            measurement.live_plot_update(0,0,0,measured_lst[1],measured_lst[2], incremental=True)
            ##::TODO##
            time.sleep(step_time)

        # restore the original sweep generators for the next sweep step
        for n, i in enumerate(mea_dict["swp_idx"]):
            mea_dict["gen_lst"][i] = tmp_lst_swp[n]
            tmp_lst_swp[n] = None
finally:
    # always pair start_saving with stop_saving, even when the loop raises
    # or the cell is interrupted from the notebook
    measurement.stop_saving()