# Audio Transcription using AssemblyAI API

In [None]:
import requests
import json
import time
from pprint import pprint
from textwrap import wrap
from urllib.parse import urlencode

## Utils

In [None]:
def read_file(filename, chunk_size=5242880):
    """
    Lazily read a file in binary mode, one chunk at a time.

    Args:
        filename (str): Path of the file to read.
        chunk_size (int): Maximum number of bytes per chunk
            (defaults to 5242880, i.e. 5 MiB).

    Yields:
        bytes: Consecutive non-empty chunks of the file's contents.
    """
    with open(filename, 'rb') as stream:
        # iter(callable, sentinel) keeps calling read() until it returns
        # the empty bytes object, which signals end of file.
        for chunk in iter(lambda: stream.read(chunk_size), b''):
            yield chunk

def upload_file(api_token, path):
    """
    Send a local file to the AssemblyAI upload endpoint.

    The file contents are supplied to the request as the generator
    returned by ``read_file``, so the file is read chunk by chunk.

    Args:
        api_token (str): Your API token for AssemblyAI.
        path (str): Path to the local file.

    Returns:
        str | None: The ``upload_url`` from the API response when the
        request returns HTTP 200; otherwise the error is printed and
        ``None`` is returned.
    """
    print(f"Uploading file: {path}")

    upload_endpoint = 'https://api.assemblyai.com/v2/upload'
    auth_headers = {'authorization': api_token}
    response = requests.post(upload_endpoint,
                             headers=auth_headers,
                             data=read_file(path))

    # Anything other than HTTP 200 is reported and signalled with None.
    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return None

    return response.json()["upload_url"]
    

def get_segment(transcript_id, segmentation_type, headers):
    """
    Fetch a transcript split into sentences or paragraphs.

    Args:
        transcript_id (str): ID of the transcript to segment.
        segmentation_type (str): Either "sentences" or "paragraphs".
        headers (dict): HTTP headers sent with the request.

    Returns:
        The decoded JSON body when the request returns HTTP 200;
        otherwise the error is printed and ``None`` is returned.

    Raises:
        Exception: If ``segmentation_type`` is not one of the two
            supported values.
    """
    # Reject unsupported segmentation types before any network call.
    if segmentation_type not in ("sentences", "paragraphs"):
        raise Exception(f"Unknown segmentation type: {segmentation_type}")

    endpoint = f"https://api.assemblyai.com/v2/transcript/{transcript_id}/{segmentation_type}"
    response = requests.get(endpoint, headers=headers)

    if response.status_code != 200:
        print(f"Error {response.status_code}: {response.text}")
        return None

    return json.loads(response.text)

    
def export_transcript_to_srt_vtt_file(transcript_id, headers, format="vtt", transcript_export_params=None):
    """
    Request a transcript in SRT or VTT caption format.

    Args:
        transcript_id (str): ID of the transcript to export.
        headers (dict): HTTP headers sent with the request.
        format (str): Caption format, either "srt" or "vtt".
        transcript_export_params (dict | None): Query parameters for the
            export request. When ``None``, ``{"chars_per_caption": 32}``
            is used.

    Returns:
        str: The response body text (returned regardless of HTTP status).

    Raises:
        ValueError: If ``format`` is neither "srt" nor "vtt".
    """
    if format not in ("srt", "vtt"):
        raise ValueError("unknown format for exporting transcript")

    query = (
        {"chars_per_caption": 32}
        if transcript_export_params is None
        else transcript_export_params
    )
    endpoint = f"https://api.assemblyai.com/v2/transcript/{transcript_id}/{format}"
    return requests.get(endpoint, headers=headers, params=query).text

def transcript_word_search(transcript_id, words, headers):
    """
    Search a transcript for the given keywords.

    Args:
        transcript_id (str): ID of the transcript to search.
        words (str | Iterable[str]): Keywords to look for. A single string
            is sent as-is; any other iterable is joined with commas.
        headers (dict): HTTP headers sent with the request.

    Returns:
        str: The response body text (returned regardless of HTTP status).
    """
    # The endpoint takes the keywords as one comma-separated ``words``
    # query value rather than a JSON-encoded list.
    if isinstance(words, str):
        keywords = words
    else:
        keywords = ",".join(words)

    # Pass the query only through ``params`` so requests encodes it exactly
    # once; also embedding it in the URL would send ``words`` twice.
    endpoint = f"https://api.assemblyai.com/v2/transcript/{transcript_id}/word-search"
    response = requests.get(endpoint, headers=headers, params={"words": keywords})
    return response.text
    

def create_transcript(api_token,
                      data,
                      export_format=None,
                      transcript_export_params=None,
                      words_to_search=None,
                      segmentation_type=None
                      ):
    """
    Create a transcript, wait for it to finish and collect optional extras.

    Args:
        api_token (str): Your API token for AssemblyAI.
        data (dict): JSON payload for the transcript request.
        export_format (str | None): "srt" or "vtt" to also export captions;
            ``None`` to skip the export.
        transcript_export_params (dict | None): Query parameters forwarded
            to the caption export.
        words_to_search (list | None): Keywords to search for in the
            transcript; ``None`` to skip the search.
        segmentation_type (str | None): "sentences" or "paragraphs" to also
            fetch a segmented transcript; ``None`` to skip it.

    Returns:
        tuple: ``(transcription_result, word_search_response,
        exported_transcript, segmented_transcript)``. Each of the last
        three entries is ``None`` when the matching option was not given.

    Raises:
        RuntimeError: If the create request is rejected, or if the
            transcription finishes with status 'error'.
    """
    # Set the API endpoint for creating a new transcript
    url = "https://api.assemblyai.com/v2/transcript"

    # Set the headers for the request, including the API token and content type
    # NOTE(review): upload_file sends the bare token without the 'Bearer '
    # prefix — confirm which form the API expects.
    headers = {
        "authorization": f'Bearer {api_token}',
        "content-type": "application/json"
    }

    # Send a POST request to the API to create a new transcript
    response = requests.post(url, json=data, headers=headers)

    # Fail with the API's own message instead of an opaque KeyError on 'id'
    if not response.ok:
        raise RuntimeError(
            f"Transcript request failed: {response.status_code} - {response.text}"
        )

    transcript_id = response.json()['id']
    polling_endpoint = f"https://api.assemblyai.com/v2/transcript/{transcript_id}"

    # Keep polling the API until the transcription is complete
    while True:
        transcription_result = requests.get(polling_endpoint, headers=headers).json()
        status = transcription_result['status']

        if status == 'completed':
            break
        if status == 'error':
            raise RuntimeError(f"Transcription failed: {transcription_result['error']}")

        # Still in progress: wait for 3 seconds and poll again
        time.sleep(3)

    # Every extra is optional; a skipped one is reported as None so the
    # returned tuple always has the same four positions.
    word_search_response = None
    if words_to_search is not None:
        word_search_response = transcript_word_search(
            transcript_id, words_to_search, headers=headers
        )

    exported_transcript = None
    if export_format is not None:
        exported_transcript = export_transcript_to_srt_vtt_file(
            transcript_id,
            headers,
            format=export_format,
            transcript_export_params=transcript_export_params,
        )

    segmented_transcript = None
    if segmentation_type is not None:
        segmented_transcript = get_segment(transcript_id, segmentation_type, headers)

    return (
        transcription_result,
        word_search_response,
        exported_transcript,
        segmented_transcript,
    )


## Define parameters and function calls

In [None]:
# ----------------------------------------------------------------------------------
# Set your AssemblyAI API token here (it is left empty in this notebook)
your_api_token = ""
# ----------------------------------------------------------------------------------
# Custom spellings: only sent if the "custom_spelling" entry in `data` below
# is uncommented
custom_spellings = [
    {
        "from": ["ryan pace"],
        "to": ["Ryan Pace"]
    },
    {
        "from": ["matt naggy"],
        "to": ["Matt Naggy"]
    }
]
# ----------------------------------------------------------------------------------
# Word boost (custom vocabulary) and the weight given to the boosted words
word_boost = ["quarterback", "veteran"]
boost_param = "high"
# ----------------------------------------------------------------------------------
# Upload the file to AssemblyAI and get the upload URL
# You may also remove the upload step and update the 'audio_url' parameter in the
# 'create_transcript' function to point to a remote audio or video file.
# If you don't have one, download a sample file: https://storage.googleapis.com/aai-web-samples/espn-bears.m4a
# NOTE: upload_file returns None when the upload fails, in which case
# "audio_url" below is None.
filename = "../data/sample.m4a"
upload_url = upload_file(your_api_token, filename)
# ----------------------------------------------------------------------------------
data = {
    "audio_url": upload_url,    # URL of the uploaded audio file
    "language_code": "en",      # language code for transcription
    "language_detection": False,# whether to detect the dominant language in the audio
    "punctuate": False,         # whether to punctuate the transcript
    "format_text": False,       # whether to format the transcript text
    # "custom_spelling": custom_spellings,    # Custom spellings of words
    "word_boost": word_boost,   # words for the custom vocabulary
    "boost_param": boost_param, # weight for the words in the custom vocabulary
    "dual_channel": True,      # whether to transcribe each channel separately
    "disfluencies": True,       # True to include filler words, else False
    "filter_profanity": True,   # whether to remove profanity from the transcript
    "audio_start_from": 5000,   # in ms
    "audio_end_at": 45000,      # in ms    
    
}
# ----------------------------------------------------------------------------------
# Query parameters for the srt/vtt caption export
srt_vtt_export_params = {
    "chars_per_caption": 32
}
# ----------------------------------------------------------------------------------
# NOTE(review): this list is not passed to create_transcript below, which uses a
# hard-coded list of search words instead — confirm which one is intended.
words_to_search =["quarterback", "draft", "veteran"]
# ----------------------------------------------------------------------------------
# Transcribe the audio file using the upload URL; the result is unpacked into the
# transcript, the word-search response, the exported captions and the segmentation
transcript, words_search_response, transcript_in_export_format, segmented_transcript  = create_transcript(your_api_token, 
                                                                                                          data,
                                                                                                          export_format='vtt',
                                                                                                          transcript_export_params=srt_vtt_export_params,
                                                                                                          words_to_search=["ixigo", "direct", "return"],
                                                                                                          segmentation_type='sentences'
                                                                                                          )

In [None]:
# Pretty-print the completed transcript object returned by create_transcript
pprint(transcript)

In [None]:
# For dual-channel transcription, print every utterance with its channel.
# textwrap.wrap returns a list of lines, so the text is shown as a list.
channel_utterances = transcript.get("utterances")
if channel_utterances is not None:
    for utterance in channel_utterances:
        channel_id = utterance["channel"]
        wrapped_text = wrap(utterance["text"])
        print(f'channel {channel_id}: {wrapped_text}')

In [None]:
# Pretty-print the raw response text of the word search on the transcript
pprint(words_search_response)

In [None]:
# Pretty-print the transcript exported in srt/vtt caption format
# ('vtt' was requested in the create_transcript call)
pprint(transcript_in_export_format)

In [None]:
# Pretty-print the transcript segmented into paragraphs or sentences
# ('sentences' was requested in the create_transcript call)
pprint(segmented_transcript)