<a href="https://colab.research.google.com/github/yl3469/CloudModeling/blob/master/OptimalDesigns.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Imports and defaults
from joblib import Parallel, delayed
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
from scipy.linalg import sqrtm
from scipy.optimize import linprog
import time

mpl.style.use("classic")
mpl.rcParams["figure.figsize"] = [5, 3]

mpl.rcParams["axes.linewidth"] = 0.75
mpl.rcParams["figure.facecolor"] = "w"
mpl.rcParams["grid.linewidth"] = 0.75
mpl.rcParams["lines.linewidth"] = 1.00
mpl.rcParams["patch.linewidth"] = 0.75
mpl.rcParams["xtick.major.size"] = 3
mpl.rcParams["ytick.major.size"] = 3

mpl.rcParams["pdf.fonttype"] = 42
mpl.rcParams["ps.fonttype"] = 42
mpl.rcParams["font.size"] = 8
mpl.rcParams["axes.titlesize"] = "medium"
mpl.rcParams["legend.fontsize"] = "medium"

import platform
print("python %s" % platform.python_version())
print("matplotlib %s" % mpl.__version__)

def linestyle2dashes(style):
  if style == "--":
    return (0, (3, 3))
  elif style == ":":
    return (0, (1, 3))
  else:
    return (None, None)

python 3.9.13
matplotlib 3.5.1


In [None]:
# Optimal designs
def a_grad(X, p, gamma=1e-6, return_grad=True):
  """Value of A-optimal objective and its gradient.
  
  X: n x d matrix of arm features
  p: distribution over n arms (design)
  """
  n, d = X.shape

  # inverse of the sample covariance matrix
  Xp = X * np.sqrt(p[:, np.newaxis])
  G = Xp.T.dot(Xp) + gamma * np.eye(d)
  invG = np.linalg.inv(G)

  # objective value (trace)
  obj = np.trace(invG)
  if return_grad:
    # gradient of the objective
    V = invG.dot(X.T)
    M = np.einsum("ik,jk->ijk", V, V)
    dp = - np.trace(M)
  else:
    dp = 0

  return obj, dp


def d_grad(X, p, gamma=1e-6, return_grad=True):
  """Value of D-optimal objective and its gradient.
  
  X: n x d matrix of arm features
  p: distribution over n arms (design)
  """
  n, d = X.shape

  # inverse of the sample covariance matrix
  Xp = X * np.sqrt(p[:, np.newaxis])
  G = Xp.T.dot(Xp) + gamma * np.eye(d)
  invG = np.linalg.inv(G)

  # objective value (log det)
  _, obj = np.linalg.slogdet(invG)
  if return_grad:
    # gradient of the objective
    V = np.einsum("ki,kj->ijk", X, X)
    M = np.einsum("ilk,lj->ijk", V, invG)
    dp = - np.trace(M)
  else:
    dp = 0

  return obj, dp


def e_grad(X, p, gamma=1e-6):
  """Value of E-optimal objective and its gradient.
  
  X: n x d matrix of arm features
  p: distribution over n arms (design)
  """
  n, d = X.shape

  # eigendecomposition of the sample covariance matrix
  Xp = X * np.sqrt(p[:, np.newaxis])
  G = Xp.T.dot(Xp) + gamma * np.eye(d)
  eigvals, eigv = np.linalg.eigh(G)

  v = eigv[:, 0]  # minimum eigenvector
  obj = - eigvals[0]  # objective value (- minimum eigenvalue)
  dp = - np.square(np.dot(X, v))  # gradient of the objective

  return obj, dp


def g_grad(X, p, gamma=1e-6):
  """Value of G-optimal objective and its gradient.
  
  X: n x d matrix of arm features
  p: distribution over n arms (design)
  """
  n, d = X.shape

  # inverse of the sample covariance matrix
  Xp = X * np.sqrt(p[:, np.newaxis])
  G = Xp.T.dot(Xp) + gamma * np.eye(d)
  invG = np.linalg.inv(G)

  # objective value
  obj = np.amax((X.dot(invG) * X).sum(axis=-1))

  # gradient of the objective
  xa = X[np.argmax((X.dot(invG) * X).sum(axis=-1)), :]
  dp = - np.square(xa.dot(invG).dot(X.T))

  return obj, dp


def fw_design(X, design="a", pi_0=None, num_iters=100, tol=1e-6, A_ub=None, b_ub=None, printout=True):
  """Frank-Wolfe algorithm for design optimization.
  
  X: n x d matrix of arm features
  design: design type
  pi_0: initial distribution over n arms (design)
  num_iters: maximum number of Frank-Wolfe iterations
  tol: stop when two consecutive objective values differ by less than tol
  A_ub: matrix A in design constraints A pi >= b
  b_ub: vector b in design constraints A pi >= b
  """
  n, d = X.shape

  if pi_0 is None:
    # initial allocation weights are 1 / n and they add up to 1
    pi = np.ones(n) / n
  else:
    pi = np.copy(pi_0)

  # initialize E- and G-optimal designs with the D-optimal design
  if (design == "e") or (design == "g"):
    pi = fw_design(X, "d", pi_0, num_iters, tol, A_ub, b_ub, printout)

  # initialize constraints
  if A_ub is None:
    A_ub_fw = np.ones((1, n))
    b_ub_fw = 1
  else:
    # last constraint guarantees that pi is a distribution
    A_ub_fw = np.zeros((A_ub.shape[0] + 1, A_ub.shape[1]))
    A_ub_fw[: -1, :] = A_ub
    A_ub_fw[-1, :] = np.ones((1, n))
    b_ub_fw = np.zeros(b_ub.size + 1)
    b_ub_fw[: -1] = b_ub
    b_ub_fw[-1] = 1

  # Frank-Wolfe iterations
  for iter in range(num_iters):
    # compute gradient at the last solution
    pi_last = np.copy(pi)
    if design == "a":
      last_obj, grad = a_grad(X, pi_last)
    elif design == "d":
      last_obj, grad = d_grad(X, pi_last)
    elif design == "g":
      last_obj, grad = g_grad(X, pi_last)
    else:
      last_obj, grad = e_grad(X, pi_last)

    if printout:
      print("%.4f" % last_obj, end=" ")

    # find a feasible LP solution in the direction of the gradient
    result = linprog(grad, A_ub_fw, b_ub_fw, bounds=[0, 1], method="revised simplex")
    pi_lp = result.x
    pi_lp = np.maximum(pi_lp, 0)
    pi_lp /= pi_lp.sum()

    # line search in the direction of the gradient
    num_ls_iters = 101
    best_step = 0.0
    best_obj = last_obj
    for ls_iter in range(num_ls_iters):
      step = ls_iter / (num_ls_iters - 1)
      pi_ = step * pi_lp + (1 - step) * pi_last
      if design == "a":
        obj, _ = a_grad(X, pi_, return_grad=False)
      elif design == "d":
        obj, _ = d_grad(X, pi_, return_grad=False)
      elif design == "g":
        obj, _ = g_grad(X, pi_)
      else:
        obj, _ = e_grad(X, pi_)
      if obj < best_obj:
        # record an improved solution
        best_step = step
        best_obj = obj

    # update solution
    pi = best_step * pi_lp + (1 - best_step) * pi_last

    if last_obj - best_obj < tol:
      break;
    iter += 1
  
  if printout:
    print()

  pi = np.maximum(pi, 0)
  pi /= pi.sum()
  return pi


def fw_safe_design(X, pi_0, alpha, theta_bar, Sigma_bar, num_iters=20):
  """Algorithm SafeOD (Section 4.1).
  
  X: n x d matrix of arm features
  pi_0: initial distribution over n arms (design)
  alpha: safety parameter
  theta_bar: center of confidence ellipsoid \Theta
  Sigma_bar: covariance of confidence ellipsoid \Theta
  num_iters: maximum number of added constraints
  """
  n, d = X.shape

  # start with a G-optimal design without constraints
  pi = fw_design(X, design="g", pi_0=pi_0, printout=False)
  A_ub = np.zeros((num_iters, n))
  b_ub = np.zeros(num_iters)

  iter = 1
  while iter < num_iters:
    # find the most violated constraint (last equation in Section 4.1)
    c = (alpha * pi_0 - pi).dot(X)
    theta = theta_bar + Sigma_bar.dot(c) / np.sqrt(c.dot(Sigma_bar.dot(c)))

    print("%d: %.4f, " % (iter, c.dot(theta)), end="")
    # stop when all constraints are satisfied
    if c.dot(theta) < 1e-6:
      break;

    # add the most violated constraint to the constraint set
    A_ub[iter - 1, :] = - X.dot(theta)
    b_ub[iter - 1] = - alpha * pi_0.dot(X).dot(theta)
    # resolve the constrained G-optimal design
    pi = fw_design(X, design="g", pi_0=pi_0, A_ub=A_ub[: iter, :], b_ub=b_ub[: iter], printout=False)

    iter += 1
  print()

  return pi

In [None]:
# synthetic experiment in Section 5.1
X = np.asarray([[1.0, 0.0], [0.0, 1.0]])
X /= np.sqrt(np.square(X).sum(axis=-1))[:, np.newaxis]
n, d = X.shape

sigma = 1.0  # reward noise
pi_0 = np.asarray([0.2, 0.8])
alpha = 0.9
Sigma_bar = np.diag([0.1, 0.1])

for theta_bar in (np.asarray([1.0, 2.0]), np.asarray([2.0, 1.0])):
  print("theta_bar: %s" % " ".join("%.3f" % _ for _ in theta_bar))
  print()

  for method in range(3):
    # compute optimal design
    if method == 0:
      print("*** G-optimal design ***")
      pi = fw_design(X, design="d")
    elif method == 1:
      print("*** Mixture policy ***")
      pi = alpha * pi_0 + (1 - alpha) / pi_0.size
    else:
      print("*** SafeOD ***")
      pi = fw_safe_design(X, pi_0, alpha, theta_bar, Sigma_bar)
    print("Policy          : %s" % " ".join("%.3f" % _ for _ in pi))

    Xp = X * np.sqrt(pi[:, np.newaxis])
    invG = np.linalg.inv(Xp.T.dot(Xp))
    print("Design width    : %.3f" % np.sqrt(np.amax((X.dot(invG) * X).sum(axis=-1))))

    # maximum safety violation for any model in confidence ellipsoid \Theta
    c = (alpha * pi_0 - pi).dot(X)
    theta = theta_bar + Sigma_bar.dot(c) / np.sqrt(c.dot(Sigma_bar.dot(c)))
    constraint_violation = c.dot(theta)
    print("Safety violation: %.3f" % constraint_violation)

  print()

theta_bar: 1.000 2.000

*** G-optimal design ***
1.3863 
Policy          : 0.500 0.500
Design width    : 1.414
Safety violation: 0.243
*** Mixture policy ***
Policy          : 0.230 0.770
Design width    : 2.085
Safety violation: -0.128
*** SafeOD ***
1: 0.2443, 2: 0.0019, 3: 0.0000, 
Policy          : 0.330 0.670
Design width    : 1.741
Safety violation: 0.000

theta_bar: 2.000 1.000

*** G-optimal design ***
1.3863 
Policy          : 0.500 0.500
Design width    : 1.414
Safety violation: -0.297
*** Mixture policy ***
Policy          : 0.230 0.770
Design width    : 2.085
Safety violation: -0.128
*** SafeOD ***
1: -0.2978, 
Policy          : 0.501 0.499
Design width    : 1.416
Safety violation: -0.298

