In [1]:
import time
import os
from datetime import datetime


timeout = 3

In [2]:
os.environ['EPICS_CA_ADDR_LIST'] += ' 10.0.38.35'
os.environ['EPICS_CA_ADDR_LIST']

' 10.128.1.12:5064 10.128.1.12:5070 10.128.1.13:5064 10.128.1.13:5068 10.128.1.13:5070 10.128.1.54 10.128.1.55  10.0.38.29  10.0.38.37 10.0.38.39 10.0.38.52 10.0.38.35 10.0.38.143  10.0.38.31 10.0.38.150  10.31.74.16 10.0.38.83 10.0.38.72 10.0.38.149 10.0.38.36  10.30.14.19 10.30.13.22 10.39.50.46 10.31.54.38 10.32.74.44  10.10.10.124  10.0.28.64   10.0.38.35'

In [3]:

import numpy as np
import matplotlib.pyplot as plt

from siriuspy.search import IDSearch
from siriuspy.devices import VPU, SOFB, HLFOFB, IDFF, Tune, DVFImgProc, BunchbyBunch
from siriuspy.devices.bbb import SingleBunch

from mathphys.functions import save, load
import epics

# Functions definition

In [4]:
def initialize_vpu(beamline):
    # Search ID
    devnamevpu = IDSearch.conv_beamline_2_idname(beamline=beamline)
    vpu = VPU(devname=devnamevpu)

    # Disable beamline control
    vpu.cmd_beamline_ctrl_disable()
    print('beamline control: ', vpu.is_beamline_ctrl_enabled)

    # Set gap speed
    # vpu.set_gap_speed(0.1)
    # time.sleep(0.5)
    # print('gap speed: {:.3f} mm/s'.format(vpu.gap_speed))

    # # Set gap to parked condition
    # vpu.set_gap(vpu.kparameter_parked)
    # time.sleep(0.5)
    # print('gap: {:.3f} mm'.format(vpu.gap))

    return vpu

def move_vpu_gap(vpu:VPU, gap, timeout, verbose=False):
    vpu.set_gap(gap)
    time.sleep(0.5)
    print('Gap-RB {:.3f} mm'.format(vpu.gap)) if verbose else 0
    if vpu.cmd_move_start(timeout):
        time.sleep(0.5)
        print('Undulator is moving...') if verbose else 0
        while vpu.is_moving:
            time.sleep(0.1)
            print('Current gap {:.3f} mm.'.format(vpu.gap_mon), end='\r') if verbose else 0
        print('Gap {:.3f} mm reached.'.format(vpu.gap)) if verbose else 0
        return True
    else:
        print('Error while cmd_move_start.')
        return False

def move_vpu_gap_robust(vpu:VPU, gap, timeout, maxiter=3, verbose=False):
    sucess = move_vpu_gap(vpu, gap=gap, timeout=timeout, verbose=verbose)
    i=0
    while not sucess and i<maxiter:
        i += 1
        print('Trying {:0f}/{:.0f}'.format(i, maxiter)) if verbose else 0
        vpu.cmd_reset(timeout=timeout)
        time.sleep(0.5)
        sucess = move_vpu_gap(vpu, gap=gap, timeout=timeout, verbose=verbose)
    if sucess:
        print('Movimentation done!\n')
        return True
    else:
        print('Error while moving.\n')
        return False

def get_CAX_and_tune_data(bbb_h, bbb_v, tune, imgproc, nr_acq=10, acq_sleep=0.5):
        """Get data.

        Returns:
            data: Numpy 1d array
        """
        sigmax = np.zeros(nr_acq)
        sigmay = np.zeros(nr_acq)
        sigma1 = np.zeros(nr_acq)
        sigma2 = np.zeros(nr_acq)
        angle = np.zeros(nr_acq)
        # images = np.zeros((nr_acq, imgproc.image_sizey, imgproc.image_sizex))
        for i in range(nr_acq):
            sigmax[i] = imgproc.roix_fit_sigma
            sigmay[i] = imgproc.roiy_fit_sigma
            sigma1[i] = imgproc.fit_sigma1
            sigma2[i] = imgproc.fit_sigma2
            angle[i] = imgproc.fit_angle
            # images[i] = imgproc.image
            time.sleep(acq_sleep)

        img_params = np.zeros((5, nr_acq))
        img_params[0, :] = sigmax
        img_params[1, :] = sigmay
        img_params[2, :] = sigma1
        img_params[3, :] = sigma2
        img_params[4, :] = angle
        
        tunex, tuney, tunex_bbb, tuney_bbb = get_tunes(bbb_h, bbb_v, tune)
        tunes = [tunex, tuney, tunex_bbb, tuney_bbb]
        
        return img_params, tunes

def get_tune_data(tune, nr_acq=10, acq_sleep=0.5):
    tunex = np.zeros(nr_acq)
    tuney = np.zeros(nr_acq)
    for i in range(nr_acq):
        tunex[i] = tune.tunex
        tuney[i] = tune.tuney
        time.sleep(acq_sleep)
    return tunex, tuney

def acquire_tunes(tune, nr_acq, delay=0.2):
    tunex = np.zeros(nr_acq)
    tuney = np.zeros(nr_acq)
    for i in np.arange(nr_acq):
        tunex[i] = tune.tunex
        tuney[i] = tune.tuney
        time.sleep(delay)
    return tunex, tuney

def acquire_tunes_bbb(bbb_h, bbb_v, nr_acq, delay=0.2):
    tunex = np.zeros(nr_acq)
    tuney = np.zeros(nr_acq)
    for i in np.arange(nr_acq):
        tunex[i] = bbb_h.spec_marker1_tune
        tuney[i] = bbb_v.spec_marker1_tune
        time.sleep(delay)
    return tunex, tuney

def acquire_tunes_mixed(bbb_h, bbb_v, tune, nr_acq, delay=0.2):
    tunex_bbb = np.zeros(nr_acq)
    tuney_bbb = np.zeros(nr_acq)
    tunex = np.zeros(nr_acq)
    tuney = np.zeros(nr_acq)
    for i in np.arange(nr_acq):
        tunex_bbb[i] = bbb_h.spec_marker1_tune
        tuney_bbb[i] = bbb_v.spec_marker1_tune
        tunex[i] = tune.tunex
        tuney[i] = tune.tuney
        time.sleep(delay)
    return tunex, tuney, tunex_bbb, tuney_bbb

def get_tunes(bbb_h, bbb_v, tune):
    tunex, tuney, tunex_bbb, tuney_bbb = acquire_tunes_mixed(bbb_h, bbb_v, tune, 20, 0.2)
    tunex_bbb = np.median(tunex_bbb)
    tuney_bbb = np.median(tuney_bbb)
    tuney = np.median(tuney)
    tunex = np.median(tunex)
    return tunex, tuney, tunex_bbb, tuney_bbb


## Search devnames

In [5]:
devnamevpu = IDSearch.conv_beamline_2_idname(beamline='CARNAUBA')

## Initialize devices

In [6]:
vpu = initialize_vpu(beamline='CARNAUBA')
vpu.set_gap_speed(1)
time.sleep(0.5)
print(vpu.gap_speed)

beamline control:  False
1.0


In [7]:
vpu.gap_mon

20.000183000000007

In [10]:
vpu.set_gap(80)

True

In [11]:
vpu.cmd_move_start()

True

In [8]:
tune = Tune(Tune.DEVICES.SI)
bbb_v = SingleBunch(BunchbyBunch.DEVICES.V)
bbb_h = SingleBunch(BunchbyBunch.DEVICES.H)
imgproc = DVFImgProc(DVFImgProc.DEVICES.CAX_DVF2)

imgproc.wait_for_connection(timeout=timeout)
print('IMG Proc connected: ', imgproc.connected)

tune.wait_for_connection(timeout=timeout)
print('Tune connected: ', tune.connected)


IMG Proc connected:  True
Tune connected:  True


In [9]:
sofb = SOFB(SOFB.DEVICES.SI)
fofb = HLFOFB(HLFOFB.DEVICES.SI)

### Make sure SOFB and FOFB loops are opened and clear correctors

In [10]:
# Disable injector!!!

fofb.cmd_turn_off_loop_state()
fofb.cmd_corr_accclear()

sofb.cmd_turn_off_autocorr()
sofb.cmd_change_opmode_to_sloworb(timeout=timeout)

True

# Meas orbit distortions

In [13]:
# gaps0 = np.array([80, 75, 70, 65, 60, 55, 50, 45, 40, 35])
# gaps1 = np.linspace(30, 10, 21)
# gaps2 = np.array([9.7])
gaps0 = np.array([80,  70,  60,  50,  40, 30])
gaps1 = np.linspace(20, 10, 11)
gaps2 = np.array([9.7])
gaps = np.concatenate((gaps0, gaps1, gaps2))
gaps
print(gaps)

[80.  70.  60.  50.  40.  30.  20.  19.  18.  17.  16.  15.  14.  13.
 12.  11.  10.   9.7]


In [14]:
sofb.correct_orbit_manually(nr_iters=5, residue=0.5)

(4, 1.2559150810924156, 0.31175836900187626)

In [15]:
data = dict()
data['gap'] = list()
data['timestamps'] = list()
data['dorbx'] = list()
data['dorby'] = list()
data['gap_speed'] = list()
data['tunes'] = list()
data['img_params'] = list()
t0 = time.time()

sofb.correct_orbit_manually(nr_iters=5, residue=0.5)
sofb.cmd_reset()
sofb.wait_buffer()
orbx0 = sofb.orbx
orby0 = sofb.orby
for i, gap in enumerate(gaps):
    print('gap {:.2f} mm'.format(gap), end='\r')

    # Move gap
    sucess = move_vpu_gap_robust(vpu, gap=gap, timeout=timeout, verbose=True)
    # Verify if undulator moved
    if not sucess:
        break

    sofb.cmd_reset()
    sofb.wait_buffer()
    # Wait orbit distortion to become stationary
    
    # Record orbit and other info
    data['gap'].append(gap)
    data['timestamps'].append(time.time())
    data['dorbx'].append(sofb.orbx - orbx0)
    data['dorby'].append(sofb.orby - orby0)
    data['gap_speed'].append(vpu.gap_speed)
    data['beamline'] = IDSearch.conv_idname_2_beamline(vpu.devname)

    # Record CAX img params and tune
    # img_params, tunes = get_CAX_and_tune_data(bbb_h, bbb_v, tune, imgproc, nr_acq=5, acq_sleep=0.5)
    # data['img_params'].append(img_params)
    # data['tunes'].append(tunes)

print(f'Done! ETA: {time.time()-t0:.3f}s')
save(data, 'orbit_distortions_VPU_{}_iter1.pickle'.format(IDSearch.conv_idname_2_beamline(vpu.devname)))

Gap-RB 80.000 mm
Undulator is moving...
Gap 80.000 mm reached.
Movimentation done!

Gap-RB 70.000 mm
Undulator is moving...
Gap 70.000 mm reached.
Movimentation done!

Gap-RB 60.000 mm
Undulator is moving...
Gap 60.000 mm reached.
Movimentation done!

Gap-RB 50.000 mm
Undulator is moving...
Gap 50.000 mm reached.
Movimentation done!

Gap-RB 40.000 mm
Undulator is moving...
Gap 40.000 mm reached.
Movimentation done!

Gap-RB 30.000 mm
Undulator is moving...
Gap 30.000 mm reached.
Movimentation done!

Gap-RB 20.000 mm
Undulator is moving...
Gap 20.000 mm reached.
Movimentation done!

Gap-RB 19.000 mm
Undulator is moving...
Gap 19.000 mm reached.
Movimentation done!

Gap-RB 18.000 mm
Undulator is moving...
Gap 18.000 mm reached.
Movimentation done!

Gap-RB 17.000 mm
Undulator is moving...
Gap 17.000 mm reached.
Movimentation done!

Gap-RB 16.000 mm
Undulator is moving...
Gap 16.000 mm reached.
Movimentation done!

Gap-RB 15.000 mm
Undulator is moving...
Gap 15.000 mm reached.
Movimentation

# Meas local correctors matrix response

In [16]:
# Open ivu gap
move_vpu_gap_robust(vpu, gap=80, timeout=timeout, verbose=True)

Gap-RB 80.000 mm
Undulator is moving...
Gap 80.000 mm reached.
Movimentation done!



True

In [None]:
vpu = initialize_vpu(beamline='CARNAUBA')
vpu.set_gap_speed(0.3)
time.sleep(0.5)
print(vpu.gap_speed)

In [19]:
idff = IDFF('SI-06SB:BS-IDFF-CC_HARD')

# Configure SOFB and FOFB
# Disable injector!!!

fofb.cmd_turn_off_loop_state()
fofb.cmd_corr_accclear()

sofb.cmd_turn_off_autocorr()
sofb.cmd_change_opmode_to_sloworb(timeout=timeout)

True

In [17]:
# gaps0 = np.array([80, 75, 70, 65, 60, 55, 50, 45, 40, 35])
gaps0 = np.array([80,  70,  60,  50,  40, 30])
gaps1 = np.linspace(20, 10, 11)
gaps2 = np.array([9.7])
gaps = np.concatenate((gaps0, gaps1, gaps2))
# idx = np.argmin(np.abs(gaps - 12))
# gaps = gaps[idx:]

gaps

array([80. , 70. , 60. , 50. , 40. , 30. , 20. , 19. , 18. , 17. , 16. ,
       15. , 14. , 13. , 12. , 11. , 10. ,  9.7])

set CC currrents to zero

In [18]:
idff = IDFF('SI-06SB:BS-IDFF-CC_HARD')

dcurr = 5 # delta current in [A]
corrs = list()
corrs.extend(idff.ccdevs)

data = dict()
data['delta_current'] = 2*dcurr
for i, corr in enumerate(corrs):
    corr.cmd_turn_on(timeout=timeout)
    corr.set_current(0, wait_mon=True)
    print('Current {:.2f} A, corrector: '.format(corr.current_mon), corr.devname)

Current -0.01 A, corrector:  SI-06SB:PS-CC1-1
Current -0.01 A, corrector:  SI-06SB:PS-CC2-1
Current -0.00 A, corrector:  SI-06SB:PS-CC2-2
Current -0.00 A, corrector:  SI-06SB:PS-CC1-2


Test all CC currents to max values

In [23]:
data['delta_current'] = 2*dcurr
for i, corr in enumerate(corrs):
    corr.cmd_turn_on(timeout=timeout)
    corr.set_current(dcurr, wait_mon=True)
    print('Current {:.2f} A, corrector: '.format(corr.current_mon), corr.devname)

Current 5.00 A, corrector:  SI-06SB:PS-CC1-1
Current 5.00 A, corrector:  SI-06SB:PS-CC2-1
Current 5.00 A, corrector:  SI-06SB:PS-CC2-2
Current 5.00 A, corrector:  SI-06SB:PS-CC1-2


In [19]:
# print power supply device
idff = IDFF('SI-06SB:BS-IDFF-CC_HARD')
idff.ccdevs

[<siriuspy.devices.pwrsupply.PowerSupplyFBP at 0x7fc199a34940>,
 <siriuspy.devices.pwrsupply.PowerSupplyFBP at 0x7fc199a32400>,
 <siriuspy.devices.pwrsupply.PowerSupplyFBP at 0x7fc1999f1ba8>,
 <siriuspy.devices.pwrsupply.PowerSupplyFBP at 0x7fc199a19da0>]

In [20]:
# Disable injector

sofb.correct_orbit_manually(nr_iters=5, residue=0.5)
sofb.cmd_reset()
sofb.wait_buffer()

orbx0 = sofb.orbx
orby0 = sofb.orby

In [21]:
t0 = time.time()
intlk_flag = False
for i, gap in enumerate(gaps):
    if intlk_flag:
        break
    print('gap {:.2f} mm'.format(gap), end='\r')
    
    # Create dict for this gap
    data['gap {:.2f}'.format(gap)] = dict()
    
    # Move gap
    sucess = move_vpu_gap_robust(vpu, gap=gap, timeout=timeout, verbose=True)
    if not sucess:
        break
    sofb.correct_orbit_manually(nr_iters=5, residue=1)

    for i, corr in enumerate(corrs):
        intlk_pv = corr.devname + ':IntlkHard-Mon'
        reset_pv = corr.devname + ':Reset-Cmd'

        nr_temps = 5
        j = 0
        while epics.caget(intlk_pv) and j < nr_temps:
            print('{} is interlocked!'.format(corr.devname), end='\r')
            epics.caput(reset_pv, 1)
            time.sleep(1)
            j += 1
        if epics.caget(intlk_pv):
            print('It was not possible to reset {}'.format(corr.devname))
            intlk_flag = True
            break


        data['gap {:.2f}'.format(gap)][corr.devname] = dict()
        sofb.correct_orbit_manually(nr_iters=5, residue=1)
        sofb.cmd_reset()
        sofb.wait_buffer()

        orby0 = sofb.orby
        orbx0 = sofb.orbx

        print('Measuring corrector: ', corr.devname)

        corr.set_current(dcurr, wait_mon=True)
        sofb.cmd_reset()
        sofb.wait_buffer()
        
        # wait current
        # reset sofb and wait
        data['gap {:.2f}'.format(gap)][corr.devname]['pos'] = dict()
        data['gap {:.2f}'.format(gap)][corr.devname]['pos']['timestamps'] = time.time()
        data['gap {:.2f}'.format(gap)][corr.devname]['pos']['dorbx'] = sofb.orbx - orbx0
        data['gap {:.2f}'.format(gap)][corr.devname]['pos']['dorby'] = sofb.orby - orby0

 

        corr.set_current(0, wait_mon=True)
        sofb.cmd_reset()
        sofb.wait_buffer()
        data['gap {:.2f}'.format(gap)][corr.devname]['hist'] = dict()
        data['gap {:.2f}'.format(gap)][corr.devname]['hist']['dorbx'] = sofb.orbx - orbx0
        data['gap {:.2f}'.format(gap)][corr.devname]['hist']['dorby'] = sofb.orby - orby0
        data['gap {:.2f}'.format(gap)][corr.devname]['hist']['orbx0'] = orbx0
        data['gap {:.2f}'.format(gap)][corr.devname]['hist']['orby0'] = orby0
        
      

        corr.set_current(-dcurr, wait_mon=True)
        sofb.cmd_reset()
        sofb.wait_buffer()
        data['gap {:.2f}'.format(gap)][corr.devname]['neg'] = dict()
        data['gap {:.2f}'.format(gap)][corr.devname]['neg']['timestamps'] = time.time()
        data['gap {:.2f}'.format(gap)][corr.devname]['neg']['dorbx'] = sofb.orbx - orbx0
        data['gap {:.2f}'.format(gap)][corr.devname]['neg']['dorby'] = sofb.orby - orby0


        corr.set_current(0)
        sofb.cmd_reset()
        sofb.wait_buffer()
        
    save(data, 'orbit_distortions_corrs_VPU_{}_iter1_5A.pickle'.format(IDSearch.conv_idname_2_beamline(vpu.devname)), overwrite=True)

print(f'Done! ETA: {time.time()-t0:.3f}s')

Gap-RB 80.000 mm
Undulator is moving...
Gap 80.000 mm reached.
Movimentation done!

Measuring corrector:  SI-06SB:PS-CC1-1
Measuring corrector:  SI-06SB:PS-CC2-1
Measuring corrector:  SI-06SB:PS-CC2-2
Measuring corrector:  SI-06SB:PS-CC1-2
Gap-RB 70.000 mm
Undulator is moving...
Gap 70.000 mm reached.
Movimentation done!

Measuring corrector:  SI-06SB:PS-CC1-1
Measuring corrector:  SI-06SB:PS-CC2-1
Measuring corrector:  SI-06SB:PS-CC2-2
Measuring corrector:  SI-06SB:PS-CC1-2
Gap-RB 60.000 mm
Undulator is moving...
Gap 60.000 mm reached.
Movimentation done!

Measuring corrector:  SI-06SB:PS-CC1-1
Measuring corrector:  SI-06SB:PS-CC2-1
Measuring corrector:  SI-06SB:PS-CC2-2
Measuring corrector:  SI-06SB:PS-CC1-2
Gap-RB 50.000 mm
Undulator is moving...
Gap 50.000 mm reached.
Movimentation done!

Measuring corrector:  SI-06SB:PS-CC1-1
Measuring corrector:  SI-06SB:PS-CC2-1
Measuring corrector:  SI-06SB:PS-CC2-2
Measuring corrector:  SI-06SB:PS-CC1-2
Gap-RB 40.000 mm
Undulator is moving...


# Analysis