In [1]:
from functools import partial
from pathlib import Path

from itertools import product

import numpy as np
import pandas as pd
import pytest
from numpy.testing import assert_array_almost_equal as aaae

from estimagic.optimization.process_constraints import process_constraints
from estimagic.optimization.reparametrize import reparametrize_from_internal
from estimagic.optimization.reparametrize import reparametrize_to_internal


In [2]:
from estimagic.optimization.reparametrize import reparametrize_from_internal
from estimagic.optimization.reparametrize import convert_external_derivative_to_internal
from estimagic.optimization.reparametrize import _reparametrize_from_internal_jacobian
from estimagic.optimization.reparametrize import _pre_replace
from estimagic.optimization.reparametrize import _post_replace
from estimagic.optimization.reparametrize import _transform_constraint_part

In [3]:
from estimagic.differentiation.derivatives import first_derivative

In [4]:
from estimagic.tests.optimization.test_reparametrize import to_test

In [5]:
def _load_example_params():
    """Load the reparametrization fixture as a params DataFrame.

    The CSV is read from a path relative to the current working directory,
    indexed by ("category", "subcategory", "name"), and missing bounds are
    replaced by infinities: -inf for the lower-bound columns, +inf for the
    upper-bound columns.

    Returns:
        pandas.DataFrame: The fixture with a three-level index and no missing
            values in "lower", "internal_lower", "upper", "internal_upper".

    """
    fixture_path = Path(
        "estimagic/tests/optimization/fixtures/reparametrize_fixtures.csv"
    )
    params = pd.read_csv(fixture_path).set_index(["category", "subcategory", "name"])
    # Assign the filled column back instead of calling fillna(inplace=True) on
    # params[col]: the chained in-place form is deprecated and does not modify
    # params when pandas copy-on-write is active.
    for col in ["lower", "internal_lower"]:
        params[col] = params[col].fillna(-np.inf)
    for col in ["upper", "internal_upper"]:
        params[col] = params[col].fillna(np.inf)
    return params


# The loader has its own name so that this cell can be re-run: binding the
# result to the loader's name would replace the function with a DataFrame.
example_params = _load_example_params()

In [6]:
# Constraint specifications keyed by test-case name. Each value is the list of
# constraint dicts that is handed to process_constraints for that case.
# Defined as a plain dict (not as a function whose name is then rebound to its
# own result) so that the cell can be re-run without a TypeError.
all_constraints = {
    "basic_probability": [{"loc": ("c", "c2"), "type": "probability"}],
    "uncorrelated_covariance": [
        {"loc": ("e", "off"), "type": "fixed", "value": 0},
        {"loc": "e", "type": "covariance"},
    ],
    "basic_covariance": [{"loc": "f", "type": "covariance"}],
    "basic_fixed": [
        {
            "loc": [("a", "a", "0"), ("a", "a", "2"), ("a", "a", "4")],
            "type": "fixed",
            "value": [0.1, 0.3, 0.5],
        }
    ],
    "basic_increasing": [{"loc": "d", "type": "increasing"}],
    "basic_equality": [{"loc": "h", "type": "equality"}],
    "query_equality": [
        {"query": 'subcategory == "j1" | subcategory == "i1"', "type": "equality"}
    ],
    "basic_sdcorr": [{"loc": "k", "type": "sdcorr"}],
    "normalized_covariance": [
        {"loc": "m", "type": "covariance"},
        {"loc": ("m", "diag", "a"), "type": "fixed", "value": 4.0},
    ],
}

In [7]:
def reduce_params(params, constraints):
    """Return a copy of the rows of params that the given constraints refer to.

    Only the first index level of each constraint's location is used:

    - a "query" constraint contributes the hard-coded categories "i" and "j"
      (presumably the categories holding the queried subcategories in the
      fixture; the query string itself is not evaluated here),
    - a tuple "loc" contributes its first element,
    - a list "loc" contributes the first element of its first entry,
    - any other "loc" is used as is.

    Args:
        params (pandas.DataFrame): Frame whose first index level holds the
            category labels.
        constraints (list): Constraint dicts with either a "query" or a "loc"
            entry.

    Returns:
        pandas.DataFrame: Copy of ``params`` restricted to the collected
            categories, in sorted category order.

    """
    all_locs = []
    for constr in constraints:
        if "query" in constr:
            # Extend instead of overwriting so that categories collected from
            # earlier constraints in the list are not lost.
            all_locs.extend(["i", "j"])
            continue
        loc = constr["loc"]
        if isinstance(loc, tuple):
            all_locs.append(loc[0])
        elif isinstance(loc, list):
            all_locs.append(loc[0][0])
        else:
            all_locs.append(loc)
    return params.loc[sorted(set(all_locs))].copy()


In [8]:
# Case to inspect interactively: `case` is a key of all_constraints and
# `number` is the suffix of the value{number} / internal_value{number}
# fixture columns.
case = "basic_probability"
number = 0

In [9]:
# Restrict the fixture to the categories used by the selected case and use
# the selected fixture column as the "value" column.
constraints = all_constraints[case]
params = reduce_params(example_params, constraints)
params["value"] = params[f"value{number}"]

# Mask of the rows that have an internal value in the fixture.
keep = params[f"internal_value{number}"].notnull()

# pc is later passed on as "processed_constraints"; pp is a frame from which
# the fixed values and the replacement columns are read below.
pc, pp = process_constraints(constraints, params)

# Plain numpy inputs for the reparametrization functions.
internal_p = params[f"internal_value{number}"][keep].to_numpy()
fixed_val = pp["_internal_fixed_value"].to_numpy()
pre_repl = pp["_pre_replacements"].to_numpy()
post_repl = pp["_post_replacements"].to_numpy()

In [10]:
from estimagic.optimization.reparametrize import _reparametrize_from_internal_jacobian_matrix
from estimagic.optimization.reparametrize import _pre_replace_matrix
from estimagic.optimization.reparametrize import _transformer_matrix
from estimagic.optimization.reparametrize import _post_replace_matrix

In [11]:
# Jacobian from the matrix-based implementation, evaluated at the internal
# parameters of the case selected above.
gac = _reparametrize_from_internal_jacobian_matrix(
    internal_p, fixed_val, pre_repl, pc, post_repl
)

In [12]:
# NOTE(review): in the recorded output every column of gac is identical,
# unlike the Jacobian that tester() prints as 'jac' for the same case.
gac

array([[ 1. ,  1. ,  1. ,  1. ,  1. ],
       [ 1. ,  1. ,  1. ,  1. ,  1. ],
       [ 0.4,  0.4,  0.4,  0.4,  0.4],
       [-0.2, -0.2, -0.2, -0.2, -0.2],
       [-0.2, -0.2, -0.2, -0.2, -0.2],
       [ 0. ,  0. ,  0. ,  0. ,  0. ]])

In [15]:
# NOTE(review): `tester` is defined in a later cell of this notebook; on a
# fresh kernel this call raises NameError unless that cell has been run first.
tester("basic_probability", 0, True)

jac
[[ 1.    0.    0.    0.    0.  ]
 [ 0.    1.    0.    0.    0.  ]
 [ 0.    0.    0.45 -0.05 -0.05]
 [ 0.    0.   -0.1   0.4  -0.1 ]
 [ 0.    0.   -0.1  -0.1   0.4 ]
 [ 0.    0.   -0.25 -0.25 -0.25]]
gac
[[ 1.   1.   1.   1.   1. ]
 [ 1.   1.   1.   1.   1. ]
 [ 0.4  0.4  0.4  0.4  0.4]
 [-0.2 -0.2 -0.2 -0.2 -0.2]
 [-0.2 -0.2 -0.2 -0.2 -0.2]
 [ 0.   0.   0.   0.   0. ]]


In [14]:
def tester(case, number, return_stuff=False):
    """Compare two Jacobians of reparametrize_from_internal for one fixture case.

    The first Jacobian is obtained by applying ``first_derivative`` to
    ``reparametrize_from_internal``; the second one is returned by
    ``_reparametrize_from_internal_jacobian_matrix``. Both are evaluated at
    the internal parameter values of the fixture.

    Args:
        case (str): Key of ``all_constraints``.
        number (int): Suffix of the ``value{number}`` and
            ``internal_value{number}`` fixture columns.
        return_stuff (bool): If True, print both Jacobians instead of
            comparing them. Nothing is returned in either mode.

    Raises:
        AssertionError: If ``return_stuff`` is False and the two Jacobians are
            not almost equal.

    """
    case_constraints = all_constraints[case]
    case_params = reduce_params(example_params, case_constraints)
    case_params["value"] = case_params[f"value{number}"]

    has_internal_value = case_params[f"internal_value{number}"].notnull()

    processed_constraints, processed_params = process_constraints(
        case_constraints, case_params
    )

    internal_values = case_params[f"internal_value{number}"][
        has_internal_value
    ].to_numpy()

    # Both functions take the same keyword arguments, so build them once.
    shared_kwargs = {
        "fixed_values": processed_params["_internal_fixed_value"].to_numpy(),
        "pre_replacements": processed_params["_pre_replacements"].to_numpy(),
        "processed_constraints": processed_constraints,
        "post_replacements": processed_params["_post_replacements"].to_numpy(),
    }

    from_internal = partial(reparametrize_from_internal, **shared_kwargs)
    jacobian_matrix = partial(
        _reparametrize_from_internal_jacobian_matrix, **shared_kwargs
    )

    derivative_jac = first_derivative(from_internal, internal_values)
    matrix_jac = jacobian_matrix(internal_values)

    if not return_stuff:
        aaae(derivative_jac, matrix_jac)
        return

    print('jac')
    print(derivative_jac)
    print('gac')
    print(matrix_jac)

In [14]:
# All (case, number) combinations to check: every constraint case paired with
# each of the fixture numbers 0, 1 and 2, ordered by case first.
# Note: this rebinds the `to_test` name imported from the estimagic test
# module in an earlier cell.
to_test = [
    (case_name, fixture_number)
    for case_name in (
        "basic_probability",
        "uncorrelated_covariance",
        "basic_covariance",
        "basic_fixed",
        "basic_increasing",
        "basic_equality",
        "query_equality",
        "basic_sdcorr",
        "normalized_covariance",
    )
    for fixture_number in range(3)
]


In [15]:
# Run the Jacobian comparison for every (case, number) pair and collect the
# pairs for which tester raises an AssertionError. Any other exception is not
# caught and aborts the loop.
bad = []
for args in to_test:
    try:
        tester(*args)
    except AssertionError:
        bad.append(args)

In [17]:
# Number of (case, number) pairs for which the two Jacobians disagree.
len(bad)

27