In [1]:
import yaml
import sys
sys.path.append("../")
from plugins.interfaces.diagnostics import AWAFrameGrabberDiagnostic, ROI


In [2]:
import time
from xopt import Evaluator
from epics import caput


In [3]:
from xopt import VOCS
from xopt.generators import get_generator
from xopt.generators.bayesian.models.standard import StandardModelConstructor
from xopt import Xopt
import time
from epics import caget_many
from xopt.utils import get_local_region
import pandas as pd


def align_beam(screen_name, trim_name, start_at_zero=True,tolerance=5.0):
    # start time to measure execution time
    start = time.time()

    # get information about trim magnets and screen settings from config files
    variable_names = [f"{trim_name}{ele}" for ele in ["H","V"]]
    trim_info = yaml.safe_load(open("awa_config/awa_drive_magnet_config.yml"))
    screen_info = yaml.safe_load(open("awa_config/awa_drive_camera_config.yml"))[screen_name]

    ############# create image diagnostic class #################
    screen_roi = ROI(
        xcenter=int(screen_info["center"][0]),
        ycenter=int(screen_info["center"][1]),
        xwidth=int(screen_info["radius"]),
        ywidth=int(screen_info["radius"]),
    )
        
    image_diagnostic = AWAFrameGrabberDiagnostic(
        roi=screen_roi, apply_bounding_box_constraint=False, visualize=False
    )

    ############# create measurement function #################
    def evaluate(inputs: dict):
        # caput values
        for name, val in inputs.items():
            caput(name, val)
    
        # wait for changes to occur - use small wait time for interpolated measurements
        time.sleep(1.5)
    
        results = image_diagnostic.measure_beamsize(1, **inputs)
    
        # measure distance to image center
        results["center_dist_x"] = results["Cx"] - image_diagnostic.roi.xwidth/2
        results["center_dist_y"] = results["Cy"] - image_diagnostic.roi.ywidth/2
    
        results["center_dist"] = (results["center_dist_x"]**2 + results["center_dist_y"]**2)**0.5
        results["time"] = time.time()
        
        return results
    

    ################# create Xopt objects #########################
    # create variables, objectives, etc. based on config files
    vocs = VOCS(
        variables = {
            trim_info[name]["PV"]:trim_info[name]["RANGE"] for name in variable_names
        },
        objectives = {"center_dist":"MINIMIZE"}
    )

    # create evaluator object
    evaluator = Evaluator(function=evaluate)

    # create generator object
    model_constructor = StandardModelConstructor(use_low_noise_prior=False)
    generator = get_generator("expected_improvement")(
        vocs=vocs, gp_constructor=model_constructor, 
        n_interpolate_points=2, 
        turbo_controller="optimize",
    )
    generator.numerical_optimizer.max_time = 10.0

    # create Xopt object
    X = Xopt(vocs=vocs, generator=generator, evaluator=evaluator, strict=True)

    ########## initial random sampling ######################

    # start samples at zero trim strength
    if start_at_zero:
        X.evaluate_data({name:0.0 for name in X.vocs.variable_names})

    # get current PV values for trims
    current_value = dict(zip(X.vocs.variable_names, caget_many(X.vocs.variable_names)))
    print(current_value)
    # get small region around current point to sample
    random_sample_region = get_local_region(current_value,X.vocs, fraction=0.25)

    # do random sampling
    X.random_evaluate(4, custom_bounds=random_sample_region)

    # do optimization - stop when tolerance is reached    
    for i in range(20):
        print(f"step {i}")
        X.step()
    
        idx, val = X.vocs.select_best(X.data)
        if abs(val) < tolerance:
            break

    # set trims to optimal config
    idx, val = X.vocs.select_best(X.data)
    res = X.evaluate_data(X.data.iloc[idx][X.vocs.variable_names])

    # print the time it took to run
    print(time.time() - start)

    return X

In [5]:
screens = ["DYG5"]#["DYG1","DYG2","DYG3","DYG4","DYG5"]
trims = ["DT7"]#["DT1","DT3","DT5","DT6","DT7"]

for screen, trim in zip(screens, trims):
    input(f"put in screen {screen}")
    align_beam(screen,trim)

put in screen DYG5 


fitting image
{'AWA:Drive:DT7H_B_S:Ctrl': 0.0, 'AWA:Drive:DT7V_B_S:Ctrl': 0.0}
fitting image
fitting image
fitting image
fitting image
step 0
fitting image
fitting image
step 1
fitting image
fitting image
step 2
fitting image
fitting image
step 3
fitting image
fitting image
step 4
fitting image
fitting image
step 5
fitting image
fitting image
step 6
fitting image
fitting image
step 7
fitting image
fitting image
step 8
fitting image
fitting image
step 9
fitting image
fitting image
fitting image
82.38361430168152
