### Speech to Text

Source: https://github.com/GoogleCloudPlatform/generative-ai/blob/main/audio/speech/getting-started/get_started_with_chirp_2_sdk_features.ipynb

In [None]:
%pip install --upgrade --quiet google-cloud-speech pydub etils jiwer ffmpeg-python plotly gradio

In [None]:
!gcloud auth application-default login

In [1]:
# Resolve the Google Cloud project and region used by the rest of the notebook.
# Use the environment variable if the user doesn't provide Project ID.
import os

PROJECT_ID = "bliss-hack25fra-9533"  # @param {type: "string", placeholder: "[your-project-id]", isTemplate: true}
if not PROJECT_ID or PROJECT_ID == "[your-project-id]":
    # os.environ.get returns None when the variable is unset; wrapping that in
    # str() would yield the literal project ID "None", so fail here instead.
    PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "")
    if not PROJECT_ID:
        raise ValueError(
            "Set PROJECT_ID above or export GOOGLE_CLOUD_PROJECT before running."
        )

# Region falls back to us-central1 when GOOGLE_CLOUD_REGION is not set.
LOCATION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1")

### Import Libraries

In [2]:
import io
import os

import IPython.display as ipd
from etils import epath as ep
from google.api_core.client_options import ClientOptions
from google.cloud.speech_v2 import SpeechClient
from google.cloud.speech_v2.types import cloud_speech
import gradio as gr
from pydub import AudioSegment
     

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Speech endpoint host name, derived from the configured location.
API_ENDPOINT = f"{LOCATION}-speech.googleapis.com"

# Speech client pointed at that endpoint.
client = SpeechClient(client_options=ClientOptions(api_endpoint=API_ENDPOINT))

# Sample recording stored in Cloud Storage.
INPUT_AUDIO_SAMPLE_FILE_URI = "gs://github-repo/audio_ai/speech_recognition/attention_is_all_you_need_podcast.wav"

# Recognizer resource path for this project/location, with "_" as the recognizer ID.
RECOGNIZER = client.recognizer_path(PROJECT_ID, LOCATION, "_")

# NOTE(review): not referenced in the cells visible here; presumably a chunk
# size limit for streaming — confirm units and usage.
MAX_CHUNK_SIZE = 25600

### Helper Functions

In [4]:
def read_audio_file(audio_file_path: str) -> bytes:
    """
    Return the full contents of an audio file as bytes.

    Paths starting with "gs://" are opened through `ep.Path`; any other path
    is opened with the built-in `open`.
    """
    if not audio_file_path.startswith("gs://"):
        with open(audio_file_path, "rb") as local_file:
            return local_file.read()

    with ep.Path(audio_file_path).open("rb") as remote_file:
        return remote_file.read()


def save_audio_sample(audio_bytes: bytes, output_file_uri: str) -> None:
    """
    Write audio bytes to `output_file_uri`.

    The parent directory of the target is created first when it does not
    already exist.
    """
    destination = ep.Path(output_file_uri)

    parent_dir = destination.parent
    if not parent_dir.exists():
        parent_dir.mkdir(parents=True, exist_ok=True)

    with destination.open("wb") as sink:
        sink.write(audio_bytes)


def extract_audio_sample(audio_bytes: bytes, duration: int) -> bytes:
    """
    Extract the first `duration` seconds of an audio file as WAV bytes.

    The sample always starts at the beginning of the recording; it is not
    randomly positioned.

    Args:
        audio_bytes: Encoded audio content that `AudioSegment.from_file` can
            decode.
        duration: Length of the sample in seconds, measured from the start of
            the recording.

    Returns:
        The sample exported in WAV format, as bytes.

    Raises:
        ValueError: If `duration` is negative.
    """
    if duration < 0:
        # A negative duration would become a negative slice bound below, which
        # no longer means "the first N seconds".
        raise ValueError(f"duration must be non-negative, got {duration}")

    audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
    start_time = 0
    # Slice bounds are in milliseconds, hence the * 1000.
    audio_sample = audio[start_time : start_time + duration * 1000]

    # Use a separate buffer name so the `audio_bytes` parameter is not rebound
    # to a different type.
    wav_buffer = io.BytesIO()
    audio_sample.export(wav_buffer, format="wav")
    return wav_buffer.getvalue()


def play_audio_sample(audio_bytes: bytes) -> None:
    """
    Render an inline audio player for the given audio bytes in a notebook.
    """
    payload = io.BytesIO(audio_bytes).getvalue()
    player = ipd.Audio(payload, rate=44100)
    ipd.display(player)


def parse_real_time_recognize_response(
    response: cloud_speech.RecognizeResponse,
) -> list[tuple[str, int]]:
    """
    Collect the first alternative's transcript and the end offset of each result.

    Args:
        response: Response object whose `results` are iterated.

    Returns:
        One `(transcript, result_end_offset)` tuple per result that has at
        least one alternative, in response order. Results without any
        alternative are skipped.
    """
    # NOTE(review): the return annotation says int, but result_end_offset is
    # passed through untouched and may be a duration object — confirm.
    return [
        (result.alternatives[0].transcript, result.result_end_offset)
        for result in response.results
        # Indexing [0] on an empty alternatives list would raise IndexError.
        if result.alternatives
    ]


def parse_words_real_time_recognize_response(
    response: cloud_speech.RecognizeResponse,
) -> list[dict]:
    """
    Flatten the word-level entries of a recognition response into dicts.

    Only the first alternative of each result is read. Results without any
    alternative are skipped.

    Args:
        response: Response object whose `results` are iterated.

    Returns:
        One dict per word, in response order, with keys "word", "start" and
        "end".
    """
    # NOTE(review): only the `.seconds` attribute of each offset is read, so
    # any sub-second component the offset type carries separately is not
    # included — confirm whole-second resolution is intended.
    word_entries = []
    for result in response.results:
        if not result.alternatives:
            # Indexing [0] on an empty alternatives list would raise IndexError.
            continue
        for word_info in result.alternatives[0].words:
            word_entries.append(
                {
                    "word": word_info.word,
                    "start": word_info.start_offset.seconds,
                    "end": word_info.end_offset.seconds,
                }
            )
    return word_entries


def print_transcription(
    audio_sample_bytes: bytes, transcription: str, play_audio: bool = True
) -> None:
    """
    Display a transcription in a notebook, optionally preceded by its audio.

    Args:
        audio_sample_bytes: Audio handed to `play_audio_sample`; only used when
            `play_audio` is true.
        transcription: Transcript text to show. It is HTML-escaped before being
            rendered.
        play_audio: Whether to render the audio player above the transcription.
    """
    import html  # local import: only needed to escape the transcript

    # Call display through `ipd`, as play_audio_sample does, so the function
    # does not rely on a `display` name being injected into the namespace.
    if play_audio:
        # Play the audio sample
        ipd.display(ipd.HTML("Audio:"))
        play_audio_sample(audio_sample_bytes)
        ipd.display(ipd.HTML(""))

    # Display the transcription text. It is plain text, so escape it rather
    # than letting characters such as "<" or "&" be parsed as markup.
    ipd.display(ipd.HTML("Transcription:"))
    ipd.display(ipd.HTML(html.escape(str(transcription))))

### Read Audio File + Select Sample

In [7]:
# Read the whole sample recording into memory as raw bytes.
input_audio_bytes = read_audio_file(INPUT_AUDIO_SAMPLE_FILE_URI)

In [21]:
# Keep only the first 10 seconds of the recording (extract_audio_sample
# slices from offset 0) and re-encode that sample as WAV bytes.
short_audio_sample_bytes = extract_audio_sample(input_audio_bytes, 10)

In [22]:
# Render an inline player for the extracted sample.
play_audio_sample(short_audio_sample_bytes)

### Transcription

In [41]:
# Target language for translation. In the cells visible here it is only
# referenced by the commented-out translation_config of the request below.
target_language_code = "en-US"  # @param {type:"string", isTemplate: true}

In [48]:
# Recognition request for the short sample: "chirp_2" model, "en-US" input,
# automatic punctuation enabled, auto-detect decoding config.
ts_real_time_request = cloud_speech.RecognizeRequest(
    recognizer=RECOGNIZER,
    content=short_audio_sample_bytes,
    config=cloud_speech.RecognitionConfig(
        model="chirp_2",
        language_codes=["en-US"],
        auto_decoding_config=cloud_speech.AutoDetectDecodingConfig(),
        features=cloud_speech.RecognitionFeatures(enable_automatic_punctuation=True),
        # Translation is currently switched off. To enable it, add:
        #   translation_config=cloud_speech.TranslationConfig(
        #       target_language=target_language_code
        #   ),
    ),
)

In [49]:
# Send the request through the Speech client and keep the raw response.
ts_response = client.recognize(request=ts_real_time_request)
# Reduce the response to (transcript, result_end_offset) tuples.
ts_real_time_recognize_results = parse_real_time_recognize_response(ts_response)
     

In [50]:
# Show the audio player once, then every transcript: the same clip is passed
# for each result, so rendering it per result only repeats the player.
# The unused offset is bound to a named throwaway instead of `_`, which
# IPython uses for the previous cell output.
for result_index, (transcription, _end_offset) in enumerate(
    ts_real_time_recognize_results
):
    print_transcription(
        short_audio_sample_bytes, transcription, play_audio=result_index == 0
    )