In [None]:
# GCP Project ID used to call the Speech-to-Text API (API must be enabled).
project_id = ""

# GCS URI to the audio files, in the format gs://bucket_name/path. Files must be
# stored in subdirectories named after the language code of the video (i.e. en-US).
gcs_audio_files_directory = ""

# GCS URI where the generated transcripts are stored, in the format gs://bucket_name/path.
gcs_transcripts_directory = ""

# GCP model to use for transcription (i.e. chirp, long).
model = "chirp"

# Maximum number of characters per SRT chunk.
max_characters_per_chunk = 40

In [None]:
!(pip install google-cloud-speech)

In [None]:
from google.api_core.client_options import ClientOptions
from google.cloud.speech_v2.types import cloud_speech
from google.cloud import speech_v2

async def transcribe_batch_multiple_files_v2(
    project_id: str,
    file_array: list,
    gcs_output_path: str,
    model: str,
):
    """Transcribe a batch of GCS audio files with one BatchRecognize request.

    Transcripts are written by the API as JSON under
    ``{gcs_output_path}/{model}/srt/json``.

    Args:
        project_id: GCP project ID used in the recognizer resource path.
        file_array: List of dicts, each with a ``"gcs_uri"`` key (audio file
            URI) and a ``"language_code"`` key (language of that file).
        gcs_output_path: GCS URI prefix under which the output is stored.
        model: Transcription model; ``"chirp"`` or ``"long"``.

    Returns:
        The result of the completed batch recognition operation (it is also
        printed).

    Raises:
        ValueError: If ``model`` is not one of the supported models.
    """
    # Pick the client and the recognizer location together: the "chirp" branch
    # uses the europe-west4 endpoint, the "long" branch the default client.
    if model == "chirp":
        location = "europe-west4"
        client = speech_v2.SpeechAsyncClient(
            client_options=ClientOptions(
                api_endpoint="europe-west4-speech.googleapis.com",
            )
        )
    elif model == "long":
        location = "global"
        client = speech_v2.SpeechAsyncClient()
    else:
        # Fail early with a clear message instead of hitting an unbound
        # `client`/`location` further down.
        raise ValueError(
            f"Unsupported model {model!r}: expected 'chirp' or 'long'"
        )

    # Request-level defaults; the language is overridden per file below.
    default_config = cloud_speech.RecognitionConfig(
        auto_decoding_config=cloud_speech.AutoDetectDecodingConfig(),
        language_codes=["en-US"],
        model=model,
        features=cloud_speech.RecognitionFeatures(
            enable_automatic_punctuation=True,
            enable_word_time_offsets=True,
        ),
    )

    # One metadata entry per audio file, each carrying its own language code.
    files = [
        cloud_speech.BatchRecognizeFileMetadata(
            uri=file["gcs_uri"],
            config=cloud_speech.RecognitionConfig(
                language_codes=[file["language_code"]],
            ),
        )
        for file in file_array
    ]

    request = speech_v2.BatchRecognizeRequest(
        recognizer=f"projects/{project_id}/locations/{location}/recognizers/_",
        config=default_config,
        files=files,
        recognition_output_config=cloud_speech.RecognitionOutputConfig(
            gcs_output_config=cloud_speech.GcsOutputConfig(
                uri=gcs_output_path+"/"+model+"/srt/json",
            ),
        ),
    )
    operation = client.batch_recognize(request=request,timeout=530)
    print("Waiting for operation to complete...")
    # First await submits the request, second await waits for its result.
    response = await (await operation).result(timeout=530)
    print(response)
    return response

In [None]:
import re

language_directories=!(gsutil ls {gcs_audio_files_directory})
file_array=[]

for language_directory in language_directories:
    
    language_code=re.search("/([^/]+)/$",language_directory).group(1)
    files=!(gsutil ls {language_directory})
    for uri in files:
        
        file_array.append({"gcs_uri": uri,"language_code": language_code})

print(file_array)
number_of_files=len(file_array)
print(f"Number of files: {number_of_files}")

In [None]:
import time
import asyncio

def split_array_into_chunks(array, chunk_size):
    """
    Cut a sequence into consecutive slices of at most ``chunk_size`` items.

    Args:
        array: The sequence to cut up.
        chunk_size: The maximum number of items per slice.

    Returns:
        A list of slices in their original order; the last slice holds the
        remainder and may be shorter than ``chunk_size``.
    """
    offsets = range(0, len(array), chunk_size)
    return [array[offset : offset + chunk_size] for offset in offsets]

arrays=split_array_into_chunks(file_array, 5)

print(arrays)
start_time = time.time()
await asyncio.gather(*[transcribe_batch_multiple_files_v2(project_id, array, gcs_transcripts_directory, model) for array in arrays])

duration = time.time() - start_time
print(f"Took {duration} to execute")

In [None]:
!(pip install srt)

In [None]:
import srt
import re
import datetime

def _offset_to_timedelta(offset, default):
    """Convert an offset string such as "1.500s" into a ``datetime.timedelta``.

    Returns ``default`` when ``offset`` is ``None`` or contains no number.
    """
    if offset is None:
        return default
    match = re.search(r"([0-9.]+)", str(offset))
    if match is None:
        return default
    return datetime.timedelta(seconds=float(match.group(1)))


def _build_subtitle(index, start, end, words):
    """Assemble one SRT subtitle from the words collected for a chunk."""
    return srt.Subtitle(index=index,
                        start=start,
                        end=end,
                        content=srt.make_legal_content(" ".join(words)))


def break_sentences(subs, alternative, max_chars=40):
    """Split the words of one recognition alternative into SRT subtitles.

    A subtitle is closed after a word that contains ``.``, ``!`` or ``?``,
    after a word that pushes the chunk past ``max_chars`` characters (spaces
    not counted), or after a word containing ``,`` unless it is the first word
    of the chunk. Words left over after the last break are emitted as a final
    subtitle instead of being dropped.

    Args:
        subs: List of subtitles built so far; it is extended in place and new
            subtitles are numbered from ``len(subs) + 1``.
        alternative: Dict with an optional ``"words"`` list. Each word is a
            dict with a ``"word"`` string and ``"startOffset"``/``"endOffset"``
            strings such as ``"1.500s"``.
        max_chars: Maximum number of characters per subtitle chunk.

    Returns:
        The same ``subs`` list, with the new subtitles appended.
    """
    idx = len(subs) + 1
    chunk_words = []
    charcount = 0
    start = end = previous_end = datetime.timedelta(0)

    # A missing "words" key is treated as an empty transcript.
    for w in alternative.get("words", []):
        word = w["word"]
        firstword = not chunk_words

        # A word without a startOffset is assumed to start where the previous
        # one ended (zero for the very first word).
        # NOTE(review): presumably the API omits zero-valued offsets - confirm.
        word_start = _offset_to_timedelta(w.get("startOffset"), previous_end)
        end = _offset_to_timedelta(w.get("endOffset"), word_start)
        previous_end = end

        if firstword:
            # first word in chunk, record start time
            start = word_start

        charcount += len(word)
        chunk_words.append(word.strip())

        ends_sentence = "." in word or "!" in word or "?" in word
        breaks_on_comma = "," in word and not firstword
        if ends_sentence or charcount > max_chars or breaks_on_comma:
            # break sentence at: . ! ? or line length exceeded
            # also break if , and not first word
            subs.append(_build_subtitle(idx, start, end, chunk_words))
            idx += 1
            chunk_words = []
            charcount = 0

    # Flush trailing words that never reached a break condition.
    if chunk_words:
        subs.append(_build_subtitle(idx, start, end, chunk_words))

    return subs

In [None]:
import json
import re
from google.cloud import storage

transcript_array=!(gsutil ls {gcs_transcripts_directory}/{model}/srt/json)

for transcript in transcript_array:
    print(f"Fetching results from {transcript}...")
    output_bucket, output_object = re.match(
        r"gs://([^/]+)/(.*)", transcript
    ).group(1, 2)

    # Instantiates a Cloud Storage client
    storage_client = storage.Client()

    # Fetch results from Cloud Storage
    bucket = storage_client.bucket(output_bucket)
    blob_json = bucket.blob(output_object)
    data = json.loads(blob_json.download_as_string(client=None))
    
    subs = []
    
    if "results" in data:
        for result in data["results"]:
            if "alternatives" in result:
                # First alternative is the most probable result
                subs = break_sentences(subs, result["alternatives"][0], max_characters_per_chunk)
        
    srt_filename=re.search(f"/json/(.*)_transcript.*",output_object).group(1)
    srt_path=re.search("(.*)/json",output_object).group(1)
    
    blob_srt = bucket.blob(f"{srt_path}/srt/{srt_filename}.srt")
    blob_srt.upload_from_string(srt.compose(subs))