# Emittance Sample study


# Imports

In [1]:
# Ignore all warnings
import warnings
# warnings.filterwarnings("ignore")

import sys
# sys.path.append('C:\\Users\\Dylan\\SLAC') #parent directory containing emitopt module

import matplotlib.pyplot as plt
import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

import pandas as pd
import torch

from xopt import Xopt
from xopt.vocs import VOCS
from xopt.generators.bayesian.bax_generator import BaxGenerator

from xopt.evaluator import Evaluator

from emitopt.analysis import compute_emit_bmag
from emitopt.sampling import draw_product_kernel_post_paths
from emitopt.algorithms import GridMinimizeEmitBmag

import time
import numpy as np
import random

SyntaxError: '(' was never closed (algorithms.py, line 890)

# Use CUDA if available

In [None]:
if torch.cuda.is_available():
# if False:
    torch.set_default_tensor_type('torch.cuda.DoubleTensor')
    tkwargs = {"dtype": torch.double, "device": "cuda"}
    use_cuda = True
    print('Using cuda.')
else:
    torch.set_default_tensor_type('torch.DoubleTensor')
    use_cuda = False

In [None]:
torch.cuda.is_available()

# Notebook settings

In [None]:
ndim = 2 #number of input dimensions
noise = False #whether to add noise to the ground-truth beam size function outputs
thick_quad = True
meas_dim = 1 #input dimension for measurement parameter
n_obs_init = 50 #number of random initial observations for GP model
n_samples = 10 #number of posterior samples for BAX
n_iter = 1 #number of optimization steps for Xopt to take (after acquiring random initial data)
rand_seed = 3

#random seeds for reproducibility 
torch.manual_seed(rand_seed)
np.random.seed(rand_seed) #only affects initial random observations through Xopt
random.seed(rand_seed)

# Build test function from single-quadrupole optical beam size model 
Here we define a simple ground-truth beam size function for our optimization problem, where we attempt to find the location in tuning parameter space with minimal emittance. Note that the function "test_func" used to evaluate the ground-truth beam size function takes a dictionary as input and returns a dictionary as the output.

In [None]:
from pyemittance.emittance_calc import EmitCalc
from pyemittance.load_json_configs import load_configs
from pyemittance.simulation import BeamSim

CONFIG = load_configs('LCLS2_OTR0H04')
CONFIG['beamline_info']

q_len = CONFIG['beamline_info']['Lquad']
rmat_x = torch.tensor(CONFIG['beamline_info']['rMatx']).reshape(2,2)
rmat_y = torch.tensor(CONFIG['beamline_info']['rMaty']).reshape(2,2)
print(rmat_x)
print(rmat_y)

In [None]:
BUNCH_PARAMS0 = {
    'total_charge': 50e-12,
    'norm_emit_x': 1e-6,
    'norm_emit_y': 2e-6,
    'beta_x': 10,
    'alpha_x': -1,
    'beta_y': 11,
    'alpha_y': -2,
    'energy': 80e6,
    'species':'electron'
}
sim = BeamSim(bunch_params=BUNCH_PARAMS0, beamline_info=CONFIG['beamline_info'])


# define variables functions
var_names = ['x' + str(i) for i in range(ndim)]
meas_param = var_names[meas_dim]

from emitopt.utils import get_quad_scale_factor
scale_factor = get_quad_scale_factor(E=.08, q_len=q_len)
scale = 1.e3

def measure_beamsize(input_dict):
    x_tuning = torch.tensor([])
    for key in input_dict.keys():
        if key != str(meas_param):
            x_tuning = torch.cat((x_tuning, torch.tensor([input_dict[key]])))
    rms_beamsizes0 = np.array(sim.beam_size_meas(input_dict[meas_param]))
    detuning_scale = 1. + 1*x_tuning.abs().sum().cpu()
    xrms, yrms = detuning_scale * rms_beamsizes0
    return {'xrms_sq': (float(xrms)*scale)**2.,
            'yrms_sq': (float(yrms)*scale)**2.} # mean-square beam sizes in mm squared


# scale_factor = 1.
q = torch.linspace(-3,3,11)
bs = torch.tensor([np.array(sim.beam_size_meas(v))*scale for v in q.numpy()]).T
print(bs)
k = scale_factor*q
ks = torch.stack((k,-k))
rmats = torch.stack((rmat_x, rmat_y))
emit, bmag, sig, is_valid = compute_emit_bmag(ks, bs**2, q_len, rmats, thick=True)
print(emit[0], emit[1])
gt_emit_min = (emit[0]*emit[1]).sqrt()


print('Ground truth minimum emit:', gt_emit_min)

# Construct vocs

In [None]:
variables = {var_name: [-2,1] for var_name in var_names}
variables[meas_param] = [-3,3] #overwrite bounds for measurement parameter to capture minimum of single-quadrupole optical model

#construct vocs
vocs = VOCS(
    variables = variables,
    observables = ['xrms_sq', 'yrms_sq'],
)

print('variable_names =', vocs.variable_names)
print('meas_param =', "'" + meas_param + "'")
print('domain =\n', vocs.bounds.T)

In [None]:
def eval_true_emittance(x, use_bmag=False):
    emits = []
    bmags = []
    for setting in x:
        bssx = []
        bssy = []
        input_dict = {name: float(setting[i]) for i, name in enumerate(vocs.variable_names)}
        for v in q.numpy():
            input_dict[meas_param] = v
            output_dict = measure_beamsize(input_dict)
            bssx += [output_dict['xrms_sq']]
            bssy += [output_dict['yrms_sq']]
        bss = torch.tensor([bssx, bssy])
        beta0 = torch.tensor([[10], [11]])
        alpha0 = torch.tensor([[-1], [-2]])
        emit, bmag, sig, is_valid = compute_emit_bmag(ks, bss, q_len, rmats, beta0=beta0, alpha0=alpha0, thick=True)
        emits += [torch.sqrt(emit[0]*emit[1])]
        bmags += [torch.sqrt(bmag[0].min()*bmag[1].min())]
    res = torch.tensor(emits)
    if use_bmag:
        res *= torch.tensor(bmags)
    return res
eval_true_emittance(torch.zeros(1,ndim), use_bmag=True)

# Prepare generator options.
In this example, we use a specialty covariance module (Matern x Quadratic kernel) for our beam size model.

In [None]:
from gpytorch.kernels import MaternKernel, PolynomialKernel, ScaleKernel
from gpytorch.priors.torch_priors import GammaPrior

from xopt.generators.bayesian.models.standard import StandardModelConstructor
from xopt.generators.bayesian.bax_generator import BaxGenerator
from emitopt.algorithms import ScipyMinimizeEmittanceXY

# prepare custom covariance module
tuning_dims = list(range(vocs.n_variables))
tuning_dims.remove(meas_dim)
covar_module_x = (MaternKernel(ard_num_dims=len(tuning_dims), 
                              active_dims=tuning_dims, 
                              lengthscale_prior=None) * 
                              PolynomialKernel(power=2, active_dims=[meas_dim])
                 )

scaled_covar_module_x = ScaleKernel(covar_module_x)#, outputscale_prior=GammaPrior(2.0, 0.15))
covar_module_y = (MaternKernel(ard_num_dims=len(tuning_dims), 
                              active_dims=tuning_dims, 
                              lengthscale_prior=None) * 
                              PolynomialKernel(power=2, active_dims=[meas_dim])
                 )
scaled_covar_module_y =  ScaleKernel(covar_module_y)#, outputscale_prior=GammaPrior(2.0, 0.15))

# prepare options for Xopt generator
covar_module_dict = {'xrms_sq': scaled_covar_module_x,
                     'yrms_sq': scaled_covar_module_y}

model_constructor = StandardModelConstructor(covar_modules=covar_module_dict, use_low_noise_prior=True)

In [None]:
from xopt.numerical_optimizer import LBFGSOptimizer, GridOptimizer
numerical_optimizer = LBFGSOptimizer(
                                    n_restarts=20,
                                    max_time=2)
# numerical_optimizer = GridOptimizer()

# Construct generator, evaluator, Xopt objects

In [None]:
# class DifferentialEvolutionEmitBmag(GridMinimizeEmitBmag):
#     name = "DifferentialEvolutionEmitBmag"

#     def get_execution_paths(self, model: ModelList, bounds: Tensor, tkwargs=None, verbose=False):
#         if not (self.x_key or self.y_key):
#             raise ValueError("must provide a key for x, y, or both.")
#         if (self.x_key and self.rmat_x is None) or (self.y_key and self.rmat_y is None):
#             raise ValueError("must provide rmat for each transverse dimension (x/y) being modeled.")
    
#         tkwargs = tkwargs if tkwargs else {"dtype": torch.double, "device": "cpu"}

#         temp_id = self.meas_dim + 1
#         tuning_domain = torch.cat((bounds.T[: self.meas_dim], bounds.T[temp_id:]))

#         tuning_bounds = tuning_domain.T
        
#         cpu_models = [copy.deepcopy(m).cpu() for m in model.models]

#         sample_funcs_list = []
#         for cpu_model in cpu_models:
#             if type(cpu_model.covar_module.base_kernel) == ProductKernel:
#                 sample_funcs = draw_product_kernel_post_paths(cpu_model, n_samples=self.n_samples)
#             if type(cpu_model.covar_module.base_kernel) == MaternKernel:
#                 sample_funcs = draw_matheron_paths(cpu_model, sample_shape=torch.Size([self.n_samples]))
#             sample_funcs_list += [sample_funcs]
        
        

In [None]:
from emitopt.algorithms import DifferentialEvolutionEmitBmag

#Prepare Algorithm
algo_kwargs = {
        'x_key': 'xrms_sq',
        'y_key': 'yrms_sq',
        'scale_factor': scale_factor,
        'q_len': q_len,
        'rmat_x': rmat_x,
        'rmat_y': rmat_y,
        'twiss0_x': torch.tensor([10., -1.]),
        'twiss0_y': torch.tensor([11., -2.]),
        'n_samples': n_samples,
        'meas_dim': meas_dim,
        'n_steps_measurement_param': 3,
        'thick_quad': thick_quad,
        'n_grid_points': 3,
        'use_bmag': True,
}
algo = DifferentialEvolutionEmitBmag(**algo_kwargs)
# algo = GridMinimizeEmitBmag(**algo_kwargs)
# algo = ScipyMinimizeEmittanceXY(**algo_kwargs)

#construct BAX generator
generator = BaxGenerator(vocs=vocs, 
                         gp_constructor=model_constructor, 
                         numerical_optimizer=numerical_optimizer,
                         algorithm=algo, 
                         use_cuda=use_cuda)

#construct evaluator
evaluator = Evaluator(function=measure_beamsize)

#construct Xopt optimizer
optimizer = Xopt(evaluator=evaluator, generator=generator, vocs=vocs)

# Optimize

In [None]:
# call X.random_evaluate() to generate random initial points and evaluate on test_func
optimizer.random_evaluate(n_obs_init)

In [None]:
from xopt.generators.bayesian.visualize import visualize_generator_model
optimizer.generator.train_model()
visualize_generator_model(optimizer.generator, 
                          variable_names=['x0','x1'], 
                            reference_point={'x0':torch.tensor([0.]),
                                             'x1':torch.tensor([0.]),
                                            # 'x2':torch.tensor([0.]),
                                            #  'x3':torch.tensor([0.]),
                                             # 'x4':torch.tensor([0.]),
                                             # 'x5':torch.tensor([0.]),
                                             # 'x6':torch.tensor([0.]),
                                             # 'x7':torch.tensor([0.]),
                                             # 'x8':torch.tensor([0.]),
                                            },
                          show_acquisition=False)
visualize_generator_model(optimizer.generator, 
                          variable_names=['x1'], 
                            reference_point={'x0':torch.tensor([0.]),
                                             'x1':torch.tensor([0.]),
                                            # 'x2':torch.tensor([0.]),
                                            #  'x3':torch.tensor([0.]),
                                             # 'x4':torch.tensor([0.]),
                                             # 'x5':torch.tensor([0.]),
                                             # 'x6':torch.tensor([0.]),
                                             # 'x7':torch.tensor([0.]),
                                             # 'x8':torch.tensor([0.]),
                                            },
                          show_acquisition=False)

In [None]:
x = torch.zeros(100, ndim)
x[:,0] = torch.linspace(*vocs.bounds.T[0], 100)
plt.plot(x[:,0], eval_true_emittance(x, use_bmag=True), c='k')

In [None]:
for i in range(10):
    print(i)
    optimizer.step()

In [None]:
from xopt.generators.bayesian.bax.visualize import visualize_virtual_objective
model = optimizer.generator.train_model()

# for i in range(10):
start = time.time()
fig, ax = visualize_virtual_objective(optimizer.generator, 
                            variable_names=['x0'],
                            reference_point={'x0':torch.tensor([0.]),
                                             'x1':torch.tensor([0.]),
                                            # 'x2':torch.tensor([0.]),
                                            #  'x3':torch.tensor([0.]),
                                            #  'x4':torch.tensor([0.]),
                                            #  'x5':torch.tensor([0.]),
                                            #  'x6':torch.tensor([0.]),
                                            #  'x7':torch.tensor([0.]),
                                            #  'x8':torch.tensor([0.]),
                                            },
                            n_grid=100,
                            n_samples=1,
                            show_samples=True,
                            # kwargs={'use_bmag':False},
                                     )
print(time.time()-start)
x = torch.zeros(100, ndim)
x[:,0] = torch.linspace(*vocs.bounds.T[0], 100)
plt.plot(x[:,0], eval_true_emittance(x, use_bmag=True), c='k')
# plt.ylim(top=0.01)

In [None]:
from emitopt.sampling import draw_product_kernel_post_paths
import copy

bounds = torch.from_numpy(vocs.bounds)


# sample_funcs_list =[]
# cpu_models = [copy.deepcopy(m).cpu() for m in model.models]
# for cpu_model in cpu_models:
#     sample_funcs = draw_product_kernel_post_paths(cpu_model, n_samples=1)
#     sample_funcs_list += [sample_funcs]

sample_funcs_list = optimizer.generator.algorithm.results['sample_funcs_list']

def wrapped_virtual_objective(x):
    x = torch.from_numpy(x)
    start = time.time()
    res = algo.evaluate_virtual_objective(sample_funcs_list, x.T.unsqueeze(0), bounds)
    print(time.time() - start)
    return res[0,:].numpy()
    # return res[0,0]

# from emitopt.algorithms import unif_random_sample_domain
# x0 = unif_random_sample_domain(1, vocs.bounds.T).flatten()
# print(wrapped_virtual_objective(x0))
# torch.autograd.functional.jacobian(wrapped_virtual_objective, x0)

In [None]:
from scipy.optimize import differential_evolution, shgo, dual_annealing, minimize

opt_bounds = vocs.bounds.T
opt_bounds[algo.meas_dim] = [0,0]

start = time.time()
res = differential_evolution(wrapped_virtual_objective, bounds=opt_bounds, vectorized=True, polish=False, popsize=50, maxiter=10, seed=1)
# res = shgo(wrapped_virtual_objective, bounds=opt_bounds)
# res = dual_annealing(wrapped_virtual_objective, bounds=opt_bounds, seed=1)

# from emitopt.algorithms import unif_random_sample_domain
# x0 = unif_random_sample_domain(1, vocs.bounds.T).flatten().numpy()
# res = minimize(wrapped_virtual_objective, x0, bounds=opt_bounds)

print(time.time() - start)
print(res.fun)
res.x

In [None]:
res

In [None]:
for i in range(ndim):
    if i == meas_dim:
        continue
    x = torch.from_numpy(res.x).repeat(100,1)
    x[:,i] = torch.linspace(-2, 1, 100)
    x = x.repeat(1, 1, 1)
    vobj = algo.evaluate_virtual_objective(sample_funcs_list, x, bounds)
    
    plt.plot(x[:,:,i].T, vobj.T, c='C0')
    plt.scatter([res.x[i]], [res.fun], c='r', marker='x')
    plt.show()

In [None]:
bmag = algo.results['bmag']
bmag_mean = (bmag[...,0] * bmag[...,1]).sqrt()
bmag_min, bmag_min_id = torch.min(bmag_mean, dim=-1)
plt.plot(x[:,:,0].T,bmag_min.flatten())

In [None]:
bmag.shape

In [None]:
sample_funcs_list = []
for cpu_model in cpu_models:
    sample_funcs = draw_product_kernel_post_paths(cpu_model, n_samples=10)
    sample_funcs_list += [sample_funcs]

from emitopt.algorithms import unif_random_sample_domain
x0 = unif_random_sample_domain(1000, vocs.bounds.T)
start = time.time()
algo.evaluate_virtual_objective(sample_funcs_list, x0.reshape(10, 100, -1), bounds)
print(time.time()-start)

In [None]:
# from emitopt.utils import x_tuning_to_dict, get_bax_optimum
# print(optimizer.generator.algorithm_results['x_tuning_best'])
# x_tuning_best = optimizer.generator.algorithm_results['x_tuning_best'].mean(dim=0)
# print(x_tuning_best)
# reference_point = get_bax_optimum(optimizer.generator)
# print(reference_point)
# target_point = x_tuning_to_dict(optimizer.generator, x_tuning = torch.tensor([[0]]))
# print(target_point)

In [None]:
from emitopt.visualize import plot_virtual_measurement_scan
fig, ax, best_q = plot_virtual_measurement_scan(optimizer, reference_point, n_samples=10)
ax[0].set_ylim(top=1)
ax[1].set_ylim(top=3, bottom=0.9)
ax[0].axhline(0, c='k')