In [None]:
from google.colab import drive
drive.mount('/content/drive')

general_path = "/content/drive/My Drive/Colab Notebooks/"
%cd /content/drive/My\ Drive/Colab Notebooks/

## Libaries

In [None]:
import os
import pickle
import numpy as np

## Functions

In [None]:
from PIL import Image

# Function that reshapes an array of size (m, n, z)
# to a vector of size (m * n * z, 1)
def image_to_vector(im, rgb=False):
  if rgb:
    length, height, depth = im.shape
    vec = im.reshape(length * height * depth, 1)
  else:
    length, height = im.shape
    vec = im.reshape(length * height, 1)

  return vec 

# Function that reshapes a vector of size (m * n * z, 1)
# to an array of size (m, n, z)
def vector_to_image(vec, length, height, depth=1, rgb=False):
  if rgb:
    im = vec.reshape(length, height, depth)
  else:
    im = vec.reshape(length, height)

  return im

# Function that creates a matrix out of column vectors
def create_input_matrix(data_path, rgb=True):
  shape = None
  im_paths = []

  for subdir, dirs, files in os.walk(data_path):
    for file in files:  
      file_path = subdir + '/' + file
      im = Image.open(file_path) # open aligned image

      # do once for the first image
      if shape is None:
        shape = im.size

        im_arr = np.asarray(im) # load image to array
        cp_arr = im_arr.copy()
        
        matrix = image_to_vector(cp_arr, rgb)
        im_paths.append(file_path)
        continue

      assert im.size == shape

      # repeat for the rest images in the set
      im_arr = np.asarray(im) # load image to array
      cp_arr = im_arr.copy()
      
      vec = image_to_vector(cp_arr, rgb)
      matrix = np.append(matrix, vec, axis=1)
      im_paths.append(file_path)

  return matrix, im_paths

# Function that creates a directory if it doesn't already exist
def create_dir(dir):
  if not os.path.exists(dir):
    os.makedirs(dir)

## PCPSFM

In [None]:
def soft_shrink(Χ, mu):
    """
    Υ = sgn(Χ)max(|Χ| - mu, 0)
    
    Parameters
    ----------
    Χ: numpy array
    mu: thresholding parameter
    
    Returns:
    ----------
    Υ: numpy array
    
    """
    Υ = np.maximum(Χ - mu, 0)
    Υ = Υ + np.minimum(Χ + mu, 0)
    
    return Υ

def pcpsfm(X, W, S, U, V, alpha=np.nan, lamda=np.nan, beta=np.nan, mu=np.nan, tol=10**(-7), maxit=1000):
  """ 
  Paper: Side Information for Face Completion: a Robust PCA Approach 
  Authors: Niannan Xue, Jiankang Deng, Shiyang Cheng, Yannis Panagakis, Stefanos Zafeiriou
  Published in: IEEE Transactions on Pattern Analysis and Machine Intelligence (Volume: 41, Issue: 10, Oct. 1 2019)

  Description
  ------------
  A Robust Principal Component Analysis implementation using side information with missing values and features incorporation (PCPSFM).

  Parameters
  ----------
  X : observation (n1 x n2) matrix, which will be decomposed into a sparse matrix E and a low-rank matrix L.

  W : mask / missing values (n1 x n2) matrix. 

  S : side information (n1 x n2) matrix.

  U : feature (n1 x d1) matrix.
  
  V : feature (n2 x d2) matrix.

  alpha: positive tuning parameter used in the calculation of B. 

  lamda : positive tuning parameter used in the calculation of E.

  beta: scaling ratio factor (> 1), that helps mu reach an accelerated superlinear performance.

  mu: postive tuning parameter used in augmented Lagrangian.
  
  tol : tolerance value for convergency.

  maxit : maximum iteration.
  
  Returns
  ----------
  L : array-like, low-rank matrix.

  E : array-like, sparse matrix.

  niter : number of iteration.
  """ 

  # dimensions
  n1, n2 = X.shape
  
  d1 = U.shape[1]
  d2 = V.shape[1]

  # B = H - D
  # H,D bilinear mappings 
  H = np.zeros((d1,d2))
  B = np.zeros((d1,d2)) 

  # Lagrange multipliers
  Z = np.zeros((n1,n2)) 
  N = np.zeros((d1,d2)) 

  # parameter setting
  if np.isnan(alpha):
    alpha = 10**(-1)
  if np.isnan(lamda):
    lamda = 1.0 / np.sqrt(max(n1,n2))
  if np.isnan(beta):
    beta = 1.0 / np.linalg.norm(X, 2) 
  if np.isnan(mu):
    mu = 10**(-5)
  
  print("** Initial parameters **")
  print('alpha {} lamda {} beta {} mu {}'.format(alpha, lamda, beta, mu))

  # transpose feature matricies
  Ut = np.transpose(U)
  Vt = np.transpose(V)

  for niter in range(maxit):
    print("Iteration ", niter)
   
    if (niter + 1) % 50 == 0: # every 50 iterations beta reduces by 0.05
      beta -= 0.05
      if beta <= 1.0: # if reaches 0 or less, assign beta its previous value
        beta = 1.05  # if beta <= 0.05, give it a fixed value (beta = 1.05)


    # E = Softshrinkage(λ/μ)(X - UHV* + (1/μ) Z) o W + (X - UHV* + (1/μ) Z) o (1 - W)
    R = X - np.dot(np.dot(U, H), Vt) + (1/mu) * Z     # R = X - UHV* + (1/μ) Z
    SS = soft_shrink(R, lamda/mu)                     # SS = Softshrinkage(λ/μ)(R)
    E = np.multiply(SS, W) + np.multiply(R, (1 - W))  # E = SS o W + R o (1 - W)


    # H = U* SVT(1/2μ)((1/2)(X - E + (1/μ)Z + U (B + U*SV - (1/μ)N) V*)) V
    P = 0.5 * (X - E + (1/mu) * Z + np.dot(np.dot(U, (B + np.dot(np.dot(Ut, S), V) - (1/mu) * N)), Vt)) # P = (1/2)(X - E + (1/μ)Z + U (B + U*SV - (1/μ)N) V*)

    # SVT(1/2μ)(P)
    UH, sigmasH, VH = np.linalg.svd(P, full_matrices=False) 
    sigmasH = soft_shrink(sigmasH, 1/(2*mu)) 
    SigmaH = np.diag(sigmasH)

    H = np.dot(np.dot(Ut, np.dot(np.dot(UH, SigmaH), VH)), V) # H = U* SVT(1/2μ)(P) V


    # B = SVT(a/μ) (H - U*SV + (1/μ)N)
    Q = H - np.dot(np.dot(Ut, S), V) + (1/mu) * N  # Q = (H - U*SV + (1/μ)N
    
    # SVT(a/μ)(Q)
    UB, sigmasB, VB = np.linalg.svd(Q, full_matrices=False)
    sigmasB = soft_shrink(sigmasB, alpha/mu) 
    SigmaB = np.diag(sigmasB)

    B = np.dot(np.dot(UB, SigmaB), VB)  # B = SVT(a/μ)(Q)


    # Z = Z + μ(X - E - UHV*)
    CoefZ = X - E - np.dot(np.dot(U, H), Vt) # CoefZ = X - E - UHV*
    Z = Z + mu * CoefZ; # Z = Z + μ * Coefz

    # N = N + μ(H - B - U*SV)
    CoefN = H - B - np.dot(np.dot(Ut, S), V)  # CoefN = H - B - U*SV
    N = N + mu * CoefN; # N = N + μ * CoefN

    # μ = μ x β
    mu = mu * beta          

    print('beta {},  mu {}'.format(beta, mu))

    cond1 = np.linalg.norm(CoefZ, 'fro') / np.linalg.norm(X, 'fro') # || X - E - UHV* ||F / ||X||F
    cond2 = np.linalg.norm(CoefN, 'fro') / np.linalg.norm(X, 'fro') # || H - B - U*SV ||F / ||X||F
    
    print('cond1 {} cond2 {} tol {}'.format(cond1, cond2, tol))
    print()
    
    # # save the currently calculated results to disk every 50 iterations
    # if (niter + 1) % 50 == 0:
    #   check_path = general_path + "pcpsfm_checkpoint_" + str(niter+1) + ".pkl"

    #   # Saving the objects:
    #   with open(check_path, 'wb') as f:  
    #     pickle.dump([np.dot(np.dot(U, H), Vt), E, niter], f)

    if max(cond1, cond2) < tol: # if the maximum of the two conditions gets a value smaller than the tolerance, terminate the algorithm
      break

  L = np.dot(np.dot(U, H), Vt)

  return L, E, niter

# Classic RPCA

### M Matrix

In [None]:
# Create observation matrix
data_path = os.path.join(general_path, "datasets/small/myceleba/")
M, im_paths = create_input_matrix(data_path)

# Store observation matrix 
mat_path = os.path.join(general_path, "myceleba_small_classic_matrix.pkl")
with open(mat_path, 'wb') as f:  
  pickle.dump([M, im_paths], f)

### Load Observation Matrix 

In [None]:
mat_path = os.path.join(general_path, "myceleba_small_classic_matrix.pkl")
with open(mat_path, 'rb') as f:
  [M, im_paths] = pickle.load(f)

M = M / 255 # normalize matrix
X = M

### Execution - lamda tuning

In [None]:
from PIL import Image, ImageOps
from pathlib import Path

# Classic RPCA configuration
d1 = X.shape[0]
d2 = X.shape[1]

U = np.identity(d1, np.float32)
V = np.identity(d2, np.float32)
S = np.zeros((d1, d2), np.float32)
W = np.full((d1, d2), 1, np.float32)

res_path = os.path.join(general_path, "results/") # directory containing all the results
create_dir(res_path)

# lamda values
lamda_list = [1.0 / np.sqrt(max(d1,d2)), 0.01, 0.1]

lamda_path = os.path.join(general_path, "results/myceleba_classic/") # directory to store the results
create_dir(lamda_path)

# Execute CRPCA for each lamda value
for lamda in lamda_list:
  L, E, niter = pcpsfm(X, W, S, U, V, alpha=0.0, lamda=lamda, beta=1.5, mu=10**(-5), tol=10**(-7))
  print("Converges at ", niter, "iteratons")

  # Store CRPCA return values
  pkl_name =  "myceleba_lam_" + str(round(lamda, 4)) + "_results.pkl"
  results_path = os.path.join(lamda_path, pkl_name)
  with open(results_path, 'wb') as f:  
    pickle.dump([L, E, niter], f)

  # Create directory to store the results
  res_name = "lam_" + str(round(lamda, 4)) + "/"
  res_dir = os.path.join(lamda_path, res_name)
  create_dir(res_dir)
  
  # Create directory to store only the inpainting results
  L_name = "lam_" + str(round(lamda, 4)) + "_L/"
  L_dir = os.path.join(lamda_path, L_name)
  create_dir(L_dir)

  # List the occluded images
  occ_dir = os.path.join(general_path, "myceleba_occlusions/")
  occ_list = os.listdir(occ_dir)

  # Locate occluded images 
  for case in occ_list:
    case_paths = [path for path in im_paths if case in path]
    
    file_path = Path(case_paths[0])
    parent_path = file_path.parent.absolute()
    id_dir = os.path.basename(parent_path)
    id_path = os.path.join(res_dir, id_dir)
    create_dir(id_path)
    idL_path = os.path.join(L_dir, id_dir)
    create_dir(idL_path)
    
    # Get the index of occluded image in list of paths
    ind = im_paths.index(case_paths[0]) 
    col = ind

    # Create arrays from column vectors
    X_arr = vector_to_image(X[:, col], 102, 100, 3, True)
    L_arr = vector_to_image(L[:, col], 102, 100, 3, True)
    E_arr = vector_to_image(E[:, col], 102, 100, 3, True)

    # Covert arrays to images
    imX = Image.fromarray((X_arr * 255).astype(np.uint8))
    imL = Image.fromarray((L_arr * 255).astype(np.uint8))
    imE = Image.fromarray((E_arr * 255).astype(np.uint8))

    # Store initial, low-rank and sparse images
    file_name, file_extension = os.path.splitext(case)

    X_name = file_name + '_X' + file_extension
    L_name = file_name + '_L' + file_extension
    E_name = file_name + '_E' + file_extension

    X_path = os.path.join(id_path, X_name)
    L_path = os.path.join(id_path, L_name)
    LL_path = os.path.join(idL_path, L_name)
    E_path = os.path.join(id_path, E_name)

    imX.save(X_path)
    imL.save(L_path)
    imL.save(LL_path)
    imE.save(E_path)

# PCPF

### Train Matrix

In [None]:
# Create observation matrix
train_path = os.path.join(general_path, "datasets/small/myceleba_split/train/")
M, train_im_paths = create_input_matrix(train_path)

# Store observation matrix 
train_mat_path = os.path.join(general_path, "myceleba_train_matrix.pkl")
with open(train_mat_path, 'wb') as f:  
  pickle.dump([M, train_im_paths], f)

### Test Matrix

In [None]:
# Create observation matrix
test_path = os.path.join(general_path, "datasets/small/myceleba_split/test/")
X, test_im_paths = create_input_matrix(test_path)

# Store observation matrix 
test_mat_path = os.path.join(general_path, "myceleba_small_test_matrix.pkl")
with open(test_mat_path, 'wb') as f:  
  pickle.dump([X, test_im_paths], f)

### Load Train, Test Matrices 

In [None]:
train_mat_path = os.path.join(general_path, "myceleba_train_matrix.pkl")
with open(train_mat_path, 'rb') as f:
  [M, train_im_paths] = pickle.load(f)

M = M / 255 # normalize matrix

In [None]:
test_mat_path = os.path.join(general_path, "myceleba_small_test_matrix.pkl")

# Getting back the objects:
with open(test_mat_path, 'rb') as f:
  [X, test_im_paths] = pickle.load(f)

X = X / 255 # normalize matrix

### Create U Matrix

In [None]:
U, _, _ = np.linalg.svd(M, full_matrices=False)

### Execution - lamda tuning

In [None]:
from PIL import Image, ImageOps
from pathlib import Path

# PCPF configuration
d1 = X.shape[0]
d2 = X.shape[1]

V = np.identity(d2, np.float32)
S = np.zeros((d1, d2), np.float32)
W = np.full((d1, d2), 1, np.float32)

res_path = os.path.join(general_path, "results/") # directory containing all the results
create_dir(res_path)

# lamda values
lamda_list = [1.0 / np.sqrt(max(d1,d2)), 0.01, 0.1]

lamda_path = os.path.join(general_path, "results/myceleba_pcpf/") # directory to store the results
create_dir(lamda_path)

# Execute CRPCA for each lamda value
for lamda in lamda_list:
  L, E, niter = pcpsfm(X, W, S, U, V, alpha=0.0, lamda=lamda, beta=1.2, mu=10**(-5), tol=10**(-7))
  print("Converges at ", niter, "iteratons")

  # Store CRPCA return values
  pkl_name =  "myceleba_lam_" + str(round(lamda, 4)) + "_results.pkl"
  results_path = os.path.join(lamda_path, pkl_name)
  with open(results_path, 'wb') as f:  
    pickle.dump([L, E, niter], f)

  # Create directory to store the results
  res_name = "lam_" + str(round(lamda, 4)) + "/"
  res_dir = os.path.join(lamda_path, res_name)
  create_dir(res_dir)
  
  # Create directory to store only the inpainting results
  L_name = "lam_" + str(round(lamda, 4)) + "_L/"
  L_dir = os.path.join(lamda_path, L_name)
  create_dir(L_dir)

  # List the occluded images
  occ_dir = os.path.join(general_path, "datasets/small/myceleba_occlusions/")
  occ_list = os.listdir(occ_dir)

  # Locate occluded images 
  for case in occ_list:
    case_paths = [path for path in test_im_paths if case in path]
    
    file_path = Path(case_paths[0])
    parent_path = file_path.parent.absolute()
    id_dir = os.path.basename(parent_path)
    id_path = os.path.join(res_dir, id_dir)
    create_dir(id_path)
    idL_path = os.path.join(L_dir, id_dir)
    create_dir(idL_path)

    # Get the index of occluded image in list of paths
    ind = test_im_paths.index(case_paths[0]) 
    col = ind

    # Create arrays from column vectors
    X_arr = vector_to_image(X[:, col], 102, 100, 3, True)
    L_arr = vector_to_image(L[:, col], 102, 100, 3, True)
    E_arr = vector_to_image(E[:, col], 102, 100, 3, True)

    # Covert arrays to images
    imX = Image.fromarray((X_arr * 255).astype(np.uint8))
    imL = Image.fromarray((L_arr * 255).astype(np.uint8))
    imE = Image.fromarray((E_arr * 255).astype(np.uint8))

    # Store initial, low-rank and sparse images
    file_name, file_extension = os.path.splitext(case)

    X_name = file_name + '_X' + file_extension
    L_name = file_name + '_L' + file_extension
    E_name = file_name + '_E' + file_extension

    X_path = os.path.join(id_path, X_name)
    L_path = os.path.join(id_path, L_name)
    LL_path = os.path.join(idL_path, L_name)
    E_path = os.path.join(id_path, E_name)

    imX.save(X_path)
    imL.save(L_path)
    imL.save(LL_path)
    imE.save(E_path)

# PCPFM

### Train Matrix

In [None]:
# Create observation matrix
train_path = os.path.join(general_path, "datasets/small/myceleba_split/train/")
M, train_im_paths = create_input_matrix(train_path)

# Store observation matrix 
train_mat_path = os.path.join(general_path, "myceleba_train_matrix.pkl")
with open(train_mat_path, 'wb') as f:  
  pickle.dump([M, train_im_paths], f)

### Test Matrix

In [None]:
# Create observation matrix
test_path = os.path.join(general_path, "datasets/small/myceleba_split/test/")
X, test_im_paths = create_input_matrix(test_path)

# Store observation matrix 
test_mat_path = os.path.join(general_path, "myceleba_small_test_matrix.pkl")
with open(test_mat_path, 'wb') as f:  
  pickle.dump([X, test_im_paths], f)

### Load Train, Test Matrices 

In [None]:
train_mat_path = os.path.join(general_path, "myceleba_train_matrix.pkl")
with open(train_mat_path, 'rb') as f:
  [M, train_im_paths] = pickle.load(f)

M = M / 255 # normalize matrix

In [None]:
test_mat_path = os.path.join(general_path, "myceleba_test_matrix.pkl")

# Getting back the objects:
with open(test_mat_path, 'rb') as f:
  [X, test_im_paths] = pickle.load(f)

X = X / 255 # normalize matrix

### Create U Matrix

In [None]:
U, _, _ = np.linalg.svd(M, full_matrices=False)

### Create W Matrix

In [None]:
from PIL import Image, ImageOps

# List the mask images
mask_dir = os.path.join(general_path, "myceleba_masks/")
mask_list = os.listdir(mask_dir)

W_list = []

# Traverse all the test set masks
for i, case in enumerate(test_im_paths):
  file_name = os.path.basename(case)
  if file_name in mask_list:  # Find mask that corresponds to an occluded image
    mask_path = os.path.join(mask_dir, file_name)
    mask = Image.open(mask_path)
    mask = ImageOps.invert(mask) # Invert mask colors
    mask_arr = np.asarray(mask) # Convert mask image to array

    # Create column vector with mask pixels
    Wi = mask_arr / 255
    Wi_rgb = np.dstack((Wi, Wi, Wi))
    W_vec = image_to_vector(Wi_rgb, True)
    W_list.append(W_vec.tolist())

  else: # Fill column vectors with 1 for non-occluded images
    W_vec = np.full((X.shape[0], 1), 1.0, np.float64)
    W_list.append(W_vec.tolist())

W = np.array(W_list)
W = np.squeeze(W, 2)
W = np.transpose(W, (1, 0))

### Execution - lamda tuning

In [None]:
from PIL import Image, ImageOps
from pathlib import Path

# Classic RPCA configuration
d1 = X.shape[0]
d2 = X.shape[1]

V = np.identity(d2, np.float32)
S = np.zeros((d1, d2), np.float32)

res_path = os.path.join(general_path, "results/") # directory containing all the results
create_dir(res_path)

# lamda values
lamda_list = [1.0 / np.sqrt(max(d1,d2)), 0.01, 0.1]

lamda_path = os.path.join(general_path, "results/myceleba_pcpfm/") # directory to store the results
create_dir(lamda_path)

# Execute CRPCA for each lamda value
for lamda in lamda_list:
  L, E, niter = pcpsfm(X, W, S, U, V, alpha=0.0, lamda=lamda, beta=1.2, mu=10**(-5), tol=10**(-7))
  print("Converges at ", niter, "iteratons")

  # Store CRPCA return values
  pkl_name =  "myceleba_lam_" + str(round(lamda, 4)) + "_results.pkl"
  results_path = os.path.join(lamda_path, pkl_name)
  with open(results_path, 'wb') as f:  
    pickle.dump([L, E, niter], f)

  # Create directory to store the results
  res_name = "lam_" + str(round(lamda, 4)) + "/"
  res_dir = os.path.join(lamda_path, res_name)
  create_dir(res_dir)
  
  # Create directory to store only the inpainting results
  L_name = "lam_" + str(round(lamda, 4)) + "_L/"
  L_dir = os.path.join(lamda_path, L_name)
  create_dir(L_dir)

  # List the occluded images
  occ_dir = os.path.join(general_path, "myceleba_occlusions/")
  occ_list = os.listdir(occ_dir)

  # Locate occluded images 
  for case in occ_list:
    case_paths = [path for path in test_im_paths if case in path]
    
    file_path = Path(case_paths[0])
    parent_path = file_path.parent.absolute()
    id_dir = os.path.basename(parent_path)
    id_path = os.path.join(res_dir, id_dir)
    create_dir(id_path)
    idL_path = os.path.join(L_dir, id_dir)
    create_dir(idL_path)

    # Get the index of occluded image in list of paths
    ind = test_im_paths.index(case_paths[0]) 
    col = ind

    # Create arrays from column vectors
    X_arr = vector_to_image(X[:, col], 102, 100, 3, True)
    L_arr = vector_to_image(L[:, col], 102, 100, 3, True)
    E_arr = vector_to_image(E[:, col], 102, 100, 3, True)

    # Covert arrays to images
    imX = Image.fromarray((X_arr * 255).astype(np.uint8))
    imL = Image.fromarray((L_arr * 255).astype(np.uint8))
    imE = Image.fromarray((E_arr * 255).astype(np.uint8))

    # Store initial, low-rank and sparse images
    file_name, file_extension = os.path.splitext(case)

    X_name = file_name + '_X' + file_extension
    L_name = file_name + '_L' + file_extension
    E_name = file_name + '_E' + file_extension

    X_path = os.path.join(id_path, X_name)
    L_path = os.path.join(id_path, L_name)
    LL_path = os.path.join(idL_path, L_name)
    E_path = os.path.join(id_path, E_name)

    imX.save(X_path)
    imL.save(L_path)
    imL.save(LL_path)
    imE.save(E_path)