In [14]:
import pandas as pd
import cv2
import os
import json
import string
import random as rand
import ast
from typing import Any, List, Tuple, Union
import random 

In [15]:
CVAT_FOLDER_PATH = "/home/mh731nk/_data/experiments_tmp/data/revision_8/cvat_project_raw_unzip"

# Read video DataFrame 
Data was prepared in the previous part of the project

In [16]:
# Dataset path
df_videos = pd.read_pickle("/home/mh731nk/_data/experiments_tmp/data/revision_8/video.pkl", 'zip')
df_videos.shape

(344, 12)

# Select videos for experiments

In [17]:
df_filtere_videos = df_videos.loc[df_videos["folder_class"].isin([
    'USG - Lung point',
    'USG - Lung sliding absent (aj ine znaky - radiol. klin.)',
    'USG - Lung sliding present (aj ine znaky - radiol. klin.)'
    ])]

print(f'LP -> {df_filtere_videos.loc[df_filtere_videos["folder_class"] == "USG - Lung point"].shape[0]}')
print(f'A -> {df_filtere_videos.loc[df_filtere_videos["folder_class"] == "USG - Lung sliding absent (aj ine znaky - radiol. klin.)"].shape[0]}')
print(f'P -> {df_filtere_videos.loc[df_filtere_videos["folder_class"] == "USG - Lung sliding present (aj ine znaky - radiol. klin.)"].shape[0]}')
17 + 9 + 26

LP -> 17
A -> 9
P -> 26


52

# Read anotations for videos

In [18]:
def id_generator(self, size: int = 8, chars: str = string.ascii_uppercase + string.digits) -> str:
    """
    Generate a random string identifier.

    This method generates a random string of the specified size using a secure random
    generator. By default, the generated string consists of uppercase letters and digits.

    Args:
        size (int): The length of the generated string. Defaults to 8.
        chars (str): A string containing the set of characters to choose from.
                     Defaults to uppercase letters and digits (A-Z, 0-9).

    Returns:
        str: A randomly generated string identifier.
    """
    return ''.join(random.SystemRandom().choice(chars) for _ in range(size))


def parse_polygon_points(pts: Union[str, List[Any]]) -> List[Tuple[Union[int, float], Union[int, float]]]:
    """
    Parse and normalize polygon points into a list of (x, y) coordinate pairs.
    
    The input can be provided in different formats:
    - A string representation of a list (e.g., "[556.9, 200.3, 555.3, 214.0, ...]"), 
      which will be evaluated into a Python list.
    - A flat list of numbers representing coordinates (e.g., [x0, y0, x1, y1, ...]).
    - A list already containing (x, y) pairs.
    
    The function ensures that the returned value is always a list of tuples,
    where each tuple represents a point as (x, y).
    
    Args:
        pts (Union[str, List[Any]]): The polygon points in one of the accepted formats.
    
    Returns:
        List[Tuple[Union[int, float], Union[int, float]]]: A list of (x, y) coordinate pairs.
    """
    # If pts is a string, safely evaluate it to convert the string into a Python list.
    if isinstance(pts, str):
        pts = ast.literal_eval(pts)
    
    # If the list is a flat list of coordinates (e.g., [x0, y0, x1, y1, ...]),
    # then group the numbers into (x, y) pairs.
    if pts and isinstance(pts[0], (float, int)):
        pts = list(zip(pts[::2], pts[1::2]))
    
    # Return the normalized list of (x, y) coordinate pairs.
    return pts


Each video exported from the CVAT annotation tool is organized into a structured folder that holds both the raw video data and its associated metadata. This structure typically includes:

Raw Video Storage:
The primary video file is saved in a dedicated folder, ensuring that the original footage is preserved intact.

Metadata Files:
Accompanying the raw video are metadata files (often in JSON format) that provide detailed information about the video, such as resolution, frame rate, and other relevant properties.

Annotation Data:
In addition to video metadata, the export includes detailed annotation files. These files contain information about label masks and polygon coordinates for each frame, which outline the annotated regions of interest within the video. This structured annotation data is essential for tasks like object detection and segmentation.

This systematic folder organization makes it easy to access and process both the video content and its annotations, streamlining workflows for analysis and machine learning applications.

In [19]:
# Initialize a list to hold rows extracted from annotations
rows_to_append = []
print(f"Start processing of {df_filtere_videos.shape[0]} videos")
# Iterate over each row in the DataFrame containing filtered videos
indexer = 0
for index, video_df_row in df_filtere_videos.iterrows():
    indexer = indexer +1
    # Build the path to the 'data' folder within the current video's subfolder
    data_folder_path = os.path.join(
        CVAT_FOLDER_PATH, 
        video_df_row["video_subfolder_path"], 
        'data'
    )
    
    # Walk through the directory tree starting at data_folder_path.
    # os.walk returns tuples of (current_path, directories, files)
    folders_scan = [x for x in os.walk(data_folder_path)]
    
    # Retrieve the first subdirectory from the scan (assumes at least one exists)
    video_folder = folders_scan[0][1][0]
    
    # Print the name of the video folder (for debugging or logging purposes)
    # print(f' Video {video_df_row["video_id"]} is processing')
    
    # Construct the full path to the video subfolder
    path_video = os.path.join(
        CVAT_FOLDER_PATH,
        video_df_row["video_subfolder_path"]
    )
    
    # Open and load the annotations JSON file
    with open(os.path.join(path_video, 'annotations.json')) as json_file:
        json_anotation = json.load(json_file)
    
    # Open and load the task JSON file
    with open(os.path.join(path_video, 'task.json')) as json_file:
        task = json.load(json_file)

    data = json_anotation[0]  # assuming you're working with the first element

    # Case 1: Shapes are directly available
    if 'shapes' in data and data['shapes']:
        for shape in data['shapes']:
            row = {
                'polygon_id': id_generator(8),             # Generate a unique identifier for the shape
                'video_id': video_df_row["video_id"],        # Retrieve the video ID from the DataFrame row
                'name_cvat': task["name"],                   # Retrieve the task name from the task JSON
                'type': shape["type"],                       # The type of shape (e.g., polygon, rectangle)
                'frame': int(shape["frame"]),                # Convert the frame number to an integer
                'polygon_label': shape.get("label", ""),     # Use the shape's label if available
                'points': parse_polygon_points(list(shape["points"]))  # Parse the coordinate points
            }
            rows_to_append.append(row)

    # Case 2: Shapes are nested inside tracks
    elif 'tracks' in data and data['tracks']:
        for track in data['tracks']:
            # Use the track label if individual shapes do not have one
            track_label = track.get("label", "")
            for shape in track.get("shapes", []):
                row = {
                    'polygon_id': id_generator(8),
                    'video_id': video_df_row["video_id"],
                    'name_cvat': task["name"],
                    'type': shape["type"],
                    'frame': int(shape["frame"]),
                    # Use the shape's label if present, otherwise use the track's label
                    'polygon_label': shape.get("label", track_label),
                    'points': parse_polygon_points(list(shape["points"]))
                }
                rows_to_append.append(row)

df_polygons = pd.DataFrame(rows_to_append) 
print(f'Dataset include {df_polygons.shape[0]} polygons.')

Start processing of 52 videos
Dataset include 3198 polygons.


In [20]:
dffff = df_videos.loc[df_videos["video_id"].isin(set(df_polygons["video_id"]))]

print(f'LP -> {dffff.loc[dffff["folder_class"] == "USG - Lung point"].shape[0]}')
print(f'A -> {dffff.loc[dffff["folder_class"] == "USG - Lung sliding absent (aj ine znaky - radiol. klin.)"].shape[0]}')
print(f'P -> {dffff.loc[dffff["folder_class"] == "USG - Lung sliding present (aj ine znaky - radiol. klin.)"].shape[0]}')

print(17 + 9 + 26)

# dffff.loc[df_check["folder_class"] == "USG - Lung point"]

LP -> 17
A -> 9
P -> 26
52


In [21]:
df_polygons

Unnamed: 0,polygon_id,video_id,name_cvat,type,frame,polygon_label,points
0,DL4HRGVZ,PFFP3KRO,014 2021-10-18_125058_159.avi,polygon,0,lungslidingabsent,"[(556.9000000000015, 200.3000000000011), (555...."
1,P1HAR4EP,PFFP3KRO,014 2021-10-18_125058_159.avi,polygon,0,lungslidingabsent,"[(359.705078125, 186.00537109375), (359.704868..."
2,HNZ148Q1,PFFP3KRO,014 2021-10-18_125058_159.avi,polygon,10,lungslidingabsent,"[(366.041015625, 185.21337890625), (366.040805..."
3,L4NPYU7P,PFFP3KRO,014 2021-10-18_125058_159.avi,polygon,10,lungslidingabsent,"[(556.9000000000015, 201.8000000000011), (555...."
4,MKEDIUJ1,PFFP3KRO,014 2021-10-18_125058_159.avi,polygon,20,lungslidingabsent,"[(556.1000000000004, 207.40000000000146), (555..."
...,...,...,...,...,...,...,...
3193,FPN9AY85,T7EUVCFF,x220819--071841_20220819_MSK_0005s.AVI,polygon,260,lungslidingpresent,"[(365.4625976562511, 203.77187500000036), (361..."
3194,NOVTDJPC,T7EUVCFF,x220819--071841_20220819_MSK_0005s.AVI,polygon,270,lungslidingpresent,"[(371.1701171875011, 203.13808593750036), (366..."
3195,IH7B8HV8,T7EUVCFF,x220819--071841_20220819_MSK_0005s.AVI,polygon,280,lungslidingpresent,"[(371.1701171875011, 202.50332031250036), (366..."
3196,9JIMW44I,T7EUVCFF,x220819--071841_20220819_MSK_0005s.AVI,polygon,290,lungslidingpresent,"[(371.1706054687511, 202.50332031250036), (366..."


# Create an instance ID for each polygon annotation.

In a single frame, multiple polygons can be annotated, potentially with different labels. Additionally, it is possible
to have multiple polygons of the same type within the same frame. In the original dataset, polygons corresponding to the
same observed object across different frames do not share a common reference. Assigning a unique instance ID to each
polygon is crucial for later interpolation operations, as it allows for linking and tracking the same object across frames.


In [22]:
import pandas as pd
import numpy as np
from scipy.optimize import linear_sum_assignment
from typing import List, Tuple, Union, Any

def compute_polygon_centroid(poly: List[Tuple[Union[int, float], Union[int, float]]]) -> np.ndarray:
    """
    Compute the centroid of a polygon represented by a list of (x, y) tuples.
    
    The centroid is calculated as the arithmetic mean of the vertices.
    
    Parameters:
        poly (List[Tuple[Union[int, float], Union[int, float]]]):
            A list of (x, y) tuples representing the polygon's vertices.
    
    Returns:
        np.ndarray: A numpy array containing the (x, y) coordinates of the centroid.
    """
    # Convert the list of points into a numpy array for efficient computation.
    pts = np.array(poly)
    # Calculate the mean of x and y coordinates separately and return as a numpy array.
    return np.array([pts[:, 0].mean(), pts[:, 1].mean()])


def get_polygon_from_row(row: Any) -> List:
    """
    Retrieve polygon data from a row of a DataFrame.
    
    This function checks for polygon data in two possible columns:
      - 'interp_polygon': preferred if available.
      - 'points': used if 'interp_polygon' is not available.
    
    Parameters:
        row (Any): A dictionary-like object (e.g., a pandas Series) representing a row in the DataFrame.
    
    Returns:
        List: A list representing the polygon points (assumed to be (x, y) tuples).
    
    Raises:
        ValueError: If neither 'interp_polygon' nor 'points' is found in the row.
    """
    # Check for interpolated polygon data first.
    poly = row.get('interp_polygon', None)
    if poly is not None:
        return poly
    # Fallback to original points if interpolated data is not available.
    poly = row.get('points', None)
    if poly is not None:
        return poly
    # Raise an error if no polygon data is found.
    raise ValueError("Row does not contain polygon data.")


def assign_tracks_poly(df_group: pd.DataFrame, max_distance: float = 50) -> pd.DataFrame:
    """
    Assign consistent track IDs to polygon annotations across frames.
    
    For a given group (annotations from the same video and with the same polygon label),
    this function assigns a unique "polygon_track_id" to each polygon such that the same
    object is tracked across consecutive frames. This is achieved by comparing the centroids
    of polygons between frames using the Hungarian algorithm to minimize the assignment cost.
    
    Parameters:
        df_group (pd.DataFrame):
            DataFrame containing polygon annotations for a single video and a specific polygon label.
            Expected columns include "frame" and either "interp_polygon" or "points" for polygon data.
        max_distance (float):
            The maximum Euclidean distance allowed for linking two polygons across frames.
            Polygons with a centroid distance exceeding this value are not considered the same object.
    
    Returns:
        pd.DataFrame:
            A new DataFrame with an added "polygon_track_id" column that provides the track IDs.
    """
    # Sort the group by frame to ensure temporal order.
    df_group = df_group.sort_values("frame").reset_index(drop=True)
    
    # Initialize the 'polygon_track_id' column with a default value of -1 (unassigned).
    df_group["polygon_track_id"] = -1
    next_track_id = 0  # Counter for assigning new track IDs.

    # List to store information about polygons from the previous frame.
    # Each element is a dictionary with keys: "track_id" and "centroid".
    prev_tracks = []

    # Array to store the new track IDs corresponding to each row.
    new_ids = np.empty(len(df_group), dtype=int)
    
    # Process the DataFrame frame by frame.
    for frame, frame_df in df_group.groupby("frame"):
        # Get the indices of rows in the current frame.
        idx = frame_df.index.tolist()
        # List to store the computed centroids for polygons in the current frame.
        centroids = []
        for i in idx:
            # Retrieve polygon data from the current row.
            poly = get_polygon_from_row(df_group.loc[i])
            # Compute the centroid for the polygon.
            centroid = compute_polygon_centroid(poly)
            centroids.append(centroid)
        
        # Initialize an array for storing track IDs for polygons in the current frame.
        frame_ids = np.full(len(idx), -1, dtype=int)
        
        if not prev_tracks:
            # For the first frame, assign a new track ID to every polygon.
            for j in range(len(idx)):
                frame_ids[j] = next_track_id
                next_track_id += 1
        else:
            # Build a cost matrix where each entry is the Euclidean distance between
            # a polygon in the previous frame and one in the current frame.
            cost_matrix = np.zeros((len(prev_tracks), len(idx)))
            for i, prev in enumerate(prev_tracks):
                for j, current_centroid in enumerate(centroids):
                    cost_matrix[i, j] = np.linalg.norm(prev["centroid"] - current_centroid)
            
            # Use the Hungarian algorithm to determine the best assignment between previous and current polygons.
            row_ind, col_ind = linear_sum_assignment(cost_matrix)
            
            # For each assignment, if the distance is within the allowed max_distance,
            # assign the same track ID from the previous frame.
            for r, c in zip(row_ind, col_ind):
                if cost_matrix[r, c] <= max_distance:
                    frame_ids[c] = prev_tracks[r]["track_id"]
            
            # For any polygon that did not receive a track ID, assign a new one.
            for j in range(len(idx)):
                if frame_ids[j] == -1:
                    frame_ids[j] = next_track_id
                    next_track_id += 1
        
        # Record the assigned track IDs for the current frame in the new_ids array.
        for k, i in enumerate(idx):
            new_ids[i] = frame_ids[k]
        
        # Update the prev_tracks list with polygons from the current frame for use in the next iteration.
        prev_tracks = []
        for j, i in enumerate(idx):
            prev_tracks.append({
                "track_id": frame_ids[j],
                "centroid": centroids[j]
            })
    
    # Assign the computed track IDs back to the DataFrame.
    df_group["polygon_track_id"] = new_ids
    return df_group


In [23]:
# ----- Example Usage -----
# Assume df_polygons_revision_9 is a DataFrame containing polygon annotations with the following columns:
# "video_id", "frame", "polygon_label", and either "points" or "interp_polygon" for the polygon coordinates.

# Group the DataFrame by 'video_id' and 'polygon_label', then apply the track assignment function to each group.
df_polygons = (
    df_polygons.groupby(["video_id", "polygon_label"], group_keys=False)
             .apply(lambda g: assign_tracks_poly(g, max_distance=50))
             .reset_index(drop=True)
)

# df_with_tracks now contains an additional column "polygon_track_id" that tracks polygon instances across frames.
# df_polygons

# Polygons interpolation
The program addresses a common challenge in video annotation: manually labeling every frame is extremely time-consuming. Typically, experts annotate only every 10th frame (or another interval), which means many intermediate frames lack direct annotations. To solve this, the program includes an interpolation module that calculates the polygon annotations for these in-between frames. By leveraging the annotations from the two nearest labeled frames, it estimates the positions and shapes of the polygons for all intervening frames, ensuring a complete and continuous labeling of the entire video sequence without requiring exhaustive manual work.

In [24]:
import pandas as pd
import numpy as np

def interpolate_points(polyA, polyB, t):
    """
    Linearly interpolate between two polygons that each have the same number of vertices.
    
    Given two polygons, polyA and polyB, each represented as a list of (x, y) tuples, this function computes
    an interpolated polygon by linearly blending each corresponding pair of vertices using the parameter t.
    
    The interpolation formula for each coordinate is:
        interpolated_value = (1 - t) * value_A + t * value_B
    where t is a fraction between 0 and 1:
      - t = 0 yields polyA,
      - t = 1 yields polyB,
      - and values in between yield points along the straight line between the two vertices.
    
    Parameters:
        polyA: list of (x, y) tuples representing the first polygon.
        polyB: list of (x, y) tuples representing the second polygon.
        t: float in [0, 1] representing the interpolation factor.
    
    Returns:
        A list of (x, y) tuples representing the interpolated polygon.
    """
    result = []
    # Iterate over corresponding vertices of both polygons
    for (xA, yA), (xB, yB) in zip(polyA, polyB):
        # Compute the linear interpolation for x and y separately.
        x = (1 - t) * xA + t * xB
        y = (1 - t) * yA + t * yB
        result.append((x, y))
    return result

def interpolate_polygon_track(df_track):
    """
    Interpolate polygon annotations over a track across consecutive frames.
    
    For a single track—identified by a unique combination of video_id and polygon_id—this function:
      - Sorts the annotations by frame.
      - Creates a complete DataFrame that includes every frame from the minimum to the maximum annotated frame.
      - For each consecutive pair of annotated frames, it linearly interpolates the polygon points for 
        the frames in between using linear interpolation.
      - Copies constant identification columns (such as video_id, polygon_id, name_cvat, polygon_label, type)
        from the original annotations to the interpolated frames.
      
    The polygon is assumed to be stored in the "points" column, and the resulting interpolated polygon
    is stored in a new column "interp_polygon".
    
    Mathematical Details:
      - For two annotated frames f_start and f_end with known polygons poly_start and poly_end, the gap is defined
        as (f_end - f_start). For each intermediate frame at an offset 'offset' from f_start, the interpolation 
        factor t is calculated as t = offset / gap.
      - The function then uses the interpolate_points function to calculate the polygon for that frame.
    
    Parameters:
        df_track (pd.DataFrame): DataFrame for a single polygon track with annotations.
        max_distance: (not used in this function, but kept for consistency with overall pipeline).
    
    Returns:
        pd.DataFrame: A DataFrame with an "interp_polygon" column containing interpolated polygon points for
                      every frame in the track.
    """
    # Sort the track by frame to maintain temporal order.
    df_track = df_track.sort_values("frame").reset_index(drop=True)
    
    # Create a DataFrame with one row per frame between the minimum and maximum annotated frames.
    min_frame = df_track["frame"].min()
    max_frame = df_track["frame"].max()
    all_frames = pd.DataFrame({"frame": range(min_frame, max_frame + 1)})
    
    # Merge to ensure that annotated frames retain their original data.
    merged = pd.merge(all_frames, df_track, on="frame", how="left")
    
    # Initialize a column to store interpolated polygon data.
    merged["interp_polygon"] = None
    
    # Identify the indices of rows that already have polygon annotations (i.e., "points" are available).
    annotated_idx = merged[~merged["points"].isna()].index
    
    # Process each pair of consecutive annotated frames.
    for start_i, end_i in zip(annotated_idx, annotated_idx[1:]):
        # Retrieve the frame numbers for the start and end of the current segment.
        f_start = merged.loc[start_i, "frame"]
        f_end = merged.loc[end_i, "frame"]
        
        # Get the polygon points from the start and end frames.
        poly_start = merged.loc[start_i, "points"]  # Expected to be a list of (x,y) tuples.
        poly_end = merged.loc[end_i, "points"]
        
        # Calculate the number of frames between the two annotated frames.
        gap = f_end - f_start
        
        # For each frame in the gap, compute the interpolation factor t and the corresponding polygon.
        for offset in range(gap + 1):
            # If gap is 0 (shouldn't happen, but for safety), t defaults to 0.
            t = offset / float(gap) if gap else 0
            # Compute the interpolated polygon using linear interpolation on each vertex.
            poly_interp = interpolate_points(poly_start, poly_end, t)
            # Store the interpolated polygon in the corresponding row.
            merged.at[start_i + offset, "interp_polygon"] = poly_interp
    
    # For frames that were originally annotated, ensure that interp_polygon matches the original points.
    merged.loc[annotated_idx, "interp_polygon"] = merged.loc[annotated_idx, "points"]
    
    # Propagate constant identification columns from the original annotations to every row.
    for col in ["video_id", "polygon_id", "name_cvat", "polygon_label", "type"]:
        if col in df_track.columns:
            merged[col] = df_track[col].iloc[0]
    
    return merged

def interpolate_all_polygons(df, num_points=None):
    """
    Interpolate polygon annotations for all polygon tracks in a DataFrame.
    
    This function processes a DataFrame that contains polygon annotations with columns such as
    video_id, polygon_id, frame, and points. It groups the DataFrame by video_id and polygon_track_id,
    and then applies interpolation for each group. The result is a DataFrame that contains an interpolated
    polygon ("interp_polygon") for every frame in the range of each polygon track.
    
    Note:
      - It assumes that the polygon points in "points" have the same number of vertices across annotations.
        If they do not, a resampling step might be necessary prior to interpolation.
    
    Parameters:
        df (pd.DataFrame): DataFrame containing polygon annotations.
        num_points (optional): Parameter reserved for potential resampling of polygon vertices (not used here).
    
    Returns:
        pd.DataFrame: A new DataFrame with interpolated polygon annotations for each frame.
    """
    df_result = (
        df.groupby(["video_id", "type", "polygon_label","polygon_track_id"], group_keys=True)
          .apply(interpolate_polygon_track)
          .reset_index(drop=True)
    )
    return df_result


In [26]:
print(f'Polygons count before interpolation -> {df_polygons.shape[0]}')
df_polygons = interpolate_all_polygons(df_polygons)  # filter for a given video)
print(f'Polygons count before interpolation -> {df_polygons.shape[0]}')

Polygons count before interpolation -> 25758
Polygons count before interpolation -> 25758


In [27]:
df_polygons

Unnamed: 0,frame,polygon_id,video_id,name_cvat,type,polygon_label,points,polygon_track_id,interp_polygon
0,0,6T8OQ8BX,0CJ4LA0L,x211119--131441_20211119_MSK_0004 LPB.AVI,polygon,lungslidingabsent,"[(437.1181640625, 223.267578125), (437.1182141...",0.0,"[(437.1181640625, 223.267578125), (437.1182141..."
1,1,6T8OQ8BX,0CJ4LA0L,x211119--131441_20211119_MSK_0004 LPB.AVI,polygon,lungslidingabsent,,,"[(438.00634765625006, 223.43082031250015), (43..."
2,2,6T8OQ8BX,0CJ4LA0L,x211119--131441_20211119_MSK_0004 LPB.AVI,polygon,lungslidingabsent,,,"[(438.89453125, 223.59406250000032), (439.3945..."
3,3,6T8OQ8BX,0CJ4LA0L,x211119--131441_20211119_MSK_0004 LPB.AVI,polygon,lungslidingabsent,,,"[(439.78271484375, 223.75730468750044), (440.5..."
4,4,6T8OQ8BX,0CJ4LA0L,x211119--131441_20211119_MSK_0004 LPB.AVI,polygon,lungslidingabsent,,,"[(440.6708984375, 223.92054687500058), (441.67..."
...,...,...,...,...,...,...,...,...,...
25753,295,AZVODFRA,Z8CJVCN7,x220721--094147_20220721_MSK_0001 s.AVI,polygon,lungslidingpresent,,,"[(333.83740234375, 257.38916015625), (330.0324..."
25754,296,AZVODFRA,Z8CJVCN7,x220721--094147_20220721_MSK_0001 s.AVI,polygon,lungslidingpresent,,,"[(333.83740234375, 257.38916015625), (330.0324..."
25755,297,AZVODFRA,Z8CJVCN7,x220721--094147_20220721_MSK_0001 s.AVI,polygon,lungslidingpresent,,,"[(333.83740234375, 257.38916015625), (330.0324..."
25756,298,AZVODFRA,Z8CJVCN7,x220721--094147_20220721_MSK_0001 s.AVI,polygon,lungslidingpresent,,,"[(333.83740234375, 257.38916015625), (330.0324..."


# Create merge of pleuras - presnet & absent

In [33]:
import pandas as pd
import numpy as np
from shapely.geometry import Polygon
from shapely.ops import unary_union
import pandas as pd
import numpy as np
from shapely.ops import unary_union, nearest_points
# The helper function 'row_to_poly' is assumed to convert a DataFrame row containing a list of (x,y) points
# into a Shapely Polygon object. This is a crucial step because subsequent operations (union, intersection, distance)
# require working with geometric objects.
def row_to_poly(row):
    """
    Retrieve polygon data from a row.
    Prefer 'interp_polygon'; if not present, use 'points'.
    Return a Shapely Polygon, or None if conversion fails.
    """
    
    poly_data = row.get("interp_polygon")
    if poly_data is None:
        poly_data = row.get("interp_polygon")
    try:
        return Polygon(poly_data)
    except Exception as e:
        return None


In [34]:
def merge_lung_sliding_polygons(df, merge_threshold=20):
    """
    For each video and frame in the input DataFrame (which must include polygon data in either 
    "interp_polygon" or "points"), check if both 'lungslidingabsent' and 'lungslidingpresent' polygons exist.
    
    If the polygons intersect or if the minimum distance between them is less than merge_threshold,
    two new rows are created:puted lungpoint (as a point geometry) labeled as "lungsliding".
    
    The lungpoint is determined as follows:
      - If the polygons intersect, it is the centroid of their intersection.
      - If they do not intersect
      - One row with the merged polygon labeled as "lungpointpleura".
      - Another row with the com, it is the midpoint between the nearest points on each polygon.
    
    Parameters:
        df (pd.DataFrame): The DataFrame containing the polygon annotations.
        merge_threshold (float): Maximum distance between polygons for merging.
    
    Returns:
        pd.DataFrame: Original DataFrame with the new merged rows appended.
    """
    new_rows = []      # Store new rows for the merged polygons and lungpoints.
    video_ids_seen = set()  # Track processed video IDs.

    # Group by video_id and frame to process each frame individually.
    grouped = df.groupby(["video_id", "frame"])
    for (video_id, frame), group in grouped:
        video_ids_seen.add(video_id)
        
        # Filter the rows by the two specific polygon labels.
        absent = group[group["polygon_label"] == "lungslidingabsent"]
        present = group[group["polygon_label"] == "lungslidingpresent"]

        # Process only if both annotations are present.
        if absent.empty or present.empty:
            continue

        # Convert rows to Shapely Polygons for 'absent'.
        polys_abs = []
        for idx, row in absent.iterrows():
            poly = row_to_poly(row)
            if poly is not None:
                if not poly.is_valid:
                    poly = poly.buffer(0)
                polys_abs.append(poly)
        # Convert rows to Shapely Polygons for 'present'.
        polys_pres = []
        for idx, row in present.iterrows():
            poly = row_to_poly(row)
            if poly is not None:
                if not poly.is_valid:
                    poly = poly.buffer(0)
                polys_pres.append(poly)

        # Skip if polygon conversion failed.
        if not polys_abs or not polys_pres:
            continue

        # Compute the union of polygons for each annotation type.
        union_abs = unary_union(polys_abs)
        union_pres = unary_union(polys_pres)

        # Initialize lungpoint as None.
        lungpoint = None
        
        # Check if the unions intersect or are close enough.
        if union_abs.intersects(union_pres) or (union_abs.distance(union_pres) < merge_threshold):
            # Merge the unions.
            merged_poly = union_abs.union(union_pres)
            if merged_poly.geom_type == "MultiPolygon":
                merged_poly = merged_poly.convex_hull

            # Determine the lungpoint location.
            if union_abs.intersects(union_pres):
                # Use the centroid of the intersection.
                intersection = union_abs.intersection(union_pres)
                lungpoint = intersection.centroid
            else:
                # Compute nearest points and then the midpoint.
                p1, p2 = nearest_points(union_abs, union_pres)
                lungpoint = type(p1)([ (p1.x + p2.x) / 2.0, (p1.y + p2.y) / 2.0 ])
            
            # Create a new row for the merged polygon with label "lungpointpleura".
            merged_row = {
                "video_id": video_id,
                "frame": frame,
                "name_cvat": group["name_cvat"].iloc[0] if "name_cvat" in group.columns else None,
                "type": group["type"].iloc[0] if "type" in group.columns else None,
                "polygon_label": "lungpointpleura",
                "interp_polygon": list(merged_poly.exterior.coords),
                "points": list(merged_poly.exterior.coords)
            }
            new_rows.append(merged_row)
            
            # Create a separate row for the lungpoint (as a point geometry) with label "lungsliding".
            point_row = {
                "video_id": video_id,
                "frame": frame,
                "name_cvat": group["name_cvat"].iloc[0] if "name_cvat" in group.columns else None,
                "type": group["type"].iloc[0] if "type" in group.columns else None,
                "polygon_label": "lungpoint",
                # Represent the point geometry as a single coordinate in a list.
                "interp_polygon": [(lungpoint.x, lungpoint.y)],
                "points": [(lungpoint.x, lungpoint.y)]
            }
            new_rows.append(point_row)
    
    print(f"Processed {len(video_ids_seen)} videos.")
    # Append the new rows to the original DataFrame if any were created.
    if new_rows:
        df_new = pd.DataFrame(new_rows)
        df_out = pd.concat([df, df_new], ignore_index=True)
        return df_out
    else:
        print("No merged polygons created.")
        return df


In [41]:
# ----- Example Usage -----
# Assume df_interpolated is your DataFrame after polygon interpolation.
# It should include columns like: "video_id_x", "frame", "name_cvat", "type", "polygon_label",
# and either "interp_polygon" or "points" (each as a list of (x,y) tuples).

df_polygons = merge_lung_sliding_polygons(df_polygons, merge_threshold=60)
# print("Merged rows with 'lungpointpleura':")
# print(df_merged[df_merged["polygon_label"] == "lungpointpleura"].head())


Processed 52 videos.


In [42]:
df_polygons

Unnamed: 0,frame,polygon_id,video_id,name_cvat,type,polygon_label,points,polygon_track_id,interp_polygon
0,0,6T8OQ8BX,0CJ4LA0L,x211119--131441_20211119_MSK_0004 LPB.AVI,polygon,lungslidingabsent,"[(437.1181640625, 223.267578125), (437.1182141...",0.0,"[(437.1181640625, 223.267578125), (437.1182141..."
1,1,6T8OQ8BX,0CJ4LA0L,x211119--131441_20211119_MSK_0004 LPB.AVI,polygon,lungslidingabsent,,,"[(438.00634765625006, 223.43082031250015), (43..."
2,2,6T8OQ8BX,0CJ4LA0L,x211119--131441_20211119_MSK_0004 LPB.AVI,polygon,lungslidingabsent,,,"[(438.89453125, 223.59406250000032), (439.3945..."
3,3,6T8OQ8BX,0CJ4LA0L,x211119--131441_20211119_MSK_0004 LPB.AVI,polygon,lungslidingabsent,,,"[(439.78271484375, 223.75730468750044), (440.5..."
4,4,6T8OQ8BX,0CJ4LA0L,x211119--131441_20211119_MSK_0004 LPB.AVI,polygon,lungslidingabsent,,,"[(440.6708984375, 223.92054687500058), (441.67..."
...,...,...,...,...,...,...,...,...,...
46365,297,,YOBBHM8M,x220425--091211_20220425_MSK_0004lp.AVI,polygon,lungpoint,"[(403.48615050862213, 255.95520820372263)]",,"[(403.48615050862213, 255.95520820372263)]"
46366,298,,YOBBHM8M,x220425--091211_20220425_MSK_0004lp.AVI,polygon,lungpointpleura,"[(424.5855183361883, 239.41720878850174), (399...",,"[(424.5855183361883, 239.41720878850174), (399..."
46367,298,,YOBBHM8M,x220425--091211_20220425_MSK_0004lp.AVI,polygon,lungpoint,"[(403.5353102191025, 255.94643054296756)]",,"[(403.5353102191025, 255.94643054296756)]"
46368,299,,YOBBHM8M,x220425--091211_20220425_MSK_0004lp.AVI,polygon,lungpointpleura,"[(424.6869723292439, 239.41720878850174), (399...",,"[(424.6869723292439, 239.41720878850174), (399..."


# Extract boudingboxes

In [46]:
import ast
import numpy as np

def extract_bbox(row):
    """
    Extracts the bounding box from a row containing polygon points.
    
    Expects row["interp_polygon"] to be either:
      - a list of (x, y) tuples, e.g.
            [(437.1181640625, 223.267578125), (437.1182141045156, 241.65905631659007), ...]
      - or a string representation of such a list.
      
    The bounding box is computed as:
      - bb_min_x: minimum x-coordinate
      - bb_max_x: maximum x-coordinate
      - bb_min_y: minimum y-coordinate
      - bb_max_y: maximum y-coordinate
    
    Parameters:
        row : pd.Series
            A row of a DataFrame containing the "interp_polygon" column.
    
    Returns:
        row : pd.Series
            The input row with added keys: "bb_min_x", "bb_max_x", "bb_min_y", "bb_max_y".
    """
    points = row["interp_polygon"]
    # If points is a string, parse it into a list.
    if isinstance(points, str):
        points_list = np.array(ast.literal_eval(points))
    else:
        points_list = np.array(points)
    
    # points_list is expected to be of shape (n, 2), where n is the number of vertices.
    row["bb_min_x"] = points_list[:, 0].min()
    row["bb_max_x"] = points_list[:, 0].max()
    row["bb_min_y"] = points_list[:, 1].min()
    row["bb_max_y"] = points_list[:, 1].max()
    return row

# Example usage:
df = df_polygons.copy()
df_polygons = df.apply(extract_bbox, axis=1)


In [47]:
df_polygons.to_json('/home/mh731nk/_data/experiments_tmp/data/revision_8/lp_polygons.json', orient='index', indent=4)