In [1]:
import numpy as np
from scipy.stats import gaussian_kde, entropy
from sklearn.model_selection import train_test_split
from src.utils.approximation import cross_validate_gkde_bandwidth
from src.utils.approximation import monte_carlo_diff_entropy
from src.data.f0_regression_datamodule import (
    F0RegressionDataModule as DataModule,
)

import re

def assign_labels(input_string, labels):
    """Align one label with every word of ``input_string``.

    A "word" is a maximal run of regex word characters or characters from
    the Thai Unicode block (U+0E00-U+0E7F). The punctuation marks
    ``. , ! ? ; " - '`` are kept as separate tokens and receive ``None``
    as their label; any other character is ignored.

    Args:
        input_string: The sentence to tokenize.
        labels: One label per word, in sentence order.

    Returns:
        A tuple ``(words_only, words_with_punctuation, words_with_labels)``:
        the list of words, the list of words and punctuation tokens, and a
        list of ``(token, label_or_None)`` pairs. If the number of labels
        differs from the number of words, ``(None, None, None)`` is returned
        so that the caller can skip the sample.
    """
    word_pattern = r'[\u0E00-\u0E7F\w]+'

    # Tokens in sentence order: words plus the punctuation marks we keep.
    tokens = re.findall(r'[\u0E00-\u0E7F\w]+|[.,!?;"-]|\'', input_string)
    # Words alone, used to validate the label count.
    words = re.findall(word_pattern, input_string)

    if len(labels) != len(words):
        # Alignment or extraction failed; signal the caller to skip this sample.
        return None, None, None

    # Hand out (word, label) pairs one at a time as word tokens are encountered.
    pending_pairs = iter(zip(words, labels))
    labelled_tokens = [
        next(pending_pairs) if re.match(word_pattern, token) else (token, None)
        for token in tokens
    ]

    return words, tokens, labelled_tokens


def assign_labels_to_sentences(sentences, labels):
    """Flatten per-sentence labels into parallel word and label lists.

    Each sentence is aligned with its labels via ``assign_labels``.
    Sentences whose alignment fails are skipped, and tokens whose label is
    ``None`` (punctuation) are dropped.

    Args:
        sentences: Indexable collection of sentence strings.
        labels: Indexable collection where ``labels[i]`` holds the per-word
            labels of ``sentences[i]``.

    Returns:
        A tuple ``(single_words, single_labels)`` of two equally long lists
        containing every labelled word and its label, in corpus order.
    """
    flat_words = []
    flat_labels = []

    for position in range(len(sentences)):
        aligned = assign_labels(sentences[position], labels[position])
        words_only, labelled_tokens = aligned[0], aligned[2]

        # Alignment failed for this sentence: skip it entirely.
        if words_only is None:
            continue

        # Keep only tokens that actually carry a label.
        for token, token_label in labelled_tokens:
            if token_label is None:
                continue
            flat_words.append(token)
            flat_labels.append(token_label)

    return flat_words, flat_labels

  from .autonotebook import tqdm as notebook_tqdm


In [2]:

def select_gkde_bandwidth(data, bandwidths = ["scott", "silverman", 0.01, 0.1, 0.3]):
    """Choose a Gaussian-KDE bandwidth by 5-fold cross-validated log-likelihood.

    The samples are shuffled with NumPy's global RNG and partitioned into
    five folds. For every candidate bandwidth a ``gaussian_kde`` is fitted on
    four folds and the summed log-density of the held-out fold is recorded;
    the candidate with the highest mean held-out log-likelihood wins (the
    first one on ties).

    Args:
        data: Array-like of samples with the sample axis first, i.e. shape
            ``(n_samples,)`` or ``(n_samples, n_dims)``. Plain Python
            sequences are accepted and converted with ``np.asarray``.
        bandwidths: Candidate values passed as ``bw_method`` to
            ``scipy.stats.gaussian_kde``. The sequence is never modified.

    Returns:
        The element of ``bandwidths`` with the best cross-validated score.

    Raises:
        ValueError: If ``bandwidths`` is empty or ``data`` has fewer samples
            than folds, in which case no fold can be evaluated.
    """
    n_splits = 5
    data = np.asarray(data)
    candidates = list(bandwidths)
    n_samples = len(data)

    if not candidates:
        raise ValueError("at least one candidate bandwidth is required")
    if n_samples < n_splits:
        raise ValueError(
            f"need at least {n_splits} samples for {n_splits}-fold "
            f"cross-validation, got {n_samples}"
        )

    # Cross-validation setup. np.array_split keeps every sample in exactly one
    # fold; slicing by len(data) // n_splits would silently drop the remainder.
    indices = np.arange(n_samples)
    np.random.shuffle(indices)
    splits = np.array_split(indices, n_splits)

    # Perform cross-validation
    log_likelihoods = []
    for bw in candidates:
        fold_log_likelihoods = []
        for held_out, test_indices in enumerate(splits):
            train_indices = np.concatenate(
                [fold for j, fold in enumerate(splits) if j != held_out]
            )

            # gaussian_kde expects the sample axis last, hence the transposes.
            kde = gaussian_kde(data[train_indices].T, bw_method=bw)
            log_likelihood = np.sum(np.log(kde.evaluate(data[test_indices].T)))
            fold_log_likelihoods.append(log_likelihood)

        log_likelihoods.append(np.mean(fold_log_likelihoods))

    # Select the best bandwidth
    return candidates[int(np.argmax(log_likelihoods))]


In [10]:
def calculate_cond_entropy(word_label_dict, test_word_label_dict, uncond_density, num_labels_thres=20, esp=1e-7):
    """Estimate the conditional entropy of labels given the word.

    For every word present in both dictionaries a density is chosen: a
    word-specific ``gaussian_kde`` (bandwidth picked by
    ``select_gkde_bandwidth``) when the word has more than
    ``num_labels_thres`` training labels, otherwise the shared
    ``uncond_density``. The word's entropy is the negative mean
    log-density of its test labels, and the result is the average of these
    entropies weighted by each word's number of test labels.

    Args:
        word_label_dict: Maps word -> sequence of training labels.
        test_word_label_dict: Maps word -> sequence of evaluation labels.
        uncond_density: Callable density used for words at or below the
            threshold; called with the transposed test labels.
        num_labels_thres: A word needs strictly more training labels than
            this to get its own KDE.
        esp: Constant added to the density before taking the logarithm.

    Returns:
        A tuple ``(cond_entropy, word_entropy_dict)`` with the weighted
        entropy and the per-word entropies. ``cond_entropy`` is ``0`` when
        the two dictionaries share no word.
    """
    entropy_by_word = {}
    test_count_by_word = {}

    for word, train_values in word_label_dict.items():
        train_values = np.array(train_values)

        # Words never seen in the evaluation data contribute nothing.
        if word not in test_word_label_dict:
            continue

        held_out = np.array(test_word_label_dict[word])

        if len(train_values) > num_labels_thres:
            # Enough samples: fit a word-specific KDE.
            word_bw = select_gkde_bandwidth(train_values, bandwidths=["scott", "silverman", 0.3])
            density = gaussian_kde(train_values.T, bw_method=word_bw)
        else:
            # Rare word: fall back to the unconditional density.
            density = uncond_density

        # Negative mean log-density of the held-out labels.
        entropy_by_word[word] = -np.mean(np.log(density(held_out.T) + esp))
        test_count_by_word[word] = len(held_out)

    total_count = sum(test_count_by_word.values())
    cond_entropy = sum(
        test_count_by_word[word] / total_count * word_entropy
        for word, word_entropy in entropy_by_word.items()
    )

    return cond_entropy, entropy_by_word


def merge_labels(*labels):
    """Concatenate all given label arrays along the first axis.

    Args:
        *labels: Array-likes that agree in every dimension but the first.

    Returns:
        A single NumPy array holding the inputs stacked in call order.
    """
    stacked = np.concatenate(labels, axis=0)
    return stacked

    # Combine the train, test and dev dictionaries
def merge_dicts(*dicts):
    """Merge several word -> labels dictionaries into one.

    Keys keep the order in which they are first seen. A key that occurs in
    a single dictionary keeps its original value object; a key that occurs
    in several dictionaries gets the ``np.concatenate`` of its values in
    argument order.

    Args:
        *dicts: Dictionaries whose values are array-likes of labels.

    Returns:
        A new dictionary with the combined values.
    """
    merged = {}
    for mapping in dicts:
        for key in mapping:
            incoming = mapping[key]
            if key not in merged:
                merged[key] = incoming
            else:
                merged[key] = np.concatenate((merged[key], incoming))
    return merged



## Calculate Conditional Entropy with Bootstrap and Set PMI = 0 for Rare Cases

### Estimate mutual information with the whole dataset

In [None]:
from datetime import datetime
import numpy as np
from scipy.stats import gaussian_kde
from sklearn.utils import resample

# Bootstrap estimate of the conditional entropy H(label | word) using the
# merged train/dev/test data, for several rare-word thresholds and languages.
# The same resampled data is used to fit and to evaluate the densities.

# set seed (NumPy's global RNG, used by np.random.shuffle below)
np.random.seed(234)
# NOTE(review): `esp` is not used in this cell; calculate_cond_entropy is
# called with esp=0 below. Confirm that this is intended.
esp = 1e-7
# One result file per threshold; each file gets one row per language.
for num_labels_thres in [20, 30, 40, 50, 60]:
    datetime_str = datetime.now().strftime("%Y%m%d-%H%M%S")
    parameters = 4
    mode = 'dct'
    SAVE_DIR = f"/home/user/ding/Projects/Prosody/precomputed/without_llm/f0_{mode}_{parameters}"
    result_file = f'{SAVE_DIR}/pmi0_C-KDE-all-Thres{num_labels_thres}-{datetime_str}.txt'
    # Create (or truncate) the result file and write the header row.
    with open(result_file, 'w') as f:
        f.write(f'lang\tcond_entropy\tcond_entropy_std\n')


    languages = ['de', 'en', 'fr', 'it', 'ja', 'sv', 'th', 'vi', 'zh', 'yue', 'zh-by-char', 'yue-by-char']

    n_iterations = 10  # Number of bootstrap samples


    for lang in languages:
        print(f'-----------------------------------Language: {lang}--------------------------------------')
        
        LAB_ROOT = f"/home/user/ding/Projects/Prosody/languages/{lang}/aligned"
        PHONEME_LAB_ROOT = f"/home/user/ding/Projects/Prosody/languages/{lang}/aligned"
        WAV_ROOT = f"/home/user/ding/Projects/Prosody/languages/{lang}/wav_files"
        DATA_CACHE = f"/home/user/ding/Projects/Prosody/languages/{lang}/cache"

        TRAIN_FILE = "train-clean-100"
        VAL_FILE = "dev-clean"
        TEST_FILE = "test-clean"

        dm = DataModule(
            wav_root=WAV_ROOT,
            lab_root=LAB_ROOT,
            phoneme_lab_root=PHONEME_LAB_ROOT,
            data_cache=DATA_CACHE,
            train_file=TRAIN_FILE,
            val_file=VAL_FILE,
            test_file=TEST_FILE,
            dataset_name=f"common_voice_{lang}",
            model_name="ai-forever/mGPT",
            # model_name = "bert-base-multilingual-cased",
            f0_mode=mode,
            f0_n_coeffs=parameters,
            score_last_token=True,
        )

        dm.setup()

        # NOTE(review): labels are read from the `*_durations` attributes even
        # though the module is configured with f0_mode/f0_n_coeffs -- confirm
        # these attributes hold the intended per-word targets.
        train_texts, train_labels = dm.train_texts, dm.train_durations
        val_texts, val_labels = dm.val_texts, dm.val_durations
        test_texts, test_labels = dm.test_texts, dm.test_durations

        # print(f"Lengths of train, val, test in samples: {len(train_texts), len(val_texts), len(test_texts)}")
        # print(test_texts)
        # print(test_labels)

        # Flatten sentences into parallel word / label lists per split.
        all_train_words, all_train_labels = assign_labels_to_sentences(
            train_texts, train_labels
        )
        all_dev_words, all_dev_labels = assign_labels_to_sentences(val_texts, val_labels)
        all_test_words, all_test_labels = assign_labels_to_sentences(test_texts, test_labels)

        all_train_labels = np.array(all_train_labels)
        all_dev_labels = np.array(all_dev_labels)
        all_test_labels = np.array(all_test_labels)

        all_labels = merge_labels(all_train_labels, all_dev_labels, all_test_labels)
        print(all_train_labels.shape)
        print(all_labels.shape)
        # Find different labels corresponding to the same words
        # (words are lower-cased so that case variants share one entry)
        train_word_label_dict = {}
        for word, label in zip(all_train_words, all_train_labels):
            word = word.lower()
            if word not in train_word_label_dict: 
                train_word_label_dict[word] = []
            train_word_label_dict[word].append(label)

        # print(train_word_label_dict['the'])

        test_word_label_dict = {}
        for word, label in zip(all_test_words, all_test_labels):
            word = word.lower()
            if word not in test_word_label_dict: 
                test_word_label_dict[word] = []
            test_word_label_dict[word].append(label)

        dev_word_label_dict = {}
        for word, label in zip(all_dev_words, all_dev_labels):
            word = word.lower()
            if word not in dev_word_label_dict: 
                dev_word_label_dict[word] = []
            dev_word_label_dict[word].append(label)

        #################### Check the data: Find the number of labels for each word in the datasets ######################
        # # print the number of labels in a descending order
        # train_word_numoflabels_dict = {k:len(v) for k, v in train_word_label_dict.items()}
        # sorted_train_word_numoflabels_dict = sorted(train_word_numoflabels_dict.items(), key=lambda item: item[1], reverse=True)
        # # print(sorted_train_word_numoflabels_dict)
        # selected_train_labels = np.concatenate([label for word, label in train_word_label_dict.items() if len(label) > num_labels_thres])

        # test_word_numoflabels_dict = {k:len(v) for k, v in test_word_label_dict.items()}
        # sorted_test_word_numoflabels_dict = sorted(test_word_numoflabels_dict.items(), key=lambda item: item[1], reverse=True)
        # selected_test_labels = np.concatenate([label for word, label in test_word_label_dict.items() if len(label) > num_labels_thres])
        # selected_test_word_label_dict = {word: label for word, label in test_word_label_dict.items() if len(label) > num_labels_thres}
        # # print(sorted_test_word_numoflabels_dict)
        # # print("haha", selected_test_word_label_dict)

        # dev_word_numoflabels_dict = {k:len(v) for k, v in dev_word_label_dict.items()}
        # sorted_dev_word_numoflabels_dict = sorted(dev_word_numoflabels_dict.items(), key=lambda item: item[1], reverse=True)
        # # print(sorted_dev_word_numoflabels_dict)
        # selected_dev_labels = np.concatenate([label for word, label in dev_word_label_dict.items() if len(label) > num_labels_thres])

        # Pool the three splits: one entry per word with all of its labels.
        all_word_label_dict = merge_dicts(train_word_label_dict, dev_word_label_dict, test_word_label_dict)
        all_word_numoflabels_dict = {k:len(v) for k, v in all_word_label_dict.items()}
        sorted_all_word_numoflabels_dict = sorted(all_word_numoflabels_dict.items(), key=lambda item: item[1], reverse=True)
        selected_word_label_dict_all = {}
        for word, labels in all_word_label_dict.items():
            labels = np.array(labels)
            np.random.shuffle(labels)
            selected_word_label_dict_all[word] = labels
        # Build the pooled label array once, after the loop. Concatenating
        # inside the loop rebuilt the whole array for every word and left the
        # name undefined (or stale from the previous language) for an empty
        # dictionary.
        selected_labels_all = np.concatenate(list(selected_word_label_dict_all.values()))
        ###################################################################################################################

        cond_entropy_list = []
        # The unconditional density is fitted once per language on all labels
        # and reused for rare words in every bootstrap iteration.
        best_bwth = select_gkde_bandwidth(selected_labels_all, bandwidths=["scott", "silverman", 0.3])
        uncond_density = gaussian_kde(selected_labels_all.T, bw_method=best_bwth)
        for i in range(n_iterations):
            # Bootstrap: resample every word's labels with replacement, same size.
            sampled_all_word_label_dict = {word: resample(labels, replace=True, n_samples=len(labels)) for word, labels in selected_word_label_dict_all.items()}
            # conditional entropy (fitted and evaluated on the same resampled data)
            sampled_cond_entropy, _ = calculate_cond_entropy(sampled_all_word_label_dict, sampled_all_word_label_dict, uncond_density, num_labels_thres=num_labels_thres, esp=0)
            cond_entropy_list.append(sampled_cond_entropy)
            
            print(f"Iteration {i+1}/{n_iterations}: cond entropy = {sampled_cond_entropy}")
        
        # Mean and (population) standard deviation across bootstrap iterations.
        cond_entropy_list = np.array(cond_entropy_list)
        cond_entropy_mean = np.mean(cond_entropy_list)
        cond_entropy_std = np.std(cond_entropy_list)
        
        print(f"Final results: Cond entropy = {cond_entropy_mean}, Cond entropy Std = {cond_entropy_std}")
        
        # Append this language's row to the threshold's result file.
        with open(result_file, 'a') as f:
            f.write(f'{lang}\t{cond_entropy_mean}\t{cond_entropy_std}\n')


### Estimate mutual information with training and testing data.

In [None]:
from datetime import datetime
import numpy as np
import random
from scipy.stats import gaussian_kde
from sklearn.utils import resample

# Bootstrap estimate of the conditional entropy H(label | word) where, in each
# iteration, separate "train" and "test" samples are drawn from the pooled
# per-word labels. Runs for several rare-word thresholds and languages and
# writes one tab-separated result file per threshold.

# set seed (NumPy's global RNG, used by np.random.shuffle below)
np.random.seed(234)
# Constant added to densities before the logarithm in calculate_cond_entropy.
esp = 1e-7
for num_labels_thres in [20, 30, 40, 50, 60]:
# for num_labels_thres in [20]:
    datetime_str = datetime.now().strftime("%Y%m%d-%H%M%S")
    parameters = 4
    mode = 'dct'
    SAVE_DIR = f"/swdata/yin/Cui/prosody/notebooks/precomputed/without_llm"
    result_file = f'{SAVE_DIR}/pmi0_C-KDE-split-Thres{num_labels_thres}-{datetime_str}.txt'
    # Create (or truncate) the result file and write the header row.
    with open(result_file, 'w') as f:
        f.write(f'lang\tcond_entropy\tcond_entropy_std\n')


    languages = ['de', 'en', 'fr', 'it', 'ja', 'sv', 'th', 'vi', 'zh', 'yue', 'zh-by-char', 'yue-by-char']

    n_iterations = 10  # Number of bootstrap samples


    for lang in languages:
        print(f'-----------------------------------Language: {lang}--------------------------------------')
        
        LAB_ROOT = f"/swdata/yin/Cui/prosody/languages/{lang}/aligned"
        PHONEME_LAB_ROOT = f"/swdata/yin/Cui/prosody/languages/{lang}/aligned"
        WAV_ROOT = f"/swdata/yin/Cui/prosody/languages/{lang}/wav_files"
        DATA_CACHE = f"/swdata/yin/Cui/prosody/languages/{lang}/cache"

        TRAIN_FILE = "train-clean-100"
        VAL_FILE = "dev-clean"
        TEST_FILE = "test-clean"

        dm = DataModule(
            wav_root=WAV_ROOT,
            lab_root=LAB_ROOT,
            phoneme_lab_root=PHONEME_LAB_ROOT,
            data_cache=DATA_CACHE,
            train_file=TRAIN_FILE,
            val_file=VAL_FILE,
            test_file=TEST_FILE,
            dataset_name=f"common_voice_{lang}",
            model_name="ai-forever/mGPT",
            f0_mode=mode,
            f0_n_coeffs=parameters,
            score_last_token=True,
        )

        dm.setup()

        # NOTE(review): labels are read from the `*_durations` attributes even
        # though the module is configured with f0_mode/f0_n_coeffs -- confirm
        # these attributes hold the intended per-word targets.
        train_texts, train_labels = dm.train_texts, dm.train_durations
        val_texts, val_labels = dm.val_texts, dm.val_durations
        test_texts, test_labels = dm.test_texts, dm.test_durations

        # print(f"Lengths of train, val, test in samples: {len(train_texts), len(val_texts), len(test_texts)}")
        # print(test_texts)
        # print(test_labels)

        # Flatten sentences into parallel word / label lists per split.
        all_train_words, all_train_labels = assign_labels_to_sentences(
            train_texts, train_labels
        )
        all_dev_words, all_dev_labels = assign_labels_to_sentences(val_texts, val_labels)
        all_test_words, all_test_labels = assign_labels_to_sentences(test_texts, test_labels)

        all_train_labels = np.array(all_train_labels)
        all_dev_labels = np.array(all_dev_labels)
        all_test_labels = np.array(all_test_labels)

        all_labels = merge_labels(all_train_labels, all_dev_labels, all_test_labels)
        print(all_train_labels.shape)
        print(all_labels.shape)
        # Find different labels corresponding to the same words
        # (words are lower-cased so that case variants share one entry)
        train_word_label_dict = {}
        for word, label in zip(all_train_words, all_train_labels):
            word = word.lower()
            if word not in train_word_label_dict: 
                train_word_label_dict[word] = []
            train_word_label_dict[word].append(label)

        # print(train_word_label_dict['the'])

        test_word_label_dict = {}
        for word, label in zip(all_test_words, all_test_labels):
            word = word.lower()
            if word not in test_word_label_dict: 
                test_word_label_dict[word] = []
            test_word_label_dict[word].append(label)

        dev_word_label_dict = {}
        for word, label in zip(all_dev_words, all_dev_labels):
            word = word.lower()
            if word not in dev_word_label_dict: 
                dev_word_label_dict[word] = []
            dev_word_label_dict[word].append(label)

        # Pool the three splits: one entry per word with all of its labels.
        all_word_label_dict = merge_dicts(train_word_label_dict, dev_word_label_dict, test_word_label_dict)
        all_word_numoflabels_dict = {k:len(v) for k, v in all_word_label_dict.items()}
        sorted_all_word_numoflabels_dict = sorted(all_word_numoflabels_dict.items(), key=lambda item: item[1], reverse=True)

        cond_entropy_list = []
        # Monte Carlo estimation of differential entropy and conditional entropy
        # NOTE(review): the *_train and *_test dictionaries below are created
        # but never filled or read in this cell.
        selected_word_label_dict_train = {}
        selected_word_label_dict_test = {}
        selected_word_label_dict_all = {}
        # Shuffle a copy of each word's labels (consumes the global RNG).
        for word, labels in all_word_label_dict.items():
            labels = np.array(labels) 
            np.random.shuffle(labels)
            selected_word_label_dict_all[word] = labels
        selected_labels_all = np.concatenate(list(selected_word_label_dict_all.values()))
        for i in range(n_iterations):
            # Draw the "train" (70%) and "test" (30%) samples independently and
            # with replacement from the same per-word pool, with at least one
            # label each; the two samples may therefore share observations.
            sampled_train_word_label_dict = {word: resample(labels, replace=True, n_samples=max(1, int(len(labels)*0.7))) for word, labels in selected_word_label_dict_all.items()}
            sampled_test_word_label_dict = {word: resample(labels, replace=True, n_samples=max(1, int(len(labels)*0.3))) for word, labels in selected_word_label_dict_all.items()}
            sampled_train_labels = np.concatenate(list(sampled_train_word_label_dict.values()))
            # split all the labels into training and testing
            # The unconditional density is refitted on this iteration's train sample.
            best_bwth = select_gkde_bandwidth(sampled_train_labels, bandwidths=["scott", "silverman", 0.3])
            # print(f"Optimal bandwidth diff: {best_bwth}")
            uncond_density = gaussian_kde(sampled_train_labels.T, bw_method=best_bwth) # density = gaussian_kde(selected_labels_test.T, bw_method=best_bwth) if we want to use the testing data
            # conditional entropy: densities fitted on the train sample,
            # evaluated on the test sample
            sampled_cond_entropy, _ = calculate_cond_entropy(sampled_train_word_label_dict, sampled_test_word_label_dict, uncond_density, num_labels_thres=num_labels_thres, esp=esp)
            cond_entropy_list.append(sampled_cond_entropy)
            
            print(f"Iteration {i+1}/{n_iterations}: cond entropy = {sampled_cond_entropy}")
        
        # Mean and (population) standard deviation across bootstrap iterations.
        cond_entropy_list = np.array(cond_entropy_list)
        cond_entropy_mean = np.mean(cond_entropy_list)
        cond_entropy_std = np.std(cond_entropy_list)
        
        print(f"Final results: Cond entropy = {cond_entropy_mean}, Cond entropy Std = {cond_entropy_std}")
        
        # Append this language's row to the threshold's result file.
        with open(result_file, 'a') as f:
            f.write(f'{lang}\t{cond_entropy_mean}\t{cond_entropy_std}\n')
