In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import Lasso
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Normalizer
from sklearn.decomposition import PCA
from glob import glob as glob
import os
import pywt

In [None]:
L = 5
num_samp = 5
im_size = np.array((192, 168))

In [None]:
im_dirs = np.array(sorted(glob("CroppedYale/*")))
total_num_images = len(im_dirs)
im_idxs = np.random.choice(total_num_images, size=L, replace=False)

In [None]:
def filter_files(d, num_train, num_test, cond):
    fnames = [os.path.basename(i)[:-4] for i in glob(d+"/*_P00A*.pgm")]
    acc_files = []
    for f in fnames:
        az = int(f[12:16])
        elev = int(f[17:20])
        if cond(az, elev):
            acc_files.append(f)
    samps = np.random.choice(acc_files, num_train+num_test, replace=False)
    return samps[:num_train], samps[num_train:]

In [None]:
def fread(f):
    return plt.imread(f).flatten().T

In [None]:
A = np.zeros((np.prod(im_size), L*num_samp))
train_fnames = []
# print(A.shape)
for i, d in enumerate(im_dirs[im_idxs]):
    train, test = filter_files(d, num_samp, .8, lambda az, elev: abs(az) <= 45 and abs(elev) <= 45)
    print(f"Train: {train}\n Test: {test} \n")
#     for j, f in enumerate(filter_files(d, num_samp)):
#         train_fnames.append(f)
#         A[:,i*num_samp+j] = fread(d+"/"+f+".pgm")

In [None]:
def_cond = lambda az, elev: abs(az) <= 45 and abs(elev) <= 45
def train_test_split(num_classes=5, num_train=5, num_test=3, cond = def_cond):
    
    class_dirnames = np.random.choice(glob("CroppedYale/*"), size=num_classes, replace=False)
    
    train_fnames_all, test_fnames_all = np.array([]), np.array([])
    
    A = np.zeros((np.prod(im_size), num_classes*num_train))
    y = np.zeros((np.prod(im_size), num_classes*num_test))
    
    for i, d in enumerate(class_dirnames):
        train_fnames, test_fnames = filter_files(d, num_train, num_test, cond)
        train_fnames_all = np.append(train_fnames_all, train_fnames)
        test_fnames_all = np.append(test_fnames_all, test_fnames)
        for j, f in enumerate(train_fnames):
            A[:,(i*num_train + j)] = fread(d+"/"+f+".pgm")
        for j, f in enumerate(test_fnames):
            y[:,(i*num_test+j)] = fread(d+"/"+f+".pgm")
             
    return A,y, train_fnames_all, test_fnames_all

In [None]:
A, y, train_fnames, test_fnames = train_test_split(num_classes = 3, num_train = 10, num_test = 3)
print(A.shape, y.shape)
print(train_fnames)
print(test_fnames)

In [None]:
def down_samp(A, ds_factor=16):
    im_size_down = np.ceil(im_size/ds_factor).astype(int)
    A_down = np.zeros((np.prod(im_size_down), A.shape[-1]))
#     print(A_down.shape)
    for i in range(A.shape[-1]):
        A_down[:,i] = A[:,i].reshape(im_size)[::ds_factor, ::ds_factor].flatten()
    return A_down, im_size_down

In [None]:
def down_samp_pca(A, dim=25):
    # sklearn PCA
    pca = PCA(n_components=dim, svd_solver="auto")
    A_pca = pca.fit_transform(A.T).T
    
    # Manual PCA
#     U, S, Vh = np.linalg.svd(A, full_matrices=True)
#     print(U.shape, S.shape, Vh.shape)
#     A_pca = U[:,:dim].T@A
    return A_pca, A_pca.shape

In [None]:
def down_samp_wave(A):
    im_vec = A.reshape((*im_size,-1))
    wave_vec = pywt.wavedec2(im_vec, 'haar', axes=(0,1), level=4)
#     low_dim_data = wave_vec[0]
    low_dim_data = wave_vec[0] + sum(wave_vec[1])
    return low_dim_data.reshape(-1, A.shape[-1]), low_dim_data.shape[:2]

In [None]:
A_ds, ds_shape = down_samp(A, ds_factor=16)

In [None]:
A_wave, wave_shape = down_samp_wave(A)

In [None]:
plt.imshow(A_ds[...,3].reshape(ds_shape))

In [None]:
plt.imshow(A_wave[...,3].reshape(wave_shape))

In [None]:
A_pca, pca_shape = down_samp_pca(A)

In [None]:
pca_shape

In [None]:
A_pca[...,0]

In [None]:
plt.imshow(A_pca)

In [None]:
def delta(x, i):
    assert i < L
    out = np.zeros(len(x))
    idxs = slice(i*L, i*L+num_samp)
    out[idxs] = x[idxs]
    return out

In [None]:
delta(np.ones(25), 4)

In [None]:
y = A_down[:,1]

In [None]:
y.shape

In [None]:
def identity(A, y):
    A = A/np.linalg.norm(A, axis=0)
    prob = Lasso(fit_intercept=False)
    prob.fit(A, y)
    x_hat = prob.coef_
    r = np.zeros(L)
    for i in range(L):
        r[i] = np.linalg.norm(y-A@delta(x_hat, i))
#     print(r)
    return np.argmin(r)

In [None]:
identity(A_down,y)

In [None]:
test_fnames = []
for fname in im_dirs[im_idxs]:
    a = filter_files(fname, 1)
    while a in train_fnames:
        a = filter_files(fname, 1)
    test_fnames.append(fname+"/"+a[0]+".pgm")

In [None]:
test_fnames

In [None]:
identity(A_down, down_samp(fread(test_fnames[4])[:,None],16)[0])

In [None]:
fread(test_fnames[4]).shape

In [None]:
A_down.shape

In [None]:
identity(A_down, pca.transform(fread(test_fnames[4])[:,None].T).T)

In [None]:
# fread(test_fnames[0]).shape

In [None]:
def robust_identity(A, y):
    A = A/np.linalg.norm(A, axis=0)
    m,n = A.shape
    B = np.hstack((A, np.eye(m)))
    print(B.shape)
    prob = Lasso(fit_intercept=False)
    prob.fit(B, y)
    w_hat = prob.coef_
    x_hat = w_hat[:n]
    e_hat = w_hat[n:]
    r = np.zeros(L)
    for i in range(L):
        r[i] = np.linalg.norm(y-e_hat-A@delta(x_hat, i))
    print(r-np.min(r))
    return np.argmin(r)

In [None]:
y_test_noise = fread(test_fnames[2])[:,None]+np.random.normal(scale=500, size=(32256,1))

In [None]:
plt.imshow(y_test_noise.reshape(im_size))

In [None]:
robust_identity(A_down, down_samp(y_test_noise,16)[0])