# Preparation

In [None]:
!pip install python-dotenv
!pip install requests
!pip install pydub
!pip install openai
!pip install --upgrade deepl


In [None]:
# Standard library imports
from datetime import datetime
import json
from io import BytesIO
import os
import pytz  # NOTE: pytz is a third-party package, not part of the standard library
import time

# Third-party imports
from dotenv import load_dotenv
from google.colab import drive, files
import deepl
from openai import OpenAI
from pydub import AudioSegment
import requests

# Mount Google Drive; the .env file, the prompt/context files and the output
# directory used below all live under /content/drive.
drive.mount('/content/drive')


In [None]:
# Change path according to directory structure
env_path = '/content/drive/MyDrive/VoiceTranslateFlow/.env'
# Load the variables defined in the .env file into the process environment.
load_dotenv(env_path)

# Every value below is read with os.getenv(), so it is None when the variable
# is missing from the environment / .env file.

# SpeechFlow credentials (sent as the "keyId" / "keySecret" request headers).
SPEECHFLOW_API_KEY_ID = os.getenv("SPEECHFLOW_API_KEY_ID")
SPEECHFLOW_API_KEY_SECRET = os.getenv("SPEECHFLOW_API_KEY_SECRET")

# OpenAI API key passed to chatgpt().
OPENAI_API_KEY  = os.getenv("OPENAI_API_KEY")

# Genny text-to-speech settings: endpoint URL, API key, speaker and speaker style.
GENNY_API_URL = os.getenv("GENNY_API_URL")
GENNY_API_KEY = os.getenv("GENNY_API_KEY")
GENNY_SPEAKER = os.getenv("GENNY_SPEAKER")
GENNY_SPEAKER_STYLE = os.getenv("GENNY_SPEAKER_STYLE")

# DeepL authentication key used to build the translator in use_deepl().
DEEPL_AUTH_KEY= os.getenv("DEEPL_AUTH_KEY")


In [None]:
# material_path = ""

# Text file whose content is sent to ChatGPT as the system message.
context_path = '/content/drive/MyDrive/VoiceTranslateFlow/data/context.txt'
# Prompt template for fixing the Japanese transcript; it is filled in with
# str.format(script_var=...), so it must contain a {script_var} placeholder.
prompt_fix_jp_path = '/content/drive/MyDrive/VoiceTranslateFlow/data/prompt_fix_jp.txt'
# prompt_translation_path = '/content/drive/MyDrive/VoiceTranslateFlow/data/prompt_translation.txt'

# Directory that save_audio() and save_script() write to (created on demand).
output_dir = "/content/drive/MyDrive/VoiceTranslateFlow/output"


# SpeechFlow

ref: https://docs.speechflow.io/#/

In [None]:
def def_info_speechflow(material_path):
    """Publish the SpeechFlow request settings as module-level globals.

    Sets LANG, FILE_PATH, RESULT_TYPE and headers, which create() and query()
    read when talking to the SpeechFlow API.

    Args:
        material_path: Local path or remote path of the media file to transcribe.
    """
    global LANG, FILE_PATH, RESULT_TYPE, headers

    # LANG: language code of the speech in the media file.
    #   See more lang code: https://docs.speechflow.io/#/?id=ap-lang-list
    # FILE_PATH: the local path or remote path of the media file.
    # RESULT_TYPE: the transcription result type.
    #   1 = default, json with sentences and words incl. begin/end time
    #   2 = json for the generated subtitles incl. begin/end time
    #   3 = srt for the generated subtitles incl. begin/end time
    #   4 = plain text without begin/end time
    LANG, FILE_PATH, RESULT_TYPE = "ja", material_path, 1

    # SpeechFlow API key pair, sent as request headers.
    # Generate API KEY, see: https://docs.speechflow.io/#/?id=generate-api-key
    headers = dict(
        keyId=SPEECHFLOW_API_KEY_ID,
        keySecret=SPEECHFLOW_API_KEY_SECRET,
    )


In [None]:
# Based on the official SpeechFlow documentation sample
def create():
    """Submit the media at FILE_PATH to SpeechFlow and return the new task id.

    Reads the module globals LANG, FILE_PATH and headers (set by
    def_info_speechflow()). A FILE_PATH starting with "http" is submitted as a
    remote file; anything else is uploaded as a local file.

    Returns:
        str: The "taskId" reported by the API, or "" when the HTTP request
        did not return 200 or the API answered with a code other than 10000.
    """
    create_url = "https://api.speechflow.io/asr/file/v1/create"

    if FILE_PATH.startswith('http'):
        print('submitting a remote file')
        create_data = {"lang": LANG, "remotePath": FILE_PATH}
        response = requests.post(create_url, data=create_data, headers=headers)
    else:
        print('submitting a local file')
        # Open the media inside a context manager so the file handle is
        # always closed once the upload finishes (or fails).
        with open(FILE_PATH, "rb") as media_file:
            response = requests.post(
                create_url + "?lang=" + LANG,
                headers=headers,
                files={"file": media_file},
            )

    if response.status_code != 200:
        print('create request failed: ', response.status_code)
        return ""

    create_result = response.json()
    print(create_result)
    if create_result["code"] != 10000:
        print("create error:")
        print(create_result["msg"])
        return ""
    return create_result["taskId"]


def query(task_id, max_failures=10):
    """Poll SpeechFlow until the transcription task finishes or fails.

    Reads the module globals RESULT_TYPE and headers, and stores the last
    decoded API response in the module global ``query_result``.

    Args:
        task_id: Task id returned by create().
        max_failures: Number of consecutive non-200 HTTP responses tolerated
            before polling is abandoned.

    Returns:
        None. The outcome is published through ``query_result``: code 11000
        means the transcription is ready, any other code is an API error.
        ``query_result`` is left untouched if no request ever returned 200.
    """
    global query_result

    query_url = "https://api.speechflow.io/asr/file/v1/query?taskId=" + task_id + "&resultType=" + str(RESULT_TYPE)
    print('querying transcription result')

    consecutive_failures = 0
    while True:
        response = requests.get(query_url, headers=headers)

        if response.status_code != 200:
            print('query request failed: ', response.status_code)
            consecutive_failures += 1
            if consecutive_failures >= max_failures:
                # Without this exit a persistent failure (e.g. bad
                # credentials) would keep the loop spinning forever.
                print('giving up after', consecutive_failures, 'consecutive failed query requests')
                break
            # Back off before retrying instead of hammering the API.
            time.sleep(5)
            continue

        consecutive_failures = 0
        query_result = response.json()
        code = query_result["code"]

        if code == 11001:
            # Task is still being processed; poll again later.
            print('waiting')
            time.sleep(5)
            continue

        if code == 11000:
            print('transcription result:')
            print(query_result)
        else:
            print(query_result)
            print("transcription error:")
            print(query_result['msg'])
        break


def speechflow(material_path):
    """Run the SpeechFlow transcription flow for one media file.

    Configures the request globals, creates the transcription task and, when a
    task id was obtained, polls for its result via query().

    Args:
        material_path: Local path or remote path of the media file.
    """
    print("\n[Transcription started]")
    def_info_speechflow(material_path)

    new_task = create()
    if new_task == "":
        # create() signals failure with an empty id; nothing to poll.
        return
    query(new_task)


def transcription_results(result):
    """Flatten a SpeechFlow query response into one transcript string.

    Args:
        result: Mapping whose 'result' entry is a JSON string containing a
            'sentences' list; each sentence carries its text under 's'.

    Returns:
        str: All sentence texts in order, separated by single spaces.
    """
    payload = json.loads(result['result'])
    return ' '.join(entry['s'] for entry in payload['sentences'])


# OpenAI

ref: https://platform.openai.com/docs/api-reference/introduction

In [None]:
def set_context(context_path):
    """Return the full text of the UTF-8 context file at ``context_path``."""
    with open(context_path, mode='r', encoding='utf-8') as handle:
        return handle.read()


def set_prompt(prompt_path, script_var):
    """Read a UTF-8 prompt template and substitute ``script_var`` into it.

    The template is rendered with ``str.format(script_var=...)``, so it is
    expected to reference the value as ``{script_var}``.

    Args:
        prompt_path: Path of the template file.
        script_var: Value inserted for the ``script_var`` placeholder.

    Returns:
        str: The rendered prompt.
    """
    with open(prompt_path, mode='r', encoding='utf-8') as handle:
        template = handle.read()
    return template.format(script_var=script_var)


In [None]:
def chatgpt(prompt: str, context: str, api_key: str, model: str = "gpt-4o", max_tokens: int = 800, max_requests: int = 40) -> str:
    """Ask the OpenAI chat API for a response, continuing while it is truncated.

    The conversation starts with ``context`` as the system message and
    ``prompt`` as the user message. Whenever a reply is cut off by the
    ``max_tokens`` limit (``finish_reason == "length"``), the partial reply is
    appended to the history and another request is sent, up to
    ``max_requests`` requests in total.

    Args:
        prompt: User message.
        context: System message.
        api_key: OpenAI API key.
        model: Chat model name.
        max_tokens: Completion token limit per request.
        max_requests: Upper bound on the number of API requests.

    Returns:
        str: The concatenated reply text. If any exception is raised while
        requesting, the string ``"Error: <message>"`` is returned instead
        (callers must check for that prefix).
    """
    print("\n[ChatGPT session started]")
    client = OpenAI(api_key=api_key)

    # Initialize the conversation with context and prompt
    messages = [
        {"role": "system", "content": context},
        {"role": "user", "content": prompt}
    ]
    parts = []
    total_chars = 0

    try:
        for request_count in range(1, max_requests + 1):
            print(f"\n[Request {request_count}] Sending API request...")

            response = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0.3,
                max_tokens=max_tokens
            )

            choice = response.choices[0]
            # message.content can be None (e.g. filtered output); treat as empty.
            content = choice.message.content or ""
            parts.append(content)
            total_chars += len(content)

            # len() counts characters, not tokens, so report it as such.
            print(f"[Request {request_count}] Received {len(content)} characters.")
            print(f"[Total so far] {total_chars} characters collected.")

            # The API reports truncation by the token limit explicitly;
            # comparing a character count with a token budget is unreliable.
            if choice.finish_reason != "length":
                print(f"[Completed] Full response generated with {total_chars} characters.")
                break

            # Add the truncated reply to the history so the next request
            # continues from it.
            # NOTE(review): continuation relies on the model resuming after a
            # trailing assistant message - confirm this yields seamless text.
            messages.append({"role": "assistant", "content": content})
            print(f"[Request {request_count}] Continuing to request additional content...")
        else:
            # Only reached when every allowed request was truncated.
            print("[Warning] Maximum request count reached, response may be incomplete.")

        return "".join(parts)

    except Exception as e:
        return f"Error: {str(e)}"


# DeepL

ref: https://developers.deepl.com/docs

In [None]:
def use_deepl(source_text, max_chunk_size=50000):
    """Translate Japanese text to US English with DeepL, chunk by chunk.

    The text is cut into pieces of at most ``max_chunk_size`` characters.
    When a piece is not the last one and contains a Japanese full stop ("。"),
    it is shortened to end right after the last "。" so sentences stay intact.

    Args:
        source_text: Text to translate.
        max_chunk_size: Maximum number of characters sent per request.

    Returns:
        str: The translated chunks joined with single spaces ("" for empty
        input).

    Raises:
        ValueError: If ``max_chunk_size`` is smaller than 1.
    """
    print("\n[Translation started]")
    if max_chunk_size < 1:
        # A non-positive size would never advance through the text.
        raise ValueError("max_chunk_size must be a positive integer")

    translator = deepl.Translator(DEEPL_AUTH_KEY)
    total_chars = len(source_text)

    # NOTE(review): DeepL also limits the request body size in bytes; confirm
    # the default chunk size stays below it for multi-byte Japanese text.
    chunks = []
    start = 0
    while start < total_chars:
        end = min(start + max_chunk_size, total_chars)
        if end < total_chars:
            # Prefer to cut right after the last sentence end in the window.
            last_period = source_text.rfind("。", start, end)
            if last_period != -1:
                end = last_period + 1
        chunks.append(source_text[start:end])
        start = end

    translated_texts = []
    translated_chars = 0  # Source characters translated so far
    for chunk in chunks:
        translated_texts.append(translator.translate_text(chunk, target_lang="EN-US").text)

        # Update and display progress
        translated_chars += len(chunk)
        print(f"Translated {translated_chars} / {total_chars} characters")

    # English sentences need a separating space; joining with "" would glue
    # the last sentence of one chunk to the first sentence of the next.
    return ' '.join(translated_texts)


# Genny

ref: https://api.genny.lovo.ai/api/docs

ref2: https://docs.genny.lovo.ai/reference/intro/getting-started

In [None]:
def def_header_genny():
    """Build the Genny request headers and publish them as the global HEADERS."""
    global HEADERS

    json_mime = 'application/json'
    HEADERS = {
        'Accept': json_mime,
        'X-Api-Key': GENNY_API_KEY,
        'Content-Type': json_mime,
    }

# Text splitting function
def split_text(text, max_length=500):
    """Split text into sentence-aligned chunks of at most ``max_length`` characters.

    Sentences are detected by the ". " separator. Consecutive sentences are
    packed into one chunk (joined by single spaces) while the chunk stays
    within ``max_length``. A single sentence longer than ``max_length`` is
    kept whole as its own chunk rather than being cut mid-sentence.

    Args:
        text: Text to split.
        max_length: Maximum chunk length in characters.

    Returns:
        list[str]: Non-empty chunks in original order; ``[]`` for empty or
        whitespace-only input.
    """
    chunks = []
    current_chunk = ""

    pieces = text.split(". ")
    last_index = len(pieces) - 1

    for index, piece in enumerate(pieces):
        piece = piece.strip()
        if not piece:
            continue

        # split() consumed the period of every piece except the last one;
        # the last piece keeps whatever ending the text really had, so no
        # extra period is appended to it.
        sentence = piece + "." if index < last_index else piece

        candidate = f"{current_chunk} {sentence}" if current_chunk else sentence
        if len(candidate) <= max_length:
            current_chunk = candidate
            continue

        # Flush only real content so no empty chunk is ever emitted.
        if current_chunk:
            chunks.append(current_chunk)
        current_chunk = sentence

    if current_chunk:
        chunks.append(current_chunk)

    return chunks


# Text-to-speech synthesis function
def synthesize_text(text_chunk, chunk_index, total_chunks):
    """Turn one text chunk into audio through the Genny synthesis endpoint.

    Posts the chunk to GENNY_API_URL with the global HEADERS, then downloads
    the first audio URL found in the response.

    Args:
        text_chunk: Text to synthesize.
        chunk_index: Zero-based position of the chunk (used for progress output).
        total_chunks: Total number of chunks (used for progress output).

    Returns:
        The downloaded audio loaded as an AudioSegment, or None when the
        synthesis request fails, the response carries no audio URL, or the
        audio download fails.
    """
    progress = f"[{chunk_index + 1}/{total_chunks}]"
    payload = {
        'text': text_chunk,
        'speaker': GENNY_SPEAKER,
        'speakerStyle': GENNY_SPEAKER_STYLE,
        'speed': 1.0
    }
    print(f"{progress} Synthesizing chunk...")
    response = requests.post(GENNY_API_URL, headers=HEADERS, json=payload)

    if response.status_code not in (200, 201):
        print(f"{progress} Failed to synthesize text. Status code: {response.status_code}")
        return None

    response_json = response.json()
    # The audio location is expected at data[0]["urls"][0].
    has_urls = "data" in response_json and response_json["data"] and "urls" in response_json["data"][0]
    if not has_urls:
        print(f"{progress} Audio URL not found in response. Response content:", response_json)
        return None

    audio_response = requests.get(response_json["data"][0]["urls"][0])
    if audio_response.status_code != 200:
        print(f"{progress} Failed to download audio. Status code:", audio_response.status_code)
        return None

    print(f"{progress} Chunk synthesis succeeded.")
    return AudioSegment.from_file(BytesIO(audio_response.content), format="wav")


# Concatenate multiple audio segments
def concatenate_audios(audio_segments):
    """Return one audio segment made of ``audio_segments`` appended in order.

    An empty input yields an empty AudioSegment.
    """
    # Start from an empty segment and add every segment onto it in turn.
    return sum(audio_segments, AudioSegment.empty())


In [1]:
# Main processing function
def genny(text, current_time):
    """Synthesize ``text`` to speech and save audio and script files in batches.

    The text is split into chunks, each chunk is synthesized, and after every
    80 chunks (and after the final chunk) the audio collected so far is saved
    as ``en_audio<N>_<current_time>.wav`` together with the matching script
    ``en_script<N>_<current_time>.txt``. Chunks that fail to synthesize are
    left out of both files.

    Args:
        text: English text to convert to speech.
        current_time: Value embedded in the output file names.
    """
    print("\n[Text-to-speech process started]")
    def_header_genny()

    text_chunks = split_text(text)
    total_chunks = len(text_chunks)

    pending_audio = []   # Segments synthesized since the last save
    pending_script = []  # Script entries belonging to pending_audio
    file_index = 1       # File index number

    for position, chunk in enumerate(text_chunks, start=1):
        print(f"Processing chunk {position} of {total_chunks}")

        audio_segment = synthesize_text(chunk, position - 1, total_chunks)
        if audio_segment:
            pending_audio.append(audio_segment)
            pending_script.append(f"{chunk}\n\n")  # Add successful chunk to script
        else:
            print(f"Skipping chunk {position} due to synthesis failure.")

        # Flush only every 80 chunks or at the very end
        at_batch_end = position % 80 == 0 or position == total_chunks
        if not at_batch_end:
            continue

        if pending_audio:
            save_audio(concatenate_audios(pending_audio), f"en_audio{file_index}_{current_time}.wav")
            pending_audio = []

        if pending_script:
            save_script("".join(pending_script), f"en_script{file_index}_{current_time}.txt")
            pending_script = []

        file_index += 1
        time.sleep(3)  # Wait before the next request


# Execute

In [None]:
def check_path(material_path):
    """Let the user confirm or re-enter the media file path.

    The user is asked repeatedly: answering "q" prompts for a new path, any
    other answer accepts the current one.

    Args:
        material_path: Path entered so far.

    Returns:
        str: The path the user finally accepted.
    """
    print("Are you sure that this file path is correct?")
    while True:
        answer = input("If you want to re-enter, type q, or if you want to leave it as is, type any other key.")
        if answer != "q":
            return material_path
        material_path = input("Please input the path of the material: ")


def save_audio(audio, filename="audio.wav"):
    """Export ``audio`` as a WAV file named ``filename`` inside ``output_dir``.

    The module-level ``output_dir`` is created first if it does not exist.
    """
    target_dir = output_dir
    os.makedirs(target_dir, exist_ok=True)  # Make sure the destination exists
    destination = os.path.join(target_dir, filename)
    audio.export(destination, format="wav")
    print(f"Audio file saved as {destination}")


# Save the script text to a file
def save_script(script, filename):
    """Write ``script`` as UTF-8 text to ``filename`` inside ``output_dir``.

    The module-level ``output_dir`` is created first if it does not exist.
    """
    target_dir = output_dir
    os.makedirs(target_dir, exist_ok=True)
    destination = os.path.join(target_dir, filename)
    with open(destination, mode="w", encoding="utf-8") as handle:
        handle.write(script)
    print(f"Script file saved as {destination}")


In [None]:
def main():
    """Run the whole pipeline: transcribe, fix, translate and synthesize.

    Steps:
        1. Ask for the media path and let the user confirm it.
        2. Transcribe the media with SpeechFlow.
        3. Have ChatGPT fix the Japanese transcript and save it.
        4. Translate the fixed text to English with DeepL.
        5. Convert the English text to speech with Genny.

    The pipeline stops early, with a message, when the transcription or the
    ChatGPT step fails, so that error text is never translated or synthesized.
    """
    # File-name friendly timestamp (Tokyo time). Interpolating the raw
    # datetime would put spaces, colons and "+" into every output file name.
    current_time = datetime.now(pytz.timezone('Asia/Tokyo')).strftime("%Y%m%d_%H%M%S")

    # Ask material_path
    material_path = input("Please input the path of the file: ")
    material_path = check_path(material_path)

    # Transcription process.
    # query() publishes its outcome through the global ``query_result``; drop
    # any value left over from an earlier run so a failed transcription
    # cannot silently reuse an old transcript.
    globals().pop("query_result", None)
    speechflow(material_path)
    result = globals().get("query_result")
    # 11000 is the code query() treats as a finished transcription.
    if not result or result.get("code") != 11000:
        print("\nTranscription failed; aborting this process.")
        return
    original_jp_text = transcription_results(result)

    # Fix Japanese text using ChatGPT
    context = set_context(context_path)
    prompt_fix_jp = set_prompt(prompt_fix_jp_path, original_jp_text)
    fixed_jp_text = chatgpt(prompt_fix_jp, context, OPENAI_API_KEY)

    # chatgpt() reports failures as an "Error: ..." string instead of raising.
    if not fixed_jp_text or fixed_jp_text.startswith("Error:"):
        print("\nChatGPT did not return a usable script; aborting this process.")
        if fixed_jp_text:
            print(fixed_jp_text)
        return

    # Save fixed Japanese script
    script_filename_jp = f"jp_script_{current_time}.txt"
    save_script(fixed_jp_text, script_filename_jp)

    # Translate fixed Japanese text to English
    en_text = use_deepl(fixed_jp_text)

    # Convert English text to speech
    genny(en_text, current_time)
    print("\nCompleted this process.\nIf you want to run it in a different file, restart the runtime.")


In [None]:
# Run the full pipeline; it prompts interactively for the media file path.
main()