# (一) Autocorrelation

## 1.

In [22]:
import numpy as np
import os
import torch
import torch.nn.functional as F
import time
from tqdm.autonotebook import tqdm

In [23]:
def autocorrelation(input_seq, m_x, m_y, m_t, L_x=352, L_y=288, L_t=100, device=torch.device('cpu')):
    '''
        input_seq : numpy.ndarray(3d) or torch.Tensor(5d)
        m_x, m_y, m_z : Scalar
        L_x, L_y, L_z : Scalar
    ''' 
    
    if abs(m_x) >= L_x or abs(m_y) >= L_y or abs(m_t) >= L_t:
        return 0
    
    if type(input_seq) == np.ndarray and len(input_seq.shape) == 3:
        input_seq = torch.Tensor([[input_seq]]).type(torch.float32).to(device)
    elif type(input_seq) == torch.Tensor and len(input_seq.shape) == 5:
        pass
    else:
        raise ValueError
    
    
    inputs = input_seq[:, :, :L_t-abs(m_t),:L_y-abs(m_y),:L_x-abs(m_x)]
    filters = input_seq[:, :, abs(m_t):,abs(m_y):,abs(m_x):]
    #print(inputs.shape)
    
    #print(filters.shape)
    #start_t = time.time()
    
    #C_vvv = F.conv3d(inputs, filters).view(-1)[0].cpu().numpy()    
    C_vvv = (inputs*filters).sum().cpu().numpy()
    
    R_xxx = C_vvv / ((L_x-abs(m_x))*(L_y-abs(m_y))*(L_t-abs(m_t)))
    
    #end_t = time.time()
    #print('Autocorrelation value = ',R_xxx,' ; time = ',end_t-start_t)
    return R_xxx

## 2.

In [24]:
def read_yuv_video(yuv_filename):
    with open(yuv_filename ,'rb') as f:
        width, height, n_frames = 352, 288, 100

        Y = []
        U = []
        V = []

        for i in range(n_frames):
            yuv_frame = np.frombuffer(f.read(width*height*3//2), dtype=np.uint8)
            Y.append(np.array(yuv_frame[:width*height]).reshape(height, width))
            U.append(np.array(yuv_frame[width*height:-width*height//4]))
            V.append(np.array(yuv_frame[-width*height//4:]))

        Y = np.array(Y)
        U = np.array(U)
        V = np.array(V)
        f.close()
    return Y,U,V

In [37]:
def problem_1_2(yuv_filename, output_video_name):
    # Read video
    Y, U, V = read_yuv_video(yuv_filename)
    
    # Normalize each frame with its own mean before calculating autocorrelation
    _Y = []
    for frame in Y:
        _Y.append(frame-frame.mean())

    Y = np.array(_Y)
    
    # Calculate autocorrelation ; result is stored in R_xxx
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')     

    R_xxx = np.zeros([21,144,176])

    input_seq = torch.Tensor([[Y]]).type(torch.float32).to(device)


    print('Start calculating autocorrelation...')
    for m_t in tqdm(range(10+1), desc='m_t'):
        m_t_fill = [10+m_t, 10-m_t]

        #for m_y in tqdm(range(72+1), desc='m_y', leave=False):
        for m_y in range(72+1):
            m_y_fill = [m_y]
            if m_y < 72 and m_y !=0:
                m_y_fill.append(144-m_y)
    
            #for m_x in tqdm(range(88+1), desc='m_x',leave=False):
            for m_x in range(88+1):
                m_x_fill = [m_x]
                if m_x < 88 and m_x !=0:
                    m_x_fill.append(176-m_x)
            
                #####################################
                result = autocorrelation(input_seq, m_x-88, m_y-72, m_t-10, device=device)
                #####################################

            for x in m_x_fill:
                for y in m_y_fill:
                    for t in m_t_fill:
                        R_xxx[t][y][x] = result
    
    
    print('Finish calculating autocorrelation.')
    # Normalize with R_xxx[0,0,0] ; since we have shifted R_xxx, R_xxx[10,72,88] is actually R_xxx[0,0,0]
    _Y = R_xxx

    norm = R_xxx[10,72,88]
    _Y = (_Y/norm)*127.5+127.5
    _Y = np.cast['uint8'](_Y)
    
    # Store R_xxx as an video
    with open(output_video_name ,'wb') as f:

        width, height, n_frames = 352, 288, 100

        _U = (np.ones((width*height//4))*128).reshape(-1).astype(np.uint8)
        _V = (np.ones((width*height//4))*128).reshape(-1).astype(np.uint8)

        #_Y = R_xxx
        for frame in _Y:
            f.write(frame.reshape(-1).astype(np.uint8).tobytes())
            f.write(_U.tobytes())
            f.write(_V.tobytes())
        f.close()
    print('Finish writing ',output_video_name)

In [38]:
yuv_filename = './AKIYO_352x288_10.yuv'
output_video_name = './AKIYO_AC.yuv'

problem_1_2(yuv_filename, output_video_name)

Start calculating autocorrelation...


HBox(children=(IntProgress(value=0, description='m_t', max=11, style=ProgressStyle(description_width='initial'…


Finish calculating autocorrelation.
Finish writing  ./AKIYO_AC.yuv


## 3.

In [15]:
def autocorrelation_motion(predicted_seq, target_seq, m_x, m_y, m_t, L_x=352, L_y=288, L_t=100):
    '''
        ! Incompatible to autocorrelation(), since this is not "true" autocorrelation calculation !
        
        predicted_seq, target_seq : numpy.ndarray(3d) or torch.Tensor(5d)
            Detail :
                1. predicted_seq is a sequence generated by motion compensation, so it's no need to silce it.
                2. Here we need target_seq also be silced in last dimension (time dimension) already
                
        m_x, m_y, m_z : Scalar
        L_x, L_y, L_z : Scalar
    ''' 
    
    if abs(m_x) >= L_x or abs(m_y) >= L_y or abs(m_t) >= L_t:
        return 0
    
    predicted_seq = predicted_seq[:, :,    :L_t-abs(m_t),   :L_y-abs(m_y),:]
    target_seq    = target_seq[   :, :, abs(m_t):       , abs(m_y):      ,:]
    
    
    C_vvv = (predicted_seq*target_seq).sum().cpu().numpy()
    
    R_xxx = C_vvv / ((L_x-abs(m_x))*(L_y-abs(m_y))*(L_t-abs(m_t)))
    
    #end_t = time.time()
    #print('Autocorrelation value = ',R_xxx,' ; time = ',end_t-start_t)
    return R_xxx

In [17]:
def motion_estimate(source_block, target_frame, residual_frame = None, unit_size=(4,4)):
    '''
    source_block, target_frame, residual_frame : torch.Tensor with dimention:
        source_block : unit_size
        target_frame : at least unit_size
        residual_frame : same as target_frame, is (source frame - target frame)**2 
    '''
    
    assert len(target_frame.shape)==2
    
    max_mse = np.infty
    result_idx = (0, 0)
    
    height, width = target_frame.shape
    #print(target_frame.shape)
    for h in range(height-unit_size[0]):
        for w in range(width-unit_size[1]):
            if residual_frame == None:
                current_block = target_frame[h:h+unit_size[0], w:w+unit_size[1]]
                mse = torch.sum((current_block-source_block)**2)
            else:
                mse = torch.sum(residual_frame[h:h+unit_size[0], w:w+unit_size[1]])
                
            if mse < max_mse:
                max_mse = mse
                result_idx = (h, w)
    return result_idx

In [None]:
def problem_1_3(yuv_filename, output_video_name):
    Y, U, V = read_yuv_video(yuv_filename)

    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')     
    input_seq = torch.Tensor(Y).type(torch.float64).to(device)

    L_x, L_y, L_t = 352, 288, 100
    unit_size=(4,4) # block size ; (height, width)

    # First, do motion compensation
    predicted_frames_by_motion = {}

    for i in range(11):
        predicted_frames_by_motion[i] = {}

        print(i,":")
        for source_frame_idx in range(L_t -i):
            source_frame = input_seq[source_frame_idx]
            target_frame = input_seq[source_frame_idx + i]

            predicted_frame = torch.zeros_like(source_frame).to(device)
            motion_vector_list = []

            #print(source_frame.size())

            residual_frame = (source_frame-target_frame)**2

            for h_base in trange(L_y//unit_size[0]):
                for w_base in range(L_x//unit_size[1]):
                    # Get current block from source_frame
                    source_block = source_frame[h_base*unit_size[0] : (h_base+1)*unit_size[0],
                                                w_base*unit_size[1] : (w_base+1)*unit_size[1]]
                    # Determine margins
                    l_margin, r_margin = max(w_base*unit_size[1]-16, 0) , min(w_base*unit_size[1]+15+1, L_x)
                    u_margin, d_margin = max(h_base*unit_size[0]-16, 0) , min(h_base*unit_size[0]+15+1, L_y)

                    target_subframe = target_frame[u_margin:d_margin,l_margin:r_margin]
                    # Calculate motion compensation result
                    motion_vector = motion_estimate(source_block = source_block,
                                                    target_frame = target_subframe,
                                                    residual_frame=residual_frame,
                                                    unit_size = unit_size)
                    predicted_block = target_subframe[motion_vector[0]:motion_vector[0]+unit_size[0], motion_vector[1]:motion_vector[1]+unit_size[1]]
                    # Paste prediction onto its own location
                    predicted_frame[h_base*unit_size[0] : (h_base+1)*unit_size[0],
                                    w_base*unit_size[1] : (w_base+1)*unit_size[1]] = predicted_block

                    # Record motion vector
                    motion_vector_list.append(motion_vector)

            predicted_frames_by_motion[source_frame_idx] = (predicted_frame, motion_vector_list)
            
    # Second, calculate autocorrelation with motion compensated frames
    

In [None]:
yuv_filename = './AKIYO_352x288_10.yuv'
output_video_name = './AKIYO_AC_motion.yuv'
problem_1_3(yuv_filename, output_video_name)