# Audio-based model for detection of self-repetition

The following function, `audio_model_self_repetition`, takes as input a **single- or multi-speaker audio file** and a **.TextGrid diarization** of that audio. The .TextGrid must contain at least one tier named **'AC'** (Autistic Child), in which the speech of the child whose echolalic utterances are to be detected is marked by **non-empty intervals** (e.g. labelled 'speech', 'AC', ...). Each utterance of the AC is compared with the previous utterances of the AC, and a label ('repetitive' or 'non-repetitive') is predicted for each pair. The output is a **dataframe** containing the prediction for each pair of AC utterances, together with the timestamps of both utterances. In addition, a new **.TextGrid** is created from the input .TextGrid, with the predicted source and echolalic intervals annotated on two additional tiers ('source' and 'repetition'). The path of this output file must be given in the parameter 'output_path'.

*Note*: the currently trained classifier does not yet achieve satisfactory results. This script will be updated with a new version of the classifier once it has been developed.

In [1]:
def audio_model_self_repetition(input_audio=None, textgrid_file=None, output_path=None,
                                model_path='Trained_best_classifier_self-repetition.pkl'):
    """Predict self-repetitions between the utterances of the 'AC' tier.

    Every non-empty interval of the 'AC' tier is compared with every 'AC'
    interval that starts before it. For each such pair, DTW distances between
    frame-level audio features (MFCC, mel spectrogram, LPC and their
    concatenations) are computed and fed to a pretrained classifier.

    Args:
        input_audio: Path to the .wav file.
        textgrid_file: Path to the .TextGrid diarization of the audio. It must
            contain a tier named 'AC'.
        output_path: Path of the .TextGrid to write. It is the input TextGrid
            plus two tiers, 'source' and 'repetition', marking the intervals
            involved in pairs predicted as repetitive.
        model_path: Path to the pickled model configuration, a dict with the
            keys 'model_obj', 'best_features' and 'threshold'.

    Returns:
        A pandas DataFrame with one row per compared pair and the columns
        'source_int' and 'rep_int' ((start, end) tuples as read from the
        TextGrid) and 'prediction' ('repetitive' or 'non-repetitive').

    Raises:
        ValueError: If input_audio, textgrid_file or output_path is None.
    """

    # Fail fast, before the heavy imports and the feature extraction, instead of
    # crashing late (output_path is only used at the very end).
    missing = [name for name, value in (('input_audio', input_audio),
                                        ('textgrid_file', textgrid_file),
                                        ('output_path', output_path))
               if value is None]
    if missing:
        raise ValueError(f"Missing required argument(s): {', '.join(missing)}")

    # Load libraries
    import pickle
    # Not referenced directly below; kept from the original cell.
    # Presumably the pickled classifier is of this type -- TODO confirm.
    from sklearn.linear_model import LogisticRegression
    import pandas as pd
    from pydub import AudioSegment
    from praatio import textgrid
    import librosa
    import numpy as np
    from dtw import dtw
    import sklearn.preprocessing
    import itertools
    import tgt

    # EXTRACT SPEECH INTERVALS

    input_tg = textgrid.openTextgrid(textgrid_file, includeEmptyIntervals=True)

    def get_speech_intervals(tier):
        """Return [start, end] timestamps of the non-empty intervals of `tier`."""
        entries = input_tg.getTier(tier).entries
        # The TextGrid is opened with includeEmptyIntervals=True, so blank
        # intervals are present in the tier; only labelled intervals are speech.
        return [[entry.start, entry.end] for entry in entries if entry.label.strip()]

    child_intervals = get_speech_intervals('AC')

    # PREPARE THE AUDIO PROCESSING

    def audio_segment_to_np(audiosegment):
        """Convert a pydub AudioSegment into a float32 numpy array scaled by
        the maximum value of its integer sample type."""
        samples = np.array(audiosegment.get_array_of_samples())
        # samples.dtype is still the integer dtype here, so iinfo is valid.
        return samples.astype(np.float32) / np.iinfo(samples.dtype).max

    # DEFINE FUNCTIONS FOR FEATURE EXTRACTION FROM AUDIO SIGNAL

    n_fft = 2048
    hop_length = 512

    features = ['mfcc', 'melspectrogram', 'lpc']

    def compute_lpc_2d(y, sr, order=13, frame_length=n_fft, hop_length=hop_length):
        """Compute LPC coefficients per frame, so that LPC is frame-level just
        like the MFCC and mel spectrogram features.

        Returns an array with one row per frame. `sr` is accepted for symmetry
        with the other extractors but is not used.
        """
        # Zero-pad the end of the signal, then cut it into overlapping frames.
        y_padded = np.pad(y, (0, frame_length - len(y) % hop_length), mode='constant')
        frames = librosa.util.frame(y_padded, frame_length=frame_length, hop_length=hop_length).T
        # One LPC coefficient vector per frame, stacked into a 2D array.
        return np.vstack([librosa.lpc(frame, order=order) for frame in frames])

    def get_feature_array(feature, segment, sr):
        """Return the min-max scaled feature matrix of `segment` for the given
        feature name ('mfcc', 'melspectrogram' or 'lpc')."""
        y = audio_segment_to_np(segment)
        if feature == 'mfcc':
            feature_vector = librosa.feature.mfcc(y=y, n_mfcc=13, hop_length=hop_length, n_fft=n_fft, sr=sr).squeeze()
        elif feature == 'melspectrogram':
            feature_vector = librosa.feature.melspectrogram(y=y, hop_length=hop_length, n_fft=n_fft, sr=sr, n_mels=32).squeeze()
        elif feature == 'lpc':
            # Transposed so that frames run along axis 1, like the other two.
            feature_vector = compute_lpc_2d(y=y, sr=sr).squeeze().T
        else:
            raise ValueError(f"Unknown feature: {feature!r}")

        return sklearn.preprocessing.minmax_scale(feature_vector, axis=1)

    # LOAD THE AUDIO FILE

    audio = AudioSegment.from_wav(input_audio)  # pydub 'AudioSegment' object
    # NOTE(review): librosa.load resamples to its default target rate unless
    # sr=None is passed, so this is not necessarily the native rate of the file,
    # while the samples used below come from pydub. Left unchanged because the
    # pretrained classifier may depend on features computed this way -- confirm
    # before switching to the native sample rate.
    sr = librosa.load(input_audio)[1]

    # EXTRACT THE FEATURES ONCE PER UTTERANCE

    def utterance_arrays(start, end):
        """Return the feature arrays of one utterance, in the same order as the
        DTW columns: single features, pairwise concatenations, full concatenation."""
        # Timestamps are converted from seconds to the milliseconds pydub slices in.
        segment = audio[start * 1000:end * 1000]
        single = [get_feature_array(feature, segment, sr) for feature in features]
        paired = [np.concatenate(pair) for pair in itertools.combinations(single, 2)]
        return single + paired + [np.concatenate(single)]

    # Each utterance takes part in many pairs; extracting its features once here
    # avoids recomputing them for every pair.
    utterance_features = [utterance_arrays(start, end) for start, end in child_intervals]

    # CREATE THE FEATURE DATAFRAME

    distance_columns = ([f'DTW_{feature}' for feature in features]
                        + [f'DTW_{f1}_{f2}' for f1, f2 in itertools.combinations(features, 2)]
                        + ['DTW_combined'])

    rows = []
    for (start, end), rep_arrays in zip(child_intervals, utterance_features):
        for (s, e), source_arrays in zip(child_intervals, utterance_features):
            # Only utterances that start before the repetition candidate can be its source.
            if not s < start:
                continue
            distances = [dtw(rep.T, source.T, distance_only=True).normalizedDistance
                         for rep, source in zip(rep_arrays, source_arrays)]
            rows.append([(s, e), (start, end)] + distances)

    df = pd.DataFrame(rows, columns=['source_int', 'rep_int'] + distance_columns)

    # MAKE PREDICTIONS

    def predict(model, X, threshold):
        """Return 1 where the probability of class 1 exceeds `threshold`, else 0."""
        probs = model.predict_proba(X)
        return (probs[:, 1] > threshold).astype(int)

    if df.empty:
        # Fewer than two utterances: there is nothing to classify, and the
        # classifier would reject an empty feature matrix.
        df['prediction_binary'] = pd.Series(dtype=int)
    else:
        # NOTE: pickle can execute arbitrary code on load; only use model files
        # from a trusted source.
        with open(model_path, 'rb') as f:
            model_config = pickle.load(f)
        model = model_config['model_obj']
        best_features = model_config['best_features']
        threshold = model_config['threshold']

        df['prediction_binary'] = predict(model=model, X=df[best_features], threshold=threshold)

    df['prediction'] = df['prediction_binary'].apply(lambda x: 'repetitive' if x == 1 else 'non-repetitive')

    # MAKE TEXTGRID

    # NOTE(review): the encoding is hard-coded to UTF-16 here -- confirm that
    # all input TextGrids use it.
    tg = tgt.io.read_textgrid(textgrid_file, encoding='utf-16')
    source_tier = tgt.IntervalTier(start_time=0, name='source')
    rep_tier = tgt.IntervalTier(start_time=0, name='repetition')

    # An utterance can occur in several repetitive pairs; annotate it only once.
    seen_sources = set()
    seen_reps = set()

    repetitive = df.loc[df['prediction'] == 'repetitive', ['source_int', 'rep_int']]
    for source_int, rep_int in repetitive.itertuples(index=False, name=None):
        if source_int not in seen_sources:
            seen_sources.add(source_int)
            start_source, end_source = source_int
            source_tier.add_interval(
                tgt.Interval(start_time=float(start_source), end_time=float(end_source), text='source'))

        if rep_int not in seen_reps:
            seen_reps.add(rep_int)
            start_rep, end_rep = rep_int
            rep_tier.add_interval(
                tgt.Interval(start_time=float(start_rep), end_time=float(end_rep), text='repetition'))

    # Output the TextGrid
    tg.add_tier(source_tier)
    tg.add_tier(rep_tier)
    tgt.write_to_file(tg, output_path, format='short')

    return df[['source_int', 'rep_int', 'prediction']]