In [6]:
import pylangacq
from glob import glob
from tqdm import tqdm
import json
import ffmpeg

## Generate metadata

In [8]:
all_chas = glob('/workspace/dataset/all_chas/*/*.cha') # change the path to the directory where all the .cha files are stored
chats = []

for cha in tqdm(all_chas):
    try:
        chat = pylangacq.read_chat(cha)
        chats.append(chat)
    except:
        print(cha) # You need to check the files that are printed here and see if they are valid .cha files

 15%|█▍        | 208/1390 [05:12<27:27,  1.39s/it]

D:/aphasia/dataset/all_chas\Fridriksson-2\1003-3.cha


100%|██████████| 1390/1390 [35:36<00:00,  1.54s/it]


In [95]:
total_info = {}

for reader in tqdm(chats):
    info = {}
    headers = reader.headers()[0]

    # Extracting information of the participants
    par_info = headers['Participants']['PAR']
    chat_name = f"{par_info['corpus']}/{headers['Media'].split(',')[0]}"
    
    info['lang'] = headers['Languages'][0]
    info['age'] = par_info['age']
    info['sex'] = par_info['sex']
    info['media_type'] = headers['Media'].split(',')[1].strip()
    
    # Extracting aphasia type and label
    aphasia_type = par_info['group'].lower()
    info['aphasia_type'] = aphasia_type
    label_mapping = {
        'control': 'Control',
        'anomic': 'Fluent', 'conduction': 'Fluent',
        'transsensory': 'Non-Comprehension', 'wernicke': 'Non-Comprehension',
        'transmotor': 'Non-Fluent', 'broca': 'Non-Fluent'
    }
    info['label'] = label_mapping.get(aphasia_type, None)

    # Cinderella task extraction
    cinderella_task = reader.utterances(task='Cinderella')

    if not cinderella_task:
        info['timestamp'] = []
        info['timestamp_inv'] = []
    else:
        # Cinderella task time extraction
        start_time = next((u.time_marks[0] for u in cinderella_task if u.time_marks), None)
        end_time = next((u.time_marks[1] for u in reversed(cinderella_task) if u.time_marks), None)

        if start_time and end_time:
            info['timestamp'] = (start_time, end_time)
        
        info['timestamp_inv'] = [utterance.time_marks for utterance in cinderella_task if utterance.participant == 'INV']

    total_info[chat_name] = info

100%|██████████| 1389/1389 [00:00<00:00, 4157.22it/s]


In [97]:
# change the path to the directory where you want to save the metadata.json file
with open('/workspace/RAPID/data_preprocessing/metadata.json', 'w') as f: 
    json.dump(total_info, f, ensure_ascii=False, indent=4, sort_keys=True)

In [98]:
labels = [v['aphasia_type'] for k,v in total_info.items() if v['label'] != None]

import numpy as np
labels, counts = np.unique(labels, return_counts=True)

print(dict(zip(labels, counts)))

{'anomic': 285, 'broca': 309, 'conduction': 149, 'control': 367, 'transmotor': 14, 'transsensory': 2, 'wernicke': 63}


## Cinderella task segment extraction

In [19]:
import ffmpeg
import json
import os
from tqdm import tqdm

def cut_video(input_video, output_video, start_time_ms, end_time_ms):
    start_time_sec = start_time_ms / 1000.0
    end_time_sec = end_time_ms / 1000.0

    try:
        process = (
            ffmpeg
            .input(input_video, ss=start_time_sec, to=end_time_sec)
            .output(output_video, codec='copy')
            .run_async(pipe_stdout=True, pipe_stderr=True)
        )
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            raise ffmpeg.Error(f'Error cutting video {input_video}: {stderr.decode("utf-8")}')
    except Exception as e:
        print(f"Failed to cut video {input_video}: {str(e)}")


def generate_video_clips(dataset_root, metadata_path, output_dir):
    with open(metadata_path, 'r') as f:
        metadata = json.load(f)
        metadata = {k: v for k, v in metadata.items() if v['label'] is not None and v['media_type'] == 'video' and len(v['timestamp']) != 0}

    video_metadata = {}

    for k, v in tqdm(metadata.items(), leave=False):
        if v['label'] is None or v['media_type'] != 'video':
            continue

        video_name = k.split("/")[1] + '.mp4'
        input_video = os.path.join(dataset_root, video_name)
        output_video = os.path.join(output_dir, video_name)

        if os.path.exists(output_video):
            continue

        try:
            start_time, end_time = v['timestamp'][0], v['timestamp'][1]
        except:
            continue

        cut_video(input_video, output_video, start_time, end_time)

        try:
            timestamp_inv = [[ts[0] - start_time, ts[1] - start_time] for ts in v.get('timestamp_inv', [])]
        except:
            timestamp_inv = []

        video_metadata[k] = {
            'video_name': video_name,
            'label': v['label'],
            'age': v['age'],
            'lang': v['lang'],
            'aphasia_type': v.get('aphasia_type'),
            'timestamp_inv': timestamp_inv,
            'duration': end_time - start_time
        }

    return video_metadata


In [27]:
dataset_root = '/workspace/dataset/videos'
metadata_path = '/workspace/RAPID/data_preprocessing/metadata.json'
output_dir = "/workspace/dataset/video_clips"

video_metadata = generate_video_clips(dataset_root, metadata_path, output_dir)

  0%|          | 0/1100 [00:00<?, ?it/s]

                                                  

Failed to cut video D:/aphasia/dataset/videos\1030-1.mp4: __init__() missing 2 required positional arguments: 'stdout' and 'stderr'
Failed to cut video D:/aphasia/dataset/videos\1030-5.mp4: __init__() missing 2 required positional arguments: 'stdout' and 'stderr'




In [40]:
labels = [v['aphasia_type'] for k,v in video_metadata.items() if v['label'] != None]

import numpy as np
labels, counts = np.unique(labels, return_counts=True)

print(dict(zip(labels, counts)))

{'anomic': 266, 'broca': 287, 'conduction': 138, 'control': 328, 'transmotor': 13, 'transsensory': 1, 'wernicke': 59}


In [39]:
with open('/workspace/RAPID/data_preprocessing/video_metadata.json', 'w') as f:
    json.dump(video_metadata, f, ensure_ascii=False, indent=4, sort_keys=True)