In [None]:
import numpy as np
from scipy.linalg import svd, qr
from scipy.sparse.linalg import svds
from numpy.linalg import norm

import os
import cv2

import matplotlib.pyplot as plt

from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
from sklearn.metrics import precision_score, recall_score, f1_score

In [None]:
def soft_threshold(x, threshold):
    return np.sign(x) * np.maximum(np.abs(x) - threshold, 0)
def hard_threshold(x, threshold):
    return x * (np.abs(x) >= threshold)

In [None]:
def trim(U, Sig, V, mu_U, mu_V):
    '''
    Trims the input matrices U and V to ensure that their row norms are below
    certain thresholds, and then applies QR and SVD factorizations to produce
    trimmed versions of U and V.

    Parameters
    ----------
    U : numpy.ndarray
        The left singular vectors.
    Sig : numpy.ndarray
        The singular values.
    V : numpy.ndarray
        The right singular vectors.
    mu_U : float
        The threshold for the row norm of U.
    mu_V : float
        The threshold for the row norm of V.

    Returns
    -------
    U_out : numpy.ndarray
        The trimmed left singular vectors.
    V_out : numpy.ndarray
        The trimmed right singular vectors.
    '''
    m, r = U.shape
    row_norm_square_U = np.sum(U**2, axis=1)
    big_rows_U = row_norm_square_U > (mu_U * r / m)
    U[big_rows_U, :] = (U[big_rows_U, :].T * ((mu_U * r / m) / np.sqrt(row_norm_square_U[big_rows_U]))).T

    n, r = V.shape
    row_norm_square_V = np.sum(V**2, axis=1)
    big_rows_V = row_norm_square_V > (mu_V * r / n)
    V[big_rows_V, :] = (V[big_rows_V, :].T * ((mu_V * r / n) / np.sqrt(row_norm_square_V[big_rows_V]))).T

    Q1, R1 = qr(U, mode='economic')
    Q2, R2 = qr(V, mode='economic')
    U_temp, Sig_out, V_temp = svd(R1 @ Sig @ R2.T, full_matrices=False)
    
    U_out = Q1 @ U_temp
    V_out = Q2 @ V_temp

    return U_out, V_out
    

In [None]:
def AccAltProj(D, r, params):
    '''
    Performs accelerated alternating projection to compute a low-rank approximation
    of the input matrix D, with the specified target rank r.
    Parameters
    ----------
    D : numpy.ndarray
        The input matrix, with shape (m, n).
    r : int
        The target rank of the low-rank approximation.
    params : dict
        A dictionary of algorithm parameters, including:
        - niter : int, optional (default = 100)
            The maximum number of iterations.
        - eps : float, optional (default = 1e-5)
            The convergence tolerance for the Frobenius norm of the residual.
        - β : float, optional (default = 1/(2*np.cbrt(np.prod(D.shape)/4)))
            The step size for updating the threshold ζ.
        - β_init : float, optional (default = 4*β)
            The initial value of the threshold scaling parameter.
        - γ : float, optional (default = 0.7)
            The decay rate for the additional threshold term.
        - µ : list of floats, optional (default = [5])
            The trimming thresholds for the left and right singular vectors,
            if trimming is enabled.
        - trimming : bool, optional (default = False)
            Whether to perform trimming of the left and right singular vectors
            after each iteration.
    Returns
    -------
    L : numpy.ndarray
        The low-rank approximation of the input matrix D, with shape (m, n).
    S : numpy.ndarray
        The sparse matrix representing the residuals of the approximation.
    '''
    niter = params.get('niter', 100)
    eps = params.get('eps', 1e-5)
    β = params.get('β', 1/(2*np.cbrt(np.prod(D.shape)/4)))
    β_init = params.get('β_init', 4*β)
    γ = params.get('γ', 0.7)
    µ = params.get('µ', [5])
    trimming = params.get('trimming', False)

    ζ = β_init * svds(D, 1)[1][0]

    S = hard_threshold(D, ζ)
    U, Sigma, Vt = svds(D - S, r)
    V = Vt.T
    L = U @ np.diag(Sigma) @ Vt

    ζ = β * Sigma[0]
    S = hard_threshold(D - L, ζ)

    out = np.empty(niter)
    out[0] = norm(D - L - S,'fro')/norm(D,'fro')

    for i in range(niter):
        if trimming:
            U, V = trim(U, np.diag(Sigma[:r]), V, µ[0], µ[-1])
        
        Z = D - S
        Q1, R1 = qr((Z.T @ U - V @ (Z @ V).T @ U), mode='economic')
        Q2, R2 = qr((Z @ V - U @ U.T @ Z @ V), mode='economic')

        M = np.block([[U.T @ Z @ V, R1.T],
                      [R2, np.zeros_like(R2)]])
        
        U_of_M, Sigma, V_of_M = svd(M, full_matrices=False)
        
        U = np.hstack([U, Q2]) @ U_of_M[:, :r]
        V = np.hstack([V, Q1]) @ V_of_M[:, :r]
        L = U @ np.diag(Sigma[:r]) @ V.T

        ζ = β * (Sigma[r] + (γ**i) * Sigma[0])
        S = hard_threshold(D - L, ζ)

        out[i+1] = norm(D - L - S,'fro')/norm(D,'fro')

        print(f'{i} {out[i+1]}')
        
        if out[i+1] < eps:
            break
              
    return L, S

In [None]:
def compute_metrics(binary_mask, ground_truth):
    assert binary_mask.shape == ground_truth.shape

    binary_mask_flat = binary_mask.flatten()
    ground_truth_flat = ground_truth.flatten()

    ground_truth_flat[ground_truth_flat != 0] = 255

    prec_pos = precision_score(ground_truth_flat, binary_mask_flat, pos_label=255,zero_division=0)
    prec_neg = precision_score(ground_truth_flat, binary_mask_flat, pos_label=0,zero_division=0)
    rec_pos = recall_score(ground_truth_flat, binary_mask_flat, pos_label=255,zero_division=0)
    rec_neg = recall_score(ground_truth_flat, binary_mask_flat, pos_label=0,zero_division=0)

    precision = (prec_pos + prec_neg) / 2
    recall = (rec_pos + rec_neg) / 2

    f1 = 2*(precision*recall)/(precision+recall)

    return {'precision': precision, 'recall': recall, 'f1_score': f1}

In [None]:
def load_data_path(dataset, category, place, type):
    path = os.path.join(os.getcwd(),"Dataset",dataset,category,place,type)
    image_paths = [os.path.join(path, img_name) for img_name in sorted(os.listdir(path)) if img_name != ".DS_Store"]
    return image_paths

In [None]:
def load_data(dataset, category, place, type):
    image_paths = load_data_path(dataset, category, place, type)
    images = []
    for path in image_paths:
        im = cv2.imread(path)
        gray_im  = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
        gray_im = gray_im/255.0
        images.append(gray_im)
    return images

In [None]:
def downsampling(image_paths,output_path,factor):
    for i, path in enumerate(image_paths):
       im = cv2.imread(path)
       gray_im  = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
       downsampled_image = cv2.resize(gray_im, None, fx=factor, fy=factor, interpolation=cv2.INTER_AREA)
       output_filepath = os.path.join(output_path,f"{i+1:05}.png")
       cv2.imwrite(output_filepath, downsampled_image)

In [None]:
def stack_images(images):
    stacked_images = np.stack(images, axis=-1)
    n, m, k = stacked_images.shape
    images_matrix = stacked_images.reshape(n*m, k)
    return images_matrix, n, m, k

In [None]:
def save_output(L, S, n, m, k, path):
    L_frames = L.reshape((n*m, k))
    S_frames = S.reshape((n*m, k))

    if not os.path.exists(os.path.join(path,"S")):
        os.makedirs(os.path.join(path,"S"))

    if not os.path.exists(os.path.join(path,"L")):
        os.makedirs(os.path.join(path,"L"))
    
    if not os.path.exists(os.path.join(path,"BinaryMask")):
        os.makedirs(os.path.join(path,"BinaryMask"))

    for i in range(k):
        L_frame = L_frames[:, i].reshape(n, m)
        S_frame = S_frames[:, i].reshape(n, m)

        L_frame = (L_frame * 255).astype(np.uint8)
        S_frame = (S_frame * 255).astype(np.uint8)

        output_filepath = os.path.join(path,"S",f"S{i+1:05}.png")
        cv2.imwrite(output_filepath, S_frame)

        output_filepath = os.path.join(path,"L",f"L{i+1:05}.png")
        cv2.imwrite(output_filepath, L_frame)

        S_frame[S_frame>220] = 0
        S_frame[S_frame>=50] = 255
        S_frame[S_frame<50] = 0
        
        output_filepath = os.path.join(path,"BinaryMask",f"BM{i+1:05}.png")
        cv2.imwrite(output_filepath, S_frame)

In [None]:
def load_roi(path):
    roi_path = os.path.join(path,"temporalROI.txt")
    with open(roi_path, 'r') as f:
        numbers = f.readline().strip().split(' ')
        numbers = [int(num) for num in numbers]
    return numbers

In [None]:
def evaluate(downsampled_groundtruths, binarymasks, roi):

    sum_metrics = {
    'precision': 0,
    'recall': 0,
    'f1_score': 0,
    }

    if roi == None:
        roi = [1, len(downsampled_groundtruths)+1]

    for i in range(roi[0]-1, roi[1]-1):
        ground_truth = cv2.imread(downsampled_groundtruths[i])
        ground_truth = cv2.cvtColor(ground_truth, cv2.COLOR_BGR2GRAY)

        binary_mask = cv2.imread(binarymasks[i])
        binary_mask = cv2.cvtColor(binary_mask, cv2.COLOR_BGR2GRAY)

        metrics = compute_metrics(binary_mask, ground_truth)
        
        for key in sum_metrics:
            sum_metrics[key] += metrics[key]

    average_metrics = {key: value / (roi[1]-roi[0]) for key, value in sum_metrics.items()}

    return average_metrics

In [None]:
def save_video(frames,path,frame_size):
    codec = cv2.VideoWriter_fourcc(*'MJPG') 
    fps = 30
    video_writer = cv2.VideoWriter(path, codec, fps, frame_size, isColor=False)
    for frame in frames:
        video_writer.write((frame * 255).astype(np.uint8))
    video_writer.release()
    

# BMC

In [None]:
dataset = "BMC"
category = "synth2"
video_name = "522"
place = '522'

In [None]:
dataset = "BMC"
category = "real"
video_name = "Video_001"
place = 'Video_001'

In [None]:
def video_to_frames(video_file, output_dir):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    cap = cv2.VideoCapture(video_file)

    if not cap.isOpened():
        print("Error: Could not open the video file.")
        return

    i = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        output_path = os.path.join(output_dir, f"{i+1:05}.png")
        cv2.imwrite(output_path, frame)
        i += 1
    
    cap.release()

video_file = os.path.join(os.getcwd(),"Dataset",dataset,category,video_name+".mp4")
output_path = os.path.join(os.getcwd(),"Dataset",dataset,category,place,"input")

video_to_frames(video_file, output_path)

In [None]:
video_file = os.path.join(os.getcwd(),"Dataset",dataset,category,video_name+"_gt.mp4")
output_path = os.path.join(os.getcwd(),"Dataset",dataset,category,video_name,"groundtruth")

video_to_frames(video_file, output_path)

# CDnet2012

In [None]:
dataset = "CDnet2012"

In [None]:
category = "baseline"
place = "PETS2006"

In [None]:
category = "dynamicBackground"
place = "boats"

In [None]:
category = "intermittentObjectMotion"
place = "sofa"

In [None]:
category = "shadow"
place = "copyMachine"

In [None]:
category = "thermal"
place = "corridor"

# Background Subtraction

In [None]:
factor = 0.5

output_path = os.path.join(os.getcwd(),"Dataset",dataset,category,place,"downsampled_input")
if not os.path.exists(output_path):
    os.makedirs(output_path)
input_paths = load_data_path(dataset,category,place,"input")
downsampling(input_paths,output_path,factor)

output_path = os.path.join(os.getcwd(),"Dataset",dataset,category,place,"downsampled_groundtruth")
if not os.path.exists(output_path):
    os.makedirs(output_path)
ground_truth_paths = load_data_path(dataset,category,place,"groundtruth")
downsampling(ground_truth_paths,output_path,factor)

In [None]:
downsampled_images = load_data(dataset,category,place,"downsampled_input")

In [None]:
frame_matrix, n, m, k = stack_images(downsampled_images)

In [None]:
r = 1
µ = [1,100]
result = "result1_100_1-0.3"
params = {
    'β_init': r*np.sqrt(µ[0]*µ[1])/(1*np.sqrt(n*m*k)),
    'β': r*np.sqrt(µ[0]*µ[1])/(4*np.sqrt(n*m*k)),
    'µ': µ,
    'trimming': True,
    'eps': 1e-4,
    'γ': 0.3,
    'niter': 3000,
}

In [None]:
L, S = AccAltProj(frame_matrix, r, params=params)

In [None]:
output_path = os.path.join(os.getcwd(),"Dataset",dataset,category,place,result)
if not os.path.exists(output_path):
    os.makedirs(output_path)
save_output(L, S, n, m, k, output_path)

In [None]:
path = os.path.join(os.getcwd(),"Dataset",dataset,category,place)
if dataset == "BMC":
    roi = None
else:
    roi = load_roi(path)
downsampled_groundtruths = load_data_path(dataset,category,place,"downsampled_groundtruth")
binarymasks = load_data_path(dataset,category,place,result+"/BinaryMask")
average_metrics = evaluate(downsampled_groundtruths, binarymasks, roi)


In [None]:
average_metrics

In [None]:
output_path = os.path.join(os.getcwd(),"Dataset",dataset,category,place,result,"Background.avi")
L_frames = load_data(dataset,category,place,result+"/L")
save_video(L_frames,output_path,(m,n))

output_path = os.path.join(os.getcwd(),"Dataset",dataset,category,place,result,"Foreground.avi")
Mask_frames = load_data(dataset,category,place,result+"/BinaryMask")
save_video(Mask_frames,output_path,(m,n))


In [None]:
L_frames = load_data(dataset,category,place,result+"/L")
downsampled_images = load_data(dataset,category,place,"downsampled_input")[0]
downsampled_images = (downsampled_images * 255.0)
sim = 0
psnr_score = 0
for i in range(len(L_frames)):
    background = (L_frames[i] * 255.0)
    sim += ssim(downsampled_images, background, data_range=background.max() - background.min())
    psnr_score += psnr(downsampled_images, background, data_range=background.max() - background.min())
sim = sim/len(L_frames)
psnr_score = psnr_score/len(L_frames)
print(f"ssim: {sim}\npsnr: {psnr_score}")