# Develop a pyAudioAnalysis based segmentation algorithm
The purpose of this notebook is to develop a pyAudioAnalysis based segmentation. It results from the work done in PerformanceOnLowSeg.ipynb.

The idea here is to use the silence_removal algorithm to gain segments that have voice, music, or other in them. Then the svm algorithm is applied to filter out just the voice segments. Once the voice segments are identified they are converted into segments using the following rules:

1. No more than 25% of segment should contain non-voice data.
2. Segment starts with at least 0.75s of speech
3. Segment starts on a speech pause boundary (0.2s)

After implementing this the effects on different files will be examined to pick a good set of parameters.

In [1]:
import pandas as pd
import numpy as np
import os
import sys
module_path = os.path.abspath(os.path.join('~/work/pyAudioAnalysis'))
if module_path not in sys.path:
    sys.path.append(module_path)
from collections import namedtuple

from pyAudioAnalysis import audioSegmentation as aS
from pyAudioAnalysis import audioTrainTest as at
from pyAudioAnalysis import MidTermFeatures as mtf
from pyAudioAnalysis import audioBasicIO as aIO

In [2]:
# define our unittests
import unittest

def run_tests(test_invoked=''):
  default_test = (test_invoked, ) if test_invoked != '' else None
  unittest.main(argv=[''], verbosity=1, exit=False, defaultTest=default_test)

Read in some files for analysis

In [3]:
instrument = pd.read_csv('../../data/instrument.csv')
poor_webrtcvad = pd.read_csv('../../poor_webrtcvad.csv')
good_webrtcvad = pd.read_csv('../../data/good_webrtcvad.csv')

## Reimplement mid_term_file_classification
The original mid_term_file_classification does not do what we want it to. As we are rewriting it we will take the opportunity to integrate silence removal - which is much more accurate than the classification algorithm.

In [4]:
import warnings
# sklearn puts out a lot of annoying warnings
warnings.filterwarnings('ignore')

Segment = namedtuple('Segment', ['start', 'end', 'classification'])

# re-implement a simplified version of mid_term_file_classification
# it is implemented as a class to allow the model to be cached.
class ExtractVoiceSegments():
    classifier, mean, std, class_names, mt_win, mid_step, st_win, \
         st_step, compute_beat = at.load_model('/home/jovyan/work/pyAudioAnalysis/pyAudioAnalysis/data/models/svm_rbf_4class')

    def __init__(self):
        pass

    def __segments_in(self, signal, sampling_rate, offset):
        labels = []

        # mid-term feature extraction:
        mt_feats, _, _ = \
            mtf.mid_feature_extraction(signal, sampling_rate,
                                    ExtractVoiceSegments.mt_win * sampling_rate,
                                    ExtractVoiceSegments.mid_step * sampling_rate,
                                    round(sampling_rate * ExtractVoiceSegments.st_win),
                                    round(sampling_rate * ExtractVoiceSegments.st_step))

        # for each feature vector (i.e. for each fix-sized segment):
        for col_index in range(mt_feats.shape[1]):
            # normalize current feature v
            feature_vector = (mt_feats[:, col_index] - ExtractVoiceSegments.mean) / ExtractVoiceSegments.std

            # classify vector:
            label_predicted, _ = \
                at.classifier_wrapper(ExtractVoiceSegments.classifier, 'svm', feature_vector)
            labels.append(label_predicted)

        segs, classes = aS.labels_to_segments(labels, ExtractVoiceSegments.mid_step)
        # there is a bug in labels to segments when there is a single label. In this case it returns a list rather than a list of lists
        if len(labels) == 1:
            segs = [].append(segs)
        return [] if segs is None else [Segment(seg[0]+offset, seg[1]+offset, ExtractVoiceSegments.class_names[int(label)]) for seg, label in zip(segs, classes)]



    """
    This function performs mid-term classification of an audio stream.
    Towards this end, supervised knowledge is used,
    i.e. a pre-trained classifier.
    ARGUMENTS:
        - input_file:        path of the input WAV/mp3 file
    RETURNS:
        - list of Segments (see above tuple)
    """
    def __call__(self, input_file):
        segments = []

        # load input file
        sampling_rate, signal = aIO.read_audio_file(input_file)

        # could not read file
        if sampling_rate == 0:
            return segments

        # convert stereo (if) to mono
        signal = aIO.stereo_to_mono(signal)

        # find the silence segments
        non_silent_segments = aS.silence_removal(signal, sampling_rate, 0.02, 0.02, smooth_window=1.0, weight=0.3)

        # work through each segment
        for seg in non_silent_segments:
            start = int(seg[0]*sampling_rate)
            stop = int(seg[1]*sampling_rate)
            segments.extend(self.__segments_in(signal[start:stop], sampling_rate, seg[0]))

        return segments

In [5]:
extract_voice_segments = ExtractVoiceSegments()
segments = extract_voice_segments('/media/programs/' + 'Programs/65/65984/A65984/PM-1901/A65984-036.wav')


In [6]:
print('/media/programs/' + good_webrtcvad.iloc[13].filename)

/media/programs/Programs/30/30850/A30850/From_CM/A30850-14.wav


Now we want to convert the list of segments into speech epochs of a given length. This is where we apply the rules for the division of the raw track into voice segments.

We want to test these rules:

1. No more than 25% (programmable) of segment should contain non-voice data.
2. Segment starts with at least 0.75s of speech (redundant rule. It has to be a second to register)
3. Segment starts on a speech pause boundary (0.2s) (too hard to implement)

In [9]:
Epoch = namedtuple('Epoch', ['start', 'end'])
def speech_epochs_from_segments(segments, epoch_length=4.0, silence_tolerance=0.0):
    epochs = []
    i = 0
    silence_this_epoch = silence_tolerance
    while i < len(segments):
        seg_duration = segments[i].end - segments[i].start
        if segments[i].classification != 'speech':
            silence_this_epoch = silence_tolerance

        elif seg_duration >= epoch_length:
            epochs.append(Epoch(segments[i].start, segments[i].start+epoch_length))
            silence_this_epoch = silence_tolerance
            # process the same segment again with a smaller size
            new_start = segments[i].start+epoch_length
            new_end = segments[i].end
            if new_start < new_end:
                segments[i] = Segment(new_start, new_end, segments[i].classification)
                continue
        else:
            if i+1 < len(segments):
                if (segments[i].end + silence_this_epoch) >= segments[i+1].start and segments[i+1].classification == 'speech':
                    # did we use up any silence tolerence
                    if segments[i].end < segments[i+1].start:
                        silence_this_epoch -= (segments[i+1].start - segments[i].end)
                    segments[i+1] = Segment(segments[i].start, segments[i+1].end, segments[i].classification)
                else:
                    silence_this_epoch = silence_tolerance

        i+=1

    return epochs

In [10]:
class TestSpeechEpochsFromSegments(unittest.TestCase):

    def test_null_case(self):
        self.assertEqual([], speech_epochs_from_segments([]))

    def test_one_to_one(self):
        self.assertEqual([Epoch(0, 4)], speech_epochs_from_segments([
            Segment(0.0, 4.0, 'speech')
        ]))

    def test_reject_all_but_voice(self):
        self.assertEqual([], speech_epochs_from_segments([
            Segment(0.0, 4.0, 'music')
        ]))

        self.assertEqual([], speech_epochs_from_segments([
            Segment(0.0, 4.0, 'noise')
        ]))
        self.assertEqual([], speech_epochs_from_segments([
            Segment(0.0, 4.0, 'other')
        ]))
        self.assertEqual([Epoch(0, 4)], speech_epochs_from_segments([
            Segment(0.0, 4.0, 'speech')
        ]))
    def test_many_to_many(self):
        self.assertEqual([Epoch(0, 4), Epoch(5,9), Epoch(10, 14)], speech_epochs_from_segments([
            Segment(0.0, 4.0, 'speech'),
            Segment(5.0, 9.0, 'speech'),
            Segment(10.0, 14.0, 'speech')
        ]))

    def test_many_to_many_mixed(self):
        self.assertEqual([Epoch(0, 4), Epoch(10, 14)], speech_epochs_from_segments([
            Segment(0.0, 4.0, 'speech'),
            Segment(5.0, 9.0, 'music'),
            Segment(10.0, 14.0, 'speech')
        ]))

    def test_reject_too_short(self):
        self.assertEqual([], speech_epochs_from_segments([
            Segment(0.0, 3.0, 'speech')
        ], 4.0))

    def test_merge_short_segments(self):
        self.assertEqual([Epoch(0, 4), Epoch(4, 8)], speech_epochs_from_segments([
            Segment(0.0, 1.0, 'speech'),
            Segment(1.0, 2.0, 'speech'),
            Segment(2.0, 3.0, 'speech'),
            Segment(3.0, 4.0, 'speech'),
            Segment(4.0, 8.0, 'speech')
        ]))

    def test_split_long_segments(self):
        self.assertEqual([Epoch(0, 4), Epoch(4, 8)], speech_epochs_from_segments([
            Segment(0.0, 8.0, 'speech')
        ]))

    def test_merge_short_mixed_type_segments(self):
        self.assertEqual([Epoch(3, 7)], speech_epochs_from_segments([
            Segment(0.0, 1.0, 'speech'),
            Segment(1.0, 2.0, 'speech'),
            Segment(2.0, 3.0, 'music'),
            Segment(3.0, 4.0, 'speech'),
            Segment(4.0, 8.0, 'speech')
        ]))

    def test_silent_gaps(self):
        self.assertEqual([Epoch(3, 7)], speech_epochs_from_segments([
            Segment(0.0, 1.0, 'speech'),
            Segment(1.0, 2.0, 'speech'),
            Segment(3.0, 4.0, 'speech'),
            Segment(4.0, 8.0, 'speech')
        ]))

    def test_silent_gaps_with_silence_tolerance(self):
        self.assertEqual([Epoch(0, 4), Epoch(4, 8)], speech_epochs_from_segments([
            Segment(0.0, 1.0, 'speech'),
            Segment(1.0, 2.0, 'speech'),
            Segment(3.0, 4.0, 'speech'),
            Segment(4.0, 8.0, 'speech')
        ], silence_tolerance=1.0))
        self.assertEqual([Epoch(3, 7)], speech_epochs_from_segments([
            Segment(0.0, 1.0, 'speech'),
            Segment(1.0, 2.0, 'speech'),
            Segment(3.0, 4.0, 'speech'),
            Segment(4.0, 8.0, 'speech')
        ], silence_tolerance=0.9))

    def test_epoch_length(self):
        self.assertEqual([Epoch(0, 6)], speech_epochs_from_segments([
            Segment(0.0, 1.0, 'speech'),
            Segment(1.0, 2.0, 'speech'),
            Segment(3.0, 4.0, 'speech'),
            Segment(4.0, 8.0, 'speech')
        ], silence_tolerance=1.0, epoch_length=6.0))

    def test_fragmented_with_silence(self):
        self.assertEqual([Epoch(0, 6)], speech_epochs_from_segments([
            Segment(0.0, 1.0, 'speech'),
            Segment(2.0, 3.0, 'speech'),
            Segment(4.0, 5.0, 'speech'),
            Segment(6.0, 8.0, 'speech')
        ], silence_tolerance=3.0, epoch_length=6.0))

    def test_too_much_silence(self):
        self.assertEqual([Epoch(4, 8)], speech_epochs_from_segments([
            Segment(0.0, 1.0, 'speech'),
            Segment(2.0, 3.0, 'speech'),
            Segment(4.0, 5.0, 'speech'),
            Segment(6.0, 8.0, 'speech')
        ], silence_tolerance=1.0, epoch_length=4.0))

    def test_seg_too_small(self):
        self.assertEqual([], speech_epochs_from_segments([
            Segment(0.0, 3.0, 'speech')
        ], silence_tolerance=1.0, epoch_length=4.0))

    # these have been adapted from tests on divide_into_speech_epochs
    def test_reject_music(self):
        self.assertEqual([Epoch(5, 9)], speech_epochs_from_segments([
            Segment(0, 5, 'music'),
            Segment(5, 10, 'speech')]))
        
    def test_two_epochs_one_segment(self):
        self.assertEqual([Epoch(0,4), Epoch(4,8)], speech_epochs_from_segments([Segment(0,9, 'speech')]))

    def test_reject_complex(self):
        self.assertEqual([Epoch(5, 9), Epoch(25, 29)], speech_epochs_from_segments([
            Segment(0, 5, 'music'),
            Segment(5, 10, 'speech'),
            Segment(11, 20, 'other'),
            Segment(25, 30, 'speech') ]))
        
    def test_terminate_at_end(self):
        self.assertEqual([], speech_epochs_from_segments([
            Segment(5, 8, 'speech')]))
        
    def test_reject_too_little_voice(self):
        self.assertEqual([], speech_epochs_from_segments([
            Segment(5, 7.5, 'speech')]))
        self.assertEqual([], speech_epochs_from_segments([
            Segment(5, 8, 'speech')], silence_tolerance=0.8))
        
    def test_accept_fragmented_segments(self):
        self.assertEqual([Epoch(5, 9)], speech_epochs_from_segments([
            Segment(5, 7, 'speech'), Segment(8, 9, 'speech')], silence_tolerance=1.0))
        
    def test_accept_segment_after_fragment(self):
        self.assertEqual([Epoch(11, 15)], speech_epochs_from_segments([
            Segment(5, 7, 'speech'), Segment(8, 10, 'speech'), Segment(11, 15, 'speech')], silence_tolerance=0.5))
        
    def test_reject_music_segment_in_fragment(self):
        self.assertEqual([Epoch(8, 12)], speech_epochs_from_segments([
            Segment(5, 7, 'speech'), Segment(7, 8, 'music'), Segment(8, 10, 'speech'), Segment(11, 12, 'speech')], silence_tolerance=1.0))

    def test_accept_multiple_fragments(self):
        self.assertEqual([Epoch(5, 15)], speech_epochs_from_segments([
            Segment(5, 7, 'speech'), 
            Segment(8, 10, 'speech'), 
            Segment(11, 12, 'speech'),
            Segment(13, 20, 'speech')], epoch_length=10.0, silence_tolerance=5.0))
        
    # we need test cases that explicitly test the desired behaviour around merging of segments. 
    # case 1: Our new epoch ends inside a segment but there is insufficient space in the remaining segment for it to be used. 
    def test_discard_part_segment(self):
        self.assertEqual([Epoch(5, 9), Epoch(11, 15), Epoch(15, 19)], speech_epochs_from_segments([
            Segment(5, 7, 'speech'), 
            Segment(8, 9, 'speech'), 
            Segment(11, 12, 'speech'),
            Segment(13, 20, 'speech')], epoch_length=4.0, silence_tolerance=1.0))
        
    # case 2: Our new epoch ends inside a segment and there is sufficient space in the remaining segment for it to be used. 
    def test_use_part_segment(self):
        self.assertEqual([Epoch(5, 9), Epoch(9, 13)], speech_epochs_from_segments([
            Segment(5, 7, 'speech'), 
            Segment(8, 13, 'speech')], epoch_length=4.0, silence_tolerance=1.0))

    def test_discard_music_segment(self):
        self.assertEqual([Epoch(5, 9), Epoch(13, 17)], speech_epochs_from_segments([
            Segment(5, 7, 'speech'), 
            Segment(8, 10, 'speech'), 
            Segment(11, 12, 'music'),
            Segment(13, 20, 'speech')], epoch_length=4.0, silence_tolerance=1.0))

    def test_fraction_voice(self):
        pass
        """self.assertEqual([Epoch(13, 17)], speech_epochs_from_segments([
            Segment(5, 7, 'speech'), 
            Segment(8, 10, 'speech'), 
            Segment(11, 12, 'speech'),
            Segment(13, 20, 'speech')], duration=4.0, fraction_voice=0.8))"""

        self.assertEqual([Epoch(5, 9), Epoch(9, 13), Epoch(13, 17)], speech_epochs_from_segments([
            Segment(5, 7, 'speech'), 
            Segment(8, 10, 'speech'), 
            Segment(11, 12, 'speech'),
            Segment(13, 20, 'speech')], epoch_length=4.0, silence_tolerance=3.0))

run_tests('TestSpeechEpochsFromSegments')


............................
----------------------------------------------------------------------
Ran 28 tests in 0.011s

OK


Now for running these tests I do not actually want the segments, I just want to count them.

In [11]:
import time

class ApplyClassification:
    def __init__(self):
        self.count = 0
        self.start_time = time.time()
        self.extract_voice_segments = ExtractVoiceSegments()

    def segment(self, filename):
        #if self.count % 10 == 0:
        print(f'Processing({self.count}) {time.time()-self.start_time:.2f}: {filename}')
        self.count += 1
        segments = extract_voice_segments('/media/programs/' + filename)
        return speech_epochs_from_segments(segments, silence_tolerance=1.0)
    
    def __call__(self, filename):
        return len(self.segment(filename))

       

In [12]:
count_segs = ApplyClassification()
#count_segs(good_webrtcvad.iloc[13].filename)
good_webrtcvad['segs_4sec'] = good_webrtcvad.filename.apply(count_segs)


Processing(0) 0.00: Programs/73/73980/A73980/PM-1906/A73980-008.wav
Processing(1) 5.68: Programs/37/37643/A37643/MM/A37643-11.wav
Processing(2) 11.06: Programs/64/64615/A64615/PM-1408/A64615-007.wav
Processing(3) 15.91: Programs/73/73980/A73980/PM-1906/A73980-024.wav
Processing(4) 21.18: Programs/66/66247/A66247/PM-1906/A66247-013.wav
Processing(5) 25.84: Programs/64/64615/A64615/PM-1408/A64615-018.wav
Processing(6) 30.33: Programs/64/64616/A64616/PM-1409/A64616-014.wav
Processing(7) 35.86: Programs/37/37643/A37643/MM/A37643-40.wav
Processing(8) 43.83: vox_grn/Audio_MP3/74/74769/Achik LLL 1 Beginning with GOD 008 Picture 7 The Rainbow and God's Promise 74769.mp3
Processing(9) 49.70: Programs/37/37101/A37101/From_CM/C37101A Region 00_03_32_253 to 00_06_37_829 (02 _ 03).wav
Processing(10) 60.42: Programs/74/74775/A74775/PM-1801/A74775-018.wav
Processing(11) 67.45: Programs/64/64616/A64616/PM-1409/A64616-013.wav
Processing(12) 72.04: Programs/74/74775/A74775/PM-1801/A74775-005.wav
Process

Now repeat it for poor_webrtcvad and instrumental

In [13]:
count_segs = ApplyClassification()
poor_webrtcvad['segs_4sec'] = poor_webrtcvad.filename.apply(count_segs)
count_inst_segs = ApplyClassification()
instrument['segs_4sec'] = instrument.filename.apply(count_inst_segs)


Processing(0) 0.00: Programs/75/75387/A75387/PM-1501/A75387-024.wav
Processing(1) 3.65: Programs/64/64106/A64106/PM-MM-1305/A64106-24.wav
Processing(2) 12.17: Programs/66/66233/A66233/PM-1905/A66233-006.wav
Processing(3) 17.11: Programs/66/66233/A66233/PM-1905/A66233-009.wav
Processing(4) 19.62: Programs/80/80994/A80994/PM/A80994-05_Potret_05.wav
Processing(5) 23.04: Programs/80/80103/A80103/PM/A80103-10.wav
Processing(6) 26.52: Programs/64/64020/A64020/PM-1302/A64020-35.wav
Processing(7) 29.05: Programs/66/66149/A66149/PM-1811/A66149-16-Rup_15.wav
Processing(8) 30.96: Programs/64/64020/A64020/PM-1302/A64020-39.wav
Processing(9) 34.07: Programs/66/66004/A66004/PM-1903/A66004-006.wav
Processing(10) 47.25: Programs/62/62039/A62039/PM-1506/A62039-002.wav
Processing(11) 57.63: Programs/63/63327/A63327/PM-1011/A63327-16.wav
Processing(12) 60.30: Programs/63/63327/A63327/PM-1011/A63327-14.wav
Processing(13) 63.94: Programs/65/65291/A65291/PM-1608/A65291-04 Pic3 Borana Sakuye LLL5.WAV
Process

In [None]:
good_webrtcvad.to_csv('../../data/good_webrtcvad.csv')


Lets see how many segments were found by each on files where I though webrtcvad was good.

 I overwrote the old results. Lets recover them for a comparison

In [16]:
good_segs_4sec = good_webrtcvad.segs_4sec.copy()
poor_segs_4sec = poor_webrtcvad.segs_4sec.copy()
inst_segs_4sec = instrument.segs_4sec.copy()


In [17]:
good_webrtcvad = pd.read_csv('../../data/good_webrtcvad.csv')
instrument = pd.read_csv('../../data/instrument.csv')
poor_webrtcvad = pd.read_csv('../../poor_webrtcvad.csv')


In [19]:
good_webrtcvad['new_seg_4sec'] = good_segs_4sec
poor_webrtcvad['new_seg_4sec'] = poor_segs_4sec
instrument['new_seg_4sec'] = inst_segs_4sec


In [21]:
print(f'good: webrctvad: {sum(good_webrtcvad.segments)} pyAudio: {sum(good_webrtcvad.segs_4sec)} pyAudio-1: {sum(good_webrtcvad.new_seg_4sec)}')
print(f'poor: webrctvad: {sum(poor_webrtcvad.segments)} pyAudio: {sum(poor_webrtcvad.segs_4sec)} pyAudio-1: {sum(poor_webrtcvad.new_seg_4sec)}')
print(f'inst: webrctvad: {sum(instrument.segments)} pyAudio: {sum(instrument.segs_4sec)} pyAudio-1: {sum(instrument.new_seg_4sec)}')


good: webrctvad: 8000 pyAudio: 14329 pyAudio-1: 7581
poor: webrctvad: 1000 pyAudio: 7213 pyAudio-1: 3984
inst: webrctvad: 10783 pyAudio: 902 pyAudio-1: 337


So the second algorithm does not appear to perform as well in detecting many segments, but it rejects nearly all of the instruments. We will take it.

In [20]:
instrument.to_csv('../../data/instrument.csv')
poor_webrtcvad.to_csv('../../poor_webrtcvad.csv')
good_webrtcvad.to_csv('../../data/good_webrtcvad.csv')
