In [8]:
# Author: Maximilian Beck
# Date: 18.05.22

%matplotlib inline
from typing import Tuple, Dict, Any
from scipy.interpolate import RegularGridInterpolator
import copy
from IPython.core.display import HTML, display
import plotly
import plotly.graph_objects as go
import noise
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import axes3d
%matplotlib qt 
# for this run pip install pyqt5

## Optimization Algorithms

This snippet implements Gradient Descent, Stochastic Gradient Descent and Subspace Gradient Descent (a variant of gradient descent, which projects update steps into a subspace of the parameter space).

In [9]:
from typing import Callable, NamedTuple, List


class DescentStep(NamedTuple):
    theta: np.ndarray
    value: float


class GradientDescent(object):
    def __init__(self, loss_fn: Callable[[np.ndarray], np.ndarray],
                 theta: np.ndarray, lr: float, gradient_noise_std: float = 0.1, fd_h: float = 1e-3, seed: int = 1):
        self.loss_fn = loss_fn
        self.theta = theta
        self.lr = lr  # stepsize
        self.finite_difference_h = fd_h
        self.gradient_noise_std = gradient_noise_std
        self.rng = np.random.default_rng(seed)

        self.at_boundary = False
        self.last_step = DescentStep(theta_0, loss_fn(theta_0))

    def _compute_gradient(self):
        # forward finite difference gradient
        h = self.finite_difference_h
        f = self.loss_fn
        x = self.theta
        gradient = np.zeros_like(x)
        try:
            for i in range(len(gradient)):
                h_vec = np.zeros_like(x)
                h_vec[i] = h
                gradient[i] = (f(x+h_vec) - f(x))/(h)
        except ValueError:
            print(
                f'Try to compute gradient at function support boundary at {str(x)}. Setting gradient to zero!')
            gradient = np.zeros_like(x)
        return gradient

    def _safe_decent_step_creation(self, theta):
        try:
            loss = self.loss_fn(theta)
            return DescentStep(theta.copy(), loss)
        except ValueError:
            print('Reached function support boundary!')
            self.at_boundary = True
            return None

    def _get_descent_step(self, theta):
        descent_step = self._safe_decent_step_creation(self.theta)
        if descent_step is None or self.at_boundary:
            print('At function support boundary, using last step.')
            return copy.deepcopy(self.last_step)
        else:
            self.last_step = descent_step
        return descent_step

    def step(self):
        if not self.at_boundary:
            grad = self._compute_gradient()
            self.theta = self.theta - self.lr * grad
        return self._get_descent_step(self.theta)

    def noisy_step(self):
        if not self.at_boundary:
            grad_noise = self.rng.normal(loc=0, scale=self.gradient_noise_std)
            grad = self._compute_gradient() + grad_noise
            self.theta = self.theta - self.lr * grad
        return self._get_descent_step(self.theta)

    def subspace_step(self, subspace):
        if not self.at_boundary:
            grad_noise = self.rng.normal(loc=0, scale=self.gradient_noise_std)
            grad = self._compute_gradient() + grad_noise
            update_vector = - self.lr * grad
            projection = np.dot(subspace, update_vector)
            self.theta = self.theta + projection * subspace
        return self._get_descent_step(self.theta)
            

def do_gradient_descent(interp_loss_fn: RegularGridInterpolator, theta_0: np.ndarray, lr: float, num_steps: int,
                        gradient_noise_std: float = 0.0, subspace: np.ndarray = None, finite_difference_h: float = 1e-3,
                        seed: int = 1) -> List[DescentStep]:
    """Convenvience function to do a optimization."""
    optim = GradientDescent(loss_fn=interp_loss_fn, theta=theta_0, lr=lr,
                            gradient_noise_std=gradient_noise_std, fd_h=finite_difference_h, seed=seed)
    optim_trajectory = []
    print('==NEW OPTIMIZATION==')
    print(f'Start from theta_0: {theta_0}, Loss: {interp_loss_fn(theta_0)}')
    optim_trajectory.append(DescentStep(theta_0.copy(), interp_loss_fn(theta_0))) 
    for i in range(num_steps):
        if subspace is not None:
            step = optim.subspace_step(subspace)
        else:
            step = optim.noisy_step()
        optim_trajectory.append(step)
        print(f'Step {i}: theta: {step.theta} Loss: {step.value}')
    return optim_trajectory

## Loss surface / function generation and interpolation

In [10]:
def generate_perlin_noise_loss_surface(shape, scale, octaves, persistence, lacunarity):
    """Generate a 2D loss surface from perlin noise. Output is a 2D array."""
    surface = np.zeros(shape)
    for i in range(shape[0]):
        for j in range(shape[1]):
            surface[i][j] = noise.pnoise2(i/scale, 
                                        j/scale, 
                                        octaves=octaves, 
                                        persistence=persistence, 
                                        lacunarity=lacunarity, 
                                        repeatx=1024, 
                                        repeaty=1024, 
                                        base=42)
    return surface

def interpolate_loss_surface(surface, shape):
    """Interpolate the discrete loss surface, such that finite difference gradients are applicable."""
    lin_x = np.linspace(0, 1, shape[0], endpoint=False)
    lin_y = np.linspace(0, 1, shape[1], endpoint=False)
    interp_loss_surface = RegularGridInterpolator((lin_x, lin_y), surface)
    return interp_loss_surface

## Plotting with Matplotlib

In [11]:
def plot_interp_loss_surface(interp_surface: RegularGridInterpolator, shape: Tuple[int, int],
                             optim_trajectories: List[DescentStep] = None, styles: List[Dict[str, Any]] = None, xlim: List[float] = None, ylim: List[float] = None):

    lin_x = np.linspace(0, 1, shape[0], endpoint=False)
    lin_y = np.linspace(0, 1, shape[1], endpoint=False)
    x, y = np.meshgrid(lin_x, lin_y)
    xy = np.stack([x, y], axis=2)
    z = interp_surface(xy)
    fig = plt.figure()
    # contour plot
    ax0 = plt.subplot(1, 2, 1)
    ax0.contourf(x, y, z, cmap='terrain')
    # 3d plot
    # limit 3d surface
    if xlim:
        assert all([x < 1.0 for x in xlim]) and len(xlim) == 2
        x_min_ind = int(x.shape[0] * xlim[0])
        x_max_ind = int(x.shape[0] * xlim[1])
        x = x[x_min_ind:x_max_ind, :]
        y = y[x_min_ind:x_max_ind, :]
        z = z[x_min_ind:x_max_ind, :]
    if ylim:
        assert all([y < 1.0 for y in ylim]) and len(ylim) == 2
        y_min_ind = int(y.shape[1] * ylim[0])
        y_max_ind = int(y.shape[1] * ylim[1])
        x = x[:, y_min_ind:y_max_ind]
        y = y[:, y_min_ind:y_max_ind]
        z = z[:, y_min_ind:y_max_ind]

    ax1 = plt.subplot(1, 2, 2, projection='3d')
    ax1.plot_surface(x, y, z, alpha=0.6, cmap='terrain')
    if optim_trajectories:
        for i, optim_trajectory in enumerate(optim_trajectories):
            if styles:
                style = styles[i]
            else: 
                style = {'lw': 3}

            thetas = np.stack([x.theta for x in optim_trajectory])
            ax0.plot(thetas[:, 0], thetas[:, 1], 'o-', **style)
            loss_values = np.array([x.value for x in optim_trajectory])
            ax1.plot3D(thetas[:, 0],
                       thetas[:, 1],
                       loss_values[:, 0],
                       'o-',
                       **style)

    if xlim:
        ax0.set_xlim(xlim)
        ax1.set_xlim(xlim)
    if ylim:
        ax0.set_ylim(ylim)
        ax1.set_ylim(ylim)
    # plt.figlegend()
    ax0.legend()
    ax1.legend()

    return fig

## Plotting with Plotly

In [12]:
# plotly plot
from typing import Tuple, Dict, Any
from scipy.interpolate import RegularGridInterpolator

plotly.offline.init_notebook_mode(connected=True)

# Convert Matplotlib colourmap to plotly colorscale
def matplotlib_cmap_to_plotly(cmap_name: str = 'terrain', pl_entries: int = 255) -> List[List[Any]]:
    cmap = mpl.cm.get_cmap(cmap_name)
    h = 1.0/(pl_entries-1)
    pl_colorscale = []

    for k in range(pl_entries):
        C = list(map(np.uint8, np.array(cmap(k*h)[:3])*255))
        pl_colorscale.append([k*h, 'rgb'+str((C[0], C[1], C[2]))])

    return pl_colorscale


def plotly_interp_loss_surface(interp_surface: RegularGridInterpolator, shape: Tuple[int, int],
                               optim_trajectories: List[DescentStep] = None,
                               optimum: DescentStep = None, styles: List[Dict[str, Any]] = None,
                               xlim: List[float] = None, ylim: List[float] = None):
    # 3D surface data
    lin_x = np.linspace(0, 1, shape[0], endpoint=False)
    lin_y = np.linspace(0, 1, shape[1], endpoint=False)
    x, y = np.meshgrid(lin_x, lin_y)
    xy = np.stack([x, y], axis=2)
    z = interp_surface(xy)
    # limit 3D surface bounds
    if xlim:
        assert all([x < 1.0 for x in xlim]) and len(xlim) == 2
        x_min_ind = int(x.shape[0] * xlim[0])
        x_max_ind = int(x.shape[0] * xlim[1])
        x = x[x_min_ind:x_max_ind, :]
        y = y[x_min_ind:x_max_ind, :]
        z = z[x_min_ind:x_max_ind, :]
    if ylim:
        assert all([y < 1.0 for y in ylim]) and len(ylim) == 2
        y_min_ind = int(y.shape[1] * ylim[0])
        y_max_ind = int(y.shape[1] * ylim[1])
        x = x[:, y_min_ind:y_max_ind]
        y = y[:, y_min_ind:y_max_ind]
        z = z[:, y_min_ind:y_max_ind]

    # create plotly plot
    go_plots = []
    pl_cmap = matplotlib_cmap_to_plotly('terrain', 255)
    # plot 3D surface
    pl_surface = go.Surface(x=x, y=y, z=z, colorscale=pl_cmap, opacity=0.7, name='Loss Surface')
    go_plots.append(pl_surface)

    # plot optimum marker
    if optimum:
        optimum_color = '#de425b'

        # show a cone pointing in upward direction
        # pl_optimum = go.Cone(x=[optimum.theta[0]], y=[optimum.theta[1]], z=[optimum.value[0]],
        #                      u=[0], v=[0], w=[0.1], sizemode='absolute', sizeref=1, anchor='tail')
        # show a maker at optimum position
        pl_optimum = go.Scatter3d(x=[optimum.theta[0]], y=[optimum.theta[1]], z=[optimum.value[0]],
                                mode='markers', name='Optimum', marker=dict(color=optimum_color, size=8, symbol='diamond'))
        go_plots.append(pl_optimum)

    # plot trajectories
    if optim_trajectories:
        for i, optim_trajectory in enumerate(optim_trajectories):
            if styles:
                style = styles[i]
            else:
                style = {'lw': 3}

            thetas = np.stack([x.theta for x in optim_trajectory])
            loss_values = np.array([x.value for x in optim_trajectory])

            pl_optim_traj = go.Scatter3d(x=thetas[:, 0], y=thetas[:, 1], z=loss_values[:, 0], mode='lines',
                                         line=dict(color=style['c'], width=style['lw']),
                                         name=style['label'])
            go_plots.append(pl_optim_traj)

    # if xlim:
    #     ax0.set_xlim(xlim)
    #     ax1.set_xlim(xlim)
    # if ylim:
    #     ax0.set_ylim(ylim)
    #     ax1.set_ylim(ylim)

    fig = go.Figure(data=go_plots)
    fig.update_layout(title='SubGD optimization trajectory on random loss surface')
    # legend
    fig.update_layout(
        legend=dict(
            x=0,
            y=1,
            traceorder="reversed",
            title_font_family="Times New Roman",
            font=dict(
                # family="Courier",
                size=14,
                color="black"
            ),
            bgcolor="LightSteelBlue",
            bordercolor="Black",
            borderwidth=2
        ))
    # scene camera
    fig.update_layout(
        scene_camera=dict(
            up=dict(x=0, y=0, z=1),
            center=dict(x=0, y=0, z=0),
            eye=dict(x=1.25, y=-1.25, z=1.25)
        ))

    return fig


## Optimization of a quadratic loss

In [13]:
## Generate quadratic loss
# optimize quadratic loss for testing
shape = (50,50)
def quadratic_loss(x):
    assert len(x) == 2
    z = x[0]**2 + x[1]**2
    return z
lin_x = np.linspace(0,1,shape[0],endpoint=False)
lin_y = np.linspace(0,1,shape[1],endpoint=False)
x,y = np.meshgrid(lin_x, lin_y, sparse=False)
z = x**2+y**2
interp_quadratic_loss = interpolate_loss_surface(z, shape)

## Do optimization
theta_0 = np.array([0.8,0.6])
optim_trajectory = do_gradient_descent(interp_quadratic_loss, theta_0, lr=0.1, num_steps=10)
fig = plot_interp_loss_surface(interp_quadratic_loss, shape, [optim_trajectory])

No artists with labels found to put in legend.  Note that artists whose label start with an underscore are ignored when legend() is called with no argument.
No artists with labels found to put in legend.  Note that artists whose label start with an underscore are ignored when legend() is called with no argument.


==NEW OPTIMIZATION==
Start from theta_0: [0.8 0.6], Loss: [1.]
Step 0: theta: [0.638 0.478] Loss: [0.6356]
Step 1: theta: [0.512 0.384] Loss: [0.40976]
Step 2: theta: [0.41  0.306] Loss: [0.26192]
Step 3: theta: [0.328 0.244] Loss: [0.16728]
Step 4: theta: [0.262 0.194] Loss: [0.1064]
Step 5: theta: [0.208 0.156] Loss: [0.06776]
Step 6: theta: [0.166 0.126] Loss: [0.0436]
Step 7: theta: [0.132 0.1  ] Loss: [0.02752]
Step 8: theta: [0.106 0.078] Loss: [0.01744]
Step 9: theta: [0.084 0.064] Loss: [0.01128]


## Optimization of a Perlin noise loss

In [14]:
## Setup optimization problem
shape = (50, 50)
scale = 100.0
octaves = 6
persistence = 0.5
lacunarity = 2.0
surface = generate_perlin_noise_loss_surface(shape, scale, octaves, persistence, lacunarity)
interp_noise_loss = interpolate_loss_surface(surface, shape)

## Do optimization
# use noise loss + noise gradient steps
theta_0 = np.array([0.29,0.36])
gd_trajectory = do_gradient_descent(interp_noise_loss, theta_0=theta_0, lr=0.01, num_steps=200, gradient_noise_std=0.0)
sgd_trajectory = do_gradient_descent(interp_noise_loss, theta_0=theta_0, lr=0.01, num_steps=200, gradient_noise_std=0.7, seed=2)
# calculate subspace
optim_trajectory = gd_trajectory
theta_0 = optim_trajectory[0].theta
theta_star = optim_trajectory[-1].theta
subspace = theta_star - theta_0
subspace = subspace / np.linalg.norm(subspace, ord=2)
subgd_trajectory = do_gradient_descent(interp_noise_loss, theta_0=theta_0, lr=0.01, num_steps=200, gradient_noise_std=0.7, subspace=subspace)


# Plot results with Matplotlib
gd_style = {'lw': 3, 'c': '#bc5090', 'label': 'GD'}
sgd_style = {'lw': 3, 'c': '#ffa600', 'label': 'SGD'}
subgd_style = {'lw': 3, 'c': '#003f5c', 'label': 'SubGD'}
styles = [gd_style, sgd_style, subgd_style]
trajectories = [gd_trajectory, sgd_trajectory, subgd_trajectory]

fig = plot_interp_loss_surface(interp_noise_loss, shape, trajectories, styles, xlim=[0,0.6],ylim=[0,0.6])

# Plot results with Plotly
gd_style = {'lw': 10, 'c': '#bc5090', 'label': 'Gradient Descent'}
sgd_style = {'lw': 10, 'c': '#ffa600', 'label': 'Stochastic Gradient Descent'}
subgd_style = {'lw': 10, 'c': '#003f5c', 'label': 'SubGD'}
styles_pl = [gd_style, sgd_style, subgd_style]
optimum = gd_trajectory[-1]
fig = plotly_interp_loss_surface(interp_noise_loss, shape, trajectories, optimum, styles_pl, xlim=[0,0.6],ylim=[0,0.6])
# Note that include_plotlyjs is used as cdn so that the static site generator can read it and present it on the browser. This is not typically required.
html = plotly.offline.plot(fig, filename='3D-SubGD-SGD-GD-plotly.html',include_plotlyjs='cdn')
display(HTML(html))

==NEW OPTIMIZATION==
Start from theta_0: [0.29 0.36], Loss: [0.06060266]
Step 0: theta: [0.29153945 0.35873151] Loss: [0.06012539]
Step 1: theta: [0.29309408 0.35683724] Loss: [0.05952135]
Step 2: theta: [0.29467139 0.35492436] Loss: [0.05890304]
Step 3: theta: [0.2962716 0.3529926] Loss: [0.0582701]
Step 4: theta: [0.29789493 0.35104169] Loss: [0.05762219]
Step 5: theta: [0.29954161 0.34907135] Loss: [0.05695892]
Step 6: theta: [0.30266215 0.34708129] Loss: [0.05529945]
Step 7: theta: [0.30711638 0.34494365] Loss: [0.05280766]
Step 8: theta: [0.3116847  0.34256823] Loss: [0.05009851]
Step 9: theta: [0.31637983 0.33994896] Loss: [0.04713438]
Step 10: theta: [0.32121462 0.33699895] Loss: [0.04341777]
Step 11: theta: [0.32617934 0.33236202] Loss: [0.03877331]
Step 12: theta: [0.3312035  0.32766147] Loss: [0.0340093]
Step 13: theta: [0.3362879  0.32289652] Loss: [0.02912266]
Step 14: theta: [0.34143337 0.3180664 ] Loss: [0.02456943]
Step 15: theta: [0.34533662 0.31465506] Loss: [0.0219362