In [None]:
import os
import time
from datetime import datetime
from pathlib import Path
from typing import List
import re
from dataclasses import replace, dataclass

import numpy as np
from numpy import ndarray
import base64
from io import BytesIO
import soundfile as sf
import sounddevice as sd
from pydub import AudioSegment

import banana_dev as banana

from scipy.io.wavfile import write
from more_itertools import chunked

In [None]:
from srt import SrtEntry, SrtPair
from synthesized_utterance import SynthesizedUtterance
from paragraph import Sentence, Paragraph
from sample import Sample

In [None]:
def read_txt(file_path) -> str:
    """Read a text file, trying several encodings until one decodes cleanly.

    The candidate encodings are attempted in order and the first one that
    decodes the whole file wins. ``utf-8-sig`` is tried first so that a UTF-8
    byte-order mark is removed instead of surviving as a leading U+FEFF
    character in the returned text.

    Args:
        file_path: Path of the file to read.

    Returns:
        The decoded file contents with leading/trailing whitespace stripped.

    Raises:
        Exception: If the path does not exist or is not valid, or if none of
            the candidate encodings can decode the file.
    """
    encodings = ['utf-8-sig', 'utf-8', 'utf-16', 'cp1257', 'iso8859_13',]
    for encoding in encodings:
        try:
            with open(file_path, mode='r', encoding=encoding) as file:
                return file.read().strip()
        except (NotADirectoryError, FileNotFoundError) as e:
            raise Exception(f"Filepath error {e}") from e
        except UnicodeError:
            # Wrong guess for this file: move on to the next candidate
            # instead of aborting on the first decoding failure.
            continue
    raise Exception("Could not read file given the encodings")

def get_paragraphs(file_path) -> List[str]:
    """Load a text file and wrap each of its lines in a ``Paragraph``.

    Runs of two or more consecutive newlines are collapsed into a single
    newline before the text is split, and every resulting line is paired with
    its zero-based position.

    Note: the return annotation says ``List[str]``, but the elements built
    here are ``Paragraph`` objects.

    Args:
        file_path: Path of the text file to load (read via ``read_txt``).

    Returns:
        One ``Paragraph(position, line)`` per line, in file order.
    """
    contents = read_txt(file_path)
    collapsed = re.sub(r"\n{2,}", "\n", contents)
    paragraphs = []
    for position, line in enumerate(collapsed.split('\n')):
        paragraphs.append(Paragraph(position, line))
    return paragraphs

def get_chapters(input_dir: Path) -> List[tuple]:
    """Load every chapter file found directly inside ``input_dir``.

    Only regular files are considered (sub-directories are skipped), and they
    are processed in sorted path order so the chapter sequence is the same on
    every run. Sorting is plain lexicographic, so ``chapter10`` sorts before
    ``chapter2`` unless the names are zero-padded.

    Args:
        input_dir: Directory containing one text file per chapter.

    Returns:
        A list of ``(chapter_name, paragraphs)`` tuples, where ``chapter_name``
        is the file name without its extension and ``paragraphs`` is whatever
        ``get_paragraphs`` returns for that file.
    """
    chapter_files = sorted(
        entry for entry in Path(input_dir).iterdir() if entry.is_file()
    )
    return [(chapter_file.stem, get_paragraphs(chapter_file)) for chapter_file in chapter_files]

In [None]:
def split_paragraph_to_sentences(paragraph: Paragraph) -> List["Sentence"]:
    """Split a paragraph's text into ``Sentence`` objects.

    A sentence ends at a run of ``.``, ``?`` or ``!`` optionally followed by a
    single closing quote. The text is cut right after each such terminator;
    surrounding whitespace is stripped from every piece and empty pieces are
    dropped. Any text following the last terminator (or the whole text when
    there is no terminator at all) is kept as a final sentence rather than
    being discarded.

    Args:
        paragraph: Object exposing ``text`` (the paragraph string) and
            ``index`` (stored on every produced sentence).

    Returns:
        Sentences in reading order, numbered consecutively from 0 within the
        paragraph.
    """
    sentence_end_pattern = '[.?!]+[\'"]?'
    text = paragraph.text

    # Cut exactly at the end of each terminator. Whitespace between sentences
    # is removed by strip() below, so no character is ever skipped blindly.
    pieces = []
    cursor = 0
    for match in re.finditer(sentence_end_pattern, text):
        pieces.append(text[cursor:match.end()])
        cursor = match.end()
    # Trailing remainder without a terminator (may be empty).
    pieces.append(text[cursor:])

    sentences = []
    for piece in pieces:
        piece = piece.strip()
        # Filter empty
        if piece:
            sentences.append(Sentence(len(sentences), paragraph.index, piece))
    return sentences

In [None]:
def synthesize_sentences(sentences: List[Sentence], batch_size: int, speed_mult: float):
    """Synthesize audio for every sentence and attach it to a copy of the sentence.

    Sentence texts are sent to ``synthesize`` in batches of ``batch_size``.
    Each returned audio blob is decoded with ``soundfile`` and stored on a copy
    of the matching sentence (the inputs are not modified).

    Args:
        sentences: Sentences to synthesize; each must expose ``text``.
        batch_size: Maximum number of texts per ``synthesize`` call.
        speed_mult: Speed multiplier forwarded to ``synthesize``.

    Returns:
        New sentence objects, in input order, with ``audio`` set to the
        decoded samples.

    Raises:
        ValueError: If the number of decoded audio clips differs from the
            number of sentences (pairing them up would silently drop or
            misalign sentences).
    """
    texts = [sentence.text for sentence in sentences]

    audio_samples = []
    for batched_sentences in chunked(texts, batch_size):
        for audio in synthesize(batched_sentences, speed_mult):
            # sf.read returns (data, samplerate); only the samples are kept.
            # NOTE(review): the file's own sample rate is discarded — the rest
            # of the pipeline presumably assumes it equals SAMPLE_RATE; confirm.
            audio_samples.append(sf.read(BytesIO(audio))[0])

    if len(audio_samples) != len(sentences):
        raise ValueError(
            f"Expected {len(sentences)} audio clips but received {len(audio_samples)}"
        )
    return [replace(sentence, audio = audio) for sentence, audio in zip(sentences, audio_samples)]

def synthesize(sentences: List[str], speed_mult: float):
    """Send a batch of sentences to the remote model and return raw audio bytes.

    The request is attempted up to three times. Each failure is printed and
    the call is retried; only when every attempt fails is an exception raised.
    A success on any attempt, including the last one, is returned normally.

    Args:
        sentences: Texts to synthesize in a single request.
        speed_mult: Value sent as the ``speed_multiplier`` model input.

    Returns:
        A list of decoded (base64 -> bytes) audio payloads, one per entry in
        the response's ``audio`` field.

    Raises:
        Exception: If all attempts fail, or if the response carries no
            ``audio`` field.
    """
    model_inputs = {'text': sentences, "speed_multiplier": speed_mult}
    retries = 3
    last_error = None
    for _ in range(retries):
        try:
            out = banana.run(API_KEY, MODEL_KEY, model_inputs)
            break
        except Exception as e:
            print(e)
            last_error = e
    else:
        # Reached only when no attempt succeeded (the loop never hit `break`).
        raise Exception(
            f"Retry exceeded retry count of {retries}. Max sentence length: {max((len(s) for s in sentences), default=0)}.\n {model_inputs}"
        ) from last_error

    data = out["modelOutputs"][0].get("audio", None)
    if data is None:
        # Fail with a clear message instead of a TypeError from iterating None.
        raise Exception("Model response did not contain an 'audio' field")
    return [base64.b64decode(audio_bytes) for audio_bytes in data]

In [None]:
def concatenate_with_silence(audio0: np.ndarray, audio1: np.ndarray, silence=0.5, sample_rate=22050):
    """Join two audio arrays with a stretch of silence between them.

    The silence is ``int(sample_rate * silence)`` zero-valued float32 samples,
    generated directly with NumPy. A non-positive ``silence`` inserts nothing.

    Args:
        audio0: Samples placed first.
        audio1: Samples placed last.
        silence: Length of the gap in seconds.
        sample_rate: Samples per second used to size the gap.

    Returns:
        A new array holding ``audio0``, the silent gap, then ``audio1``.
    """
    # Clamp at zero so a zero or negative gap simply adds no padding.
    silent_sample_count = max(0, int(sample_rate * silence))
    silent_frames = np.zeros(silent_sample_count, dtype=np.float32)
    return np.concatenate((audio0, silent_frames, audio1))

def concatenate_sentences(sentences: List[Sentence], sentence_silence: float, paragraph_silence: float):
    """Join the audio of all sentences into one array, separated by silence.

    Consecutive sentences from the same paragraph are separated by
    ``sentence_silence`` seconds; when the paragraph index changes the gap is
    ``paragraph_silence`` seconds instead.

    Args:
        sentences: Sentences exposing ``audio`` (sample array) and
            ``paragraph_idx``.
        sentence_silence: Gap in seconds between sentences of one paragraph.
        paragraph_silence: Gap in seconds at a paragraph boundary.

    Returns:
        A single array with every sentence's audio and the gaps between them.
        An empty float32 array is returned when ``sentences`` is empty.
    """
    if not sentences:
        return np.array([], dtype=np.float32)

    first_audio = np.asarray(sentences[0].audio)
    # Zero-length slice with the first clip's dtype: lets the shared helper
    # produce "gap + next clip" without prepending any samples.
    nothing = first_audio[:0]

    pieces = [first_audio]
    paragraph_idx = sentences[0].paragraph_idx
    for sentence in sentences[1:]:
        if paragraph_idx == sentence.paragraph_idx:
            silence = sentence_silence
        else:
            paragraph_idx = sentence.paragraph_idx
            silence = paragraph_silence
        pieces.append(concatenate_with_silence(nothing, sentence.audio, silence))

    # One final concatenation instead of re-copying the growing result on
    # every iteration.
    return np.concatenate(pieces)

In [None]:
def save_audio(audio: np.ndarray, sample_rate: int, file_path: Path):
    """Write audio samples to a WAV file as 32-bit floats.

    Missing parent directories of ``file_path`` are created first.

    Args:
        audio: Samples to write; converted to ``float32`` before writing.
        sample_rate: Sample rate recorded in the WAV header. This argument is
            honoured as given rather than being replaced by a module-level
            constant.
        file_path: Destination file (``Path`` or string).
    """
    file_path = Path(file_path)
    file_path.parent.mkdir(parents=True, exist_ok=True)
    write(str(file_path), sample_rate, np.asarray(audio).astype(np.float32))

In [None]:
def srt_entries_from_sentences(sentences: List[Sentence], fill_sentences: float, fill_paragraphs: float) -> List:
    """Build subtitle entries whose timings follow the concatenated audio.

    Each sentence's duration is ``len(audio) / SAMPLE_RATE``. Before every
    sentence except the first, the running start time is advanced by
    ``fill_sentences`` when the sentence stays in the same paragraph, or by
    ``fill_paragraphs`` when the paragraph index changes. Start and end times
    are multiplied by 1000 before being handed to ``SrtEntry``.

    Args:
        sentences: Sentences exposing ``audio``, ``text`` and ``paragraph_idx``.
        fill_sentences: Gap added between sentences of one paragraph.
        fill_paragraphs: Gap added at a paragraph boundary.

    Returns:
        One ``SrtEntry`` per sentence, in order; an empty list when there are
        no sentences.
    """
    if not sentences:
        # Nothing to time; avoids indexing into an empty list below.
        return []

    curr_start = 0.0
    prev_paragraph = sentences[0].paragraph_idx
    srt_entries = []
    for i, sentence in enumerate(sentences):
        if i > 0:
            if sentence.paragraph_idx == prev_paragraph:
                curr_start += fill_sentences
            else:
                prev_paragraph = sentence.paragraph_idx
                curr_start += fill_paragraphs

        duration = len(sentence.audio) / SAMPLE_RATE
        end = curr_start + duration
        srt_entries.append(
            SrtEntry(idx=i, start=curr_start * 1000, end=end * 1000, text=sentence.text, audio=sentence.audio)
        )
        # The next sentence starts where this one ended (plus its gap).
        curr_start = end
    return srt_entries

def save_srt_entries_to_file(srt_entries: List[SrtEntry], out_filepath: Path):
    """Serialize subtitle entries and write them to a UTF-8 text file.

    Every entry is rendered with its ``to_string()`` method and the rendered
    blocks are separated by one blank line.

    Args:
        srt_entries: Entries to serialize, in output order.
        out_filepath: Destination file; overwritten if it already exists.
    """
    rendered_blocks = (entry.to_string() for entry in srt_entries)
    payload = '\n\n'.join(rendered_blocks)
    with open(out_filepath, mode='w', encoding='utf-8') as srt_file:
        srt_file.write(payload)

### Pipeline Start

In [None]:
# Samples per second: used when writing WAV files and when converting audio
# length to subtitle durations.
SAMPLE_RATE = 22050
# Maximum number of sentence texts sent to the synthesis service per request.
BATCH_SIZE = 16

In [None]:
# Gap (seconds) inserted between two sentences of the same paragraph.
SILENCE_BETWEEN_SENTENCES = 1
# Gap (seconds) inserted when the next sentence starts a new paragraph.
SILENCE_BETWEEN_PARAGRAPHS = 1.7
# Sent to the model as its "speed_multiplier" input.
SPEED_MULTIPLIER = 1.25

In [None]:
# Credentials passed to banana.run(); fill in before running.
# NOTE(review): avoid committing real keys in the notebook — consider reading
# them from environment variables instead.
API_KEY = ""
MODEL_KEY = ""

In [None]:
# Directory holding one text file per chapter (an empty Path means the current
# working directory).
input_dir = Path("")
# Directory that receives the generated .wav and .srt files.
output_dir = Path("")

In [None]:
# Load every chapter as a (name, paragraphs) pair.
chapters = get_chapters(input_dir)

In [None]:
# For every chapter: split into sentences, synthesize them, then write the
# combined audio (.wav) and the matching subtitles (.srt).
for i, (name, paragraphs) in enumerate(chapters, start=1):
    print(f"Synthesizing chapter `{name}` ({i}/{len(chapters)})")

    # Flatten the per-paragraph sentence lists into one ordered list.
    sentences = [
        sentence
        for paragraph in paragraphs
        for sentence in split_paragraph_to_sentences(paragraph)
    ]
    if not sentences:
        # An empty chapter has nothing to synthesize; skip it so it cannot
        # abort the remaining chapters.
        print(f"Skipping chapter `{name}`: no sentences found")
        continue

    print(f"Synthesizing {len(sentences)} sentences")
    synthesized_sentences = synthesize_sentences(sentences, BATCH_SIZE, SPEED_MULTIPLIER)

    chapter_audio = concatenate_sentences(synthesized_sentences, SILENCE_BETWEEN_SENTENCES, SILENCE_BETWEEN_PARAGRAPHS)
    save_audio(chapter_audio, SAMPLE_RATE, output_dir / f"{name}.wav")

    srt_entries = srt_entries_from_sentences(synthesized_sentences, SILENCE_BETWEEN_SENTENCES, SILENCE_BETWEEN_PARAGRAPHS)
    save_srt_entries_to_file(srt_entries, output_dir / f"{name}.srt")