## Draft for script

In [1]:
import dataset
dataset.cleanup_cache_files()

AttributeError: module 'dataset' has no attribute 'cleanup_cache_files'

In [1]:
import os
from pydub import AudioSegment
from pydub.utils import mediainfo

def calculate_total_duration(directory):
    """Calculate the total duration of all audio files in the given directory."""
    total_duration = 0
    file_count = 0
    
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith('.wav'):  # Assuming all files are in WAV format
                file_path = os.path.join(root, file)
                try:
                    audio = AudioSegment.from_wav(file_path)
                    duration_seconds = len(audio) / 1000.0  # Convert milliseconds to seconds
                    total_duration += duration_seconds
                    file_count += 1
                    print(f"Processed: {file_path} (Duration: {duration_seconds:.2f} seconds)")
                except Exception as e:
                    print(f"Error processing {file_path}: {e}")

    # Convert total duration to hours, minutes, and seconds
    hours, remainder = divmod(total_duration, 3600)
    minutes, seconds = divmod(remainder, 60)

    print(f"\nTotal audio files processed: {file_count}")
    print(f"Total duration: {int(hours):02d}:{int(minutes):02d}:{seconds:.2f}")

# Specify the target directory
speaker = 'IMJ_Woman'
target_directory = f'dataset/{speaker}'  # Change this to your target directory

# Calculate total duration
calculate_total_duration(target_directory)

Processed: dataset/IMJ_Woman/240812_1734_part216.wav (Duration: 0.99 seconds)
Processed: dataset/IMJ_Woman/240810_2139_part99_a.wav (Duration: 2.98 seconds)
Processed: dataset/IMJ_Woman/240812_1734_part493.wav (Duration: 2.43 seconds)
Processed: dataset/IMJ_Woman/240812_1942_part49.wav (Duration: 1.41 seconds)
Processed: dataset/IMJ_Woman/240811_2034_part28.wav (Duration: 2.13 seconds)
Processed: dataset/IMJ_Woman/240812_1734_part447.wav (Duration: 0.81 seconds)
Processed: dataset/IMJ_Woman/240810_2139_part56.wav (Duration: 2.83 seconds)
Processed: dataset/IMJ_Woman/240811_2034_part111.wav (Duration: 2.87 seconds)
Processed: dataset/IMJ_Woman/240813_1834_part11.wav (Duration: 1.62 seconds)
Processed: dataset/IMJ_Woman/240812_1942_part164.wav (Duration: 8.04 seconds)
Processed: dataset/IMJ_Woman/240812_1734_part481.wav (Duration: 4.79 seconds)
Processed: dataset/IMJ_Woman/240813_2100_part77.wav (Duration: 0.48 seconds)
Processed: dataset/IMJ_Woman/240815_1507_part50.wav (Duration: 7.69 

In [6]:
import os
from pydub import AudioSegment
from pydub.utils import mediainfo

def find_long_audio_files(directory, threshold_seconds=10):
    """Find and print information about audio files longer than the specified threshold."""
    long_files = []
    
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith('.wav'):  # Assuming all files are in WAV format
                file_path = os.path.join(root, file)
                try:
                    audio = AudioSegment.from_wav(file_path)
                    duration_seconds = len(audio) / 1000.0  # Convert milliseconds to seconds
                    
                    if duration_seconds > threshold_seconds:
                        long_files.append((file_path, duration_seconds))
                        print(f"Long file found: {file_path} (Duration: {duration_seconds:.2f} seconds)")
                except Exception as e:
                    print(f"Error processing {file_path}: {e}")

    # Print summary
    print(f"\nTotal files longer than {threshold_seconds} seconds: {len(long_files)}")
    
    # Sort files by duration (longest first) and print detailed information
    if long_files:
        print("\nDetailed information (sorted by duration, longest first):")
        for file_path, duration in sorted(long_files, key=lambda x: x[1], reverse=True):
            minutes, seconds = divmod(duration, 60)
            print(f"{file_path}: {int(minutes):02d}:{seconds:.2f}")

# Specify the target directory

# Find long audio files
find_long_audio_files(target_directory)

Long file found: dataset/CYR_Woman_50s/CYR_Woman_50s_240822_5959_s1_107.wav (Duration: 10.08 seconds)
Long file found: dataset/CYR_Woman_50s/CYR_Woman_50s_240822_5959_s4_2450.wav (Duration: 12.11 seconds)
Long file found: dataset/CYR_Woman_50s/CYR_Woman_50s_240822_5959_s3_1990.wav (Duration: 12.68 seconds)
Long file found: dataset/CYR_Woman_50s/CYR_Woman_50s_240822_5959_s3_1622.wav (Duration: 10.57 seconds)
Long file found: dataset/CYR_Woman_50s/CYR_Woman_50s_240822_5959_s4_2249.wav (Duration: 12.17 seconds)
Long file found: dataset/CYR_Woman_50s/CYR_Woman_50s_240822_5959_s2_817.wav (Duration: 12.80 seconds)
Long file found: dataset/CYR_Woman_50s/CYR_Woman_50s_240822_5959_s2_1050.wav (Duration: 10.19 seconds)
Long file found: dataset/CYR_Woman_50s/CYR_Woman_50s_240811_0403_27.wav (Duration: 12.47 seconds)
Long file found: dataset/CYR_Woman_50s/CYR_Woman_50s_240822_5959_s3_1896.wav (Duration: 12.63 seconds)
Long file found: dataset/CYR_Woman_50s/CYR_Woman_50s_240822_5959_s3_1620.wav (Du

In [7]:
import json
import random
import pandas as pd
random.seed(42)  # For reproducibility

# Load the CSV file into a DataFrame

csv_path = f'dataset/csvs/{speaker}_transcribed.csv'  # Change this to your CSV file path

df = pd.read_csv(csv_path)


# Path for the output JSON files
train_json_path = os.path.join(target_directory, 'train.json')
val_json_path = os.path.join(target_directory, 'valid.json')
test_json_path = os.path.join(target_directory, 'test.json')

# Initialize a list to store the JSON-formatted data
json_data = []
missing = 0
not_missing = 0

# Iterate over the DataFrame rows to create JSON format for each entry
for index, row in df.iterrows():
    wav_file_name = row['파일명(이름_성별_나이_날짜_일렬번호_순)']
    transcription = row['변환된 문자']
    # remove missing values
    if not os.path.exists(os.path.join(target_directory, wav_file_name)):
        print(f"Missing file: {wav_file_name}")
        missing += 1
        continue
    not_missing += 1
    # Extract speaker information from the file name (or add your own logic here)
    speaker = wav_file_name.split('_')[0]  # Assuming the speaker name is the first part of the filename
    
    # Create the JSON entry
    json_entry = {
        "audio": {
            "path": os.path.join(target_directory, wav_file_name),
            "sampling_rate": 16000  # Assuming all audio files are resampled to 16kHz
        },
        "sentence": transcription,
        "speaker": speaker
    }
    
    # Append to the list
    json_data.append(json_entry)

# Shuffle the data randomly before splitting
random.shuffle(json_data)

# Calculate the sizes of each split
total_size = len(json_data)
train_size = int(0.8 * total_size)
val_size = int(0.1 * total_size)
test_size = total_size - train_size - val_size  # Remainder for the test set

# Split the data
train_data = json_data[:train_size]
val_data = json_data[train_size:train_size + val_size]
test_data = json_data[train_size + val_size:]

# Function to save the data into JSON files
def save_json(data, path):
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

# Save the data into respective JSON files
save_json(train_data, train_json_path)
save_json(val_data, val_json_path)
save_json(test_data, test_json_path)

print(f"Data saved to {train_json_path}, {val_json_path}, {test_json_path}")
print(f"Total missing files: {missing}")
print(f"Total not missing files: {not_missing}")

Missing file: CYR_Woman_50s_231123_0016_1.wav
Missing file: CYR_Woman_50s_231123_0016_3.wav
Missing file: CYR_Woman_50s_231123_0016_5.wav
Missing file: CYR_Woman_50s_231123_0016_7.wav
Missing file: CYR_Woman_50s_240809_0016_2.wav
Missing file: CYR_Woman_50s_240809_0016_4.wav
Missing file: CYR_Woman_50s_240810_0907_1.wav
Missing file: CYR_Woman_50s_240810_0907_3.wav
Missing file: CYR_Woman_50s_240810_0907_5.wav
Missing file: CYR_Woman_50s_240810_0907_7.wav
Missing file: CYR_Woman_50s_240810_0907_9.wav
Missing file: CYR_Woman_50s_240810_0907_11.wav
Missing file: CYR_Woman_50s_240810_0907_13.wav
Missing file: CYR_Woman_50s_240810_0907_15.wav
Missing file: CYR_Woman_50s_240810_0907_17.wav
Missing file: CYR_Woman_50s_240810_0907_19.wav
Missing file: CYR_Woman_50s_240810_0907_21.wav
Missing file: CYR_Woman_50s_240810_0907_23.wav
Missing file: CYR_Woman_50s_240810_0907_25.wav
Missing file: CYR_Woman_50s_240810_0907_29.wav
Missing file: CYR_Woman_50s_240810_0907_31.wav
Missing file: CYR_Woman_

In [5]:
import json
import random

target_directory = '/home/ali/dataset/13_CUJ_Woman'
# Load the JSON file
json_path = f'{target_directory}/dataset.json'  # Change this to your JSON file path

with open(json_path, 'r', encoding='utf-8') as f:
    data = json.load(f)

# Shuffle the data randomly
random.shuffle(data)

# Calculate the sizes of each split
total_size = len(data)
train_size = int(0.8 * total_size)
val_size = int(0.1 * total_size)
test_size = total_size - train_size - val_size  # Remainder for the test set

# Split the data
train_data = data[:train_size]
val_data = data[train_size:train_size + val_size]
test_data = data[train_size + val_size:]

# Print the sizes of each split
print(f"Train size: {len(train_data)}")
print(f"Validation size: {len(val_data)}")
print(f"Test size: {len(test_data)}")

# Save the split data
train_path = f'{target_directory}/train.json'
val_path = f'{target_directory}/valid.json'
test_path = f'{target_directory}/test.json'

with open(train_path, 'w', encoding='utf-8') as f:
    json.dump(train_data, f, ensure_ascii=False, indent=2)

with open(val_path, 'w', encoding='utf-8') as f:
    json.dump(val_data, f, ensure_ascii=False, indent=2)

with open(test_path, 'w', encoding='utf-8') as f:
    json.dump(test_data, f, ensure_ascii=False, indent=2)

Train size: 279
Validation size: 34
Test size: 36


### check existance

In [6]:
import json
import os
from collections import Counter
from typing import List, Dict
import matplotlib.pyplot as plt
import numpy as np
from pydub import AudioSegment
import matplotlib.font_manager as fm
from jamo import h2j, j2hcj

# Set up the Korean font
font_path = 'stuff/NanumGothic-Regular.ttf'  # Update this path as necessary
fontprop = fm.FontProperties(fname=font_path, size=12)

plt.rcParams['font.family'] = 'NanumGothic'
plt.rcParams['axes.unicode_minus'] = False

def load_json(filename: str) -> List[Dict]:
    with open(filename, 'r', encoding='utf-8') as f:
        return json.load(f)

def get_word_syllable_jamo_distributions(data: List[Dict]) -> (Counter, Counter, Counter):
    words = Counter()
    syllables = Counter()
    jamos = Counter()
    for item in data:
        sentence_words = [word for word in item['sentence'].split() if word]
        words.update(sentence_words)
        syllables.update(char for char in item['sentence'] if char.strip())
        jamo_text = j2hcj(h2j(item['sentence']))
        jamos.update(char for char in jamo_text if char.strip())
    return words, syllables, jamos

def plot_top_n_distribution(train_dist: Counter, val_dist: Counter, test_dist: Counter, title: str, n: int = 20):
    plt.figure(figsize=(15, 8))
    
    combined_dist = train_dist + val_dist + test_dist
    top_items = sorted(combined_dist.items(), key=lambda x: x[1], reverse=True)[:n]
    
    labels, _ = zip(*top_items)
    train_values = [train_dist[label] for label in labels]
    val_values = [val_dist[label] for label in labels]
    test_values = [test_dist[label] for label in labels]
    
    x = np.arange(len(labels))
    width = 0.25
    
    plt.bar(x - width, train_values, width, label='Train', alpha=0.7)
    plt.bar(x, val_values, width, label='Validation', alpha=0.7)
    plt.bar(x + width, test_values, width, label='Test', alpha=0.7)
    
    plt.xlabel('Items', fontproperties=fontprop)
    plt.ylabel('Frequency', fontproperties=fontprop)
    plt.title(f'Top {n} {title}', fontproperties=fontprop)
    plt.xticks(x, labels, rotation='vertical', fontproperties=fontprop)
    plt.legend(prop=fontprop)
    plt.tight_layout()
    plt.savefig(f"top_{n}_{title.lower().replace(' ', '_')}.png", dpi=300)
    plt.close()

def plot_distribution_scatter(train_dist: Counter, val_dist: Counter, test_dist: Counter, title: str):
    fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(18, 6))
    
    all_keys = set(train_dist.keys()).union(set(val_dist.keys())).union(set(test_dist.keys()))
    train_values = [train_dist.get(k, 0) for k in all_keys]
    val_values = [val_dist.get(k, 0) for k in all_keys]
    test_values = [test_dist.get(k, 0) for k in all_keys]
    
    max_value = max(max(train_values), max(val_values), max(test_values))
    
    ax1.scatter(train_values, val_values, alpha=0.5)
    ax1.set_xlabel('Train Frequency', fontproperties=fontprop)
    ax1.set_ylabel('Validation Frequency', fontproperties=fontprop)
    ax1.set_title('Train vs Validation', fontproperties=fontprop)
    ax1.plot([0, max_value], [0, max_value], 'r--', alpha=0.5)
    
    ax2.scatter(train_values, test_values, alpha=0.5)
    ax2.set_xlabel('Train Frequency', fontproperties=fontprop)
    ax2.set_ylabel('Test Frequency', fontproperties=fontprop)
    ax2.set_title('Train vs Test', fontproperties=fontprop)
    ax2.plot([0, max_value], [0, max_value], 'r--', alpha=0.5)
    
    ax3.scatter(val_values, test_values, alpha=0.5)
    ax3.set_xlabel('Validation Frequency', fontproperties=fontprop)
    ax3.set_ylabel('Test Frequency', fontproperties=fontprop)
    ax3.set_title('Validation vs Test', fontproperties=fontprop)
    ax3.plot([0, max_value], [0, max_value], 'r--', alpha=0.5)
    
    plt.suptitle(f'{title} Distribution Comparison', fontsize=16, fontproperties=fontprop)
    plt.tight_layout()
    plt.savefig(f"{title.lower().replace(' ', '_')}_scatter.png", dpi=300)
    plt.close()

def calculate_dataset_stats(data: List[Dict]) -> (int, float):
    total_duration = 0
    for item in data:
        audio_path = item['audio']['path']
        if os.path.exists(audio_path):
            audio = AudioSegment.from_file(audio_path)
            total_duration += len(audio) / 1000  # Convert to seconds
        else:
            print(f"Warning: Audio file not found: {audio_path}")
            # delete the missing file
            data.remove(item)
    return len(data), total_duration, data

def analyze_dataset(data: List[Dict], name: str):
    words, syllables, jamos = get_word_syllable_jamo_distributions(data)
    
    print(f"\nAnalysis for {name} dataset:")
    print(f"Total samples: {len(data)}")
    print(f"Total unique words: {len(words)}")
    print(f"Total unique syllables: {len(syllables)}")
    print(f"Total unique jamos (characters/phonemes): {len(jamos)}")
    
    print(f"\nTop 10 most common words in {name} set:")
    print(words.most_common(10))
    
    print(f"\nTop 10 most common syllables in {name} set:")
    print(syllables.most_common(10))
    
    print(f"\nTop 10 most common jamos (characters/phonemes) in {name} set:")
    print(jamos.most_common(10))
    
    return words, syllables, jamos


train_data = load_json(f'{target_directory}/train.json')
val_data = load_json(f'{target_directory}/valid.json')
test_data = load_json(f'{target_directory}/test.json')


train_words, train_syllables, train_jamos = analyze_dataset(train_data, "train")
val_words, val_syllables, val_jamos = analyze_dataset(val_data, "validation")
test_words, test_syllables, test_jamos = analyze_dataset(test_data, "test")

#plot_top_n_distribution(train_syllables, val_syllables, test_syllables, "Syllable Distribution")

#plot_top_n_distribution(train_words, val_words, test_words, "Word Distribution")

train_samples, train_duration, train_data  = calculate_dataset_stats(train_data)
val_samples, val_duration, val_data = calculate_dataset_stats(val_data)
test_samples, test_duration, test_data = calculate_dataset_stats(test_data)
total_sample = train_samples + val_samples + test_samples
total_duration = train_duration + val_duration + test_duration

def format_duration(duration_seconds):
    hours, remainder = divmod(duration_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{int(hours):02d}-{int(minutes):02d}-{seconds:.2f}"


print(f"Train set: {train_samples} samples, {format_duration(train_duration)}")
print(f"Validation set: {val_samples} samples, {format_duration(val_duration)}")
print(f"Test set: {test_samples} samples, {format_duration(test_duration)}")
print(f"Total: {total_sample} samples, {format_duration(total_duration)}")


Analysis for train dataset:
Total samples: 279
Total unique words: 364
Total unique syllables: 299
Total unique jamos (characters/phonemes): 43

Top 10 most common words in train set:
[('머리', 7), ('여기', 6), ('전화', 5), ('나무', 5), ('동물원에', 4), ('있습니다.', 4), ('날씨가', 4), ('예', 3), ('엄마가', 3), ('밥', 3)]

Top 10 most common syllables in train set:
[('.', 53), ('요', 34), ('?', 26), ('가', 24), ('기', 20), ('다', 20), ('어', 20), ('리', 19), ('에', 18), ('이', 18)]

Top 10 most common jamos (characters/phonemes) in train set:
[('ㅇ', 279), ('ㅏ', 214), ('ㄹ', 163), ('ㄱ', 153), ('ㅣ', 141), ('ㄴ', 134), ('ㅁ', 117), ('ㅓ', 103), ('ㅜ', 69), ('ㅗ', 66)]

Analysis for validation dataset:
Total samples: 34
Total unique words: 47
Total unique syllables: 69
Total unique jamos (characters/phonemes): 34

Top 10 most common words in validation set:
[('아', 1), ('동물원', 1), ('상', 1), ('꼬리', 1), ('방', 1), ('남', 1), ('도', 1), ('잔', 1), ('뱀', 1), ('가위', 1)]

Top 10 most common syllables in validation set:
[('.', 5), ('동', 

In [4]:
# Function to save the data into JSON files
def save_json(data, path):
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

save_json(train_data, f'{target_directory}/train.json')
save_json(val_data, f'{target_directory}/valid.json')
save_json(test_data, f'{target_directory}/test.json')

In [7]:
import re

def is_korean(char: str) -> bool:
    """Check if a character is a Korean letter (Hangul)."""
    return re.match(r'[\u3131-\u3163\uAC00-\uD7A3]', char) is not None

def filter_korean_only(counter: Counter) -> Counter:
    """Filter out non-Korean characters from a Counter object."""
    return Counter({k: v for k, v in counter.items() if is_korean(k)})

def plot_sorted_distribution(train_dist: Counter, val_dist: Counter, test_dist: Counter, title: str):
    plt.figure(figsize=(15, 8))
    
    # Filter to only Korean characters (jamos)
    train_dist = filter_korean_only(train_dist)
    val_dist = filter_korean_only(val_dist)
    test_dist = filter_korean_only(test_dist)

    # Combine train and test distributions and sort by frequency
    combined_dist = train_dist + test_dist+ val_dist
    sorted_items = sorted(combined_dist.items(), key=lambda x: x[1], reverse=True)
    
    labels, _ = zip(*sorted_items)
    train_values = [train_dist[label] for label in labels]
    val_values = [val_dist[label] for label in labels]
    test_values = [test_dist[label] for label in labels]
    
    x = np.arange(len(labels))
    width = 0.35
    
    plt.bar(x - width / 2, train_values, width, label='Train', alpha=0.7)
    plt.bar(x, val_values, width, label='Validation', alpha=0.7)
    plt.bar(x + width / 2, test_values, width, label='Test', alpha=0.7)
    
    plt.xlabel('Phonemes (Jamos)', fontproperties=fontprop)
    plt.ylabel('Frequency', fontproperties=fontprop)
    plt.title(f'{title} Distribution (Sorted by Frequency)', fontproperties=fontprop)
    plt.xticks(x, labels, rotation='vertical', fontproperties=fontprop)
    plt.legend(prop=fontprop)
    plt.tight_layout()
    plt.savefig(f"{title.lower().replace(' ', '_')}_sorted_distribution.png", dpi=300)
    plt.close()

# Plot full distribution for phonemes (jamos), filtered and sorted by frequency
plot_sorted_distribution(train_jamos, val_jamos, test_jamos, f"Jamo (Character/Phoneme) Sorted Distribution. {speaker}")


NameError: name 'speaker' is not defined