In [1]:
import os
import re
from itertools import compress

import numpy as np
import soundfile as sf
from bs4 import BeautifulSoup

from DataLoader import DataLoader

In [2]:
class OralLoader(DataLoader):
    c2n_map = DataLoader.c2n_map
    c2n_map['*'] = 13  # ch character is subbed as * but in the reverse map it is still 'ch'
    
    def __init__(self, audiofiles, transcripts, bigrams=False, repeated=False):
        super().__init__(audiofiles, transcripts, bigrams, repeated)
        self.labels = None
        self.audio = dict()  # audiofile dictionary with filenames as keys
        self.fs = dict()     # sampling frequencies of the audiofiles with filenames as keys
        
    def transcripts_to_labels(self, label_max_duration=10.0):
        turn_info = [list() for _ in range(len(self.transcripts))]
        turn_info_no_overlap = [list() for _ in range(len(self.transcripts))]
        reg_ch = r'ch'  # any sequence of characters 'ch' ... which counts as a single character in Czech
        reg_pthses = r'\(.*?\)'  # any character between parentheses () -- in oral it marks special sounds (laugh, ambient, ...)
        reg_not_czech = r'[^A-Za-záéíóúýčďěňřšťůž ]+'  # all nonalphabetic characters (czech alphabet)
        labels = dict()
        for idx, file in enumerate(self.transcripts):
            with open(file, 'r', encoding='cp1250') as f:
                raw = f.read()
            
            soup = BeautifulSoup(raw, 'lxml')
            
            file_name = soup.find('trans')['audio_filename']
            speakers = {s['id'] for s in soup.find_all('speaker')}  # speaker id's that appear in the current transcript file
            turns = soup.find_all('turn')

            # extract time_spans and number of speakers from each turn
            for i, t in enumerate(turns):
                sync_times = [float(sync['time']) for sync in t.find_all('sync')]
                turn_info[idx].append({
                    'sync_times': list(zip(sync_times, [*sync_times[1:], float(t['endtime'])])),
                    'speakers': t['speaker'].split(' ') if 'speaker' in t.attrs.keys() else [],
                    'text': [' '.join(re.sub(reg_not_czech, '',         # remove nonalphabetic characters
                                             re.sub(reg_pthses, '',     # remove anything in parentheses including the parentheses
                                                    txt.lower())).split()) for txt in t.text[:-1].split('\n\n')[1:]]
                })
                
            # GET TURNS WITH EXACTLY 1 SPEAKER (removes overlap and ambient noises)
            # create mask in which True means that there is exactly 1 speaker
            one_speaker_mask = [len(turn['speakers']) == 1 for turn in turn_info[idx]]
            num_removed = len(one_speaker_mask) - sum(one_speaker_mask)
            # compress the lists using the mask
            turns_no_overlap = list(compress(turns, one_speaker_mask))
            turn_info_no_overlap[idx] = list(compress(turn_info[idx], one_speaker_mask))
            
            assert len(turns_no_overlap) == len(turns) - num_removed
            assert len(turn_info_no_overlap[idx]) == len(turn_info[idx]) - num_removed
            assert all([len(t['speakers']) == 1 for t in turn_info_no_overlap[idx]])
        
            # TODO: split into transcripts with time length of 'label_max_duration' seconds or less 
            sents = []
            starts = []
            ends = []
            for info in turn_info_no_overlap[idx]:
                sync_times, speaker, text = info.values()
                assert len(sync_times) == len(text)
                # fill sents, starts and ends with the first entries in turn_info
                starts.append(sync_times[0][0])
                ends.append(sync_times[0][1])
                sent_duration = sync_times[0][1] - sync_times[0][0]
                sents.append(text[0]) 
                for i in range(1, len(sync_times)):
                    utterance_duration = sync_times[i][1] - sync_times[i][0]
                    # TODO: if current sent duration is shorter than 'label_max_duration' seconds, add to end time, else
                    if sent_duration + utterance_duration < label_max_duration:
                        ends[-1] = sync_times[i][1]
                        sent_duration += utterance_duration
                        sents[-1] += ' ' + text[i]
                    else:
                        starts.append(sync_times[i][0])
                        ends.append(sync_times[i][1])
                        sent_duration = sync_times[i][1] - sync_times[i][0]
                        sents.append(text[i])
            
            # convert the sentences into integer arrays
            sents = [np.array([self.c2n_map[c] for c in re.sub(reg_ch, '*', s)]) for s in sents]
            labels[file_name] = tuple(zip(sents, starts, ends))
                 
        self.labels = labels        
        
        return labels
    
    def load_audio(self):
        for i, file in enumerate(self.audiofiles):
            path, filename = os.path.split(file)
            filename, ext = os.path.splitext(filename)
            
            signal, fs = sf.read(file)
            
            # create array with sampling times of the audiofile
            tstart = 0
            tend = signal.shape[0]/fs
            tstep = 1/fs
            tspan = np.arange(tstart, tend, tstep, dtype=np.float32)
            
            starts = []
            ends = []
            for label in self.labels[filename]:
                starts.append(label[1])
                ends.append(label[2])
            
            starts_idcs = np.asarray([np.searchsorted(tspan, start) for start in starts], dtype=np.int32)
            ends_idcs = np.asarray([np.searchsorted(tspan, end) for end in ends], dtype=np.int32)
            
            self.audio[filename] = [signal[starts_idcs[j]:ends_idcs[j]] for j in range(starts_idcs.shape[0])]
            self.fs[filename] = fs
            
        return self.audio, self.fs
    
    @staticmethod
    def save_audio(file, audio, fs):
        sf.write(file, audio, fs)
        
    def save_labels(self, labels=None, folder='./data/oral2013/', exist_ok=False):
        """
        Save labels of transcripts to specified folder under folders with names equal to name of the transcrips files
        """
        if not labels:
            if self.labels:
                labels = self.labels
            else:
                print('No labels were given and the class labels have not been generated yet.'
                      'Please call transcripts_to_labels class function first.')
                return
            
        subfolders = tuple(labels.keys())
        
        try:
            for subfolder in subfolders:
                os.makedirs(os.path.join(folder, subfolder), exist_ok=exist_ok)
        except OSError:
            print('Subfolders already exist. Please set exist_ok to True if you want to save into them anyway.')
            return
        
        for key, vals in labels.items():
            ndigits = len(str(len(vals)))
            fullpath = os.path.join(folder, key)
            for i, (sent, _, _) in enumerate(vals):
                np.save('{0}/transcript-{1:0{2}d}.npy'.format(fullpath, i, ndigits), sent)
                print(key + ' saved to ' + fullpath)
            
    @staticmethod
    def load_labels(path_to_files='./data'):
        """ Load labels of transcripts from transcript-###.npy files in specified folder
        into a dictionary of labels and paths to their files
        :param path_to_files: string path leading to the folder with transcript files or .npy trascript file

        :return Dict["folder/file_name":Tuple[List[labels], List[path_to_files]]]
        """

        ext = os.path.splitext(path_to_files)[1]

        # if path_to_files leads to a single (.npy) file , load only the one file
        if ext == ".npy":
            key = os.path.splitext(os.path.basename(path_to_files))[0]
            labels = {key:([np.load(path_to_files)],
                           [os.path.abspath(path_to_files)])}
        elif not ext:
            # if the path_to_files contains subfolders, load data from all subfolders
            labels = dict()
            path_list = []
            subfolders = [os.path.join(path_to_files, subfolder) for subfolder in next(os.walk(path_to_files))[1]]

            # if there are no subfolders in the provided path_to_files, look directly in path_to_files
            if not subfolders:
                subfolders.append(path_to_files)

            for sub in subfolders:
                files = [os.path.splitext(f) for f in os.listdir(sub) if
                         os.path.isfile(os.path.join(sub, f))]
                paths = [os.path.abspath(os.path.join(sub, ''.join(file)))
                         for file in files if 'transcript' in file[0] and file[-1] == '.npy']
                sublabels = [np.load(path) for path in paths]
                labels[os.path.normpath(sub).split('\\')[-1]] = tuple(zip(sublabels, paths))
        else:
            raise IOError("Specified file doesn't have .npy suffix.")

        return labels
        

In [3]:
audio_folder = 'd:/Audio/CeskyNarodniKorpus/oral2013/audio'
transcript_folder = 'd:/Audio/CeskyNarodniKorpus/oral2013/transcripts'
audio_paths = [os.path.join(audio_folder, f) for f in os.listdir(audio_folder)
                  if os.path.isfile(os.path.join(audio_folder, f))]
transcript_paths = [os.path.join(transcript_folder, f) for f in os.listdir(transcript_folder)
                       if os.path.isfile(os.path.join(transcript_folder, f))]

oral = OralLoader(['d:/Audio/CeskyNarodniKorpus/oral2013/audio/08A002N.wav', 'd:/Audio/CeskyNarodniKorpus/oral2013/audio/08A003N.wav'], 
                  ['d:/Audio/CeskyNarodniKorpus/oral2013/transcripts/08A002N.trs', 'd:/Audio/CeskyNarodniKorpus/oral2013/transcripts/08A003N.trs'])
# oral = OralLoader(audio_paths, transcript_paths)

labels = oral.transcripts_to_labels()
audio, fs = oral.load_audio()

FileNotFoundError: [WinError 3] Systém nemůže nalézt uvedenou cestu: 'd:/Audio/CeskyNarodniKorpus/oral2013/audio'

In [205]:
oral.save_labels(exist_ok=True, folder='./data/oral2013/transcripts/')

08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/08A002N
08A002N saved to ./data/oral2013/transcripts/0

08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/08A003N
08A003N saved to ./data/oral2013/transcripts/0

In [189]:
# saving audio segments as wav files
file_index = 6
print(audio['08A002N'][file_index].shape[0]/fs['08A002N'])
print(''.join(DataLoader.n2c_map[c] for c in labels['08A002N'][file_index][0]))
OralLoader.save_audio('./data/08A002N-{}.wav'.format(file_index),
                      audio['08A002N'][file_index],
                      fs['08A002N'])

2.364
já vim že ti to pomůže


In [190]:
print(''.join(DataLoader.n2c_map[c] for c in labels['08A002N'][47][0]))

no


In [229]:
lab = OralLoader.load_labels('./data/oral2013/transcripts/')
labs_DL, paths_DL = DataLoader.load_labels('./data/oral2013/transcripts/')

In [230]:
all(labels['08A002N'][10][0] == lab['08A002N'][10][0])

True

In [243]:
print(labs_DL[0][3])
print(lab['08A002N'][3][0])

[20 22 42 30  7  6 42  2 32  5 32 42 19 15 30 42  0 41 42  5 22 42 22 28
 32 19 20  1  3 30 39 12 22 42 16 14 42  2 32  5 32 42 19 15 30 42 17  0
 41  5  7 16 42  5  7 20 42 30  0 17 22 35 39 42 14 20 30  7 20 40 14 35
 20 15 42  0 42 19  1 19 42 24 22  3 14 30]
[20 22 42 30  7  6 42  2 32  5 32 42 19 15 30 42  0 41 42  5 22 42 22 28
 32 19 20  1  3 30 39 12 22 42 16 14 42  2 32  5 32 42 19 15 30 42 17  0
 41  5  7 16 42  5  7 20 42 30  0 17 22 35 39 42 14 20 30  7 20 40 14 35
 20 15 42  0 42 19  1 19 42 24 22  3 14 30]
