In [1]:
#!pip install wavinfo

In [2]:
from wavinfo import WavInfoReader
from pathlib import Path
from pprint import pprint
import subprocess as sp
import numpy as np
import struct
import json
import sys
import re

# Audio format every sample is validated against.
SR = 16000
CHANNELS = 1
CODEC = "pcm_s16le"

# Root directory that is scanned for numerically named sample folders.
SAMPLES_PATH = Path('samples')

# Label group -> file-name template; '{}' is filled with the file index.
FILE_PATTERNS = {
    group: f'{group}_{{}}.wav'
    for group in ('pos_clean', 'pos_noisy', 'neg_clean', 'neg_noisy', 'neg_random')
}

# ffprobe call that prints the stream properties of {file} as JSON.
FFPROBE_CMD = " ".join([
    "ffprobe",
    "-v quiet",
    "-print_format json",
    "-show_entries stream=duration,sample_rate,codec_name,channels",
    "{file}",
])

# ffmpeg call that re-encodes {input} into the format above and writes {output}.
AUTOFIX_CMD = " ".join([
    "ffmpeg -hide_banner -loglevel error",
    "-i {input}",
    f"-c:a {CODEC} -ar {SR} -ac {CHANNELS}",
    "-y {output}",
])

In [3]:
def _run_tool(template, **fields):
    """Run a whitespace-separated command template without a shell; return stdout."""
    # Each token is formatted on its own so that a path containing spaces or
    # shell metacharacters is still passed as exactly one argument.
    # Assumes the template itself has no quoted, space-containing tokens.
    argv = [token.format(**fields) for token in template.split()]
    return sp.check_output(argv, text=True)


def validate_wav(path, autofix=False):
    """Check a WAV file against the expected codec, sample rate and channel count.

    The first stream reported by ffprobe is compared with ``CODEC``, ``SR`` and
    ``CHANNELS``. When the file does not match and ``autofix`` is true, a
    re-encoded copy is written to ``<parent of path>/autofix/<file name>`` via
    ffmpeg; the original file is left untouched.

    Args:
        path: Location of the file to check (``str`` or ``Path``).
        autofix: Whether to write a converted copy for a non-conforming file.

    Returns:
        ``(good, data)`` where ``good`` tells whether the *original* file
        conforms (it stays ``False`` even after a successful autofix) and
        ``data`` is the ffprobe stream dict, extended with ``'autofix_path'``
        when a converted copy was written.

    Raises:
        subprocess.CalledProcessError: If ffprobe or ffmpeg exits with an error.
    """
    path = Path(path)
    out = _run_tool(FFPROBE_CMD, file=path)
    data = json.loads(out)["streams"][0]

    good = (
        data["codec_name"] == CODEC
        and int(data["sample_rate"]) == SR
        and data["channels"] == CHANNELS
    )

    if not good and autofix:
        # Derive everything from the `path` argument; do not rely on a
        # same-named variable that happens to exist in the caller's scope.
        output = path.parent / "autofix" / path.name
        print(f"Autofixing {path} -> {output}")
        output.parent.mkdir(parents=True, exist_ok=True)
        _run_tool(AUTOFIX_CMD, input=path, output=output)
        data['autofix_path'] = output

    return good, data

def extract_wav_regions(path):
    """Read labelled regions from the ``cue `` and ``adtl`` chunks of a WAV file.

    Args:
        path: Location of the WAV file.

    Returns:
        A list of ``(start, end)`` tuples, one per child chunk of the ``adtl``
        list. ``start`` is the position stored for the cue point the child
        refers to and ``end`` is ``start`` plus the length stored in the child.
        ``None`` is returned when ``WavInfoReader`` cannot parse the file or
        when either chunk is missing.

    Raises:
        OSError: If the file cannot be opened for reading.
        KeyError: If a child chunk refers to a cue id absent from ``cue ``.
    """
    with open(path, 'rb') as stream:
        content = stream.read()

    try:
        info = WavInfoReader(path)
        parent_chunks_dict = {x[0].decode("latin-1"): x for x in info.main_list}
    except Exception:
        # Deliberate best effort: a file the reader cannot parse simply has no
        # embedded regions as far as the caller is concerned.
        return None

    if "cue " not in parent_chunks_dict or "adtl" not in parent_chunks_dict:
        return None

    # RIFF data is little-endian. The '<' prefix pins byte order and standard
    # field sizes instead of depending on the host's native layout.
    # `.start` is treated as the offset of the chunk payload inside the file.
    offset = parent_chunks_dict["cue "].start
    (cue_count,) = struct.unpack_from('<I', content, offset)
    offset += 4

    cue_points = {}
    for _ in range(cue_count):
        # Each cue point record is 24 bytes; only id and position are used.
        cue_id, position, *_rest = struct.unpack_from('<IIIIII', content, offset)
        cue_points[cue_id] = position
        offset += 24

    regions = []
    # NOTE(review): every child of `adtl` is decoded with the 20-byte
    # (id, length, ...) layout; presumably the files only carry such
    # children - confirm before feeding files with other sub-chunk kinds.
    for chunk in parent_chunks_dict["adtl"].children:
        cue_id, length, *_rest = struct.unpack_from('<IIIHHHH', content, chunk.start)
        start = cue_points[cue_id]
        regions.append((start, start + length))

    return regions

In [4]:
# Validate every WAV inside the numerically named sample folders and count
# how many of them do not match the expected format.
total = 0
bad = 0
for sample_dir in SAMPLES_PATH.glob('*'):
    is_sample_dir = sample_dir.name.isnumeric() and not sample_dir.is_file()
    if is_sample_dir:
        print(f'Checking {sample_dir}...')
        # NOTE(review): keep the loop variable named `file` unless
        # validate_wav is confirmed not to read a global of that name.
        for file in sample_dir.glob('*.wav'):
            is_valid, probe = validate_wav(file, autofix=True)
            total += 1
            if is_valid:
                continue
            print(f'Bad file: {file}')
            pprint(probe, width=120, stream=sys.stderr)
            print('Autofix output:', probe.get('autofix_path'))
            bad += 1

print()
for caption, count in (
    ('TOTAL SAMPLES:', total),
    ('GOOD SAMPLES:', total - bad),
    ('BAD SAMPLES:', bad),
):
    print(caption, count)

Checking samples\0...
Checking samples\1...
Checking samples\2...
Checking samples\3...
Checking samples\4...
Checking samples\5...
Checking samples\6...

TOTAL SAMPLES: 52
GOOD SAMPLES: 52
BAD SAMPLES: 0


In [5]:
# Labels are only collected once the validation pass reported no bad files.
assert bad == 0

# Label group -> list of {'path': ..., 'regions': ...} entries.
all_labels = {key: [] for key in FILE_PATTERNS}

for sample_dir in SAMPLES_PATH.glob('*'):
    if not sample_dir.name.isnumeric() or sample_dir.is_file():
        continue
    print(f'\nProcessing {sample_dir}...')
    files = list(sample_dir.glob('*.wav'))
    for key, fmt in FILE_PATTERNS.items():
        # Escape the literal parts of the template so '.' matches only a dot,
        # and use a raw string for the digit class (a plain '\d' is an invalid
        # string escape).
        pattern = re.compile(
            r'\d+'.join(re.escape(part) for part in fmt.split('{}'))
        )
        filtered = [x for x in files if pattern.fullmatch(x.name)]
        if not filtered:
            print('Group not found:', key)
            continue

        regions = None
        if 'random' not in key:
            # Regions are taken from the index-0 file of the group and shared
            # by every file of that group in this folder.
            main_file = sample_dir / fmt.format(0)
            # A missing index-0 file must fall through to the side-car files
            # instead of crashing inside open().
            if main_file.exists():
                regions = extract_wav_regions(main_file)
            if regions is None:
                meta_path = sample_dir / 'meta.json'
                npy_path = main_file.with_suffix('.npy')
                if meta_path.exists():
                    with open(meta_path) as meta_file:
                        meta = json.load(meta_file)
                    regions = meta[key]['labels']
                elif npy_path.exists():
                    regions = np.load(npy_path).tolist()
                else:
                    print('No regions found in', main_file)
                    continue

        for wav_file in filtered:
            all_labels[key].append({
                'path': wav_file.relative_to(SAMPLES_PATH).as_posix(),
                'regions': regions,
            })
        print(f'+ {len(filtered)} files -> {key}')


Processing samples\0...
+ 3 files -> pos_clean
+ 3 files -> pos_noisy
+ 3 files -> neg_clean
+ 3 files -> neg_noisy
+ 3 files -> neg_random

Processing samples\1...
+ 1 files -> pos_clean
+ 1 files -> pos_noisy
+ 1 files -> neg_clean
+ 1 files -> neg_noisy
+ 1 files -> neg_random

Processing samples\2...
+ 3 files -> pos_clean
+ 3 files -> pos_noisy
+ 3 files -> neg_clean
+ 3 files -> neg_noisy
+ 3 files -> neg_random

Processing samples\3...
No regions found in samples\3\pos_clean_0.wav
No regions found in samples\3\pos_noisy_0.wav
No regions found in samples\3\neg_clean_0.wav
No regions found in samples\3\neg_noisy_0.wav
+ 1 files -> neg_random

Processing samples\4...
+ 1 files -> pos_clean
Group not found: pos_noisy
+ 1 files -> neg_clean
Group not found: neg_noisy
+ 1 files -> neg_random

Processing samples\5...
+ 1 files -> pos_clean
+ 1 files -> pos_noisy
+ 1 files -> neg_clean
+ 1 files -> neg_noisy
+ 1 files -> neg_random

Processing samples\6...
+ 1 files -> pos_clean
+ 1 fi

In [6]:
# List the relative path of every collected file, one group at a time.
for group, entries in all_labels.items():
    print('GROUP:', group)
    if entries:
        print('\n'.join(entry['path'] for entry in entries))
    print()

# Per-group file counts, in the order the groups are declared.
print('TOTAL FILES IN GROUPS:')
for group in FILE_PATTERNS:
    group_size = len(all_labels[group])
    print(group, '->', group_size)

# Persist the collected labels next to the sample folders.
path = SAMPLES_PATH / 'meta.json'
with path.open('w', encoding='utf-8') as file:
    json.dump(all_labels, file, ensure_ascii=False)

print('\nALL LABELS SAVED TO', path)

GROUP: pos_clean
0/pos_clean_0.wav
0/pos_clean_1.wav
0/pos_clean_2.wav
1/pos_clean_0.wav
2/pos_clean_0.wav
2/pos_clean_1.wav
2/pos_clean_2.wav
4/pos_clean_0.wav
5/pos_clean_0.wav
6/pos_clean_0.wav

GROUP: pos_noisy
0/pos_noisy_0.wav
0/pos_noisy_1.wav
0/pos_noisy_2.wav
1/pos_noisy_0.wav
2/pos_noisy_0.wav
2/pos_noisy_1.wav
2/pos_noisy_2.wav
5/pos_noisy_0.wav
6/pos_noisy_0.wav

GROUP: neg_clean
0/neg_clean_0.wav
0/neg_clean_1.wav
0/neg_clean_2.wav
1/neg_clean_0.wav
2/neg_clean_0.wav
2/neg_clean_1.wav
2/neg_clean_2.wav
4/neg_clean_0.wav
5/neg_clean_0.wav

GROUP: neg_noisy
0/neg_noisy_0.wav
0/neg_noisy_1.wav
0/neg_noisy_2.wav
1/neg_noisy_0.wav
2/neg_noisy_0.wav
2/neg_noisy_1.wav
2/neg_noisy_2.wav
5/neg_noisy_0.wav

GROUP: neg_random
0/neg_random_0.wav
0/neg_random_1.wav
0/neg_random_2.wav
1/neg_random_0.wav
2/neg_random_0.wav
2/neg_random_1.wav
2/neg_random_2.wav
3/neg_random_0.wav
4/neg_random_0.wav
5/neg_random_0.wav

TOTAL FILES IN GROUPS:
pos_clean -> 10
pos_noisy -> 9
neg_clean -> 9
ne

In [11]:
from loader import get_alina_sample
import IPython.display as ipd

# Fetch one sample per label group, print its metadata and render a player.
# NOTE(review): get_alina_sample comes from the local `loader` module;
# presumably it returns (signal, metadata dict) - confirm against loader.
fallback_end = int(SR * 10)  # used when the metadata carries no 'end'
for group in all_labels:
    signal, info = get_alina_sample(group, sr=SR)
    print(info)

    first = info.get('start', 0)
    last = info.get('end', fallback_end)
    clip = signal[first:last]
    ipd.display(ipd.Audio(data=clip, rate=SR))

{'src': 'pos_clean', 'path': 'C:\\Users\\AndBondStyle\\Projects\\alina\\datasets\\alina\\samples\\0\\pos_clean_2.wav', 'start': 2189117, 'end': 2199113}


{'src': 'pos_noisy', 'path': 'C:\\Users\\AndBondStyle\\Projects\\alina\\datasets\\alina\\samples\\2\\pos_noisy_2.wav', 'start': 1942680, 'end': 1950914}


{'src': 'neg_clean', 'path': 'C:\\Users\\AndBondStyle\\Projects\\alina\\datasets\\alina\\samples\\2\\neg_clean_1.wav', 'start': 4113239, 'end': 4126074}


{'src': 'neg_noisy', 'path': 'C:\\Users\\AndBondStyle\\Projects\\alina\\datasets\\alina\\samples\\2\\neg_noisy_2.wav', 'start': 1071281, 'end': 1083807}


{'src': 'neg_random', 'path': 'C:\\Users\\AndBondStyle\\Projects\\alina\\datasets\\alina\\samples\\3\\neg_random_0.wav'}
