In [None]:
import numpy as np
import matplotlib.pyplot as plt
import librosa
import soundfile as sf
import pyrubberband as pyrb
import boto3
import os
import wave
from scipy import signal
from io import BytesIO

# Module-level Amazon Polly client, created once when this cell runs. Only the
# service name is passed to boto3 here. SingingVoiceGenerator.generate_voice_data
# uses this global for its synthesize_speech call.
polly_client = boto3.client('polly')

In [None]:
class SingingVoiceGenerator:
    """Turn a short piece of text into one "sung" note.

    Construction runs the whole pipeline eagerly:

    1. ``generate_voice_data`` - synthesize speech with Amazon Polly (via the
       module-level ``polly_client``) and wrap the raw PCM in a WAV container.
    2. ``cut_scilence``        - trim leading/trailing silence.
    3. ``pitch_shift``         - shift each of ``sep_num`` segments so that its
       dominant frequency lands on the target note.
    4. ``time_stretch``        - stretch the result to ``length_sec`` seconds.

    Parameters
    ----------
    scale : str
        Target note name; must be one of the keys in ``_scale_list``
        (otherwise a ``KeyError`` is raised).
    length_sec : float
        Desired duration of the final note in seconds.
    text : str
        Text sent to Polly.
    engine, language, output_format, text_type, voice_id
        Passed through to ``synthesize_speech`` as ``Engine``, ``LanguageCode``,
        ``OutputFormat``, ``TextType`` and ``VoiceId``. The response bytes are
        written as 16-bit mono WAV frames, so ``output_format`` is expected to
        be ``'pcm'``.
    sample_rate : int
        Sample rate requested from Polly and used for all later processing.
    sep_num : int
        Number of equally sized segments that are pitch-corrected separately.

    Attributes
    ----------
    raw_pcm_bin : io.BytesIO         WAV data as returned by ``generate_voice_data``.
    amplitude : numpy.ndarray        Silence-trimmed samples.
    shift_pcm_array : numpy.ndarray  Pitch-shifted samples.
    time_stretch_array : numpy.ndarray  Final samples, stretched to ``length_sec``.
    """

    # sf.read yields float samples (nominally within [-1, 1]); anything whose
    # magnitude is at or below this value is treated as silence.
    SILENCE_THRESHOLD = 0.002

    def __init__(
        self,
        scale='A',
        length_sec=1,
        text='あ',
        engine='standard',
        language='ja-JP',
        output_format='pcm',
        sample_rate=16000,
        text_type='text',
        voice_id='Mizuki',
        sep_num=5
    ):
        self.scale = scale
        self.length_sec = length_sec
        self.text = text
        self.engine = engine
        self.language = language
        self.output_format = output_format
        self.sample_rate = sample_rate
        self.text_type = text_type
        self.voice_id = voice_id
        self.sep_num = sep_num

        # Note names in ascending semitone order. Index 12 ('A') maps to 442 Hz,
        # index 0 ('L-A') is one octave below and index 24 ('H-A') one octave above.
        # NOTE(review): the 's' suffix presumably means "sharp" and 'H' is the
        # German name for B - confirm with the author.
        self._scale_list = ['L-A','L-As','L-H','C','Cs','D','Ds','E','F','Fs','G','Gs','A','As','H','H-C','H-Cs','H-D','H-Ds','H-E','H-F','H-Fs','H-G','H-Gs','H-A']
        # Equal-tempered frequency for each note: 442 * 2 ** (semitones_from_A / 12).
        self._scale_dict = {
            key: 442 * (2 ** ((-12 + i) / 12))
            for i, key in enumerate(self._scale_list)
        }
        self.scale_freq = self._scale_dict[self.scale]

        # Run the pipeline; each stage reads the attribute set by the previous one.
        self.raw_pcm_bin = self.generate_voice_data()
        self.amplitude = self.cut_scilence()
        self.shift_pcm_array = self.pitch_shift()
        self.time_stretch_array = self.time_stretch()

    def generate_voice_data(self):
        """Synthesize ``self.text`` with Polly and return it as an in-memory WAV.

        Returns
        -------
        io.BytesIO
            Mono, 16-bit WAV data at ``self.sample_rate``, positioned at offset 0.

        Raises
        ------
        Exception
            Any error from the Polly call (or a ``RuntimeError`` when the
            response carries no ``AudioStream``) is printed and then re-raised.
            Returning an empty buffer instead would only defer the failure to
            the subsequent ``sf.read`` with a far less helpful message.
        """
        raw_pcm_bin = BytesIO()
        # Give the buffer a file name with a .wav extension, as a real file would have.
        raw_pcm_bin.name = 'raw.wav'
        args = {
            'Engine': self.engine,
            'LanguageCode': self.language,
            'OutputFormat': self.output_format,
            'SampleRate': str(self.sample_rate),
            'Text': self.text,
            'TextType': self.text_type,
            'VoiceId': self.voice_id
        }
        try:
            response = polly_client.synthesize_speech(**args)
            if 'AudioStream' not in response:
                raise RuntimeError('response contains no AudioStream')
            stream = response['AudioStream']
            try:
                pcm_bytes = stream.read()
            finally:
                # Release the underlying HTTP connection.
                stream.close()
            with wave.open(raw_pcm_bin, 'wb') as wav_file:
                # (nchannels, sampwidth, framerate, nframes, comptype, compname)
                wav_file.setparams((1, 2, self.sample_rate, 0, 'NONE', 'NONE'))
                wav_file.writeframes(pcm_bytes)
            raw_pcm_bin.seek(0)
        except Exception as e:
            print('synthesize_speech exception: ', e)
            raise

        return raw_pcm_bin

    def cut_scilence(self):
        """Read ``self.raw_pcm_bin`` and strip leading and trailing silence.

        The returned slice starts one sample before the first sample whose
        magnitude exceeds ``SILENCE_THRESHOLD`` and ends right after the last
        such sample. If no sample exceeds the threshold, the signal is returned
        untrimmed.
        """
        amplitude, _ = sf.read(self.raw_pcm_bin)
        # Remove silent sections.
        return self._trim_silence(amplitude, self.SILENCE_THRESHOLD)

    @staticmethod
    def _trim_silence(amplitude, threshold):
        """Return ``amplitude`` cropped to the span of samples louder than ``threshold``."""
        voiced = np.flatnonzero(np.abs(amplitude) > threshold)
        if voiced.size == 0:
            # Nothing to anchor the cut on; keep the signal as it is.
            return amplitude
        cut_start_index = max(int(voiced[0]) - 1, 0)
        cut_end_index = int(voiced[-1]) + 1
        return amplitude[cut_start_index:cut_end_index]

    @staticmethod
    def _dominant_frequency(segment, sample_rate):
        """Return the strongest positive FFT frequency of ``segment`` in Hz.

        The DC bin is skipped because a 0 Hz peak cannot serve as a pitch.
        Returns 0.0 when no usable peak exists (segment too short or silent).
        """
        n_samples = segment.shape[0]
        half = n_samples // 2
        if half < 2:
            return 0.0
        # Only the first half of the spectrum (non-negative frequencies) is searched.
        magnitudes = np.abs(np.fft.fft(segment))[:half]
        freqs = np.fft.fftfreq(n_samples, d=1.0 / sample_rate)[:half]
        peak = 1 + int(np.argmax(magnitudes[1:]))
        if magnitudes[peak] <= 0:
            return 0.0
        return float(freqs[peak])

    def pitch_shift(self):
        """Shift every segment of ``self.amplitude`` onto ``self.scale_freq``.

        The signal is cut into ``self.sep_num`` segments of equal width; the
        last segment also receives the remainder, including the final sample.
        For each segment the dominant frequency is estimated and the segment is
        shifted by ``12 * log2(target / dominant)`` semitones. Segments without
        a detectable frequency are passed through with a shift of 0.

        Returns
        -------
        numpy.ndarray
            float64 array with all shifted segments joined in order.
        """
        width = self.amplitude.shape[0] // self.sep_num
        shift_amplitude_list = []
        for i in range(self.sep_num):
            start = i * width
            stop = None if i == self.sep_num - 1 else start + width
            segment = self.amplitude[start:stop]
            origin_freq = self._dominant_frequency(segment, self.sample_rate)
            if origin_freq > 0:
                n_steps = np.log2(self.scale_freq / origin_freq) * 12
            else:
                n_steps = 0.0
            shift_amplitude_list.append(
                pyrb.pitch_shift(segment, sr=self.sample_rate, n_steps=n_steps)
            )
        if not shift_amplitude_list:
            return np.zeros(0, dtype=np.float64)
        return np.concatenate(shift_amplitude_list).astype(np.float64, copy=False)

    def time_stretch(self):
        """Stretch ``self.shift_pcm_array`` so that it lasts ``self.length_sec`` seconds.

        The rate handed to ``pyrb.time_stretch`` is current duration divided by
        target duration.
        """
        origin_time = self.shift_pcm_array.shape[0] / self.sample_rate
        ratio = origin_time / self.length_sec
        return pyrb.time_stretch(self.shift_pcm_array, self.sample_rate, ratio)

    def output_wave(self, name):
        """Write ``self.time_stretch_array`` to ``name`` as 16-bit PCM.

        The file is written at ``self.sample_rate`` so that it matches the rate
        the samples were generated and processed at.
        """
        sf.write(name, self.time_stretch_array, self.sample_rate, subtype="PCM_16")

In [None]:
# Quick check: build one 2-second note on 'H-C' and save it as test.wav.
# Alternative voice/language settings to try: 'voice_id':'Vicki','language':'de-DE'
arg = dict(text='はぁつ', length_sec=2, scale='H-C', sep_num=1)
svg = SingingVoiceGenerator(**arg)
svg.output_wave('test.wav')

In [None]:
# score = [
#     {'text':'かー','length_sec':0.5,'scale':'C'},
#     {'text':'えー','length_sec':0.5,'scale':'D'},
#     {'text':'るー','length_sec':0.5,'scale':'E'},
#     {'text':'のー','length_sec':0.5,'scale':'F'},
#     {'text':'うー','length_sec':0.5,'scale':'E'},
#     {'text':'たー','length_sec':0.5,'scale':'D'},
#     {'text':'がー','length_sec':0.5,'scale':'C'},
# ]

In [None]:
# The song as a list of notes. Each entry holds the keyword arguments for one
# SingingVoiceGenerator; 'length_sec' is first written in note-length units
# and converted to seconds below.
score = [
    {'text': 'どぅぅ', 'length_sec': 4, 'scale': 'H-C', 'sep_num': 3},
    {'text': 'まぃ', 'length_sec': 3, 'scale': 'H-C', 'sep_num': 2},
    {'text': 'ね', 'length_sec': 1, 'scale': 'H-C', 'sep_num': 1},
    {'text': 'じぃ', 'length_sec': 6, 'scale': 'H-Ds', 'sep_num': 2},
    {'text': 'れぇ', 'length_sec': 2, 'scale': 'Gs', 'sep_num': 2},
    {'text': 'どぅぅ', 'length_sec': 4, 'scale': 'Gs', 'sep_num': 3},
    {'text': 'まぃん', 'length_sec': 4, 'scale': 'As', 'sep_num': 2},
    #
    {'text': 'はぁつ', 'length_sec': 16, 'scale': 'H-C', 'sep_num': 1},
    {'text': 'どぅぅ', 'length_sec': 4, 'scale': 'Gs', 'sep_num': 2},
    {'text': 'まぃ', 'length_sec': 2, 'scale': 'Gs', 'sep_num': 2},
    {'text': 'ねぇ', 'length_sec': 2, 'scale': 'Gs', 'sep_num': 2},
    #
    {'text': 'ゔぉん', 'length_sec': 12, 'scale': 'H-F', 'sep_num': 2},
    {'text': 'ねぇ', 'length_sec': 4, 'scale': 'H-Ds', 'sep_num': 2},
    {'text': 'どぅぅ', 'length_sec': 4, 'scale': 'H-Cs', 'sep_num': 3},
    {'text': 'まぃん', 'length_sec': 4, 'scale': 'H-C', 'sep_num': 2},
    #
    {'text': 'しゅめぇるつぅ', 'length_sec': 16, 'scale': 'As', 'sep_num': 5},
    {'text': 'どぅぅ', 'length_sec': 4, 'scale': 'As', 'sep_num': 3},
    {'text': 'まぃ', 'length_sec': 2, 'scale': 'G', 'sep_num': 2},
    {'text': 'ねぇ', 'length_sec': 2, 'scale': 'Ds', 'sep_num': 2},
    #
    {'text': 'ゔぇるとぅ', 'length_sec': 12, 'scale': 'Gs', 'sep_num': 3},
    {'text': 'いん', 'length_sec': 4, 'scale': 'Gs', 'sep_num': 1},
    {'text': 'でぇ', 'length_sec': 6, 'scale': 'H-Cs', 'sep_num': 2},
    {'text': 'りっひ', 'length_sec': 2, 'scale': 'H-Cs', 'sep_num': 2},
    #
    {'text': 'りぃ', 'length_sec': 8, 'scale': 'H-Cs', 'sep_num': 2},
    {'text': 'ぶぅ', 'length_sec': 4, 'scale': 'H-C', 'sep_num': 2},
    {'text': 'まいん', 'length_sec': 4, 'scale': 'H-C', 'sep_num': 2},
    {'text': 'ひぃ', 'length_sec': 6, 'scale': 'H-Ds', 'sep_num': 2},
    {'text': 'めぅ', 'length_sec': 2, 'scale': 'Gs', 'sep_num': 2},
    #
    {'text': 'どぅ', 'length_sec': 12, 'scale': 'H-Cs', 'sep_num': 2},
]

# Convert every duration from note-length units to seconds (8 units per second).
for entry in score:
    entry['length_sec'] /= 8
print(score)

In [None]:
# Synthesize every note of the score in order and keep only the final
# (time-stretched) sample array of each one.
flog_np_arrays = [
    SingingVoiceGenerator(**note_kwargs).time_stretch_array
    for note_kwargs in score
]

In [None]:
# Join the per-note sample arrays back to back into one float64 buffer.
data_points = sum(samples.shape[0] for samples in flog_np_arrays)
concat_np_array = np.zeros((data_points), dtype=np.float64)
start_index = 0
for flog_np_array in flog_np_arrays:
    # Copy this note right after the previous one and advance the write cursor.
    end_index = start_index + flog_np_array.shape[0]
    concat_np_array[start_index:end_index] = flog_np_array
    start_index = end_index

In [None]:
# Save the assembled song as 16-bit PCM under ./work.
shift_wav_path = os.path.join('./work', 'concat.wav')
# Make sure the output directory exists; on a fresh checkout ./work is missing
# and the write below would fail.
os.makedirs(os.path.dirname(shift_wav_path), exist_ok=True)
# 16000 matches the default sample_rate of SingingVoiceGenerator, which none of
# the score entries override.
sf.write(shift_wav_path, concat_np_array, 16000, subtype="PCM_16")