In [1]:
from phantasy import fetch_data
from epics import caput,caget
import re
import pandas as pd
import numpy as np
from typing import List,Union
import time

In [2]:
def ensure_set(setpoint_pv: Union[str, List[str]],
               readback_pv: Union[str, List[str]],
               setpoint_goal: Union[float, List[float]],
               readback_goal: Union[float, List[float]],
               tol: Union[float, List[float]] = 0.05,
               timeout: float = 30.0,):
    if not (isinstance(setpoint_pv,list) or isinstance(setpoint_pv,np.ndarray)):
        setpoint_pv = [setpoint_pv]
    if not (isinstance(readback_pv,list) or isinstance(readback_pv,np.ndarray)):
        readback_pv = [readback_pv]  
    if not (isinstance(setpoint_goal,list) or isinstance(setpoint_goal,np.ndarray)):
        setpoint_goal = [setpoint_goal]  
    if not (isinstance(readback_goal,list) or isinstance(readback_goal,np.ndarray)):
        readback_goal = [readback_goal]  
    for pv, val in zip(setpoint_pv,setpoint_goal):
        caput(pv,val)
    t0 = time.monotonic()
    while(True):
        time.sleep(0.5)
        rds = np.array([caget(pv) for pv in readback_pv])
        if np.all(np.abs(rds - np.array(readback_goal)) < tol):
            break
        if time.monotonic()-t0 > timeout:
            print("warn! Ramping timeout")
            input("press anykey to continue")
            break
    time.sleep(0.5)

In [3]:
class constructFC:
    def __init__(self,dnum_or_PV):
        info =[
            [717, "FE_SCS2", "IN_CMD_DRV","LMNEG_RSTS_DRV","LMPOS_RSTS_DRV"],
            [738, "FE_SCS1", "IN_CMD_DRV","LMNEG_RSTS_DRV","LMPOS_RSTS_DRV"],
            [814, "FE_LEBT", "IN_CMD_DRV","LMNEG_RSTS_DRV","LMPOS_RSTS_DRV"],
            [977, "FE_LEBT", "IN_CMD_DRV","LMNEG_RSTS_DRV","LMPOS_RSTS_DRV"],
            [998, "FE_LEBT", "IN_CMD"    ,"LMIN_RSTS"     ,"LMOUT_RSTS"    ],
            [1102,"FE_MEBT", "IN_CMD"    ,"LMIN_RSTS"     ,"LMOUT_RSTS"    ],
        ]
        info = pd.DataFrame(info,columns=['D','SEG','IN_CMD','OUT_CMD','IN_STATUS','OUT_STATUS']).set_index('D')

        if type(dnum_or_PV) is int:
            dnum = dnum_or_PV
        elif type(dnum_or_PV) is str:
            match=re.search(r"_D(\d+)",dnum_or_PV)
            dnum = int(match.group(1))
        if dnum not in info.index:
            raise ValueError("unrecognized FC D-number")
        info = info.loc[dnum]
        self.dnum = dnum
        self.name = info['SEG'] + ':FC_D' + f"{dnum:04d}"
        self.in_cmd     = self.name +":" + info['IN_CMD']
        self.out_cmd    = self.name +":" + info['OUT_CMD']
        self.in_status  = self.name +":" + info['IN_STATUS']
        self.out_status = self.name +":" + info['OUT_STATUS']
        self.prkd       = self.name +":PKAVG_RD"

In [4]:
class constructAP:
    def __init__(self,dnum_or_PV):
        info =[
            [796, "FE_LEBT", "IN_CMD", "LMIN_RSTS" ,"LMOUT_RSTS"],
            [807, "FE_LEBT", "IN_CMD", "LMIN_RSTS" ,"LMOUT_RSTS"],
        ]
        info = pd.DataFrame(info,columns=['D','SEG','IN_CMD','IN_STATUS','OUT_STATUS']).set_index('D')

        if type(dnum_or_PV) is int:
            dnum = dnum_or_PV
        elif type(dnum_or_PV) is str:
            match=re.search(r"_D(\d+)",dnum_or_PV)
            dnum = int(match.group(1))
        if dnum not in info.index:
            raise ValueError("unrecognized FC D-number")
        info = info.loc[dnum]
        self.dnum = dnum
        self.name = info['SEG'] + ':AP_D' + f"{dnum:04d}"
        self.in_cmd     = self.name +":" + info['IN_CMD']
        self.in_status  = self.name +":" + info['IN_STATUS']
        self.out_status = self.name +":" + info['OUT_STATUS']

In [5]:
def scan_apertures(FCs_to_read,isource,data_ave_time=5):
    if isource ==1:
        allFCs = [constructFC(fc) for fc in [738,814,977,998,1102]]
    elif isource == 2:
        allFCs = [constructFC(fc) for fc in [717,814,977,998,1102]]
    else:
        raise ValueError("isource must be 1 or 2")
        
    AP1 = constructAP(796)
    AP2 = constructAP(807) 
    readFCs = [constructFC(fc) for fc in FCs_to_read]
    
    data = {}

    for rFC in readFCs:
        data[rFC.name] = [0]*4
        
        setPV = [rFC.in_cmd]
        setGoal = [1]
        rdPV = [rFC.in_status]
        rdGoal = [1]
        
        for aFC in allFCs:
            if aFC.dnum < rFC.dnum:
                setPV.append(aFC.out_cmd)
                setGoal.append(1)
                rdPV.append(aFC.out_status)
                rdGoal.append(1)
        
        # both AP out
        setPV += [AP1.in_cmd,AP2.in_cmd]
        setGoal += [0, 0]
        rdPV  += [AP1.out_status,AP2.out_status]
        rdGoal += [1, 1]
        ensure_set(setPV,rdPV,setGoal,rdGoal,tol=0.1)
        data[rFC.name][0] = fetch_data(rFC.pkrd,data_ave_time)[0][0]
        
        # AP1 in
        setPV = [AP1.in_cmd]
        setGoal = [1]
        rdPV  = [AP1.in_status]
        rdGoal = [1]
        ensure_set(setPV,rdPV,setGoal,rdGoal,tol=0.1)
        data[rFC.name][1] = fetch_data(rFC.pkrd,data_ave_time)[0][0]
        
        # both AP in
        setPV = [AP2.in_cmd]
        setGoal = [1]
        rdPV  = [AP2.in_status]
        rdGoal = [1]
        ensure_set(setPV,rdPV,setGoal,rdGoal,tol=0.1)
        data[rFC.name][3] = fetch_data(rFC.pkrd,data_ave_time)[0][0]
        
        # AP2 in (AP1 out)
        setPV = [AP1.in_cmd]
        setGoal = [0]
        rdPV  = [AP1.out_status]
        rdGoal = [1]
        ensure_set(setPV,rdPV,setGoal,rdGoal,tol=0.1)
        data[rFC.name][2] = fetch_data(rFC.pkrd,data_ave_time)[0][0]
        
    return data

In [6]:
scan_apertures(FCs_to_read=[814,977],isource=1)

cannot connect to FE_SCS1:FC_D0738:OUT_CMD_DRV
cannot connect to FE_SCS1:FC_D0738:LMPOS_RSTS_DRV


TypeError: unsupported operand type(s) for -: 'NoneType' and 'int'