In [3]:
import json
from pathlib import Path
from typing import Dict
import pandas as pd

# Load the video_info_new.csv file into a DataFrame. get_subset() reads its
# 'video' and 'subset' columns to look up the subset recorded for a video ID.
# NOTE(review): absolute, machine-specific path — adjust when running elsewhere.
video_info_new_path = "/home/nele_pauline_suffo/projects/mmaction2/data/quantex_share/video_info_new.csv"
df_video_info_new = pd.read_csv(video_info_new_path)

# Look up the subset recorded for a given video ID
def get_subset(video_id):
    """Return the ``subset`` value of the first row whose ``video`` equals ``video_id``.

    The lookup is done against the module-level ``df_video_info_new`` DataFrame.

    Args:
        video_id: Value compared for equality against the ``video`` column.

    Returns:
        The ``subset`` entry of the first matching row, or ``None`` when no
        row matches.
    """
    matching_rows = df_video_info_new[df_video_info_new['video'] == video_id]

    # Guard clause: unknown video IDs yield None instead of raising.
    if matching_rows.empty:
        return None

    first_match = matching_rows.iloc[0]
    return first_match['subset']
    
# Function to read JSON from a file
def read_json(file_path: str) -> Dict:
    """Load and return the JSON document stored at ``file_path``.

    Args:
        file_path: Location of the JSON file; anything accepted by ``open``
            works (a ``str`` or a ``pathlib.Path``).

    Returns:
        The parsed JSON content.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, so non-ASCII text is read the same way on every machine.
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)

# Attribute names that are kept as activity labels; every other name is ignored.
# Defined once here instead of being rebuilt for every parameter of every instance.
_ACTIVITY_LABELS = (
    'Playing with Object',
    'Playing without Object',
    'Pretend play',
    'Watching Something',
    'Reading a Book',
    'Drawing',
    'Crafting Things',
    'Dancing',
    'Making Music',
)


# Conversion function to ActivityNet format
def convert_annotations(data: Dict, fps: float = 30.0) -> Dict:
    """Convert one parsed annotation file into an ActivityNet-style entry.

    Args:
        data: Parsed annotation JSON. Reads ``data['metadata']['name']``,
            ``data['metadata']['duration']`` (divided by 1e6 to obtain seconds)
            and ``data['instances']``.
        fps: Not used by this conversion; kept so existing callers that pass
            it keep working.

    Returns:
        A one-entry dict ``{video_id: {"subset", "duration", "url",
        "annotations"}}``. Each annotation is
        ``{"segment": [start_seconds, end_seconds], "label": name}``.

    Raises:
        ValueError: if the video name without its extension is not an integer
            (it is converted with ``int`` before the subset lookup).
        KeyError: if a required key is missing from ``data``.
    """
    video_id = data['metadata']['name']
    # Strip the trailing extension regardless of case, so "123.mp4" is handled
    # like "123.MP4" instead of failing in int() below.
    if video_id.lower().endswith(".mp4"):
        short_video_id = video_id[:-len(".mp4")]
    else:
        short_video_id = video_id

    duration_seconds = data['metadata']['duration'] / 1_000_000.0
    # The subset comes from the video_info_new.csv lookup, keyed by integer ID.
    subset = get_subset(int(short_video_id))

    annotations = []
    for item in data['instances']:
        meta = item['meta']
        # Instances without a class name, and 'Location' instances, carry no activity.
        if meta.get('className') in (None, 'Location'):
            continue

        start_time = meta["start"]
        end_time = meta["end"]

        for parameter in item.get("parameters", []):
            timestamps = parameter.get("timestamps", [])

            # Only parameters whose first timestamp has a non-empty attribute list count.
            if not (timestamps and "attributes" in timestamps[0] and timestamps[0]["attributes"]):
                continue

            # Attribute names over all timestamps, in order; the first whitelisted one wins.
            names = (
                attr["name"]
                for timestamp in timestamps
                for attr in timestamp.get("attributes", [])
            )
            label = next((name for name in names if name in _ACTIVITY_LABELS), None)
            if label is None:
                continue

            annotations.append({
                "segment": [start_time / 1_000_000.0, end_time / 1_000_000.0],
                "label": label
            })

    return {
        short_video_id: {
            "subset": subset,
            "duration": duration_seconds,
            "url": "",  # Optional: Add video URL if available
            "annotations": annotations
        }
    }

# Function to process all JSON files in a folder and generate ActivityNet format
def process_all_json_files(input_dir: Path, output_file: Path, fps: float = 30.0) -> None:
    """Convert every ``*.json`` file in ``input_dir`` and write one combined file.

    Args:
        input_dir: Folder that is searched (non-recursively) for ``*.json`` files.
        output_file: Destination of the combined JSON. A file in ``input_dir``
            with the same file name is skipped so the result is never re-ingested.
        fps: Forwarded unchanged to ``convert_annotations``.

    Returns:
        None. The combined structure (``version``, ``taxonomy``, ``database``)
        is written to ``output_file``.
    """
    # Node names are spelled exactly like the labels convert_annotations writes
    # into the database, so every annotation label resolves to a taxonomy node.
    taxonomy_names = [
        "Playing with Object",
        "Playing without Object",
        "Pretend play",
        "Watching Something",
        "Reading a Book",
        "Drawing",
        "Crafting Things",
        "Dancing",
        "Making Music",
    ]
    all_annotations = {
        "version": "1.0",
        "taxonomy": [
            {"nodeId": node_id, "nodeName": node_name, "parentId": None}
            for node_id, node_name in enumerate(taxonomy_names, start=1)
        ],
        "database": {}
    }

    # Sort the matches: glob order depends on the filesystem, and a fixed order
    # makes both the output and the "last file wins" rule for repeated video
    # IDs reproducible.
    for filename in sorted(input_dir.glob("*.json")):
        if filename.name == output_file.name:
            continue  # Skip the combined file

        data = read_json(filename)

        # Each converted file contributes one entry to the "database" field.
        all_annotations["database"].update(convert_annotations(data, fps))

    # Save the combined annotations as a JSON file in ActivityNet format
    with open(output_file, 'w', encoding='utf-8') as file:
        json.dump(all_annotations, file, indent=4)

In [41]:
# Convert every *.json annotation file found in input_dir and write the merged
# ActivityNet-style result to output_file.
# NOTE(review): both paths are absolute and machine-specific.
input_dir = Path("/home/nele_pauline_suffo/ProcessedData/annotations_superannotate")
output_file = Path("/home/nele_pauline_suffo/projects/mmaction2/data/quantex_share/quantex_share.json")
process_all_json_files(input_dir, output_file)

In [13]:
json_file = '/home/nele_pauline_suffo/projects/mmaction2/data/quantex_share/quantex_share.json'
action_name_list = '/home/nele_pauline_suffo/projects/mmaction2/tools/data/quantex_share/action_name.csv'

# Load the combined annotation file; the context manager closes the handle
# instead of leaving it open until garbage collection.
with open(json_file) as fh:
    load_dict = json.load(fh)
database = load_dict['database']

# Read the label names, one per line, skipping the first line
# (presumably a header row — confirm against action_name.csv).
with open(action_name_list) as fh:
    quantex_labels = [line.strip() for line in fh.readlines()[1:]]
quantex_labels

['Playing with Object',
 'Playing without Object',
 'Pretend play',
 'Watching Something',
 'Reading a Book',
 'Drawing',
 'Crafting Things',
 'Dancing',
 'Making Music']

In [16]:
# Position of a label in quantex_labels, i.e. the integer simple_label() returns for it
quantex_labels.index('Pretend play')

2

In [15]:
def simple_label(anno):
    """Return the index in ``quantex_labels`` of the first annotation's label.

    Args:
        anno: Sequence of annotation dicts; only ``anno[0]['label']`` is read.

    Returns:
        The position of that label in the module-level ``quantex_labels`` list.

    Raises:
        IndexError: if ``anno`` is empty.
        ValueError: if the label is not contained in ``quantex_labels``.
    """
    first_annotation = anno[0]
    return quantex_labels.index(first_annotation['label'])
    
# Spot-check one video: fetch its database entry and the subset stored with it.
data = database['147984']
subset = data['subset']

# Only entries in the training or validation subset are labelled here; the
# printed value is the quantex_labels index of the entry's first annotation.
if subset in ['training', 'validation']:
    annotations = data['annotations']
    label = simple_label(annotations)
    print(label)

4


In [None]:
# Function to read JSON from a file
def read_json(file_path: str) -> Dict:
    """Load and return the JSON document stored at ``file_path``.

    Args:
        file_path: Location of the JSON file; anything accepted by ``open``
            works (a ``str`` or a ``pathlib.Path``).

    Returns:
        The parsed JSON content.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, so non-ASCII text is read the same way on every machine.
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)

# Conversion function
def convert_annotations(data: Dict, fps: float = 30.0) -> Dict:
    """Convert one parsed annotation file into a per-video entry with frame info.

    Args:
        data: Parsed annotation JSON. Reads ``data['metadata']['name']``,
            ``data['metadata']['duration']`` (divided by 1e6 to obtain seconds)
            and ``data['instances']``.
        fps: Nominal frame rate used to derive the frame count.

    Returns:
        A one-entry dict keyed by the video name with ".MP4" removed. The value
        holds ``duration_second``, ``duration_frame``, ``annotations``,
        ``feature_frame``, ``fps`` and ``rfps``. Each annotation is
        ``{"segment": [start_seconds, end_seconds], "label": name}``.

    Raises:
        ZeroDivisionError: if the duration is zero.
        KeyError: if a required key is missing from ``data``.
    """
    metadata = data['metadata']
    short_video_id = metadata['name'].replace(".MP4", "")
    duration_seconds = metadata['duration'] / 1000000.0
    duration_frames = int(duration_seconds * fps)

    # Real frame rate = frames actually counted / duration. The previous
    # expression multiplied this ratio by fps again, giving roughly fps squared.
    real_fps = duration_frames / duration_seconds

    annotations = []
    for item in data['instances']:
        start_time = item["meta"]["start"]
        end_time = item["meta"]["end"]

        for parameter in item.get("parameters", []):
            timestamps = parameter.get("timestamps", [])

            # Only parameters whose first timestamp has a non-empty attribute list count;
            # the label is the first attribute name of that first timestamp.
            if timestamps and "attributes" in timestamps[0] and timestamps[0]["attributes"]:
                annotations.append({
                    "segment": [start_time / 1000000.0, end_time / 1000000.0],
                    "label": timestamps[0]["attributes"][0]["name"]
                })

    return {
        short_video_id: {
            "duration_second": duration_seconds,   # duration in seconds
            "duration_frame": duration_frames,     # duration in frames
            "annotations": annotations,
            "feature_frame": duration_frames - 1,  # feature frame
            "fps": fps,                            # frames per second
            "rfps": real_fps                       # real frames per second
        }
    }


# Function to process all JSON files in a folder
def process_all_json_files(folder_path: str, fps: float = 30.0) -> Dict:
    """Convert every ``*.json`` file in a folder and save the merged result.

    Args:
        folder_path: Folder to search (non-recursively); a ``str`` or a
            ``pathlib.Path``.
        fps: Forwarded unchanged to ``convert_annotations``.

    Returns:
        The merged dict, which is also written to
        ``combined_annotations.json`` inside the folder.
    """
    # Accept plain strings as the annotation promises; .glob and "/" need a Path.
    folder = Path(folder_path)
    output_file = folder / "combined_annotations.json"

    all_annotations = {}
    # Sort the matches: glob order depends on the filesystem, and a fixed order
    # keeps the output and the handling of repeated video IDs reproducible.
    for filename in sorted(folder.glob("*.json")):
        if filename.name == output_file.name:
            continue  # Skip the combined file from an earlier run

        data = read_json(filename)
        all_annotations.update(convert_annotations(data, fps))

    # Save combined_annotations as a JSON file
    with open(output_file, 'w', encoding='utf-8') as file:
        json.dump(all_annotations, file, indent=4)

    # Return the result as the signature declares (previously nothing was returned).
    return all_annotations

In [4]:
import cv2

def get_frame_count(video_path):
    """Return the frame count OpenCV reports for the video at ``video_path``.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.

    Returns:
        The ``CAP_PROP_FRAME_COUNT`` property converted to ``int``.

    Raises:
        ValueError: if the capture could not be opened.
    """
    capture = cv2.VideoCapture(video_path)

    if capture.isOpened():
        # Read the property first, then release the capture before returning.
        total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        capture.release()
        return total_frames

    raise ValueError(f"Could not open the video file: {video_path}")

# Example usage: print the frame count reported for a single video.
# NOTE(review): absolute, machine-specific path.
video_path = "/home/nele_pauline_suffo/projects/mmaction2/data/quantex_share/videos/204839.MP4"
print("Total number of frames:", get_frame_count(video_path))

Total number of frames: 8946
