
# Prepare your own dataset for DiffSinger (MIDI-less version)

## 1 Overview

This Jupyter Notebook guides you through preparing your own dataset for DiffSinger at a 44.1 kHz sampling rate.
Please read and follow the guidance carefully: take action wherever there is a notice for <font color="red">manual action</font>, and pay attention to blocks marked <font color="red">optional step</font>.

### 1.1 Introduction to this pipeline and MIDI-less version

This pipeline does not support customized phoneme dictionaries. It uses the [opencpop strict pinyin dictionary](../dictionaries/opencpop-strict.txt) by default.

The MIDI-less version is a simplified version of DiffSinger in which MIDI layers, word layers and slur layers are removed from the data labels. The model takes the raw phoneme sequence with durations as input and applies pitch embedding directly from the ground truth. Predictors for phoneme durations and the pitch curve are also removed. Below are some limitations and advantages of the MIDI-less version:

- The model will not predict phoneme durations and f0 sequence by itself. You must specify `ph_dur` and `f0_seq` at inference time.
- Pitch control will perform better than in the MIDI-A version, because MIDI keys are misleading information for the diffusion decoder when the f0 sequence is already embedded.
- MIDIs and slurs do not need to be labeled, so labeling is easier than for other versions.
- More varieties of data can be used as training materials, even including speech.

### 1.2 Install dependencies

Please run the following code block the first time you start this notebook.


In [None]:
!pip install librosa soundfile matplotlib
!conda install -c conda-forge montreal-forced-aligner


### 1.3 Initializing environment

Please run the following code block every time you start this notebook.


In [None]:
import glob
import os
import shutil
import wave

import librosa
import soundfile


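def length(src: str):
    """Return the total duration in hours of a WAV file, or of all WAV files under a directory (recursively)."""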
    if os.path.isfile(src) and src.endswith('.wav'):
        with wave.open(src, 'r') as w:
            return w.getnframes() / w.getframerate() / 3600
    elif os.path.isdir(src):
        total = 0
        for ch in [os.path.join(src, c) for c in os.listdir(src)]:
            total += length(ch)
        return total
    return 0


print('Environment initialized successfully.')


## 2 Raw recordings and audio slicing

### 2.1 Choose raw recordings

Your recordings must meet the following conditions:

1. They must be in one single folder. Files in sub-folders will be ignored.
2. They must be in WAV format.
3. They must have a sampling rate higher than 32 kHz.
4. They should contain only human voice, from a single speaker, since multi-speaker training is not supported yet.
5. They should be clean voices with no significant noise or reverb.
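
Conditions 1 to 3 can be checked mechanically. The following is a minimal sketch, not part of the official pipeline: `find_invalid_recordings` is a hypothetical helper that uses only the standard `wave` module and reports top-level files that are not WAV files or whose sampling rate is not above 32 kHz. It assumes uncompressed PCM WAV files, which is what `wave` can read.

```python
import os
import wave


def find_invalid_recordings(folder: str, min_sr: int = 32000):
    """Return (filename, reason) pairs for top-level files that break the rules above.

    Hypothetical helper for illustration; sub-folders are skipped on purpose.
    """
    problems = []
    for name in sorted(os.listdir(folder)):
        path = os.path.join(folder, name)
        if not os.path.isfile(path):
            continue  # Sub-folders are ignored, as in the rest of this notebook
        if not name.lower().endswith('.wav'):
            problems.append((name, 'not a WAV file'))
            continue
        try:
            with wave.open(path, 'rb') as w:
                sr = w.getframerate()
        except (wave.Error, EOFError):
            problems.append((name, 'unreadable WAV file'))
            continue
        if sr <= min_sr:
            problems.append((name, f'sampling rate {sr} Hz is too low'))
    return problems
```

An empty list means that no problems were found for these three conditions. Conditions 4 and 5 still need to be judged by listening.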

<font color="red">Optional step</font>: The raw data must be sliced into parts of about 5-15 seconds. If you want to do this yourself, please skip to section 2.3. Otherwise, please edit paths in the following code block before you run it.


In [None]:
########################################

# Configuration for data paths
raw_path = 'path/to/your/raw/recordings'  # Path to your raw, unsliced recordings

########################################

assert os.path.exists(raw_path) and os.path.isdir(raw_path), 'The chosen path does not exist or is not a directory.'
print('Raw recording path:', raw_path)
print()
print('===== Recording List =====')
raw_filelist = glob.glob(f'{raw_path}/*.wav')
raw_length = sum(length(f) for f in raw_filelist)  # Count only the listed files; sub-folders are ignored
if len(raw_filelist) > 5:
    print('\n'.join(raw_filelist[:5] + [f'... ({len(raw_filelist) - 5} more)']))
else:
    print('\n'.join(raw_filelist))
print()
print(f'Found {len(raw_filelist)} valid recordings with total length of {round(raw_length, 2)} hours.')

### 2.2 Audio slicing

We provide an audio slicer which automatically cuts recordings into short pieces.

The audio slicer is based on silence detection and has several arguments that have to be specified. You should modify these arguments according to your data.

For more details of each argument, see its [GitHub repository](https://github.com/openvpi/audio-slicer).
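
To build intuition for `db_threshold_` and the window arguments before tuning them, here is a simplified sketch of threshold-based silence detection. It is not the actual `Slicer` implementation (see the repository linked above for that). `silent_windows` and its parameters are hypothetical, and it uses plain Python lists rather than NumPy so that it runs on its own. Each non-overlapping window is marked silent when its RMS level, in dB relative to full scale, falls below the threshold.

```python
import math


def silent_windows(samples, sr, db_threshold=-40.0, win_ms=20):
    """Mark each non-overlapping window as silent (True) or voiced (False).

    `samples` is a sequence of floats in [-1, 1]. Illustrative only.
    """
    win = max(1, int(sr * win_ms / 1000))
    flags = []
    for start in range(0, len(samples) - win + 1, win):
        frame = samples[start:start + win]
        rms = math.sqrt(sum(x * x for x in frame) / win)
        level_db = 20 * math.log10(max(rms, 1e-10))  # Avoid log10(0)
        flags.append(level_db < db_threshold)
    return flags


# Half a second of silence followed by half a second of a 440 Hz tone
demo_sr = 16000
silence = [0.0] * (demo_sr // 2)
tone = [0.5 * math.sin(2 * math.pi * 440 * n / demo_sr) for n in range(demo_sr // 2)]
flags = silent_windows(silence + tone, demo_sr)
print(sum(flags), 'silent windows out of', len(flags))
```

In this toy model, a lower (more negative) threshold marks fewer windows as silent, which tends to produce fewer and longer segments.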

Please edit paths and arguments in the following code block before you run it.


In [None]:
########################################

# Configuration for data paths
sliced_path = 'path/to/your/sliced/recordings'  # Path to hold the sliced segments of your recordings

# Slicer arguments
db_threshold_ = -40.
min_length_ = 5000
win_l_ = 400
win_s_ = 20
max_silence_kept_ = 500

# Number of threads (based on the number of CPU cores)
num_workers = 5

########################################

assert 'raw_path' in locals().keys(), 'Raw path of your recordings has not been specified.'
assert not os.path.exists(sliced_path) or os.path.isdir(sliced_path), 'The chosen path is not a directory.'
os.makedirs(sliced_path, exist_ok=True)
print('Sliced recording path:', sliced_path)

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

from utils.slicer import Slicer


def slice_one(in_audio):
    audio, sr = librosa.load(in_audio, sr=None)
    slicer = Slicer(
        sr=sr,
        db_threshold=db_threshold_,
        min_length=min_length_,
        win_l=win_l_,
        win_s=win_s_,
        max_silence_kept=max_silence_kept_
    )
    chunks = slicer.slice(audio)
    stem = os.path.basename(in_audio).rsplit('.', maxsplit=1)[0]
    for i, chunk in enumerate(chunks):
        soundfile.write(os.path.join(sliced_path, f'{stem}_slice_{i:04d}.wav'), chunk, sr)


print('Slicing your recordings may take several minutes. Please wait.')
with ThreadPoolExecutor(max_workers=num_workers) as thread_pool:
    tasks = [thread_pool.submit(slice_one, file) for file in raw_filelist]
    wait(tasks, return_when=ALL_COMPLETED)
for task in tasks:
    task.result()  # Re-raise any error from a worker instead of failing silently
print()
print('===== Segment List =====')
sliced_filelist = glob.glob(f'{sliced_path}/*.wav', recursive=True)
sliced_length = length(sliced_path)
if len(sliced_filelist) > 5:
    print('\n'.join(sliced_filelist[:5] + [f'... ({len(sliced_filelist) - 5} more)']))
else:
    print('\n'.join(sliced_filelist))
print()
print(f'Sliced your recordings into {len(sliced_filelist)} segments with total length of {round(sliced_length, 2)} hours.')


### 2.3 Validating recording segments

In this section, we validate your recording segments.

<font color="red">Optional step</font>: If you skipped section 2.2, please specify the path to your sliced recordings in the following code block and run it. Otherwise, skip this code block.


In [None]:
########################################

# Configuration for data paths
sliced_path_ = 'assets'  # Path to your sliced segments of recordings, e.g. 'path/to/your/sliced/recordings'

########################################

if 'sliced_path' not in locals():
    sliced_path = sliced_path_
    assert os.path.exists(sliced_path) and os.path.isdir(sliced_path), 'The chosen path does not exist or is not a directory.'

print('Sliced recording path:', sliced_path)
print()
print('===== Segment List =====')
sliced_filelist = glob.glob(f'{sliced_path}/*.wav', recursive=True)
sliced_length = length(sliced_path)
if len(sliced_filelist) > 5:
    print('\n'.join(sliced_filelist[:5] + [f'... ({len(sliced_filelist) - 5} more)']))
else:
    print('\n'.join(sliced_filelist))
print()
print(f'Found {len(sliced_filelist)} valid segments with total length of {round(sliced_length, 2)} hours.')


Run the following code block to check if there are segments with an unexpected length (less than 2 seconds or more than 30 seconds).


In [None]:
reported = False
for file in sliced_filelist:
    with wave.open(file, 'r') as wav:
        wave_seconds = wav.getnframes() / wav.getframerate()
        if wave_seconds < 2.:
            reported = True
            print(f'Too short! \'{file}\' has a length of {round(wave_seconds, 1)} seconds!')
        if wave_seconds > 30.:
            reported = True
            print(f'Too long! \'{file}\' has a length of {round(wave_seconds, 1)} seconds!')
if not reported:
    print('Congratulations! All segments have proper length.')


<font color="red">Manual action</font>: please consider removing segments that are too short and manually slicing segments that are too long, as reported above.

Move on when this is done or there are no segments reported.
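
If you do not have an audio editor at hand, a segment that is too long can be cut with the standard `wave` module. This is only a sketch: `split_wav` is a hypothetical helper, the cut position is something you choose by listening (ideally inside a pause), and the output file names are up to you rather than a convention of this pipeline. It assumes uncompressed PCM WAV files.

```python
import wave


def split_wav(src: str, cut_seconds: float, dst_a: str, dst_b: str):
    """Split a PCM WAV file into two files at `cut_seconds`."""
    with wave.open(src, 'rb') as w:
        params = w.getparams()
        cut_frame = min(int(cut_seconds * w.getframerate()), w.getnframes())
        head = w.readframes(cut_frame)
        tail = w.readframes(w.getnframes() - cut_frame)
    for dst, frames in ((dst_a, head), (dst_b, tail)):
        with wave.open(dst, 'wb') as out:
            out.setparams(params)  # The frame count in the header is corrected on write
            out.writeframes(frames)
```

After splitting, remove the original long file from the folder so that it is not picked up again, and rerun the validation block above.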


## 3 Label your segments

### 3.1 Label syllable sequence

All segments should have their transcriptions (or lyrics) annotated. Run the following code block to see the example segment (from the Opencpop dataset) and its corresponding annotation.


In [None]:
from IPython.display import Audio

# noinspection PyTypeChecker
display(Audio(filename='assets/2001000001.wav'))
with open('assets/2001000001.lab', 'r') as f:
    print(f.read())


<font color="red">Manual action</font>: now your task is to annotate transcriptions for each segment like the example shown above.

Each segment should have one annotation file with the same filename and a `.lab` extension, placed in the same directory. In the annotation file, write all syllables sung or spoken in the segment. Syllables should be separated by spaces, and only syllables that appear in the dictionary are allowed. In addition, all phonemes in the dictionary should be covered by the annotations.

**Special notes**: `AP` and `SP` should not appear in the annotation.
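
To make the format concrete, the sketch below checks one annotation line against a dictionary. The two dictionary entries are assumed examples in the `syllable<TAB>phonemes` layout that the validation code in this notebook parses, and `check_annotation` is a hypothetical helper, not part of the pipeline.

```python
RESERVED = {'AP', 'SP'}  # Must not be written in .lab files


def check_annotation(text: str, dictionary: dict):
    """Return (phonemes, errors) for one space-separated annotation line."""
    phonemes, errors = [], []
    for syllable in text.split():
        if syllable in RESERVED:
            errors.append(f'{syllable} must not appear in the annotation')
        elif syllable not in dictionary:
            errors.append(f'{syllable} is not in the dictionary')
        else:
            phonemes.extend(dictionary[syllable])
    return phonemes, errors


# Assumed dictionary lines, for illustration only
demo_lines = ['ni\tn i', 'hao\th ao']
demo_dict = {}
for line in demo_lines:
    syllable, phones = line.split('\t')
    demo_dict[syllable] = phones.split()

print(check_annotation('ni hao', demo_dict))
print(check_annotation('ni SP hello', demo_dict))
```

The first call returns the phoneme list with no errors. The second reports both the reserved `SP` mark and the out-of-dictionary syllable.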

**News**: We developed [MinLabel](https://github.com/SineStriker/qsynthesis-revenge/tree/main/src/Test/MinLabel), a simple yet efficient tool to help with this step. You can download the binary executable for Windows [here](https://diffsinger-1307911855.cos.ap-beijing.myqcloud.com/label/minlabel_latest.zip).

<font color="red">Optional step</font>: if you want us to help you create all empty `lab` files (instead of creating them yourself), please run the following code block.


In [None]:
for file in sliced_filelist:
    filename = os.path.basename(file)
    name_without_ext = filename.rsplit('.', maxsplit=1)[0]
    annotation = os.path.join(sliced_path, f'{name_without_ext}.lab')
    if not os.path.exists(annotation):
        with open(annotation, 'a'):
            print(f'Created: \'{annotation}\'')
print('Creating missing lab files done.')


Run the following code block to see if all segments are annotated and all annotations are valid. If there are failed checks, please fix them and run again.

A summary of your phoneme coverage will be generated. If some phonemes have extremely few occurrences (for example, fewer than 20), it is highly recommended to add more recordings to cover these phonemes.


In [None]:
import matplotlib.pyplot as plt

# Load dictionary
dict_path = '../dictionaries/opencpop-strict.txt'
with open(dict_path, 'r', encoding='utf8') as f:
    rules = [ln.strip().split('\t') for ln in f.readlines()]
dictionary = {}
phoneme_set = set()
for r in rules:
    phonemes = r[1].split()
    dictionary[r[0]] = phonemes
    phoneme_set.update(phonemes)

# Run checks
check_failed = False
covered = set()
phoneme_map = {}
for ph in sorted(phoneme_set):
    phoneme_map[ph] = 0

segment_pairs = []

for file in sliced_filelist:
    filename = os.path.basename(file)
    name_without_ext = filename.rsplit('.', maxsplit=1)[0]
    annotation = os.path.join(sliced_path, f'{name_without_ext}.lab')
    if not os.path.exists(annotation):
        print(f'No annotation found for \'{filename}\'!')
        check_failed = True
        continue  # Nothing to read for this segment; avoid FileNotFoundError below
    with open(annotation, 'r', encoding='utf8') as f:
        syllables = f.read().strip().split()
    if not syllables:
        print(f'Annotation file \'{annotation}\' is empty!')
        check_failed = True
    else:
        oov = []
        for s in syllables:
            if s not in dictionary:
                oov.append(s)
            else:
                for ph in dictionary[s]:
                    phoneme_map[ph] += 1
                covered.update(dictionary[s])
        if oov:
            print(f'Syllable(s) {oov} not allowed in annotation file \'{annotation}\'')
            check_failed = True

# Phoneme coverage
uncovered = phoneme_set - covered
if uncovered:
    print('The following phonemes are not covered!')
    print(sorted(uncovered))
    print('Please add more recordings to cover these phonemes.')
    check_failed = True

if not check_failed:
    print('Congratulations! All annotations are well prepared.')
    print('Here is a summary of your phoneme coverage.')

fig = plt.figure(figsize=(int(len(phoneme_set) * 0.8), 10))
x = list(phoneme_map.keys())
values = list(phoneme_map.values())
plt.bar(x=x, height=values)
plt.tick_params(labelsize=15)
plt.xlim(-1, len(phoneme_set))
for a, b in zip(x, values):
    plt.text(a, b, b, ha='center', va='bottom', fontsize=15)
plt.grid()
plt.title('Phoneme Distribution Summary', fontsize=30)
plt.xlabel('Phoneme', fontsize=20)
plt.ylabel('Number of occurrences', fontsize=20)

phoneme_summary = os.path.join(sliced_path, 'phoneme_distribution.jpg')
plt.savefig(fname=phoneme_summary,
            bbox_inches='tight',
            pad_inches=0.25)
plt.show()
print(f'Summary saved to \'{phoneme_summary}\'.')


### 3.2 Forced alignment

Given the transcriptions of each segment, we are able to align the phoneme sequence to its corresponding audio, thus obtaining position and duration information of each phoneme.

We use [Montreal Forced Aligner](https://github.com/MontrealCorpusTools/Montreal-Forced-Aligner) to do forced phoneme alignment.
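
Conceptually, an alignment is a list of labeled time intervals, and phoneme durations follow by subtraction. The sketch below uses made-up interval values and a hypothetical `durations_from_intervals` helper. It does not read MFA output and only illustrates what position and duration information means here.

```python
def durations_from_intervals(intervals):
    """Turn (phoneme, start, end) tuples in seconds into parallel lists."""
    names, durations = [], []
    for phoneme, start, end in intervals:
        if end < start:
            raise ValueError(f'Interval for {phoneme!r} ends before it starts')
        names.append(phoneme)
        durations.append(round(end - start, 6))
    return names, durations


# Made-up alignment for illustration
aligned = [('n', 0.00, 0.08), ('i', 0.08, 0.35), ('h', 0.35, 0.42), ('ao', 0.42, 0.90)]
names, durations = durations_from_intervals(aligned)
print(' '.join(names))
print(' '.join(str(d) for d in durations))
```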

Run the following code block to download and unzip the pretrained MFA acoustic model.


In [None]:
import requests
import zipfile

mfa_dirname = 'assets/mfa-opencpop-strict'
mfa_zip = f'{mfa_dirname}.zip'
mfa_uri = 'https://diffsinger-1307911855.cos.ap-beijing.myqcloud.com/mfa/mfa-opencpop-strict.zip'
if not os.path.exists(mfa_dirname):
    # Download
    print('Model not found, downloading...')
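    response = requests.get(mfa_uri)
    response.raise_for_status()  # Stop here instead of saving an error page as the zip file
    with open(mfa_zip, 'wb') as f:
        f.write(response.content)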
    # Unzip
    print('Unzipping...')
    with zipfile.ZipFile(mfa_zip, 'r') as zf:
        zf.extractall(path='assets/')
    # Clean
    print('Cleaning...')
    os.remove(mfa_zip)
    print('Done.')
else:
    print('Model already exists. Please move on.')


To run MFA alignment, please first run the following code block to resample all recordings to 16 kHz.

The resampled recordings will be saved to `./segments/`, and the annotation files will be copied there as well.


In [None]:
segments_dir = 'segments'

if os.path.exists(segments_dir):
    shutil.rmtree(segments_dir)
os.makedirs(segments_dir)
for file in sliced_filelist:
    samplerate = 16000
    y, _ = librosa.load(file, sr=samplerate, mono=True)
    filename = os.path.basename(file)
    soundfile.write(os.path.join(segments_dir, filename), y, samplerate, subtype='PCM_16')
    name_without_ext = filename.rsplit('.', maxsplit=1)[0]
    annotation = os.path.join(sliced_path, f'{name_without_ext}.lab')
    shutil.copy(annotation, segments_dir)
print('Resampling and copying done.')


Now run the following code block to run forced alignment.

The results will be saved to `./textgrids`.


In [None]:
textgrids_dir = 'textgrids'
!mfa align $segments_dir $dict_path $mfa_dirname $textgrids_dir --beam 100 --clean --overwrite
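

After alignment finishes, it is worth confirming that every segment received a result. The following is a minimal sketch, assuming that MFA wrote one `.TextGrid` file per WAV file directly into the output folder under the same base name. `missing_textgrids` is a hypothetical helper, not part of the pipeline.

```python
import os


def missing_textgrids(segments_dir: str, textgrids_dir: str):
    """List WAV base names in `segments_dir` with no matching .TextGrid file."""
    missing = []
    for name in sorted(os.listdir(segments_dir)):
        stem, ext = os.path.splitext(name)
        if ext.lower() != '.wav':
            continue  # Skip .lab files and anything else that is not audio
        if not os.path.isfile(os.path.join(textgrids_dir, f'{stem}.TextGrid')):
            missing.append(stem)
    return missing
```

Under that assumption, `missing_textgrids('segments', 'textgrids')` should return an empty list when every segment was aligned. Any names it returns are segments to inspect by hand.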
