In [None]:
import os
import cv2

# Group consecutive frames into "clips" in which the number of detected
# faces stays constant. Each clip is a dict with:
#   'frame_id'   : list of frame numbers in the clip
#   'person_num' : number of faces per frame in the clip
#   'p<i>'       : per-frame list of the i-th face box (4 numbers taken from
#                  columns 1..4 of the label file, sorted by the first value)
frame_id = 0
person_num = 0
video_clip = None
video_clip_set = []
# NOTE(review): frames are addressed as frames/0.jpg .. frames/(N-1).jpg where
# N is the number of directory entries; any extra file in 'frames' or a gap in
# the numbering shifts the range that is visited.
vid_len = len(os.listdir('frames'))
while frame_id < vid_len:
    frame = cv2.imread('frames/%d.jpg' % frame_id)
    if frame is None:
        print("Warning: Could not read image file: frames/%d.jpg" % frame_id)
        frame_id += 1
        continue  # Skip to the next frame
    txt_path = 'result/labels/%d.txt' % frame_id

    # Collect the boxes of class 1 from the label file, if there is one.
    face_bbox = []
    if os.path.exists(txt_path):
        try:
            with open(txt_path, 'r') as f:
                for line in f:
                    tokens = line.split()
                    if not tokens:
                        continue  # tolerate blank lines
                    # Parse as plain numbers; never eval() file content.
                    fields = [float(tok) for tok in tokens]
                    if fields[0] == 1:
                        # Explicit indexing keeps the IndexError on short lines.
                        face_bbox.append([fields[1], fields[2], fields[3], fields[4]])
        except Exception as e:
            # Best effort: keep whatever was parsed before the bad line.
            print(f"Error reading file {txt_path}: {e}")
    else:
        # File doesn't exist - likely no detections
        print(f"No detections in frame {frame_id} (file {txt_path} not found)")

    # Order faces by their first coordinate so index i refers to the same
    # relative position across the frames of a clip.
    face_bbox.sort(key=lambda box: box[0])
    cur_person_num = len(face_bbox)

    if video_clip is None or cur_person_num != person_num:
        # First frame, or the person count changed: close the running clip
        # (if any) and start a new one seeded with this frame.
        if video_clip is not None:
            video_clip_set.append(video_clip)
        video_clip = {'frame_id': [frame_id], 'person_num': cur_person_num}
        for i in range(cur_person_num):
            video_clip['p'+str(i)] = [face_bbox[i]]
    else:
        # Same number of people, add to current clip
        video_clip['frame_id'].append(frame_id)
        for i in range(cur_person_num):
            video_clip['p'+str(i)].append(face_bbox[i])

    person_num = cur_person_num
    frame_id += 1

# Add the last video clip if it exists
if video_clip is not None:
    video_clip_set.append(video_clip)

In [None]:
from mmdet.apis import init_detector
from mmdet.datasets.pipelines import Compose
import torch
from mmcv.parallel import collate, scatter
import numpy as np
# Build the gaze model from its config file and checkpoint on the first CUDA
# device, and keep its config around for building the test pipeline.
# NOTE(review): both paths are absolute and machine-specific; the notebook
# will not run elsewhere until they are pointed at a local MCGaze checkout.
MCGAZE_CONFIG_PATH = '/home/s14-htx/Documents/kailin/all_my_stuff/MCGaze/configs/multiclue_gaze/multiclue_gaze_r50_l2cs.py'
MCGAZE_CHECKPOINT_PATH = '/home/s14-htx/Documents/kailin/all_my_stuff/MCGaze/ckpts/multiclue_gaze_r50_l2cs.pth'
model = init_detector(
    MCGAZE_CONFIG_PATH,
    MCGAZE_CHECKPOINT_PATH,
    device="cuda:0",
    cfg_options=None,
)
cfg = model.cfg


In [None]:
# Use the configured test pipeline minus its first stage.
# NOTE(review): the first stage is presumably the image-loading step, skipped
# because crops are passed in memory - confirm against the config.
inference_pipeline_cfg = cfg.data.test.pipeline[1:]
print(inference_pipeline_cfg)
test_pipeline = Compose(inference_pipeline_cfg)

def load_datas(data, test_pipeline, datas):
    """Run ``data`` through ``test_pipeline`` and append the result to ``datas``.

    Parameters
    ----------
    data : object
        A single raw sample accepted by ``test_pipeline``.
    test_pipeline : callable
        Transformation applied to ``data``.
    datas : list
        Accumulator that receives the transformed sample (mutated in place).
    """
    processed = test_pipeline(data)
    datas.append(processed)

In [None]:
def infer(datas,model,clip,i):
    """Run the gaze model on one batch of head crops and store the result.

    Parameters
    ----------
    datas : list
        Pipeline outputs for the crops of one person; each item exposes
        ``item['img_metas'].data['filename']`` (used as the sort key) and
        ``item['img']``.
    model : callable
        Detector invoked with ``return_loss=False`` on the collated batch.
    clip : dict
        Clip record; the gaze array is appended to ``clip['gaze_p' + str(i)]``.
    i : int
        Index of the person inside the clip.

    The appended array has shape ``(N, 1, gaze_dim)`` where ``N`` and
    ``gaze_dim`` are the first two sizes of ``det_gazes['gaze_score']``.
    An empty ``datas`` is a no-op.
    """
    if not datas:
        return
    # Restore frame order: 'filename' carries the frame position in the clip.
    datas = sorted(datas, key=lambda x:x['img_metas'].data['filename'])
    # Collate the whole list as one group. The batch's own length is used
    # instead of a notebook-global frame list, so the function no longer
    # depends on state leaked from the calling loop.
    datas = collate(datas, samples_per_gpu=len(datas))
    datas['img_metas'] = datas['img_metas'].data
    datas['img'] = datas['img'].data
    datas = scatter(datas, ["cuda:0"])[0]
    with torch.no_grad():
        # Author's notes (translated): with format=False the returned boxes
        # include both face and head boxes, in [x1, y1, x2, y2] format;
        # return_loss selects between forward_train and forward_test.
        (det_bboxes, det_labels), det_gazes = model(
                return_loss=False,
                rescale=True,
                format=False,
                **datas)
    gaze_dim = det_gazes['gaze_score'].size(1)
    det_fusion_gaze = det_gazes['gaze_score'].view((det_gazes['gaze_score'].shape[0], 1, gaze_dim))
    clip['gaze_p'+str(i)].append(det_fusion_gaze.cpu().numpy())

# For every clip and every person in it: crop a square region around the
# head box in each frame, push the crops through the test pipeline and run
# the gaze model in batches. The per-person result ends up as one array in
# clip['gaze_p<i>'] (batches concatenated along axis 0).
# Batches are flushed once they hold MORE than max_len crops (so up to
# max_len + 1) or at the last frame of the clip.
max_len = 100
for clip in video_clip_set:
    frame_id = clip['frame_id']
    person_num = clip['person_num']
    for i in range(person_num):
        head_bboxes = clip['p'+str(i)]
        clip['gaze_p'+str(i)] = []
        datas = []
        for j,frame in enumerate(frame_id):
            frame_path = "frames/"+str(frame)+".jpg"
            cur_img = cv2.imread(frame_path)
            if cur_img is None:
                # Skipping would desynchronise gaze rows and frame indices,
                # so fail loudly with the offending path instead.
                raise FileNotFoundError("Could not read image file: " + frame_path)
            # ndarray.shape is (rows, cols, channels).
            img_rows, img_cols = cur_img.shape[:2]
            # Box layout is [x1, y1, x2, y2] (indices 1 and 3 are used as rows).
            x1, y1, x2, y2 = head_bboxes[j]
            center_row = int(y1 + y2)//2
            center_col = int(x1 + x2)//2
            # Half side of the square crop: 0.8 x the longer box side.
            l = int(max(y2 - y1, x2 - x1)*0.8)
            head_crop = cur_img[
                max(0, center_row - l):min(center_row + l, img_rows),
                max(0, center_col - l):min(center_col + l, img_cols),
                :,
            ]
            crop_rows, crop_cols = head_crop.shape[:2]
            # 'filename' stores the position in the clip; infer() sorts on it.
            cur_data = dict(filename=j,ori_filename=111,img=head_crop,img_shape=(crop_rows,crop_cols,3),ori_shape=(2*l,2*l,3),img_fields=['img'])
            load_datas(cur_data,test_pipeline,datas)
            is_last_frame = j==(len(frame_id)-1)
            if len(datas)>max_len or is_last_frame:
                infer(datas,model,clip,i)
                datas = []
                if is_last_frame:
                    clip['gaze_p'+str(i)] = np.concatenate(clip['gaze_p'+str(i)],axis=0)


In [None]:
import cv2
import math
from collections import defaultdict

# Configuration
# Eye contact is flagged when the angle between two gaze vectors is at least
# (180 - TOLERANCE_DEG) degrees, i.e. 66 degrees with the value below.
TOLERANCE_DEG = 114
# Maximum absolute difference between two head-centre y coordinates for the
# pair to count as "at eye level" (same units as the head boxes).
EYE_LEVEL_TOLERANCE = 18
PARALLEL_EPSILON = 1e-6          # Threshold for considering lines parallel

# Tracking containers (module-level; appended to by the per-frame loop below,
# so re-running that loop without re-running this cell duplicates entries).
eye_contact_log = []  # Format: [frame_id, detected, theta_deg]
eye_level_log = []    # Format: [frame_id, detected, min_y_diff]
joint_attention_log = []  # Format: [frame_id, detected, point]
# Keyed by person-index pair (j, k); 'points' holds the latest intersection.
# The 'count' field is initialised here but not updated by the visible code.
joint_attention_streaks = defaultdict(lambda: {'count': 0, 'points': []})

def check_eye_level(head_centers, tolerance):
    """Tell whether any two heads sit at roughly the same height.

    Parameters
    ----------
    head_centers : sequence
        Items whose element at index 1 is the vertical coordinate.
    tolerance : number
        Largest absolute vertical difference still accepted.

    Returns
    -------
    bool
        True if at least one pair differs by ``tolerance`` or less,
        False otherwise (including fewer than two heads).
    """
    total = len(head_centers)
    return any(
        abs(head_centers[first][1] - head_centers[second][1]) <= tolerance
        for first in range(total)
        for second in range(first + 1, total)
    )

def calculate_intersection(p1, v1, p2, v2):
    """Intersect the rays ``p1 + t*v1`` and ``p2 + s*v2`` (first two components).

    Returns the intersection as an ``(x, y)`` tuple when both ray parameters
    are non-negative. Returns ``None`` when either direction is the zero
    vector, when the directions are parallel (``|det| < PARALLEL_EPSILON``),
    or when the lines only meet behind one of the origins.
    """
    def _is_zero(vec):
        return vec[0] == 0 and vec[1] == 0

    if _is_zero(v1) or _is_zero(v2):
        return None

    # Determinant of the 2x2 system  t*v1 - s*v2 = p2 - p1.
    det = v1[1] * v2[0] - v1[0] * v2[1]
    if abs(det) < PARALLEL_EPSILON:
        return None

    t = ((p2[0] - p1[0]) * (-v2[1]) + v2[0] * (p2[1] - p1[1])) / det
    s = (v1[0] * (p2[1] - p1[1]) - v1[1] * (p2[0] - p1[0])) / det

    # Only accept a meeting point that lies forward along both rays.
    if not (t >= 0 and s >= 0):
        return None
    return (p1[0] + t * v1[0], p1[1] + t * v1[1])

def process_joint_attention(streaks, frame_id):
    """Collect the latest joint-attention point of every tracked pair.

    For each entry of ``streaks`` that holds at least one point, the most
    recent point is returned and ``[frame_id, True, point]`` is appended to
    the module-level ``joint_attention_log``. Entries without points are
    removed from ``streaks``.

    Returns
    -------
    list
        The points gathered for this frame (possibly empty).
    """
    found = []
    # Iterate over a snapshot because entries may be deleted on the way.
    for key, entry in list(streaks.items()):
        history = entry['points']
        if len(history) == 0:
            del streaks[key]
            continue
        latest = history[-1]
        found.append(latest)
        joint_attention_log.append([frame_id, True, latest])
    return found

# Per-frame analysis: for every frame of every clip compute
#   * eye contact  - max angle between any two gaze vectors, flagged when it
#                    reaches (180 - TOLERANCE_DEG) degrees,
#   * eye level    - min vertical distance between any two head centres,
#                    flagged when within EYE_LEVEL_TOLERANCE,
#   * joint attention - intersection points of pairs of gaze rays,
# append the results to the module-level logs, draw the gaze arrows and save
# the annotated frame to new_frames/<frame_id>.jpg.

# cv2.imwrite does not create missing directories, so make sure it exists.
os.makedirs('new_frames', exist_ok=True)

for vid_clip in video_clip_set:
    person_count = vid_clip['person_num']
    # All unordered pairs of person indices in this clip.
    person_pairs = [(j, k) for j in range(person_count) for k in range(j+1, person_count)]

    for i, frame_id in enumerate(vid_clip['frame_id']):
        cur_img = cv2.imread(f"frames/{frame_id}.jpg")
        if cur_img is None:
            raise FileNotFoundError(f"Could not read image file: frames/{frame_id}.jpg")

        # Collect gaze vectors and head centres (x, y) for this frame.
        current_gazes = []
        head_centers = []
        for j in range(person_count):
            x1, y1, x2, y2 = (int(xy) for xy in vid_clip[f'p{j}'][i])
            current_gazes.append(vid_clip[f'gaze_p{j}'][i][0])
            head_centers.append(((x1 + x2) // 2, (y1 + y2) // 2))

        # --- Eye contact: largest pairwise angle between gaze vectors (2-D).
        eye_contact_detected = False
        max_theta_deg = 0
        for j, k in person_pairs:
            u = current_gazes[j]
            v = current_gazes[k]
            mag_u = math.hypot(u[0], u[1])
            mag_v = math.hypot(v[0], v[1])
            if mag_u * mag_v == 0:
                continue  # angle undefined for a zero-length vector

            cos_theta = (u[0]*v[0] + u[1]*v[1]) / (mag_u * mag_v)
            # Clamp to [-1, 1] to protect acos from rounding error.
            theta_deg = math.degrees(math.acos(max(min(cos_theta, 1.0), -1.0)))

            # The maximum is tracked even when below the detection threshold.
            max_theta_deg = max(max_theta_deg, theta_deg)
            if theta_deg >= (180 - TOLERANCE_DEG):
                eye_contact_detected = True

        # --- Eye level: smallest pairwise vertical distance between heads.
        eye_level_detected = False
        min_y_diff = float('inf')
        for j, k in person_pairs:
            y_diff = abs(head_centers[j][1] - head_centers[k][1])
            min_y_diff = min(min_y_diff, y_diff)
            if y_diff <= EYE_LEVEL_TOLERANCE:
                eye_level_detected = True
        if min_y_diff == float('inf'):
            min_y_diff = -1  # sentinel: fewer than two people in the frame

        # Both metrics are logged for every frame, detected or not.
        eye_contact_log.append([frame_id, eye_contact_detected, max_theta_deg])
        eye_level_log.append([frame_id, eye_level_detected, min_y_diff])

        if eye_contact_detected:
            cv2.putText(cur_img, "Eye Contact Detected", (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        # --- Joint attention.
        # Start from a clean slate each frame: only the current frame's
        # intersections count, and pairs that are not revisited (e.g. after
        # the person count dropped) must not leak stale points into the log.
        joint_attention_streaks.clear()
        # NOTE(review): rays are cast along +gaze here, while the arrows
        # below are drawn along -gaze. One of the two conventions is likely
        # inverted - confirm which sign matches the model output.
        for j, k in person_pairs:
            point = calculate_intersection(head_centers[j], current_gazes[j],
                                           head_centers[k], current_gazes[k])
            if point is not None:
                joint_attention_streaks[(j, k)]['points'] = [point]

        valid_points = process_joint_attention(joint_attention_streaks, frame_id)
        if not valid_points:
            # Record the frame even when nothing was detected.
            joint_attention_log.append([frame_id, False, None])

        # --- Visualisation: one gaze arrow per person, as long as the
        # longer side of the head box.
        for j in range(person_count):
            gaze = current_gazes[j]
            head_center = head_centers[j]
            box = vid_clip[f'p{j}'][i]
            gaze_len = int(max(box[3] - box[1], box[2] - box[0]))
            thick = max(5, int(gaze_len * 0.01))
            end_point = (
                int(head_center[0] - gaze_len * gaze[0]),
                int(head_center[1] - gaze_len * gaze[1])
            )
            cv2.arrowedLine(cur_img, head_center, end_point,
                           (230, 253, 11), thickness=thick)

        # Save frame
        cv2.imwrite(f'new_frames/{frame_id}.jpg', cur_img)

# Merge the three per-frame logs and write them as one tab-separated table,
# one row per frame in ascending frame order.
# NOTE(review): `nameonly` is not defined in the visible cells; it has to be
# set before this cell runs - confirm where it comes from.
with open(f'{nameonly}_gaze_behavior_log.txt', 'w') as f:
    # Write header
    f.write("Frame\tEyeContact\tEyeContact_Angle\tEyeLevel\tEyeLevel_VertDiff\tJointAttention\tJA_X\tJA_Y\n")

    # frame_id -> {metric name: (detected, value)}; when a frame occurs more
    # than once in a log, the last entry wins.
    all_data = {}
    for metric_key, metric_log in (
        ("eye_contact", eye_contact_log),
        ("eye_level", eye_level_log),
        ("joint_attention", joint_attention_log),
    ):
        for frame_id, detected, value in metric_log:
            all_data.setdefault(frame_id, {})[metric_key] = (detected, value)

    for frame_id in sorted(all_data):
        data = all_data[frame_id]

        # Eye contact: the angle is always formatted, detected or not.
        ec_detected, ec_angle = "False", "N/A"
        if "eye_contact" in data:
            ec_detected, ec_angle = data["eye_contact"]
            if isinstance(ec_angle, (int, float)):
                ec_angle = f"{ec_angle:.2f}"
            else:
                ec_angle = "0.00"

        # Eye level: -1 is the "no pair available" sentinel.
        el_detected, el_diff = "False", "N/A"
        if "eye_level" in data:
            el_detected, el_diff = data["eye_level"]
            if el_diff == -1:
                el_diff = "N/A"

        # Joint attention: coordinates only when a point was detected.
        ja_detected, ja_x, ja_y = "False", "N/A", "N/A"
        if "joint_attention" in data:
            ja_detected, point = data["joint_attention"]
            if ja_detected and point:
                ja_x = f"{point[0]:.1f}"
                ja_y = f"{point[1]:.1f}"

        f.write(f"{frame_id}\t{ec_detected}\t{ec_angle}\t{el_detected}\t{el_diff}\t{ja_detected}\t{ja_x}\t{ja_y}\n")

In [None]:
import pandas as pd
import numpy as np
import os

def majority_filter(binary_array, window_size=5):
    """
    Smooth a boolean sequence with a sliding majority vote.

    Each output element is True when strictly more than half of the input
    elements inside its window are True (e.g. tttfttt becomes ttttttt).
    Windows are centred on the element and truncated at the array edges.

    Parameters:
    -----------
    binary_array : array-like
        Boolean values to smooth; it is not modified.
    window_size : int
        Nominal window width (should be odd); ``window_size // 2`` elements
        are taken on each side of the centre.

    Returns:
    --------
    numpy.ndarray
        Copy of the input holding the smoothed values.
    """
    length = len(binary_array)
    radius = window_size // 2
    result = np.copy(binary_array)

    for center in range(length):
        lo = max(0, center - radius)
        hi = min(length, center + radius + 1)
        votes = np.sum(binary_array[lo:hi])
        # Strict majority: votes > window_length / 2.
        result[center] = 2 * votes > (hi - lo)

    return result

def minimum_duration_filter(binary_array, min_duration=15):
    """
    Remove True runs that last fewer than ``min_duration`` elements.

    A run is a maximal stretch of consecutive True values; short runs are
    replaced by False (e.g. ffftfff becomes fffffff).

    Parameters:
    -----------
    binary_array : array-like
        Boolean values to filter; it is not modified.
    min_duration : int
        Minimum run length (in elements) that is kept.

    Returns:
    --------
    numpy.ndarray
        Copy of the input with the short True runs cleared.
    """
    total = len(binary_array)
    result = np.copy(binary_array)

    run_start = None
    # Visit one position past the end so a run touching the end is closed too.
    for pos in range(total + 1):
        active = pos < total and bool(binary_array[pos])
        if active:
            if run_start is None:
                run_start = pos
        elif run_start is not None:
            # Maximal runs are always bounded by False values or array edges.
            if pos - run_start < min_duration:
                result[run_start:pos] = False
            run_start = None

    return result

def _to_bool_array(series):
    """Convert a detection column to a boolean numpy array.

    Boolean and numeric columns use plain truthiness. Any other column
    (e.g. text read from a log) is True only for 'true' or '1'
    (case-insensitive), because ``astype(bool)`` would turn every non-empty
    string, including "False", into True.
    """
    if pd.api.types.is_bool_dtype(series) or pd.api.types.is_numeric_dtype(series):
        return series.astype(bool).to_numpy()
    return series.astype(str).str.strip().str.lower().isin(['true', '1']).to_numpy()


def smooth_eye_detection( df, min_duration, window_size):
    """
    Add smoothed versions of the detection columns to ``df``.

    For each of 'EyeContact', 'EyeLevel' and 'JointAttention' that is present,
    the column is first passed through ``minimum_duration_filter`` (drops
    short True runs) and then through ``majority_filter`` (fills brief False
    gaps); the result is stored in a new '<column>_Smoothed' column.

    Parameters:
    -----------
    df : pandas.DataFrame
        Frame-level detections; modified in place.
    min_duration : int
        Minimum True-run length passed to ``minimum_duration_filter``.
    window_size : int
        Window width passed to ``majority_filter``.

    Returns:
    --------
    pandas.DataFrame
        The same ``df`` object, with the '_Smoothed' columns added.
    """
    columns_to_smooth = ['EyeContact', 'EyeLevel', 'JointAttention']

    for col in columns_to_smooth:
        if col not in df.columns:
            continue
        binary_array = _to_bool_array(df[col])
        # Order matters: remove short True islands first, then close gaps.
        filtered = minimum_duration_filter(binary_array, min_duration)
        df[f'{col}_Smoothed'] = majority_filter(filtered, window_size)

    return df


In [None]:
import pandas as pd
import numpy as np
import os

def _suppress_short_runs(flags, min_frames):
    """Return a boolean copy of ``flags`` with True runs shorter than ``min_frames`` cleared."""
    cleaned = np.array(flags, dtype=bool)
    run_start = None
    # A trailing False closes a run that reaches the end of the array.
    for pos, active in enumerate(list(cleaned) + [False]):
        if active and run_start is None:
            run_start = pos
        elif not active and run_start is not None:
            if pos - run_start < min_frames:
                cleaned[run_start:pos] = False
            run_start = None
    return cleaned


def filter_short_interactions(df, fps=30, smooth=False, eye_contact_min_seconds=1, joint_attention_min_seconds=3):
    """
    Filter out eye contact sections shorter than the specified duration and
    joint attention sections shorter than the specified duration.

    Parameters:
    -----------
    df : pandas.DataFrame
        Interaction data with a 'Frame' column and the interaction columns
        selected by ``smooth``. An optional 'SourceFile' column splits the
        rows into videos that are filtered independently; without it the
        whole frame is treated as a single video.
    fps : int
        Frames per second of the videos (default: 30)
    smooth : bool
        If True, filter 'EyeContact_Smoothed' / 'JointAttention_Smoothed';
        otherwise filter 'EyeContact' / 'JointAttention'. The default keeps
        the previous effective behaviour (unsmoothed columns), so pass
        ``smooth=True`` explicitly to filter the smoothed columns.
    eye_contact_min_seconds : float
        Minimum duration in seconds for eye contact sections to keep (default: 1)
    joint_attention_min_seconds : float
        Minimum duration in seconds for joint attention sections to keep (default: 3)

    Returns:
    --------
    pandas.DataFrame
        Copy of ``df`` with short interactions set to False; ``df`` itself is
        not modified.
    """
    eye_contact_min_frames = int(eye_contact_min_seconds * fps)
    joint_attention_min_frames = int(joint_attention_min_seconds * fps)

    filtered_df = df.copy()
    if smooth == True:  # noqa: E712 - keeps the original comparison semantics
        eyecontact = 'EyeContact_Smoothed'
        jointattention = 'JointAttention_Smoothed'
    else:
        eyecontact = 'EyeContact'
        jointattention = 'JointAttention'

    # Warn about the columns that are actually going to be used.
    for column in (eyecontact, jointattention):
        if column not in filtered_df.columns:
            print(f"Warning: '{column}' column not found in the DataFrame")

    # One group per video; a frame without 'SourceFile' is a single video.
    if 'SourceFile' in filtered_df.columns:
        video_groups = [group for _, group in filtered_df.groupby('SourceFile')]
    else:
        video_groups = [filtered_df]

    targets = (
        (eyecontact, eye_contact_min_frames),
        (jointattention, joint_attention_min_frames),
    )
    for video_df in video_groups:
        # Runs are defined in temporal order, so sort first and write the
        # result back through the SORTED index to keep rows aligned.
        ordered = video_df.sort_values('Frame', kind='stable')
        for column, min_frames in targets:
            if column not in ordered.columns:
                continue
            flags = ordered[column].values.astype(bool)
            filtered_df.loc[ordered.index, column] = _suppress_short_runs(flags, min_frames)

    return filtered_df

In [None]:

# Load the per-frame log, add the '_Smoothed' columns, then drop interactions
# that are too short.
# NOTE(review): `nameonly` is not defined in the visible cells - confirm
# where it is set before this cell runs.
df = pd.read_csv(f'{nameonly}_gaze_behavior_log.txt', sep='\t')

smooth = True
window_size = 5
min_duration = 15

df = smooth_eye_detection(df, min_duration, window_size)
filtered_df = filter_short_interactions(
    df,
    fps=30,  # Adjust based on your video frame rate
    # Pass the flag explicitly: without it the filter works on the raw
    # columns while the video overlay reads the '_Smoothed' ones.
    smooth=smooth,
    eye_contact_min_seconds=1,
    joint_attention_min_seconds=3,
)

In [None]:
def add_interactions_to_frames(df, nameonly, fps=30):
    """
    Add detected interaction texts to frames and create a video.

    Reads the numerically named JPEGs in 'new_frames/', overlays the labels
    of the interactions flagged for each frame and writes the result to
    '<nameonly>_results.mp4'.

    Parameters:
    -----------
    df : DataFrame
        Interaction data with a 'Frame' column and any of the columns
        'EyeContact_Smoothed', 'EyeLevel_Smoothed', 'JointAttention_Smoothed'.
        It is not modified.
    nameonly : str
        Prefix of the output video file name.
    fps : int
        Frames per second for the output video

    Raises:
    -------
    FileNotFoundError
        If 'new_frames/' holds no numerically named .jpg file or the first
        one cannot be read.
    """
    interaction_labels = {
        'EyeContact_Smoothed': "Eye Contact",
        'EyeLevel_Smoothed': "Eye Level",
        'JointAttention_Smoothed': "Joint Attention",
    }

    # Normalise the flags to booleans (values may be bools or 'True'/'False'
    # text) without touching the caller's DataFrame.
    flags = {
        col: (df[col].astype(str).str.lower() == 'true').to_numpy()
        for col in interaction_labels
        if col in df.columns
    }

    # Frame number -> labels to overlay on that frame.
    frame_to_interactions = {}
    for pos, frame_value in enumerate(df['Frame']):
        frame_to_interactions[int(frame_value)] = [
            label
            for col, label in interaction_labels.items()
            if col in flags and flags[col][pos]
        ]

    # Frames are named '<number>.jpg'; process them in numeric order and
    # tolerate gaps in the numbering.
    frame_numbers = sorted(
        int(stem)
        for stem, ext in (os.path.splitext(name) for name in os.listdir('new_frames/'))
        if ext.lower() == '.jpg' and stem.isdigit()
    )
    if not frame_numbers:
        raise FileNotFoundError("No numbered .jpg frames found in new_frames/")
    print(f"Processing {len(frame_numbers)} frames...")

    # The first frame defines the video size (width, height).
    first_path = 'new_frames/' + str(frame_numbers[0]) + '.jpg'
    first_img = cv2.imread(first_path)
    if first_img is None:
        raise FileNotFoundError(f"Could not read first frame: {first_path}")
    size = (first_img.shape[1], first_img.shape[0])

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    videoWrite = cv2.VideoWriter(f'{nameonly}_results.mp4', fourcc, fps, size)
    try:
        for frame_num in frame_numbers:
            file_path = 'new_frames/' + str(frame_num) + '.jpg'
            frame = cv2.imread(file_path)
            if frame is None:
                print(f"Error reading frame: {file_path}, skipping...")
                continue

            y_position = 40  # Starting y-position for text
            for interaction in frame_to_interactions.get(frame_num, []):
                # Text size is needed for the background rectangle.
                text_size = cv2.getTextSize(
                    interaction,
                    cv2.FONT_HERSHEY_SIMPLEX,
                    1,  # font scale
                    2   # thickness
                )[0]

                # Black filled rectangle behind the text.
                cv2.rectangle(
                    frame,
                    (10, y_position - 30),
                    (10 + text_size[0], y_position + 10),
                    (0, 0, 0),
                    -1
                )

                cv2.putText(
                    frame,
                    interaction,
                    (10, y_position),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    1,  # font scale
                    (0, 255, 0),  # color (green)
                    2,  # thickness
                    cv2.LINE_AA
                )

                # Move down by the text height plus padding to avoid overlap.
                y_position += text_size[1] + 20

            videoWrite.write(frame)
    finally:
        # Release the writer so the container is finalised on disk.
        videoWrite.release()
        

In [None]:
# Render the annotated result video from the filtered interaction table.
add_interactions_to_frames(filtered_df, nameonly)