# Python Imports

In [0]:
!pip install -q python-ternary
!pip install -q git+https://github.com/marcharper/pyed.git

In [0]:
# For Colab, uncomment the command below:
# Add LaTeX fonts for saving Matplotlib figures with LaTex annotations.
# !apt install -q texlive-fonts-recommended texlive-fonts-extra cm-super dvipng

In [0]:
import inspect
from functools import partial
from math import sqrt

import numpy as np
from matplotlib import pyplot as plt
import matplotlib.gridspec as gridspec
from scipy import stats

import ternary

In [0]:
from matplotlib import rc
rc('figure', **{'figsize': (16, 8)})
rc('font', **{'family': 'sans-serif', 'sans-serif': ['Helvetica']})
rc('text', usetex=True)
rc('axes', labelsize=18, titlesize=18)
rc('legend',fontsize=16)
rc('xtick',labelsize=14)
rc('ytick',labelsize=14)

In [0]:
import pyed
from pyed import dynamics, geometries, incentives, information, normalize
from pyed.incentives import (
    linear_fitness, fermi_incentive, logit_incentive, replicator_incentive_power, 
    rock_paper_scissors, uniform_mutation_matrix)
from pyed.dynamics import compute_trajectory, replicator_trajectory

In [0]:
# Make images directory
from pathlib import Path

path = Path("images")
path.mkdir(exist_ok=True)

# Plotting and Figures

In [0]:
def plot_trajectories(data, params, divergence=None):
    if not divergence:
        divergence = information.kl_divergence

    gs = gridspec.GridSpec(1, 2)
    
    # Trajectory plots
    ax = plt.subplot(gs[0, 0])
    figure, tax = ternary.figure(ax=ax)
    for i, (momentum, color) in enumerate(params):
        if isinstance(momentum, float) or isinstance(momentum, int):
          label = '$\\beta$ = {:.2f}'.format(momentum)
        else:
          label = momentum
        tax.plot(data[i], linewidth=2, color=color, label=label)
    tax.boundary()
    tax.ax.set_title("Trajectories")
    tax.legend()

    # Divergences
    ax = plt.subplot(gs[0, 1])
    for i, (momentum, color) in enumerate(params):
        e = normalize(np.array([1, 1, 1]))
        v = [divergence(e, x) for x in data[i]]
        if isinstance(momentum, float) or isinstance(momentum, int):
          label = '$\\beta$ = {:.2f}'.format(momentum)
        else:
          label = momentum
        ax.plot(range(len(data[i])), v, color=color, label=label)
    ax.set_title("Divergences")
    ax.legend()

    
def plot_convergence(data, params, divergence=None):
    if not divergence:
        divergence = information.kl_divergence

    gs = gridspec.GridSpec(1, 2)
    
    # Divergences
    ax = plt.subplot(gs[0, 0])
    e = normalize(np.array([1, 1, 1]))
    for i, (momentum, color) in enumerate(params):
        v = [divergence(e, x) for x in data[i]]
        ratios = [np.log(x / v[0]) for k, x in enumerate(v)]
        if isinstance(momentum, float) or isinstance(momentum, int):
          label = '$\\beta$ = {:.2f}'.format(momentum)
        else:
          label = momentum
        ax.plot(range(len(data[i])), ratios, color=color, label=label)
    ax.set_title("Log Divergence ratio")
    ax.legend()

    # Divergences
    ax = plt.subplot(gs[0, 1])
    e = normalize(np.array([1, 1, 1]))
    for i, (momentum, color) in enumerate(params):
        v = [divergence(e, x) for x in data[i]]
        ratios = [v[k] / v[k-1] for k, x in list(enumerate(v))[1:]]
        if isinstance(momentum, float) or isinstance(momentum, int):
          label = '$\\beta$ = {:.2f}'.format(momentum)
        else:
          label = momentum
        ax.plot(list(range(len(data[i])))[1:], ratios, color=color, label=label)
    
    ax.set_title("log Divergence ratio subsequent")
    ax.legend()    
    
    
def time_to(data, params, divergence=None, domain=None):
    if not divergence:
        divergence = information.kl_divergence

    if not len(domain) > 0:
        domain = list(np.arange(0.4, 0.05, -0.01))
    
    def process_data(ds):
        ys = []
        for x in domain:
            found = False
            for i, d in enumerate(ds):
                if d < x:
                    ys.append(i)
                    found = True
                    break
            if not found:
                ys.append(len(ds))
        return np.array(ys)
    
    gs = gridspec.GridSpec(1, 2)
    
    # Divergences
    ax = plt.subplot(gs[0, 0])
    e = normalize(np.array([1, 1, 1]))
    v0 = [divergence(e, x) for x in data[0]]
    ys0 = process_data(v0)
    
    for i, (momentum, color) in list(enumerate(params)):
        v = [divergence(e, x) for x in data[i]]
        ys = process_data(v)
        if isinstance(momentum, float) or isinstance(momentum, int):
          label = '$\\beta$ = {:.2f}'.format(momentum)
        else:
          label = momentum
        ax.plot(domain, ys, color=color, label=label)
    ax.set_title("Time to")
    ax.legend()

    ax = plt.subplot(gs[0, 1])
    e = normalize(np.array([1, 1, 1]))
    v0 = [divergence(e, x) for x in data[0]]
    ys0 = process_data(v0)
    
    for i, (momentum, color) in list(enumerate(params)):
        v = [divergence(e, x) for x in data[i]]
        ys = process_data(v)
        if isinstance(momentum, float) or isinstance(momentum, int):
          label = '$\\beta$ = {:.2f}'.format(momentum)
        else:
          label = momentum
        ax.plot(domain, ys / ys0, color=color, label=label)
    ax.set_title("Time to relative")
    ax.legend()

def approximate_q(data, params, divergence=None, domain=None):
    if not divergence:
        divergence = information.kl_divergence

    def process_data(ds):
        qs = []
        for i in range(3, len(ds)):
            q_n = np.log(np.abs(ds[i  ] - ds[i-1]) / np.abs(ds[i-1] - ds[i-2]))
            q_d = np.log(np.abs(ds[i-1] - ds[i-2]) / np.abs(ds[i-2] - ds[i-3]))
            q = q_n / q_d
            qs.append(q)
        return qs
    
    gs = gridspec.GridSpec(1, 1)
    
    # Divergences
    ax = plt.subplot(gs[0, 0])
    e = normalize(np.array([1, 1, 1]))
    
    for i, (momentum, color) in list(enumerate(params)):
        v = [divergence(e, x) for x in data[i]]
        ys = process_data(v)
        if isinstance(momentum, float) or isinstance(momentum, int):
          label = '$\\beta$ = {:.2f}'.format(momentum)
        else:
          label = momentum
        ax.plot(range(len(ys)), ys, color=color, label=label)
    ax.set_title("Approximate Order")
    ax.legend()
        

# Figures Actually in the Paper

## Diagram for momentum

In [0]:
betas = np.linspace(-1, 3, num=51, endpoint=True, retstep=False, dtype=None, axis=0)
ys = [(1. / (1 - beta)) for beta in betas]


fig, ax = plt.subplots(figsize=(20, 10))

plt.plot(betas, ys, linewidth=3)
plt.axvline(x=0, ymin=-25, ymax=25, dashes=[3,3], color='grey')
plt.axvline(x=1, ymin=-25, ymax=25, dashes=[2,2], color='black')
plt.axvline(x=2, ymin=-25, ymax=25, dashes=[2,2], color='black')

plt.axhline(y=0, xmin=-1, xmax=3, color='black')
plt.axhline(y=1, xmin=-1, xmax=3, dashes=[2,2], color='black')
plt.axhline(y=-1, xmin=-1, xmax=3, dashes=[2,2], color='black')

plt.plot([0], [1], marker='o', markersize=10, color="red")
plt.ylim(-5, 5)
plt.xlim(-1, 3)

ax.annotate('Velocity increasing',
            xy=(-0.5, 3), xycoords='data',
            xytext=(-100, 60), textcoords='offset points',
            size=30)

ax.annotate('Velocity decreasing and\nreversed',
            xy=(1.8, -3), xycoords='data',
            xytext=(-30, -30), textcoords='offset points',
            size=30)


ax.annotate('Momentum free',
            xy=(-0.05, 1.5), xycoords='data',
            xytext=(-100, 60), textcoords='offset points',
            size=20, color='r',
            arrowprops=dict(arrowstyle="->"))

plt.xlabel(r"Momentum $\beta$", size=40)
plt.ylabel(r"Coefficient $\frac{1}{1-\beta}$", size=40)
plt.xticks(range(-1, 4, 1), range(-1, 4, 1), size=20)
plt.yticks(range(-4, 5, 2), range(-4, 5, 2), size=20)

path = Path("images") / "momentum_beta.png"
plt.savefig(str(path), dpi=400)

# Phase Portraits and KL-divergence

In [0]:
initial_state = normalize(np.array([1, 1, 3]))

# Dynamics parameters
m = rock_paper_scissors(a=2, b=-1)
print(m)
fitness = linear_fitness(m)
incentive = replicator_incentive_power(fitness, 1)
mu = uniform_mutation_matrix(3, ep=0.)

# Various momenta
params = [(-0.5, 'b'), (0, 'black'), (0.5, 'g'), (0.9, 'r')]
data = []

for momentum, _ in params:
    t = compute_trajectory(
        initial_state,
        incentive,
        iterations=1000,
        mu=mu,
        momentum=momentum,
        h=0.01)
    data.append(t)

plot_trajectories(data, params)
path = Path("images") / "divergence_example.png"
plt.savefig(str(path), dpi=400)

In [0]:
initial_state = normalize(np.array([1, 1, 4]))

# Dynamics parameters
m = rock_paper_scissors(a=-1, b=1)
fitness = linear_fitness(m)
incentive = replicator_incentive_power(fitness, 1)
# incentive = fermi_incentive(fitness, beta=1)
mu = uniform_mutation_matrix(3, ep=0.)

momentum = .7
params = [('Polyak', 'red'), ('Nesterov', 'blue')]
data = []

for nesterov in [False, True]:
  t = compute_trajectory(
      initial_state,
      incentive,
      iterations=3000,
      mu=mu,
      momentum=momentum,
      nesterov=nesterov
    )
  data.append(t)

t = compute_trajectory(
  initial_state,
  incentive,
  iterations=3000,
  mu=mu,
  momentum=0,
  nesterov=nesterov
)
data.append(t)
params.append(("Momentum free", "black"))
    
plot_trajectories(data, params)
path = Path("images") / "convergence_divergence.png"
plt.savefig(str(path), dpi=400)

In [0]:
initial_state = normalize(np.array([1, 1, 4]))

# Dynamics parameters
m = rock_paper_scissors(a=2, b=1)
fitness = linear_fitness(m)
incentive = replicator_incentive_power(fitness, 1)
mu = uniform_mutation_matrix(3, ep=0.)

# Various momenta
params = [(0, 'black'), (.5, 'b'), (0.9, 'r'), (0.95, 'y'), (0.96, 'g')]
data = []

for momentum, _ in params:
    t = compute_trajectory(
        initial_state,
        incentive,
        iterations=1000,
        mu=mu,
        momentum=momentum)
    data.append(t)

plot_trajectories(data, params)
path = Path("images") / "polyak_examples.png"
plt.savefig(str(path), dpi=400)

In [0]:
initial_state = normalize(np.array([1, 1, 4]))

# Dynamics parameters
m = rock_paper_scissors(a=2, b=1)
fitness = linear_fitness(m)
incentive = replicator_incentive_power(fitness, 1)
mu = uniform_mutation_matrix(3, ep=0.)

# Various momenta
params = [(0, 'black'), (.5, 'b'), (0.9, 'r'), (0.95, 'y'), (0.96, 'g')]
data = []

for momentum, _ in params:
    t = compute_trajectory(
        initial_state,
        incentive,
        iterations=1000,
        mu=mu,
        momentum=momentum,
        nesterov=True
        )
    data.append(t)

plot_trajectories(data, params)
path = Path("images") / "nesterov_examples.png"
plt.savefig(str(path), dpi=400)

# Convergence

In [0]:
trajectory_fun = partial(replicator_trajectory, initial_state, fitness,
                         iterations=20000, exit_on_uniform=False,
                         exit_on_divergence_tol=True, divergence_tol=.01)

def num_iterations(betas, alpha, nesterov=False):
  return [len(trajectory_fun(momentum=beta, h=alpha, nesterov=nesterov)) for beta in betas]

def get_no_momentum_iter(alpha):
  return len(trajectory_fun(momentum=0.0, h=alpha))

initial_state = normalize(np.array([1, 1, 4]))

# Dynamics parameters
m = rock_paper_scissors(a=1, b=1)
fitness = linear_fitness(m)
betas = np.linspace(-0.9, 0.9, num=100)
learning_rate = 1/200 # (aka step size aka \alpha)
no_momentum_num_iter = get_no_momentum_iter(learning_rate)

polyak_iterations = num_iterations(betas, learning_rate, nesterov=False)
nesterov_iterations = num_iterations(betas, learning_rate, nesterov=True)

fig, ax = plt.subplots(figsize=(10, 10))
plt.plot(betas, np.array(polyak_iterations) / no_momentum_num_iter,
         label='Actual number of steps, Polyak momentum, learning rate: {}'.format(learning_rate))
plt.plot(betas, np.array(nesterov_iterations) / no_momentum_num_iter,
         label='Actual number of steps, Nesterov momentum, learning rate: {}'.format(learning_rate))
plt.plot(betas, (1 - betas), label='Predicted number of steps with Polyak momentum')
# I.e, no_momentum_num_iter / (1 / (1 - betas))
plt.axhline(1, linestyle='--', label='$\\beta$ = 0')
plt.xlabel("$\\beta$")
plt.ylabel("Number of iterations till $D(\hat{x}, x) \leq .01$ relative to $\\beta = 0$")
plt.title("Momentum vs. number of steps to approximate convergence")
plt.legend()
path = Path("images") / "time_to_converge_nesterov.png"
plt.savefig(str(path), dpi=400)

In [0]:
def plot_actual_vs_predicted_steps(betas, alphas, normed=False):
  fig, ax = plt.subplots(figsize=(10, 10))
  for i, alpha in enumerate(alphas):
    iterations = np.array(num_iterations(betas, alpha))
    no_momentum_num_iter = get_no_momentum_iter(alpha)
    predicted = np.array(no_momentum_num_iter * (1 - betas))
    if normed:
      # I.e, no_momentum_num_iter / (1 / (1 - betas))
      if i == 0:
        ax.plot(betas, 1 - betas,
                label='Predicted number of steps', color='black')
        ax.axhline(no_momentum_num_iter / no_momentum_num_iter,
                   linestyle='--', label='$\\beta$ = 0')
      ax.plot(betas, iterations / no_momentum_num_iter,
              label='Actual number of steps, learning rate: {:.2f}'.format(alpha))
    else:
      ax.plot(betas, iterations,
              label='Actual number of steps, learning rate: {:.2f}'.format(alpha))
      ax.plot(betas, predicted,
              label='Predicted number of steps, learning rate: {:.2f}'.format(alpha))
      # I.e, no_momentum_num_iter / (1 / (1 - betas))
      ax.axhline(no_momentum_num_iter, linestyle='--',
                 label='$\\beta$ = 0 , lr = {:.2f}: {} steps'.format(alpha, no_momentum_num_iter))
  plt.xlabel("$\\beta$")
  if normed:
    plt.ylabel("Number of iterations till $D(\hat{x}, x) \leq .01$ relative to $\\beta = 0$")
  else:
    plt.ylabel("Number of iterations till $D(\hat{x}, x) \leq .01$")
  plt.title("Momentum vs. number of steps to approximate convergence")
  plt.legend()

In [0]:
betas = np.linspace(-0.9, 0.9, num=200)
plot_actual_vs_predicted_steps(betas, [.05, .1, .15], normed=True)
path = Path("images") / "time_to_converge.png"
plt.savefig(str(path), dpi=400)