## Medial Axis Detection of Moving Objects

### Importing Libraries

In [71]:
# !pip install gdown
import gdown
import os
import cv2
import numpy as np
import math
import random
import statistics
from os.path import isfile, join
import shutil

### **"Video Frame Extraction, Background Subtraction, Cleaning, and Edge Detection Pipeline"**


1. **Frame Extraction**: Extracts individual frames from a video file and saves them as image files.
2. **Background Subtraction**: Removes background from extracted frames using a Gaussian Mixture Model (MOG2).
3. **Image Cleaning**: Applies morphological operations (erosion, dilation, opening, closing, etc.) to clean and enhance the frames.
4. **Edge Detection**: Detects edges in the processed frames using methods such as Laplacian, Sobel (X, Y, or combined), and Canny edge detection.



In [72]:
def ExtractFrames(video):
    """Split ``./Videos/<video>.mp4`` into JPEG frames under ``./Frames``.

    Frames are written as ``Frame<i>_<video>.jpg`` where ``i`` counts from 0
    in the order the frames are decoded.

    Arguments:
    - video (str): Base name of the video file (without the ``.mp4`` extension).

    Returns:
    - The frame rate reported by the capture (``cv2.CAP_PROP_FPS``).

    Raises:
    - IOError: if the video cannot be opened.
    """
    r_path = f'./Videos/{video}.mp4'
    w_path = './Frames'
    video_frame = cv2.VideoCapture(r_path)

    # VideoCapture does not raise for a missing or unreadable file. Without
    # this check the function would "succeed" with zero frames and an fps of
    # 0, and the failure would only surface in a later pipeline stage.
    if not video_frame.isOpened():
        video_frame.release()
        raise IOError(f"Unable to open video: {r_path}")

    try:
        os.makedirs(w_path, exist_ok=True)
        fps = video_frame.get(cv2.CAP_PROP_FPS)
        print(f"Frames per second: {fps}")
        No_of_frames = int(video_frame.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Number of frames: {No_of_frames}")

        i = 0
        while True:
            success, frame = video_frame.read()
            if not success:
                break
            frame_filename = os.path.join(w_path, f'Frame{i}_{video}.jpg')
            cv2.imwrite(frame_filename, frame)
            i += 1
    finally:
        # Release the capture even if decoding or writing fails part-way.
        video_frame.release()
    return fps

import os
import cv2

def BgSubtract(video, fps, varThreshold):
    """Run MOG2 background subtraction over the extracted frames of ``video``.

    Reads ``./Frames/Frame<i>_<video>.jpg`` in ascending frame order, feeds
    every frame to one shared MOG2 subtractor, and writes each foreground mask
    under the same file name to ``./Background-Subtraction-Frames``.

    Arguments:
    - video (str): Video base name used in the frame file names.
    - fps (float): Frame rate of the source; the model history is ``10 * fps`` frames.
    - varThreshold: Value forwarded to the MOG2 ``varThreshold`` argument.

    Returns:
    - None
    """
    w_path = './Background-Subtraction-Frames'
    r_path = './Frames'
    os.makedirs(w_path, exist_ok=True)
    Subtraction_MOG = cv2.createBackgroundSubtractorMOG2(
        history=int(10 * fps), detectShadows=True, varThreshold=varThreshold
    )

    # The subtractor updates its background model on every apply(), so frames
    # must arrive in temporal order. os.listdir() gives no ordering guarantee,
    # and a plain string sort would put Frame10 before Frame2, so sort on the
    # parsed frame number instead.
    prefix, suffix = 'Frame', f'_{video}.jpg'
    numbered = []
    for img_name in os.listdir(r_path):
        if not (img_name.startswith(prefix) and img_name.endswith(suffix)):
            continue
        frame_no = img_name[len(prefix):-len(suffix)]
        # Skip names such as Frame3_x_1.jpg that merely share the suffix.
        if frame_no.isdigit():
            numbered.append((int(frame_no), img_name))

    for _, img_name in sorted(numbered):
        frame_path = os.path.join(r_path, img_name)
        frame = cv2.imread(frame_path)
        if frame is None:
            print(f"Warning: Unable to read {frame_path}. Skipping.")
            continue
        Image_foreground = Subtraction_MOG.apply(frame)
        # With detectShadows=True the mask marks shadow pixels with a separate
        # grey level (127 is the MOG2 default); clear them so only foreground
        # pixels remain.
        Image_foreground[Image_foreground == 127] = 0
        output_path = os.path.join(w_path, img_name)
        cv2.imwrite(output_path, Image_foreground)
    print("Background subtraction completed and images saved to:", w_path)

def Cleaning(video, operation_type, kernel_size, structural="no", size=100):
    """Apply one morphological operation to every background-subtracted frame.

    Each ``*_<video>.jpg`` in ``./Background-Subtraction-Frames`` is processed
    with a square all-ones kernel and written under the same name to
    ``./Image-Cleaning-Results``.

    Arguments:
    - video (str): Video base name used in the frame file names.
    - operation_type (str): One of "erosion", "dilation", "opening",
      "closing", "gradient", "tophat", "blackhat".
    - kernel_size (int): Side length of the square structuring kernel.
    - structural (str): "yes" to additionally remove long horizontal runs.
    - size (int): The horizontal element is ``1920 // size`` pixels wide.

    Raises:
    - ValueError: for an unknown ``operation_type`` (raised when the first
      readable frame is processed).
    """
    src_dir = './Background-Subtraction-Frames'
    dst_dir = './Image-Cleaning-Results'
    os.makedirs(dst_dir, exist_ok=True)
    kernel = np.ones((kernel_size, kernel_size), np.uint8)
    wanted_suffix = f'_{video}.jpg'

    def morph(image):
        # Map the requested operation name to the matching OpenCV call.
        if operation_type == "erosion":
            return cv2.erode(image, kernel, iterations=1)
        if operation_type == "dilation":
            return cv2.dilate(image, kernel, iterations=1)
        if operation_type == "opening":
            return cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel)
        if operation_type == "closing":
            return cv2.morphologyEx(image, cv2.MORPH_CLOSE, kernel)
        if operation_type == "gradient":
            return cv2.morphologyEx(image, cv2.MORPH_GRADIENT, kernel)
        if operation_type == "tophat":
            return cv2.morphologyEx(image, cv2.MORPH_TOPHAT, kernel)
        if operation_type == "blackhat":
            return cv2.morphologyEx(image, cv2.MORPH_BLACKHAT, kernel)
        raise ValueError(f"Invalid operation type: {operation_type}")

    def drop_horizontal_runs(image):
        # Erode then dilate with a wide 1-pixel-high element to keep only the
        # long horizontal runs, then mask those runs out of the image.
        element = cv2.getStructuringElement(cv2.MORPH_RECT, (1920 // size, 1))
        runs = cv2.dilate(cv2.erode(image.copy(), element), element)
        return cv2.bitwise_and(image, cv2.bitwise_not(runs))

    for img_name in os.listdir(src_dir):
        if img_name.endswith(wanted_suffix):
            img_path = os.path.join(src_dir, img_name)
            img = cv2.imread(img_path)
            if img is None:
                print(f"Warning: Unable to read {img_path}. Skipping.")
                continue
            out = morph(img)
            if structural == "yes":
                out = drop_horizontal_runs(out)
            cv2.imwrite(os.path.join(dst_dir, img_name), out)

    print("Image cleaning completed and results saved to:", dst_dir)

def EdgeDetection(video, edge_type="canny", kernel=3):
    """Run an edge detector over every cleaned frame of ``video``.

    Each ``*_<video>.jpg`` in ``./Image-Cleaning-Results`` is converted to
    grayscale, blurred with a 3x3 Gaussian, passed through the selected
    detector and written under the same name to ``./Edge-Detection-Results``.

    Arguments:
    - video (str): Video base name used in the frame file names.
    - edge_type (str): "canny", "laplacian", "sobelx", "sobely" or "sobelxy".
      "sobelxy" calls Sobel once with dx=1 and dy=1.
    - kernel (int): ``ksize`` for the Laplacian/Sobel operators (unused by Canny).

    Raises:
    - ValueError: for an unknown ``edge_type`` (raised when the first readable
      frame is processed).
    """
    src_dir = './Image-Cleaning-Results'
    dst_dir = './Edge-Detection-Results'
    os.makedirs(dst_dir, exist_ok=True)

    # (dx, dy) derivative orders for the Sobel variants.
    sobel_orders = {'sobelx': (1, 0), 'sobely': (0, 1), 'sobelxy': (1, 1)}
    wanted_suffix = f'_{video}.jpg'

    for img_name in os.listdir(src_dir):
        if not img_name.endswith(wanted_suffix):
            continue
        img_path = os.path.join(src_dir, img_name)
        img = cv2.imread(img_path)
        if img is None:
            print(f"Warning: Unable to read {img_path}. Skipping.")
            continue

        img_blur = cv2.GaussianBlur(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), (3, 3), 0)

        if edge_type == 'canny':
            # Hysteresis thresholds scale with the mean intensity of the
            # blurred frame and are clamped to the 0..255 range.
            v = np.mean(img_blur)
            sigma = 0.5
            lower = int(max(0, (1.0 - sigma) * 30 * v))
            upper = int(min(255, (1.0 + sigma) * 30 * v))
            out = cv2.Canny(img_blur, lower, upper)
        elif edge_type == 'laplacian':
            response = cv2.Laplacian(img_blur, cv2.CV_64F, ksize=kernel)
            out = cv2.convertScaleAbs(response)
        elif edge_type in sobel_orders:
            dx, dy = sobel_orders[edge_type]
            response = cv2.Sobel(img_blur, cv2.CV_64F, dx, dy, ksize=kernel)
            out = cv2.convertScaleAbs(response)
        else:
            raise ValueError(f"Invalid edge detection type: {edge_type}")

        cv2.imwrite(os.path.join(dst_dir, img_name), out)

    print("Edge detection completed and results saved to:", dst_dir)


### **"Hough Transform-Based Line Detection and Smoothing in Video Frames"**

This section implements line detection and smoothing for video frames. The probabilistic Hough Transform finds line segments in each edge-detected frame; the segments are grouped into 10-degree orientation bins, and the most populated bin is reduced to a single representative line. Frames with no detected lines receive a default placeholder line, and the per-frame line endpoints are then smoothed with a moving average over neighbouring frames to improve visual consistency. The results are stored in organized directories, making the pipeline suitable for applications such as lane detection, structural analysis, and video post-processing.

In [73]:
def find_key(mydict, search_val):
    """Return the first key in ``mydict`` whose value equals ``search_val``.

    Keys are examined in the dictionary's iteration order; ``None`` is
    returned when no value matches.
    """
    matches = (key for key, value in mydict.items() if value == search_val)
    return next(matches, None)

def HoughTransform(video):
    c_array = {}
    param_array = {}
    w_path1 = './Line-Detection-Results/all'
    w_path2 = './Line-Detection-Results/max_freq'
    r_path = './Edge-Detection-Results'
    o_path = './Final-Frames'

    os.makedirs(w_path1, exist_ok=True)
    os.makedirs(w_path2, exist_ok=True)
    os.makedirs(o_path, exist_ok=True)

    files = [f for f in os.listdir(r_path) if isfile(join(r_path, f)) and not f.startswith('.') and f.endswith(f'_{video}.jpg')]
    files.sort(key=lambda x: int(x[5:-6]))

    frame_array = []
    avg_length = 100
    counter = 0
    m_array = {'x1': [], 'x2': [], 'y1': [], 'y2': []}
    window = 5
    gaussian = [35/1230, 50/1230, 100/1230, 150/1230, 180/1230, 200/1230, 180/1230, 150/1230, 100/1230, 50/1230, 35/1230]

    for i, img_name in enumerate(files):
        if not img_name.endswith('.jpg'):
            continue

        frame_no = img_name[5:-6]
        original_img = cv2.imread(f'Frames/{img_name}')
        img = cv2.imread(f'{r_path}/{img_name}')
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        avg_int = np.mean(gray)

        lines = cv2.HoughLinesP(gray, rho=1, theta=np.pi / 180, threshold=60, minLineLength=max(avg_length, 100), maxLineGap=100)

        if lines is None or len(lines) <= 0:
            draw_default_line(original_img, frame_no, c_array)
            continue

        process_lines(lines, original_img, frame_no, m_array, c_array)

    apply_median_filter(files, m_array, window, o_path)

def draw_default_line(original_img, frame_no, c_array):
    """Record a placeholder line for a frame in which no line was detected.

    Draws a degenerate red line from (0, 0) to (0, 0) on ``original_img``,
    saves the image as ``Line-Detection-Results/all/<frame_no>.jpg`` and stores
    ``{slope: (xa, ya, xb, yb)}`` for the frame in ``c_array``.
    """
    start = (0, 0)
    end = (0, 0)
    cv2.line(original_img, start, end, (0, 0, 255), 3)
    cv2.imwrite(f'Line-Detection-Results/all/{frame_no}.jpg', original_img)

    rise = end[1] - start[1]
    # The small offset keeps the denominator non-zero for a vertical line.
    run = end[0] - start[0] + 0.00012
    c_array[frame_no] = {float(rise / run): (*start, *end)}

def process_lines(lines, original_img, frame_no, m_array, c_array):
    """Draw all Hough segments and forward the dominant orientation bin.

    Every segment is drawn in red on ``original_img`` and grouped into a
    10-degree orientation bin. The bin holding the most segments (the first
    such bin on ties) is passed to ``draw_most_frequent_line``.

    Arguments:
    - lines: Iterable of rows, each holding ``(x1, y1, x2, y2)`` tuples, as
      returned by ``cv2.HoughLinesP``. ``None`` or empty input is a no-op.
    - original_img: Image the segments are drawn on (modified in place).
    - frame_no: Frame identifier forwarded to ``draw_most_frequent_line``.
    - m_array, c_array: Accumulators forwarded to ``draw_most_frequent_line``.

    Returns:
    - None
    """
    if lines is None:
        return

    bins = {}
    for row in lines:
        for segment in row:
            # Plain Python ints keep later arithmetic out of fixed-width
            # NumPy integer types.
            x1, y1, x2, y2 = (int(v) for v in segment)
            cv2.line(original_img, (x1, y1), (x2, y2), (0, 0, 255), 3)
            # The small offset avoids division by zero for vertical segments.
            slope = (y2 - y1) / (x2 - x1 + 0.0001)
            theta = math.atan(slope)
            bin_key = theta * 180 / np.pi // 10
            bins.setdefault(bin_key, []).append((x1, y1, x2, y2, theta))

    max_bin = max(bins.values(), key=len, default=[])
    if max_bin:
        draw_most_frequent_line(max_bin, original_img, m_array, c_array, frame_no)

def draw_most_frequent_line(max_bin, original_img, m_array, c_array, frame_no):
    """Reduce one orientation bin to a single mid-line and record it.

    The segments with the smallest and the largest y-intercept are selected
    and their endpoints are averaged pairwise. The resulting line is drawn on
    ``original_img``, its endpoints are appended to ``m_array`` and
    ``{slope: (xa, ya, xb, yb)}`` is stored in ``c_array[frame_no]``.

    Arguments:
    - max_bin: List of ``(x1, y1, x2, y2, theta)`` tuples. Empty input is a no-op.
    - original_img: Image the mid-line is drawn on (modified in place).
    - m_array (dict): Lists under 'x1', 'x2', 'y1', 'y2' that receive the endpoints.
    - c_array (dict): Receives the slope/endpoint record for ``frame_no``.
    - frame_no: Key used in ``c_array``.

    Returns:
    - None
    """
    if not max_bin:
        return

    def intercept(segment):
        x1, y1, x2, y2 = segment[:4]
        # The small offset avoids division by zero for vertical segments.
        return y1 - (y2 - y1) / (x2 - x1 + 0.0001) * x1

    # Seed both extremes with the first segment and test them independently.
    # A single if/elif chain leaves the "max" segment unassigned when the bin
    # has one segment or the intercepts never increase.
    low = high = max_bin[0]
    low_c = high_c = intercept(max_bin[0])
    for segment in max_bin[1:]:
        c = intercept(segment)
        if c <= low_c:
            low, low_c = segment, c
        if c >= high_c:
            high, high_c = segment, c

    lx1, ly1, lx2, ly2 = low[:4]
    rx1, ry1, rx2, ry2 = high[:4]

    xa = int((lx1 + rx1) / 2)
    ya = int((ly1 + ry1) / 2)
    xb = int((lx2 + rx2) / 2)
    yb = int((ly2 + ry2) / 2)
    c_array[frame_no] = {(float((yb - ya) / (xb - xa + 0.00012))): (xa, ya, xb, yb)}

    m_array['x1'].append(xa)
    m_array['x2'].append(xb)
    m_array['y1'].append(ya)
    m_array['y2'].append(yb)

    cv2.line(original_img, (xa, ya), (xb, yb), (0, 0, 255), 3)

def apply_median_filter(files, m_array, window, o_path):
    """Smooth the per-frame line endpoints and write annotated frames.

    For each frame index ``i`` that has ``window`` neighbours on both sides,
    the endpoints are averaged (``np.average``, i.e. a moving mean despite the
    function name) over ``[i - window, i + window]``. The result is drawn on
    ``Frames/<name>`` and saved to ``<o_path>/<name>``. The first and last
    ``window`` frames are not written.

    Arguments:
    - files (list[str]): Frame file names in playback order.
    - m_array (dict): Endpoint lists under 'x1', 'x2', 'y1', 'y2'; entry ``i``
      is taken to belong to ``files[i]``.
    - window (int): Half-width of the averaging window.
    - o_path (str): Output directory for the annotated frames.

    Returns:
    - None
    """
    # Only indices present in both `files` and every coordinate list can be
    # smoothed. Bounding the loop this way avoids averaging an empty slice,
    # which yields NaN and then fails in int().
    usable = min(len(files), len(m_array['x1']), len(m_array['x2']),
                 len(m_array['y1']), len(m_array['y2']))
    if usable == 0:
        print("No line detections to smooth; no frames written.")
        return

    avg_coordx1_prev, avg_coordx2_prev, avg_coordy1_prev, avg_coordy2_prev = m_array['x1'][0], m_array['x2'][0], m_array['y1'][0], m_array['y2'][0]
    equal_count = 0
    flag = True

    for i in range(window, usable - window):
        img_name = files[i]
        my_img = cv2.imread(f'Frames/{img_name}')
        if my_img is None:
            print(f"Warning: Unable to read Frames/{img_name}. Skipping.")
            continue
        avg_coordx1 = int(np.average(m_array['x1'][i - window:i + window + 1]))
        avg_coordx2 = int(np.average(m_array['x2'][i - window:i + window + 1]))
        avg_coordy1 = int(np.average(m_array['y1'][i - window:i + window + 1]))
        avg_coordy2 = int(np.average(m_array['y2'][i - window:i + window + 1]))

        if (avg_coordx1_prev, avg_coordx2_prev, avg_coordy1_prev, avg_coordy2_prev) == (avg_coordx1, avg_coordx2, avg_coordy1, avg_coordy2):
            # The average equals the held line. Count the repeat and, for the
            # first three repeats only, redraw the held line with a small
            # random jitter; later repeats draw nothing.
            equal_count += 1
            if equal_count <= 3:
                cv2.line(my_img, (avg_coordx1_prev + int(2 * np.random.normal()), avg_coordy1_prev + int(2 * np.random.normal())),
                         (avg_coordx2_prev + int(2 * np.random.normal()), avg_coordy2_prev + int(2 * np.random.normal())), (0, 0, 255), 3)
        else:
            # The average moved. After a run of at least three repeats, start
            # a countdown of 2 * window frames during which the held line is
            # kept and nothing is drawn. Once the counter reaches zero, the
            # new average becomes the held line and is drawn.
            # NOTE(review): presumably meant to suppress flicker after a static stretch - confirm.
            if equal_count >= 3 and flag:
                equal_count = 2 * window
                flag = False
            equal_count -= 1
            if equal_count <= 0:
                flag = True
                equal_count = 0
                (avg_coordx1_prev, avg_coordx2_prev, avg_coordy1_prev, avg_coordy2_prev) = (avg_coordx1, avg_coordx2, avg_coordy1, avg_coordy2)
                cv2.line(my_img, (avg_coordx1_prev, avg_coordy1_prev), (avg_coordx2_prev, avg_coordy2_prev), (0, 0, 255), 3)

        cv2.imwrite(f'{o_path}/{img_name}', my_img)


### **"Video Creation from Processed Frames"**


1. **Locates Frames**: Reads images from a specified directory that match a given naming pattern.
2. **Sorts Frames**: Orders the frames numerically to maintain the correct sequence.
3. **Initializes VideoWriter**: Determines the video resolution from the first valid frame and sets up a video writer with the MJPG codec at 30 frames per second.
4. **Compiles Frames**: Iteratively writes frames into the video file.
5. **Handles Errors**: Skips unreadable frames and exits if no valid frames are found.


In [74]:
def SaveVideo(name, fps=30):
    """Assemble the frames in ``./Final-Frames`` into ``./Result/result_<name>.mp4``.

    Frames named ``Frame<i>_<name>.jpg`` are written in ascending ``i``. The
    video size is taken from the first readable frame, and unreadable frames
    are reported and skipped.

    Arguments:
    - name (str): Video base name used in the frame file names.
    - fps (int | float): Frame rate of the output video (default 30).

    Returns:
    - None
    """
    w_path = f'./Result/result_{name}.mp4'
    r_path = './Final-Frames'
    os.makedirs(os.path.dirname(w_path), exist_ok=True)

    # Parse the frame number using the real prefix/suffix lengths. A fixed
    # [5:-6] slice only works for one-character video names, and int() would
    # raise on unrelated files that merely share the suffix.
    prefix, suffix = 'Frame', f'_{name}.jpg'
    numbered = []
    for f in os.listdir(r_path):
        if not (isfile(join(r_path, f)) and f.startswith(prefix) and f.endswith(suffix)):
            continue
        frame_no = f[len(prefix):-len(suffix)]
        if frame_no.isdigit():
            numbered.append((int(frame_no), f))
    numbered.sort()

    # Stream each frame straight into the writer instead of buffering the
    # whole clip in memory first.
    myvideo = None
    for i, (_, file) in enumerate(numbered):
        filename = join(r_path, file)
        if i % 100 == 0:
            print(f"Processing: {filename}")
        img = cv2.imread(filename)

        if img is None:
            print(f"Error reading file: {filename}")
            continue

        if myvideo is None:
            height, width = img.shape[:2]
            myvideo = cv2.VideoWriter(w_path, cv2.VideoWriter_fourcc(*'MJPG'), fps, (width, height))

        myvideo.write(img)

    if myvideo is None:
        print("No valid frames found. Exiting.")
        return

    myvideo.release()
    print(f"Video saved to {w_path}")


### Important steps: Downloading videos

In [4]:
# Destination folder for the downloaded sample videos.
# NOTE(review): ExtractFrames reads './Videos/<name>.mp4', so this absolute
# path only lines up when the working directory is /content - confirm.
folder_path = '/content/Videos'

if not os.path.exists(folder_path):
    os.makedirs(folder_path)

# (Google Drive file id, local file name) pairs to fetch.
file_ids = [
    ('1nF6qsYQoygpnj5pL4--J1xAeh_IDGGem', '1.mp4'),
    ('1-l2mFlspFh7vNxU8V7tykxnJiJPk7VGT', '2.mp4'),
    ('12J5PpOLlsHAY-DqeFjteojc4RO61H6QN', '3.mp4')
]

# Download each file into folder_path under its local name.
for file_id, filename in file_ids:
    download_url = f'https://drive.google.com/uc?id={file_id}'
    destination = os.path.join(folder_path, filename)
    print(f"Downloading {filename}...")
    gdown.download(download_url, destination, quiet=False)

print("Download complete!")


Downloading 1.mp4...


Downloading...
From: https://drive.google.com/uc?id=1nF6qsYQoygpnj5pL4--J1xAeh_IDGGem
To: /content/Videos/1.mp4
100%|██████████| 6.35M/6.35M [00:00<00:00, 235MB/s]


Downloading 2.mp4...


Downloading...
From: https://drive.google.com/uc?id=1-l2mFlspFh7vNxU8V7tykxnJiJPk7VGT
To: /content/Videos/2.mp4
100%|██████████| 6.08M/6.08M [00:00<00:00, 183MB/s]


Downloading 3.mp4...


Downloading...
From: https://drive.google.com/uc?id=12J5PpOLlsHAY-DqeFjteojc4RO61H6QN
To: /content/Videos/3.mp4
100%|██████████| 5.46M/5.46M [00:00<00:00, 192MB/s]

Download complete!





In [79]:
def process_video_with_options():
    """
    Runs the entire video processing pipeline with user-selected options for arguments.

    Every numeric prompt falls back to its documented default when the answer
    is blank or not a number. If no video name is typed, the number chosen in
    the video menu is used as the name.

    Returns:
    - None
    """
    def ask_int(prompt, default=None):
        # Read an integer answer; blank or non-numeric input yields `default`
        # instead of raising ValueError.
        try:
            return int(input(prompt).strip())
        except ValueError:
            return default

    print("Before executing this section make sure that you have executed Important steps: Downloading videos")
    # Step 1: Select video name
    print("Select a video:")
    print("1. Video 1")
    print("2. Video 2")
    print("3. Video 3")
    video_choice = ask_int("Choose a video (1-3): ", 1)
    # The menu choice doubles as the default name (the samples are 1.mp4 to 3.mp4).
    video = input("Enter the name of the video (without extension): ").strip() or str(video_choice)

    # Step 2: Set varThreshold for BgSubtract
    print("\nSet Background Subtraction Threshold (varThreshold):")
    print("Options:")
    print("1. Low (30)")
    print("2. Medium (50)")
    print("3. High (70)")
    varThreshold_choice = ask_int("Choose an option (1-3): ")
    varThreshold = {1: 30, 2: 50, 3: 70}.get(varThreshold_choice, 50)  # Default to 50

    # Step 3: Cleaning arguments
    print("\nSet Cleaning Operation Type:")
    print("Options:")
    print("1. Erosion")
    print("2. Dilation")
    print("3. Opening")
    print("4. Closing")
    print("5. Gradient")
    print("6. Tophat")
    print("7. Blackhat")
    # Seven options are listed above, so the prompt must advertise 1-7.
    cleaning_type_choice = ask_int("Choose an option (1-7): ")
    cleaning_type = {1: "erosion", 2: "dilation", 3: "opening", 4: "closing", 5: "gradient", 6: "tophat", 7: "blackhat"}.get(cleaning_type_choice, "opening")  # Default to opening

    kernel_size = ask_int("\nEnter kernel size for cleaning (default is 3): ", 3)
    structural = input("Apply structural processing? (yes/no, default is no): ").strip().lower() or "no"
    size = ask_int("Enter size parameter for structural processing (default is 100): ", 100)
    if size <= 0:
        # Cleaning divides by `size`; fall back to the default for invalid values.
        size = 100

    # Step 4: Edge detection arguments
    print("\nSet Edge Detection Type:")
    print("Options:")
    print("1. Canny")
    print("2. Laplacian")
    print("3. SobelX")
    print("4. SobelY")
    print("5. SobelXY")
    edge_type_choice = ask_int("Choose an option (1-5): ")
    edge_type = {1: "canny", 2: "laplacian", 3: "sobelx", 4: "sobely", 5: "sobelxy"}.get(edge_type_choice, "canny")  # Default to canny

    # Run the entire process
    process_video(
        video=video,
        varThreshold=varThreshold,
        cleaning_type=cleaning_type,
        kernel_size=kernel_size,
        structural=structural,
        size=size,
        edge_type=edge_type
    )

def process_video(video, varThreshold, cleaning_type, kernel_size, structural, size, edge_type):
    """
    Runs the entire video processing pipeline:
    1. ExtractFrames
    2. BgSubtract
    3. Cleaning
    4. EdgeDetection
    5. HoughTransform
    6. SaveVideo

    The intermediate frame directories are deleted afterwards; the finished
    video under ./Result is kept.

    Arguments:
    - video (str): The name of the video to process (without extension).
    - varThreshold (int): Threshold for background subtraction.
    - cleaning_type (str): Cleaning operation type.
    - kernel_size (int): Kernel size for cleaning operation.
    - structural (str): Apply structural processing ('yes' or 'no').
    - size (int): Size parameter for structural processing.
    - edge_type (str): Type of edge detection.

    Returns:
    - None
    """
    # Step 1: Extract frames from the video
    print("Step 1: Extracting frames...")
    fps = ExtractFrames(video)
    print(f"Frames extracted with {fps} FPS.")

    # Step 2: Perform background subtraction
    print("Step 2: Background subtraction...")
    BgSubtract(video, fps, varThreshold)
    print("Background subtraction completed.")

    # Step 3: Clean the images
    print("Step 3: Cleaning images...")
    Cleaning(video, cleaning_type, kernel_size, structural, size)
    print("Image cleaning completed.")

    # Step 4: Perform edge detection
    print("Step 4: Edge detection...")
    EdgeDetection(video, edge_type, 3)
    print("Edge detection completed.")

    # Step 5: Detect lines using Hough Transform
    print("Step 5: Line detection using Hough Transform...")
    HoughTransform(video)
    print("Line detection completed.")

    # Step 6: Save the processed frames as a video
    print("Step 6: Saving video...")
    SaveVideo(video)
    print("Video saved successfully.")

    # The pipeline stages write their folders relative to the working
    # directory, so clean up those same relative paths rather than hard-coded
    # /content/... ones. ignore_errors keeps cleanup best-effort when a folder
    # was never created.
    for folder in (
        './Frames',
        './Background-Subtraction-Frames',
        './Image-Cleaning-Results',
        './Edge-Detection-Results',
        './Final-Frames',
        './Line-Detection-Results',
    ):
        shutil.rmtree(folder, ignore_errors=True)

# Start the interactive pipeline: prompts for the options on stdin, then
# processes the chosen video from frame extraction through to the saved video.
process_video_with_options()


Before executing this section make sure that you have executed Important steps: Downloading videos
Select a video:
1. Video 1
2. Video 2
3. Video 3
Choose a video (1-3): 1
Enter the name of the video (without extension): 1

Set Background Subtraction Threshold (varThreshold):
Options:
1. Low (30)
2. Medium (50)
3. High (70)
Choose an option (1-3): 1

Set Cleaning Operation Type:
Options:
1. Erosion
2. Dilation
3. Opening
4. Closing
5. Gradient
6. Tophat
7. Blackhat
Choose an option (1-4): 4

Enter kernel size for cleaning (default is 3): 3
Apply structural processing? (yes/no, default is no): no
Enter size parameter for structural processing (default is 100): 100

Set Edge Detection Type:
Options:
1. Canny
2. Laplacian
3. SobelX
4. SobelY
5. SobelXY
Choose an option (1-5): 5
Step 1: Extracting frames...
Frames per second: 30.0
Number of frames: 463
Frames extracted with 30.0 FPS.
Step 2: Background subtraction...
Background subtraction completed and images saved to: ./Background-Subtra