## Improving transcription time by 50% through parallel processing

I was working on a notebook for transcribing videos.  
I ran into a problem: transcribing hours of video took a long time.  
By using parallel processing I cut the total transcription time by about 50%.  

This notebook takes YouTube videos (from a database created in a separate project) and transcribes all the videos listed. It compares different methods to try to improve speed.

#### imports

In [7]:
import os
import time
import datetime
import json
import re
import sys
import math

In [35]:
from dotenv import load_dotenv
load_dotenv()  # load API keys and project settings from a local .env file

True

#### whisper model set up

In [8]:
# Note: this uses the open-source `openai-whisper` package (imported as `whisper`), which runs the model locally
import whisper
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"


#### google client set-up

In [9]:
import googleapiclient.discovery
api_service_name = "youtube"
api_version = "v3"
DEVELOPER_KEY = os.getenv('GOOGLE_DEVELOPER_KEY')

os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
youtube = googleapiclient.discovery.build(
    api_service_name, api_version, developerKey = DEVELOPER_KEY)

#### firebase set-up

In [10]:
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore

# Use a service account.
cred = credentials.Certificate('{}.json'.format(os.getenv('PROJECT_ID')))

app = firebase_admin.initialize_app(cred)

db = firestore.client(app)
batch = db.batch()

### Functions

#### functions calling to database

In [12]:
def get_all_videos_for_channel_from_db(channelId):
    """
    Retrieve list of videos from firebase
    """

    docs = db.collection(u"videos").order_by('publishedAt', direction=firestore.Query.DESCENDING).where(u"channelId",u"==",u"{}".format(channelId)).stream()

    collection = []
    for doc in docs:
        vid = doc.to_dict()
        collection.append(vid)
    return collection

def get_all_transcripts_for_channel_from_db(channelId):
    """
    Retrieve list of Whisper transcripts from firebase
    """

    docs = db.collection(u"WhisperTranscriptions").where(u"channelId",u"==",u"{}".format(channelId)).stream()

    collection = []
    for doc in docs:
        transcript = doc.to_dict()
        collection.append(transcript)

    return collection

#### functions for extracting details from response

In [15]:
def getYoutubeDuration(videoId):
    """
    Return the duration of a video in seconds, using the YouTube Data API
    """
    responseVideoDetails = youtube.videos().list(part="contentDetails", id=videoId).execute()
    durationResponse = responseVideoDetails['items'][0]['contentDetails']['duration']

    # The API returns an ISO 8601 duration such as "PT4M25S"
    duration_string = durationResponse.replace('PT', "")
    number_values = re.findall(r'\d+', duration_string)
    symbols_available = ''.join([i for i in duration_string if not i.isdigit()])

    # Map each unit symbol present (H, M, S) to its numeric value
    symbol_map = {}
    for symbol in 'HMS':
        index = symbols_available.find(symbol)
        if index > -1:
            symbol_map[symbol] = number_values[index]

    # Convert hours, minutes and seconds to a total in seconds
    seconds_per_unit = {"H": 60 * 60, "M": 60, "S": 1}
    duration = 0
    for symbol, value in symbol_map.items():
        duration += int(value) * seconds_per_unit[symbol]

    return duration
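
The duration parsing above can also be written as a single regular expression. This is a minimal sketch, not the notebook's method: the helper name `iso8601_duration_to_seconds` is hypothetical, and it assumes durations of the form `PT#H#M#S` (no day component).

```python
import re

# Hypothetical helper (not from the notebook): converts an ISO 8601 duration
# such as "PT4M25S" into seconds. Day components such as "P1DT2H" are not handled.
_DURATION_RE = re.compile(r"^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$")

def iso8601_duration_to_seconds(duration):
    match = _DURATION_RE.match(duration)
    if match is None:
        raise ValueError("Unsupported duration: {}".format(duration))
    # Missing units come back as None and count as zero
    hours, minutes, seconds = (int(part) if part else 0 for part in match.groups())
    return hours * 3600 + minutes * 60 + seconds

print(iso8601_duration_to_seconds("PT4M25S"))   # 265
print(iso8601_duration_to_seconds("PT1H2M3S"))  # 3723
```

Raising on an unexpected format makes a malformed duration visible instead of silently returning 0.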

#### transcription functions

In [16]:
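from pytube import YouTube  # assumption: YouTube() comes from pytube; the import is missing from the cells above

def get_audio(url, _id):
    """
    Download the audio stream for a video and return the local file path, or None on failure
    """
    try:
        yt = YouTube(url)
        video = yt.streams.filter(only_audio=True).first()
        out_file = video.download(output_path="audio_files")
        # Renaming only changes the file extension, it does not re-encode the audio
        new_file = 'audio_files/' + _id + '.mp3'
        os.rename(out_file, new_file)
    except Exception as e:
        print('Could not download data for {}: '.format(_id), e)
        return None
    return new_file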

def transcribe_youtube_video(videoId, videoUrl, model):
    """
    Download a video's audio and transcribe it with Whisper; returns None if the download fails
    """
    # The timer covers both the download and the transcription
    start_time = time.time()

    audio_downloaded = get_audio(videoUrl, videoId)
    if not audio_downloaded:
        print('Could not get transcription data {} Audio could not download.'.format(videoId))
        return None

    print('Downloaded {}'.format(videoId))

    # transcribe to get speech-to-text data
    result = model.transcribe(audio_downloaded)
    time_to_transcribe = time.time() - start_time
    print('Transcribed {} in {}'.format(videoId, time_to_transcribe))

    return result

def transcription_response_to_json(transcriptResponse, videoId, channelId):
    transcriptObject = {}
    transcriptObject['videoId'] = videoId
    transcriptObject['channelId'] = channelId

    transcriptObject['text'] = transcriptResponse['text']
    segments = transcriptResponse['segments']
    keys = ['start','end','text','id','seek']
    transcriptObject['sentences'] = [{ keep: item[keep] for keep in keys } for item in segments]
    
    return transcriptObject
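
To make the shape of the stored transcript concrete, here is a small sketch of the same key filtering applied to a hand-written stand-in for a Whisper result. The values in `fake_response` are made up for illustration; only the key names mirror the ones used above.

```python
# Hand-written stand-in for a Whisper result; the values are invented and
# only the keys matter here.
fake_response = {
    "text": " Hello there. Welcome back.",
    "segments": [
        {"id": 0, "seek": 0, "start": 0.0, "end": 1.2, "text": " Hello there.", "tokens": [1, 2]},
        {"id": 1, "seek": 0, "start": 1.2, "end": 2.5, "text": " Welcome back.", "tokens": [3, 4]},
    ],
}

# Keep only the fields worth storing; extra fields such as "tokens" are dropped
keys = ["start", "end", "text", "id", "seek"]
sentences = [{key: segment[key] for key in keys} for segment in fake_response["segments"]]

print(sentences[0])
```

Dropping the unused fields keeps each stored document smaller.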

## Action Oriented Functions

In [18]:
def find_videos_to_transcribe(channel_id,min_=0, max_=10000000000000):   
    all_videos_for_channel_in_db =  get_all_videos_for_channel_from_db(channel_id)
    videos_for_transcription = [x for x in all_videos_for_channel_in_db if x['duration'] > min_ and x['duration'] < max_]
    print("Videos to be transcribed: ",len(videos_for_transcription))
    return videos_for_transcription

In [19]:
import warnings

def run_transcription_batch(channelId, videos_for_transcription):
    # Each call loads its own copy of the model, so every worker thread gets a separate model
    model = whisper.load_model("base").to(device)
    warnings.filterwarnings("ignore")

    for video in videos_for_transcription:
        try:
            print("Transcribing.... {} ".format(video['videoId']))
            print("Duration {} {} ".format(video['videoId'], video['duration']))

            whisper_transcription = transcribe_youtube_video(
                video['videoId'],
                "https://www.youtube.com/watch?v={}".format(video['videoId']),
                model)
            transcription_json = transcription_response_to_json(whisper_transcription, video['videoId'], channelId)

        except Exception as e:
            # Report the failure and move on to the next video
            print("err {}: {}".format(video['videoId'], e))
            continue

    warnings.filterwarnings("default")

## User input

In [20]:
channel_url = "https://www.youtube.com/watch?v=lBCOOTyU46M&t=638s&ab_channel=ColinandSamir"
allChannels = get_all_channels_from_db()
channel_id = video_url_to_channel_id(channel_url)


videos_for_transcription = find_videos_to_transcribe(channel_id,min_=60*4,max_=60*5)
json_string = json.dumps(videos_for_transcription)
with open("videos_for_transcription.json", "w") as file:
    file.write(json_string)

Videos to be transcribed:  19


In [21]:
with open("videos_for_transcription.json", 'r') as videos_for_transcription_file:
    videos_for_transcription = json.loads(videos_for_transcription_file.read())
videos_for_transcription = videos_for_transcription[:8]  # keep the first 8 videos for the comparison

In [22]:
sum([x['duration'] for x in videos_for_transcription[:4]])

1086

In [27]:
import concurrent.futures

**Method 1:** Batched parallel processing

Run a few transcriptions in parallel at a time. Here I control how many workers I want, and each worker processes its share of the videos one after another.

In [23]:
num_vids = len(videos_for_transcription)

x_workers = 4
a = math.ceil(num_vids/x_workers) #n_videos_per_thread

x_workers_inputs = [
        (channel_id,videos_for_transcription[i*a:(i+1)*a]) 
        for i in range(x_workers)
]

num_vids,x_workers,a, len(x_workers_inputs)

(8, 4, 2, 4)
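
The split above divides evenly because 8 videos go into 4 workers. The sketch below shows how the same ceiling-based slicing behaves when it does not divide evenly. `split_into_batches` is a hypothetical helper, not part of the notebook, and dropping empty batches is an added choice.

```python
import math

def split_into_batches(items, n_workers):
    """Split items into at most n_workers contiguous batches (hypothetical helper)."""
    batch_size = math.ceil(len(items) / n_workers)
    batches = [items[i * batch_size:(i + 1) * batch_size] for i in range(n_workers)]
    # Drop empty trailing batches so no worker is started with nothing to do
    return [batch for batch in batches if batch]

print([len(b) for b in split_into_batches(list(range(8)), 4)])   # [2, 2, 2, 2]
print([len(b) for b in split_into_batches(list(range(10)), 4)])  # [3, 3, 3, 1]
print([len(b) for b in split_into_batches(list(range(5)), 4)])   # [2, 2, 1]
```

With uneven splits the last batch is shorter, so the total run time is set by the workers holding the full-size batches.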

In [28]:
start = time.time()

with concurrent.futures.ThreadPoolExecutor(max_workers=x_workers) as executor:
    executor.map(run_transcription_batch, *zip(*x_workers_inputs))
print("Total duration multithreading =", time.time() - start)


# tee_to_file.close()

Transcribing.... sNGOgBcCcA4 
Duration sNGOgBcCcA4 265 
Transcribing.... _DRsu5Cv3O0 
Duration _DRsu5Cv3O0 256 
Transcribing.... K-hnbTqo6ko 
Duration K-hnbTqo6ko 253 
Transcribing.... i05bI03nzv4 
Duration i05bI03nzv4 261 
Downloaded sNGOgBcCcA4
Downloaded _DRsu5Cv3O0
Downloaded K-hnbTqo6ko
Downloaded i05bI03nzv4
Transcribed K-hnbTqo6ko in 45.00648880004883
Transcribing.... pwkOf7A6hkw 
Duration pwkOf7A6hkw 284 
Downloaded pwkOf7A6hkw
Transcribed sNGOgBcCcA4 in 51.32943296432495
Transcribing.... re_osGoZf9w 
Duration re_osGoZf9w 280 
Downloaded re_osGoZf9w
Transcribed _DRsu5Cv3O0 in 54.23505210876465
Transcribing.... vqp-87KaqTA 
Duration vqp-87KaqTA 284 
Downloaded vqp-87KaqTA
Transcribed i05bI03nzv4 in 55.9414222240448
Transcribing.... TnyUCA-4BuQ 
Duration TnyUCA-4BuQ 280 
Downloaded TnyUCA-4BuQ
Transcribed re_osGoZf9w in 45.7160701751709
Transcribed pwkOf7A6hkw in 56.11262917518616
Transcribed TnyUCA-4BuQ in 50.43804311752319
Transcribed vqp-87KaqTA in 52.681323766708374
Total dur
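
The `*zip(*...)` idiom in the `executor.map` call is easy to misread, so here is a small sketch of what it does. `describe_batch` and the input values are stand-ins invented for illustration; they are not part of the notebook.

```python
import concurrent.futures

def describe_batch(channel_id, videos):
    # Stand-in for run_transcription_batch; it only reports what it received.
    return "{}: {} videos".format(channel_id, len(videos))

inputs = [("channel-a", ["v1", "v2"]), ("channel-a", ["v3", "v4"])]

# zip(*inputs) turns [(c, vids), (c, vids)] into (c, c) and (vids, vids),
# which is the one-iterable-per-argument form that executor.map expects.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(describe_batch, *zip(*inputs)))

print(results)  # ['channel-a: 2 videos', 'channel-a: 2 videos']
```

Wrapping the call in `list(...)` also matters for error handling: `executor.map` only re-raises a worker's exception when its result is read, so a call whose results are never consumed can fail silently.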

**Method 2:** Full parallel processing 

Run transcriptions for all videos in parallel. Here I control how many videos go into each group instead of how many workers there are. With a group size of 1, every video gets its own worker and all videos run at the same time.

In [24]:
c =1
y_workers = math.ceil(num_vids/ c)

y_workers_inputs = [
        (channel_id,videos_for_transcription[i*c:(i+1)*c]) 
        for i in range(y_workers)
]

num_vids,y_workers,c, len(y_workers_inputs)

(8, 8, 1, 8)

In [34]:
start = time.time()

with concurrent.futures.ThreadPoolExecutor(max_workers=y_workers) as executor:
    executor.map(run_transcription_batch, *zip(*y_workers_inputs))
print("Total duration multithreading =", time.time() - start)


# tee_to_file.close()

Transcribing.... sNGOgBcCcA4 
Duration sNGOgBcCcA4 265 
Transcribing.... TnyUCA-4BuQ 
Duration TnyUCA-4BuQ 280 
Transcribing.... vqp-87KaqTA 
Duration vqp-87KaqTA 284 
Transcribing.... _DRsu5Cv3O0 
Duration _DRsu5Cv3O0 256 
Transcribing.... pwkOf7A6hkw 
Duration pwkOf7A6hkw 284 
Transcribing.... K-hnbTqo6ko 
Duration K-hnbTqo6ko 253 
Transcribing.... re_osGoZf9w 
Duration re_osGoZf9w 280 
Transcribing.... i05bI03nzv4 
Duration i05bI03nzv4 261 
Downloaded sNGOgBcCcA4
Downloaded TnyUCA-4BuQ
Downloaded vqp-87KaqTA
Downloaded pwkOf7A6hkw
Downloaded K-hnbTqo6ko
Downloaded re_osGoZf9w
Downloaded _DRsu5Cv3O0
Downloaded i05bI03nzv4
Transcribed K-hnbTqo6ko in 85.57955884933472
Transcribed re_osGoZf9w in 96.60068821907043
Transcribed sNGOgBcCcA4 in 100.4392101764679
Transcribed TnyUCA-4BuQ in 107.00060796737671
Transcribed i05bI03nzv4 in 108.68207597732544
Transcribed _DRsu5Cv3O0 in 111.51289677619934
Transcribed pwkOf7A6hkw in 112.63232612609863
Transcribed vqp-87KaqTA in 112.72917890548706
Tot

**Method 3:** No parallel processing

Run transcriptions sequentially

In [30]:

start = time.time()

run_transcription_batch(channel_id,videos_for_transcription)

print("duration =", time.time() - start)

# tee_to_file.close()

Transcribing.... i05bI03nzv4 
Duration i05bI03nzv4 261 
Downloaded i05bI03nzv4
Transcribed i05bI03nzv4 in 24.353605270385742
Transcribing.... TnyUCA-4BuQ 
Duration TnyUCA-4BuQ 280 
Downloaded TnyUCA-4BuQ
Transcribed TnyUCA-4BuQ in 25.48165988922119
Transcribing.... sNGOgBcCcA4 
Duration sNGOgBcCcA4 265 
Downloaded sNGOgBcCcA4
Transcribed sNGOgBcCcA4 in 24.232609033584595
Transcribing.... re_osGoZf9w 
Duration re_osGoZf9w 280 
Downloaded re_osGoZf9w
Transcribed re_osGoZf9w in 20.613561868667603
Transcribing.... K-hnbTqo6ko 
Duration K-hnbTqo6ko 253 
Downloaded K-hnbTqo6ko
Transcribed K-hnbTqo6ko in 18.787139177322388
Transcribing.... pwkOf7A6hkw 
Duration pwkOf7A6hkw 284 
Downloaded pwkOf7A6hkw
Transcribed pwkOf7A6hkw in 29.11623501777649
Transcribing.... _DRsu5Cv3O0 
Duration _DRsu5Cv3O0 256 
Downloaded _DRsu5Cv3O0
Transcribed _DRsu5Cv3O0 in 31.737667083740234
Transcribing.... vqp-87KaqTA 
Duration vqp-87KaqTA 284 
Downloaded vqp-87KaqTA
Transcribed vqp-87KaqTA in 36.827014207839966
du

In [37]:
method_1_time= 108.8
method_2_time= 116.8
method_3_time= 211.7

In [43]:
print("""

Comparing...Method 1   Method 2   Method 3

To:
Method 1:...............{}.........{}

Method 2:....{}....................{}

Method 3:....{}..........{}...........

""".format(round(method_1_time/method_2_time,1),
          round(method_1_time/method_3_time,1),
          round(method_2_time/method_1_time,1),
          round(method_2_time/method_3_time,1),
          round(method_3_time/method_1_time,1),
          round(method_3_time/method_2_time,1))
     )



Comparing...Method 1   Method 2   Method 3

To:
Method 1:...............0.9.........0.5

Method 2:....1.1....................0.6

Method 3:....1.9..........1.8...........
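
The ratios in the table can also be read as percentage time savings against the sequential run. This short sketch reuses the three measured totals from the cell above; the variable names are chosen for illustration.

```python
# Measured totals in seconds, copied from the notebook
times = {"Method 1": 108.8, "Method 2": 116.8, "Method 3": 211.7}
baseline = times["Method 3"]  # sequential run

# Percentage of time saved relative to the sequential baseline
reductions = {name: (1 - seconds / baseline) * 100 for name, seconds in times.items()}

for name, reduction in reductions.items():
    print("{}: {:.1f}% less time than sequential".format(name, reduction))
```

This gives roughly 48.6% for Method 1 and 44.8% for Method 2, which is where the "about 50%" figure comes from.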




### Conclusion

- Multithreading reduced total transcription time by roughly 45-50% (108.8 s batched and 116.8 s fully parallel, versus 211.7 s sequential)
- Batch processing was about 7% faster than full parallel processing
- Even if batch and full parallel were equally fast, I would still go with batch: waiting a full hour for results is riskier (if something crashes halfway, there will be 0 results available) than getting results every few minutes
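
One way to reduce the risk described in the last point is to save each result as soon as its worker finishes. The sketch below is an assumption about how that could look, not what the notebook does: `fake_transcribe` stands in for the real download and Whisper call, and results are written to a temporary directory as JSON files.

```python
import concurrent.futures
import json
import os
import tempfile

def fake_transcribe(video_id):
    # Stand-in for the real download + Whisper call.
    return {"videoId": video_id, "text": "transcript of " + video_id}

video_ids = ["a", "b", "c"]
out_dir = tempfile.mkdtemp()

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(fake_transcribe, vid): vid for vid in video_ids}
    # as_completed yields each future as soon as it finishes, in any order
    for future in concurrent.futures.as_completed(futures):
        vid = futures[future]
        try:
            result = future.result()
        except Exception as e:
            print("Failed {}: {}".format(vid, e))
            continue
        # Persist each result immediately so a later crash cannot lose it
        with open(os.path.join(out_dir, vid + ".json"), "w") as f:
            json.dump(result, f)

saved = sorted(os.listdir(out_dir))
print(saved)  # ['a.json', 'b.json', 'c.json']
```

With this pattern a crash partway through a long run still leaves every finished transcript on disk.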