In [None]:
##########

In [1]:
import os, requests, time, sys
import numpy as np
import pandas as pd
from xml.etree import ElementTree
from datetime import datetime
#
import multiprocessing
#
numConsumers = multiprocessing.cpu_count()
print(numConsumers)

8


In [2]:
#############################################################
#############################################################
###  Using a worker function for parallel processing
#############################################################
#############################################################
###  Adapted from here:
###  https://github.com/Azure-Samples/Cognitive-Speech-TTS/blob/master/Samples-Http/Python/TTSSample.py
###
###  Take the cleaned up extracted PDF contents CSV file as input data.
###  Convert the sentence to wav audio in 16kHz, mono, PCM coding.
###  Save the audio files
###  Create the CSV file required by Deepspeech training.
###  Create additional CSV file which also includes the short name of the voice type
###         -- note this file is not used for for Deepspeech and only for reference.
###
###  Parallel processing related:
###      Create tasks objects and then add one poison pill per consumer.
###      Create consumers = # available cores
###      Start the consumers.
###      Wait for them to rejoin and summarize results.
###
###  Adapted code to handle 429 and 503 responses
###      For 503: will try up to 3 times for the same sentence.
###      For 429: will break out of processing, but allow writing file.
#############################################################
#############################################################

In [3]:
#
import os, requests, time, sys
import numpy as np
import pandas as pd
from xml.etree import ElementTree
from datetime import datetime
from random import randint
#
import multiprocessing
#
pd.set_option('max_columns', 500)
pd.set_option('max_colwidth', -1)
pd.set_option('max_rows', 500)
#
import logging
logFilenameWithPath = '/home/rohit/dpspTraining/data/wavFiles/azure1/pdfExtraction/workerLogic_801_950_1950wavs/LOG_testTTS11_with429503_parallelProc_charCountTrack_801_950.log'
## https://www.patricksoftwareblog.com/python-logging-tutorial/
logging.basicConfig(level=logging.WARNING, filename=logFilenameWithPath,             \
    filemode='w', format='%(processName)-10s) : %(asctime)s : %(message)s')
#    filemode='w', format='')
#    filemode='w', format='%(asctime)s %(levelname)s:%(message)s')
#

In [4]:
#
#
############################################################
#
#
## https://github.com/Azure-Samples/Cognitive-Speech-TTS/blob/master/Samples-Http/Python/TTSSample.py
class TextToSpeech(object):
    #
    def __init__(self, subscription_key):
        self.subscription_key = subscription_key
        #self.tts = input("What would you like to convert to speech: ")
        self.tts = None   ## defaulting the value to None -- the text to be converted to audio
        self.timestr = time.strftime("%Y%m%d-%H%M%S")
        self.access_token = None
        self.access_token_startTime = None
    #
    #The TTS endpoint requires an access token. This method exchanges your
    #subscription key for an access token that is valid for ten minutes.
    #
    def get_token(self):
        #fetch_token_url = "https://westus.api.cognitive.microsoft.com/sts/v1.0/issueToken"
        fetch_token_url = "https://westeurope.api.cognitive.microsoft.com/sts/v1.0/issueToken"
        headers = {
            'Ocp-Apim-Subscription-Key': self.subscription_key
        }
        response = requests.post(fetch_token_url, headers=headers)
        self.access_token = str(response.text)
        self.access_token_startTime = time.strftime("%Y%m%d-%H%M%S")
    #
    def check_tokenGenerationTime_and_regenerateIfRequired(self, thresholdInSecs = 540):
        timeNow = time.strftime("%Y%m%d-%H%M%S")
        s_tg = time.strptime(self.access_token_startTime, "%Y%m%d-%H%M%S")  # struct_time of token generation time
        s_now = time.strptime(timeNow, "%Y%m%d-%H%M%S")  # struct_time of now time
        timeDiff = time.mktime(s_now) - time.mktime(s_tg) ## time since token generation in seconds
        if timeDiff > thresholdInSecs: # azure documentation states token valid for 10 mins, so default 9*60 = 540 secs
            myStr = f"\nRequesting new access token automatically." + \
                  f"Earlier token: Start time = {self.access_token_startTime}\nTime now = {timeNow}" + \
                  f"Time Difference (secs) since last genertion= {timeDiff}" + \
                  f"\nOld access_token = \n{self.access_token}\n"
            #print(f"{myStr}")
            logging.warning(f"{myStr}")
            self.get_token()
            myStr = f"\nNew Access Token received = \n{self.access_token}" + \
                  f"\nNew Access Token Start Time = \n{self.access_token_startTime}\n"
            #print(f"{myStr}")
            logging.warning(f"{myStr}")
    #
    def save_audio(self, counters, inVoiceTypeShortName = '', fileNumber = 1, wavFilePath = ''):
        #### Inputs:
        ####       the counters dictionary.
        ####       the Short Names of the voice types to be used.
        ####       the file number to include in the filename.
        ####       the absolute path where to save the audio file.
        #### Returns:
        ####       the status code (should be 200 if all good).
        ####       the audio file name created.
        #
        #base_url = 'https://westus.tts.speech.microsoft.com/'
        base_url = 'https://westeurope.tts.speech.microsoft.com/'
        path = 'cognitiveservices/v1'
        constructed_url = base_url + path
        headers = {
            'Authorization': 'Bearer ' + self.access_token,
            'Content-Type': 'application/ssml+xml',
            #'X-Microsoft-OutputFormat': 'riff-24khz-16bit-mono-pcm',
            'X-Microsoft-OutputFormat': 'riff-16khz-16bit-mono-pcm', # Deepspeech needs 16kHz, mono, PCM encoded wav files
            'User-Agent': 'YOUR_RESOURCE_NAME'
        }
        xml_body = ElementTree.Element('speak', version='1.0')
        xml_body.set('{http://www.w3.org/XML/1998/namespace}lang', 'en-us')
        voice = ElementTree.SubElement(xml_body, 'voice')
        voice.set('{http://www.w3.org/XML/1998/namespace}lang', 'en-US')
        #voice.set('name', 'en-US-Guy24kRUS') # Short name for 'Microsoft Server Speech Text to Speech Voice (en-US, Guy24KRUS)'
        voice.set('name', inVoiceTypeShortName) # Short name for 'Microsoft Server Speech Text to Speech Voice
        voice.text = self.tts
        body = ElementTree.tostring(xml_body)
        #
        ## sometimes azure response = 503
        ##    https://docs.microsoft.com/en-us/rest/api/searchservice/http-status-codes
        ##    Solution: make API call up to 3 times with random delay
        for _ in range(3):
            counters['countApiCallsMade'] += 1
            response = requests.post(constructed_url, headers=headers, data=body)
            if response.status_code != 503:
                break
            sleep503Time = randint(3, 10)
            print(f"\nAure response = 503, so sleeping for {sleep503Time} seconds.\n")
            logging.warning(f"\nAure response = 503, so sleeping for {sleep503Time} seconds.\n")
            time.sleep(sleep503Time)
            #
        #
        #If a success response is returned, then the binary audio is written to the file.
        #
        outWavFile = '' # if conversion failed then this will remain empty string
        ## if the conversion is successful then status code will be 200.
        if response.status_code == 200:
            # the wav file is created at the location sent during function call via wavFilePath variable
            outWavFile = 'azureSTT_' + str(fileNumber) + '_' + inVoiceTypeShortName + \
                         '_' + time.strftime("%Y%m%d-%H%M%S") + '.wav'
            with open(wavFilePath + outWavFile, 'wb') as audio:
                audio.write(response.content)
                #print(f"\nSuccess -- file number = {fileNumber}\nfile created: {outWavFile}")
                logging.warning(f"\nSuccess -- file number = {fileNumber}\nfile created: {outWavFile}")
                counters['countAudioConversionSuccess'] += 1
        else:
            #print(f"\nFAILED -- file number = {fileNumber}\nresponse.status_code: {str(response.status_code)}\nsentence=\n{self.tts}")
            logging.warning(f"\nFAILED -- file number = {fileNumber}\nresponse.status_code: {str(response.status_code)}\nsentence=\n{self.tts}")
            counters['countAudioConversionFailed'] += 1
        #
        return response.status_code, outWavFile
    #
    def get_voices_list(self):
        #base_url = 'https://westus.tts.speech.microsoft.com/'
        base_url = 'https://westeurope.tts.speech.microsoft.com/'
        path = 'cognitiveservices/voices/list'
        constructed_url = base_url + path
        headers = {
            'Authorization': 'Bearer ' + self.access_token,
        }
        response = requests.get(constructed_url, headers=headers)
        if response.status_code == 200:
            print("\nAvailable voices: \n" + response.text)
        else:
            print("\nStatus code: " + str(response.status_code) + "\nSomething went wrong. Check your subscription key and headers.\n")
    #
    ### my code
    def set_text_to_convert(self, inText):
        if isinstance(inText, str):
            self.tts = inText
            return True
        else:
            return False
#
#
############################################################
#
#
## Each worker to process the number of sentences passed. For each sentence process for each
##      voice type passed to it.
def workerFunction(taskApiCallThrottleSleepTime, taskApiCallThrottleLimit, taskNumberIn, voiceTypeShortNameListIn, dfIpData, outWavFilesPath, outFilePath, outFileNameFullData, outFileName4Deepspeech, startingFileNumber = 1, accessTokenRecheckInSeconds = 540, statusRowPrintFreq = 50):
    ''' Converts the input sentences to wav files and stores them.
    Inputs:
           taskApiCallThrottleSleepTime
                 Time in seconds to sleep during throttling.
           taskApiCallThrottleLimit
                 Number of Api calls to Azure after which to sleep (throttling to avoid 429 response).
           taskNumberIn
                 Unique number to be passed for each worker function. Is used to create filenames,
                 for logging, etc.
           voiceTypeShortNameListIn
                 List of the Azure Short Names for the types of voices to be used for audio output.
           dfIpData
                 Dataframe with sentences that should be converted.
           outWavFilesPath
                 Location to store the wav files.
           outFilePath
                 Location to store the output CSV files.
           outFileNameFullData
                 Filename to store the full data which includes the voice type short name.
           outFileName4Deepspeech
                 Filename to store the Deepspeech data (excluding voice type short name).
           startingFileNumber
                 The position of the starting sentence as per the full data read in from input file.
                 Default value = 1.
           accessTokenRecheckInSeconds
                 Time after which the token should be regenerated automatically.
                 Default value = 540 seconds.
            statusRowPrintFreq
                 Will print status for row number and sentence every these many sentences from input data.
                 Default value = 50 rows.
    '''
    #
    myStr = f"\n\n\n ************ Worker Function Invoked for taskNumber = {taskNumberIn} ************\n" + \
          f"    Number of sentences passed to worker = {len(dfIpData)}\n\n\n"
    print(f"{myStr}")
    logging.warning(f"{myStr}")
    #
    ## create the object for the TTS processing
    app = TextToSpeech('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx') ## -- paid one
    #
    app.get_token()
    myStr = f"\nFirst time Access Token = \n{app.access_token}" + \
          f"\nFirst time Access Token Start Time = \n{app.access_token_startTime}\n"
    print(f"{myStr}")
    logging.warning(f"{myStr}")
    #
    ## access each row from input datafame, send the sentence via api to convert to audio,
    ##        save the sentence, audio file with absolute path and the filesize
    ##        as required for Deepspeech training.
    #
    ## When the final output CSV file is written for Deepspeech, then the voiceShortName will not be included.
    #
    ## Deepspeech training file does not need voiceShortName, but capturing for now
    dfOutColumns = [ 'voiceShortName' , 'wav_filename' , 'wav_filesize' , 'transcript']
    dfOutColumnsDtypes = [str, str, int, str]
    dfOut = pd.DataFrame(columns = dfOutColumns)
    #dfOut = dfOut.astype({"wav_filename": str, "wav_filesize": int, 'transcript': str})
    dfOut = dfOut.astype(dict(zip(dfOutColumns, dfOutColumnsDtypes)))
    #
    ## add slash at end if not there already
    if outFilePath[-1] != '/':
        outFilePath = outFilePath + '/'
    if outWavFilesPath[-1] != '/':
        outWavFilesPath = outWavFilesPath + '/'
    #
    ## Note: these counters are only for the worker function and will be aggregated into
    ##       overall counters later by the main process.
    workerCounters = { 'countRowsReadIn': 0 , 'countRowsWrittenToDfOut': 0 , 'countAudioConversionSuccess': 0 , \
            'countAudioConversionFailed': 0 , 'countInvalidSentences': 0, 'countApiCallsMade': 0}
    #
    ## track how many characters were sent to Azure
    workerCharCount = 0
    #
    ## this should be remain 0 ideally, the flag for 429 will tell if 429 error occurred
    processingReturnCode = 0
    #
    ## ideally this should never have been set to True
    flagRc429Encountered = False  ## check any Azure call had 429 response code
    #
    ## when the worker function is called the correct file number is passed. Reducing now as it is
    ##      first incremented in the loop.
    startingFileNumber -= 1
    #
    ## start processing the sentences from dfin for each voice type required
    for row in dfIpData.itertuples():
        #
        if processingReturnCode != 0 or flagRc429Encountered == True:
            break
        #
        workerCounters['countRowsReadIn'] += 1
        startingFileNumber += 1  ## same file number should remain for a particular sentence
        #
        ## Convert named tuple to dictionary
        ## using as per this link: https://thispointer.com/pandas-6-different-ways-to-iterate-over-rows-in-a-dataframe-update-while-iterating-row-by-row/
        dictRow = row._asdict()
        #
        textToConvert = dictRow['sentence']
        #
        ## log each sentence , but print a status only as per input frequency
        myStr = f"\n ----------------------- **** Processing sentence # {workerCounters['countRowsReadIn']} *** -----------------------" + \
                f"\nSentence =\n{textToConvert}"
        logging.warning(f"{myStr}")
        if workerCounters['countRowsReadIn'] % statusRowPrintFreq == 0 or workerCounters['countRowsReadIn'] == 1:
            print(f"{myStr}")
        #
        if textToConvert is not None and textToConvert != '':
            if app.set_text_to_convert(textToConvert) == False:
                print(f"\n\nFatal Error: Problem setting text in the app object.\n\n")
                logging.warning(f"\n\nFatal Error: Problem setting text in the app object.\n\n")
                processingReturnCode = 100
                break
        else:
            #print(f"\nSentence was None or empty string. Continuing to next sentence.\n")
            logging.warning(f"\nSentence was None or empty string. Continuing to next sentence.\n")
            workerCounters['countInvalidSentences'] += 1
            continue ## move to the next sentence
        #
        ## sentence is fine and set in the app object, so now convert for each type of voice
        #
        for idxVoiceType, voiceTypeShortName in enumerate(voiceTypeShortNameListIn):
            #print(f"\n **** Voice = {idxVoiceType + 1} *** \t\tvoice Short Name = {voiceTypeShortName}")
            logging.warning(f"\n **** Voice = {idxVoiceType + 1} *** \t\tvoice Short Name = {voiceTypeShortName}")
            #
            ## throttling to avoid response 429 from Azure
            ## https://docs.microsoft.com/en-us/azure/cognitive-services/speech-service/faq-text-to-speech
            if workerCounters['countApiCallsMade'] != 0 and workerCounters['countApiCallsMade'] % taskApiCallThrottleLimit == 0:
                #print(f"\nSleeping for {taskApiCallThrottleSleepTime} seconds, api calls made so far = {workerCounters['countApiCallsMade']}")
                logging.warning(f"\nSleeping for {taskApiCallThrottleSleepTime} seconds, api calls made so far = {workerCounters['countApiCallsMade']}")
                time.sleep(taskApiCallThrottleSleepTime)
                #print(f"\nFinished sleep, continuing....\n")
                logging.warning(f"\nFinished sleep, continuing....\n")
            #
            ## check the current token issue time. If more than threshold time has passed, then get new token again.
            app.check_tokenGenerationTime_and_regenerateIfRequired(accessTokenRecheckInSeconds)
            #
            ## make api call to convert audio. also increments the counters for audio conversion
            ##      success, failed, api calls made.
            workerCharCount += len(textToConvert)
            try:
                appSaveAudioRespCode, savedWavFilename = app.save_audio( workerCounters, voiceTypeShortName, startingFileNumber, outWavFilesPath )
            except:
                ## unexpected issue (e.g. had a OSError: Tunnel connection failed: 502 Could not relay message upstream)
                ##     this killed process midway -> no output CSV file and NO REJOIN with main process.
                ##     so main process did not proceed and waits infinitely.
                ## if there are already any entries in dfOut then write to CSV file and rejoin the main process
                processingReturnCode = 11000 ## using huge value
                myStr = f"\n" + \
                        "\n ------ Unhandled exception occurred during save_audio call ------ " * 2 + \
                        "\n"
                print(f"{myStr}")
                logging.warning(f"{myStr}")
                break
                #
            ## azure api call returns with response code 200 if all is ok.
            ##       overwriting processingReturnCode only if it is serious.
            if appSaveAudioRespCode != 200:
                if appSaveAudioRespCode == 429:
                    flagRc429Encountered = True
                    break
                #
                ## its not 429, something serious
                #print(f"\n\nProblem with audio conversion. Azure response Status Code = {appSaveAudioRespCode}\n\n")
                logging.warning(f"\n\nProblem with audio conversion. Azure response Status Code = {appSaveAudioRespCode}\n\n")
                processingReturnCode = appSaveAudioRespCode
                break
            #
            ## no major problems encountered so far, get the filesize
            try:
                ## get the filesize and make entry in dataframe for output file
                fullPathWavFile = outWavFilesPath + savedWavFilename
                wavFileSize = os.path.getsize(fullPathWavFile)
                dfOutColumnsData = [ voiceTypeShortName, fullPathWavFile , wavFileSize , textToConvert ]
                dfOut = dfOut.append( dict(zip(dfOutColumns, dfOutColumnsData)) , ignore_index = True)
                workerCounters['countRowsWrittenToDfOut'] += 1
            except:
                print(f"\n\nFatal Error: Problem getting the wav file size or making entry for output dataframe.\n\n")
                logging.warning(f"\n\nFatal Error: Problem getting the wav file size or making entry for output dataframe.\n\n")
                processingReturnCode = 300
                break
            #
        #
    #
    if processingReturnCode != 0:
        myStr = f"\n" + \
              f"\nFatal Error: " + \
              f"### PROBLEM " * 4 + \
              f"\nFatal Error: " + \
              f"### PROBLEM " * 4
        print(f"{myStr}")
        logging.warning(f"{myStr}")
    #
    myStr = f"\n\nSentence+voiceType processing done." + \
          f"\nprocessingReturnCode = {processingReturnCode}\n" + \
          f"\n\nCreating output files...\n"
    #print(f"{myStr}")
    logging.warning(f"{myStr}")
    #
    ## Write full output file including voice type short name
    dfOut.to_csv(outFilePath + outFileNameFullData, index = False)
    #
    ## Write output file for Deepspeech training -- only the three required columns
    dfOutForFile = dfOut[['wav_filename' , 'wav_filesize' , 'transcript']]
    dfOutForFile.to_csv(outFilePath + outFileName4Deepspeech, index = False)
    #
    # Print the summary info for the worker
    #
    myStr = f"\n\n ############# WORKER SUMMARY for TASK NUMBER = {taskNumberIn} #############" + \
        f"\nFull Data CSV file created here:\n{outFilePath + outFileNameFullDataForTask}" + \
        f"\nDeepspeech training CSV file created here:\n{outFilePath + outFileName4DeepspeechForTask}" + \
        f"\nWAV files location:\n{outWavFilesPath}" + \
        f"\nSentences in for worker                    = {len(dfIpData)}" + \
        f"\nVoices types input                         = {len(voiceTypeShortNameListIn)}" + \
        f"\nSentences read in                          = {workerCounters['countRowsReadIn']}" + \
        f"\nInvalid Sentences                          = {workerCounters['countInvalidSentences']}" + \
        f"\nAssuming all good, there should be these number of rows in the output file:\n"+ \
        f"     numSentences * numVoiceTypes = {len(dfIpData) * len(voiceTypeShortNameListIn)}" + \
        f"\nData written to output dataframe           = {workerCounters['countRowsWrittenToDfOut']}" + \
        f"\nRows in dfOut                              = {len(dfOut)}" + \
        f"\nSuccessful audio conversion                = {workerCounters['countAudioConversionSuccess']}" + \
        f"\nFailed audio conversion                    = {workerCounters['countAudioConversionFailed']}" + \
        f"\nAPI calls made to Azure                    = {workerCounters['countApiCallsMade']}" + \
        f"\nNo. of characters processed                = {workerCharCount}" + \
        f"\nProcessing return code                     = {processingReturnCode}" + \
        f"\n     Status flag for 429 error             = {flagRc429Encountered}"
    print(f"{myStr}")
    logging.warning(f"{myStr}")
    #
    ## completed processing by the worker
    print(f"\n\nWorker function completed and exiting for task number = {taskNumberIn}.")
    logging.warning(f"\n\nWorker function completed and exiting for task number = {taskNumberIn}.")
    #
    return processingReturnCode, workerCounters, flagRc429Encountered, workerCharCount 
    #
#
#
############################################################
#
#
class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown this process
                self.task_queue.task_done()
                print(f"\nNone Task poison pill picked and EXITING for consumer {self.name}\n")
                logging.warning(f"\nNone Task poison pill picked and EXITING for consumer {self.name}\n")
                break
            print(f"\nTaskNumber {next_task.taskNumber} PICKED by Consumer process = {proc_name}\n")
            logging.warning(f"\nTaskNumber {next_task.taskNumber} PICKED by Consumer process = {proc_name}\n")
            answer = next_task()  ## taskObject() automatically calls the __call__ magic function.
            print(f"\nTaskNumber {next_task.taskNumber} COMPLETED by Consumer process = {proc_name}\n")
            logging.warning(f"\nTaskNumber {next_task.taskNumber} COMPLETED by Consumer process = {proc_name}\n")
            self.task_queue.task_done()
            self.result_queue.put(answer)
            print(f"\nRESULT queue entry by Consumer process = {proc_name}\n")
            logging.warning(f"\nRESULT queue entry by Consumer process = {proc_name}\n")
        return
#
#
############################################################
#
#
class Task(object):
    def __init__(self, taskApiCallThrottleSleepTime, taskApiCallThrottleLimit, taskNumber, voiceTypeShortNames, dfIpDataForTask, outWavFilesPath, outFilePath, outFileNameFullDataForTask, outFileName4DeepspeechForTask, startingFileNumberForTask, accessTokenRecheckInSeconds, statusRowPrintFreq):
        self.taskApiCallThrottleSleepTime = taskApiCallThrottleSleepTime
        self.taskApiCallThrottleLimit = taskApiCallThrottleLimit
        self.taskNumber = taskNumber
        self.voiceTypeShortNames = voiceTypeShortNames
        self.dfIpDataForTask = dfIpDataForTask
        self.outWavFilesPath = outWavFilesPath
        self.outFilePath = outFilePath
        self.outFileNameFullDataForTask = outFileNameFullDataForTask
        self.outFileName4DeepspeechForTask = outFileName4DeepspeechForTask
        self.startingFileNumberForTask = startingFileNumberForTask
        self.accessTokenRecheckInSeconds = accessTokenRecheckInSeconds
        self.flagRc429Encountered = None
        self.statusRowPrintFreq = statusRowPrintFreq
        self.taskReturnRC = None
        self.taskCounters = None
        self.taskCharCount = 0
        #
        print(f"\nCreated Task object with taskNumber = {self.taskNumber}\n")
        logging.warning(f"\nCreated Task object with taskNumber = {self.taskNumber}\n")
    #
    def __call__(self):
        self.taskReturnRC = None
        self.taskCounters = None
        self.flagRc429Encountered = None
        self.taskCharCount = 0
        self.taskReturnRC, self.taskCounters, self.flagRc429Encountered, self.taskCharCount = workerFunction( self.taskApiCallThrottleSleepTime, self.taskApiCallThrottleLimit, self.taskNumber, self.voiceTypeShortNames, self.dfIpDataForTask, self.outWavFilesPath, self.outFilePath, self.outFileNameFullDataForTask, self.outFileName4DeepspeechForTask, self.startingFileNumberForTask, self.accessTokenRecheckInSeconds, self.statusRowPrintFreq )
        return self.taskNumber, self.taskReturnRC, self.taskCounters, self.outFileNameFullDataForTask, self.outFileName4DeepspeechForTask, self.flagRc429Encountered, self.taskCharCount
    #
    def __str__(self):
        return
#
#
############################################################
#
#

In [5]:
#
## track how many characters were sent to Azure TTS
## Paid version allows 20 concurrent requests  (free allows only 1)
## https://azure.microsoft.com/en-us/pricing/details/cognitive-services/speech-services/
## Costs for Standard TTS:
## Paid version - 4$ per 1 million characters
overallCharCountTrack = 0
#
## Read the input file.
#
# The file has these three columns in a csv format:
#             sentence,pageNum,sentenceLen
#
inFilePath = '/home/rohit/dpspTraining/data/wavFiles/azure1/pdfExtraction/'
#inFilePath = '/home/rohit/dpspTraining/data/azure/pdfExtraction/'
inFileName = 'System_800xA_Summary.pdf_extractedText_4.csv'
inPdFile = inFilePath + inFileName
#
## The output file expected for the Deepspeech training has three columns:
#             wav_filename,wav_filesize,transcript
#
myStr = f"\n    *************************************************************** " + \
      f"\n    Input file =\n{inPdFile}" + \
      f"\n    *************************************************************** " + \
      f"\n\n"
print(f"{myStr}")
logging.warning(f"{myStr}")
#
colsToReadIn = ['sentence']
#
# set rowsToRead = -1 to read the whole dataframe in one shot, else specify the number accordingly
#rowsToRead = -1
rowsToRead = 150
rowsToSkip = 800
if rowsToRead == -1:
    dfIn = pd.read_csv( inPdFile, sep=',', usecols = colsToReadIn, skiprows = range(1, rowsToSkip+1), header = 0, low_memory=False )
else:
    dfIn = pd.read_csv( inPdFile, sep=',', nrows = rowsToRead, usecols = colsToReadIn, skiprows = range(1, rowsToSkip+1), header = 0, low_memory=False )
#
myStr = f'\n\t# Rows read into dataframe = {rowsToRead if rowsToRead != -1 else "all the rows"}' + \
      f"\n\t# Rows Skipped = {rowsToSkip}" + \
      f"\n\tdfIn.shape = {dfIn.shape}"
print(f"{myStr}")
logging.warning(f"{myStr}")
#
## The output file expected for the Deepspeech training has three columns:
#             wav_filename,wav_filesize,transcript
#
outWavFilesPath = '/home/rohit/dpspTraining/data/wavFiles/azure1/pdfExtraction/workerLogic_801_950_1950wavs/convertedWavs_801_950_1950wavs/'
#
outFilePath = '/home/rohit/dpspTraining/data/wavFiles/azure1/pdfExtraction/workerLogic_801_950_1950wavs/'
#
### NOTE: Specify the two output filenames WITHOUT .csv --- logic to append worker function number and
###               .csv extension is present later
#
# file for Deepspeech -- only 3 columns
### NOTE -- do NOT specifiy file extension
outFileName4Deepspeech = 'System_800xA_Summary.pdf_extractedText_4_forDS'
# file for full data -- all 4 columns -- including the Short Name of Voice Type
### NOTE -- do NOT specifiy file extension
outFileNameFullData = 'System_800xA_Summary.pdf_extractedText_4_NOT_forDS'
#
## number of API calls before throttling in each process and time to sleep in seconds
apiCallThrottleLimit = 20
apiCallThrottleSleepTime = 10
#
## automatically regenerate access token after these many seconds
accessTokenRecheckInSeconds = 300
#
## how many sentences should each worker process = batch size for worker
## note: Actual number of audio files expected to be created by each worker =
##       batch size per worker * number of voice types in list
##       This larger value will be the number of API calls made each worker.
batchSizeForWorker = 10
#
## how frequently should it print status to console -- based on row read in from input file
statusRowPrintFreq = 5
#
if len(dfIn) %  batchSizeForWorker == 0:
    numberOfTasksRequired = int (len(dfIn) / batchSizeForWorker)
else:
    numberOfTasksRequired = int (len(dfIn) / batchSizeForWorker) + 1
#
myStr = f"\nTotal rows to process = {len(dfIn)}" + \
      f"\nBatch size for worker = {batchSizeForWorker}" + \
      f"\nTasks required = {numberOfTasksRequired}"
print(f"{myStr}")
logging.warning(f"{myStr}")
#
## Note: these counters are the aggregates for the whole processing.
##       They will be aggregated with the counters returned
##       by each Worker Function that processes part of the input data.
overallCounters = { 'countRowsReadIn': 0 , 'countRowsWrittenToDfOut': 0 , 'countAudioConversionSuccess': 0 , \
            'countAudioConversionFailed': 0 , 'countInvalidSentences': 0, 'countApiCallsMade': 0}
#
## flag to track if any of the tasks were completed with the task flag for 429 error as True
overallFlagRc429Encountered = False
#
## setup for multiprocessing -- create queues, add the tasks to queue, create and start consumers,
##       wait for all consumers to join, process results, show final summary
#
## establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
#
## create the Tasks and add them to the task queue
for taskNumber in range( numberOfTasksRequired ):
    #
    startDataPosititionForTask = batchSizeForWorker * taskNumber  ## starts with value 0
    #
    startingFileNumberForTask = rowsToSkip + startDataPosititionForTask + 1  ## starts with value 1
    #
    outFileNameFullDataForTask = outFileNameFullData + '_task_' + str(taskNumber) + '.csv'
    #
    outFileName4DeepspeechForTask = outFileName4Deepspeech  + '_task_' + str(taskNumber) + '.csv'
    #
    voiceTypeShortNames = [ 'en-AU-Catherine', 'en-AU-HayleyRUS', 'en-CA-HeatherRUS', 'en-CA-Linda', \
                          'en-GB-George-Apollo', 'en-GB-HazelRUS', 'en-GB-Susan-Apollo', \
                          'en-IN-Heera-Apollo', 'en-IN-PriyaRUS', 'en-IN-Ravi-Apollo', \
                          'en-US-BenjaminRUS', 'en-US-JessaRUS', 'en-US-ZiraRUS' ]
    #voiceTypeShortNames = [ 'en-AU-Catherine', 'en-AU-HayleyRUS', 'en-CA-HeatherRUS' ]
    #voiceTypeShortNames = [ 'en-AU-Catherine', 'en-AU-HayleyRUS' ]
    #
    dfIpDataForTask = dfIn.iloc[ startDataPosititionForTask : startDataPosititionForTask + batchSizeForWorker , : ]
    dfIpDataForTask.reset_index(drop=True, inplace=True)
    #
    myStr = f"\n\n\n\n****** taskNumber = {taskNumber}  ****** Data for task: ******" + \
          f"\n\taccessTokenRecheckInSeconds     = {accessTokenRecheckInSeconds}" + \
          f"\n\tapiCallThrottleLimit            = {apiCallThrottleLimit}" + \
          f"\n\tapiCallThrottleSleepTime        = {apiCallThrottleSleepTime}" + \
          f"\n\tstatusRowPrintFreq              = {statusRowPrintFreq}" + \
          f"\n\tVoices passed                   = {len(voiceTypeShortNames)}" + \
          (f"\n\tShort names of voices:")
    for voiceType in voiceTypeShortNames:
        myStr += f"\n\t\t\t{voiceType}"
    myStr += f"\n\tstartDataPosititionForTask      = {startDataPosititionForTask}" + \
          f"\n\tstartingFileNumberForTask       = {startingFileNumberForTask}" + \
          f"\n\tSize of dataframe passed        = {dfIpDataForTask.shape}" + \
          f"\n\toutFileNameFullDataForTask      = {outFileNameFullDataForTask}" + \
          f"\n\toutFileName4DeepspeechForTask   = {outFileName4DeepspeechForTask}" + \
          f"\n\tfirst sentence                  =\n{dfIpDataForTask['sentence'].iloc[0]}" + \
          f"\n\tlast  sentence                  =\n{dfIpDataForTask['sentence'].iloc[-1]}"
    print(f"{myStr}")
    logging.warning(f"{myStr}")
    #
    ## create Task in queue for some worker to pick up later -- each of these will be objects of the Task Class
    tasks.put( Task(apiCallThrottleSleepTime, apiCallThrottleLimit, taskNumber, voiceTypeShortNames, dfIpDataForTask, outWavFilesPath, outFilePath, outFileNameFullDataForTask, outFileName4DeepspeechForTask, startingFileNumberForTask, accessTokenRecheckInSeconds, statusRowPrintFreq ))
    #
#
print(f"\n\nCreated all the tasks  = {numberOfTasksRequired}\n\n")
logging.warning(f"\n\nCreated all the tasks  = {numberOfTasksRequired}\n\n")
#
## create the consumers = number of cores available
numConsumers = multiprocessing.cpu_count()
consumers = [Consumer(tasks, results) for i in range(numConsumers)]
print(f"\n\nCreated one consumer per core. No. of consumers = {numConsumers}.\n\n")
logging.warning(f"\n\nCreated one consumer per core. No. of consumers = {numConsumers}.\n\n")
#
## add a poison pill for each consumer in the Task queue
##     -- one None will be picked up by each worker started and will stop processing.
for i in range(numConsumers):
    tasks.put(None)
print(f"\n\nPoison pill None Tasks added to task queue = {numConsumers}.\n\n")
logging.warning(f"\n\nPoison pill None Tasks added to task queue = {numConsumers}.\n\n")
#
## start the consumers -- as all the tasks and poison pills have been added to the task queue
for w in consumers:
    w.start()
print(f"\n\nStarted all Consumers.\n\n")
logging.warning(f"\n\nStarted all Consumers.\n\n")
#
## wait for all of the tasks to finish
print(f"\n\nWaiting for all tasks to join.\n\n")
logging.warning(f"\n\nWaiting for all tasks to join.\n\n")
#
tasks.join()
#
print(f"\n\n\n**********  All tasks have rejoined.  **********\n\n")
logging.warning(f"\n\n\n**********  All tasks have rejoined.  **********\n\n")
#
# Start processing results
print(f"\n")
logging.warning(f"\n")
for jobNumber in range(numberOfTasksRequired):
    result = results.get()
    myStr = f"\n\n\nJob number = {jobNumber}: Picked up result for Task number = {result[0]}" + \
          f"\nworkerReturnRC = {result[1]}" + \
          f"\nworkerCounters =\n{result[2]}" + \
          f"\nFile made for Full Data =\n{result[3]}" + \
          f"\nFile made for DS =\n{result[4]}" + \
          f"\nFlag for 429 error = {result[5]}" + \
          f"\nNo. of characters processed = {result[6]}"
    print(f"{myStr}")
    logging.warning(f"{myStr}")
    #
    ## update the overall counters
    for key in overallCounters.keys():
        ## result[2] is the counters for each task
        overallCounters[key] += result[2][key]
    #
    ## update the overallFlagRc429Encountered: if it is still False, then set it to the flag from task
    if overallFlagRc429Encountered == False:
        overallFlagRc429Encountered = result[5]
    #
    ## get number of characters processed
    overallCharCountTrack += result[6]
#
## FINAL SUMMARY INFO
#
myStr = f"\n\n\n\n\n " + f"########## SUMMARY FINAL " * 3 + "##########\n" + \
      f"\nOverall Counters:"
for k,v in overallCounters.items():
    myStr += f"\n\t\t" + f"{k:35} = {v}"
myStr += f"\nOverall no. of characters processed = {overallCharCountTrack}" + \
      f"\nOutput CSV files created here       =\n{outFilePath}" + \
      f"\nWav files created here              =\n{outWavFilesPath}" + \
      f"\nOverall flag for 429 problem        = {overallFlagRc429Encountered}." + \
      f"\tSo, {'no 429 problem.' if overallFlagRc429Encountered == False else '429 problem occurred.'}" + \
      f"\nLog file created here               =\n{logFilenameWithPath}"
print(f"{myStr}")
logging.warning(f"{myStr}")
#
print(f"\n\n\nNormal exit\n")
logging.warning(f"\n\n\nNormal exit\n")


    *************************************************************** 
    Input file =
/home/rohit/dpspTraining/data/wavFiles/azure1/pdfExtraction/System_800xA_Summary.pdf_extractedText_4.csv
    *************************************************************** 



	# Rows read into dataframe = 150
	# Rows Skipped = 800
	dfIn.shape = (150, 1)

Total rows to process = 150
Batch size for worker = 10
Tasks required = 15




****** taskNumber = 0  ****** Data for task: ******
	accessTokenRecheckInSeconds     = 300
	apiCallThrottleLimit            = 20
	apiCallThrottleSleepTime        = 10
	statusRowPrintFreq              = 5
	Voices passed                   = 13
	Short names of voices:
			en-AU-Catherine
			en-AU-HayleyRUS
			en-CA-HeatherRUS
			en-CA-Linda
			en-GB-George-Apollo
			en-GB-HazelRUS
			en-GB-Susan-Apollo
			en-IN-Heera-Apollo
			en-IN-PriyaRUS
			en-IN-Ravi-Apollo
			en-US-BenjaminRUS
			en-US-JessaRUS
			en-US-ZiraRUS
	startDataPosititionForTask      = 0
	startingFileNumberFo


TaskNumber 0 PICKED by Consumer process = Consumer-1


TaskNumber 1 PICKED by Consumer process = Consumer-2



 ************ Worker Function Invoked for taskNumber = 0 ************
    Number of sentences passed to worker = 10





TaskNumber 2 PICKED by Consumer process = Consumer-3



 ************ Worker Function Invoked for taskNumber = 1 ************
    Number of sentences passed to worker = 10




TaskNumber 3 PICKED by Consumer process = Consumer-4




 ************ Worker Function Invoked for taskNumber = 2 ************
    Number of sentences passed to worker = 10




TaskNumber 4 PICKED by Consumer process = Consumer-5




 ************ Worker Function Invoked for taskNumber = 4 ************
    Number of sentences passed to worker = 10



TaskNumber 5 PICKED by Consumer process = Consumer-6






 ************ Worker Function Invoked for taskNumber = 3 ************
    Number of sentences passed to worker = 10





 ************ Worker Function Invoked for taskNumber = 5 *



 ----------------------- **** Processing sentence # 1 *** -----------------------
Sentence =
this supports the concept of users forwardslash user groups having different authority for different areas of the system

First time Access Token = 
eyJhbGciOiJodHRwOi8vd3d3LnczLm9yZy8yMDAxLzA0L3htbGRzaWctbW9yZSNobWFjLXNoYTI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJ1cm46bXMuY29nbml0aXZlc2VydmljZXMiLCJleHAiOiIxNTc0OTUyNTQ2IiwicmVnaW9uIjoid2VzdGV1cm9wZSIsInN1YnNjcmlwdGlvbi1pZCI6IjcxYWI3NTY3YWJhMzRkNmU5ZWI0N2E4MDk5MzliMjE3IiwicHJvZHVjdC1pZCI6IlNwZWVjaFNlcnZpY2VzLlMwIiwiY29nbml0aXZlLXNlcnZpY2VzLWVuZHBvaW50IjoiaHR0cHM6Ly9hcGkuY29nbml0aXZlLm1pY3Jvc29mdC5jb20vaW50ZXJuYWwvdjEuMC8iLCJhenVyZS1yZXNvdXJjZS1pZCI6Ii9zdWJzY3JpcHRpb25zL2Q3ZWU5Y2NjLThiZGUtNDVjNi1iZWNlLTA5NDQwNDcxNDNhOC9yZXNvdXJjZUdyb3Vwcy9zcGVlY2h0ZXN0L3Byb3ZpZGVycy9NaWNyb3NvZnQuQ29nbml0aXZlU2VydmljZXMvYWNjb3VudHMvc3BlZWNoZnVsbCIsInNjb3BlIjoic3BlZWNoc2VydmljZXMiLCJhdWQiOiJ1cm46bXMuc3BlZWNoc2VydmljZXMud2VzdGV1cm9wZSJ9.zuzKxpEsUp5n27bYLEhaKukYzFhEg1yrEf





 ************ Worker Function Invoked for taskNumber = 9 ************
    Number of sentences passed to worker = 10




First time Access Token = 
eyJhbGciOiJodHRwOi8vd3d3LnczLm9yZy8yMDAxLzA0L3htbGRzaWctbW9yZSNobWFjLXNoYTI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJ1cm46bXMuY29nbml0aXZlc2VydmljZXMiLCJleHAiOiIxNTc0OTUyNjk3IiwicmVnaW9uIjoid2VzdGV1cm9wZSIsInN1YnNjcmlwdGlvbi1pZCI6IjcxYWI3NTY3YWJhMzRkNmU5ZWI0N2E4MDk5MzliMjE3IiwicHJvZHVjdC1pZCI6IlNwZWVjaFNlcnZpY2VzLlMwIiwiY29nbml0aXZlLXNlcnZpY2VzLWVuZHBvaW50IjoiaHR0cHM6Ly9hcGkuY29nbml0aXZlLm1pY3Jvc29mdC5jb20vaW50ZXJuYWwvdjEuMC8iLCJhenVyZS1yZXNvdXJjZS1pZCI6Ii9zdWJzY3JpcHRpb25zL2Q3ZWU5Y2NjLThiZGUtNDVjNi1iZWNlLTA5NDQwNDcxNDNhOC9yZXNvdXJjZUdyb3Vwcy9zcGVlY2h0ZXN0L3Byb3ZpZGVycy9NaWNyb3NvZnQuQ29nbml0aXZlU2VydmljZXMvYWNjb3VudHMvc3BlZWNoZnVsbCIsInNjb3BlIjoic3BlZWNoc2VydmljZXMiLCJhdWQiOiJ1cm46bXMuc3BlZWNoc2VydmljZXMud2VzdGV1cm9wZSJ9.Qx0SAgBoVZ2j7TTmsHRHBxUElhZs4nm-2PH1DR7xsW8
First time Access Token Start Time = 
20191128-154338


 ----------------------- **



RESULT queue entry by Consumer process = Consumer-8


TaskNumber 12 PICKED by Consumer process = Consumer-8




 ************ Worker Function Invoked for taskNumber = 12 ************
    Number of sentences passed to worker = 10




First time Access Token = 
eyJhbGciOiJodHRwOi8vd3d3LnczLm9yZy8yMDAxLzA0L3htbGRzaWctbW9yZSNobWFjLXNoYTI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJ1cm46bXMuY29nbml0aXZlc2VydmljZXMiLCJleHAiOiIxNTc0OTUyNzExIiwicmVnaW9uIjoid2VzdGV1cm9wZSIsInN1YnNjcmlwdGlvbi1pZCI6IjcxYWI3NTY3YWJhMzRkNmU5ZWI0N2E4MDk5MzliMjE3IiwicHJvZHVjdC1pZCI6IlNwZWVjaFNlcnZpY2VzLlMwIiwiY29nbml0aXZlLXNlcnZpY2VzLWVuZHBvaW50IjoiaHR0cHM6Ly9hcGkuY29nbml0aXZlLm1pY3Jvc29mdC5jb20vaW50ZXJuYWwvdjEuMC8iLCJhenVyZS1yZXNvdXJjZS1pZCI6Ii9zdWJzY3JpcHRpb25zL2Q3ZWU5Y2NjLThiZGUtNDVjNi1iZWNlLTA5NDQwNDcxNDNhOC9yZXNvdXJjZUdyb3Vwcy9zcGVlY2h0ZXN0L3Byb3ZpZGVycy9NaWNyb3NvZnQuQ29nbml0aXZlU2VydmljZXMvYWNjb3VudHMvc3BlZWNoZnVsbCIsInNjb3BlIjoic3BlZWNoc2VydmljZXMiLCJhdWQiOiJ1cm46bXMuc3BlZWNoc2VydmljZXMud2VzdGV1cm9wZSJ9.0tHwpRZxM_tjuqXv

smtp forwardslash email is used to send messages based on alarm and event information to an smtp server over the internet

 ----------------------- **** Processing sentence # 5 *** -----------------------
Sentence =
it is abbs goal to protect our customers intellectual investment beyond the life cycles of the underlying platform products

 ----------------------- **** Processing sentence # 5 *** -----------------------
Sentence =
in limited phase products are maintained on request and as far as applicable for each case

 ----------------------- **** Processing sentence # 5 *** -----------------------
Sentence =
a user can choose to install those features or remain at the initially released version and only adopt the corrections released through the revisions

 ----------------------- **** Processing sentence # 5 *** -----------------------
Sentence =
in practice this means a verification cycle is run each day where the latest update files are checked primarily so that they dont find so

     Status flag for 429 error             = False


Worker function completed and exiting for task number = 11.

TaskNumber 11 COMPLETED by Consumer process = Consumer-3


RESULT queue entry by Consumer process = Consumer-3


None Task poison pill picked and EXITING for consumer Consumer-3


 ----------------------- **** Processing sentence # 10 *** -----------------------
Sentence =
as above plus access to never versions of the same product


 ############# WORKER SUMMARY for TASK NUMBER = 12 #############
Full Data CSV file created here:
/home/rohit/dpspTraining/data/wavFiles/azure1/pdfExtraction/workerLogic_801_950_1950wavs/System_800xA_Summary.pdf_extractedText_4_NOT_forDS_task_14.csv
Deepspeech training CSV file created here:
/home/rohit/dpspTraining/data/wavFiles/azure1/pdfExtraction/workerLogic_801_950_1950wavs/System_800xA_Summary.pdf_extractedText_4_forDS_task_14.csv
WAV files location:
/home/rohit/dpspTraining/data/wavFiles/azure1/pdfExtraction/workerLogic_801_950_1950wavs/c

In [None]:
# Get a list of voices https://docs.microsoft.com/en-us/azure/cognitive-services/speech-service/rest-text-to-speech#get-a-list-of-voices
app.get_voices_list()