# 💬 Speech-to-Text

This tool converts speech in audio files to text using the Google Cloud Speech-to-Text API.  It is designed to work well with audio files stored in Google Drive.

Permanent link [https://dm-gadgets.jasontc.net/speech_to_text_converter/](https://dm-gadgets.jasontc.net/speech_to_text_converter/)

## Input

* Audio files (in flac format)

## Output

* The speech in text

## Usage

A. Connect your Google Drive to this environment

![instructions](https://storage.googleapis.com/dmtools_resources/audio/connect_to_gdrive_guide_web.png)

B. Make sure that you have a valid key file from Google Cloud to use this tool.  

  - If you do not have one, please check [the instructions on how to obtain an access key file](https://github.com/jason-chao/memespector-gui/blob/master/doc/GetKeyFromGoogleCloud.md).

  - Make sure that the `Cloud Speech-To-Text API` is enabled for your account
    ![instructions](https://storage.googleapis.com/dmtools_resources/audio/enable_speech-to-text-api_guide_web.png)

C. Click the **▶** button below once to initialise the tool

D. Click `👉 Before you start: Select your Googld Cloud Service Account Key File` and then select the key file.  If the key file is not valid, the fields below will remain disabled.

E. Complete the fields below

* `Input directory` A directory containing one or more audio files in flac format
* `Output directory` The directory where this tool will generate the output files
* `Language` The language of the speech.  _(Please note that Google Cloud Speech-to-Text API does not automatically detect the language of the speech for you. You must select the language in advance.)_
* `Model` A transcription model offered by Google Cloud _(See [the explanations](https://cloud.google.com/speech-to-text/docs/transcription-model))_

F. Click `Convert`

In [None]:
#@title Run the extractor
#@markdown **Important:** Press the **▶** button once to initialise the tool before using the tool.

!pip install google-cloud google-cloud-speech google-cloud-storage ffmpeg-python

from google.cloud import storage, speech
from pathlib import Path
import ffmpeg
import uuid
import json
import csv
import os
from typing import List
from typing import NamedTuple
from datetime import datetime
import ipywidgets as widgets
from IPython.display import display, display_markdown, clear_output, Markdown

# Timeout (seconds) that get_speech2text_results falls back to when it is not given a usable timeout.
gcloud_speech2text_default_timeout_s = 180
# Multiplier applied to an audio file's duration (seconds) to derive its transcription timeout.
gcloud_speech2text_timeout_duration_coefficient_s = 2

# Local path the uploaded service account key is written to; also exported as GOOGLE_APPLICATION_CREDENTIALS.
gcloud_token_filename = "/tmp/gcloud_service_key.json"
# Remote JSON file with the selectable language codes ...
gcloud_speech2text_lang_codes_url = "https://storage.googleapis.com/dmtools_resources/audio/gcloud_speech2text_langcodes.json"
# ... and the local path it is downloaded to.
gcloud_speech2text_lang_codes_local_filename = "/tmp/gcloud_speech2text_lang_codes.json"
# Placeholder; reassigned once the downloaded language-code file has been parsed.
gcloud_speech2text_lang_codes = []
# Prefix of the randomly named bucket created for each conversion run.
gcloud_bucket_prefix = "speech2text_"

!wget -O $gcloud_speech2text_lang_codes_local_filename $gcloud_speech2text_lang_codes_url

# Google Cloud clients shared by the helpers below; they stay None until
# init_gcloud_clients() has been called.
storage_client = None
speech_client = None

def init_gcloud_clients():
  """Create the module-level Storage and Speech clients.

  Points GOOGLE_APPLICATION_CREDENTIALS at the saved key file first, then
  builds the storage client followed by the speech client and stores both
  in the module globals.
  """
  global storage_client, speech_client
  credentials_env_var = "GOOGLE_APPLICATION_CREDENTIALS"
  os.environ[credentials_env_var] = gcloud_token_filename
  storage_client = storage.Client()
  speech_client = speech.SpeechClient()

def get_audio_file_info(audio_filename):
  """Return the first audio stream entry that ffmpeg.probe reports for the file.

  Returns None when the probe result lists no stream whose "codec_type" is
  "audio".
  """
  matching_streams = []
  for stream in ffmpeg.probe(audio_filename)["streams"]:
    if stream["codec_type"] == "audio":
      matching_streams.append(stream)
  if not matching_streams:
    return None
  return matching_streams[0]

def generate_bucket_name(prefix: str) -> str:
  """Return prefix followed by the 32 hex digits of a fresh random UUID."""
  random_suffix = uuid.uuid4().hex  # same digits as str(uuid4()) without the dashes
  return f"{prefix}{random_suffix}"

def get_or_create_bucket(bucket_name: str) -> storage.Bucket:
  """Return the bucket called bucket_name, creating it when the lookup finds none."""
  existing_bucket = storage_client.lookup_bucket(bucket_name)
  if existing_bucket is not None:
    return existing_bucket
  return storage_client.create_bucket(bucket_name)

def upload_file_to_bucket(the_bucket: storage.Bucket, local_file_path: Path) -> None:
  """Upload the local file to the_bucket as a blob named after the file's base name."""
  the_bucket.blob(local_file_path.name).upload_from_filename(
    str(local_file_path.absolute())
  )

def remove_file_from_bucket(the_bucket: storage.Bucket, remote_file_name: str) -> None:
  """Delete the blob called remote_file_name from the_bucket."""
  the_bucket.blob(remote_file_name).delete()

def delete_bucket(bucket_name: str) -> None:
  """Delete the bucket called bucket_name if it exists.

  The bucket is looked up rather than fetched through a get-or-create
  helper, so asking to delete a bucket that is already gone is a no-op
  instead of creating the bucket just to remove it again.

  Args:
    bucket_name: Name of the bucket to delete.
  """
  the_bucket = storage_client.lookup_bucket(bucket_name)
  if the_bucket is None:
    return
  the_bucket.delete()

def test_bucket_access():
  """Check that the current credentials can manage buckets and objects.

  Creates a randomly named bucket, uploads a small local file to it, removes
  the uploaded object and deletes the bucket again.

  Returns:
    True when every step succeeds; False (after printing the error) when any
    step raises.
  """
  test_bucket_name = generate_bucket_name("test_s2t_")
  test_local_file_path = Path(f"/tmp/{test_bucket_name}")
  try:
    the_test_bucket = get_or_create_bucket(test_bucket_name)
    test_local_file_path.write_text(test_bucket_name)
    upload_file_to_bucket(the_test_bucket, test_local_file_path)
    remove_file_from_bucket(the_test_bucket, test_local_file_path.name)
    delete_bucket(test_bucket_name)
    return True
  except Exception as ex:
    print(ex)
    return False
  finally:
    # Best-effort removal of the local probe file so repeated checks do not
    # leave files behind in /tmp; a missing file is not an error here.
    try:
      test_local_file_path.unlink()
    except OSError:
      pass

def get_speech2text_results(bucket_name: str, remote_file_name: str, language_code = "en-US", model="default", timeout=0):
  """Transcribe a FLAC object stored in a bucket and return the recognition results.

  Args:
    bucket_name: Bucket holding the audio object.
    remote_file_name: Name of the audio object inside the bucket.
    language_code: Language code passed to the recognition config.
    model: Transcription model name passed to the recognition config.
    timeout: Seconds to wait for the long-running operation. Any non-positive
      value (including the default 0) or None selects
      gcloud_speech2text_default_timeout_s.

  Returns:
    The `results` attribute of the operation's response.
  """
  gcs_uri = f"gs://{bucket_name}/{remote_file_name}"
  audio = speech.RecognitionAudio(uri=gcs_uri)
  config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.FLAC,
    language_code=language_code,
    model=model
  )
  operation = speech_client.long_running_recognize(config=config, audio=audio)
  # The default argument is 0, so the fallback has to cover 0 as well as
  # negative values; otherwise the wait below would use a zero timeout.
  if timeout is None or timeout <= 0:
    timeout = gcloud_speech2text_default_timeout_s
  response = operation.result(timeout=timeout)
  return response.results

def get_audio_paths(video_path) -> List[Path]:
  """Return the flac files found directly inside a directory.

  Args:
    video_path: Path of the directory to scan. May be empty or None.

  Returns:
    A sorted list of the regular files in the directory whose extension is
    "flac" (case-insensitive). The list is empty when video_path is empty,
    None, or not an existing directory; the function never returns None.
  """
  if not video_path:
    return []
  video_dir = Path(video_path)
  if not video_dir.is_dir():
    return []
  # Sorted so that files are processed and reported in a stable order.
  return sorted(
    dir_child for dir_child in video_dir.iterdir()
    if dir_child.is_file() and dir_child.suffix.lower() == ".flac"
  )


def read_gcloud_keyfile():
  """Load the saved service account key file as a dict.

  Returns:
    The parsed JSON object, or an empty dict when the file cannot be read,
    does not contain valid JSON, or its top-level value is not an object.
  """
  try:
    with open(gcloud_token_filename, "r") as key_file:
      loaded_key = json.load(key_file)
  except (OSError, ValueError):
    # OSError: missing/unreadable file. ValueError: malformed JSON or
    # undecodable text. Both simply mean "no usable key".
    return {}
  # Callers test for key membership, so anything but a JSON object is unusable.
  return loaded_key if isinstance(loaded_key, dict) else {}


def get_gcloud_speech2text_lang_codes():
  """Parse the downloaded language-code file into a list of 2-tuples.

  Each entry of the JSON array contributes a tuple of its first two items.
  """
  with open(gcloud_speech2text_lang_codes_local_filename, "r") as langcode_file:
    raw_entries = json.load(langcode_file)
  return [(entry[0], entry[1]) for entry in raw_entries]

# Load the codes once at start-up; the language dropdown uses them as its options.
gcloud_speech2text_lang_codes = get_gcloud_speech2text_lang_codes()

class UserInput(NamedTuple):
  """Immutable snapshot of the form values, as built by get_user_input()."""
  audio_dir: str  # value of the "Input directory" text box
  output_dir: str  # value of the "Output directory" text box
  language_code: str  # value selected in the "Language" dropdown
  model: str  # value selected in the "Model" dropdown

# Clear whatever the set-up commands above printed before the form is shown.
clear_output()

# Upload control for the service account key; restricted to a single .json file.
gcloud_token_fileupload = widgets.FileUpload(
    description="👉 Before you start: Select your Googld Cloud Service Account Key File",
    accept=".json",
    multiple=False,
    layout={"width":"800px"}
    )

# Every input below starts disabled; disable_input_controls(False) enables
# them once an uploaded key file has been accepted.
audio_dir_path_textbox = widgets.Text(
    value="",
    placeholder="path to the directory containing flac files",
    description="Input directory",
    disabled=True,
    style={"description_width":"initial"},
    layout={"width":"800px"}
    )

output_path_textbox = widgets.Text(
    value="",
    placeholder="path to the directory",
    description="Output directory",
    disabled=True,
    style={"description_width":"initial"},
    layout={"width":"800px"}
    )

# Options come from the downloaded language-code list; "en-US" is preselected.
language_code_dropdown = widgets.Dropdown(
    options=gcloud_speech2text_lang_codes,
    value="en-US",
    description="Language",
    disabled=True,
    style={"description_width":"initial"}
    )

# Transcription model names offered to the user; "latest_long" is preselected.
speech2text_model_dropdown = widgets.Dropdown(
    options=["default", "latest_long", "latest_short", "video", "command_and_search", "phone_call", "medical_dictation", "medical_conversation"],
    value="latest_long",
    description="Model",
    disabled=True,
    style={"description_width":"initial"}
    )

convert_button = widgets.Button(
    description="Convert",
    disabled=True,
    button_style="success"
    )

# Output area that write_status_text() renders progress and error messages into.
status_output = widgets.Output()

def write_status_text(status_text):
  """Replace the status area's content with status_text as a level-2 Markdown heading."""
  heading = Markdown(f"## {status_text}")
  with status_output:
    # wait=True defers clearing until the new heading is ready to be shown.
    clear_output(wait=True)
    display_markdown(heading)


def get_user_input():
  """Collect the current values of the four form controls into a UserInput."""
  return UserInput(
    audio_dir=audio_dir_path_textbox.value,
    output_dir=output_path_textbox.value,
    language_code=language_code_dropdown.value,
    model=speech2text_model_dropdown.value,
  )

def disable_input_controls(disabled):
  """Set the `disabled` flag of every form control (not the key upload) to `disabled`."""
  form_controls = (
    audio_dir_path_textbox,
    output_path_textbox,
    language_code_dropdown,
    speech2text_model_dropdown,
    convert_button,
  )
  for control in form_controls:
    control.disabled = disabled

def on_gcloud_keyfile_uploaded(file_input):
  """Validate a newly uploaded key file and unlock the form when it works.

  The first uploaded item is saved to gcloud_token_filename, checked for the
  expected service-account fields, and then used to create the Google Cloud
  clients and run a bucket access test. On success the form controls are
  enabled and the upload widget stays disabled; otherwise the upload widget
  is reset, re-enabled, and an explanatory message is shown.

  Args:
    file_input: Change notification from the upload widget; its "new" entry
      is indexed for the first uploaded file's bytes.
  """
  gcloud_token_fileupload.disabled=True
  uploaded_items = file_input["new"]
  # Fall back to empty *bytes*: the file is opened in binary mode, so writing
  # a list would raise and leave the upload widget stuck in its disabled state.
  key_file_bytes = uploaded_items[0] if len(uploaded_items) > 0 else b""
  with open(gcloud_token_filename, "wb") as key_file:
    key_file.write(key_file_bytes)
  loaded_keyfile = read_gcloud_keyfile()
  json_keys = ["type", "project_id", "private_key_id", "private_key", "client_email"]
  if all(json_key in loaded_keyfile for json_key in json_keys):
    try:
      init_gcloud_clients()
      access_granted = test_bucket_access()
    except Exception as ex:
      # A key file with the right fields can still be rejected while the
      # clients are created; treat that like any other invalid key.
      print(ex)
      access_granted = False
    if access_granted:
      gcloud_token_fileupload.description="✔️ Your Google Cloud Service Account is selected"
      disable_input_controls(False)
      return
  # Reset the upload widget so that another file can be selected.
  gcloud_token_fileupload.value.clear()
  gcloud_token_fileupload._counter = 0
  write_status_text("Please provide a valid key file of credentials with 'Editor' permission.")
  gcloud_token_fileupload.disabled=False

def _write_speech2text_outputs(output_dir, speech2text_results):
  """Write the result records to timestamped JSON and CSV files in output_dir."""
  timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
  json_path = os.path.join(output_dir, f"speech_to_text_results_{timestamp}.json")
  with open(json_path, "w", encoding="utf-8") as results_json_file:
    results_json_file.write(json.dumps(speech2text_results))
  csv_path = os.path.join(output_dir, f"speech_to_text_results_{timestamp}.csv")
  # newline="" leaves line endings to the csv module; utf-8 keeps transcripts
  # in any language writable regardless of the platform's default encoding.
  with open(csv_path, "w", newline="", encoding="utf-8") as results_csv_file:
    csv_writer = csv.writer(results_csv_file)
    if speech2text_results:
      csv_writer.writerow(speech2text_results[0].keys())
    csv_writer.writerows(result.values() for result in speech2text_results)


def on_convert_button_clicked(args):
  """Transcribe every flac file in the input directory and save the results.

  Each file is uploaded to a temporary bucket, transcribed, and removed from
  the bucket again; the collected records are then written as JSON and CSV
  to the output directory. The temporary bucket is deleted and the form
  controls are re-enabled even when a step fails. On failure the error is
  shown in the status area and then re-raised.

  Args:
    args: The clicked button, as passed by the click callback (unused).
  """
  disable_input_controls(True)
  try:
    write_status_text("Getting started ...")
    user_input = get_user_input()
    # Treat a missing result the same as an empty one.
    audio_paths = get_audio_paths(user_input.audio_dir) or []
    if not audio_paths:
      write_status_text("Not started: Sorry, no flac files are found.  Please check the 'Input directory' field.")
      return
    if not os.path.isdir(user_input.output_dir):
      write_status_text("Not started: Sorry, the output directory is not valid.  Please check the 'Output directory' field.")
      return
    audio_path_count = len(audio_paths)
    speech2text_results = []
    buffer_bucket_name = generate_bucket_name(gcloud_bucket_prefix)
    buffer_bucket = get_or_create_bucket(buffer_bucket_name)
    try:
      for file_number, audio_file_path in enumerate(audio_paths, start=1):
        write_status_text(f"Uploading {file_number}/{audio_path_count}: {audio_file_path.name}")
        upload_file_to_bucket(buffer_bucket, audio_file_path)
        try:
          audio_info = get_audio_file_info(str(audio_file_path.absolute()))
          duration = 0
          # Never wait less than the default timeout: a very short or
          # unprobed file would otherwise get a timeout of (almost) zero.
          timeout = gcloud_speech2text_default_timeout_s
          if audio_info and "duration" in audio_info:
            duration = round(float(audio_info["duration"]))
            timeout = max(timeout, duration * gcloud_speech2text_timeout_duration_coefficient_s)
          write_status_text(f"Processing {file_number}/{audio_path_count}: {audio_file_path.name}")
          transcript_results = get_speech2text_results(buffer_bucket_name, audio_file_path.name,
                                                       user_input.language_code, user_input.model,
                                                       timeout)
          # Skip results that carry no alternative instead of failing on them.
          transcript = ".".join(r.alternatives[0].transcript
                                for r in transcript_results if r.alternatives)
          speech2text_results.append({"local_path": str(audio_file_path.absolute()),
                                      "filename": audio_file_path.name,
                                      "transcript": transcript,
                                      "duration": duration,
                                      "language": user_input.language_code,
                                      "model": user_input.model
                                      })
        finally:
          # Remove the upload even when transcription failed, so the bucket
          # is empty when it gets deleted below.
          remove_file_from_bucket(buffer_bucket, audio_file_path.name)
      _write_speech2text_outputs(user_input.output_dir, speech2text_results)
    finally:
      delete_bucket(buffer_bucket_name)
    write_status_text(f"Done: {audio_path_count} audio files have been processed\nPlease see the results in the output directory.")
  except Exception as ex:
    # Tell the user what went wrong, then let the error surface as before.
    write_status_text(f"Failed: {ex}")
    raise
  finally:
    disable_input_controls(False)

# Validate the key file whenever the upload widget's "data" trait changes.
gcloud_token_fileupload.observe(on_gcloud_keyfile_uploaded, names="data")
# Start the conversion when the Convert button is clicked.
convert_button.on_click(on_convert_button_clicked)

# Stack the controls vertically, with the status area at the bottom.
widgets.VBox([gcloud_token_fileupload, audio_dir_path_textbox, output_path_textbox,
              language_code_dropdown, speech2text_model_dropdown, convert_button, status_output])

VBox(children=(FileUpload(value={}, accept='.json', description='👉 Before you start: Select your Googld Cloud …

---

### ⚠️ Warranties and liabilities

This tool is provided without warranty of any kind. The developer is not liable for any loss or damage arising from your use of it.