# Main Loop 

Checks an Azure Storage location for new .mp4 video files and runs each file through the processing pipeline: audio extraction, face analysis of sampled frames, and speech-to-text transcription.

In [2]:
import cv2
import requests 
import uuid
import os
from pyspark.sql import *
from datetime import datetime, timedelta

# ---------------------------------------------------------------------------
# Credentials and endpoints
#
# TODO: these values should be read from Databricks secrets instead of being
# hardcoded in the notebook. Replace every placeholder before running.
# ---------------------------------------------------------------------------

# Face API: subscription key and the request header that carries it.
subscription_key = 'KEY_FOR_FACE_API'
assert subscription_key
headers = dict([('Ocp-Apim-Subscription-Key', subscription_key)])

# Face API detect endpoint. Replace LOCATION with the region of your Face API
# account (still needs to be tested with the containerized version).
# NOTE(review): the URL requests returnFaceLandmarks=true while `params` below
# sets returnFaceLandmarks to 'false' - confirm which one is intended.
face_api_url = 'https://LOCATION.api.cognitive.microsoft.com/face/v1.0/detect?returnFaceLandmarks=true'

# Blob storage: container URL, SAS token, and the config key under which the
# SAS token is handed to the mount call.
storage_source = 'wasbs://BLOB_CONTAINER@STORAGE_ACCOUNT.blob.core.windows.net'
storage_sas_pointer = 'fs.azure.sas.BLOB_CONTAINER.STORAGE_ACCOUNT.blob.core.windows.net'
sas_token = 'REPLACE_WITH_STORAGE_SAS_KEY'

# Speech API: key and region.
speech_transcript_key = "REPLACE_WITH_SPEECH_API_KEY"
sub_key = "REPLACE_WITH_SUBSCRIPTION_KEY"
speech_key = speech_transcript_key
service_region = "REPLACE_WITH_LOCATION"

# Face API query parameters; adjust to the attribute set you need.
_face_attributes = [
    'age', 'gender', 'headPose', 'smile', 'emotion', 'hair',
    'makeup', 'occlusion', 'accessories', 'blur', 'exposure', 'noise',
]
params = {
    'returnFaceId': 'true',
    'returnFaceLandmarks': 'false',
    'returnFaceAttributes': ','.join(_face_attributes),
}



In [3]:
%run "./AudioProcessing"

In [4]:
%run "./VideoProcessing"

In [5]:
# Mount the blob container at /mnt/bs unless it is already mounted.
# TODO: the SAS token should be stored in a Databricks secret.
mounts = dbutils.fs.mounts()
# Compare without the trailing slash so the check matches whether or not the
# mount listing reports the mount point with one.
mountPoint = next((x for x in mounts if x.mountPoint.rstrip('/') == '/mnt/bs'), None)
print (mountPoint)
if mountPoint is None:
  # The extra_configs key is the per-container SAS setting name; its value is the SAS token.
  dbutils.fs.mount(source = storage_source, mount_point = '/mnt/bs/', extra_configs = {storage_sas_pointer: sas_token})

# If you need to unmount to refresh keys, do this:
# dbutils.fs.unmount(mount_point = '/mnt/bs/')

In [6]:
def ExtractWavFromMp4(inFile, wavFile):
  """Extract the audio track of a video into a WAV file using ffmpeg.

  The audio is written as mono (``-ac 1``), 16 kHz (``-ar 16000``),
  signed 16-bit little-endian PCM (``-acodec pcm_s16le``); video is
  dropped (``-vn``).

  ffmpeg is invoked with an argument list and no shell, so paths that
  contain spaces or shell metacharacters are passed through verbatim
  instead of being split or interpreted.

  Args:
    inFile: Path of the source video file.
    wavFile: Path of the WAV file to create.

  Returns:
    None. The ffmpeg exit code is printed.
  """
  import subprocess  # local import keeps this cell self-contained

  command = [
      "ffmpeg", "-i", str(inFile),
      "-f", "wav", "-ac", "1", "-ar", "16000", "-vn", "-acodec", "pcm_s16le",
      str(wavFile),
  ]
  try:
    output = subprocess.run(command).returncode
  except FileNotFoundError:
    # ffmpeg is not installed / not on PATH. Report it the way a shell
    # would (status 127) rather than raising, so callers keep the same
    # best-effort behaviour they had before.
    output = 127
  print(output)


In [7]:
def flatten_json(y):
    """Flatten a nested JSON-like structure into a single-level dict.

    Nested dict keys and list indices are joined with ``_`` to form the
    flat key, e.g. ``{'a': {'b': [1, 2]}}`` becomes
    ``{'a_b_0': 1, 'a_b_1': 2}``.

    Dict and list subclasses (for example ``collections.OrderedDict``)
    are traversed like plain dicts and lists.

    Notes:
        * Empty dicts and empty lists contribute no entries.
        * A non-container top-level value is stored under the key ``''``.
        * Dict keys are concatenated as-is, so they must be strings.

    Args:
        y: The value to flatten (typically a dict parsed from JSON).

    Returns:
        A new dict mapping flattened key paths to leaf values.
    """
    out = {}

    def flatten(x, name=''):
        if isinstance(x, dict):
            for key, value in x.items():
                flatten(value, name + key + '_')
        elif isinstance(x, list):
            for index, item in enumerate(x):
                flatten(item, name + str(index) + '_')
        else:
            # Drop the trailing '_' separator added by the parent level.
            out[name[:-1]] = x

    flatten(y)
    return out

In [8]:
def persistAudioDataToTable(audio_output,videoId,tableName):
  """Append ``audio_output`` to the table named ``tableName``.

  Args:
    audio_output: Object exposing ``.write.mode(...).saveAsTable(...)``
      (presumably a Spark DataFrame - confirm against the caller).
    videoId: Accepted for the caller's convenience; not used here.
    tableName: Name of the table the rows are appended to.
  """
  appendWriter = audio_output.write.mode("append")
  appendWriter.saveAsTable(tableName)

In [9]:
# Process every .mp4 found in the incoming folder: extract audio, analyse
# sampled frames, transcribe speech, then move the inputs to the "completed"
# folder and record the run in the Videos table.
files = dbutils.fs.ls("/mnt/bs/IncomingVideos")

for file in files:
  # Turn the listing path (scheme:/mnt/...) into a local-style path (/scheme/mnt/...).
  # Only the first colon (the scheme separator) is removed, so a colon
  # inside the file name itself is preserved.
  path = '/' + file.path.replace(':','',1)
  
  if path.endswith('.mp4'):
    print('Now Processing: ', path)
    # add a record of the file being processed in a databricks table
    # Extract Audio 
    newId = uuid.uuid4()
    print (str(newId))
    pathToAudio = '/dbfs/mnt/bs/IncomingVideos/{}.wav'.format(newId)
    
    print('Id of this run: ', newId)
    print ('Extracting Audio to wav for Transcription')
    ExtractWavFromMp4(path, pathToAudio)
    print('Now extracting Face Info')
    process_sample_frames(path,10, str(newId))
    
    print('Run Audio Processing - Speech To Text for {}'.format(pathToAudio))
    output = speech_recognition_with_pull_stream(pathToAudio, str(newId)) 
    persistAudioDataToTable(output,newId,"tempAudioTable")
    # Processing is complete --- move blobs so they are not reprocessed.
    # Only the first 'IncomingVideos' (the folder) is rewritten, so a file
    # name that happens to contain that text is left alone.
    completePath = file.path.replace('IncomingVideos','IncomingVideos/completed',1)
    dbutils.fs.mv(file.path, completePath)
    realAudioPath = 'dbfs:/mnt/bs/IncomingVideos/{}.wav'.format(newId)
    completeAudioPath = realAudioPath.replace('IncomingVideos','IncomingVideos/completed',1)
    # move audio 
    print ('cleaning up')
    dbutils.fs.mv(realAudioPath, completeAudioPath)
    # Backslash-escape backslashes and single quotes so a file name containing
    # either cannot terminate the SQL string literal early.
    # NOTE(review): relies on Spark's default backslash escaping in string
    # literals (spark.sql.parser.escapedStringLiterals=false) - confirm.
    sqlValues = [
        str(value).replace("\\", "\\\\").replace("'", "\\'")
        for value in (newId, path, completePath, pathToAudio, completeAudioPath)
    ]
    sqlCommand = "INSERT INTO table Videos VALUES('{}','{}','{}','{}','{}')".format(*sqlValues)
    spark.sql(sqlCommand)