In [None]:
pip install numpy==1.23.4

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!pip install pyworld

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyworld
  Using cached pyworld-0.3.3.tar.gz (218 kB)
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: pyworld
  Building wheel for pyworld (pyproject.toml) ... [?25l[?25hdone
  Created wheel for pyworld: filename=pyworld-0.3.3-cp310-cp310-linux_x86_64.whl size=887714 sha256=49e40377dd8a82606d56e8b756175b85abcdaa69c4dd32fccd5cd50164bc82d2
  Stored in directory: /root/.cache/pip/wheels/70/50/a9/36b47c7f055bbee666a2b5718aaf85bce2152ef90f9bd10697
Successfully built pyworld
Installing collected packages: pyworld
Successfully installed pyworld-0.3.3


In [None]:
import os
import random
from urllib.request import urlretrieve
import zipfile
import argparse
import shlex, subprocess
import zipfile


def unzip(zip_filepath, dest_dir='./data'):
    """Extract every member of a zip archive into ``dest_dir``.

    Args:
        zip_filepath: Path of the ``.zip`` archive to read.
        dest_dir: Directory that receives the extracted members.
    """
    archive = zipfile.ZipFile(zip_filepath)
    try:
        archive.extractall(dest_dir)
    finally:
        # Always release the file handle, even if extraction fails.
        archive.close()
    print("Extraction complete!")

def download_vcc2016():
    """Download and extract the VCC2016 training and evaluation archives.

    The two zip files are fetched with ``wget`` (which must be on PATH) into
    the current working directory and then extracted with ``unzip``.

    Only archives that are not already present are downloaded. When both are
    already on disk the function prints a message and returns without
    extracting anything.

    Raises:
        subprocess.CalledProcessError: if ``wget`` exits with a non-zero status.
    """
    datalink = "https://datashare.is.ed.ac.uk/bitstream/handle/10283/2211"
    data_files = ['vcc2016_training.zip', 'evaluation_all.zip']

    missing = [name for name in data_files if not os.path.exists(name)]
    if not missing:
        print("File already exists!")
        return

    print('Start download dataset...')

    for name in missing:
        try:
            # Argument list (no shell) and check=True so a failed download is
            # reported here instead of surfacing later as a bad zip file.
            subprocess.run(['wget', f'{datalink}/{name}'], check=True)
        except subprocess.CalledProcessError:
            # Drop a partial file so the next run retries this archive.
            if os.path.exists(name):
                os.remove(name)
            raise

    for name in data_files:
        unzip(name)

    print('Finish download dataset...')

def create_dirs(trainset: str='./data/speakers', testset: str='./data/speakers_test'):
    """Make sure the train and test directories exist.

    Args:
        trainset: Directory for the training speakers.
        testset: Directory for the test speakers.

    A message is printed for each directory that has to be created;
    directories that already exist are left untouched.
    """
    for label, path in (('train', trainset), ('test', testset)):
        if os.path.exists(path):
            continue
        print(f'create {label} set dir {path}')
        os.makedirs(path, exist_ok=True)

if __name__ == '__main__':
    # Entry point of the download cell: create the train/test directories and
    # fetch the requested dataset.

    parser = argparse.ArgumentParser(description = 'Download  voice conversion datasets.')

    datasets_default = 'vcc2016'
    train_dir = './data/speakers'
    test_dir = './data/speakers_test'
    parser.add_argument('--datasets', type = str, help = 'Datasets available: vcc2016', default = datasets_default)

    parser.add_argument('--train_dir', type = str, help = 'trainset directory', default = train_dir)
    parser.add_argument('--test_dir', type = str, help = 'testset directory', default = test_dir)

    # An empty argument list is parsed on purpose: inside a notebook sys.argv
    # belongs to the kernel, so every option falls back to its default.
    argv = parser.parse_args([])

    datasets = argv.datasets
    # Use the parsed values so the --train_dir/--test_dir options take effect.
    create_dirs(argv.train_dir, argv.test_dir)

    # Accept any capitalisation of the dataset name.
    if datasets.lower() == 'vcc2016':
        download_vcc2016()
    else:
        print('Dataset not available.')

   

File already exists!


In [None]:
import numpy as np
import pyworld as pw
import os,shutil
import glob
import librosa

class Singleton(type):
    """Metaclass that gives each class using it a single shared instance.

    The first call to the class builds the instance; every later call returns
    that same object and ignores the arguments it is given.
    """

    def __init__(cls, *args, **kwargs):
        # Per-class slot for the cached instance.
        cls.__instance = None
        super().__init__(*args, **kwargs)

    def __call__(cls, *args, **kwargs):
        if cls.__instance is None:
            cls.__instance = super().__call__(*args, **kwargs)
        return cls.__instance

class CommonInfo(metaclass=Singleton):
    """Shared information about the dataset directory.

    Because the class uses the ``Singleton`` metaclass, only the first
    construction takes effect: later calls return the same object and their
    ``datadir`` argument is ignored.

    Args:
        datadir: Directory whose immediate sub-folders are named after speakers.
    """
    def __init__(self, datadir: str):
        super(CommonInfo, self).__init__()
        self.datadir = datadir

    @property
    def speakers(self):
        '''Return the sorted names of the speaker folders under ``datadir``.

        eg. ['SF1', 'SF2', 'TM1', 'TM2']

        The directory is re-scanned on every access. Only sub-directories are
        reported, so stray files next to the speaker folders are not mistaken
        for speakers. An empty list is returned when nothing matches.
        '''
        pattern = os.path.join(self.datadir, "*")
        # basename() instead of splitting on '/' keeps this portable and does
        # not fail for paths that contain no separator.
        return sorted(
            os.path.basename(path)
            for path in glob.glob(pattern)
            if os.path.isdir(path)
        )

# Speaker names used as a module-level global by the classes and cells below
# (Normalizer, GenerateStatistics, AudioDataset, ...). It is computed once,
# when this cell runs, from the sub-folders of 'data/speakers'; if that
# directory is missing or empty the list is empty.
speakers = CommonInfo('data/speakers').speakers


class Normalizer(object):
    '''Per-speaker normalisation based on saved statistics files.

    On construction the statistics of every name in the module-level
    ``speakers`` list are loaded from ``*.npz`` files in ``statfolderpath``.
    Each file is expected to provide the keys ``coded_sps_mean``,
    ``coded_sps_std``, ``log_f0s_mean`` and ``log_f0s_std``.

    Args:
        statfolderpath: Folder that contains one statistics file per speaker.

    Raises:
        Exception: if no statistics file can be found for some speaker.
    '''
    def __init__(self, statfolderpath: str='./etc'):

        self.folderpath = statfolderpath

        # speaker name -> {statistic name: ndarray}
        self.norm_dict = self.normalizer_dict()

    def _coded_sp_stats(self, speakername):
        '''Return (mean, std) of a speaker as column vectors.'''
        stats = self.norm_dict[speakername]
        mean = np.reshape(stats['coded_sps_mean'], [-1, 1])
        std = np.reshape(stats['coded_sps_std'], [-1, 1])
        return mean, std

    def forward_process(self, x, speakername):
        '''Normalise ``x``: ``(x - mean) / std`` with the speaker's statistics.

        The statistics are reshaped to column vectors, so they broadcast along
        the second axis of ``x``.
        '''
        mean, std = self._coded_sp_stats(speakername)
        return (x - mean) / std

    def backward_process(self, x, speakername):
        '''Undo ``forward_process``: ``x * std + mean``.'''
        mean, std = self._coded_sp_stats(speakername)
        return x * std + mean

    def normalizer_dict(self):
        '''Load and return the normalisation parameters of all speakers.

        A statistics file belongs to a speaker when the speaker name occurs
        in the file path; the first match is used.
        '''
        # The folder listing does not depend on the speaker: glob once.
        stat_files = glob.glob(os.path.join(self.folderpath, '*.npz'))

        d = {}
        for one_speaker in speakers:
            matches = [fn for fn in stat_files if one_speaker in fn]
            if not matches:
                raise Exception('====no match files!====')
            # Copy the arrays out and close the archive: an open NpzFile keeps
            # a file handle and re-reads from disk on every key lookup.
            with np.load(matches[0]) as stats:
                d[one_speaker] = {key: stats[key] for key in stats.files}

        return d

    def pitch_conversion(self, f0, source_speaker, target_speaker):
        '''Logarithm Gaussian normalization for pitch conversion.

        ``log(f0)`` is shifted and scaled from the source speaker's log-f0
        mean/std to the target speaker's, then mapped back with ``exp``.
        ``np.ma.log`` is used, so entries whose logarithm is undefined
        (e.g. zeros) come back masked.
        '''
        mean_log_src = self.norm_dict[source_speaker]['log_f0s_mean']
        std_log_src = self.norm_dict[source_speaker]['log_f0s_std']

        mean_log_target = self.norm_dict[target_speaker]['log_f0s_mean']
        std_log_target = self.norm_dict[target_speaker]['log_f0s_std']

        f0_converted = np.exp((np.ma.log(f0) - mean_log_src) / std_log_src * std_log_target + mean_log_target)

        return f0_converted
    
class GenerateStatistics(object):
    '''Compute per-speaker statistics from the processed ``.npz`` files.

    Args:
        folder: Directory with the processed files. Files are attributed to a
            speaker by the part of the file name before the first ``_``, the
            same rule ``normalize_dataset`` uses.

    Attributes are documented inline in ``__init__``.
    '''
    def __init__(self, folder: str ='./data/processed'):
        self.folder = folder
        # speaker name -> list of that speaker's .npz file names
        self.include_dict_npz = {s: [] for s in speakers}

        # List the folder once. With no speakers it is not touched at all.
        file_names = os.listdir(folder) if speakers else []
        for one_file in file_names:
            if not one_file.endswith('.npz'):
                continue
            # Exact match on the speaker prefix, so 'SF1' does not also
            # collect the files of a speaker called 'SF10'.
            speaker = one_file.split('_', maxsplit=1)[0]
            if speaker in self.include_dict_npz:
                self.include_dict_npz[speaker].append(one_file)

    @staticmethod
    def coded_sp_statistics(coded_sps):
        '''Return per-row mean and std over all given (D, T) matrices.

        The matrices are concatenated along axis 1 before the statistics are
        taken, so both results have shape (D,).
        '''
        coded_sps_concatenated = np.concatenate(coded_sps, axis = 1)
        coded_sps_mean = np.mean(coded_sps_concatenated, axis = 1, keepdims = False)
        coded_sps_std = np.std(coded_sps_concatenated, axis = 1, keepdims = False)
        return coded_sps_mean, coded_sps_std

    @staticmethod
    def logf0_statistics(f0s):
        '''Return mean and std of log-f0 over all given f0 arrays.

        ``np.ma.log`` masks entries whose logarithm is undefined (such as
        zeros), so they do not contribute to the statistics.
        '''
        log_f0s_concatenated = np.ma.log(np.concatenate(f0s))
        log_f0s_mean = log_f0s_concatenated.mean()
        log_f0s_std = log_f0s_concatenated.std()

        return log_f0s_mean, log_f0s_std

    def generate_stats(self, statfolder: str = 'etc'):
        '''Write ``<speaker>-stats.npz`` for every speaker that has data.

        Each file stores ``log_f0s_mean``, ``log_f0s_std``, ``coded_sps_mean``
        and ``coded_sps_std``. Speakers without any ``.npz`` file are skipped.

        Args:
            statfolder: Output folder, resolved against the current working
                directory; created if missing.
        '''
        etc_path = os.path.join(os.path.realpath('.'), statfolder)
        os.makedirs(etc_path, exist_ok=True)

        for one_speaker, npz_files in self.include_dict_npz.items():
            if not npz_files:
                continue

            f0s = []
            coded_sps = []
            for one_file in npz_files:
                # Close each archive as soon as its arrays have been read.
                with np.load(os.path.join(self.folder, one_file)) as t:
                    f0s.append(np.reshape(t['f0'], [-1, 1]))
                    coded_sps.append(t['coded_sp'])

            log_f0s_mean, log_f0s_std = self.logf0_statistics(f0s)
            coded_sps_mean, coded_sps_std = self.coded_sp_statistics(coded_sps)

            print(f'log_f0s_mean:{log_f0s_mean} log_f0s_std:{log_f0s_std}')
            print(f'coded_sps_mean:{coded_sps_mean.shape}  coded_sps_std:{coded_sps_std.shape}')

            filename = os.path.join(etc_path, f'{one_speaker}-stats.npz')
            np.savez(filename,
                     log_f0s_mean=log_f0s_mean, log_f0s_std=log_f0s_std,
                     coded_sps_mean=coded_sps_mean, coded_sps_std=coded_sps_std)

            print(f'[save]: {filename}')

    def normalize_dataset(self):
        '''Normalise every ``.npy`` file under ``self.folder`` in place.

        Run once only: the files are overwritten with their normalised
        content, so a second run would normalise already-normalised data.
        The statistics are read by ``Normalizer()`` from its default folder
        ('./etc'), which matches the default of ``generate_stats``.
        '''
        norm = Normalizer()
        files = librosa.util.find_files(self.folder, ext='npy')

        for p in files:
            filename = os.path.basename(p)
            speaker = filename.split(sep='_', maxsplit=1)[0]
            mcep_normed = norm.forward_process(np.load(p), speaker)
            # np.save overwrites the existing file; deleting it first would
            # only open a window in which a failed save loses the data.
            np.save(p, mcep_normed)
            print(f'[normalize]:{p}')

if __name__ == "__main__":
    # No script action for this cell; the definitions above are used by later cells.
    pass

In [None]:
import librosa
import numpy as np
import os
import pyworld
import pyworld as pw
import glob
#from utility import *
import argparse
from datetime import datetime

# Constants shared by the preprocessing, data-loading and model cells.
FEATURE_DIM = 36  # `dim` passed to pyworld.code_spectral_envelope (rows of each coded_sp matrix)
SAMPLE_RATE = 16000  # sampling rate audio is loaded at
FRAMES = 512  # number of frames in each saved training segment
FFTSIZE = 1024  # fft_size passed to pyworld.cheaptrick / pyworld.d4c
SPEAKERS_NUM = len(speakers)  # needs `speakers` from the earlier utility cell
CHUNK_SIZE = 1 # concatenate CHUNK_SIZE audio clips together before feature extraction
EPSILON = 1e-10  # not referenced in the visible cells -- presumably a numerical-stability term; verify
MODEL_NAME = 'starganvc_model'  # not referenced in the visible cells

def load_wavs(dataset: str, sr):
    '''Load, trim and pre-emphasise every audio file of every speaker.

    ``dataset`` is expected to contain one sub-directory per speaker. Every
    regular file inside a speaker directory is loaded with ``librosa.load``
    (mono, float64, resampled to ``sr``), trimmed with
    ``librosa.effects.trim(top_db=15)`` and passed through the first-order
    filter ``y[n] - 0.97 * y[n-1]``.

    Speakers and files are visited in sorted order so repeated runs produce
    the same ordering. Clips that are empty after trimming are skipped.

    Args:
        dataset: Root directory with one folder per speaker.
        sr: Target sampling rate handed to ``librosa.load``.

    Returns:
        dict: ``{speaker: {file stem: 1-D float64 waveform}}``.
    '''
    # speaker name -> sorted list of file paths, e.g. {TM1: [xx, xx, ...]}
    data = {}
    with os.scandir(dataset) as it:
        for entry in sorted(it, key=lambda e: e.name):
            if not entry.is_dir():
                continue
            with os.scandir(entry.path) as it_f:
                data[entry.name] = sorted(f.path for f in it_f if f.is_file())
    print(f'loaded keys: {data.keys()}')

    resdict = {}

    cnt = 0
    for key, value in data.items():
        resdict[key] = {}

        for one_file in value:
            # File name without directory and final extension, e.g. 100061.
            # splitext keeps dotted stems apart ('a.1.wav' vs 'a.2.wav').
            newkey = os.path.splitext(os.path.basename(one_file))[0]
            wav, _ = librosa.load(one_file, sr=sr, mono=True, dtype=np.float64)
            y, _ = librosa.effects.trim(wav, top_db=15)
            if y.size == 0:
                # Nothing left after trimming; y[0] below would fail.
                print(f'\n[skip]: {one_file} is empty after trimming')
                continue
            # Pre-emphasis: keep the first sample, then y[n] - 0.97 * y[n-1].
            wav = np.append(y[0], y[1:] - 0.97 * y[:-1])

            resdict[key][newkey] = wav
            print('.', end='')
            cnt += 1

    print(f'\nTotal {cnt} aduio files!')
    return resdict

def chunks(iterable, size):
    """Yield consecutive slices of ``iterable`` holding at most ``size`` items.

    ``iterable`` must support ``len()`` and slicing; the last slice may be
    shorter than ``size``.
    """
    total = len(iterable)
    yield from (iterable[start:start + size] for start in range(0, total, size))

def wav_to_mcep_file(dataset: str, sr=SAMPLE_RATE, processed_filepath: str = './data/processed'):
    '''Convert the wavs of every speaker to feature files on disk.

    WARNING: ``processed_filepath`` is deleted and recreated first, so any
    previous content of that directory is lost.

    For each speaker the clips are grouped ``CHUNK_SIZE`` at a time and
    concatenated. Each group produces:
      * ``<speaker>_<index>.npz`` with ``f0`` and ``coded_sp``;
      * ``<speaker>_<index>_<start>.npy`` for every full, non-overlapping
        window of ``FRAMES`` columns of ``coded_sp``. A trailing remainder
        shorter than ``FRAMES`` is not saved.

    Args:
        dataset: Root directory with one folder of audio files per speaker.
        sr: Sampling rate used for loading and feature extraction.
        processed_filepath: Output directory.
    '''
    # Start from an empty output directory; it may not exist on a first run.
    if os.path.isdir(processed_filepath):
        shutil.rmtree(processed_filepath)
    os.makedirs(processed_filepath, exist_ok=True)

    allwavs_cnt = len(glob.glob(f'{dataset}/*/*.wav'))
    print(f'Total {allwavs_cnt} audio files!')

    d = load_wavs(dataset, sr)
    for one_speaker, wavs_by_name in d.items():
        values_of_one_speaker = list(wavs_by_name.values())

        for index, one_chunk in enumerate(chunks(values_of_one_speaker, CHUNK_SIZE)):
            # Join the clips of this chunk into one waveform.
            wav_concated = np.concatenate(one_chunk)

            f0, ap, sp, coded_sp = cal_mcep(wav_concated, sr=sr, dim=FEATURE_DIM)
            newname = f'{one_speaker}_{index}'
            file_path_z = os.path.join(processed_filepath, newname)
            np.savez(file_path_z, f0=f0, coded_sp=coded_sp)
            print(f'[save]: {file_path_z}')

            # Split the features into fixed-width segments; the range bound
            # guarantees every slice has exactly FRAMES columns.
            for start_idx in range(0, coded_sp.shape[1] - FRAMES + 1, FRAMES):
                one_audio_seg = coded_sp[:, start_idx:start_idx + FRAMES]
                temp_name = f'{newname}_{start_idx}'
                filePath = os.path.join(processed_filepath, temp_name)

                np.save(filePath, one_audio_seg)
                print(f'[save]: {filePath}.npy')
            
        

def world_features(wav, sr, fft_size, dim):
    """Run the pyworld analysis chain on one waveform.

    Args:
        wav: Waveform handed to pyworld.
        sr: Sampling rate of ``wav``.
        fft_size: FFT size for ``cheaptrick`` and ``d4c``.
        dim: Dimension for ``code_spectral_envelope``.

    Returns:
        tuple: ``(f0, timeaxis, sp, ap, coded_sp)`` where ``f0``/``timeaxis``
        come from ``harvest``, ``sp`` from ``cheaptrick``, ``ap`` from ``d4c``
        and ``coded_sp`` from ``code_spectral_envelope(sp, sr, dim)``.
    """
    f0, timeaxis = pyworld.harvest(wav, sr)
    # cheaptrick and d4c take the same leading arguments.
    analysis_args = (wav, f0, timeaxis, sr)
    sp = pyworld.cheaptrick(*analysis_args, fft_size=fft_size)
    ap = pyworld.d4c(*analysis_args, fft_size=fft_size)
    coded_sp = pyworld.code_spectral_envelope(sp, sr, dim)
    return f0, timeaxis, sp, ap, coded_sp

def cal_mcep(wav, sr=SAMPLE_RATE, dim=FEATURE_DIM, fft_size=FFTSIZE):
    '''Extract WORLD features from a waveform.

    Thin wrapper around ``world_features`` that drops the time axis and
    transposes the coded spectral envelope to ``dim x n``.

    Returns:
        tuple: ``(f0, ap, sp, coded_sp)``.
    '''
    f0, _timeaxis, sp, ap, coded = world_features(wav, sr, fft_size, dim)
    return f0, ap, sp, coded.T


if __name__ == "__main__":
    # Preprocessing entry point: extract features for every speaker, compute
    # the per-speaker statistics and normalise the saved segments.
    start = datetime.now()
    parser = argparse.ArgumentParser(description = 'Convert the wav waveform to mel-cepstral coefficients(MCCs)\
    and calculate the speech statistical characteristics')
    
    input_dir = './data/speakers'
    output_dir = './data/processed'
   
    parser.add_argument('--input_dir', type = str, help = 'the direcotry contains data need to be processed', default = input_dir)
    parser.add_argument('--output_dir', type = str, help = 'the directory stores the processed data', default = output_dir)
    
    # An empty argument string is parsed, so both options always take their defaults.
    argv = parser.parse_args("")
    input_dir = argv.input_dir
    output_dir = argv.output_dir

    os.makedirs(output_dir, exist_ok=True)
    
    # NOTE: wav_to_mcep_file removes and recreates output_dir before writing.
    wav_to_mcep_file(input_dir, SAMPLE_RATE,  processed_filepath=output_dir)

    # input_dir is the training set: compute and save the speech statistics of
    # each speaker, then normalise the saved .npy segments in place.
    generator = GenerateStatistics(output_dir)
    generator.generate_stats()
    generator.normalize_dataset()
    end = datetime.now()
    print(f"[Runing Time]: {end-start}")

Total 0 audio files!
loaded keys: dict_keys([])

Total 0 aduio files!
[Runing Time]: 0:00:00.121582


In [None]:
import glob
import os

import librosa
import numpy as np
import torch
from sklearn.preprocessing import LabelBinarizer
from torch.utils.data.dataloader import DataLoader
from torch.utils.data.dataset import Dataset

#from preprocess import (FEATURE_DIM, FFTSIZE, FRAMES, SAMPLE_RATE,
#                        world_features)
#from utility import Normalizer, speakers
import random

class AudioDataset(Dataset):
    """Dataset over the ``.npy`` feature segments found under ``datadir``.

    The speaker of a segment is the part of its file name before the first
    ``_``. Labels are encoded with a ``LabelBinarizer`` fitted on the
    module-level ``speakers`` list.
    """
    def __init__(self, datadir:str):
        super().__init__()
        self.datadir = datadir
        self.files = librosa.util.find_files(datadir, ext='npy')
        self.encoder = LabelBinarizer().fit(speakers)

    def __len__(self):
        """Number of segment files."""
        return len(self.files)

    def speaker_encoder(self):
        """Return the fitted label encoder."""
        return self.encoder

    def __getitem__(self, idx):
        """Return ``(features, speaker_index, label)`` for segment ``idx``.

        ``features`` is the loaded array as a float tensor with a new leading
        axis, ``speaker_index`` is the position of the speaker in ``speakers``
        (long tensor) and ``label`` is the encoder output as a float tensor.
        """
        path = self.files[idx]
        speaker, _, _ = os.path.basename(path).partition('_')
        encoded = self.encoder.transform([speaker])[0]
        features = torch.FloatTensor(np.load(path)).unsqueeze(0)
        speaker_index = torch.tensor(speakers.index(speaker), dtype=torch.long)
        return features, speaker_index, torch.FloatTensor(encoded)

def data_loader(datadir: str, batch_size=4, shuffle=True, mode='train', num_workers=2):
    '''Build a ``DataLoader`` over the ``.npy`` segments in ``datadir``.

    Args:
        datadir: Directory handed to ``AudioDataset``.
        batch_size: Samples per batch.
        shuffle: Whether the loader shuffles the samples.
        mode: Accepted but not used by this function.
        num_workers: Worker processes used by the loader.
    '''
    return DataLoader(
        AudioDataset(datadir),
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
    )



class TestSet(object):
    """Feature extraction for the wav files of one test speaker.

    Args:
        datadir: Directory that contains one folder of wav files per speaker.
    """
    def __init__(self, datadir:str):
        super().__init__()
        self.datadir = datadir
        self.norm = Normalizer()

    def choose(self):
        '''Pick a random speaker from the module-level ``speakers`` list.'''
        return random.choice(speakers)

    def test_data(self, src_speaker=None):
        '''Extract normalised features for every wav of one speaker.

        Args:
            src_speaker: Speaker to use; a random one is chosen when this is
                empty or ``None``.

        Returns:
            tuple: ``(features, speaker)`` where ``features`` maps each wav
            file name to a dict with the keys ``coded_sp_norm``, ``f0`` and
            ``ap``.
        '''
        r_s = src_speaker or self.choose()
        speaker_dir = os.path.join(self.datadir, r_s)

        res = {}
        for wav_path in librosa.util.find_files(speaker_dir, ext='wav'):
            # NOTE(review): unlike load_wavs, no silence trimming or
            # pre-emphasis is applied here -- confirm this is intended.
            wav, _ = librosa.load(wav_path, sr=SAMPLE_RATE, dtype=np.float64)
            f0, timeaxis, sp, ap, coded_sp = world_features(wav, SAMPLE_RATE, FFTSIZE, FEATURE_DIM)
            coded_sp_norm = self.norm.forward_process(coded_sp.T, r_s)

            res[os.path.basename(wav_path)] = {
                'coded_sp_norm': np.asarray(coded_sp_norm),
                'f0': np.asarray(f0),
                'ap': np.asarray(ap),
            }
        return res, r_s

if __name__=='__main__':
    # Smoke test: build the dataset over the processed segments and print
    # one sample.
    ad = AudioDataset('./data/processed')
    print(len(ad))

    if len(ad) > 0:
        # 367 is the sample originally inspected here; clamp the index so
        # smaller datasets do not raise IndexError.
        data, s, label = ad[min(367, len(ad) - 1)]
        print(data, label)
    else:
        print('No processed .npy segments found; run the preprocessing cell first.')
    # loader = data_loader('./data/processed', batch_size=4)   
    
    # for i_batch, batch_data in enumerate(loader):
    #     # print(batch_data)
    #     # print(batch_data[0])
    #     print(batch_data[1])
    #     print(batch_data[2])
    #     if i_batch == 2:
    #         break

ValueError: ignored

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
#from data_loader import data_loader
#from utility import speakers


class Down2d(nn.Module):
    """Gated 2-D convolution block.

    Two parallel ``Conv2d`` + ``InstanceNorm2d`` branches with identical
    hyper-parameters; the output is ``branch1 * sigmoid(branch2)``.

    Args:
        in_channel: Input channels.
        out_channel: Output channels of both branches.
        kernel, stride, padding: Forwarded to both convolutions.
    """
    def __init__(self, in_channel ,out_channel, kernel, stride, padding):
        super().__init__()
        conv_kwargs = dict(kernel_size=kernel, stride=stride, padding=padding)

        # Value branch.
        self.c1 = nn.Conv2d(in_channel, out_channel, **conv_kwargs)
        self.n1 = nn.InstanceNorm2d(out_channel)
        # Gate branch.
        self.c2 = nn.Conv2d(in_channel, out_channel, **conv_kwargs)
        self.n2 = nn.InstanceNorm2d(out_channel)

    def forward(self, x):
        value = self.n1(self.c1(x))
        gate = torch.sigmoid(self.n2(self.c2(x)))
        return value * gate
            
        
class Up2d(nn.Module):
    """Gated 2-D transposed-convolution block.

    Two parallel ``ConvTranspose2d`` + ``InstanceNorm2d`` branches with
    identical hyper-parameters; the output is ``branch1 * sigmoid(branch2)``.

    Args:
        in_channel: Input channels.
        out_channel: Output channels of both branches.
        kernel, stride, padding: Forwarded to both transposed convolutions.
    """
    def __init__(self, in_channel ,out_channel, kernel, stride, padding):
        super().__init__()
        deconv_kwargs = dict(kernel_size=kernel, stride=stride, padding=padding)

        # Value branch.
        self.c1 = nn.ConvTranspose2d(in_channel, out_channel, **deconv_kwargs)
        self.n1 = nn.InstanceNorm2d(out_channel)
        # Gate branch.
        self.c2 = nn.ConvTranspose2d(in_channel, out_channel, **deconv_kwargs)
        self.n2 = nn.InstanceNorm2d(out_channel)

    def forward(self, x):
        value = self.n1(self.c1(x))
        gate = torch.sigmoid(self.n2(self.c2(x)))
        return value * gate
        

class Generator(nn.Module):
    """Encoder/decoder generator conditioned on a speaker code.

    The input is encoded by five ``Down2d`` blocks down to 5 channels. Before
    every decoder layer the speaker code is tiled over the spatial axes and
    concatenated to the feature map as extra channels.

    Args:
        num_speakers: Length of the speaker code ``c``. The default of 4
            reproduces the original fixed channel sizes (9, 68, 132, 68, 36).
    """
    def __init__(self, num_speakers: int = 4):
        super(Generator, self).__init__()
        self.num_speakers = num_speakers

        self.downsample = nn.Sequential(
            Down2d(1, 32, (3,9), (1,1), (1,4)),
            Down2d(32, 64, (4,8), (2,2), (1,3)),
            Down2d(64, 128, (4,8), (2,2), (1,3)),
            Down2d(128, 64, (3,5), (1,1), (1,2)),
            Down2d(64, 5, (9,5), (9,1), (1,2))
        )

        # Every decoder layer sees its feature channels plus the speaker code.
        self.up1 = Up2d(5 + num_speakers, 64, (9,5), (9,1), (0,2))
        self.up2 = Up2d(64 + num_speakers, 128, (3,5), (1,1), (1,2))
        self.up3 = Up2d(128 + num_speakers, 64, (4,8), (2,2), (1,3))
        self.up4 = Up2d(64 + num_speakers, 32, (4,8), (2,2), (1,3))

        self.deconv = nn.ConvTranspose2d(32 + num_speakers, 1, (3,9), (1,1), (1,4))

    @staticmethod
    def _with_condition(x, c):
        """Concatenate code ``c`` (B, C, 1, 1), tiled to x's H x W, onto ``x``."""
        c_map = c.repeat(1, 1, x.size(2), x.size(3))
        return torch.cat([x, c_map], dim=1)

    def forward(self, x, c):
        """Convert features ``x`` towards the speaker described by ``c``.

        Args:
            x: 4-D input with one channel (batch, 1, height, width).
            c: Speaker code of shape (batch, num_speakers).

        Returns:
            Output of the final transposed convolution (one channel).
        """
        x = self.downsample(x)
        c = c.view(c.size(0), c.size(1), 1, 1)

        x = self.up1(self._with_condition(x, c))
        x = self.up2(self._with_condition(x, c))
        x = self.up3(self._with_condition(x, c))
        x = self.up4(self._with_condition(x, c))
        return self.deconv(self._with_condition(x, c))


class Discriminator(nn.Module):
    """Discriminator conditioned on a speaker code.

    The speaker code is tiled over the spatial axes and concatenated as extra
    channels before each of the four ``Down2d`` blocks and before the final
    convolution. The final convolution uses a kernel height of 36 without
    vertical padding, followed by average pooling over windows of 64 along
    the last axis and ``tanh``.

    Args:
        num_speakers: Length of the speaker code ``c``. The default of 4
            reproduces the original fixed channel sizes (5 and 36).
    """
    def __init__(self, num_speakers: int = 4):
        super(Discriminator, self).__init__()
        self.num_speakers = num_speakers

        self.d1 = Down2d(1 + num_speakers, 32, (3,9), (1,1), (1,4))
        self.d2 = Down2d(32 + num_speakers, 32, (3,8), (1,2), (1,3))
        self.d3 = Down2d(32 + num_speakers, 32, (3,8), (1,2), (1,3))
        self.d4 = Down2d(32 + num_speakers, 32, (3,6), (1,2), (1,2))

        self.conv = nn.Conv2d(32 + num_speakers, 1, (36,5), (36,1), (0,2))
        self.pool = nn.AvgPool2d((1,64))

    @staticmethod
    def _with_condition(x, c):
        """Concatenate code ``c`` (B, C, 1, 1), tiled to x's H x W, onto ``x``."""
        c_map = c.repeat(1, 1, x.size(2), x.size(3))
        return torch.cat([x, c_map], dim=1)

    def forward(self, x, c):
        """Score ``x`` given the speaker code ``c``.

        Args:
            x: 4-D input with one channel (batch, 1, height, width).
            c: Speaker code of shape (batch, num_speakers).

        Returns:
            ``tanh`` scores with size-1 channel/height/width axes removed.
            The batch axis is always kept, so a batch of one yields shape
            ``(1,)`` rather than a 0-d tensor.
        """
        c = c.view(c.size(0), c.size(1), 1, 1)

        x = self.d1(self._with_condition(x, c))
        x = self.d2(self._with_condition(x, c))
        x = self.d3(self._with_condition(x, c))
        x = self.d4(self._with_condition(x, c))
        x = self.conv(self._with_condition(x, c))

        x = self.pool(x)
        # Squeeze only the non-batch axes; a bare squeeze() would also drop
        # the batch axis when the batch size is 1.
        x = x.squeeze(3).squeeze(2).squeeze(1)
        x = torch.tanh(x)
        return x

class DomainClassifier(nn.Module):
    """Speaker classifier that returns log-probabilities per speaker.

    Only the first 8 rows (axis 2) of the input are used. They pass through
    four ``Down2d`` blocks, a convolution with ``num_speakers`` output
    channels, average pooling and a log-softmax over the channel axis.

    Args:
        num_speakers: Number of output classes. The default of 4 reproduces
            the original fixed output size.
    """
    def __init__(self, num_speakers: int = 4):
        super(DomainClassifier, self).__init__()
        self.num_speakers = num_speakers
        self.main = nn.Sequential(
            Down2d(1, 8, (4,4), (2,2), (5,1)),
            Down2d(8, 16, (4,4), (2,2), (1,1)),
            Down2d(16, 32, (4,4), (2,2), (0,1)),
            Down2d(32, 16, (3,4), (1,2), (1,1)),
            nn.Conv2d(16, num_speakers, (1,4), (1,2), (0,1)),
            nn.AvgPool2d((1,16)),
            # Explicit channel axis: the same axis the implicit form picks
            # for 4-D input, without the deprecation warning.
            nn.LogSoftmax(dim=1)
        )

    def forward(self, x):
        """Classify ``x`` (batch, 1, height, width).

        Returns:
            Tensor of shape (batch, num_speakers). The ``view`` requires the
            network output to have spatial size 1 x 1.
        """
        x = x[:, :, 0:8, :]
        x = self.main(x)
        x = x.view(x.size(0), x.size(1))
        return x



if __name__ == '__main__':
    # Smoke test: push one random input and a one-hot speaker code through
    # the Discriminator and print the result.
    # device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    # train_loader = data_loader('data/processed', 1)
    # data_iter = iter(train_loader)


    # Random input: batch 1, 1 channel, 36 x 512.
    t = torch.rand([1,1,36,512])
    # One-hot speaker code for the second of four speakers.
    l = torch.FloatTensor([[0,1,0,0]])
    print(l.size())
    # d1 = Down2d(1, 32, 1,1,1)
    # print(d1(t).shape)

    # u1 = Up2d(1,32,1,2,1)
    # print(u1(t).shape)
    # G = Generator()
    # o1 = G(t, l)
    # print(o1.shape)
    
    D = Discriminator()
    o2 = D(t, l)
    print(o2.shape, o2)

    # C = DomainClassifier()
    # o3 = C(t)
    # print(o3.shape)
    # m = nn.Softmax()
    # input = torch.Tensor([[1,2],[5,5]])
    # output = m(input)
    # print(output)

In [None]:
pip install tensorboardX

In [None]:
import tensorflow as tf


class Logger(object):
    """Thin wrapper around a TensorFlow summary writer for TensorBoard."""

    def __init__(self, log_dir):
        """Create a summary file writer for ``log_dir``."""
        self.writer = tf.summary.create_file_writer(log_dir)

    def scalar_summary(self, tag, value, step):
        """Record scalar ``value`` under ``tag`` at ``step``."""
        default_writer = self.writer.as_default()
        with default_writer:
            tf.summary.scalar(tag, value, step=step)


In [None]:
import os
import time
from datetime import datetime, timedelta

import soundfile as sf

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torchvision.utils import save_image

#from data_loader import TestSet
#from model import Discriminator, DomainClassifier, Generator
#from utility import Normalizer, speakers
#from preprocess import FRAMES, SAMPLE_RATE, FFTSIZE
import random
from sklearn.preprocessing import LabelBinarizer
from pyworld import decode_spectral_envelope, synthesize
import librosa
import ast


class Solver(object):
    """docstring for Solver."""
    def __init__(self, data_loader, config):
        """Copy the run configuration onto the solver and build the networks.

        Args:
            data_loader: Iterable of training batches; ``train`` unpacks each
                batch as ``(x_real, speaker_idx_org, label_org)``.
            config: Namespace-like object exposing every option read below.
        """
        cfg = config
        self.config = cfg
        self.data_loader = data_loader

        # Loss weights.
        self.lambda_cycle = cfg.lambda_cycle
        self.lambda_cls = cfg.lambda_cls
        self.lambda_identity = cfg.lambda_identity

        # Data locations and optimisation settings.
        self.data_dir = cfg.data_dir
        self.test_dir = cfg.test_dir
        self.batch_size = cfg.batch_size
        self.num_iters = cfg.num_iters
        self.num_iters_decay = cfg.num_iters_decay
        self.g_lr = cfg.g_lr
        self.d_lr = cfg.d_lr
        self.c_lr = cfg.c_lr
        self.n_critic = cfg.n_critic
        self.beta1 = cfg.beta1
        self.beta2 = cfg.beta2
        self.resume_iters = cfg.resume_iters

        # Conversion (test-time) settings.
        self.test_iters = cfg.test_iters
        # trg_speaker is given as the text of a Python literal, e.g. "['SF1', 'TM1']".
        self.trg_speaker = ast.literal_eval(cfg.trg_speaker)
        self.src_speaker = cfg.src_speaker

        # Runtime helpers.
        self.use_tensorboard = cfg.use_tensorboard
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        # Encoder fitted on the module-level ``speakers`` collection; train()
        # and test() use it to turn a speaker name into a label vector.
        self.spk_enc = LabelBinarizer().fit(speakers)

        # Output directories.
        self.log_dir = cfg.log_dir
        self.sample_dir = cfg.sample_dir
        self.model_save_dir = cfg.model_save_dir
        self.result_dir = cfg.result_dir

        # How often (in iterations) to log, sample, checkpoint and decay.
        self.log_step = cfg.log_step
        self.sample_step = cfg.sample_step
        self.model_save_step = cfg.model_save_step
        self.lr_update_step = cfg.lr_update_step

        # Networks and optimisers first, then the optional TensorBoard logger.
        self.build_model()
        if self.use_tensorboard:
            self.build_tensorboard()
    
    def build_model(self):
        """Create G, D and C, one Adam optimiser for each, and move them to ``self.device``."""
        self.G = Generator()
        self.D = Discriminator()
        self.C = DomainClassifier()

        def make_adam(network, lr):
            # Each optimiser gets its own betas list, as with three separate calls.
            return torch.optim.Adam(network.parameters(), lr, [self.beta1, self.beta2])

        self.g_optimizer = make_adam(self.G, self.g_lr)
        self.d_optimizer = make_adam(self.D, self.d_lr)
        self.c_optimizer = make_adam(self.C, self.c_lr)

        networks = ((self.G, 'G'), (self.D, 'D'), (self.C, 'C'))

        # Report all three networks before any of them is moved.
        for network, tag in networks:
            self.print_network(network, tag)

        for network, _ in networks:
            network.to(self.device)
    
    def print_network(self, model, name):
        """Print out the network information."""
        num_params = 0
        for p in model.parameters():
            num_params += p.numel()
        print(model)
        print(name)
        print("The number of parameters: {}".format(num_params))

    def build_tensorboard(self):
        """Create the ``Logger`` used for scalar summaries.

        The logger is stored as ``self.logger`` and is given ``self.log_dir``.
        """
        log_dir = self.log_dir
        self.logger = Logger(log_dir)

    def update_lr(self, g_lr, d_lr, c_lr):
        """Decay learning rates of the generator and discriminator and classifier."""
        for param_group in self.g_optimizer.param_groups:
            param_group['lr'] = g_lr
        for param_group in self.d_optimizer.param_groups:
            param_group['lr'] = d_lr
        for param_group in self.c_optimizer.param_groups:
            param_group['lr'] = c_lr

    def train(self):
        """Run the adversarial training loop up to ``self.num_iters`` iterations.

        Each iteration updates the domain classifier C on real features and
        then the discriminator D; the generator G is updated on every
        ``n_critic``-th iteration. Depending on the step counters the loop
        also prints/logs the losses (``log_step``), writes converted sample
        audio to ``sample_dir`` (``sample_step``), saves checkpoints to
        ``model_save_dir`` (``model_save_step``) and lowers the learning rates
        (``lr_update_step``, only within the last ``num_iters_decay``
        iterations).

        When ``self.resume_iters`` is set, the checkpoints saved at that step
        are restored and counting starts from that step.
        """
        # Learning rate cache for decaying.
        g_lr = self.g_lr
        d_lr = self.d_lr
        c_lr = self.c_lr

        start_iters = 0
        if self.resume_iters:
            # Continue from the saved weights instead of silently starting over.
            start_iters = self.resume_iters
            self.restore_model(self.resume_iters)

        norm = Normalizer()
        data_iter = iter(self.data_loader)
        # The criterion holds no state, so it is built once for the whole run.
        CELoss = nn.CrossEntropyLoss()

        print('Start training......')
        start_time = datetime.now()

        for i in range(start_iters, self.num_iters):
            # =================================================================================== #
            #                             1. Preprocess input data                                #
            # =================================================================================== #
            # Fetch the next batch of real features and labels.
            try:
                x_real, speaker_idx_org, label_org = next(data_iter)
            except StopIteration:
                # Loader exhausted: start another pass. Any other error
                # (including KeyboardInterrupt) is allowed to propagate.
                data_iter = iter(self.data_loader)
                x_real, speaker_idx_org, label_org = next(data_iter)

            # Generate target domain labels by shuffling the batch's own labels.
            rand_idx = torch.randperm(label_org.size(0))
            label_trg = label_org[rand_idx]
            speaker_idx_trg = speaker_idx_org[rand_idx]

            x_real = x_real.to(self.device)                    # Input features.
            label_org = label_org.to(self.device)              # Original domain one-hot labels.
            label_trg = label_trg.to(self.device)              # Target domain one-hot labels.
            speaker_idx_org = speaker_idx_org.to(self.device)  # Original domain labels.
            speaker_idx_trg = speaker_idx_trg.to(self.device)  # Target domain labels.

            # =================================================================================== #
            #                     2. Train the classifier and the discriminator                   #
            # =================================================================================== #
            # Classifier step on real features.
            cls_real = self.C(x_real)
            cls_loss_real = CELoss(input=cls_real, target=speaker_idx_org)

            self.reset_grad()
            cls_loss_real.backward()
            self.c_optimizer.step()

            # Logging.
            loss = {}
            loss['C/C_loss'] = cls_loss_real.item()

            # Discriminator: real features should score 1, converted ones 0.
            out_r = self.D(x_real, label_org)
            x_fake = self.G(x_real, label_trg)
            out_f = self.D(x_fake.detach(), label_trg)
            d_loss_t = F.binary_cross_entropy_with_logits(input=out_f, target=torch.zeros_like(out_f, dtype=torch.float)) + \
                F.binary_cross_entropy_with_logits(input=out_r, target=torch.ones_like(out_r, dtype=torch.float))

            # NOTE(review): only d_optimizer steps below and it holds D's
            # parameters, so this term is reported in D/D_loss but does not
            # update C or G here.
            out_cls = self.C(x_fake)
            d_loss_cls = CELoss(input=out_cls, target=speaker_idx_trg)

            # Gradient penalty on random interpolations of real and converted features.
            alpha = torch.rand(x_real.size(0), 1, 1, 1).to(self.device)
            x_hat = (alpha * x_real.data + (1 - alpha) * x_fake.data).requires_grad_(True)
            out_src = self.D(x_hat, label_trg)
            d_loss_gp = self.gradient_penalty(out_src, x_hat)

            # The gradient penalty carries a fixed weight of 5.
            d_loss = d_loss_t + self.lambda_cls * d_loss_cls + 5*d_loss_gp

            self.reset_grad()
            d_loss.backward()
            self.d_optimizer.step()

            loss['D/D_loss'] = d_loss.item()

            # =================================================================================== #
            #                               3. Train the generator                                #
            # =================================================================================== #
            if (i+1) % self.n_critic == 0:
                # Original-to-target domain.
                x_fake = self.G(x_real, label_trg)
                g_out_src = self.D(x_fake, label_trg)
                g_loss_fake = F.binary_cross_entropy_with_logits(input=g_out_src, target=torch.ones_like(g_out_src, dtype=torch.float))

                # Classify the converted features against the target speaker.
                # Evaluating C on x_real (as before) does not depend on G, so
                # the lambda_cls term gave the generator no gradient at all.
                out_cls = self.C(x_fake)
                g_loss_cls = CELoss(input=out_cls, target=speaker_idx_trg)

                # Target-to-original domain (cycle consistency).
                x_reconst = self.G(x_fake, label_org)
                g_loss_rec = F.l1_loss(x_reconst, x_real)

                # Original-to-original domain (identity).
                x_fake_iden = self.G(x_real, label_org)
                id_loss = F.l1_loss(x_fake_iden, x_real)

                # Backward and optimize.
                g_loss = g_loss_fake + self.lambda_cycle * g_loss_rec +\
                    self.lambda_cls * g_loss_cls + self.lambda_identity * id_loss

                self.reset_grad()
                g_loss.backward()
                self.g_optimizer.step()

                # Logging.
                loss['G/loss_fake'] = g_loss_fake.item()
                loss['G/loss_rec'] = g_loss_rec.item()
                loss['G/loss_cls'] = g_loss_cls.item()
                loss['G/loss_id'] = id_loss.item()
                loss['G/g_loss'] = g_loss.item()

            # =================================================================================== #
            #                                 4. Miscellaneous                                    #
            # =================================================================================== #
            # Print out training information.
            if (i+1) % self.log_step == 0:
                et = datetime.now() - start_time
                et = str(et)[:-7]  # drop the microseconds part
                log = "Elapsed [{}], Iteration [{}/{}]".format(et, i+1, self.num_iters)
                for tag, value in loss.items():
                    log += ", {}: {:.4f}".format(tag, value)
                print(log)

                if self.use_tensorboard:
                    for tag, value in loss.items():
                        self.logger.scalar_summary(tag, value, i+1)

            # Convert the test utterances to a random other speaker for listening checks.
            if (i+1) % self.sample_step == 0:
                with torch.no_grad():
                    d, speaker = TestSet(self.test_dir).test_data()
                    target = random.choice([x for x in speakers if x != speaker])
                    label_t = self.spk_enc.transform([target])[0]
                    label_t = np.asarray([label_t])
                    # Same label for every segment, so build the tensor once.
                    label_tensor = torch.FloatTensor(label_t).to(self.device)

                    for filename, content in d.items():
                        f0 = content['f0']
                        ap = content['ap']
                        sp_norm_pad = self.pad_coded_sp(content['coded_sp_norm'])

                        # Convert the utterance one FRAMES-wide segment at a time.
                        convert_result = []
                        for start_idx in range(0, sp_norm_pad.shape[1] - FRAMES + 1, FRAMES):
                            one_seg = sp_norm_pad[:, start_idx : start_idx+FRAMES]

                            one_seg = torch.FloatTensor(one_seg).to(self.device)
                            one_seg = one_seg.view(1, 1, one_seg.size(0), one_seg.size(1))
                            one_set_return = self.G(one_seg, label_tensor).data.cpu().numpy()
                            one_set_return = np.squeeze(one_set_return)
                            one_set_return = norm.backward_process(one_set_return, target)
                            convert_result.append(one_set_return)

                        convert_con = np.concatenate(convert_result, axis=1)
                        # Drop the columns that only exist because of padding.
                        convert_con = convert_con[:, 0:content['coded_sp_norm'].shape[1]]
                        contigu = np.ascontiguousarray(convert_con.T, dtype=np.float64)
                        decoded_sp = decode_spectral_envelope(contigu, SAMPLE_RATE, fft_size=FFTSIZE)
                        f0_converted = norm.pitch_conversion(f0, speaker, target)
                        wav = synthesize(f0_converted, decoded_sp, ap, SAMPLE_RATE)

                        name = f'{speaker}-{target}_iter{i+1}_{filename}'
                        path = os.path.join(self.sample_dir, name)
                        print(f'[save]:{path}')
                        sf.write(path, wav, SAMPLE_RATE)

            # Save model checkpoints.
            if (i+1) % self.model_save_step == 0:
                G_path = os.path.join(self.model_save_dir, '{}-G.ckpt'.format(i+1))
                D_path = os.path.join(self.model_save_dir, '{}-D.ckpt'.format(i+1))
                C_path = os.path.join(self.model_save_dir, '{}-C.ckpt'.format(i+1))
                torch.save(self.G.state_dict(), G_path)
                torch.save(self.D.state_dict(), D_path)
                torch.save(self.C.state_dict(), C_path)
                print('Saved model checkpoints into {}...'.format(self.model_save_dir))

            # Decay learning rates.
            if (i+1) % self.lr_update_step == 0 and (i+1) > (self.num_iters - self.num_iters_decay):
                g_lr -= (self.g_lr / float(self.num_iters_decay))
                d_lr -= (self.d_lr / float(self.num_iters_decay))
                c_lr -= (self.c_lr / float(self.num_iters_decay))
                self.update_lr(g_lr, d_lr, c_lr)
                print ('Decayed learning rates, g_lr: {}, d_lr: {}.'.format(g_lr, d_lr))

    def gradient_penalty(self, y, x):
        """Compute gradient penalty: (L2_norm(dy/dx) - 1)**2."""
        weight = torch.ones(y.size()).to(self.device)
        dydx = torch.autograd.grad(outputs=y,
                                   inputs=x,
                                   grad_outputs=weight,
                                   retain_graph=True,
                                   create_graph=True,
                                   only_inputs=True)[0]

        dydx = dydx.view(dydx.size(0), -1)
        dydx_l2norm = torch.sqrt(torch.sum(dydx**2, dim=1))
        return torch.mean((dydx_l2norm-1)**2)

    def reset_grad(self):
        """Reset the gradient buffers."""
        self.g_optimizer.zero_grad()
        self.d_optimizer.zero_grad()
        self.c_optimizer.zero_grad()

    def restore_model(self, resume_iters):
        """Restore the trained generator and discriminator."""
        print('Loading the trained models from step {}...'.format(resume_iters))
        G_path = os.path.join(self.model_save_dir, '{}-G.ckpt'.format(resume_iters))
        D_path = os.path.join(self.model_save_dir, '{}-D.ckpt'.format(resume_iters))
        C_path = os.path.join(self.model_save_dir, '{}-C.ckpt'.format(resume_iters))
        self.G.load_state_dict(torch.load(G_path, map_location=lambda storage, loc: storage))
        self.D.load_state_dict(torch.load(D_path, map_location=lambda storage, loc: storage))
        self.C.load_state_dict(torch.load(C_path, map_location=lambda storage, loc: storage))

    @staticmethod
    def pad_coded_sp(coded_sp_norm):
        f_len = coded_sp_norm.shape[1]
        if  f_len >= FRAMES: 
            pad_length = FRAMES-(f_len - (f_len//FRAMES) * FRAMES)
        elif f_len < FRAMES:
            pad_length = FRAMES - f_len

        sp_norm_pad = np.hstack((coded_sp_norm, np.zeros((coded_sp_norm.shape[0], pad_length))))
        return sp_norm_pad 

    def test(self):
        """Convert the source speaker's test utterances to every target speaker.

        Restores the checkpoints saved at ``self.test_iters``, converts each
        utterance segment by segment with the generator, synthesises a
        waveform and writes it into ``self.result_dir``.
        """
        self.restore_model(self.test_iters)
        norm = Normalizer()

        utterances, speaker = TestSet(self.test_dir).test_data(self.src_speaker)

        for target in self.trg_speaker:
            print(target)
            assert target in speakers
            one_hot = np.asarray([self.spk_enc.transform([target])[0]])

            with torch.no_grad():
                for filename, content in utterances.items():
                    f0 = content['f0']
                    ap = content['ap']
                    padded = self.pad_coded_sp(content['coded_sp_norm'])

                    # Run the generator over consecutive FRAMES-wide segments.
                    converted = []
                    last_start = padded.shape[1] - FRAMES
                    for start in range(0, last_start + 1, FRAMES):
                        segment = torch.FloatTensor(padded[:, start:start + FRAMES]).to(self.device)
                        segment = segment.view(1, 1, segment.size(0), segment.size(1))
                        label = torch.FloatTensor(one_hot).to(self.device)

                        generated = self.G(segment, label).data.cpu().numpy()
                        generated = norm.backward_process(np.squeeze(generated), target)
                        converted.append(generated)

                    # Stitch the segments and cut away the padded columns.
                    stitched = np.concatenate(converted, axis=1)
                    stitched = stitched[:, 0:content['coded_sp_norm'].shape[1]]

                    envelope_in = np.ascontiguousarray(stitched.T, dtype=np.float64)
                    decoded_sp = decode_spectral_envelope(envelope_in, SAMPLE_RATE, fft_size=FFTSIZE)
                    f0_converted = norm.pitch_conversion(f0, speaker, target)
                    wav = synthesize(f0_converted, decoded_sp, ap, SAMPLE_RATE)

                    out_name = f'{speaker}-{target}_iter{self.test_iters}_{filename}'
                    path = os.path.join(self.result_dir, out_name)
                    print(f'[save]:{path}')
                    sf.write(path, wav, SAMPLE_RATE)


    

if __name__ == '__main__':
    # Intentionally empty: this cell only defines Solver and runs nothing itself.
    pass

In [None]:
import os
import argparse
#from solver import Solver
#from data_loader import data_loader
from torch.backends import cudnn


def str2bool(v):
    """Parse the text of a command-line flag into a bool.

    Args:
        v: Flag text. A value that already is a ``bool`` is returned unchanged.

    Returns:
        True only when ``v`` is the word ``true`` in any letter case;
        False for every other string.
    """
    if isinstance(v, bool):
        return v
    # ``('true')`` is just the string 'true', so the former ``in`` test was a
    # substring check that accepted '', 't', 'rue', ... Compare whole words.
    return v.lower() == 'true'


def main(config):
    """Prepare output directories, build the data loader and solver, then train or test.

    Args:
        config: Parsed options. Must provide ``log_dir``, ``model_save_dir``,
            ``sample_dir``, ``result_dir``, ``data_dir``, ``batch_size``,
            ``num_workers``, ``mode`` ('train' or 'test') and everything
            ``Solver`` reads from it.
    """
    # For fast training.
    cudnn.benchmark = True

    # Create the output directories. exist_ok=True replaces the separate
    # exists() check, so there is no gap between checking and creating.
    for directory in (config.log_dir, config.model_save_dir,
                      config.sample_dir, config.result_dir):
        os.makedirs(directory, exist_ok=True)

    # Data loader.
    dloader = data_loader(config.data_dir, batch_size=config.batch_size,
                          mode=config.mode, num_workers=config.num_workers)

    # Solver for training and testing StarGAN.
    solver = Solver(dloader, config)

    if config.mode == 'train':
        solver.train()
    elif config.mode == 'test':
        solver.test()
      


if __name__ == '__main__':
    # Training launcher. The parser is handed an empty argument list, so every
    # option below takes its default value.
    parser = argparse.ArgumentParser()

    # Loss weights.
    parser.add_argument('--lambda_cycle', type=float, default=3,
                        help='weight for cycle loss')
    parser.add_argument('--lambda_cls', type=float, default=2,
                        help='weight for domain classification loss')
    parser.add_argument('--lambda_identity', type=float, default=2,
                        help='weight for identity loss')

    # Optimisation.
    parser.add_argument('--batch_size', type=int, default=4,
                        help='mini-batch size')
    parser.add_argument('--num_iters', type=int, default=10,
                        help='number of total iterations for training D')
    parser.add_argument('--num_iters_decay', type=int, default=5,
                        help='number of iterations for decaying lr')
    parser.add_argument('--g_lr', type=float, default=0.0001,
                        help='learning rate for G')
    parser.add_argument('--d_lr', type=float, default=0.0001,
                        help='learning rate for D')
    parser.add_argument('--c_lr', type=float, default=0.0001,
                        help='learning rate for C')
    parser.add_argument('--n_critic', type=int, default=5,
                        help='number of D updates per each G update')
    parser.add_argument('--beta1', type=float, default=0.5,
                        help='beta1 for Adam optimizer')
    parser.add_argument('--beta2', type=float, default=0.999,
                        help='beta2 for Adam optimizer')
    parser.add_argument('--resume_iters', type=int, default=None,
                        help='resume training from this step')

    # Conversion.
    parser.add_argument('--test_iters', type=int, default=10,
                        help='test model from this step')
    parser.add_argument('--src_speaker', type=str, default=None,
                        help='test model source speaker')
    parser.add_argument('--trg_speaker', type=str, default="['SF1', 'TM1']",
                        help='string list repre of target speakers eg."[a,b]"')

    # Runtime.
    parser.add_argument('--num_workers', type=int, default=4)
    parser.add_argument('--mode', type=str, default='train',
                        choices=['train', 'test'])
    parser.add_argument('--use_tensorboard', type=str2bool, default=True)

    # Input and output directories.
    parser.add_argument('--data_dir', type=str, default='data/processed')
    parser.add_argument('--test_dir', type=str, default='data/speakers_test')
    parser.add_argument('--log_dir', type=str, default='starganvc/logs')
    parser.add_argument('--model_save_dir', type=str, default='starganvc/models')
    parser.add_argument('--sample_dir', type=str, default='starganvc/samples')
    parser.add_argument('--result_dir', type=str, default='starganvc/results')

    # How often (in iterations) to log, sample, checkpoint and decay.
    parser.add_argument('--log_step', type=int, default=10)
    parser.add_argument('--sample_step', type=int, default=2000)
    parser.add_argument('--model_save_step', type=int, default=10)
    parser.add_argument('--lr_update_step', type=int, default=5)

    config = parser.parse_args([])
    print(config)
    main(config)
    

In [None]:
import os
import argparse
#from solver import Solver
#from data_loader import data_loader
from torch.backends import cudnn


def str2bool(v):
    """Parse the text of a command-line flag into a bool.

    Args:
        v: Flag text. A value that already is a ``bool`` is returned unchanged.

    Returns:
        True only when ``v`` is the word ``true`` in any letter case;
        False for every other string.
    """
    if isinstance(v, bool):
        return v
    # ``('true')`` is just the string 'true', so the former ``in`` test was a
    # substring check that accepted '', 't', 'rue', ... Compare whole words.
    return v.lower() == 'true'


def main(config):
    """Prepare output directories, build the data loader and solver, then train or test.

    Args:
        config: Parsed options. Must provide ``log_dir``, ``model_save_dir``,
            ``sample_dir``, ``result_dir``, ``data_dir``, ``batch_size``,
            ``num_workers``, ``mode`` ('train' or 'test') and everything
            ``Solver`` reads from it.
    """
    # For fast training.
    cudnn.benchmark = True

    # Create the output directories. exist_ok=True replaces the separate
    # exists() check, so there is no gap between checking and creating.
    for directory in (config.log_dir, config.model_save_dir,
                      config.sample_dir, config.result_dir):
        os.makedirs(directory, exist_ok=True)

    # Data loader.
    dloader = data_loader(config.data_dir, batch_size=config.batch_size,
                          mode=config.mode, num_workers=config.num_workers)

    # Solver for training and testing StarGAN.
    solver = Solver(dloader, config)

    if config.mode == 'train':
        solver.train()
    elif config.mode == 'test':
        solver.test()
      


if __name__ == '__main__':
    # Conversion launcher: same options as the training cell, but the defaults
    # select mode 'test' with source speaker TM1. The parser is handed an
    # empty argument list, so every option takes its default value.
    parser = argparse.ArgumentParser()

    # Loss weights.
    parser.add_argument('--lambda_cycle', type=float, default=3,
                        help='weight for cycle loss')
    parser.add_argument('--lambda_cls', type=float, default=2,
                        help='weight for domain classification loss')
    parser.add_argument('--lambda_identity', type=float, default=2,
                        help='weight for identity loss')

    # Optimisation.
    parser.add_argument('--batch_size', type=int, default=4,
                        help='mini-batch size')
    parser.add_argument('--num_iters', type=int, default=10,
                        help='number of total iterations for training D')
    parser.add_argument('--num_iters_decay', type=int, default=5,
                        help='number of iterations for decaying lr')
    parser.add_argument('--g_lr', type=float, default=0.0001,
                        help='learning rate for G')
    parser.add_argument('--d_lr', type=float, default=0.0001,
                        help='learning rate for D')
    parser.add_argument('--c_lr', type=float, default=0.0001,
                        help='learning rate for C')
    parser.add_argument('--n_critic', type=int, default=5,
                        help='number of D updates per each G update')
    parser.add_argument('--beta1', type=float, default=0.5,
                        help='beta1 for Adam optimizer')
    parser.add_argument('--beta2', type=float, default=0.999,
                        help='beta2 for Adam optimizer')
    parser.add_argument('--resume_iters', type=int, default=None,
                        help='resume training from this step')

    # Conversion.
    parser.add_argument('--test_iters', type=int, default=10,
                        help='test model from this step')
    parser.add_argument('--src_speaker', type=str, default="TM1",
                        help='test model source speaker')
    parser.add_argument('--trg_speaker', type=str, default="['TM2', 'SF1']",
                        help='string list repre of target speakers eg."[a,b]"')

    # Runtime.
    parser.add_argument('--num_workers', type=int, default=4)
    parser.add_argument('--mode', type=str, default='test',
                        choices=['train', 'test'])
    parser.add_argument('--use_tensorboard', type=str2bool, default=True)

    # Input and output directories.
    parser.add_argument('--data_dir', type=str, default='data/processed')
    parser.add_argument('--test_dir', type=str, default='data/speakers_test')
    parser.add_argument('--log_dir', type=str, default='starganvc/logs')
    parser.add_argument('--model_save_dir', type=str, default='starganvc/models')
    parser.add_argument('--sample_dir', type=str, default='starganvc/samples')
    parser.add_argument('--result_dir', type=str, default='starganvc/results')

    # How often (in iterations) to log, sample, checkpoint and decay.
    parser.add_argument('--log_step', type=int, default=10)
    parser.add_argument('--sample_step', type=int, default=2000)
    parser.add_argument('--model_save_step', type=int, default=10)
    parser.add_argument('--lr_update_step', type=int, default=5)

    config = parser.parse_args([])
    print(config)
    main(config)
    