In [None]:
import os
import speech_recognition as sr
from os import path
import shutil
import zipfile
from pydub import AudioSegment
from pydub.silence import split_on_silence
from multiprocessing import Pool, Manager
import time
import logging
from tqdm.notebook import tqdm
from datetime import datetime
import argparse

# Module-level logger: INFO and above are written to a log file in the
# current working directory.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Re-running this cell must not stack handlers (every record would then be
# written once per run). Detach AND close the old ones: clearing the handler
# list alone leaves the previous FileHandler's file open.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()
# create a file handler
log_file = logging.FileHandler('transcribe_audio.log')
log_file.setLevel(logging.INFO)
logger.addHandler(log_file)

In [None]:
def print_time(prev_time):
    """Format the time elapsed since ``prev_time`` as ``"<h>h <m>m <s>s"``.

    Despite its name, this function prints nothing; it returns the string.

    Args:
        prev_time: A timestamp previously taken with ``time.time()``.

    Returns:
        str: Elapsed time, e.g. ``"1h 2m 5.00s"``. Hours and minutes are
        whole numbers; seconds keep two decimals.
    """
    delta = time.time() - prev_time
    # Floor-dividing a float yields a float, which rendered as "0.0h 1.0m".
    # Convert the whole-minute count to int so hours/minutes print cleanly.
    total_minutes, seconds = divmod(delta, 60)
    hours, minutes = divmod(int(total_minutes), 60)
    return f"{hours}h {minutes}m {seconds:.2f}s"

In [None]:
def convert_to_wav(file_path, output_path):
    """Convert an audio file to WAV and report where it was written.

    Args:
        file_path: Path of the audio file to load with
            ``AudioSegment.from_file``.
        output_path: Destination path. ``.wav`` is appended when the path
            does not already end with it (compared case-insensitively, so
            ``"take.WAV"`` is left alone).

    Returns:
        str: The path the WAV file was actually exported to. This can differ
        from ``output_path`` when the extension had to be appended.
    """
    # Load the audio file
    audio = AudioSegment.from_file(file_path)

    # Ensure the output path has a .wav extension
    if not output_path.lower().endswith('.wav'):
        output_path += '.wav'

    # Export the audio to WAV format
    audio.export(output_path, format="wav")
    return output_path

In [None]:
def split_audio_on_silence(file_name, path_to_temp, min_silence_len=300, silence_thresh=-16, keep_silence=100, min_chunk_length=1000, seek_step=1 ):
    """Split a recording on silence and save it as a series of WAV chunks.

    The file ``{path_to_temp}/{file_name}`` is decoded (as AAC first, then as
    OGG if that fails), split with pydub's ``split_on_silence``, and the
    resulting pieces are merged until each merged chunk is at least
    ``min_chunk_length`` ms long. Chunks are written to
    ``{path_to_temp}/audio-chunks/chunk_<n>.wav`` with ``n`` counting from 0.

    Args:
        file_name: Name of the audio file, relative to ``path_to_temp``.
        path_to_temp: Working directory holding the audio file; the
            ``audio-chunks`` folder is created inside it.
        min_silence_len: Forwarded unchanged to ``split_on_silence``.
        silence_thresh: Forwarded unchanged to ``split_on_silence``.
        keep_silence: Forwarded unchanged to ``split_on_silence``.
        min_chunk_length: Minimum length (as reported by ``len()`` of the
            segment) a merged chunk must reach before it is written. Only the
            final chunk may be shorter.
        seek_step: Forwarded unchanged to ``split_on_silence``.

    Returns:
        list[str]: Paths of the chunk files written, in order.

    Note:
        Chunk files left over from an earlier run with higher indices are not
        removed here.
    """
    #  create a directory to store the audio chunks
    folder_name = f"{path_to_temp}/audio-chunks"
    os.makedirs(folder_name, exist_ok=True)

    source_path = f"{path_to_temp}/{file_name}"
    try:
        sound = AudioSegment.from_file(source_path, format="aac")
    except Exception:
        # Not decodable as AAC: retry as OGG. ``Exception`` (rather than a
        # bare ``except``) lets KeyboardInterrupt/SystemExit propagate.
        sound = AudioSegment.from_file(source_path, format="ogg")

    # split the audio wherever silence lasts at least `min_silence_len`
    chunks = split_on_silence(
        sound,
        min_silence_len=min_silence_len,
        silence_thresh=silence_thresh,
        keep_silence=keep_silence,
        seek_step=seek_step
    )

    exported_paths = []

    def _export(segment):
        # Write one merged chunk; its index is the number of chunks so far.
        chunk_filename = os.path.join(folder_name, f"chunk_{len(exported_paths)}.wav")
        segment.export(chunk_filename, format="wav")
        exported_paths.append(chunk_filename)

    # Every merged chunk starts with a 10 ms silent lead-in.
    combined_audio = AudioSegment.silent(duration=10)
    has_pending_audio = False
    for audio_chunk in chunks:
        # Always append first: previously the piece that arrived once the
        # buffer was long enough was thrown away instead of being kept.
        combined_audio += audio_chunk
        has_pending_audio = True
        if len(combined_audio) >= min_chunk_length:
            _export(combined_audio)
            combined_audio = AudioSegment.silent(duration=10)
            has_pending_audio = False

    # Flush what is left so the end of the recording is not lost.
    if has_pending_audio:
        _export(combined_audio)

    return exported_paths

## V2 (Multiprocessing)

In [None]:
def transcribe_audio_mt(path, index, result_list):
    """Transcribe one audio file and store the text together with its index.

    Args:
        path: Path of the audio file to open with ``sr.AudioFile``.
        index: Position of this file in the overall sequence; stored with the
            text so the caller can restore the order afterwards.
        result_list: List-like object that receives ``(index, text)`` on
            success. Nothing is appended when the recogniser raises
            ``sr.UnknownValueError``.
    """
    speech_engine = sr.Recognizer()
    with sr.AudioFile(path) as audio_source:
        # read the whole file as the audio to recognise
        recording = speech_engine.record(audio_source)
        try:
            transcript = speech_engine.recognize_google(recording)
        except sr.UnknownValueError:
            # speech could not be understood: leave no entry for this file
            return
        result_list.append((index, transcript))

In [None]:
def all_zip(given_path):
    """Yield the path of every file at or below ``given_path``.

    No filtering by extension happens here; callers that only want ``.zip``
    files must check the suffix themselves.

    Args:
        given_path: A file path (yielded as-is) or a directory that is walked
            recursively.

    Yields:
        str: File paths. Directory entries are visited in sorted order so the
        sequence is the same on every run.
    """
    if os.path.isfile(given_path):
        yield given_path
        return
    for entry in sorted(os.listdir(given_path)):
        entry_path = os.path.join(given_path, entry)
        if os.path.isdir(entry_path):
            # Recurse into this same generator; the previous code called an
            # undefined name here and raised NameError on any subdirectory.
            yield from all_zip(entry_path)
        else:
            yield f"{given_path}/{entry}"

In [None]:
def transriber(rec_base_path,
               base_path,
               desired_file_name,
               transcript_name=None,
               min_silence_len=2000,
               silence_thresh=-100,
               keep_silence=100,
               min_chunk_length=1000,
               seek_step=1):
    """Transcribe the matching recording inside every ``.zip`` under ``rec_base_path``.

    For each archive found by ``all_zip``:

    1. The first member whose name contains ``desired_file_name``
       (case-insensitive) is extracted into ``temp-<session>`` in the parent
       of ``rec_base_path``, unless it is already there.
    2. The recording is split into chunks with ``split_audio_on_silence``.
    3. Every chunk is transcribed with ``transcribe_audio_mt`` in a process
       pool and the results are put back in chunk order.
    4. Transcribed segments with more than 10 space-separated words are
       written, one per line, to the transcript file.

    Progress, timings and the parameters used are written to ``logger``.

    Args:
        rec_base_path: A zip file or a directory searched recursively for
            zip files. Paths are split on ``"/"``.
        base_path: Directory that receives the transcript file.
        desired_file_name: Substring identifying the archive member to
            transcribe.
        transcript_name: Transcript file name without ``.txt``. When ``None``
            the file is named ``transcript-<session>.txt``. Note that a fixed
            name is reused, and therefore overwritten, for every archive.
        min_silence_len: Forwarded to ``split_audio_on_silence``.
        silence_thresh: Forwarded to ``split_audio_on_silence``.
        keep_silence: Forwarded to ``split_audio_on_silence``.
        min_chunk_length: Forwarded to ``split_audio_on_silence``.
        seek_step: Forwarded to ``split_audio_on_silence``.
    """
    for zip_path in tqdm(all_zip(rec_base_path)):
        if not zip_path.endswith(".zip"):
            continue

        # Each session gets its own clock so the "Finished" line reports this
        # session's duration rather than the time since the whole run began.
        session_start = time.time()
        next_time_start = session_start
        session_name = zip_path.split("/")[-1].split(".")[0]
        now = datetime.today()
        logger.info(f"Ran on day {now.strftime('%Y-%m-%d')} at {now.strftime('%H:%M:%S')}")
        logger.info(f"Starting {session_name}")
        tokens = 0
        if transcript_name is None:
            path_to_transcript = f"{base_path}/transcript-{session_name}.txt"
        else:
            path_to_transcript = f"{base_path}/{transcript_name}.txt"
        path_to_temp = "/".join(rec_base_path.split("/")[:-1])+f"/temp-{session_name}"

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            for name in zip_ref.namelist():
                if desired_file_name.lower() not in name.lower():
                    continue

                # Extract file
                if not os.path.isfile(f"{path_to_temp}/{name}"):
                    zip_ref.extract(name, path=path_to_temp)
                    logger.info(f"Extracted {name} to {path_to_temp} in {print_time(next_time_start)} seconds")
                else:
                    logger.info(f"{name} already extracted")
                next_time_start = time.time()

                # 1. split on silence
                # Splits the large audio file into chunks
                split_audio_on_silence(
                    name,
                    path_to_temp,
                    min_silence_len=min_silence_len,
                    silence_thresh=silence_thresh,
                    keep_silence=keep_silence,
                    min_chunk_length=min_chunk_length,
                    seek_step=seek_step
                )
                logger.info(f"Split {name} in {print_time(next_time_start)} seconds")
                next_time_start = time.time()

                # 2. transcribe
                # Order the chunk files by their numeric index and transcribe
                # them in parallel worker processes.
                chunks_folder_name = f"{path_to_temp}/audio-chunks/"
                # Only consider chunk_<n>.wav files so a stray file in the
                # folder cannot break the numeric sort key.
                chunk_files = [
                    entry for entry in os.listdir(chunks_folder_name)
                    if entry.startswith("chunk_") and entry.endswith(".wav")
                ]
                chunk_files.sort(key=lambda entry: int(entry.split("_")[1].split(".")[0]))
                temp_paths = [os.path.join(chunks_folder_name, entry) for entry in chunk_files]
                # The manager is a separate server process; the context
                # manager shuts it down instead of leaking one per archive.
                with Manager() as manager:
                    results = manager.list()
                    with Pool() as pool:
                        pool.starmap(
                            transcribe_audio_mt,
                            [(chunk_path, chunk_index, results) for chunk_index, chunk_path in enumerate(temp_paths)]
                        )
                    # Copy out of the proxy list before the manager exits.
                    final_result = sorted(results, key=lambda item: item[0])
                logger.info(f"Transcribed {name} in {print_time(next_time_start)} seconds")
                next_time_start = time.time()

                # 3. save
                # Replace any previous transcript. The file is opened once
                # (and is created even when no segment qualifies).
                with open(path_to_transcript, "w", encoding="utf-8") as f:
                    for _, text in final_result:
                        words = text.split(" ")
                        # keep only segments longer than 10 words
                        if len(words) > 10:
                            tokens += len(words)
                            f.write(text + "\n")
                logger.info(f"Saved {name} in {print_time(next_time_start)} seconds")
                next_time_start = time.time()
                # only the first matching member of the archive is processed
                break

        logger.info(f"Finished {session_name} in {print_time(session_start)} seconds with {tokens} words")
        logger.info(f"Saved at {path_to_transcript}")
        logger.info(f"Params: min_silence_len={min_silence_len}, silence_thresh={silence_thresh}, keep_silence={keep_silence}, min_chunk_length={min_chunk_length}, seek_step={seek_step}")
        logger.info("--------------------------------------------------")

In [None]:
def cleanup(rec_base_path):
    """Interactively delete the ``audio-chunks`` folder of temp directories.

    Lists every subdirectory of ``rec_base_path`` whose name contains
    ``"temp"``, asks which one to clean, and removes only the
    ``audio-chunks`` folder inside the selection; the temp directory itself
    and anything else in it are kept.

    Accepted answers: the number shown next to a directory, the number shown
    next to "All", or the word ``all`` (any case). Anything else deletes
    nothing.

    Args:
        rec_base_path: Directory containing the temp subdirectories.
    """
    # List matching subdirectories (sorted so the numbering is stable)
    subdirs = sorted(
        d for d in os.listdir(rec_base_path)
        if os.path.isdir(os.path.join(rec_base_path, d)) and "temp" in d
    )

    # Display the subdirectories and ask for user input
    print("Subdirectories found:")
    for idx, subdir in enumerate(subdirs, 1):
        print(f"{idx}. {subdir}")
    all_option = len(subdirs) + 1
    print(f"{all_option}. All")

    # Get the user's choice
    choice = input("Enter the number of the directory to delete (or 'All' to delete everything): ").strip()
    # isdecimal() only accepts characters int() can parse
    number = int(choice) if choice.isdecimal() else None

    # Validate the choice. The number printed next to "All" is accepted too;
    # previously it was offered in the menu but rejected as invalid.
    if choice.lower() == 'all' or number == all_option:
        selected = subdirs
    elif number is not None and 1 <= number <= len(subdirs):
        selected = [subdirs[number - 1]]
    else:
        print("Invalid input. No directories were deleted.")
        return

    for subdir in selected:
        chosen = os.path.join(rec_base_path, subdir, "audio-chunks")
        # A temp directory may have no chunks (yet); skip it rather than let
        # rmtree raise and abort the remaining deletions.
        if os.path.isdir(chosen):
            shutil.rmtree(chosen)