In [10]:
import logging
import warnings
from dataclasses import dataclass
from typing import Tuple

import numpy as np
from scipy.linalg import eig
from scipy.optimize import fmin_bfgs
from scipy.spatial.distance import pdist


_required_opts = ['analytic', 'ntries']
_logger = logging.getLogger(__name__)


@dataclass
class PilotOutput:
    A: np.ndarray
    B: np.ndarray
    C: np.ndarray
    Z: np.ndarray
    error: float
    R2: np.ndarray


def errorfcn(alpha, Xbar, n, m):
    f1 = np.reshape(alpha[(2 * n):], (m, 2))
    f2 = np.reshape(alpha[0:2 * n], (2, n))
    f3 = Xbar[:, 0:n].T
    r = (Xbar - np.dot(f1, np.dot(f2, f3)).T) ** 2
    return np.nanmean(np.nanmean(r, axis=0))


def pilot(X, Y, featlabels=None, **kwargs):
#     for opt in _required_opts:
#         if opt not in kwargs:
#             raise KeyError(f"Pilot required parameter {repr(opt)} missing.")

    Xbar = np.hstack((X, Y))
    n = X.shape[1]
    m = Xbar.shape[1]

    if kwargs['analytic']:
        _logger.info("PILOT is solving analyticaly the projection problem.")
        X = X.T
        Xbar = Xbar.T

        D, V = eig(np.dot(Xbar, Xbar.T), right=True, left=False)

        idx = np.argsort(np.abs(D))[::-1]
        V = V[:, idx[0:2]]
        B = V[0:n, :]
        C = V[n:m + 1, :].T
        Xr = np.dot(X.T, np.linalg.pinv(np.dot(X, X.T)))
        A = np.dot(V.T, np.dot(Xbar, Xr))
        Z = np.dot(A, X)
        Xhat = np.vstack((np.dot(B, Z), np.dot(C.T, Z)))
        error = float(np.sum((Xbar - Xhat) ** 2))
        R2 = np.diagonal(np.corrcoef(Xbar, Xhat, rowvar=False)[:m, m:]) ** 2
        Z = Z.T
    else:
        _logger.info("PILOT is solving numerically the projection problem.")
        ntries = kwargs['ntries']
        seed = kwargs['seed'] if 'seed' in kwargs else 1
        np.random.seed(seed)
        X0 = 2 * np.random.random((ntries, 2 * m + 2 * n)).T - 1
        alpha = np.zeros((2 * m + 2 * n, ntries))
        eoptim = np.zeros(ntries)
        perf = np.zeros(ntries)
        Hd = pdist(X)[np.newaxis].T

        for i in range(ntries):
            alpha[:, i], eoptim[i] = fmin_bfgs(lambda a: errorfcn(a, Xbar, n, m), x0=X0[:, i],
                                               full_output=True, disp=False)[:2]
            aux = alpha[:, [i]]
            A = np.reshape(aux[0:2 * n], (2, n))
            Z = np.dot(X, A.T)
            perf[i] = np.corrcoef(Hd, pdist(Z)[np.newaxis].T, rowvar=False)[0][1]
            _logger.info(f"PILOT has completed trial {i+1}")

        idx = np.argmax(perf)
        A = np.reshape(alpha[0:2 * n, idx], (2, n))
        Z = np.dot(X, A.T)
        B = np.reshape(alpha[(2 * n):, idx], (m, 2))
        Xhat = np.dot(Z, B.T)
        C = B[n:m + 1, :].T
        B = B[0:n + 1, :]
        error = np.sum((Xbar - Xhat) ** 2)
        with warnings.catch_warnings(record=True) as w:
            R2 = np.diagonal(np.corrcoef(Xbar, Xhat, rowvar=False)[:m, m:]) ** 2

    out = PilotOutput(A, B, C, Z, error, R2)
    _logger.info("PILOT has completed.")
    return out


def adjust_rotation(Z: np.ndarray, Ybad: np.ndarray, theta: float = 135.0) -> Tuple[np.ndarray, np.ndarray]:
    cenroid_bad = np.mean(Z[Ybad], axis=0)[::-1]
    theta = np.radians(theta) - np.arctan2(*cenroid_bad)
    rot = np.array(((np.cos(theta), -np.sin(theta)),
                    (np.sin(theta), np.cos(theta))))
    Z_rot = np.dot(rot, Z.T)
    return Z_rot.T, rot


In [11]:
import pandas as pd

In [12]:
df = pd.read_csv('datasets/sklearn-datasets/5_features_0.01_error.csv')
df

Unnamed: 0,x0,x1,x2,x3,x4,target
0,0.075179,0.318426,0.009202,0.122739,-0.099731,1
1,0.080730,0.112668,0.278760,0.198804,-0.033195,1
2,0.156315,-0.049505,0.172710,0.130853,-0.041350,1
3,0.300269,-0.054744,-0.008746,0.258687,0.060852,0
4,-0.068403,-0.055665,0.243979,-0.186244,0.216012,1
...,...,...,...,...,...,...
995,-0.000976,-0.085782,0.092538,-0.024185,-0.066686,1
996,0.050468,0.102596,0.201487,0.125999,-0.096460,1
997,-0.240456,-0.077274,0.116554,-0.143257,0.386489,1
998,0.196277,0.156681,-0.071957,0.034946,-0.178096,1


In [28]:
# X = df.drop(columns = 'target').values
X = df['x0'].values.reshape(-1,1)
Y = df['target'].values.reshape(-1,1)

In [32]:
pilot(X,Y, analytic = True)

PilotOutput(A=array([[-0.03943625],
       [-0.99991312]]), B=array([[-0.00226838, -0.99999743]]), C=array([[-0.99999743],
       [ 0.00226838]]), Z=array([[-0.00296478, -0.0751725 ],
       [-0.0031837 , -0.0807234 ],
       [-0.00616449, -0.15630184],
       ...,
       [ 0.00948269,  0.24043515],
       [-0.00774042, -0.19625964],
       [ 0.0010994 ,  0.02787537]]), error=499.9602690153329, R2=array([1., 1.]))

In [None]:
np.hstack((X, Y))