# TVL1 optical flow

Paper: https://www-pequan.lip6.fr/~bereziat/cours/master/vision/papers/zach07.pdf

In [1]:
from sklearn.model_selection import train_test_split
from break_loop import BreakLoopException
import numpy as np
import constants as const
import pandas as pd
from pathlib import Path
from sklearn.model_selection import ParameterSampler
import progressbar as pb
from timeit import default_timer
import cv2
import csv
import logging
import utils
import utils_flow as ut_fl

In [2]:
IMG_HEIGHT = IMG_WIDTH = 64
MAX_WIDTH = const.MAX_WIDTH
MAX_HEIGHT = const.MAX_HEIGHT
VIDEO_FORMAT_CODECS = const.VIDEO_FORMAT_CODECS

In [3]:
log_file = Path(const.logs_path, 'DualTVL1.log')
logging.basicConfig(
    format='%(asctime)s %(message)s',
    filemode='a',
    filename=log_file,
    encoding='utf-8',
    level=logging.INFO,
    force=True
)

In [4]:
logging.info('Generating Directories Structure')

In [5]:
df = pd.read_csv(Path(const.csv_path, 'dataset.csv'))

In [6]:
# Sample the database

sample_fraction = 1

if (sample_fraction < 1):
    df_sampled, df_remaining = train_test_split(df,
                                                train_size=sample_fraction,
                                                random_state=42,
                                                stratify=df.emotion)
else:
    df_sampled = df

print(f'''Length of sample array is {len(df_sampled)} element,
{sample_fraction:.1%} of {len(df)}''')

Length of sample array is 1440 element,
100.0% of 1440


## Find optimal parameters

In [7]:
def pol2cart(rho, phi):
    # Convert polar coordinates to cartesian coordinates for computation of optical strain
    x = rho * np.cos(phi)
    y = rho * np.sin(phi)
    return (x, y)


def computeStrain(u, v):
    # Compute os , setting t=1 to maximize the sensitivity of ME
    u_x = u - pd.DataFrame(u).shift(1, axis=1)
    v_y = v - pd.DataFrame(v).shift(1, axis=0)
    u_y = u - pd.DataFrame(u).shift(1, axis=0)
    v_x = v - pd.DataFrame(v).shift(1, axis=1)
    os = np.array(
        np.sqrt(
            (u_x**2).fillna(0) +
            (v_y**2).fillna(0) +
            1/2 * (u_y.fillna(0) + v_x.fillna(0)) ** 2
        )
    )
    return os

In [8]:
TVL1_parameters = {
    'warps': 2,
    'useInitialFlow': False,
    'theta': 0.3,
    'tau': 0.25,
    'scaleStep': 0.8,
    'outerIterations': 3,
    'nscales': 3,
    'medianFiltering': 5,
    'lambda_': 0.15,
    'innerIterations': 3,
    'gamma': 0.0,
    'epsilon': 0.01
}

In [54]:
# Function for finding optimal param
def save_DualTVL1_frames(row, save_path, parameters):
    # TODO: fix video generation
    (warps, useInitialFlow, theta, tau, scaleStep,
     outerIterations, nscales, medianFiltering,
     lambda_, innerIterations, gamma, epsilon) = parameters.values()

    optical_flow = cv2.optflow.DualTVL1OpticalFlow_create(
            tau, lambda_, theta, nscales, warps,
            epsilon, innerIterations,
            outerIterations, scaleStep,
            gamma, medianFiltering, useInitialFlow
        )

    timer_start = default_timer()

    low_idx = np.rint(30 * (2 / 3))  # where 30=FPS


    # Extract video name
    path = row.path
    filename = row.filename


    full_path = Path(path, filename)
    video_name = utils.remove_extension(filename)

    # save_path = const.test_flows_path if (test) else const.flows_path

    folder_path = Path(save_path)
    # creates the target folder if needed (parents = True)
    folder_path.mkdir(parents=True, exist_ok=True)


    # Find CSV file based on video name

    csv_regex = f'{video_name}.csv'
    path_gen = Path(const.updated_landmarks_path).glob(csv_regex)

    ret_list = list(path_gen)

    try:
        video_csv = ret_list.pop(0)
    except IndexError:
        print(f'ERROR: video {full_path}: no csv file found ({csv_regex})')
        return

    data = pd.read_csv(video_csv)

    fourcc_mp4 = cv2.VideoWriter_fourcc(*VIDEO_FORMAT_CODECS['avi'])
    out_path = str(Path(save_path, 'video.mp4'))
    out_mp4 = cv2.VideoWriter(
        out_path, fourcc_mp4, 30, (MAX_WIDTH, MAX_HEIGHT), False
    )

    vidcap = cv2.VideoCapture(str(full_path))  # open video

    frames = utils.get_video_frames(vidcap)

    p2.set_elements(frames)

    fps = utils.get_video_fps(vidcap)

    high_idx = frames - low_idx

    is_good_flow = ''
    skip = False
    show = True
    ret, frame1 = vidcap.read()

    ra = utils.calc_bounding_box(data, 1)

    x, y, width, height = ra['box']

    face = frame1[y:y + height, x:x + width, :]

    face = cv2.resize(face, dsize=(MAX_WIDTH, MAX_HEIGHT))

    prvs = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)

    vidcap.set(cv2.CAP_PROP_POS_FRAMES, 1)

    for frame_number in range(1, frames):

        success, image = vidcap.read()

        speaking_frame = frame_number >= low_idx and frame_number <= high_idx

        flow_string = f'{video_name}-{frame_number:03d}'
        speaking_string = '01' if speaking_frame else '00'

        save_string = f'{flow_string}-{speaking_string}.jpg'
        ra = utils.calc_bounding_box(data, frame_number)

        x, y, width, height = ra['box']

        face = image[y:y + height, x:x + width, :]

        face = cv2.resize(face, dsize=(MAX_WIDTH, MAX_HEIGHT))

        next = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)


        flow = optical_flow.calc(prvs, next, None)
        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        u, v = pol2cart(mag, ang)
        os = computeStrain(u, v)

        final = np.zeros((next.shape[0], next.shape[1], 3)).astype(np.float32)
        final[..., 0] = u
        final[..., 1] = v
        final[..., 2] = os

        for channel in range(3):
            final[..., channel] = cv2.normalize(
                final[..., channel], None, alpha=0, beta=255,
                norm_type=cv2.NORM_MINMAX
            )

        gr1 = cv2.cvtColor(final, cv2.COLOR_BGR2GRAY) 

        if show is True:
            cv2.imshow('Flow', final / 255)
            cv2.imshow('Gray', gr1 / 255)
            # cv2.imshow('u', u)
            # cv2.imshow('v', v)
            # cv2.imshow('os', os)
            key = cv2.waitKey(int(1000/fps))
            match key:
                case 27:  # ESC
                    is_good_flow = str(0)
                    skip = True
                    break
                case 98:  # letter 'b'
                    is_good_flow = str(0)
                case 104:  # letter 'h' - hide screen
                    show = False
                    cv2.destroyAllWindows()
                case 111:  # letter 'o'
                    is_good_flow = str(1)
                case 113:  # q
                    vidcap.release()
                    # out_mp4.release()
                    # out_mp4 = None
                    cv2.destroyAllWindows()
                    raise BreakLoopException(
                        f'BLE at video {video_name}: frame {frame_number}'
                    )

        out_mp4.write(final / 255)
        cv2.imwrite(str(Path(folder_path, save_string)), final)


        p2.update_progress(frame_number)

    vidcap.release()
    out_mp4.release()
    out_mp4 = None


    if is_good_flow == '':
        key = cv2.waitKey(7000)  # Wait 7 s for an input
        match key:
            case 98:  # letter 'b'
                is_good_flow = str(0)
            case 111:  # letter 'o'
                is_good_flow = str(1)
            case _:
                is_good_flow = '-'

    cv2.destroyAllWindows()

    timer_end = default_timer() - timer_start

    csv_path = Path(save_path.parent.parent, 'runs.csv')
    with open(csv_path, 'a+', newline='') as res_file:
        filewriter = csv.writer(res_file, delimiter=',',
                                quoting=csv.QUOTE_MINIMAL)

        values_list = [value for value in params.values()]
        values_list.insert(0, save_path.parent.name)
        values_list.insert(1, video_name)
        values_list.append(timer_end)
        values_list.append('skip' if skip else '')
        values_list.append(is_good_flow)
        filewriter.writerow(
            values_list
        )

    p2.update_progress(frames)

In [55]:
# # default
param_grid = {
    'warps': [5],
    'useInitialFlow': [False],
    'theta': [0.3],
    'tau': [0.25],
    'scaleStep': [0.8],
    'outerIterations': [10],
    'nscales': [5],
    'medianFiltering': [5],
    'lambda_': [0.15],
    'innnerIterations': [30],
    'gamma': [0.0],
    'epsilon': [0.01]
}

param_grid = {
    'warps': [2],
    'useInitialFlow': [False],
    'theta': [0.3],
    'tau': [0.25],
    'scaleStep': [0.8],
    'outerIterations': [3],
    'nscales': [3],
    'medianFiltering': [5],
    'lambda_': [0.15],
    'innerIterations': [3],
    'gamma': [0.0],
    'epsilon': [0.01]
}

par_space = np.prod([len(x) for x in param_grid.values()])
# Create runs file in folder if it does not exists
runs_file = Path('FlowGridSearch/runs.csv')

if not runs_file.is_file():
    with open(runs_file, 'w', newline='') as res_file:
        filewriter = csv.writer(res_file, delimiter=',',
                                quoting=csv.QUOTE_MINIMAL)
        keys_list = [value for value in param_grid.keys()]
        keys_list.insert(0, 'Folder')
        keys_list.insert(1, 'Name')
        keys_list.append('Time')
        keys_list.append('Note')
        keys_list.append('Good')
        filewriter.writerow(keys_list)

iterations = 30
iterations = min(iterations, par_space)


sampler = ParameterSampler(param_grid, iterations, random_state=42)

elem_to_take = 1

# Create a random copy of the database and take the first x elements
df_random = df.copy().sample(frac=1, random_state=42).reset_index(drop=True)

run = 99

p0 = pb.Progressbar(id='p0', elements=elem_to_take)
p1 = pb.Progressbar(id='p1', elements=iterations)
p2 = pb.Progressbar(id='p2', elements=100)

try:
    for params in sampler:
        run += 1
        print(params)
        p1.add_to_progress(1)
        for index, row in df_random.iloc[:elem_to_take].iterrows():
            p0.add_to_progress(1)
            filename = Path(row.filename).stem
            out_path = Path('FlowGridSearch', f'Run_{run}', filename)
            # flow_calc(False, row, out_path, params
            save_DualTVL1_frames(row, out_path, params)
            # save_OF_frames(row, out_path, params)
            # save_OF_frames_pyflow(row, out_path, params)
except BreakLoopException as BLE:
    details = BLE.args[0]
    print(details)

'Progress: [####################] 100%             1/1'

'Progress: [####################] 100%             1/1'

'Progress: [######--------------] 30%             38/123'

{'warps': 2, 'useInitialFlow': False, 'theta': 0.3, 'tau': 0.25, 'scaleStep': 0.8, 'outerIterations': 3, 'nscales': 3, 'medianFiltering': 5, 'lambda_': 0.15, 'innerIterations': 3, 'gamma': 0.0, 'epsilon': 0.01}
BLE at video 01-01-07-02-01-01-03: frame 39


## Save Optical flow

In [13]:
len(df_sampled[1300:])

140

In [27]:
def save_optical_flow(save=True, save_folder=const.flows_path,
                      parameters=TVL1_parameters, view=False):

    total_elements = len(df_sampled)
    # cycle_idx = 1

    p1 = pb.Progressbar('p1', elements=total_elements)
    p2 = pb.Progressbar('p2', 100)

    low_idx = np.rint(30 * (2 / 3))  # where 30=FPS
    logging.info(parameters)

    for index, row in df_sampled.iterrows():

        p1.add_to_progress(1)

        # Extract video name
        path = row.path
        filename = row.filename

        target_class = utils.get_class_string(filename)

        full_path = Path(path, filename)

        video_name = utils.remove_extension(filename)
        logging.info(f'{video_name}, [{index}]')


        folder_flow_path = Path(save_folder, 'TVL1', target_class)

        if (save):
            folder_flow_path.mkdir(parents=True, exist_ok=True)

        # existing_files = [
        #     filename for filename in folder_flow_path.glob(f'{video_name}*.jpg')
        # ]

        # if existing_files is not None:
        #     # print('files exist', video_name)
        #     continue
        # print(full_path)
        # Find CSV file for that video

        csv_regex = f'{video_name}.csv'
        path_gen = Path(const.updated_landmarks_path).glob(csv_regex)

        ret_list = list(path_gen)

        try:
            video_csv = ret_list.pop(0)
        except IndexError:
            print(f'ERROR: Skipping video {full_path}: no ({csv_regex}) file')
            continue

        data = pd.read_csv(video_csv)
        vidcap = cv2.VideoCapture(str(full_path))

        frames = utils.get_video_frames(vidcap) - 1
        p2.set_elements(frames)

        high_idx = frames - low_idx

        frame_number = 1

        speaking_frame = frame_number >= low_idx and frame_number <= high_idx

        flow_string = f'{video_name}-{frame_number:03d}'
        speaking_string = '01' if speaking_frame else '00'

        save_string = f'{flow_string}-{speaking_string}.jpg'

        # frame_string = f'frame_{1:03d}.jpg'

        ret, frame1 = vidcap.read()

        ra = utils.calc_bounding_box(data, 1)

        x, y, width, height = ra['box']

        face = frame1[y:y + height, x:x + width, :]

        face = cv2.resize(face, dsize=(MAX_WIDTH, MAX_HEIGHT))

        prvs = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
        vidcap.set(cv2.CAP_PROP_POS_FRAMES, 1)
        for frame_number in range(1, frames):

            p2.add_to_progress(1)
            ret, frame2 = vidcap.read()

            if not ret:
                if (frame_number < (frames - 1)):
                    print(f'''No frames grabbed!
                    video: {full_path}
                    frame: {frame_number}''')
                break
            speaking_frame = frame_number >= low_idx and frame_number <= high_idx

            flow_string = f'{video_name}-{frame_number:03d}'
            speaking_string = '01' if speaking_frame else '00'

            save_string = f'{flow_string}-{speaking_string}.jpg'
            if Path(folder_flow_path, save_string).is_file():
                continue

            ra = utils.calc_bounding_box(data, frame_number)

            x, y, width, height = ra['box']

            face = frame2[y:y + height, x:x + width, :]

            face = cv2.resize(face, dsize=(MAX_WIDTH, MAX_HEIGHT))

            next = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)

            gr1 = ut_fl.calc_flow_TVL1(prvs, next, parameters)

            prvs = next

            if (save):
                gr1_res = cv2.resize(gr1, dsize=(IMG_HEIGHT, IMG_WIDTH))
                cv2.imwrite(str(Path(folder_flow_path, save_string)), gr1_res)

            if (view):
                # cv2.imshow('Vector Flow', ut_fl.draw_flow(next, flow))
                cv2.imshow('Video', face)
                # cv2.imshow('BGR Flow', bgr)
                cv2.imshow('GR1 BGRTOGRAY', gr1 / 255)
                # cv2.imshow('GR2 SPLIT', gr2)

                k = cv2.waitKey(30) & 0xff
                if k == 27:
                    break

        vidcap.release()
        cv2.destroyAllWindows()

        p2.update_progress(frames)


    p1.update_progress(total_elements)

In [28]:
# Flows generated on F556UV on 25/04/2024 in 7h 24m
save_optical_flow(save=True, view=False)

'Progress: [--------------------]'

'Progress: [####################] 100%             108/108'

In [11]:
cv2.destroyAllWindows()

In [29]:
tvl1_path = Path(const.flows_path, 'TVL1')
actors_labels = [f'{i:02d}' for i in range(1, 25)]
emotions_folder = tvl1_path.glob('*')

for folder_label in emotions_folder:
    for actor in actors_labels:
        new_path = Path(folder_label, actor)
        # print(type(folder_label), type(actor), type(new_path))
        new_path.mkdir()

In [33]:
label_names = const.EMOTIONS_LABELS_SORTED

In [34]:
# Create empty folder for every emotion folder
# Execution on ASUS: 2024-04-24
p1 = pb.Progressbar(id='p1', elements=156822)
logging.info('Generating Directories Structure')

for label in label_names:
    folder_path = Path(tvl1_path, label)

    gen = folder_path.glob('*.jpg')
    for img_path in gen:
        # logging.info(f'Moving: {str(img_path)}')
        actor = utils.get_actor(str(img_path))
        new_path = Path(folder_path, actor, img_path.name)
        Path(img_path).rename(new_path)
        p1.add_to_progress(1)

'Progress: [--------------------]'