From 11fcc161c2b525c8c09ff9140998486b3b3a8bbe Mon Sep 17 00:00:00 2001 From: sronilsson Date: Thu, 11 Apr 2024 20:28:25 +0000 Subject: [PATCH] cleaned --- simba/mixins/config_reader.py | 9 +- .../feature_extraction_supplement_mixin.py | 23 +- simba/mixins/geometry_mixin.py | 11 +- simba/mixins/plotting_mixin.py | 162 ++-- simba/mixins/statistics_mixin.py | 88 ++- simba/plotting/distance_plotter.py | 193 +++-- simba/plotting/distance_plotter_mp.py | 297 ++++--- simba/plotting/frame_mergerer_ffmpeg.py | 143 +++- simba/ui/pop_ups/distance_plot_pop_up.py | 34 +- simba/ui/pop_ups/video_processing_pop_up.py | 40 +- simba/utils/checks.py | 13 +- simba/utils/read_write.py | 21 +- .../batch_process_create_ffmpeg_commands.py | 1 + simba/video_processors/video_processing.py | 741 +++++++++++++----- 14 files changed, 1241 insertions(+), 535 deletions(-) diff --git a/simba/mixins/config_reader.py b/simba/mixins/config_reader.py index 002e63583..cb9ce33ab 100644 --- a/simba/mixins/config_reader.py +++ b/simba/mixins/config_reader.py @@ -331,7 +331,9 @@ def get_all_clf_names(self) -> List[str]: ) return model_names - def insert_column_headers_for_outlier_correction(self, data_df: pd.DataFrame, new_headers: List[str], filepath: str) -> pd.DataFrame: + def insert_column_headers_for_outlier_correction( + self, data_df: pd.DataFrame, new_headers: List[str], filepath: str + ) -> pd.DataFrame: """ Helper to insert new column headers onto a dataframe. @@ -350,7 +352,10 @@ def insert_column_headers_for_outlier_correction(self, data_df: pd.DataFrame, ne difference = int(len(data_df.columns) - len(new_headers)) bp_missing = int(abs(difference) / 3) if difference < 0: - raise DataHeaderError(msg=f"SIMBA ERROR: SimBA expects {len(new_headers)} columns of data inside the files within project_folder/csv/input_csv directory. However, within file {filepath} file, SimBA found {len(data_df.columns)} columns. Thus, there is {abs(difference)} missing data columns in the imported data, which may represent {int(bp_missing)} bodyparts if each body-part has an x, y and p value. Either revise the SimBA project pose-configuration with {bp_missing} less body-part, or include {bp_missing} more body-part in the imported data", source=self.__class__.__name__,) + raise DataHeaderError( + msg=f"SIMBA ERROR: SimBA expects {len(new_headers)} columns of data inside the files within project_folder/csv/input_csv directory. However, within file {filepath} file, SimBA found {len(data_df.columns)} columns. Thus, there is {abs(difference)} missing data columns in the imported data, which may represent {int(bp_missing)} bodyparts if each body-part has an x, y and p value. Either revise the SimBA project pose-configuration with {bp_missing} less body-part, or include {bp_missing} more body-part in the imported data", + source=self.__class__.__name__, + ) else: raise DataHeaderError( msg=f"SIMBA ERROR: SimBA expects {len(new_headers)} columns of data inside the files within project_folder/csv/input_csv directory. However, within file {filepath} file, SimBA found {len(data_df.columns)} columns. Thus, there is {abs(difference)} more data columns in the imported data than anticipated, which may represent {int(bp_missing)} bodyparts if each body-part has an x, y and p value. Either revise the SimBA project pose-configuration with {bp_missing} more body-part, or include {bp_missing} less body-part in the imported data", diff --git a/simba/mixins/feature_extraction_supplement_mixin.py b/simba/mixins/feature_extraction_supplement_mixin.py index 16f769658..a5a3c1343 100644 --- a/simba/mixins/feature_extraction_supplement_mixin.py +++ b/simba/mixins/feature_extraction_supplement_mixin.py @@ -23,7 +23,7 @@ check_if_filepath_list_is_empty, check_instance, check_str, check_that_column_exist, check_valid_array, - check_valid_lst, check_valid_dataframe) + check_valid_dataframe, check_valid_lst) from simba.utils.data import detect_bouts from simba.utils.errors import CountError, InvalidInputError from simba.utils.printing import SimbaTimer, stdout_success @@ -626,7 +626,9 @@ def find_path_loops(data: np.ndarray) -> Dict[Tuple[int], List[int]]: return {k: v for k, v in seen_dedup.items() if len(v) > 1} @staticmethod - def sequential_lag_analysis(data: pd.DataFrame, criterion: str, target: str, time_window: float, fps: float): + def sequential_lag_analysis( + data: pd.DataFrame, criterion: str, target: str, time_window: float, fps: float + ): """ Perform sequential lag analysis to determine the temporal relationship between two events. @@ -668,13 +670,19 @@ def sequential_lag_analysis(data: pd.DataFrame, criterion: str, target: str, tim value=time_window, min_value=10e-6, ) - check_valid_dataframe(df=data, source=f'{FeatureExtractionSupplemental.sequential_lag_analysis.__name__} data', - valid_dtypes=(np.float32, np.float64, np.int64, np.int32, float, int), - required_fields=[criterion, target]) + check_valid_dataframe( + df=data, + source=f"{FeatureExtractionSupplemental.sequential_lag_analysis.__name__} data", + valid_dtypes=(np.float32, np.float64, np.int64, np.int32, float, int), + required_fields=[criterion, target], + ) bouts = detect_bouts(data_df=data, target_lst=[criterion, target], fps=fps) if len(bouts) == 0: - raise CountError(msg=f"No events of behaviors {criterion} and {target} detected in data.", source=FeatureExtractionSupplemental.sequential_lag_analysis) + raise CountError( + msg=f"No events of behaviors {criterion} and {target} detected in data.", + source=FeatureExtractionSupplemental.sequential_lag_analysis, + ) criterion_starts = bouts["Start_frame"][bouts["Event"] == criterion].values target_starts = bouts["Start_frame"][bouts["Event"] == target].values preceding_cnt, proceeding_cnt = 0, 0 @@ -760,8 +768,7 @@ def distance_and_velocity( return movement, np.mean(v) - -#df = read_df(file_path='/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/csv/targets_inserted/Together_1.csv', file_type='csv') +# df = read_df(file_path='/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/csv/targets_inserted/Together_1.csv', file_type='csv') # # df = pd.DataFrame(np.random.randint(0, 2, (100, 2)), columns=['Attack', 'Sniffing']) # FeatureExtractionSupplemental.sequential_lag_analysis(data=df, criterion='Attack', target='Sniffing', fps=5, time_window=2.0) diff --git a/simba/mixins/geometry_mixin.py b/simba/mixins/geometry_mixin.py index d44dd452c..2d0b507a5 100644 --- a/simba/mixins/geometry_mixin.py +++ b/simba/mixins/geometry_mixin.py @@ -2905,7 +2905,12 @@ def adjust_geometries( :example: >>> shapes = GeometryMixin().adjust_geometries(geometries=shapes, shift=(0, 333)) """ - check_valid_lst(data=geometries, source=f"{GeometryMixin().adjust_geometries.__name__} geometries", valid_dtypes=(Polygon,), min_len=1) + check_valid_lst( + data=geometries, + source=f"{GeometryMixin().adjust_geometries.__name__} geometries", + valid_dtypes=(Polygon,), + min_len=1, + ) results = [] for shape_cnt, shape in enumerate(geometries): results.append( @@ -3410,7 +3415,9 @@ def cumsum_animal_geometries_grid( return np.cumsum(img_arr, axis=0) / fps @staticmethod - def hausdorff_distance(geometries: List[List[Union[Polygon, LineString]]]) -> np.ndarray: + def hausdorff_distance( + geometries: List[List[Union[Polygon, LineString]]] + ) -> np.ndarray: """ The Hausdorff distance measure of the similarity between time-series sequential geometries. It is defined as the maximum of the distances from each point in one set to the nearest point in the other set. diff --git a/simba/mixins/plotting_mixin.py b/simba/mixins/plotting_mixin.py index 01551104e..65b9c2e7b 100644 --- a/simba/mixins/plotting_mixin.py +++ b/simba/mixins/plotting_mixin.py @@ -5,9 +5,6 @@ import os import random import shutil -import plotly.graph_objs as go -import plotly.io as pio -from PIL import Image from typing import Any, Dict, List, Optional, Tuple, Union import cv2 @@ -17,10 +14,13 @@ import numpy as np import pandas as pd import PIL +import plotly.graph_objs as go +import plotly.io as pio import seaborn as sns from matplotlib import cm from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas from numba import njit +from PIL import Image try: from typing import Literal @@ -1827,13 +1827,18 @@ def continuous_scatter( fig, ax = plt.subplots() if bg_clr is not None: if bg_clr not in get_named_colors(): - raise InvalidInputError(msg=f'bg_clr {bg_clr} is not a valid named color. Options: {get_named_colors()}', source=PlottingMixin.continuous_scatter.__name__) + raise InvalidInputError( + msg=f"bg_clr {bg_clr} is not a valid named color. Options: {get_named_colors()}", + source=PlottingMixin.continuous_scatter.__name__, + ) fig.set_facecolor(bg_clr) if not show_box: plt.axis("off") plt.xlabel(columns[0]) plt.ylabel(columns[1]) - plot = ax.scatter(data[columns[0]], data[columns[1]], c=data[columns[2]], s=size, cmap=palette) + plot = ax.scatter( + data[columns[0]], data[columns[1]], c=data[columns[2]], s=size, cmap=palette + ) cbar = fig.colorbar(plot) cbar.set_label(columns[2], loc="center") if title is not None: @@ -2053,46 +2058,69 @@ def line_plot( return plot @staticmethod - def make_line_plot(data: List[np.ndarray], - colors: List[str], - show_box: Optional[bool] = True, - width: Optional[int] = 640, - height: Optional[int] = 480, - line_width: Optional[int] = 6, - font_size: Optional[int] = 8, - bg_clr: Optional[str] = None, - x_lbl_divisor: Optional[float] = None, - title: Optional[str] = None, - y_lbl: Optional[str] = None, - x_lbl: Optional[str] = None, - y_max: Optional[int] = -1, - line_opacity: Optional[int] = 0.0, - save_path: Optional[Union[str, os.PathLike]] = None): - - check_valid_lst(data=data, source=PlottingMixin.make_line_plot.__name__, valid_dtypes=(np.ndarray, list,)) - check_valid_lst(data=colors, source=PlottingMixin.make_line_plot.__name__, valid_dtypes=(str,), exact_len=len(data)) + def make_line_plot( + data: List[np.ndarray], + colors: List[str], + show_box: Optional[bool] = True, + width: Optional[int] = 640, + height: Optional[int] = 480, + line_width: Optional[int] = 6, + font_size: Optional[int] = 8, + bg_clr: Optional[str] = None, + x_lbl_divisor: Optional[float] = None, + title: Optional[str] = None, + y_lbl: Optional[str] = None, + x_lbl: Optional[str] = None, + y_max: Optional[int] = -1, + line_opacity: Optional[int] = 0.0, + save_path: Optional[Union[str, os.PathLike]] = None, + ): + + check_valid_lst( + data=data, + source=PlottingMixin.make_line_plot.__name__, + valid_dtypes=( + np.ndarray, + list, + ), + ) + check_valid_lst( + data=colors, + source=PlottingMixin.make_line_plot.__name__, + valid_dtypes=(str,), + exact_len=len(data), + ) clr_dict = get_color_dict() matplotlib.font_manager._get_font.cache_clear() plt.close("all") fig, ax = plt.subplots() - if bg_clr is not None: fig.set_facecolor(bg_clr) - if not show_box: plt.axis("off") + if bg_clr is not None: + fig.set_facecolor(bg_clr) + if not show_box: + plt.axis("off") for i in range(len(data)): line_clr = clr_dict[colors[i]][::-1] line_clr = tuple(x / 255 for x in line_clr) flat_data = data[i].flatten() - plt.plot(flat_data, color=line_clr, linewidth=line_width, alpha=line_opacity) + plt.plot( + flat_data, color=line_clr, linewidth=line_width, alpha=line_opacity + ) max_x = max([len(x) for x in data]) - if y_max == -1: y_max = max([np.max(x) for x in data]) + if y_max == -1: + y_max = max([np.max(x) for x in data]) y_ticks_locs = y_lbls = np.round(np.linspace(0, y_max, 10), 2) x_ticks_locs = x_lbls = np.linspace(0, max_x, 5) - if x_lbl_divisor is not None: x_lbls = np.round((x_lbls / x_lbl_divisor), 1) - if y_lbl is not None: plt.ylabel(y_lbl) - if x_lbl is not None: plt.xlabel(x_lbl) + if x_lbl_divisor is not None: + x_lbls = np.round((x_lbls / x_lbl_divisor), 1) + if y_lbl is not None: + plt.ylabel(y_lbl) + if x_lbl is not None: + plt.xlabel(x_lbl) plt.xticks(x_ticks_locs, x_lbls, rotation="horizontal", fontsize=font_size) plt.yticks(y_ticks_locs, y_lbls, fontsize=font_size) plt.ylim(0, y_max) - if title is not None: plt.suptitle(title, x=0.5, y=0.92, fontsize=font_size + 4) + if title is not None: + plt.suptitle(title, x=0.5, y=0.92, fontsize=font_size + 4) buffer_ = io.BytesIO() plt.savefig(buffer_, format="png") buffer_.seek(0) @@ -2103,28 +2131,29 @@ def make_line_plot(data: List[np.ndarray], img = cv2.resize(img, (width, height)) if save_path is not None: cv2.imwrite(save_path, img) - stdout_success(msg=f'Line plot saved at {save_path}') + stdout_success(msg=f"Line plot saved at {save_path}") else: return img @staticmethod - def make_line_plot_plotly(data: List[np.ndarray], - colors: List[str], - show_box: Optional[bool] = True, - show_grid: Optional[bool] = False, - width: Optional[int] = 640, - height: Optional[int] = 480, - line_width: Optional[int] = 6, - font_size: Optional[int] = 8, - bg_clr: Optional[str] = 'white', - x_lbl_divisor: Optional[float] = None, - title: Optional[str] = None, - y_lbl: Optional[str] = None, - x_lbl: Optional[str] = None, - y_max: Optional[int] = -1, - line_opacity: Optional[int] = 0.5, - save_path: Optional[Union[str, os.PathLike]] = None): - + def make_line_plot_plotly( + data: List[np.ndarray], + colors: List[str], + show_box: Optional[bool] = True, + show_grid: Optional[bool] = False, + width: Optional[int] = 640, + height: Optional[int] = 480, + line_width: Optional[int] = 6, + font_size: Optional[int] = 8, + bg_clr: Optional[str] = "white", + x_lbl_divisor: Optional[float] = None, + title: Optional[str] = None, + y_lbl: Optional[str] = None, + x_lbl: Optional[str] = None, + y_max: Optional[int] = -1, + line_opacity: Optional[int] = 0.5, + save_path: Optional[Union[str, os.PathLike]] = None, + ): """ Create a line plot using Plotly. @@ -2167,20 +2196,35 @@ def tick_formatter(x): fig = go.Figure() clr_dict = get_color_dict() - if y_max == -1: y_max = max([np.max(i) for i in data]) + if y_max == -1: + y_max = max([np.max(i) for i in data]) for i in range(len(data)): line_clr = clr_dict[colors[i]] - line_clr = f'rgba({line_clr[0]}, {line_clr[1]}, {line_clr[2]}, {line_opacity})' - fig.add_trace(go.Scatter(y=data[i].flatten(), mode='lines', line=dict(color=line_clr, width=line_width))) + line_clr = ( + f"rgba({line_clr[0]}, {line_clr[1]}, {line_clr[2]}, {line_opacity})" + ) + fig.add_trace( + go.Scatter( + y=data[i].flatten(), + mode="lines", + line=dict(color=line_clr, width=line_width), + ) + ) if not show_box: - fig.update_layout(width=width, height=height, title=title, xaxis_visible=False, yaxis_visible=False, - showlegend=False) + fig.update_layout( + width=width, + height=height, + title=title, + xaxis_visible=False, + yaxis_visible=False, + showlegend=False, + ) else: - if fig['layout']['xaxis']['tickvals'] is None: + if fig["layout"]["xaxis"]["tickvals"] is None: tickvals = [i for i in range(data[0].shape[0])] else: - tickvals = fig['layout']['xaxis']['tickvals'] + tickvals = fig["layout"]["xaxis"]["tickvals"] if x_lbl_divisor is not None: ticktext = [tick_formatter(x) for x in tickvals] else: @@ -2193,7 +2237,7 @@ def tick_formatter(x): title=x_lbl, tickvals=tickvals, ticktext=ticktext, - tickmode='auto', + tickmode="auto", tick0=0, dtick=10, tickfont=dict(size=font_size), @@ -2205,14 +2249,14 @@ def tick_formatter(x): range=[0, y_max], showgrid=show_grid, ), - showlegend=False + showlegend=False, ) if bg_clr is not None: fig.update_layout(plot_bgcolor=bg_clr) if save_path is not None: pio.write_image(fig, save_path) - stdout_success(msg=f'Line plot saved at {save_path}') + stdout_success(msg=f"Line plot saved at {save_path}") else: img_bytes = fig.to_image(format="png") img = PIL.Image.open(io.BytesIO(img_bytes)) diff --git a/simba/mixins/statistics_mixin.py b/simba/mixins/statistics_mixin.py index 16a590a52..870a88ccb 100644 --- a/simba/mixins/statistics_mixin.py +++ b/simba/mixins/statistics_mixin.py @@ -1797,11 +1797,13 @@ def find_collinear_features( return feature_names @staticmethod - def local_outlier_factor(data: np.ndarray, - k: Union[int, float] = 5, - contamination: Optional[float] = 1e-10, - normalize: Optional[bool] = False, - groupby_idx: Optional[int] = None) -> np.ndarray: + def local_outlier_factor( + data: np.ndarray, + k: Union[int, float] = 5, + contamination: Optional[float] = 1e-10, + normalize: Optional[bool] = False, + groupby_idx: Optional[int] = None, + ) -> np.ndarray: """ Compute the local outlier factor of each observation. @@ -1850,14 +1852,30 @@ def get_lof(data, k, contamination): return y if groupby_idx is not None: - check_int(name=f'{Statistics.local_outlier_factor.__name__} groupby_idx', value=groupby_idx, min_value=0, - max_value=data.shape[1] - 1) - check_valid_array(source=f"{Statistics.local_outlier_factor.__name__} local_outlier_factor", data=data, - accepted_sizes=[2], min_axis_1=3) + check_int( + name=f"{Statistics.local_outlier_factor.__name__} groupby_idx", + value=groupby_idx, + min_value=0, + max_value=data.shape[1] - 1, + ) + check_valid_array( + source=f"{Statistics.local_outlier_factor.__name__} local_outlier_factor", + data=data, + accepted_sizes=[2], + min_axis_1=3, + ) else: - check_valid_array(source=f"{Statistics.local_outlier_factor.__name__} data", data=data, accepted_sizes=[2], - min_axis_1=2) - check_float(name=f"{Statistics.local_outlier_factor.__name__} contamination", value=contamination, min_value=0.0) + check_valid_array( + source=f"{Statistics.local_outlier_factor.__name__} data", + data=data, + accepted_sizes=[2], + min_axis_1=2, + ) + check_float( + name=f"{Statistics.local_outlier_factor.__name__} contamination", + value=contamination, + min_value=0.0, + ) if groupby_idx is None: return get_lof(data, k, contamination) @@ -1873,7 +1891,9 @@ def get_lof(data, k, contamination): else: unclustered = None for i in unique_c: - s_data = data_w_idx[np.argwhere(data_w_idx[:, groupby_idx + 1] == i)].reshape(-1, data_w_idx.shape[1]) + s_data = data_w_idx[ + np.argwhere(data_w_idx[:, groupby_idx + 1] == i) + ].reshape(-1, data_w_idx.shape[1]) idx = s_data[:, 0].reshape(s_data.shape[0], 1) s_data = np.delete(s_data, [0, groupby_idx + 1], 1) lof = get_lof(s_data, k, contamination).reshape(s_data.shape[0], 1) @@ -1885,10 +1905,12 @@ def get_lof(data, k, contamination): x = np.vstack((x, unclustered)) return x[np.argsort(x[:, 0])][:, -1] - def elliptic_envelope(data: np.ndarray, - contamination: Optional[float] = 1e-1, - normalize: Optional[bool] = False, - groupby_idx: Optional[int] = None) -> np.ndarray: + def elliptic_envelope( + data: np.ndarray, + contamination: Optional[float] = 1e-1, + normalize: Optional[bool] = False, + groupby_idx: Optional[int] = None, + ) -> np.ndarray: """ Compute the Mahalanobis distances of each observation in the input array using Elliptic Envelope method. @@ -1922,12 +1944,32 @@ def get_envelope(data, contamination) -> np.ndarray: return y if groupby_idx is not None: - check_int(name=f'{Statistics.elliptic_envelope.__name__} groupby_idx', value=groupby_idx, min_value=0, max_value=data.shape[1] - 1) - check_valid_array(source=f"{Statistics.elliptic_envelope.__name__} local_outlier_factor", data=data, accepted_sizes=[2], min_axis_1=3) + check_int( + name=f"{Statistics.elliptic_envelope.__name__} groupby_idx", + value=groupby_idx, + min_value=0, + max_value=data.shape[1] - 1, + ) + check_valid_array( + source=f"{Statistics.elliptic_envelope.__name__} local_outlier_factor", + data=data, + accepted_sizes=[2], + min_axis_1=3, + ) else: - check_valid_array(source=f"{Statistics.elliptic_envelope.__name__} data", data=data, accepted_sizes=[2], min_axis_1=2) + check_valid_array( + source=f"{Statistics.elliptic_envelope.__name__} data", + data=data, + accepted_sizes=[2], + min_axis_1=2, + ) - check_float(name=f"{Statistics.elliptic_envelope.__name__} contamination", value=contamination, min_value=0.0, max_value=1.0) + check_float( + name=f"{Statistics.elliptic_envelope.__name__} contamination", + value=contamination, + min_value=0.0, + max_value=1.0, + ) if groupby_idx is None: return get_envelope(data, contamination) else: @@ -1942,7 +1984,9 @@ def get_envelope(data, contamination) -> np.ndarray: else: unclustered = None for i in unique_c: - s_data = data_w_idx[np.argwhere(data_w_idx[:, groupby_idx + 1] == i)].reshape(-1, data_w_idx.shape[1]) + s_data = data_w_idx[ + np.argwhere(data_w_idx[:, groupby_idx + 1] == i) + ].reshape(-1, data_w_idx.shape[1]) idx = s_data[:, 0].reshape(s_data.shape[0], 1) s_data = np.delete(s_data, [0, groupby_idx + 1], 1) lof = get_envelope(s_data, contamination).reshape(s_data.shape[0], 1) diff --git a/simba/plotting/distance_plotter.py b/simba/plotting/distance_plotter.py index 4d73129a6..d158351d5 100644 --- a/simba/plotting/distance_plotter.py +++ b/simba/plotting/distance_plotter.py @@ -1,20 +1,22 @@ __author__ = "Simon Nilsson" import matplotlib -matplotlib.use('TkAgg') + +matplotlib.use("TkAgg") import os -from typing import Dict, List, Union, Optional +from typing import Dict, List, Optional, Union + import cv2 import numpy as np from simba.mixins.config_reader import ConfigReader -from simba.mixins.plotting_mixin import PlottingMixin from simba.mixins.feature_extraction_mixin import FeatureExtractionMixin -from simba.utils.checks import (check_instance, - check_file_exist_and_readable, - check_valid_lst, - check_all_file_names_are_represented_in_video_log) -from simba.utils.errors import NoSpecifiedOutputError, CountError, InvalidInputError +from simba.mixins.plotting_mixin import PlottingMixin +from simba.utils.checks import ( + check_all_file_names_are_represented_in_video_log, + check_file_exist_and_readable, check_instance, check_valid_lst) +from simba.utils.errors import (CountError, InvalidInputError, + NoSpecifiedOutputError) from simba.utils.lookups import get_color_dict from simba.utils.printing import SimbaTimer, stdout_success from simba.utils.read_write import get_fn_ext, read_df @@ -45,106 +47,171 @@ class DistancePlotterSingleCore(ConfigReader): >>> distance_plotter.run() """ - def __init__(self, - config_path: Union[str, os.PathLike], - data_paths: List[Union[str, os.PathLike]], - style_attr: Dict[str, int], - line_attr: List[List[str]], - frame_setting: Optional[bool] = False, - video_setting: Optional[bool] = False, - final_img: Optional[bool] = False): + def __init__( + self, + config_path: Union[str, os.PathLike], + data_paths: List[Union[str, os.PathLike]], + style_attr: Dict[str, int], + line_attr: List[List[str]], + frame_setting: Optional[bool] = False, + video_setting: Optional[bool] = False, + final_img: Optional[bool] = False, + ): if (not frame_setting) and (not video_setting) and (not final_img): - raise NoSpecifiedOutputError(msg="Please choice to create frames and/or video distance plots", source=self.__class__.__name__) - check_instance(source=f'{self.__class__.__name__} line_attr', instance=line_attr, accepted_types=(list,)) + raise NoSpecifiedOutputError( + msg="Please choice to create frames and/or video distance plots", + source=self.__class__.__name__, + ) + check_instance( + source=f"{self.__class__.__name__} line_attr", + instance=line_attr, + accepted_types=(list,), + ) for cnt, i in enumerate(line_attr): - check_valid_lst(source=f'{self.__class__.__name__} line_attr {cnt}', data=i, valid_dtypes=(str,), exact_len=3) + check_valid_lst( + source=f"{self.__class__.__name__} line_attr {cnt}", + data=i, + valid_dtypes=(str,), + exact_len=3, + ) check_valid_lst(data=data_paths, valid_dtypes=(str,), min_len=1) _ = [check_file_exist_and_readable(i) for i in data_paths] ConfigReader.__init__(self, config_path=config_path) - self.video_setting, self.frame_setting, self.data_paths, self.style_attr, self.line_attr, self.final_img = video_setting, frame_setting, data_paths, style_attr, line_attr, final_img + ( + self.video_setting, + self.frame_setting, + self.data_paths, + self.style_attr, + self.line_attr, + self.final_img, + ) = (video_setting, frame_setting, data_paths, style_attr, line_attr, final_img) self.color_names = get_color_dict() - - def run(self): print(f"Processing {len(self.data_paths)} videos...") - check_all_file_names_are_represented_in_video_log(video_info_df=self.video_info_df, data_paths=self.data_paths) + check_all_file_names_are_represented_in_video_log( + video_info_df=self.video_info_df, data_paths=self.data_paths + ) for file_cnt, file_path in enumerate(self.data_paths): video_timer = SimbaTimer(start=True) data_df = read_df(file_path, self.file_type) _, video_name, _ = get_fn_ext(file_path) - self.video_info, px_per_mm, fps = self.read_video_info(video_name=video_name) + self.video_info, px_per_mm, fps = self.read_video_info( + video_name=video_name + ) self.save_video_folder = os.path.join(self.line_plot_dir, video_name) self.save_frame_folder_dir = os.path.join(self.line_plot_dir, video_name) try: data_df.columns = self.bp_headers except ValueError: - raise CountError(msg=f'SimBA expects {self.bp_headers} columns but found {len(data_df)} columns in {file_path}', source=self.__class__.__name__) + raise CountError( + msg=f"SimBA expects {self.bp_headers} columns but found {len(data_df)} columns in {file_path}", + source=self.__class__.__name__, + ) distances = [] colors = [] for cnt, i in enumerate(self.line_attr): if i[2] not in list(self.color_names.keys()): - raise InvalidInputError(msg=f'{i[2]} is not a valid color. Options: {list(self.color_names.keys())}.', source=self.__class__.__name__) + raise InvalidInputError( + msg=f"{i[2]} is not a valid color. Options: {list(self.color_names.keys())}.", + source=self.__class__.__name__, + ) colors.append(i[2]) - bp_1, bp_2 = [f'{i[0]}_x', f'{i[0]}_y'], [f'{i[1]}_x', f'{i[1]}_y'] - if len(list(set(bp_1) - set(data_df.columns))) > 0: raise InvalidInputError(msg=f'Could not find fields {bp_1} in {file_path}', source=self.__class__.__name__) - if len(list(set(bp_2) - set(data_df.columns))) > 0: raise InvalidInputError(msg=f'Could not find fields {bp_2} in {file_path}', source=self.__class__.__name__) - distances.append(FeatureExtractionMixin.framewise_euclidean_distance(location_1=data_df[bp_1].values, location_2=data_df[bp_2].values, px_per_mm=px_per_mm, centimeter=True)) + bp_1, bp_2 = [f"{i[0]}_x", f"{i[0]}_y"], [f"{i[1]}_x", f"{i[1]}_y"] + if len(list(set(bp_1) - set(data_df.columns))) > 0: + raise InvalidInputError( + msg=f"Could not find fields {bp_1} in {file_path}", + source=self.__class__.__name__, + ) + if len(list(set(bp_2) - set(data_df.columns))) > 0: + raise InvalidInputError( + msg=f"Could not find fields {bp_2} in {file_path}", + source=self.__class__.__name__, + ) + distances.append( + FeatureExtractionMixin.framewise_euclidean_distance( + location_1=data_df[bp_1].values, + location_2=data_df[bp_2].values, + px_per_mm=px_per_mm, + centimeter=True, + ) + ) if self.frame_setting: if os.path.exists(self.save_frame_folder_dir): self.remove_a_folder(self.save_frame_folder_dir) os.makedirs(self.save_frame_folder_dir) if self.video_setting: save_video_path = os.path.join(self.line_plot_dir, f"{video_name}.avi") - fourcc = cv2.VideoWriter_fourcc(*'DIVX') - video_writer = cv2.VideoWriter(save_video_path, fourcc, fps, (self.style_attr["width"], self.style_attr["height"])) + fourcc = cv2.VideoWriter_fourcc(*"DIVX") + video_writer = cv2.VideoWriter( + save_video_path, + fourcc, + fps, + (self.style_attr["width"], self.style_attr["height"]), + ) if self.final_img: - _ = PlottingMixin.make_line_plot(data=distances, - colors=colors, - width=self.style_attr['width'], - height=self.style_attr['height'], - line_width=self.style_attr['line width'], - font_size=self.style_attr['font size'], - title='Animal distances', - y_lbl='distance (cm)', - x_lbl='time (s)', - x_lbl_divisor=fps, - y_max=self.style_attr['y_max'], - line_opacity=self.style_attr['opacity'], - save_path=os.path.join(self.line_plot_dir, f"{video_name}_final_distances.png")) + _ = PlottingMixin.make_line_plot( + data=distances, + colors=colors, + width=self.style_attr["width"], + height=self.style_attr["height"], + line_width=self.style_attr["line width"], + font_size=self.style_attr["font size"], + title="Animal distances", + y_lbl="distance (cm)", + x_lbl="time (s)", + x_lbl_divisor=fps, + y_max=self.style_attr["y_max"], + line_opacity=self.style_attr["opacity"], + save_path=os.path.join( + self.line_plot_dir, f"{video_name}_final_distances.png" + ), + ) if self.video_setting or self.frame_setting: - if self.style_attr["y_max"] == -1: self.style_attr["y_max"] = max([np.max(x) for x in distances]) + if self.style_attr["y_max"] == -1: + self.style_attr["y_max"] = max([np.max(x) for x in distances]) for frm_cnt in range(distances[0].shape[0]): line_data = [x[:frm_cnt] for x in distances] - img = PlottingMixin.make_line_plot_plotly(data=line_data, - colors=colors, - width=self.style_attr['width'], - height=self.style_attr['height'], - line_width=self.style_attr['line width'], - font_size=self.style_attr['font size'], - title='Animal distances', - y_lbl='distance (cm)', - x_lbl='frame count', - x_lbl_divisor=fps, - y_max=self.style_attr['y_max'], - line_opacity=self.style_attr['opacity'], - save_path=None).astype(np.uint8) + img = PlottingMixin.make_line_plot_plotly( + data=line_data, + colors=colors, + width=self.style_attr["width"], + height=self.style_attr["height"], + line_width=self.style_attr["line width"], + font_size=self.style_attr["font size"], + title="Animal distances", + y_lbl="distance (cm)", + x_lbl="frame count", + x_lbl_divisor=fps, + y_max=self.style_attr["y_max"], + line_opacity=self.style_attr["opacity"], + save_path=None, + ).astype(np.uint8) if self.video_setting: video_writer.write(img[:, :, :3]) if self.frame_setting: - frm_name = os.path.join(self.save_frame_folder_dir, f"{frm_cnt}.png") + frm_name = os.path.join( + self.save_frame_folder_dir, f"{frm_cnt}.png" + ) cv2.imwrite(frm_name, np.uint8(img)) print(f"Distance frame created: {frm_cnt}, Video: {video_name} ...") if self.video_setting: video_writer.release() video_timer.stop_timer() - stdout_success(msg=f"Distance visualizations created for {video_name} saved at {self.line_plot_dir}", elapsed_time=video_timer.elapsed_time_str) + stdout_success( + msg=f"Distance visualizations created for {video_name} saved at {self.line_plot_dir}", + elapsed_time=video_timer.elapsed_time_str, + ) self.timer.stop_timer() - stdout_success(msg=f"Distance visualizations complete for {len(self.data_paths)} video(s)", elapsed_time=self.timer.elapsed_time_str) + stdout_success( + msg=f"Distance visualizations complete for {len(self.data_paths)} video(s)", + elapsed_time=self.timer.elapsed_time_str, + ) + # style_attr = {'width': 640, 'height': 480, 'line width': 6, 'font size': 12, 'y_max': -1, 'opacity': 0.5} # line_attr = [['Center_1', 'Center_2', 'Green'], ['Ear_left_2', 'Ear_right_2', 'Red']] @@ -158,8 +225,6 @@ def run(self): # test.run() - - # # style_attr = {'width': 640, # 'height': 480, diff --git a/simba/plotting/distance_plotter_mp.py b/simba/plotting/distance_plotter_mp.py index de357d1ac..d9a3ce27c 100644 --- a/simba/plotting/distance_plotter_mp.py +++ b/simba/plotting/distance_plotter_mp.py @@ -1,70 +1,88 @@ __author__ = "Simon Nilsson" import matplotlib -matplotlib.use('TkAgg') + +matplotlib.use("TkAgg") import functools import multiprocessing import os -from typing import Dict, List, Union, Optional import platform +from typing import Dict, List, Optional, Union +import cv2 import numpy as np from numba import jit -import cv2 -from simba.utils.lookups import get_color_dict from simba.mixins.config_reader import ConfigReader +from simba.mixins.feature_extraction_mixin import FeatureExtractionMixin from simba.mixins.plotting_mixin import PlottingMixin -from simba.utils.checks import (check_int, check_valid_lst, check_file_exist_and_readable, check_instance, check_all_file_names_are_represented_in_video_log) -from simba.utils.errors import NoSpecifiedOutputError, CountError, InvalidInputError +from simba.utils.checks import ( + check_all_file_names_are_represented_in_video_log, + check_file_exist_and_readable, check_instance, check_int, check_valid_lst) +from simba.utils.errors import (CountError, InvalidInputError, + NoSpecifiedOutputError) +from simba.utils.lookups import get_color_dict from simba.utils.printing import SimbaTimer, stdout_success -from simba.utils.read_write import (concatenate_videos_in_folder, get_fn_ext, read_df, find_core_cnt) -from simba.mixins.feature_extraction_mixin import FeatureExtractionMixin +from simba.utils.read_write import (concatenate_videos_in_folder, + find_core_cnt, get_fn_ext, read_df) + -def distance_plotter_mp(frm_cnts: np.array, - distances: np.ndarray, - colors: List[str], - video_setting: bool, - frame_setting: bool, - video_name: str, - video_save_dir: str, - frame_folder_dir: str, - style_attr: dict, - fps: int): +def distance_plotter_mp( + frm_cnts: np.array, + distances: np.ndarray, + colors: List[str], + video_setting: bool, + frame_setting: bool, + video_name: str, + video_save_dir: str, + frame_folder_dir: str, + style_attr: dict, + fps: int, +): group = int(distances[frm_cnts[0], 0]) video_writer = None if video_setting: - fourcc = cv2.VideoWriter_fourcc(*'DIVX') + fourcc = cv2.VideoWriter_fourcc(*"DIVX") temp_video_save_path = os.path.join(video_save_dir, f"{group}.avi") - video_writer = cv2.VideoWriter(temp_video_save_path, fourcc, fps, (style_attr["width"], style_attr["height"])) + video_writer = cv2.VideoWriter( + temp_video_save_path, + fourcc, + fps, + (style_attr["width"], style_attr["height"]), + ) for frm_cnt in frm_cnts: line_data = distances[:frm_cnt, 1:] line_data = np.hsplit(line_data, line_data.shape[1]) - img = PlottingMixin.make_line_plot_plotly(data=line_data, - colors=colors, - width=style_attr['width'], - height=style_attr['height'], - line_width=style_attr['line width'], - font_size=style_attr['font size'], - title='Animal distances', - y_lbl='distance (cm)', - x_lbl='frame count', - x_lbl_divisor=fps, - y_max=style_attr['y_max'], - line_opacity=style_attr['opacity'], - save_path=None).astype(np.uint8) + img = PlottingMixin.make_line_plot_plotly( + data=line_data, + colors=colors, + width=style_attr["width"], + height=style_attr["height"], + line_width=style_attr["line width"], + font_size=style_attr["font size"], + title="Animal distances", + y_lbl="distance (cm)", + x_lbl="frame count", + x_lbl_divisor=fps, + y_max=style_attr["y_max"], + line_opacity=style_attr["opacity"], + save_path=None, + ).astype(np.uint8) if video_setting: video_writer.write(img[:, :, :3]) if frame_setting: frm_name = os.path.join(frame_folder_dir, f"{frm_cnt}.png") cv2.imwrite(frm_name, np.uint8(img)) - print(f"Distance frame created: {frm_cnt} (of {distances.shape[0]}), Video: {video_name}, Processing core: {group}") + print( + f"Distance frame created: {frm_cnt} (of {distances.shape[0]}), Video: {video_name}, Processing core: {group}" + ) if video_setting: - video_writer.release() + video_writer.release() return group + class DistancePlotterMultiCore(ConfigReader, PlottingMixin): """ Visualize the distances between pose-estimated body-parts (e.g., two animals) through line @@ -95,29 +113,66 @@ class DistancePlotterMultiCore(ConfigReader, PlottingMixin): >>> distance_plotter.run() """ - def __init__(self, - config_path: Union[str, os.PathLike], - data_paths: List[Union[str, os.PathLike]], - frame_setting: bool, - video_setting: bool, - final_img: bool, - style_attr: Dict[str, int], - line_attr: Dict[int, list], - core_cnt: Optional[int] = -1): + def __init__( + self, + config_path: Union[str, os.PathLike], + data_paths: List[Union[str, os.PathLike]], + frame_setting: bool, + video_setting: bool, + final_img: bool, + style_attr: Dict[str, int], + line_attr: Dict[int, list], + core_cnt: Optional[int] = -1, + ): if (not frame_setting) and (not video_setting) and (not final_img): - raise NoSpecifiedOutputError(msg="Please choice to create frames and/or video distance plots", source=self.__class__.__name__) - check_int(name=f'{self.__class__.__name__} core_cnt', value=core_cnt, min_value=-1, max_value=find_core_cnt()[0]) - if core_cnt == -1: core_cnt = find_core_cnt()[0] + raise NoSpecifiedOutputError( + msg="Please choice to create frames and/or video distance plots", + source=self.__class__.__name__, + ) + check_int( + name=f"{self.__class__.__name__} core_cnt", + value=core_cnt, + min_value=-1, + max_value=find_core_cnt()[0], + ) + if core_cnt == -1: + core_cnt = find_core_cnt()[0] ConfigReader.__init__(self, config_path=config_path) PlottingMixin.__init__(self) - check_instance(source=f'{self.__class__.__name__} line_attr', instance=line_attr, accepted_types=(list,)) + check_instance( + source=f"{self.__class__.__name__} line_attr", + instance=line_attr, + accepted_types=(list,), + ) for cnt, i in enumerate(line_attr): - check_valid_lst(source=f'{self.__class__.__name__} line_attr {cnt}', data=i, valid_dtypes=(str,), exact_len=3) + check_valid_lst( + source=f"{self.__class__.__name__} line_attr {cnt}", + data=i, + valid_dtypes=(str,), + exact_len=3, + ) check_valid_lst(data=data_paths, valid_dtypes=(str,), min_len=1) _ = [check_file_exist_and_readable(i) for i in data_paths] - self.video_setting, self.frame_setting, self.data_paths, self.style_attr, self.line_attr, self.final_img, self.core_cnt = video_setting, frame_setting, data_paths, style_attr, line_attr, final_img, core_cnt - if not os.path.exists(self.line_plot_dir): os.makedirs(self.line_plot_dir) + ( + self.video_setting, + self.frame_setting, + self.data_paths, + self.style_attr, + self.line_attr, + self.final_img, + self.core_cnt, + ) = ( + video_setting, + frame_setting, + data_paths, + style_attr, + line_attr, + final_img, + core_cnt, + ) + if not os.path.exists(self.line_plot_dir): + os.makedirs(self.line_plot_dir) self.color_names = get_color_dict() if platform.system() == "Darwin": multiprocessing.set_start_method("spawn", force=True) @@ -130,7 +185,9 @@ def __insert_group_idx_column(data: np.array, group: int): def run(self): print(f"Processing {len(self.data_paths)} video(s)...") - check_all_file_names_are_represented_in_video_log(video_info_df=self.video_info_df, data_paths=self.data_paths) + check_all_file_names_are_represented_in_video_log( + video_info_df=self.video_info_df, data_paths=self.data_paths + ) for file_cnt, file_path in enumerate(self.data_paths): video_timer = SimbaTimer(start=True) _, video_name, _ = get_fn_ext(file_path) @@ -138,8 +195,13 @@ def run(self): try: data_df.columns = self.bp_headers except ValueError: - raise CountError(msg=f'SimBA expects {self.bp_headers} columns but found {len(data_df)} columns in {file_path}', source=self.__class__.__name__) - self.video_info, px_per_mm, fps = self.read_video_info(video_name=video_name) + raise CountError( + msg=f"SimBA expects {self.bp_headers} columns but found {len(data_df)} columns in {file_path}", + source=self.__class__.__name__, + ) + self.video_info, px_per_mm, fps = self.read_video_info( + video_name=video_name + ) self.save_video_folder = os.path.join(self.line_plot_dir, video_name) self.temp_folder = os.path.join(self.line_plot_dir, video_name, "temp") self.save_frame_folder_dir = os.path.join(self.line_plot_dir, video_name) @@ -147,12 +209,30 @@ def run(self): colors = [] for cnt, i in enumerate(self.line_attr): if i[2] not in list(self.color_names.keys()): - raise InvalidInputError(msg=f'{i[2]} is not a valid color. Options: {list(self.color_names.keys())}.', source=self.__class__.__name__) + raise InvalidInputError( + msg=f"{i[2]} is not a valid color. Options: {list(self.color_names.keys())}.", + source=self.__class__.__name__, + ) colors.append(i[2]) - bp_1, bp_2 = [f'{i[0]}_x', f'{i[0]}_y'], [f'{i[1]}_x', f'{i[1]}_y'] - if len(list(set(bp_1) - set(data_df.columns))) > 0: raise InvalidInputError(msg=f'Could not find fields {bp_1} in {file_path}', source=self.__class__.__name__) - if len(list(set(bp_2) - set(data_df.columns))) > 0: raise InvalidInputError(msg=f'Could not find fields {bp_2} in {file_path}', source=self.__class__.__name__) - distances.append(FeatureExtractionMixin.framewise_euclidean_distance(location_1=data_df[bp_1].values, location_2=data_df[bp_2].values, px_per_mm=px_per_mm, centimeter=True)) + bp_1, bp_2 = [f"{i[0]}_x", f"{i[0]}_y"], [f"{i[1]}_x", f"{i[1]}_y"] + if len(list(set(bp_1) - set(data_df.columns))) > 0: + raise InvalidInputError( + msg=f"Could not find fields {bp_1} in {file_path}", + source=self.__class__.__name__, + ) + if len(list(set(bp_2) - set(data_df.columns))) > 0: + raise InvalidInputError( + msg=f"Could not find fields {bp_2} in {file_path}", + source=self.__class__.__name__, + ) + distances.append( + FeatureExtractionMixin.framewise_euclidean_distance( + location_1=data_df[bp_1].values, + location_2=data_df[bp_2].values, + px_per_mm=px_per_mm, + centimeter=True, + ) + ) if self.frame_setting: if os.path.exists(self.save_frame_folder_dir): self.remove_a_folder(self.save_frame_folder_dir) @@ -162,55 +242,85 @@ def run(self): if os.path.exists(self.temp_folder): self.remove_a_folder(self.temp_folder) os.makedirs(self.temp_folder) - self.save_video_path = os.path.join(self.line_plot_dir, f"{video_name}.mp4") + self.save_video_path = os.path.join( + self.line_plot_dir, f"{video_name}.mp4" + ) if self.final_img: - _ = PlottingMixin.make_line_plot(data=distances, - colors=colors, - width=self.style_attr['width'], - height=self.style_attr['height'], - line_width=self.style_attr['line width'], - font_size=self.style_attr['font size'], - title='Animal distances', - y_lbl='distance (cm)', - x_lbl='time (s)', - x_lbl_divisor=fps, - y_max=self.style_attr['y_max'], - line_opacity=self.style_attr['opacity'], - save_path=os.path.join(self.line_plot_dir, f"{video_name}_final_distances.png")) + _ = PlottingMixin.make_line_plot( + data=distances, + colors=colors, + width=self.style_attr["width"], + height=self.style_attr["height"], + line_width=self.style_attr["line width"], + font_size=self.style_attr["font size"], + title="Animal distances", + y_lbl="distance (cm)", + x_lbl="time (s)", + x_lbl_divisor=fps, + y_max=self.style_attr["y_max"], + line_opacity=self.style_attr["opacity"], + save_path=os.path.join( + self.line_plot_dir, f"{video_name}_final_distances.png" + ), + ) if self.video_setting or self.frame_setting: - if self.style_attr["y_max"] == -1: self.style_attr["y_max"] = max([np.max(x) for x in distances]) + if self.style_attr["y_max"] == -1: + self.style_attr["y_max"] = max([np.max(x) for x in distances]) distances = np.stack(distances, axis=1) frm_range = np.arange(0, distances.shape[0]) frm_range = np.array_split(frm_range, self.core_cnt) distances = np.array_split(distances, self.core_cnt) - distances = [self.__insert_group_idx_column(data=i, group=cnt) for cnt, i in enumerate(distances)] + distances = [ + self.__insert_group_idx_column(data=i, group=cnt) + for cnt, i in enumerate(distances) + ] distances = np.concatenate(distances, axis=0) - print(f"Creating distance plots, multiprocessing, follow progress in terminal (chunksize: {self.multiprocess_chunksize}, cores: {self.core_cnt})") - with multiprocessing.Pool(self.core_cnt, maxtasksperchild=self.maxtasksperchild) as pool: - constants = functools.partial(distance_plotter_mp, - distances=distances, - video_setting=self.video_setting, - frame_setting=self.frame_setting, - video_name=video_name, - video_save_dir=self.temp_folder, - frame_folder_dir=self.save_frame_folder_dir, - style_attr=self.style_attr, - colors=colors, - fps=fps) - for cnt, result in enumerate(pool.map(constants, frm_range, chunksize=self.multiprocess_chunksize)): - print(f'Frame batch core {result} complete...') + print( + f"Creating distance plots, multiprocessing, follow progress in terminal (chunksize: {self.multiprocess_chunksize}, cores: {self.core_cnt})" + ) + with multiprocessing.Pool( + self.core_cnt, maxtasksperchild=self.maxtasksperchild + ) as pool: + constants = functools.partial( + distance_plotter_mp, + distances=distances, + video_setting=self.video_setting, + frame_setting=self.frame_setting, + video_name=video_name, + video_save_dir=self.temp_folder, + frame_folder_dir=self.save_frame_folder_dir, + style_attr=self.style_attr, + colors=colors, + fps=fps, + ) + for cnt, result in enumerate( + pool.map( + constants, frm_range, chunksize=self.multiprocess_chunksize + ) + ): + print(f"Frame batch core {result} complete...") pass pool.join() pool.terminate() if self.video_setting: - concatenate_videos_in_folder(in_folder=self.temp_folder, save_path=self.save_video_path, video_format='avi') + concatenate_videos_in_folder( + in_folder=self.temp_folder, + save_path=self.save_video_path, + video_format="avi", + ) video_timer.stop_timer() - stdout_success(msg=f"Distance visualizations created for {video_name} saved at {self.line_plot_dir}", elapsed_time=video_timer.elapsed_time_str) + stdout_success( + msg=f"Distance visualizations created for {video_name} saved at {self.line_plot_dir}", + elapsed_time=video_timer.elapsed_time_str, + ) self.timer.stop_timer() - stdout_success(msg=f"Distance visualizations complete for {len(self.data_paths)} video(s)", elapsed_time=self.timer.elapsed_time_str) + stdout_success( + msg=f"Distance visualizations complete for {len(self.data_paths)} video(s)", + elapsed_time=self.timer.elapsed_time_str, + ) # style_attr = {'width': 640, 'height': 480, 'line width': 6, 'font size': 12, 'y_max': -1, 'opacity': 0.5} @@ -226,9 +336,6 @@ def run(self): # test.run() - - - # style_attr = {'width': 640, 'height': 480, 'line width': 6, 'font size': 8, 'y_max': 'auto', 'opacity': 0.9} # line_attr = {0: ['Center_1', 'Center_2', 'Green'], 1: ['Ear_left_2', 'Ear_left_1', 'Red']} # diff --git a/simba/plotting/frame_mergerer_ffmpeg.py b/simba/plotting/frame_mergerer_ffmpeg.py index e44d77a33..104d15f4d 100644 --- a/simba/plotting/frame_mergerer_ffmpeg.py +++ b/simba/plotting/frame_mergerer_ffmpeg.py @@ -2,8 +2,8 @@ import os import shutil -from typing import Optional, List, Union from datetime import datetime +from typing import List, Optional, Union try: from typing import Literal @@ -11,27 +11,25 @@ from typing_extensions import Literal from simba.mixins.config_reader import ConfigReader -from simba.utils.checks import (check_nvidea_gpu_available, - check_ffmpeg_available, - check_str, - check_file_exist_and_readable, - check_int, +from simba.utils.checks import (check_ffmpeg_available, + check_file_exist_and_readable, check_int, + check_nvidea_gpu_available, check_str, check_valid_lst) from simba.utils.enums import Paths, TagNames from simba.utils.errors import FFMPEGCodecGPUError from simba.utils.printing import SimbaTimer, log_event, stdout_success -from simba.video_processors.video_processing import (mixed_mosaic_concatenator, - horizontal_video_concatenator, - vertical_video_concatenator, - mosaic_concatenator) -from simba.utils.read_write import get_fn_ext, copy_files_to_directory - -HORIZONTAL = 'horizontal' -VERTICAL = 'vertical' -MOSAIC = 'mosaic' -MIXED_MOSAIC = 'mixed_mosaic' +from simba.utils.read_write import copy_files_to_directory, get_fn_ext +from simba.video_processors.video_processing import ( + horizontal_video_concatenator, mixed_mosaic_concatenator, + mosaic_concatenator, vertical_video_concatenator) + +HORIZONTAL = "horizontal" +VERTICAL = "vertical" +MOSAIC = "mosaic" +MIXED_MOSAIC = "mixed_mosaic" ACCEPTED_TYPES = [HORIZONTAL, VERTICAL, MOSAIC, MIXED_MOSAIC] + class FrameMergererFFmpeg(ConfigReader): """ Merge separate visualizations of classifications, descriptive statistics etc., into single video mosaic. @@ -56,49 +54,113 @@ class FrameMergererFFmpeg(ConfigReader): >>> merger.run() """ - def __init__(self, - concat_type: Literal["horizontal", "vertical", "mosaic", "mixed_mosaic"], - video_paths: List[Union[str, os.PathLike]], - video_height: Optional[int] = None, - video_width: Optional[int] = None, - config_path: Optional[str] = None, - gpu: Optional[bool] = False): + def __init__( + self, + concat_type: Literal["horizontal", "vertical", "mosaic", "mixed_mosaic"], + video_paths: List[Union[str, os.PathLike]], + video_height: Optional[int] = None, + video_width: Optional[int] = None, + config_path: Optional[str] = None, + gpu: Optional[bool] = False, + ): if gpu and not check_nvidea_gpu_available(): - raise FFMPEGCodecGPUError(msg="NVIDEA GPU not available (as evaluated by nvidea-smi returning None", source=self.__class__.__name__) + raise FFMPEGCodecGPUError( + msg="NVIDEA GPU not available (as evaluated by nvidea-smi returning None", + source=self.__class__.__name__, + ) check_ffmpeg_available() - check_str(name=f'{FrameMergererFFmpeg.__name__} concat_type', value=concat_type, options=ACCEPTED_TYPES) - check_valid_lst(data=video_paths, source=f'{self.__class__.__name__} video_paths', valid_dtypes=(str,), min_len=2) - for i in video_paths: check_file_exist_and_readable(file_path=i) + check_str( + name=f"{FrameMergererFFmpeg.__name__} concat_type", + value=concat_type, + options=ACCEPTED_TYPES, + ) + check_valid_lst( + data=video_paths, + source=f"{self.__class__.__name__} video_paths", + valid_dtypes=(str,), + min_len=2, + ) + for i in video_paths: + check_file_exist_and_readable(file_path=i) if concat_type != MIXED_MOSAIC: - check_int(name=f'{FrameMergererFFmpeg.__name__} video_height', value=video_height, min_value=0) - check_int(name=f'{FrameMergererFFmpeg.__name__} video_width', value=video_height, min_value=0) + check_int( + name=f"{FrameMergererFFmpeg.__name__} video_height", + value=video_height, + min_value=0, + ) + check_int( + name=f"{FrameMergererFFmpeg.__name__} video_width", + value=video_height, + min_value=0, + ) if config_path is not None: ConfigReader.__init__(self, config_path=config_path) - log_event(logger_name=str(__class__.__name__), log_type=TagNames.CLASS_INIT.value, msg=self.create_log_msg_from_init_args(locals=locals())) - self.output_dir = os.path.join( self.project_path, Paths.CONCAT_VIDEOS_DIR.value) - self.output_path = os.path.join(self.project_path, Paths.CONCAT_VIDEOS_DIR.value, f"merged_video_{self.datetime}.mp4",) + log_event( + logger_name=str(__class__.__name__), + log_type=TagNames.CLASS_INIT.value, + msg=self.create_log_msg_from_init_args(locals=locals()), + ) + self.output_dir = os.path.join( + self.project_path, Paths.CONCAT_VIDEOS_DIR.value + ) + self.output_path = os.path.join( + self.project_path, + Paths.CONCAT_VIDEOS_DIR.value, + f"merged_video_{self.datetime}.mp4", + ) else: self.timer = SimbaTimer(start=True) self.datetime = datetime.now().strftime("%Y%m%d%H%M%S") self.output_dir, _, _ = get_fn_ext(filepath=video_paths[0]) - self.output_path = os.path.join(self.output_dir, f"merged_video_{self.datetime}.mp4") + self.output_path = os.path.join( + self.output_dir, f"merged_video_{self.datetime}.mp4" + ) self.video_height, self.video_width, self.gpu = video_height, video_width, gpu self.video_paths, self.concat_type = video_paths, concat_type - if not os.path.exists(self.output_dir): os.makedirs(self.output_dir) + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) + def run(self): if self.concat_type == HORIZONTAL: - _ = horizontal_video_concatenator(video_paths=self.video_paths, save_path=self.output_path, height_px=self.video_height, gpu=self.gpu, verbose=True) + _ = horizontal_video_concatenator( + video_paths=self.video_paths, + save_path=self.output_path, + height_px=self.video_height, + gpu=self.gpu, + verbose=True, + ) elif self.concat_type == VERTICAL: - _ = vertical_video_concatenator(video_paths=self.video_paths, save_path=self.output_path, width_px=self.video_width, gpu=self.gpu, verbose=True) + _ = vertical_video_concatenator( + video_paths=self.video_paths, + save_path=self.output_path, + width_px=self.video_width, + gpu=self.gpu, + verbose=True, + ) elif self.concat_type == MOSAIC: - _ = mosaic_concatenator(video_paths=self.video_paths, save_path=self.output_path, width_px=self.video_width, height_px=self.video_height, gpu=self.gpu, verbose=True) + _ = mosaic_concatenator( + video_paths=self.video_paths, + save_path=self.output_path, + width_px=self.video_width, + height_px=self.video_height, + gpu=self.gpu, + verbose=True, + ) else: - _ = mixed_mosaic_concatenator(video_paths=self.video_paths, save_path=self.output_path, gpu=self.gpu, verbose=True) + _ = mixed_mosaic_concatenator( + video_paths=self.video_paths, + save_path=self.output_path, + gpu=self.gpu, + verbose=True, + ) self.timer.stop_timer() - stdout_success(msg=f'Merged video saved at {self.output_path}', source=self.__class__.__name__, elapsed_time=self.timer.elapsed_time_str) - + stdout_success( + msg=f"Merged video saved at {self.output_path}", + source=self.__class__.__name__, + elapsed_time=self.timer.elapsed_time_str, + ) # videos = ['/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/SI_DAY3_308_CD1_PRESENT_downsampled.mp4', '/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/SI_DAY3_308_CD1_PRESENT_downsampled.mp4'] @@ -111,7 +173,6 @@ def run(self): # merger.run() - # # FrameMergererFFmpeg(config_path=None, # frame_types={'Video 1': '/Users/simon/Desktop/envs/troubleshooting/two_black_animals_14bp/project_folder/videos/Together_1.avi', diff --git a/simba/ui/pop_ups/distance_plot_pop_up.py b/simba/ui/pop_ups/distance_plot_pop_up.py index 605e69250..88d8b340d 100644 --- a/simba/ui/pop_ups/distance_plot_pop_up.py +++ b/simba/ui/pop_ups/distance_plot_pop_up.py @@ -1,17 +1,16 @@ __author__ = "Simon Nilsson" import os -from tkinter import * -from collections import defaultdict import threading +from collections import defaultdict +from tkinter import * import numpy as np -from simba.plotting.distance_plotter import DistancePlotterSingleCore -from simba.plotting.distance_plotter_mp import DistancePlotterMultiCore from simba.mixins.config_reader import ConfigReader from simba.mixins.pop_up_mixin import PopUpMixin - +from simba.plotting.distance_plotter import DistancePlotterSingleCore +from simba.plotting.distance_plotter_mp import DistancePlotterMultiCore from simba.ui.tkinter_functions import (CreateLabelFrameWithIcon, DropDownMenu, Entry_Box) from simba.utils.checks import check_if_filepath_list_is_empty, check_int @@ -234,7 +233,9 @@ def __create_distance_plots(self, multiple_videos: bool): if multiple_videos: data_paths = list(self.files_found_dict.values()) else: - data_paths = [self.files_found_dict[self.single_video_dropdown.getChoices()]] + data_paths = [ + self.files_found_dict[self.single_video_dropdown.getChoices()] + ] line_attr = defaultdict(list) for attr in (self.bp_1, self.bp_2, self.distance_clrs): @@ -244,15 +245,24 @@ def __create_distance_plots(self, multiple_videos: bool): for cnt, v in enumerate(line_attr): if v[0] == v[1]: - raise DuplicationError(msg=f"DISTANCE LINE {cnt+1} ERROR: The two body-parts cannot be the same body-part.", source=self.__class__.__name__) + raise DuplicationError( + msg=f"DISTANCE LINE {cnt+1} ERROR: The two body-parts cannot be the same body-part.", + source=self.__class__.__name__, + ) width = int(self.resolution_dropdown.getChoices().split("×")[0]) height = int(self.resolution_dropdown.getChoices().split("×")[1]) - check_int(name="DISTANCE FONT SIZE", value=self.font_size_entry.entry_get, min_value=1) - check_int(name="DISTANCE LINE WIDTH", value=self.line_width.entry_get, min_value=1) + check_int( + name="DISTANCE FONT SIZE", value=self.font_size_entry.entry_get, min_value=1 + ) + check_int( + name="DISTANCE LINE WIDTH", value=self.line_width.entry_get, min_value=1 + ) max_y = self.max_y_dropdown.getChoices() - if max_y == 'auto': max_y = -1 - else: max_y = int(max_y) + if max_y == "auto": + max_y = -1 + else: + max_y = int(max_y) style_attr = { "width": width, @@ -288,7 +298,7 @@ def __create_distance_plots(self, multiple_videos: bool): threading.Thread(distance_plotter.run()).start() -#_ = DistancePlotterPopUp(config_path='/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini') +# _ = DistancePlotterPopUp(config_path='/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini') # _ = DistancePlotterPopUp(config_path='/Users/simon/Desktop/envs/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini') diff --git a/simba/ui/pop_ups/video_processing_pop_up.py b/simba/ui/pop_ups/video_processing_pop_up.py index c36eec7f0..cc397541d 100644 --- a/simba/ui/pop_ups/video_processing_pop_up.py +++ b/simba/ui/pop_ups/video_processing_pop_up.py @@ -1107,13 +1107,32 @@ def populate_table(self): if hasattr(self, "video_table_frm"): self.video_table_frm.destroy() self.join_type_frm.destroy() - self.video_table_frm = LabelFrame(self.main_frm, text="VIDEO PATHS", pady=5, padx=5, font=Formats.LABELFRAME_HEADER_FORMAT.value, fg="black") + self.video_table_frm = LabelFrame( + self.main_frm, + text="VIDEO PATHS", + pady=5, + padx=5, + font=Formats.LABELFRAME_HEADER_FORMAT.value, + fg="black", + ) self.video_table_frm.grid(row=1, sticky=NW) - self.join_type_frm = LabelFrame(self.main_frm, text="JOIN TYPE", pady=5, padx=5, font=Formats.LABELFRAME_HEADER_FORMAT.value, fg="black") + self.join_type_frm = LabelFrame( + self.main_frm, + text="JOIN TYPE", + pady=5, + padx=5, + font=Formats.LABELFRAME_HEADER_FORMAT.value, + fg="black", + ) self.join_type_frm.grid(row=2, sticky=NW) self.videos_dict = {} for cnt in range(int(self.select_video_cnt_dropdown.getChoices())): - self.videos_dict[cnt] = FileSelect(self.video_table_frm, "Video {}: ".format(str(cnt + 1)), title="Select a video file", file_types=[("VIDEO", Options.ALL_VIDEO_FORMAT_STR_OPTIONS.value)]) + self.videos_dict[cnt] = FileSelect( + self.video_table_frm, + "Video {}: ".format(str(cnt + 1)), + title="Select a video file", + file_types=[("VIDEO", Options.ALL_VIDEO_FORMAT_STR_OPTIONS.value)], + ) self.videos_dict[cnt].grid(row=cnt, column=0, sticky=NW) self.join_type_var = StringVar() @@ -1164,7 +1183,9 @@ def populate_table(self): fg="black", ) self.use_gpu_var = BooleanVar(value=False) - use_gpu_cb = Checkbutton(self.gpu_frm, text="Use GPU (reduced runtime)", variable=self.use_gpu_var) + use_gpu_cb = Checkbutton( + self.gpu_frm, text="Use GPU (reduced runtime)", variable=self.use_gpu_var + ) use_gpu_cb.grid(row=0, column=0, sticky="NW") self.resolution_frm.grid(row=3, column=0, sticky=NW) self.gpu_frm.grid(row=4, column=0, sticky="NW") @@ -1181,7 +1202,10 @@ def run(self): file_paths.append(video_data.file_path) if (len(file_paths) < 3) & (self.join_type_var.get() == "mixed_mosaic"): - raise MixedMosaicError(msg="If using the mixed mosaic join type, please tick check-boxes for at least three video types.", source=self.__class__.__name__,) + raise MixedMosaicError( + msg="If using the mixed mosaic join type, please tick check-boxes for at least three video types.", + source=self.__class__.__name__, + ) if (len(file_paths) < 3) & (self.join_type_var.get() == "mosaic"): self.join_type_var.set(value="vertical") @@ -1194,11 +1218,11 @@ def run(self): gpu=self.use_gpu_var.get(), ) - threading.Thread( - target=video_merger.run()) + threading.Thread(target=video_merger.run()) + +# ConcatenatorPopUp(config_path='/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini') -#ConcatenatorPopUp(config_path='/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini') class VideoRotatorPopUp(PopUpMixin): def __init__(self): diff --git a/simba/utils/checks.py b/simba/utils/checks.py index 0fec869bc..1ae7efc2e 100644 --- a/simba/utils/checks.py +++ b/simba/utils/checks.py @@ -1238,12 +1238,17 @@ def check_valid_dataframe( source=source, ) if required_fields is not None: - check_valid_lst(data=required_fields, source=check_valid_dataframe.__name__, valid_dtypes=(str,)) + check_valid_lst( + data=required_fields, + source=check_valid_dataframe.__name__, + valid_dtypes=(str,), + ) missing = list(set(required_fields) - set(df.columns)) if len(missing) > 0: - raise InvalidInputError(msg=f"The dataframe {source} are missing required columns {missing}.", source=source) - - + raise InvalidInputError( + msg=f"The dataframe {source} are missing required columns {missing}.", + source=source, + ) def check_valid_tuple( diff --git a/simba/utils/read_write.py b/simba/utils/read_write.py index 22935fe08..e5de87340 100644 --- a/simba/utils/read_write.py +++ b/simba/utils/read_write.py @@ -8,6 +8,7 @@ import platform import re import shutil +import subprocess import threading import webbrowser from configparser import ConfigParser @@ -15,7 +16,6 @@ from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from urllib.parse import urlparse -import subprocess import cv2 import numpy as np @@ -1896,10 +1896,12 @@ def get_unique_values_in_iterable( return cnt -def copy_files_to_directory(file_paths: List[Union[str, os.PathLike]], - dir: Union[str, os.PathLike], - verbose: Optional[bool] = True, - integer_save_names: Optional[bool] = False) -> List[Union[str, os.PathLike]]: +def copy_files_to_directory( + file_paths: List[Union[str, os.PathLike]], + dir: Union[str, os.PathLike], + verbose: Optional[bool] = True, + integer_save_names: Optional[bool] = False, +) -> List[Union[str, os.PathLike]]: """ Copy a list of files to a specified directory. @@ -1916,12 +1918,14 @@ def copy_files_to_directory(file_paths: List[Union[str, os.PathLike]], destinations = [] for cnt, file_path in enumerate(file_paths): if verbose: - print(f'Copying file {os.path.basename(file_path)} ({cnt+1}/{len(file_paths)})...') + print( + f"Copying file {os.path.basename(file_path)} ({cnt+1}/{len(file_paths)})..." + ) if not integer_save_names: destination = os.path.join(dir, os.path.basename(file_path)) else: _, file_name, ext = get_fn_ext(filepath=file_path) - destination = os.path.join(dir, f'{cnt}{ext}') + destination = os.path.join(dir, f"{cnt}{ext}") try: if os.path.isfile(destination): os.remove(destination) @@ -1929,7 +1933,8 @@ def copy_files_to_directory(file_paths: List[Union[str, os.PathLike]], print(e.args) raise PermissionError( msg=f"Not allowed to overwrite file {destination}. Try running SimBA in terminal opened in admin mode or delete existing file before copying.", - source=copy_files_to_directory.__name__, ) + source=copy_files_to_directory.__name__, + ) destinations.append(destination) shutil.copy(file_path, destination) return destinations diff --git a/simba/video_processors/batch_process_create_ffmpeg_commands.py b/simba/video_processors/batch_process_create_ffmpeg_commands.py index 8a484da10..c2e5abf28 100644 --- a/simba/video_processors/batch_process_create_ffmpeg_commands.py +++ b/simba/video_processors/batch_process_create_ffmpeg_commands.py @@ -5,6 +5,7 @@ import pathlib import shutil import subprocess + import cv2 import simba diff --git a/simba/video_processors/video_processing.py b/simba/video_processors/video_processing.py index ae3a88065..0d92904b6 100644 --- a/simba/video_processors/video_processing.py +++ b/simba/video_processors/video_processing.py @@ -11,7 +11,7 @@ from datetime import datetime from pathlib import Path from tkinter import * -from typing import List, Optional, Union, Tuple +from typing import List, Optional, Tuple, Union import cv2 import numpy as np @@ -26,21 +26,21 @@ import simba from simba.mixins.config_reader import ConfigReader from simba.mixins.image_mixin import ImageMixin -from simba.utils.checks import (check_file_exist_and_readable, check_float, +from simba.utils.checks import (check_ffmpeg_available, + check_file_exist_and_readable, check_float, check_if_dir_exists, check_if_filepath_list_is_empty, check_if_string_value_is_valid_video_timestamp, check_int, check_nvidea_gpu_available, check_that_hhmmss_start_is_before_end, - check_valid_tuple, - check_ffmpeg_available, - check_valid_lst) + check_valid_lst, check_valid_tuple) from simba.utils.enums import OS, ConfigKey, Formats, Options, Paths from simba.utils.errors import (CountError, DirectoryExistError, FFMPEGCodecGPUError, FFMPEGNotFoundError, - FileExistError, InvalidFileTypeError, - InvalidInputError, NoDataError, - NoFilesFoundError, NotDirectoryError, InvalidVideoFileError, FrameRangeError) + FileExistError, FrameRangeError, + InvalidFileTypeError, InvalidInputError, + InvalidVideoFileError, NoDataError, + NoFilesFoundError, NotDirectoryError) from simba.utils.printing import SimbaTimer, stdout_success from simba.utils.read_write import ( check_if_hhmmss_timestamp_is_valid_part_of_video, @@ -56,10 +56,12 @@ MAX_FRM_SIZE = 1080, 650 -def change_img_format(directory: Union[str, os.PathLike], - file_type_in: str, - file_type_out: str, - verbose: Optional[bool] = False) -> None: +def change_img_format( + directory: Union[str, os.PathLike], + file_type_in: str, + file_type_out: str, + verbose: Optional[bool] = False, +) -> None: """ Convert the file type of all image files within a directory. @@ -75,20 +77,29 @@ def change_img_format(directory: Union[str, os.PathLike], check_if_dir_exists(in_dir=directory, source=change_img_format.__name__) files_found = glob.glob(directory + f"/*.{file_type_in}") if len(files_found) == 0: - raise NoFilesFoundError(f'SIMBA ERROR: No {file_type_in} files (with .{file_type_in} file extension) found in the {directory} directory', source=change_img_format.__name__) + raise NoFilesFoundError( + f"SIMBA ERROR: No {file_type_in} files (with .{file_type_in} file extension) found in the {directory} directory", + source=change_img_format.__name__, + ) print(f"{len(files_found)} image files found in {directory}...") for file_cnt, file_path in enumerate(files_found): if verbose: - print(f'Converting file {file_cnt+1}/{len(files_found)} ...') + print(f"Converting file {file_cnt+1}/{len(files_found)} ...") im = Image.open(file_path) save_name = file_path.replace("." + str(file_type_in), "." + str(file_type_out)) im.save(save_name) os.remove(file_path) - stdout_success(msg=f"SIMBA COMPLETE: Files in {directory} directory converted to {file_type_out}", source=change_img_format.__name__,) + stdout_success( + msg=f"SIMBA COMPLETE: Files in {directory} directory converted to {file_type_out}", + source=change_img_format.__name__, + ) -def clahe_enhance_video(file_path: Union[str, os.PathLike], - clip_limit: Optional[int] = 2, - tile_grid_size: Optional[Tuple[int]] = (16, 16)) -> None: + +def clahe_enhance_video( + file_path: Union[str, os.PathLike], + clip_limit: Optional[int] = 2, + tile_grid_size: Optional[Tuple[int]] = (16, 16), +) -> None: """ Convert a single video file to clahe-enhanced greyscale .avi file. The result is saved with prefix ``CLAHE_`` in the same directory as in the input file. @@ -102,18 +113,38 @@ def clahe_enhance_video(file_path: Union[str, os.PathLike], """ check_file_exist_and_readable(file_path=file_path) - check_int(name=f'{clahe_enhance_video.__name__} clip_limit', value=clip_limit, min_value=0) + check_int( + name=f"{clahe_enhance_video.__name__} clip_limit", value=clip_limit, min_value=0 + ) video_meta_data = get_video_meta_data(file_path) - check_valid_tuple(x=tile_grid_size, source=clahe_enhance_video.__name__, accepted_lengths=(2,), valid_dtypes=(int,)) - if (tile_grid_size[0] > video_meta_data["height"]) or ((tile_grid_size[1] > video_meta_data["width"])): - raise InvalidInputError(msg=f'The tile grid size ({tile_grid_size}) is larger than the video size ({video_meta_data["resolution_str"]})', source=clahe_enhance_video.__name__) + check_valid_tuple( + x=tile_grid_size, + source=clahe_enhance_video.__name__, + accepted_lengths=(2,), + valid_dtypes=(int,), + ) + if (tile_grid_size[0] > video_meta_data["height"]) or ( + (tile_grid_size[1] > video_meta_data["width"]) + ): + raise InvalidInputError( + msg=f'The tile grid size ({tile_grid_size}) is larger than the video size ({video_meta_data["resolution_str"]})', + source=clahe_enhance_video.__name__, + ) dir, file_name, file_ext = get_fn_ext(filepath=file_path) save_path = os.path.join(dir, f"CLAHE_{file_name}.avi") fourcc = cv2.VideoWriter_fourcc(*Formats.AVI_CODEC.value) print(f"Applying CLAHE on video {file_name}, this might take awhile...") cap = cv2.VideoCapture(file_path) - writer = cv2.VideoWriter(save_path, fourcc, video_meta_data["fps"], (video_meta_data["width"], video_meta_data["height"]), 0) - clahe_filter = cv2.createCLAHE(clipLimit=int(clip_limit), tileGridSize=tile_grid_size) + writer = cv2.VideoWriter( + save_path, + fourcc, + video_meta_data["fps"], + (video_meta_data["width"], video_meta_data["height"]), + 0, + ) + clahe_filter = cv2.createCLAHE( + clipLimit=int(clip_limit), tileGridSize=tile_grid_size + ) frm_cnt = 0 try: while True: @@ -123,7 +154,9 @@ def clahe_enhance_video(file_path: Union[str, os.PathLike], img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) clahe_frm = clahe_filter.apply(img) writer.write(clahe_frm) - print(f"CLAHE converted frame {frm_cnt}/{video_meta_data['frame_count']}") + print( + f"CLAHE converted frame {frm_cnt}/{video_meta_data['frame_count']}" + ) else: break cap.release() @@ -134,9 +167,15 @@ def clahe_enhance_video(file_path: Union[str, os.PathLike], print(f"CLAHE conversion failed for video {file_name}.") cap.release() writer.release() - raise InvalidVideoFileError(msg=f'Could not convert file {file_path} to CLAHE enhanced video', source=clahe_enhance_video.__name__) + raise InvalidVideoFileError( + msg=f"Could not convert file {file_path} to CLAHE enhanced video", + source=clahe_enhance_video.__name__, + ) -def extract_frame_range(file_path: Union[str, os.PathLike], start_frame: int, end_frame: int) -> None: + +def extract_frame_range( + file_path: Union[str, os.PathLike], start_frame: int, end_frame: int +) -> None: """ Extract a user-defined range of frames from a video file to `png` format. Images are saved in a folder with the suffix `_frames` within the same directory as the video file. @@ -153,7 +192,9 @@ def extract_frame_range(file_path: Union[str, os.PathLike], start_frame: int, en video_meta_data = get_video_meta_data(file_path) check_int(name="start frame", value=start_frame, min_value=0) file_dir, file_name, file_ext = get_fn_ext(filepath=file_path) - check_int(name="end frame", value=end_frame, max_value=video_meta_data["frame_count"]) + check_int( + name="end frame", value=end_frame, max_value=video_meta_data["frame_count"] + ) frame_range = list(range(int(start_frame), int(end_frame) + 1)) save_dir = os.path.join(file_dir, file_name + "_frames") cap = cv2.VideoCapture(file_path) @@ -165,9 +206,15 @@ def extract_frame_range(file_path: Union[str, os.PathLike], start_frame: int, en frm_save_path = os.path.join(save_dir, f"{frm_number}.png") cv2.imwrite(frm_save_path, frame) print(f"Frame {frm_number} saved (Frame {frm_cnt}/{len(frame_range)-1})") - stdout_success(msg=f"{len(frame_range)-1} frames extracted for video {file_name} saved in {save_dir}", source=extract_frame_range.__name__) + stdout_success( + msg=f"{len(frame_range)-1} frames extracted for video {file_name} saved in {save_dir}", + source=extract_frame_range.__name__, + ) -def change_single_video_fps(file_path: Union[str, os.PathLike], fps: int, gpu: Optional[bool] = False) -> None: + +def change_single_video_fps( + file_path: Union[str, os.PathLike], fps: int, gpu: Optional[bool] = False +) -> None: """ Change the fps of a single video file. Results are stored in the same directory as in the input file with the suffix ``_fps_new_fps``. @@ -186,7 +233,10 @@ def change_single_video_fps(file_path: Union[str, os.PathLike], fps: int, gpu: O timer = SimbaTimer(start=True) check_ffmpeg_available(raise_error=True) if gpu and not check_nvidea_gpu_available(): - raise FFMPEGCodecGPUError(msg="No GPU found (as evaluated by nvidea-smi returning None)",source=change_single_video_fps.__name__) + raise FFMPEGCodecGPUError( + msg="No GPU found (as evaluated by nvidea-smi returning None)", + source=change_single_video_fps.__name__, + ) check_file_exist_and_readable(file_path=file_path) check_int(name="New fps", value=fps) video_meta_data = get_video_meta_data(video_path=file_path) @@ -217,7 +267,10 @@ def change_single_video_fps(file_path: Union[str, os.PathLike], fps: int, gpu: O source=change_single_video_fps.__name__, ) -def change_fps_of_multiple_videos(directory: Union[str, os.PathLike], fps: int, gpu: Optional[bool] = False) -> None: + +def change_fps_of_multiple_videos( + directory: Union[str, os.PathLike], fps: int, gpu: Optional[bool] = False +) -> None: """ Change the fps of all video files in a folder. Results are stored in the same directory as in the input files with the suffix ``_fps_new_fps``. @@ -238,7 +291,10 @@ def change_fps_of_multiple_videos(directory: Union[str, os.PathLike], fps: int, source=change_fps_of_multiple_videos.__name__, ) if not os.path.isdir(directory): - raise NotDirectoryError(msg=f"SIMBA ERROR: {directory} is not a valid directory", source=change_fps_of_multiple_videos.__name__) + raise NotDirectoryError( + msg=f"SIMBA ERROR: {directory} is not a valid directory", + source=change_fps_of_multiple_videos.__name__, + ) check_int(name="New fps", value=fps) video_paths = [] file_paths_in_folder = [f for f in glob.glob(directory + "/*") if os.path.isfile(f)] @@ -257,14 +313,18 @@ def change_fps_of_multiple_videos(directory: Union[str, os.PathLike], fps: int, video_timer = SimbaTimer(start=True) dir_name, file_name, ext = get_fn_ext(filepath=file_path) print(f"Converting FPS for {file_name}...") - save_path = os.path.join(dir_name, file_name + "_fps_{}{}".format(str(fps), str(ext))) + save_path = os.path.join( + dir_name, file_name + "_fps_{}{}".format(str(fps), str(ext)) + ) if gpu: command = f'ffmpeg -hwaccel auto -c:v h264_cuvid -i "{file_path}" -vf "fps={fps}" -c:v h264_nvenc -c:a copy "{save_path}" -y' else: command = f'ffmpeg -i {file_path} -filter:v fps=fps={fps} -c:v libx264 "{save_path}" -y' subprocess.call(command, shell=True) video_timer.stop_timer() - print(f"Video {file_name} complete... (elapsed time: {video_timer.elapsed_time_str}s)") + print( + f"Video {file_name} complete... (elapsed time: {video_timer.elapsed_time_str}s)" + ) timer.stop_timer() stdout_success( msg=f"SIMBA COMPLETE: FPS of {len(video_paths)} video(s) changed to {fps}", @@ -272,7 +332,10 @@ def change_fps_of_multiple_videos(directory: Union[str, os.PathLike], fps: int, source=change_fps_of_multiple_videos.__name__, ) -def convert_video_powerpoint_compatible_format(file_path: Union[str, os.PathLike], gpu: Optional[bool] = False) -> None: + +def convert_video_powerpoint_compatible_format( + file_path: Union[str, os.PathLike], gpu: Optional[bool] = False +) -> None: """ Create MS PowerPoint compatible copy of a video file. The result is stored in the same directory as the input file with the ``_powerpointready`` suffix. @@ -315,10 +378,12 @@ def convert_video_powerpoint_compatible_format(file_path: Union[str, os.PathLike ) -#convert_video_powerpoint_compatible_format(file_path=r"/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/test/SI_DAY3_308_CD1_PRESENT_fps_10_fps_15.mp4", gpu=False) +# convert_video_powerpoint_compatible_format(file_path=r"/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/test/SI_DAY3_308_CD1_PRESENT_fps_10_fps_15.mp4", gpu=False) -def convert_to_mp4(file_path: Union[str, os.PathLike], gpu: Optional[bool] = False) -> None: +def convert_to_mp4( + file_path: Union[str, os.PathLike], gpu: Optional[bool] = False +) -> None: """ Convert a video file to mp4 format. The result is stored in the same directory as the input file with the ``_converted.mp4`` suffix. @@ -362,7 +427,10 @@ def convert_to_mp4(file_path: Union[str, os.PathLike], gpu: Optional[bool] = Fal source=convert_to_mp4.__name__, ) -def video_to_greyscale(file_path: Union[str, os.PathLike], gpu: Optional[bool] = False) -> None: + +def video_to_greyscale( + file_path: Union[str, os.PathLike], gpu: Optional[bool] = False +) -> None: """ Convert a video file to greyscale mp4 format. The result is stored in the same directory as the input file with the ``_grayscale.mp4`` suffix. @@ -404,7 +472,9 @@ def video_to_greyscale(file_path: Union[str, os.PathLike], gpu: Optional[bool] = ) -def batch_video_to_greyscale(directory: Union[str, os.PathLike], gpu: Optional[bool] = False) -> None: +def batch_video_to_greyscale( + directory: Union[str, os.PathLike], gpu: Optional[bool] = False +) -> None: """ Convert a directory of video file to greyscale mp4 format. The results are stored in the same directory as the input files with the ``_grayscale.mp4`` suffix. @@ -418,10 +488,15 @@ def batch_video_to_greyscale(directory: Union[str, os.PathLike], gpu: Optional[b """ check_ffmpeg_available(raise_error=True) if gpu and not check_nvidea_gpu_available(): - raise FFMPEGCodecGPUError(msg="No GPU found (as evaluated by nvidea-smi returning None)", source=batch_video_to_greyscale.__name__) + raise FFMPEGCodecGPUError( + msg="No GPU found (as evaluated by nvidea-smi returning None)", + source=batch_video_to_greyscale.__name__, + ) timer = SimbaTimer(start=True) check_if_dir_exists(in_dir=directory, source=batch_video_to_greyscale.__name__) - video_paths = find_all_videos_in_directory(directory=directory, as_dict=True, raise_error=True) + video_paths = find_all_videos_in_directory( + directory=directory, as_dict=True, raise_error=True + ) for file_cnt, (file_name, file_path) in enumerate(video_paths.items()): video_timer = SimbaTimer(start=True) in_dir, _, _ = get_fn_ext(filepath=file_path) @@ -430,17 +505,25 @@ def batch_video_to_greyscale(directory: Union[str, os.PathLike], gpu: Optional[b command = f'ffmpeg -hwaccel auto -c:v h264_cuvid -i "{file_path}" -vf "hwupload_cuda,hwdownload,format=nv12,format=gray" -c:v h264_nvenc -c:a copy "{save_name}" -y' else: command = f'ffmpeg -i "{file_path}" -vf format=gray -c:v libx264 "{save_name}" -hide_banner -loglevel error -y' - print(f"Converting {file_name} to greyscale (Video {file_cnt+1}/{len(list(video_paths.keys()))})... ") + print( + f"Converting {file_name} to greyscale (Video {file_cnt+1}/{len(list(video_paths.keys()))})... " + ) subprocess.call(command, shell=True, stdout=subprocess.PIPE) video_timer.stop_timer() - print(f'Video {save_name} complete, (elapsed time: {video_timer.elapsed_time_str}s)') + print( + f"Video {save_name} complete, (elapsed time: {video_timer.elapsed_time_str}s)" + ) timer.stop_timer() - stdout_success(msg=f"{len(list(video_paths.keys()))} video(s) converted to gresyscale! Saved in {directory} with '_greyscale' suffix", elapsed_time=timer.elapsed_time_str, source=batch_video_to_greyscale.__name__,) - - + stdout_success( + msg=f"{len(list(video_paths.keys()))} video(s) converted to gresyscale! Saved in {directory} with '_greyscale' suffix", + elapsed_time=timer.elapsed_time_str, + source=batch_video_to_greyscale.__name__, + ) -def superimpose_frame_count(file_path: Union[str, os.PathLike], gpu: Optional[bool] = False) -> None: +def superimpose_frame_count( + file_path: Union[str, os.PathLike], gpu: Optional[bool] = False +) -> None: """ Superimpose frame count on a video file. The result is stored in the same directory as the input file with the ``_frame_no.mp4`` suffix. @@ -493,10 +576,13 @@ def superimpose_frame_count(file_path: Union[str, os.PathLike], gpu: Optional[bo elapsed_time=timer.elapsed_time_str, ) -#_ = superimpose_frame_count(file_path=r'/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/SI_DAY3_308_CD1_PRESENT.mp4', gpu=False) + +# _ = superimpose_frame_count(file_path=r'/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/SI_DAY3_308_CD1_PRESENT.mp4', gpu=False) -def remove_beginning_of_video(file_path: Union[str, os.PathLike], time: int, gpu: Optional[bool] = False) -> None: +def remove_beginning_of_video( + file_path: Union[str, os.PathLike], time: int, gpu: Optional[bool] = False +) -> None: """ Remove N seconds from the beginning of a video file. The result is stored in the same directory as the input file with the ``_shorten.mp4`` suffix. @@ -541,7 +627,7 @@ def remove_beginning_of_video(file_path: Union[str, os.PathLike], time: int, gpu ) -#remove_beginning_of_video(file_path=r'/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/SI_DAY3_308_CD1_PRESENT_frame_no.mp4', time=10, gpu=False) +# remove_beginning_of_video(file_path=r'/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/SI_DAY3_308_CD1_PRESENT_frame_no.mp4', time=10, gpu=False) def clip_video_in_range( @@ -698,8 +784,11 @@ def gif_creator( check_int(name="Duration", value=duration, min_value=1) check_int(name="Width", value=width) video_meta_data = get_video_meta_data(file_path) - if (start_time + duration) > video_meta_data['video_length_s']: - raise FrameRangeError(msg=f'The end of the gif (start time: {start_time} + duration: {duration}) is longer than the {file_path} video: {video_meta_data["video_length_s"]}s', source=gif_creator.__name__) + if (start_time + duration) > video_meta_data["video_length_s"]: + raise FrameRangeError( + msg=f'The end of the gif (start time: {start_time} + duration: {duration}) is longer than the {file_path} video: {video_meta_data["video_length_s"]}s', + source=gif_creator.__name__, + ) dir, file_name, ext = get_fn_ext(filepath=file_path) save_name = os.path.join(dir, file_name + ".gif") @@ -721,7 +810,8 @@ def gif_creator( source=gif_creator.__name__, ) -#gif_creator(file_path=r'/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/SI_DAY3_308_CD1_PRESENT.mp4', start_time=5, duration=15, width=600, gpu=False) + +# gif_creator(file_path=r'/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/SI_DAY3_308_CD1_PRESENT.mp4', start_time=5, duration=15, width=600, gpu=False) def batch_convert_video_format( @@ -776,7 +866,9 @@ def batch_convert_video_format( video_timer = SimbaTimer(start=True) dir_name, file_name, ext = get_fn_ext(filepath=file_path) print(f"Processing video {file_name}...") - save_path = os.path.join(dir_name, file_name + ".{}".format(output_format.lower())) + save_path = os.path.join( + dir_name, file_name + ".{}".format(output_format.lower()) + ) if os.path.isfile(save_path): raise FileExistError( msg="SIMBA ERROR: The outfile file already exist: {}.".format( @@ -792,13 +884,16 @@ def batch_convert_video_format( command = f'ffmpeg -y -i "{file_path}" -c:v libx264 -crf 21 -preset medium -c:a libmp3lame -b:a 320k "{save_path}"' subprocess.call(command, shell=True, stdout=subprocess.PIPE) video_timer.stop_timer() - print(f"Video {file_name} complete, (elapsed time: {video_timer.elapsed_time_str}s) (Video {file_cnt + 1}/{len(video_paths)})...") + print( + f"Video {file_name} complete, (elapsed time: {video_timer.elapsed_time_str}s) (Video {file_cnt + 1}/{len(video_paths)})..." + ) stdout_success( msg=f"SIMBA COMPLETE: {str(len(video_paths))} videos converted in {directory} directory!", source=batch_convert_video_format.__name__, ) -#_ = batch_convert_video_format(directory='/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/test_2',input_format='mp4', output_format='avi') + +# _ = batch_convert_video_format(directory='/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/test_2',input_format='mp4', output_format='avi') def batch_create_frames(directory: Union[str, os.PathLike]) -> None: @@ -875,8 +970,8 @@ def extract_frames_single_video(file_path: Union[str, os.PathLike]) -> None: source=extract_frames_single_video.__name__, ) -#_ = extract_frames_single_video(file_path='/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/SI_DAY3_308_CD1_PRESENT_downsampled.mp4') +# _ = extract_frames_single_video(file_path='/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/SI_DAY3_308_CD1_PRESENT_downsampled.mp4') def multi_split_video( @@ -966,7 +1061,9 @@ def multi_split_video( # multi_split_video(file_path=r'/Users/simon/Desktop/time_s_converted.mp4', start_times=['00:00:01', '00:00:02'], end_times=['00:00:04', '00:00:05'], gpu=False) -def crop_single_video(file_path: Union[str, os.PathLike], gpu: Optional[bool] = False) -> None: +def crop_single_video( + file_path: Union[str, os.PathLike], gpu: Optional[bool] = False +) -> None: """ Crop a single video using ``simba.video_processors.roi_selector.ROISelector`` interface. Results is saved in the same directory as input video with the ``_cropped.mp4`` suffix`. @@ -1181,12 +1278,14 @@ def frames_to_movie( command = f'ffmpeg -y -r {fps} -f image2 -s {img_h}x{img_w} -i "{ffmpeg_fn}" -c:v h264_nvenc -b:v {bitrate}k "{save_path}" -y' else: command = f'ffmpeg -y -r {fps} -f image2 -s {img_h}x{img_w} -i "{ffmpeg_fn}" -vcodec libx264 -b {bitrate}k "{save_path}" -y' - print(f"Creating {os.path.basename(save_path)} from {len(img_paths_in_folder)} images...") + print( + f"Creating {os.path.basename(save_path)} from {len(img_paths_in_folder)} images..." + ) subprocess.call(command, shell=True) stdout_success(msg=f"Video created at {save_path}", source=frames_to_movie.__name__) -#_ = frames_to_movie(directory='/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/SI_DAY3_308_CD1_PRESENT_downsampled', fps=15, bitrate=32000, img_format='png') +# _ = frames_to_movie(directory='/Users/simon/Desktop/envs/simba/troubleshooting/mouse_open_field/project_folder/videos/SI_DAY3_308_CD1_PRESENT_downsampled', fps=15, bitrate=32000, img_format='png') def video_concatenator( @@ -1465,7 +1564,9 @@ def run(self): self.main_frm.mainloop() -def extract_frames_from_all_videos_in_directory(config_path: Union[str, os.PathLike], directory: Union[str, os.PathLike]) -> None: +def extract_frames_from_all_videos_in_directory( + config_path: Union[str, os.PathLike], directory: Union[str, os.PathLike] +) -> None: """ Extract all frames from all videos in a directory. The results are saved in the project_folder/frames/input directory of the SimBA project @@ -1484,7 +1585,10 @@ def extract_frames_from_all_videos_in_directory(config_path: Union[str, os.PathL if ext.lower() in video_types: video_paths.append(file_path) if len(video_paths) == 0: - raise NoFilesFoundError(msg=f"SIMBA ERROR: 0 video files in mp4 or avi format found in {directory}", source=extract_frames_from_all_videos_in_directory.__name__) + raise NoFilesFoundError( + msg=f"SIMBA ERROR: 0 video files in mp4 or avi format found in {directory}", + source=extract_frames_from_all_videos_in_directory.__name__, + ) config = read_config_file(config_path) project_path = read_config_entry( config, "General settings", "project_path", data_type="folder_path" @@ -1832,13 +1936,14 @@ def crop_multiple_videos_polygons( ) - -def resize_videos_by_height(video_paths: List[Union[str, os.PathLike]], - height: Union[int, str], - overwrite: Optional[bool] = False, - save_dir: Optional[Union[str, os.PathLike]] = None, - gpu: Optional[bool] = False, - verbose: Optional[bool] = True) -> Union[None, List[Union[None, str, os.PathLike]]]: +def resize_videos_by_height( + video_paths: List[Union[str, os.PathLike]], + height: Union[int, str], + overwrite: Optional[bool] = False, + save_dir: Optional[Union[str, os.PathLike]] = None, + gpu: Optional[bool] = False, + verbose: Optional[bool] = True, +) -> Union[None, List[Union[None, str, os.PathLike]]]: """ Re-size a list of videos to a specified height while retaining their aspect ratios. @@ -1856,29 +1961,45 @@ def resize_videos_by_height(video_paths: List[Union[str, os.PathLike]], """ timer = SimbaTimer(start=True) if (not overwrite) and (save_dir is None): - raise InvalidInputError(msg='Pass a save_dir OR set overwrite to True', source=resize_videos_by_height.__name__) + raise InvalidInputError( + msg="Pass a save_dir OR set overwrite to True", + source=resize_videos_by_height.__name__, + ) elif (overwrite) and (save_dir is not None): - raise InvalidInputError(msg='Pass EITHER overwrite as True OR pass a save_dir', source=resize_videos_by_height.__name__) + raise InvalidInputError( + msg="Pass EITHER overwrite as True OR pass a save_dir", + source=resize_videos_by_height.__name__, + ) if save_dir is not None: if not os.path.isdir(save_dir): os.makedirs(save_dir) - check_valid_lst(data=video_paths, source=resize_videos_by_height.__name__, min_len=1) + check_valid_lst( + data=video_paths, source=resize_videos_by_height.__name__, min_len=1 + ) _ = [check_file_exist_and_readable(x) for x in video_paths] new_video_paths = [] if isinstance(height, str): - check_int(name=f'{resize_videos_by_height.__name__} height', value=height, min_value=0, max_value=len(video_paths)) + check_int( + name=f"{resize_videos_by_height.__name__} height", + value=height, + min_value=0, + max_value=len(video_paths), + ) video_heights = [] - for i in video_paths: video_heights.append(get_video_meta_data(video_path=i)['height']) + for i in video_paths: + video_heights.append(get_video_meta_data(video_path=i)["height"]) height = video_heights[int(height)] for cnt, video_path in enumerate(video_paths): dir_name, video_name, ext = get_fn_ext(video_path) if verbose: - print(f'Resizing height video {video_name} (Video {cnt+1}/{len(video_paths)})...') + print( + f"Resizing height video {video_name} (Video {cnt+1}/{len(video_paths)})..." + ) if overwrite: dt = datetime.now().strftime("%Y%m%d%H%M%S") - save_path = os.path.join(dir_name, f'{video_name}_{dt}.mp4') + save_path = os.path.join(dir_name, f"{video_name}_{dt}.mp4") else: - save_path = os.path.join(save_dir, f'{video_name}.mp4') + save_path = os.path.join(save_dir, f"{video_name}.mp4") new_video_paths.append(save_path) if gpu: cmd = f'ffmpeg -y -hwaccel auto -c:v h264_cuvid -i "{video_path}" -vf scale_npp=-2:{height} -c:v h264_nvenc "{save_path}" -hide_banner -loglevel error -y' @@ -1890,17 +2011,20 @@ def resize_videos_by_height(video_paths: List[Union[str, os.PathLike]], os.remove(save_path) timer.stop_timer() if verbose: - print(f'Resized height {len(video_paths)} video(s). Elapsed time: {timer.elapsed_time_str}s.') + print( + f"Resized height {len(video_paths)} video(s). Elapsed time: {timer.elapsed_time_str}s." + ) return new_video_paths -def resize_videos_by_width(video_paths: List[Union[str, os.PathLike]], - width: Union[int, str], - overwrite: Optional[bool] = False, - save_dir: Optional[Union[str, os.PathLike]] = None, - gpu: Optional[bool] = False, - verbose: Optional[bool] = True) -> Union[None, List[Union[None, str, os.PathLike]]]: - +def resize_videos_by_width( + video_paths: List[Union[str, os.PathLike]], + width: Union[int, str], + overwrite: Optional[bool] = False, + save_dir: Optional[Union[str, os.PathLike]] = None, + gpu: Optional[bool] = False, + verbose: Optional[bool] = True, +) -> Union[None, List[Union[None, str, os.PathLike]]]: """ Re-size a list of videos to a specified width while retaining their aspect ratios. @@ -1919,9 +2043,15 @@ def resize_videos_by_width(video_paths: List[Union[str, os.PathLike]], timer = SimbaTimer(start=True) if (not overwrite) and (save_dir is None): - raise InvalidInputError(msg='Provide a save_dir or set overwrite to True', source=resize_videos_by_width.__name__) + raise InvalidInputError( + msg="Provide a save_dir or set overwrite to True", + source=resize_videos_by_width.__name__, + ) elif (overwrite) and (save_dir is not None): - raise InvalidInputError(msg='Set EITHER overwrite to True OR Provide a save_dir', source=resize_videos_by_width.__name__) + raise InvalidInputError( + msg="Set EITHER overwrite to True OR Provide a save_dir", + source=resize_videos_by_width.__name__, + ) if save_dir is not None: if not os.path.isdir(save_dir): os.makedirs(save_dir) @@ -1929,19 +2059,27 @@ def resize_videos_by_width(video_paths: List[Union[str, os.PathLike]], _ = [check_file_exist_and_readable(x) for x in video_paths] new_video_paths = [] if isinstance(width, str): - check_int(name=f'{resize_videos_by_width.__name__} height', value=width, min_value=0, max_value=len(video_paths)) + check_int( + name=f"{resize_videos_by_width.__name__} height", + value=width, + min_value=0, + max_value=len(video_paths), + ) video_widths = [] - for i in video_paths: video_widths.append(get_video_meta_data(video_path=i)['width']) + for i in video_paths: + video_widths.append(get_video_meta_data(video_path=i)["width"]) width = video_widths[int(width)] for cnt, video_path in enumerate(video_paths): dir_name, video_name, ext = get_fn_ext(video_path) if verbose: - print(f'Resizing width video {video_name} (Video {cnt+1}/{len(video_paths)})...') + print( + f"Resizing width video {video_name} (Video {cnt+1}/{len(video_paths)})..." + ) if overwrite: dt = datetime.now().strftime("%Y%m%d%H%M%S") - save_path = os.path.join(dir_name, f'{video_name}_{dt}.mp4') + save_path = os.path.join(dir_name, f"{video_name}_{dt}.mp4") else: - save_path = os.path.join(save_dir, f'{video_name}.mp4') + save_path = os.path.join(save_dir, f"{video_name}.mp4") new_video_paths.append(save_path) if gpu: cmd = f'ffmpeg -y -hwaccel auto -i "{video_path}" -vf scale_npp={width}:-2 -c:v h264_nvenc "{save_path}" -hide_banner -loglevel error -y' @@ -1953,17 +2091,21 @@ def resize_videos_by_width(video_paths: List[Union[str, os.PathLike]], os.remove(save_path) timer.stop_timer() if verbose: - print(f'Resized width {len(video_paths)} video(s). Elapsed time: {timer.elapsed_time_str}s.') + print( + f"Resized width {len(video_paths)} video(s). Elapsed time: {timer.elapsed_time_str}s." + ) return new_video_paths -def create_blank_video(path: Union[str, os.PathLike], - length: int, - width: int, - height: int, - color: Optional[str] = 'black', - gpu: Optional[bool] = False, - verbose: Optional[bool] = False) -> None: +def create_blank_video( + path: Union[str, os.PathLike], + length: int, + width: int, + height: int, + color: Optional[str] = "black", + gpu: Optional[bool] = False, + verbose: Optional[bool] = False, +) -> None: """ Create a "blank" uni-colored video of specified size and length. @@ -1980,13 +2122,13 @@ def create_blank_video(path: Union[str, os.PathLike], >>> _ = create_blank_video(path='/Users/simon/Desktop/envs/simba/troubleshooting/RAT_NOR/project_folder/videos/test/new/blank_test.mp4', length=5, width=300, height=400, gpu=False, verbose=True, color='orange') """ - check_int(name=f'{create_blank_video.__name__} length', value=length, min_value=1) - check_int(name=f'{create_blank_video.__name__} width', value=width, min_value=1) - check_int(name=f'{create_blank_video.__name__} height', value=height, min_value=1) + check_int(name=f"{create_blank_video.__name__} length", value=length, min_value=1) + check_int(name=f"{create_blank_video.__name__} width", value=width, min_value=1) + check_int(name=f"{create_blank_video.__name__} height", value=height, min_value=1) check_if_dir_exists(in_dir=os.path.dirname(path)) timer = SimbaTimer(start=True) if verbose: - print('Creating blank video...') + print("Creating blank video...") if gpu: cmd = f'ffmpeg -y -t {length} -f lavfi -i color=c={color}:s={width}x{height} -c:v h264_nvenc -preset slow -tune stillimage -pix_fmt yuv420p "{path}" -hide_banner -loglevel error -y' else: @@ -1994,15 +2136,17 @@ def create_blank_video(path: Union[str, os.PathLike], subprocess.call(cmd, shell=True, stdout=subprocess.PIPE) timer.stop_timer() if verbose: - print(f'Blank video complete. Elapsed time: {timer.elapsed_time_str}s.') + print(f"Blank video complete. Elapsed time: {timer.elapsed_time_str}s.") -def horizontal_video_concatenator(video_paths: List[Union[str, os.PathLike]], - save_path: Union[str, os.PathLike], - height_px: Optional[Union[int, str]] = None, - height_idx: Optional[Union[int, str]] = None, - gpu: Optional[bool] = False, - verbose: Optional[bool] = True) -> None: +def horizontal_video_concatenator( + video_paths: List[Union[str, os.PathLike]], + save_path: Union[str, os.PathLike], + height_px: Optional[Union[int, str]] = None, + height_idx: Optional[Union[int, str]] = None, + gpu: Optional[bool] = False, + verbose: Optional[bool] = True, +) -> None: """ Concatenates multiple videos horizontally. @@ -2023,37 +2167,69 @@ def horizontal_video_concatenator(video_paths: List[Union[str, os.PathLike]], """ check_ffmpeg_available() if gpu and not check_nvidea_gpu_available(): - raise FFMPEGCodecGPUError(msg="NVIDEA GPU not available (as evaluated by nvidea-smi returning None)", source=horizontal_video_concatenator.__name__) + raise FFMPEGCodecGPUError( + msg="NVIDEA GPU not available (as evaluated by nvidea-smi returning None)", + source=horizontal_video_concatenator.__name__, + ) timer = SimbaTimer(start=True) - check_valid_lst(data=video_paths, source=horizontal_video_concatenator.__name__, min_len=2) - check_if_dir_exists(in_dir=os.path.dirname(save_path), source=horizontal_video_concatenator.__name__) - video_meta_data = [get_video_meta_data(video_path=video_path) for video_path in video_paths] - if ((height_px is None) and (height_idx is None)) or ((height_px is not None) and (height_idx is not None)): - raise InvalidInputError(msg='Provide a height_px OR height_idx', source=horizontal_video_concatenator.__name__) + check_valid_lst( + data=video_paths, source=horizontal_video_concatenator.__name__, min_len=2 + ) + check_if_dir_exists( + in_dir=os.path.dirname(save_path), source=horizontal_video_concatenator.__name__ + ) + video_meta_data = [ + get_video_meta_data(video_path=video_path) for video_path in video_paths + ] + if ((height_px is None) and (height_idx is None)) or ( + (height_px is not None) and (height_idx is not None) + ): + raise InvalidInputError( + msg="Provide a height_px OR height_idx", + source=horizontal_video_concatenator.__name__, + ) if height_idx is not None: - check_int(name=f'{horizontal_video_concatenator.__name__} height', value=height_idx, min_value=0, max_value=len(video_paths)-1) - height = int(video_meta_data[height_idx]['height']) + check_int( + name=f"{horizontal_video_concatenator.__name__} height", + value=height_idx, + min_value=0, + max_value=len(video_paths) - 1, + ) + height = int(video_meta_data[height_idx]["height"]) else: - check_int(name=f'{horizontal_video_concatenator.__name__} height', value=height_px, min_value=1) + check_int( + name=f"{horizontal_video_concatenator.__name__} height", + value=height_px, + min_value=1, + ) height = int(height_px) video_path_str = " ".join([f'-i "{path}"' for path in video_paths]) - codec = 'h264_nvenc' if gpu else 'libvpx-vp9' - filter_complex = ";".join([f"[{idx}:v]scale=-1:{height}[v{idx}]" for idx in range(len(video_paths))]) + codec = "h264_nvenc" if gpu else "libvpx-vp9" + filter_complex = ";".join( + [f"[{idx}:v]scale=-1:{height}[v{idx}]" for idx in range(len(video_paths))] + ) filter_complex += f";{''.join([f'[v{idx}]' for idx in range(len(video_paths))])}hstack=inputs={len(video_paths)}[v]" if verbose: - print(f'Concatenating {len(video_paths)} videos horizontally with a {height} pixel height... ') + print( + f"Concatenating {len(video_paths)} videos horizontally with a {height} pixel height... " + ) cmd = f'ffmpeg {video_path_str} -filter_complex "{filter_complex}" -map "[v]" -c:v {codec} -loglevel error -stats "{save_path}" -y' subprocess.call(cmd, shell=True, stdout=subprocess.PIPE) timer.stop_timer() if verbose: - print(f'Horizontal concatenation complete, saved at {save_path} (elapsed time: {timer.elapsed_time_str}s.)') + print( + f"Horizontal concatenation complete, saved at {save_path} (elapsed time: {timer.elapsed_time_str}s.)" + ) + -def vertical_video_concatenator(video_paths: List[Union[str, os.PathLike]], - save_path: Union[str, os.PathLike], - width_px: Optional[int] = None, - width_idx: Optional[int] = None, - gpu: Optional[bool] = False, - verbose: Optional[bool] = True) -> None: +def vertical_video_concatenator( + video_paths: List[Union[str, os.PathLike]], + save_path: Union[str, os.PathLike], + width_px: Optional[int] = None, + width_idx: Optional[int] = None, + gpu: Optional[bool] = False, + verbose: Optional[bool] = True, +) -> None: """ Concatenates multiple videos vertically. @@ -2078,43 +2254,73 @@ def vertical_video_concatenator(video_paths: List[Union[str, os.PathLike]], """ check_ffmpeg_available() - if gpu and not check_nvidea_gpu_available(): raise FFMPEGCodecGPUError(msg="NVIDIA GPU not available", source=vertical_video_concatenator.__name__) - video_meta_data = [get_video_meta_data(video_path=video_path) for video_path in video_paths] + if gpu and not check_nvidea_gpu_available(): + raise FFMPEGCodecGPUError( + msg="NVIDIA GPU not available", source=vertical_video_concatenator.__name__ + ) + video_meta_data = [ + get_video_meta_data(video_path=video_path) for video_path in video_paths + ] timer = SimbaTimer(start=True) - check_valid_lst(data=video_paths, source=vertical_video_concatenator.__name__, min_len=2) - check_if_dir_exists(in_dir=os.path.dirname(save_path), source=vertical_video_concatenator.__name__) - if ((width_px is None) and (width_idx is None)) or ((width_px is not None) and (width_idx is not None)): - raise InvalidInputError(msg='Provide a width_px OR width_idx', source=vertical_video_concatenator.__name__) + check_valid_lst( + data=video_paths, source=vertical_video_concatenator.__name__, min_len=2 + ) + check_if_dir_exists( + in_dir=os.path.dirname(save_path), source=vertical_video_concatenator.__name__ + ) + if ((width_px is None) and (width_idx is None)) or ( + (width_px is not None) and (width_idx is not None) + ): + raise InvalidInputError( + msg="Provide a width_px OR width_idx", + source=vertical_video_concatenator.__name__, + ) if width_idx is not None: - check_int(name=f'{vertical_video_concatenator.__name__} width index', value=width_idx, min_value=0, max_value=len(video_paths) - 1) - width = int(video_meta_data[width_idx]['width']) + check_int( + name=f"{vertical_video_concatenator.__name__} width index", + value=width_idx, + min_value=0, + max_value=len(video_paths) - 1, + ) + width = int(video_meta_data[width_idx]["width"]) else: - check_int(name=f'{vertical_video_concatenator.__name__} width', value=width_px, min_value=1) + check_int( + name=f"{vertical_video_concatenator.__name__} width", + value=width_px, + min_value=1, + ) width = int(width_px) video_path_str = " ".join([f'-i "{path}"' for path in video_paths]) - codec = 'h264_nvenc' if gpu else 'libvpx-vp9' - filter_complex = ";".join([f"[{idx}:v]scale={width}:-1[v{idx}]" for idx in range(len(video_paths))]) + codec = "h264_nvenc" if gpu else "libvpx-vp9" + filter_complex = ";".join( + [f"[{idx}:v]scale={width}:-1[v{idx}]" for idx in range(len(video_paths))] + ) filter_complex += f";{''.join([f'[v{idx}]' for idx in range(len(video_paths))])}" filter_complex += f"vstack=inputs={len(video_paths)}[v]" if verbose: - print(f'Concatenating {len(video_paths)} videos vertically with a {width} pixel width...') + print( + f"Concatenating {len(video_paths)} videos vertically with a {width} pixel width..." + ) cmd = f'ffmpeg {video_path_str} -filter_complex "{filter_complex}" -map "[v]" -c:v {codec} -loglevel error -stats "{save_path}" -y' subprocess.call(cmd, shell=True, stdout=subprocess.PIPE) timer.stop_timer() if verbose: - print(f'Vertical concatenation complete. Saved at {save_path} (Elapsed time: {timer.elapsed_time_str}s.)') - + print( + f"Vertical concatenation complete. Saved at {save_path} (Elapsed time: {timer.elapsed_time_str}s.)" + ) -def mosaic_concatenator(video_paths: List[Union[str, os.PathLike]], - save_path: Union[str, os.PathLike], - width_idx: Optional[Union[int, str]] = None, - width_px: Optional[Union[int, str]] = None, - height_idx: Optional[Union[int, str]] = None, - height_px: Optional[Union[int, str]] = None, - gpu: Optional[bool] = False, - verbose: Optional[bool] = True, - uneven_fill_color: Optional[str] = 'black') -> None: +def mosaic_concatenator( + video_paths: List[Union[str, os.PathLike]], + save_path: Union[str, os.PathLike], + width_idx: Optional[Union[int, str]] = None, + width_px: Optional[Union[int, str]] = None, + height_idx: Optional[Union[int, str]] = None, + height_px: Optional[Union[int, str]] = None, + gpu: Optional[bool] = False, + verbose: Optional[bool] = True, + uneven_fill_color: Optional[str] = "black", +) -> None: """ Concatenates multiple videos into a mosaic layout. @@ -2141,61 +2347,127 @@ def mosaic_concatenator(video_paths: List[Union[str, os.PathLike]], """ check_ffmpeg_available() - if gpu and not check_nvidea_gpu_available(): raise FFMPEGCodecGPUError(msg="NVIDIA GPU not available", source=mosaic_concatenator.__name__) + if gpu and not check_nvidea_gpu_available(): + raise FFMPEGCodecGPUError( + msg="NVIDIA GPU not available", source=mosaic_concatenator.__name__ + ) timer = SimbaTimer(start=True) dt = datetime.now().strftime("%Y%m%d%H%M%S") - check_valid_lst(data=video_paths, source=f'{mosaic_concatenator.__name__} video_paths', min_len=3) - video_meta_data = [get_video_meta_data(video_path=video_path) for video_path in video_paths] - max_video_length = max([x['video_length_s'] for x in video_meta_data]) - if ((width_px is None) and (width_idx is None)) or ((width_px is not None) and (width_idx is not None)): - raise InvalidInputError(msg='Provide a width_px OR width_idx', source=mosaic_concatenator.__name__) - if ((height_px is None) and (height_idx is None)) or ((height_px is not None) and (height_idx is not None)): - raise InvalidInputError(msg='Provide a height_px OR height_idx', source=mosaic_concatenator.__name__) + check_valid_lst( + data=video_paths, + source=f"{mosaic_concatenator.__name__} video_paths", + min_len=3, + ) + video_meta_data = [ + get_video_meta_data(video_path=video_path) for video_path in video_paths + ] + max_video_length = max([x["video_length_s"] for x in video_meta_data]) + if ((width_px is None) and (width_idx is None)) or ( + (width_px is not None) and (width_idx is not None) + ): + raise InvalidInputError( + msg="Provide a width_px OR width_idx", source=mosaic_concatenator.__name__ + ) + if ((height_px is None) and (height_idx is None)) or ( + (height_px is not None) and (height_idx is not None) + ): + raise InvalidInputError( + msg="Provide a height_px OR height_idx", source=mosaic_concatenator.__name__ + ) if width_idx is not None: - check_int(name=f'{vertical_video_concatenator.__name__} width index', value=width_idx, min_value=1, max_value=len(video_paths) - 1) - width = int(video_meta_data[width_idx]['width']) + check_int( + name=f"{vertical_video_concatenator.__name__} width index", + value=width_idx, + min_value=1, + max_value=len(video_paths) - 1, + ) + width = int(video_meta_data[width_idx]["width"]) else: width = width_px if height_idx is not None: - check_int(name=f'{vertical_video_concatenator.__name__} height index', value=width_idx, min_value=1, max_value=len(video_paths) - 1) - height = int(video_meta_data[width_idx]['height']) + check_int( + name=f"{vertical_video_concatenator.__name__} height index", + value=width_idx, + min_value=1, + max_value=len(video_paths) - 1, + ) + height = int(video_meta_data[width_idx]["height"]) else: height = height_px if verbose: - print(f'Creating mosaic video ...') - temp_dir = os.path.join(os.path.dirname(video_paths[0]), f'temp_{dt}') + print(f"Creating mosaic video ...") + temp_dir = os.path.join(os.path.dirname(video_paths[0]), f"temp_{dt}") os.makedirs(temp_dir) if not (len(video_paths) % 2) == 0: - blank_path = os.path.join(temp_dir, f'{dt}.mp4') - create_blank_video(path=blank_path, length=max_video_length, width=width, height=height, gpu=gpu, verbose=verbose, color=uneven_fill_color) + blank_path = os.path.join(temp_dir, f"{dt}.mp4") + create_blank_video( + path=blank_path, + length=max_video_length, + width=width, + height=height, + gpu=gpu, + verbose=verbose, + color=uneven_fill_color, + ) video_paths.append(blank_path) - upper_videos, lower_videos = video_paths[:len(video_paths)//2], video_paths[len(video_paths)//2:] - if verbose: print('Creating upper mosaic... (Step 1/3)') + upper_videos, lower_videos = ( + video_paths[: len(video_paths) // 2], + video_paths[len(video_paths) // 2 :], + ) + if verbose: + print("Creating upper mosaic... (Step 1/3)") if len(upper_videos) > 1: - upper_path = os.path.join(temp_dir, 'upper.mp4') - horizontal_video_concatenator(video_paths=upper_videos, save_path=upper_path, gpu=gpu, height_px=height, verbose=verbose) + upper_path = os.path.join(temp_dir, "upper.mp4") + horizontal_video_concatenator( + video_paths=upper_videos, + save_path=upper_path, + gpu=gpu, + height_px=height, + verbose=verbose, + ) else: upper_path = upper_videos[0] - if verbose: print('Creating lower mosaic... (Step 2/3)') + if verbose: + print("Creating lower mosaic... (Step 2/3)") if len(lower_videos) > 1: - lower_path = os.path.join(temp_dir, 'lower.mp4') - horizontal_video_concatenator(video_paths=lower_videos, save_path=lower_path, gpu=gpu, height_px=height, verbose=verbose) + lower_path = os.path.join(temp_dir, "lower.mp4") + horizontal_video_concatenator( + video_paths=lower_videos, + save_path=lower_path, + gpu=gpu, + height_px=height, + verbose=verbose, + ) else: lower_path = lower_videos[0] - panels_meta = [get_video_meta_data(video_path=video_path) for video_path in [lower_path, upper_path]] - if verbose: print('Joining upper and lower mosaic... (Step 2/3)') - vertical_video_concatenator(video_paths=[upper_path, lower_path], save_path=save_path, verbose=verbose, gpu=gpu, width_px=max([x['width'] for x in panels_meta])) + panels_meta = [ + get_video_meta_data(video_path=video_path) + for video_path in [lower_path, upper_path] + ] + if verbose: + print("Joining upper and lower mosaic... (Step 2/3)") + vertical_video_concatenator( + video_paths=[upper_path, lower_path], + save_path=save_path, + verbose=verbose, + gpu=gpu, + width_px=max([x["width"] for x in panels_meta]), + ) timer.stop_timer() shutil.rmtree(temp_dir) if verbose: - print(f'Mosaic concatenation complete. Saved at {save_path} (Elapsed time: {timer.elapsed_time_str}s.)') + print( + f"Mosaic concatenation complete. Saved at {save_path} (Elapsed time: {timer.elapsed_time_str}s.)" + ) -def mixed_mosaic_concatenator(video_paths: List[Union[str, os.PathLike]], - save_path: Union[str, os.PathLike], - gpu: Optional[bool] = False, - verbose: Optional[bool] = True, - uneven_fill_color: Optional[str] = 'black') -> None: +def mixed_mosaic_concatenator( + video_paths: List[Union[str, os.PathLike]], + save_path: Union[str, os.PathLike], + gpu: Optional[bool] = False, + verbose: Optional[bool] = True, + uneven_fill_color: Optional[str] = "black", +) -> None: """ Create a mixed mosaic video by concatenating multiple input videos in a mosaic layout of various sizes. @@ -2220,42 +2492,91 @@ def mixed_mosaic_concatenator(video_paths: List[Union[str, os.PathLike]], """ check_ffmpeg_available() - if gpu and not check_nvidea_gpu_available(): raise FFMPEGCodecGPUError(msg="NVIDIA GPU not available", source=mixed_mosaic_concatenator.__name__) + if gpu and not check_nvidea_gpu_available(): + raise FFMPEGCodecGPUError( + msg="NVIDIA GPU not available", source=mixed_mosaic_concatenator.__name__ + ) timer = SimbaTimer(start=True) - check_valid_lst(data=video_paths, source=mixed_mosaic_concatenator.__name__, min_len=2) + check_valid_lst( + data=video_paths, source=mixed_mosaic_concatenator.__name__, min_len=2 + ) dt = datetime.now().strftime("%Y%m%d%H%M%S") - video_meta_data = [get_video_meta_data(video_path=video_path) for video_path in video_paths] - max_video_length = max([x['video_length_s'] for x in video_meta_data]) - check_if_dir_exists(in_dir=os.path.dirname(save_path), source=mixed_mosaic_concatenator.__name__) + video_meta_data = [ + get_video_meta_data(video_path=video_path) for video_path in video_paths + ] + max_video_length = max([x["video_length_s"] for x in video_meta_data]) + check_if_dir_exists( + in_dir=os.path.dirname(save_path), source=mixed_mosaic_concatenator.__name__ + ) large_mosaic_path, video_paths = video_paths[0], video_paths[1:] - mosaic_height = int(video_meta_data[0]['height'] / 2) - if verbose: print('Creating mixed mosaic video... ') - temp_dir = os.path.join(os.path.dirname(video_paths[0]), f'temp_{dt}') + mosaic_height = int(video_meta_data[0]["height"] / 2) + if verbose: + print("Creating mixed mosaic video... ") + temp_dir = os.path.join(os.path.dirname(video_paths[0]), f"temp_{dt}") os.makedirs(temp_dir) if not (len(video_paths) % 2) == 0: - blank_path = os.path.join(temp_dir, f'{dt}.mp4') - create_blank_video(path=blank_path, length=max_video_length, width=video_meta_data[-1]['width'], height=mosaic_height, gpu=gpu, verbose=True, color=uneven_fill_color) + blank_path = os.path.join(temp_dir, f"{dt}.mp4") + create_blank_video( + path=blank_path, + length=max_video_length, + width=video_meta_data[-1]["width"], + height=mosaic_height, + gpu=gpu, + verbose=True, + color=uneven_fill_color, + ) video_paths.append(blank_path) - upper_videos, lower_videos = video_paths[:len(video_paths) // 2], video_paths[len(video_paths) // 2:] - if verbose: print('Creating upper right mosaic ... (Step 1/4)') + upper_videos, lower_videos = ( + video_paths[: len(video_paths) // 2], + video_paths[len(video_paths) // 2 :], + ) + if verbose: + print("Creating upper right mosaic ... (Step 1/4)") if len(upper_videos) > 1: - upper_path = os.path.join(temp_dir, 'upper.mp4') - horizontal_video_concatenator(video_paths=upper_videos, save_path=upper_path, gpu=gpu, height_px=mosaic_height, verbose=verbose) + upper_path = os.path.join(temp_dir, "upper.mp4") + horizontal_video_concatenator( + video_paths=upper_videos, + save_path=upper_path, + gpu=gpu, + height_px=mosaic_height, + verbose=verbose, + ) else: upper_path = upper_videos[0] - if verbose: print('Creating lower right mosaic ... (Step 2/4)') + if verbose: + print("Creating lower right mosaic ... (Step 2/4)") if len(lower_videos) > 1: - lower_path = os.path.join(temp_dir, 'lower.mp4') - horizontal_video_concatenator(video_paths=lower_videos, save_path=lower_path, gpu=gpu, verbose=verbose) + lower_path = os.path.join(temp_dir, "lower.mp4") + horizontal_video_concatenator( + video_paths=lower_videos, save_path=lower_path, gpu=gpu, verbose=verbose + ) else: lower_path = lower_videos[0] - panels_meta = [get_video_meta_data(video_path=video_path) for video_path in [lower_path, upper_path]] - mosaic_path = os.path.join(temp_dir, 'mosaic.mp4') - if verbose: print('Joining upper and lower right mosaic... (Step 3/4)') - vertical_video_concatenator(video_paths=[upper_path, lower_path], width_px=min([x['width'] for x in panels_meta]), save_path=mosaic_path, gpu=gpu, verbose=verbose) - if verbose: print('Joining left and right mosaic... (Step 4/4)') - horizontal_video_concatenator(video_paths=[large_mosaic_path, mosaic_path], height_idx=0, save_path=save_path, gpu=gpu) + panels_meta = [ + get_video_meta_data(video_path=video_path) + for video_path in [lower_path, upper_path] + ] + mosaic_path = os.path.join(temp_dir, "mosaic.mp4") + if verbose: + print("Joining upper and lower right mosaic... (Step 3/4)") + vertical_video_concatenator( + video_paths=[upper_path, lower_path], + width_px=min([x["width"] for x in panels_meta]), + save_path=mosaic_path, + gpu=gpu, + verbose=verbose, + ) + if verbose: + print("Joining left and right mosaic... (Step 4/4)") + horizontal_video_concatenator( + video_paths=[large_mosaic_path, mosaic_path], + height_idx=0, + save_path=save_path, + gpu=gpu, + ) timer.stop_timer() shutil.rmtree(temp_dir) if verbose: - print(f'Mixed mosaic concatenation complete. Saved at {save_path} (Elapsed time: {timer.elapsed_time_str}s.)') \ No newline at end of file + print( + f"Mixed mosaic concatenation complete. Saved at {save_path} (Elapsed time: {timer.elapsed_time_str}s.)" + )