In [None]:
import re
import shutil
from pathlib import Path

import gtts
import librosa
import numpy as np
import pandas as pd
import soundfile
import srsly
from IPython.core.interactiveshell import InteractiveShell
from pydub import AudioSegment

# Notebook display settings: echo every top-level expression of a cell (not
# only the last one) and let DataFrame cells show up to 100 characters.
InteractiveShell.ast_node_interactivity = "all"
pd.set_option("display.max_colwidth", 100)

# Directory holding the utterance audio for the transcript being assembled.
output_dir = Path("./output/synth_calls/sample_transcript")


In [None]:
def _sequence_idx(file_name: str) -> int:
    """Return the utterance index encoded after the last "_" in ``file_name``.

    All leading digits of that final token are used, so ``"utt_12.mp3"`` maps
    to 12. Reading only the first character would give 1 and scramble the
    order (and the speaker channel) of any call with ten or more utterances.

    Raises:
        ValueError: if the final token does not start with a digit.
    """
    token = file_name.split("_")[-1]
    match = re.match(r"\d+", token)
    if match is None:
        raise ValueError(f"no sequence index found in file name: {file_name!r}")
    return int(match.group())


audio_fragment_records = []
# sorted() only makes the load order reproducible; the playback order is
# decided by sequence_idx below.
for file in sorted(output_dir.glob("./*.mp3")):
    # collate utterance audio files into raw samples
    samples, _sample_rate = librosa.load(str(file))  # FYI: assigns default sample rate
    audio_fragment_records.append(
        {
            "file": file.name,
            "sample_array": samples,
            "sample_array_shape": samples.shape[0],
        }
    )

# Fail with a clear message instead of an AttributeError on the missing
# "file" column further down.
if not audio_fragment_records:
    raise FileNotFoundError(f"no .mp3 utterance files found in {output_dir}")

audio_fragments = (
    pd.DataFrame(audio_fragment_records)
    # directory listing order is not guaranteed, so recover the utterance
    # order from the file name
    .assign(sequence_idx=lambda df: df.file.map(_sequence_idx))
    .sort_values("sequence_idx")
    # speaker as channel: even indices -> channel 1, odd indices -> channel 2
    .assign(channel=lambda df: df.sequence_idx.map(lambda i: 1 if i % 2 == 0 else 2))
    .reset_index(drop=True)
)


In [None]:
import numpy as np
import soundfile
from pydub import AudioSegment

# Lay the utterances out on two tracks: the channel that is speaking keeps the
# real samples while the other channel receives silence of the same length, so
# both tracks stay aligned and the turns alternate.
channel_1_segments = []
channel_2_segments = []
for channel, samples in zip(audio_fragments["channel"], audio_fragments["sample_array"]):
    silence = np.zeros(samples.shape[0], dtype=np.float32)
    if channel == 1:
        channel_1_segments.append(samples)
        channel_2_segments.append(silence)
    else:
        # odd indices are channel 2; channel 1 gets the silence
        channel_2_segments.append(samples)
        channel_1_segments.append(silence)

# temp save for channel 1/2 audio - each file is written as mono
default_sr = 22050
channel_1_temp = output_dir / "channel_1_temp.wav"
channel_2_temp = output_dir / "channel_2_temp.wav"

channel_1_padded = np.concatenate(channel_1_segments)
soundfile.write(channel_1_temp, channel_1_padded, default_sr)

channel_2_padded = np.concatenate(channel_2_segments)
soundfile.write(channel_2_temp, channel_2_padded, default_sr)

# consolidate into an interleaving, channel-separated stereo source
left_channel = AudioSegment.from_wav(channel_1_temp)
right_channel = AudioSegment.from_wav(channel_2_temp)

stereo_sound = AudioSegment.from_mono_audiosegments(left_channel, right_channel)
# export() defaults to format="mp3"; without format="wav" the ".wav" file
# would contain MP3-encoded audio. It also returns the open output file
# handle, which is closed here so it is not left dangling.
exported_file = stereo_sound.export(output_dir / "consolidated_final.wav", format="wav")
exported_file.close()


In [None]:
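# NOTE: disabled cell, kept for reference only. `base_output_dir` is not
# defined in the cells shown here, so uncommenting this as-is would raise a
# NameError.
# # put the final wavs in a single dir
# episode_wav_dir = base_output_dir / "episode_wavs"
# episode_wav_dir.mkdir(
#     exist_ok=True, parents=True
# ) if episode_wav_dir.exists() == False else None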


In [None]:
# Collect every finished call found under output_dir's parent directory into
# a shared final_calls directory.
final_calls_dir = output_dir.parents[1] / "final_calls"
# shutil.move does not create missing parent directories
final_calls_dir.mkdir(parents=True, exist_ok=True)

# Materialise the listing first: files are moved out of the tree being walked.
for wav_path in list(output_dir.parents[0].rglob("./*.wav")):
    # Match on the file name only, so a directory that merely has "final" in
    # its path does not drag the temporary channel files along.
    if "final" in wav_path.name:
        # Name the result after the directory the wav came from. Reusing the
        # file's own name gave "<name>.wav.wav" and made calls from different
        # directories overwrite one another.
        shutil.move(str(wav_path), str(final_calls_dir / f"{wav_path.parent.name}.wav"))


In [None]:
# Remove the intermediate audio, then file the finished call under final_calls.
import shutil

# Plain loops instead of list comprehensions: these run only for their side
# effects, and a comprehension would echo a list of None values.
for mp3_path in list(output_dir.glob("./*.mp3")):
    mp3_path.unlink()
for wav_path in list(output_dir.glob("./*.wav")):
    # Test the file name, not the whole path, so a directory name containing
    # "temp" cannot cause the final wav to be deleted.
    if "temp" in wav_path.name:
        wav_path.unlink()

final_wav = output_dir / "consolidated_final.wav"
# Skip quietly when the file has already been moved, so the cell can be re-run.
if final_wav.exists():
    final_calls_dir = output_dir.parents[1] / "final_calls"
    # shutil.move does not create missing parent directories
    final_calls_dir.mkdir(parents=True, exist_ok=True)
    shutil.move(str(final_wav), str(final_calls_dir / f"{output_dir.name}.wav"))


In [None]:
import IPython

# Listen to one of the consolidated calls. The path is built from output_dir
# rather than an absolute, machine-specific location, and filename= states
# explicitly that the argument is a local file rather than raw audio data.
IPython.display.Audio(
    filename=str(output_dir.parents[1] / "final_calls" / "mDHvtCPoQGeebm5oCSXXUa.wav")
)
