# Softplus Penalty Function Experiments
**Stefan Meili<br>2021-09-07**

In [None]:
import numpy as np
import pandas as pd

import warnings
warnings.filterwarnings("ignore")
from multiprocessing import Pool

import os
thread_count = len(os.sched_getaffinity(0))

import time

import secrets

from scipy.optimize import minimize

# Define Optimization Problems

In [None]:
#Penalty function lookup dictionary
#Return an approprate penalty function given type and constraint operator keys
#penalty functions accept constraint error (x), and hardness parameter (alpha)
penalty_functions = {
    'algebraic':{
        '<': lambda x, a: (np.sqrt(4 * a**2 + np.square(x)) + x)/2,
        '=': lambda x, a: np.sqrt(4 * a**2 + np.square(x)),
        '>': lambda x, a: (np.sqrt(4 * a**2 + np.square(x)) - x)/2
    },
    'algx':{
        '<': lambda x, a: (np.sqrt(4 * a**2 + np.square(x)) + x)/2 + np.square(np.maximum(0,x)),
        '=': lambda x, a: np.sqrt(4 * a**2 + np.square(x))+np.square(x),
        '>': lambda x, a: (np.sqrt(4 * a**2 + np.square(x)) - x)/2 + np.square(np.maximum(0,-x))
    },
    'logistic':{
        '<': lambda x, a: np.where(np.abs(x/a) < 1023, a*np.log2(1 + np.exp2(x/a)), np.maximum(0,x)),
        '=': lambda x, a: np.where(np.abs(x/a) < 1023, 2 * a * np.log2(1 + np.exp2(x/a)) - x, np.maximum(0,x)),
        '>': lambda x, a: np.where(np.abs(x/a) < 1023, a*np.log2(1 + np.exp2(-x/a)), np.maximum(0,x)),
    },
    'linear':{
        '<': lambda x, a: np.maximum(0, x),
        '=': lambda x, a: np.abs(x),
        '>': lambda x, a: np.maximum(0, -x),
    }
}

class Constraint:
    def __init__(self, operator, target = 0, ptype = 'algebraic', sigma = 1, alpha = 1, beta = 1):
        '''
        Simple constraint container class that stores penalty function, target, and relevant parameters
            operator = string operator. Must be in ['<', '=', '>']
            target = float
            ptype = string penalty function type. Must be in ['algebraic', 'algx', logistic', 'linear'] 
        '''
        self.operator = operator
        self.target = target
        self.ptype = ptype
        self.alpha = alpha
        self.sigma = sigma
        self.beta = beta
        
        if self.beta !=1: 
            self.penalty = lambda x: self.sigma * penalty_functions[self.ptype][self.operator](x, self.alpha)**self.beta
        else:
            self.penalty = lambda x: self.sigma * penalty_functions[self.ptype][self.operator](x, self.alpha)
            
    def __call__(self, value):
        x = value - self.target
        return self.penalty(x)

In [None]:
class Problem:
    '''
    Simple problem container class that stores objective coefficients, projection, and combination method
        o_coeffs = numpy float array with N coefficients for N dimensional problem
        projection = a callable class that projects input vector x onto the constraint space where each row returned
                     is a constraint
        combine = string in [norm, sum, max, sum_square] that defines how constraints are combined
    '''
    def __init__(self, o_coeffs, projection, combine = 'norm', **kwargs):
        self.o_coeffs = o_coeffs.reshape(1,-1)
        self.n_dims = o_coeffs.shape[0]
        
        self.projection = projection
        
        if combine == 'norm': self.combine_fn = lambda y: np.linalg.norm(y, axis = 0)
        elif combine == 'sum': self.combine_fn = lambda y: np.sum(y, axis = 0)
        elif combine == 'max': self.combine_fn = lambda y: np.max(y, axis = 0)
        elif combine == 'sum_square': self.combine_fn = lambda y: np.sum(np.square(y), axis = 0)
        elif combine == 'norm2': self.combine_fn = lambda y: np.sqrt(
            np.sum(np.square(y), axis = 0) + np.square(np.sum(y, axis = 0)))
        else: raise Exception('Invalid combine option, must be [norm, sum, max, sum_square]')
        
        self.constraint = Constraint('<', **kwargs)
    
    def _objective(self, x):
        #Return optimization objective
        return np.sum(np.multiply(self.o_coeffs, x), axis = 1)
    
    def _penalty(self, x):
        #Return penalty for constraint violations
        x_ = x.reshape(-1, self.n_dims)
        return self.combine_fn(self.constraint(self.projection(x_)))
    
    def __call__(self, x):
        #Return constrained optimization objective function
        return self._objective(x) + self._penalty(x)

In [None]:
def gen_objective(n, max_grad = 5):
    #Generate random linear objective coefficents with dimensions n and a specified maximum gradient
    grad = np.random.uniform(low = 1e-2, high = max_grad)
    u = np.random.uniform(low = -1, high = 1, size = n)
    return u / np.linalg.norm(u) * grad, grad

In [None]:
class Planes:
    '''
    hyperplane projection of x onto new constraint space defined by points and normals
    '''
    def __init__(self, points, normals):
        self.points = points[:,np.newaxis,:]
        self.normals = normals[:,np.newaxis,:]
        
    def __call__(self, x):
        return np.sum((x - self.points) * self.normals, axis = -1)

def rand_shear_mat(n, max_shear = 2, n_shears = 1):
    #Generate a random shearing matrix specifying dimensions n, maximum shearing factor and number of shears
    out = np.identity(n)
    
    for ns in range(n_shears):
        mat = np.identity(n)
        i = np.random.randint(n)
        j = np.random.randint(n-1)
        if j>=i: j+=1
        k = np.random.uniform(-max_shear, max_shear)
        mat[i, j] = k
        out = np.matmul(out, mat)
        
    return out


def gen_planes(n, x_min = 5, x_max = 20, shear = False, scale = False):
    '''
    generate a hypercube of constraints at random positive and negative coordinates defined by x_min and x_max
    shear enables constraint shearing, scale enables constraint scaling
    '''
    coords = np.row_stack([
        np.diag(np.random.uniform(x_min, x_max, size = n)),
        -np.diag(np.random.uniform(x_min, x_max, size = n))])
    
    norms = coords
    
    if shear:
        norms = np.matmul(norms, rand_shear_mat(n, n_shears = n//2))
        
    norms = norms / np.linalg.norm(norms, axis = -1)[:,np.newaxis]
    
    if scale:
        norms = norms * np.power(10, np.random.uniform(0, 1, (2*n, 1)))
        
    return coords, norms


def solve_planes(o, c, guess):
    '''
    Solve the hyperplanes problem analytically give objective coefficients o and contraint plane parameters c and 
    a guess vector of the solution location. Providing a guess avoids iterating over all possible corner combinatios
    and massively speeds solution time.
    ''' 
    n = len(o)
    errors = Planes(*c)(guess).reshape(-1)
    active = errors[:n] - errors[n:] > 0
    mask = np.concatenate([active, ~active])
    
    b = np.sum(c[0][mask] * c[1][mask], axis = 1)
    soln = np.linalg.solve(c[1][mask], b)
    
    return soln, np.dot(o, soln) 

In [None]:
class Sphere:
    '''
    projection of input x onto hyperspherical coordinates via np.linalg.norm()
    r_max specifies the radius of the spherical constraint.
    '''
    def __init__(self, r_max):
        self.r_max = r_max
        
    def __call__(self, x):
        return (np.linalg.norm(x, axis = 1) - self.r_max).reshape(1,-1)

    
def gen_sphere(r_min = 5, r_max = 20):
    return np.random.uniform(r_min, r_max)


def solve_sphere(o, r):
    #Finds the true minimum of the the objective function subject to radial constraint r.
    u = r / np.linalg.norm(o)
    soln = -o*u
    value = np.dot(o, soln)
    
    return soln, value

## Run Experiments

In [None]:
class Jacobian:
    '''
    Wrapper class that returns a second order estimate of the jacabian given step size for use with
    scipy.optimize.minimum(jac = True)
    '''
    def __init__(self, problem, step = 1e-6):
        self.problem = problem
        self.step = step
        
        self.n = None
        self.delta = None
        
    def __call__(self, x):
        if self.delta is None:
            self.n = len(x)
            self.delta = np.identity(self.n) * self.step / 2
            self.delta = np.row_stack([np.zeros(self.n), self.delta, -self.delta])
            self.n+=1

        vals = self.problem(x + self.delta)
        
        return vals[0], (vals[1:self.n] - vals[self.n:]) / self.step

In [None]:
def experiment(n, experiment_params, sphere = False, shear = False, const_scale = False, path = None):
    #Run a constrained optimization experiment with n dimensions and specified parameters
    
    #A unique identifier for each experiment group
    exp_id = secrets.token_hex(32)
    
    #select an random initial point
    x0 = np.random.uniform(low = -250, high = 250, size = n)
    
    #select a random objecive and gradient
    o_coeffs, grad = gen_objective(n, max_grad = 5)
    
    #generate and apply constraints
    if sphere:
        radius = gen_sphere()
        projection = Sphere(radius)
    else:
        c_coeffs = gen_planes(n, x_min = 10, x_max = 25, shear = shear, scale = const_scale)
        projection = Planes(*c_coeffs)
    
    #iterate over experimental groups, scaling paramters and hardness parameters, solving each optimization problem
    exps = []
    groups, combs, ptypes, sigmas, alphas, betas = [], [], [], [], [], []
    for exp_group, params in experiment_params.items():
        ptype = params['ptype']
        beta = params['beta']
        combine = params['combine']
        for sigma in params['sigma']:
            for alpha in params['alpha']:
                prob = Problem(
                    o_coeffs, 
                    projection,
                    combine,
                    ptype = ptype, 
                    sigma = sigma, 
                    alpha = alpha, 
                    beta = beta)
                
                prob_min = minimize(
                    Jacobian(prob),
                    x0,
                    method = 'BFGS',
                    jac = True
                )
                                                
                groups.append(exp_group)
                combs.append(combine)
                ptypes.append(ptype)
                sigmas.append(sigma)
                alphas.append(alpha)
                betas.append(beta)
                
                exps.append(prob_min)
    
    #find the true solution
    if sphere:
        solution, minval = solve_sphere(o_coeffs, radius)
    else:
        guess = np.median([e['x'] for e in exps], axis = 0)
        solution, minval = solve_planes(o_coeffs, c_coeffs, guess)
    
    #store results
    l = len(exps)
    out = pd.DataFrame.from_dict({
        'exp_id': [exp_id]*l,
        'n_dims': [n]*l,
        'grad': [grad]*l,
        'sphere': [sphere]*l,
        'shear': [shear]*l,
        'const_scale': [const_scale]*l,
        'group': groups,
        'combine': combs,
        'ptype': ptypes,
        'sigma': sigmas,
        'alpha': alphas,
        'beta': betas,
        'nfev':[e['nfev'] for e in exps],
        'nit':[e['nit'] for e in exps],
        'njev':[e['njev'] for e in exps],
        'success':[e['success'] for e in exps],
        'error':[np.linalg.norm(e['x'] - solution) for e in exps],
    })
    
    if path is not None:
        out.to_csv(f'{path}{exp_id}.csv')
    
    else: return out

In [None]:
class PickleableExp:
    #Make experiments picklable so they can be processed with Pool.map()
    def __init__(self, params, **kwargs):
        self.params = params
        self.kwargs = kwargs
        
    def __call__(self, n):
        return experiment(n, self.params, **self.kwargs)

def map_exps(tasks, params, **kwargs):
    #map tasks to pickled experiemnts
    exp = PickleableExp(params, **kwargs)
    
    with Pool(thread_count) as pool:
        pool.map(exp, tasks)

exp_kwargs = {
    #'square':{'path':'./data/square_planes/'},
    'shear':{'path':'./data/shear_planes/', 'shear':True},
    #'scale':{'path':'./data/scale_planes/', 'const_scale':True},
    #'shear_scale': {'path':'./data/shear_scale_planes/', 'shear':True, 'const_scale':True},
    'sphere':{'path':'./data/spheres/', 'sphere':True},
}


for kwargs in exp_kwargs.values():
    os.makedirs(kwargs['path'], exist_ok=True)

In [None]:
n_tasks = 500

### Experiments 1 and 2
Investigate performance of softplus and algebraic penalty functions on hyperplane and hypersphere problems with dimensions 2 - 50.

In [None]:
tasks = [2, 3, 5, 8, 12, 20, 32, 50] * n_tasks
    
experiment_params = {
    'quadratic sum':{
        'ptype': 'linear',
        'sigma': [1e4],
        'alpha': [0.0],
        'beta': 2.0,
        'combine': 'sum'
    },
    'algebraic norm':{
        'ptype': 'algebraic',
        'sigma': [15],
        'alpha': [3e-5],
        'beta': 1.0,
        'combine': 'norm'
    },
    'algebraic sum':{
        'ptype': 'algebraic',
        'sigma': [15],
        'alpha': [3e-5],
        'beta': 1.0,
        'combine': 'sum'
    },
    'logistic norm':{
        'ptype': 'logistic',
        'sigma': [15],
        'alpha': [3e-5],
        'beta': 1.0,
        'combine': 'norm'
    },
    'algx norm':{
        'ptype': 'algx',
        'sigma': [15],
        'alpha': [3e-5],
        'beta': 1.0,
        'combine': 'norm'
    },
    'algx sum':{
        'ptype': 'algx',
        'sigma': [15],
        'alpha': [3e-5],
        'beta': 1.0,
        'combine': 'sum'
    },
}
for key, kwargs in exp_kwargs.items():
    print(key)
    map_exps(tasks, experiment_params, **kwargs)

### Experiment 3
Investigate sensitivity of penalty function error to scaling parameter sigma

In [None]:
tasks = [12] * n_tasks

alpha_range = [1e-7, 1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1]

experiment_params = {
    'algebraic norm':{
        'ptype': 'algebraic',
        'sigma': [15],
        'alpha': alpha_range,
        'beta': 1.0,
        'combine': 'norm'
    },
    'algebraic sum':{
        'ptype': 'algebraic',
        'sigma': [15],
        'alpha': alpha_range,
        'beta': 1.0,
        'combine': 'sum'
    },
    'logistic norm':{
        'ptype': 'logistic',
        'sigma': [15],
        'alpha': alpha_range,
        'beta': 1.0,
        'combine': 'norm'
    },
    'algx norm':{
        'ptype': 'algx',
        'sigma': [15],
        'alpha': alpha_range,
        'beta': 1.0,
        'combine': 'norm'
    },
    'algx sum':{
        'ptype': 'algx',
        'sigma': [15],
        'alpha': alpha_range,
        'beta': 1.0,
        'combine': 'sum'
    },
}

for key, kwargs in exp_kwargs.items():
    print(key)
    map_exps(tasks, experiment_params, **kwargs)

### Experiment 4
Investigate sensitivity of penalty function error to hardness parameter alpha

In [None]:
sigma_range = [1e1, 1e2, 1e4, 1e3, 1e5, 1e6, 1e7]

experiment_params = {
    'quadratic sum':{
        'ptype': 'linear',
        'sigma': sigma_range,
        'alpha': [0.0],
        'beta': 2.0,
        'combine': 'sum'
    },
    'algebraic norm':{
        'ptype': 'algebraic',
        'sigma': sigma_range,
        'alpha': [3e-5],
        'beta': 1.0,
        'combine': 'norm'
    },
    'algebraic sum':{
        'ptype': 'algebraic',
        'sigma': sigma_range,
        'alpha': [3e-5],
        'beta': 1.0,
        'combine': 'sum'
    },
    'logistic norm':{
        'ptype': 'logistic',
        'sigma': sigma_range,
        'alpha': [3e-5],
        'beta': 1.0,
        'combine': 'norm'
    },
    'algx norm':{
        'ptype': 'algx',
        'sigma': sigma_range,
        'alpha': [3e-5],
        'beta': 1.0,
        'combine': 'norm'
    },
    'algx sum':{
        'ptype': 'algx',
        'sigma': sigma_range,
        'alpha': [3e-5],
        'beta': 1.0,
        'combine': 'sum'
    },
}

for key, kwargs in exp_kwargs.items():
    print(key)
    map_exps(tasks, experiment_params, **kwargs)