In [1]:
import cv2, PIL, os, sys
import copy
import math
import datetime
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import pandas as pd
from PIL import Image

from cropimage import crop_single_image
from processing import Processor

%matplotlib nbagg
%load_ext autoreload
%autoreload 2

In [2]:
class WarpVideo:
    def __init__(self, frames_path: str, chessboard_vid_path: str, destination_frames_folder:str | None = None , chessboard_shape: tuple[int, int] = (6, 9)) -> None:
        self.frames_path: str = frames_path
        self.chessboard_vid_path: str = chessboard_vid_path
        self.chessboard_shape: tuple[int, int] = chessboard_shape
        self.destination_frames_folder: str | None = destination_frames_folder
        self.chessboard_images: list[np.ndarray] | None = None
        self.camera_matrix: np.ndarray | None = None
        self.dist_coeff: np.ndarray | None = None
        self.undistorted_frames: np.ndarray |list[np.ndarray] | None = None
        self.adjusted_matrix: np.ndarray | None = None
        self.adjusted_width: int | None = None
        self.adjusted_height: int | None = None
        self.corresp_imgnames: list[str] | None = None
        self.saved_img_names:list[str] | None = None
    
    @staticmethod
    def display_video(frames):
        for frame in frames:
            cv2.imshow("Frame", frame)
            cv2.waitKey(30)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cv2.destroyAllWindows()
    
    def __load_chessboard_images(self, every_x_frames:int = 20, overwrite: bool = False) -> list[np.ndarray]:
        char_frames = []
        for i, file in enumerate(os.listdir(self.chessboard_vid_path)):
            current_frame = -1
            charuco_vid_file_1 = f"{self.chessboard_vid_path}/{file}"
            video = cv2.VideoCapture(charuco_vid_file_1)
            grabbed, frame = video.read()
            while grabbed:
                current_frame += 1
                if not current_frame % every_x_frames:
                    char_frames.append(frame)
                grabbed, frame = video.read()
        print("number of pics loaded: ", len(char_frames))
        if overwrite:
            self.chessboard_images = char_frames
        return char_frames

    def find_intrinsic(self, overwrite:bool = False) -> tuple[np.ndarray, np.ndarray]:
        self.chessboard_images = self.__load_chessboard_images(overwrite=overwrite) if self.chessboard_images is None else self.chessboard_images
        
        copiedframes = copy.deepcopy(self.chessboard_images)
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)

        # Prepare object points, like (0,0,0), (1,0,0), (2,0,0) ....,(4,7,0)
        objp = np.zeros((self.chessboard_shape[0] * self.chessboard_shape[1], 3), np.float32)
        objp[:, :2] = np.mgrid[0: self.chessboard_shape[0], 0: self.chessboard_shape[1]].T.reshape(-1, 2)
        
        size_of_chessboard_squares_mm = 30
        objp = objp * size_of_chessboard_squares_mm
        
        # Arrays to store object points and image points from all the images.
        objpoints = []  # 3d point in real world space
        imgpoints = []  # 2d points in image plane
        
        # Use the first frame to determine the frame size
        if len(copiedframes) > 0:
            frameHeight, frameWidth = copiedframes[0].shape[:2]
            frameSize = (frameWidth, frameHeight)
        else:
            raise ValueError("No frames in charucoFrames list")
    
        found = 0
        for idx, frame in enumerate(copiedframes):
            if frame is None:
                continue
            
            # Downscale the image by a factor of 5
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
            # Find the chess board corners
            ret, corners = cv2.findChessboardCorners(gray, self.chessboard_shape, None)
        
            if ret:
                found += 1
                objpoints.append(objp)
                corners2 = cv2.cornerSubPix(gray, corners, (11, 11), (-1, -1), criteria)
                imgpoints.append(corners2)
        
        if len(objpoints) == 0 or len(imgpoints) == 0:
            raise ValueError("No chessboard corners found in any of the frames.")
        print(f"Found chessboard corners in {found} frames.")
        
        ret, cameraMatrix, dist, _, _ = cv2.calibrateCamera(objpoints, imgpoints, frameSize, None, None)

        if not ret:
            raise ValueError("Camera calibration failed.")
        print(f"Camera calibration succeeded. RME: {ret} pixels")
        self.camera_matrix, self.dist_coeff = cameraMatrix, dist if overwrite or (self.camera_matrix, self.dist_coeff) == (None, None) else None
        return cameraMatrix, dist
    
                
    def undistort_frames(self, cameraMatrix:np.ndarray|None = None, dist:np.ndarray|None=None, pathToFolder:str|None=None, return_imgs: bool = False, overwrite_existing:bool=True) -> None | list[np.ndarray]:
        if pathToFolder is None:
            pathToFolder = self.frames_path
            if pathToFolder is None:
                raise ValueError("No frames path specified.")
        if cameraMatrix is None:
            cameraMatrix = self.camera_matrix
            if cameraMatrix is None:
                raise ValueError("Camera matrix is empty.")
        if dist is None:
            dist = self.dist_coeff
            if dist is None:
                raise ValueError("Distance coefficient is empty.")
        undistorted_frames = []
        if self.undistorted_frames is None:
            self.undistorted_frames = []
        if self.saved_img_names is None:
            self.saved_img_names = []
        for filename in os.listdir(pathToFolder):
            frame_path = os.path.join(pathToFolder, filename)
            frame = cv2.imread(frame_path)
            h, w = frame.shape[:2]
            newCameraMatrix, _ = cv2.getOptimalNewCameraMatrix(cameraMatrix, dist, (w, h), 1, (w, h))

            # Undistort the image
            undistorted = cv2.undistort(frame, cameraMatrix, dist, None, newCameraMatrix)
            if return_imgs:
                undistorted_frames.append(undistorted)
            if overwrite_existing:
                self.undistorted_frames.append(undistorted)
                self.saved_img_names.append(filename)
        if return_imgs:
            return undistorted_frames
        return None
                
            
    
    def findWarpPerspective(self, image_path, image_points, cornerpoints, image: np.ndarray | None = None):
        
        # Load the image
        if image is None:
            image = cv2.imread(image_path)
        if image is None:
            print(f"Failed to load image at {image_path}")
            exit()
        # Create a figure and axis
        # Display the input image
        # ax.imshow(image)
        # plt.close()

        # Add dots for each coordinate
        # for point in image_points:
        #     ax.scatter(point[0], point[1], color='red', s=40)  # s is the size of the dot
        if len(image_points) != 4:
            print("You need to select exactly 4 points.")
            exit()
        # Convert points to numpy float32 format
        pts1 = np.float32(image_points)
        # Compute the width and height of the quadrilateral
        width_top = np.linalg.norm(pts1[0] - pts1[1])
        width_bottom = np.linalg.norm(pts1[2] - pts1[3])
        height_left = np.linalg.norm(pts1[0] - pts1[3])
        height_right = np.linalg.norm(pts1[1] - pts1[2])

        # Use the maximum of the widths and heights to define the square size
        max_width = max(int(width_top), int(width_bottom))
        max_height = max(int(height_left), int(height_right))
        square_size = max(max_width, max_height)

        # Define the destination points as a square with the calculated size
        pts2 = np.float32([
            [0, 0],
            [square_size - 1, 0],
            [square_size - 1, square_size - 1],
            [0, square_size - 1]
        ])
        # Get the perspective transform matrix
        matrix = cv2.getPerspectiveTransform(pts1, pts2)
        # Warp the entire image using the perspective transform matrix
        # To keep the whole image visible, let's compute the output bounds
        h, w = image.shape[:2]

        # Transform the four corners of the original image
        if cornerpoints is None or len(cornerpoints) == 0:
            corners_points = np.float32([[0, 0], [w-1, 0], [w-1, h-1], [0, h-1]])
        else:
            corners_points = np.float32(cornerpoints)
        
        transformed_corners = cv2.perspectiveTransform(corners_points[None, :, :], matrix)[0]
        # Find the bounding box of the transformed corners
        x_min, y_min = np.min(transformed_corners, axis=0).astype(int)
        x_max, y_max = np.max(transformed_corners, axis=0).astype(int)
        
        # Calculate the size of the new image
        new_width = x_max - x_min
        new_height = y_max - y_min

        # Create the translation matrix to shift the image to the positive coordinates
        translation_matrix = np.array([[1, 0, -x_min], [0, 1, -y_min], [0, 0, 1]])

        # Adjust the perspective transform matrix with the translation
        adjusted_matrix = translation_matrix @ matrix
        
        # Perform the warp with the adjusted matrix
        result = cv2.warpPerspective(image, adjusted_matrix, (new_width, new_height), flags=cv2.INTER_LINEAR,
                                    borderMode=cv2.BORDER_CONSTANT, borderValue=(0, 0, 0))
        
        # Display the transformed image using matplotlib
        # plt.figure(figsize=(10, 10))
        # plt.imshow(result)
        # cv2.imwrite("../../Test Data/charuco_temp_folder/measurement_frames_formatted/inverted.jpg", cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
        # plt.title("Perspective Transform Applied to Entire Image")
        # plt.show()
        impth = "../../Test Data/charuco_temp_folder/abcdef.jpg"
        abc, defg, _ = result.shape
        # print(result.shape)
        
        self.adjusted_matrix, self.adjusted_width, self.adjusted_height = adjusted_matrix, new_width, new_height
        
        # result_resized = cv2.resize(result, (defg // 4, abc// 4), interpolation=cv2.INTER_LINEAR)
        # cv2.imwrite(impth, result)
        # cv2.imshow("res", result_resized)
        # cv2.waitKey()
        # cv2.destroyAllWindows()
    
    def __warp_image(self, image):
        return cv2.warpPerspective(image, self.adjusted_matrix, (self.adjusted_width, self.adjusted_height), flags=cv2.INTER_LINEAR,borderMode=cv2.BORDER_CONSTANT, borderValue=(0, 0, 0))
        
    
    def generate_warped_images(self, camera_matrix:np.ndarray|None = None, dist_coeff:np.ndarray|None=None, path_to_write_vids:str|None=None, path_to_distorted_pics:str|None=None, path_to_undistorted_pics:str|None=None, filenames:list[str] | np.ndarray[str] |None=None):
        if camera_matrix is None:
            camera_matrix = self.camera_matrix
        if dist_coeff is None:
            dist_coeff = self.dist_coeff
        if dist_coeff is None or camera_matrix is None:
            self.dist_coeff = None
            self.camera_matrix = None
            self.find_intrinsic(overwrite=True)
        
        if path_to_write_vids is None:
            raise ValueError("No destination for warped videos found.")
        if path_to_distorted_pics is not None:
            undistorted_imgs = self.undistort_frames(pathToFolder=path_to_distorted_pics, return_imgs=True, overwrite_existing = False)
        if self.undistorted_frames is None:
            self.undistort_frames()
        else:
            undistorted_imgs = self.undistorted_frames
        if self.adjusted_matrix is None or self.adjusted_width is None or self.adjusted_height is None:
            raise ValueError("You need to find the warp matrix. Run self.findWarpPerspective first.")
        
        for i, frame in enumerate(self.undistorted_frames):
            img_name = f"{i}.jpg" if filenames is None else filenames[i]
            warped_image = self.__warp_image(frame)
            cv2.imwrite(f"{path_to_write_vids}/{img_name}", warped_image)
            
        
        

In [3]:
# warp_img_path = "../../Test Data/charuco_temp_folder/testtest"
unwarped_undistorted_images_folder = "../../Test Data/charuco_temp_folder/testwarping/e_9_percent"
warped_destination_folder = "../../Test Data/charuco_temp_folder/testwarping/destination"
x = np.float32([[1.44058342 * 10 ** 3, 0.00000000, 9.49703258 * 10 ** 2],
 [0.00000000, 1.43643171 * 10 ** 3, 5.66369243 * 10 ** 2],
 [0.00000000, 0.00000000, 1.00000000]])
y = np.float32([[1.92756759 * 10 ** -1, -7.20995760 * 10 ** -1,  3.34726484 * 10 ** -3, -5.61250479 * 10 ** -4,
   7.99993126 * 10 ** -1]])

In [4]:
obj = WarpVideo("PATH_ONGEBRUIKT", "../../Test Data/charuco_temp_folder/intrinsic_vid")
obj.camera_matrix = x
obj.dist_coeff = y

chessboard_img_path = "../../Test Data/charuco_temp_folder/chessboard_pic/middle_frame.jpg"

image_points_undistorted = [
    [1071, 317],
    [1557, 397],
    [1676, 655],
    [997, 535]
]

corners = np.float32([
    [276,64],
    [1836,438],
    [1769,1038],
    [100,974]
])
obj.findWarpPerspective(chessboard_img_path, image_points_undistorted, corners)

In [5]:
path_to_vids_folder = "../../Test Data/charuco_temp_folder/lange video 1 deel"
path_write_all_frames = "../../Test Data/charuco_temp_folder/measurement_frames_formatted"
snippets_test = np.array([
    [datetime.datetime(2024, 6, 7, 15, 2, 57), datetime.datetime(2024, 6, 7, 15, 2, 58)],
    [datetime.datetime(2024, 6, 7, 15, 3, 22), datetime.datetime(2024, 6, 7, 15, 3, 24)],
    [datetime.datetime(2024, 6, 7, 15, 3, 50), datetime.datetime(2024, 6, 7, 15, 3, 52)],
    [datetime.datetime(2024, 6, 7, 15, 4, 11), datetime.datetime(2024, 6, 7, 15, 4, 12)],
    [datetime.datetime(2024, 6, 7, 15, 4, 31), datetime.datetime(2024, 6, 7, 15, 4, 31, 500000)],
    [datetime.datetime(2024, 6, 7, 15, 4, 50), datetime.datetime(2024, 6, 7, 15, 4, 51)]])
snippets_step_wind = np.array([
    [datetime.datetime(2024, 6, 7, 15, 2, 45), datetime.datetime(2024, 6, 7, 15, 3, 0)],
    [datetime.datetime(2024, 6, 7, 15, 3, 4), datetime.datetime(2024, 6, 7, 15, 3, 30)],
    [datetime.datetime(2024, 6, 7, 15, 3, 38), datetime.datetime(2024, 6, 7, 15, 3, 52)],
    [datetime.datetime(2024, 6, 7, 15, 4, 0), datetime.datetime(2024, 6, 7, 15, 4, 13)],
    [datetime.datetime(2024, 6, 7, 15, 4, 20), datetime.datetime(2024, 6, 7, 15, 4, 32)],
    [datetime.datetime(2024, 6, 7, 15,4, 36), datetime.datetime(2024, 6, 7, 15, 4, 54)]])
folderabspath = "../../Test Data/THE REAL DEAL/test_to_be_processed"
background_img_file = "../../Test Data/IMG_2378_goed/200.jpg"

In [6]:
def create_folders(folder_destination:str|None = None):
    if folder_destination is None:
        folderabspath = "../../Test Data/THE REAL DEAL/Unprocessed Frame Pairs"
    else:
        folderabspath = folder_destination
    letters = ["a", "b", "c", "d", "e", "f"]
    for idx, i in enumerate(range(5, 11)):
        folder_path = os.path.join(folderabspath, f"{letters[idx]}_{i}_percent")
        os.makedirs(folder_path, exist_ok=True)
        
    

In [55]:
def do_everything(source_folder, destination_folder):
    background = None
    processor = Processor(path=source_folder, df="jpg")
    all_files = os.listdir(source_folder)
    for i, file in enumerate(all_files):
        image = cv2.imread(f"{source_folder}/{file}", cv2.IMREAD_GRAYSCALE)
        print(file)
        print(image.shape)
        if image is None:
            print("NONE")
        if background is None:
            background = processor.denoise(image)
            processor.reference = background
            continue
        # print(i)
        denoised_image = processor.denoise(image)
        masked_image = processor.mask(denoised_image)
        print("dafs")
        warped_image = cv2.warpPerspective(masked_image, obj.adjusted_matrix, (obj.adjusted_width, obj.adjusted_height), flags=cv2.INTER_LINEAR,borderMode=cv2.BORDER_CONSTANT, borderValue=(0, 0, 0))
        # cropped_image = crop_single_image(warped_image, crop_corner_points=(1295, 1581, 1745, 2185))
        print("s")
        cv2.imwrite(f"{destination_folder}/{file}", warped_image)
        print("img_1_done")
        # print(f"{destination_folder}/{file}")
        

In [35]:
def write_whole_vid(vid_folder, destination_folder, start_time, fps_real, pairs_per_second):
    video = cv2.VideoCapture(f"{vid_folder}/{os.listdir(vid_folder)[0]}")
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_interval = int(round(fps_real / pairs_per_second))
    dt = 1 / fps_real
    frame_locs, times, = [], []
    # smoke_smart_time = datetime.datetime(2024, 6, 7, 15, 2, 44)
    # starting_frame = int((smoke_smart_time-start_time).total_seconds() * fps_real)
    _, _ = video.read()
    for i in range(0, total_frames + 1, frame_interval):
        if i > 0:
            video.set(cv2.CAP_PROP_POS_FRAMES, i)
        grabbed1, frame1 = video.read()
        grabbed2, frame2 = video.read()
        if not grabbed1 or not grabbed2:
            raise ValueError(f"Ran out of frames at frame {i}. Video has {total_frames} frames.")
        frame_name_1 = (start_time + datetime.timedelta(0, i / fps_real)).strftime("%d_%m_%Y_%H_%M_%S_%f")[:-3]
        frame_name_2 = (start_time + datetime.timedelta(0, i / fps_real + dt)).strftime("%d_%m_%Y_%H_%M_%S_%f")[:-3]
        frame_locs.append(i)
        frame_locs.append(i + 1)
        times.append(frame_name_1)
        times.append(frame_name_2)
        cv2.imwrite(f"{destination_folder}/{frame_name_1}.jpg", frame1)
        cv2.imwrite(f"{destination_folder}/{frame_name_2}.jpg", frame2)
    video.release()
    return frame_locs, times

In [33]:
video_folder = "../../Test Data/fill big video"
video_pairs_destination = "../../Test Data/Full big vid pairs"
video_processed_destination = "../../Test Data/Full big vid PROCESSED"

## START TIME IS DUS FOUT, VERBETEREN. laatste entry is in microseconden.

start_time = datetime.datetime(2024, 6, 7, 15, 1, 59, 270000)



In [None]:
all_big_frames, all_big_times = write_whole_vid(
    vid_folder=video_folder,
    destination_folder=video_pairs_destination,
    start_time=start_time,
    fps_real=198.53,
    pairs_per_second=10)

In [32]:
do_everything(video_pairs_destination, video_processed_destination)