# MUSE

## Imports

In [15]:
import tarfile
import os
from dataset import MELDDataset

## Data

In [16]:
def extract_data(file_name, data_folder):
    """
    Extracts a gzip-compressed tar archive into a folder.

    Args:
        file_name (str): Path to the ".tar.gz" archive to read.
        data_folder (str): Destination folder. It is created (including
            missing parents) when it does not exist yet.

    Raises:
        OSError: If the archive cannot be opened or the folder cannot be created.
        tarfile.TarError: If the archive is not a valid gzip-compressed tar
            file, or a member is rejected by the extraction filter.
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists test.
    os.makedirs(data_folder, exist_ok=True)

    # The context manager closes the archive even when extraction fails part-way.
    with tarfile.open(file_name, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Extraction filters are available on this Python: use the "data"
            # filter so members cannot be written outside data_folder.
            tar.extractall(data_folder, filter="data")
        else:
            # Older Python without extraction filters: plain extraction.
            tar.extractall(data_folder)

In [12]:
def rename_files(data_folder):
    """
    Strips a leading "._" from file names under a folder (recursively).

    A file is left untouched when:
      * the name is exactly "._" (the stripped name would be empty), or
      * an entry with the stripped name already exists in the same directory.
        os.rename silently replaces an existing destination file on POSIX
        systems, so renaming in that case would destroy the existing file.

    Args:
        data_folder (str): Root folder to walk.
    """
    for root, _dirs, files in os.walk(data_folder):
        for file in files:
            if not file.startswith('._'):
                continue

            stripped_name = file[2:]
            target = os.path.join(root, stripped_name)

            # Never clobber an existing entry and never rename to an empty name.
            if not stripped_name or os.path.lexists(target):
                continue

            os.rename(os.path.join(root, file), target)
            
# Strip the "._" prefix from file names under the test video folder.
# NOTE(review): "._" files are presumably macOS AppleDouble sidecars rather
# than real videos — confirm that renaming them is actually wanted.
rename_files('./data/test/video')

In [10]:
# extract_data("data/raw/test.tar.gz", "data/test")

## Dataset

In [13]:
import subprocess

import os
import subprocess
from pathlib import Path


def _is_positive_duration(raw_duration):
    """Return True when ffprobe's duration output parses as a finite number > 0."""
    try:
        seconds = float(raw_duration.strip())
    except ValueError:
        # Empty output or a placeholder such as "N/A" is not a usable duration.
        return False
    # The chained comparison also rejects NaN and infinity.
    return 0 < seconds < float("inf")


def filter_corrupted_videos(folder_path, extensions=("mp4", "mov", "mkv", "avi")):
    """
    Splits the video files under a folder into valid and corrupted ones.

    Each candidate file is probed with ffprobe. A file is valid when ffprobe
    exits with code 0 and reports a finite, positive duration; otherwise it
    is reported as corrupted.

    Args:
        folder_path (str): Path to the folder containing video files
            (searched recursively).
        extensions (tuple): Video file extensions to check. Matching is
            case-insensitive and a leading dot is optional.

    Returns:
        valid_videos (list): Sorted list of valid video file paths.
        corrupted_videos (list): Sorted list of corrupted or invalid video file paths.

    Raises:
        RuntimeError: If the ffprobe executable cannot be found. Without it
            no file can be checked, so reporting every file as corrupted
            would be misleading.
    """
    valid_videos = []
    corrupted_videos = []

    # Normalize the extensions once: lower-case, exactly one leading dot.
    wanted_suffixes = tuple("." + ext.lower().lstrip(".") for ext in extensions)

    # A single recursive walk filtered by suffix yields every file at most once
    # (even with overlapping extensions); sorting makes the result reproducible.
    video_files = sorted(
        path for path in Path(folder_path).rglob("*")
        if path.name.lower().endswith(wanted_suffixes)
    )

    for video_path in video_files:
        # Convert Path object to string (for subprocess compatibility)
        video_path = str(video_path)

        # Ask ffprobe for the container duration only, printed as a bare value.
        command = [
            "ffprobe", "-v", "error", "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1", video_path
        ]

        try:
            result = subprocess.run(
                command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        except FileNotFoundError as exc:
            # Raised when the ffprobe binary itself cannot be launched; this is
            # an environment problem, not a property of the video file.
            raise RuntimeError(
                "ffprobe executable not found; install FFmpeg and make sure "
                "ffprobe is on PATH."
            ) from exc
        except Exception as e:
            # Any other launch failure is treated as a per-file problem.
            print(f"Error processing file {video_path}: {e}")
            corrupted_videos.append(video_path)
            continue

        if result.returncode == 0 and _is_positive_duration(result.stdout):
            valid_videos.append(video_path)
        else:
            # ffprobe failed, or succeeded without a usable duration.
            print(f"Corrupted video detected: {video_path}")
            corrupted_videos.append(video_path)

    return valid_videos, corrupted_videos

# Probe every video under the test split; `results` is the
# (valid_videos, corrupted_videos) tuple returned by the function.
results = filter_corrupted_videos('data/test/video')

Corrupted video detected: data/test/video/dia178_utt7.mp4
Corrupted video detected: data/test/video/dia128_utt5.mp4
Corrupted video detected: data/test/video/dia109_utt5.mp4
Corrupted video detected: data/test/video/dia228_utt0.mp4
Corrupted video detected: data/test/video/dia184_utt5.mp4
Corrupted video detected: data/test/video/dia167_utt14.mp4
Corrupted video detected: data/test/video/dia88_utt8.mp4
Corrupted video detected: data/test/video/dia113_utt1.mp4
Corrupted video detected: data/test/video/dia12_utt1.mp4
Corrupted video detected: data/test/video/dia182_utt7.mp4
Corrupted video detected: data/test/video/dia253_utt0.mp4
Corrupted video detected: data/test/video/dia4_utt7.mp4
Corrupted video detected: data/test/video/dia75_utt4.mp4
Corrupted video detected: data/test/video/dia269_utt2.mp4
Corrupted video detected: data/test/video/dia115_utt1.mp4
Corrupted video detected: data/test/video/dia184_utt7.mp4
Corrupted video detected: data/test/video/dia42_utt5.mp4
Corrupted video det

In [17]:
# Set up the dataset from the dev-split CSV and its root folder.
# NOTE(review): the corruption check earlier in this notebook ran on
# data/test while this loads data/dev — confirm the split mismatch is intended.
dataset = MELDDataset(
    csv_file="data/dev/dev_sent_emo.csv",
    root_dir="data/dev",
)

In [18]:
# results[0] holds the valid videos (not the total) and results[1] the
# corrupted ones, so label the first count accordingly.
print(f"Valid videos: {len(results[0])}")
print(f"Corrupted videos: {len(results[1])}")

Total videos: 687
Corrupted videos: 2060


In [13]:
def count_files_in_folder(folder_path):
    """
    Returns how many regular files sit directly inside a folder.

    Sub-folders are not descended into and are not counted themselves.

    Args:
        folder_path (str): The path to the folder.

    Returns:
        int: The number of files in the folder, or 0 (after printing an
        error message) when the folder is missing or not accessible.
    """
    try:
        names = os.listdir(folder_path)
    except FileNotFoundError:
        print(f"Error: The folder '{folder_path}' does not exist.")
        return 0
    except PermissionError:
        print(
            f"Error: Permission denied to access the folder '{folder_path}'.")
        return 0

    # Count only the entries that resolve to regular files.
    return sum(
        1 for name in names
        if os.path.isfile(os.path.join(folder_path, name))
    )


# Print the number of files in the train audio and video folders.
# NOTE(review): presumably the two counts should match (one audio file per video) — confirm.
print(count_files_in_folder("./data/train/audio/"))
print(count_files_in_folder("./data/train/video/"))

9988
9988


In [None]:
# Fetch the first sample of the dataset and show it as the cell output.
example = dataset[0]
example

{'audio_array': array([-0.00198962, -0.02142128, -0.02587057, ..., -0.06124197,
        -0.06868309, -0.0437346 ], dtype=float32),
 'sampling_rate': 22050,
 'transcript': 'also I was the point person on my company\x92s transition from the KL-5 to GR-6 system.',
 'emotion': 'neutral',
 'sentiment': 'neutral'}

In [None]:
from model import *

# Size the classifier by the number of entries returned by get_emotions_list().
# NOTE(review): a later cell wraps get_emotions_list() in set(), which suggests
# the list may contain duplicates; if so, len(emotions) over-counts the
# classes — confirm and de-duplicate if needed.
emotions = dataset.get_emotions_list()
model = MultimodalClassifier(num_classes=len(emotions))

In [20]:
# Distinct values returned by get_emotions_list(), shown as the cell output.
set(dataset.get_emotions_list())

{'anger', 'disgust', 'fear', 'joy', 'neutral', 'sadness', 'surprise'}

In [21]:
# Distinct values returned by get_sentiments_list(), shown as the cell output.
set(dataset.get_sentiments_list())

{'negative', 'neutral', 'positive'}