In [1]:
# Libraries

#Connect to mysql
import mysql.connector

#Operating system
import os
import sys
from pathlib import Path
from datetime import timedelta
from threading import Thread
from multiprocessing import Process

#Pandas
import pandas as pd

#Regex
import re

#Transcription & subtitles
import whisper
import stable_whisper

#Keywords
from keybert import KeyBERT
import tensorflow_hub

#Summary
from transformers import pipeline

#Translation
import translators as ts
import translators.server as tss
import textwrap



2023-03-29 11:47:58.242281: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2023-03-29 11:47:58.242477: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
Using state Castille and León server backend.


In [None]:
def connection_ddbb():
    """Open the MySQL connection described by the local ``secrets.txt`` file.

    ``secrets.txt`` is read from the current working directory and holds one
    ``key|value`` pair per line. Only the first ``|`` separates key and value,
    so a value may itself contain ``|``. Blank lines are ignored.

    The keys 'user', 'pass' and 'server' are used to connect; every pair read
    from the file is returned so callers can use the remaining settings.

    Keyword arguments: none
    Return: connection object, secrets object (dict)
    Raises: KeyError if 'user', 'pass' or 'server' is missing from the file
    """

    secrets = {}

    # os.open + os.fdopen is kept on purpose: the plain built-in open() gave
    # errors when this ran inside a virtual environment.
    # The `with` block makes sure the descriptor is released.
    with os.fdopen(os.open('secrets.txt', os.O_RDONLY)) as secrets_file:
        for line in secrets_file:
            line = line.rstrip('\r\n')
            if not line.strip():
                continue
            key, val = line.split('|', 1)
            secrets[key] = val

    # Connection to mysql
    conn = mysql.connector.connect(user=secrets['user'],
                                   password=secrets['pass'],
                                   host=secrets['server'])

    return conn, secrets

In [None]:
# Open the database connection once for the notebook. `connection` is the
# notebook-level global that the cells below pass to (and read from) the
# pipeline functions; `secrets` holds every key|value pair from secrets.txt.
connection, secrets = connection_ddbb()

In [None]:
def create_ddbb(_conn,_secrets):
    """Create the ``ironrep`` database, its tables and the default configuration.

    Every statement uses ``IF NOT EXISTS`` / ``REPLACE``, so the function can
    be run again on an existing database. Each statement is committed right
    after it is executed.

    Keyword arguments:
    _conn: connection object
    _secrets: secrets object; the keys 'temp_dir', 'video_play' and
              'lang_subt' are read to fill the configuration row
    Return: None
    """

    if not _conn.is_connected():
        print('Error connecting')
        return

    cursor = _conn.cursor()

    def run(query, params=None):
        """Execute one statement and commit it."""
        if params is None:
            cursor.execute(query)
        else:
            cursor.execute(query, params)
        _conn.commit()

    try:
        print('Connection open')

        print('Creating database if necessary...')
        run('CREATE DATABASE IF NOT EXISTS ironrep')
        run('USE ironrep')
        print('Database created if necessary...')

        print('Creating tables if necessary...')

        #Configuration
        print('     - Configuration...')
        run("""CREATE TABLE IF NOT EXISTS ironrep.configuration (  
                        id enum('1') PRIMARY KEY NOT NULL,
                        temp_directory  nvarchar(250),
                        video_player nvarchar(250),
                        languages_subtitles nvarchar(250) COMMENT 'List of languages codes separated by commas')""")

        #Default values
        # video_play is a command template: video_player() substitutes
        # {videoparam}, {subtitleparam} and {positionparam} in it.
        # lang_subt is a comma separated list of language codes.
        # NOTE(review): the id column is not supplied; this relies on MySQL
        # defaulting the NOT NULL enum to '1' so that REPLACE keeps a single
        # configuration row - confirm.
        run("""REPLACE INTO ironrep.configuration (temp_directory,video_player,languages_subtitles)
                        VALUES (%s,%s,%s)""",
            (str(_secrets['temp_dir']),str(_secrets['video_play']),str(_secrets['lang_subt'])))

        # In CREATE TABLE a full-text index is declared as FULLTEXT(col);
        # "ADD FULLTEXT(col)" is only valid inside ALTER TABLE.

        #Videos
        print('     - Videos...')
        run("""CREATE TABLE IF NOT EXISTS ironrep.videos (
                        id INT PRIMARY KEY NOT NULL AUTO_INCREMENT,
                        video_name nvarchar(250),
                        video_path nvarchar(250),
                        FULLTEXT(video_name)
                    )""")

        #Transcriptions
        print('     - Transcriptions...')
        run("""CREATE TABLE IF NOT EXISTS ironrep.transcriptions (
                        id INT PRIMARY KEY NOT NULL AUTO_INCREMENT,
                        videoid INT NOT NULL,
                        languageid nvarchar(5),
                        transcription mediumtext,
                        FULLTEXT(transcription)
                    )""")

        #Summaries
        print('     - Summaries...')
        run("""CREATE TABLE IF NOT EXISTS ironrep.summaries (
                        id INT PRIMARY KEY NOT NULL AUTO_INCREMENT,
                        videoid INT NOT NULL,
                        languageid nvarchar(5),
                        summary TEXT,
                        FULLTEXT(summary)
                    )""")

        #Subtitles
        print('     - Subtitles...')
        run("""CREATE TABLE IF NOT EXISTS ironrep.subtitles (
                        id INT PRIMARY KEY NOT NULL AUTO_INCREMENT,
                        videoid INT NOT NULL,
                        languageid nvarchar(5),
                        subtitles mediumtext,
                        FULLTEXT(subtitles)
                    )""")

        #Keywords
        print('     - Keywords...')
        run("""CREATE TABLE IF NOT EXISTS ironrep.keywords (
                        id INT PRIMARY KEY NOT NULL AUTO_INCREMENT,
                        videoid INT NOT NULL,
                        languageid nvarchar(5),
                        keywords nvarchar(250),
                        FULLTEXT(keywords)
                    )""")

        print('Tables created if necessary...')
    finally:
        cursor.close()


    

In [None]:
def insert_data_sql(_conn, _table, _videoid, _langid, _text):
    """Function to insert one row in the database

    Keyword arguments:
    _conn: connection to the database
    _table: the object where insert ['video','transcription','subtitle','summary','keywords']
    _videoid: the id of the video (ignored for 'video')
    _langid: the id of the language (ignored for 'video')
    _text: value to insert; for 'video' it is the path of the video file
    Return: the cursor used for the insert (its ``lastrowid`` is the new id),
            the string 'none' when _table is not one of the values above,
            or None when the connection is not open
    """

    if not _conn.is_connected():
        return None

    # The four per-language tables share the same (videoid, languageid, text) shape.
    per_language_queries = {
        'transcription': """INSERT INTO ironrep.transcriptions(videoid,languageid,transcription)
                        VALUES (%s,%s,%s)""",
        'subtitle': """INSERT INTO ironrep.subtitles(videoid,languageid,subtitles)
                        VALUES (%s,%s,%s)""",
        'summary': """INSERT INTO ironrep.summaries(videoid,languageid,summary)
                        VALUES (%s,%s,%s)""",
        'keywords': """INSERT INTO ironrep.keywords(videoid,languageid,keywords)
                        VALUES (%s,%s,%s)""",
    }

    if _table == 'video':
        query = """INSERT INTO ironrep.videos(video_name,video_path)
                        VALUES (%s,%s)"""
        file_name = os.path.basename(os.path.abspath(_text))
        # splitext only drops the final extension, so "lecture.v2.mp4" is
        # stored as "lecture.v2" instead of being cut at the first dot.
        val = [os.path.splitext(file_name)[0], _text]
    elif _table in per_language_queries:
        query = per_language_queries[_table]
        val = [int(_videoid), _langid, _text]
    else:
        return 'none'

    # The cursor is only created once we know there is something to run.
    cursor = _conn.cursor()
    cursor.execute(query, val)
    _conn.commit()
    return cursor

In [None]:
def transcribe(_conn, filepath):
    """
    Transcribe the speech of a video/audio file and build its subtitles.

    The file is registered in ironrep.videos, transcribed with the Whisper
    "base.en" model on CPU, and the English transcription and subtitles are
    stored in the database. Two files are written next to the media file:
    "<name>_transcription.txt" and "<name>_subtitles.srt", where <name> is the
    file name up to its first dot.

    Keyword arguments:
    _conn: connection to the database
    filepath: the file to transcribe
    Return: the id of the new row in ironrep.videos
    Raises: RuntimeError if the video could not be registered in the database
    """

    video_dir, file_name = os.path.split(os.path.abspath(Path(filepath)))
    base_name = file_name.split(sep='.')[0]
    transcription_file = Path(video_dir + "/" + base_name + "_transcription.txt")
    subtitles_file = video_dir + "/" + base_name + "_subtitles.srt"

    # Register the video first: fail before the expensive transcription if
    # the insert did not happen (e.g. the connection is closed).
    result_sql = insert_data_sql(_conn, 'video', '', '', filepath)
    videoid = getattr(result_sql, 'lastrowid', None)
    if videoid is None:
        raise RuntimeError('The video could not be registered in the database')

    # speech transcription
    model = whisper.load_model("base.en", device='cpu')
    result = model.transcribe(filepath)

    with open(transcription_file, "w") as f:
        f.write(result["text"])

    insert_data_sql(_conn, 'transcription', videoid, 'en', result['text'])

    # subtitles: stable_whisper writes the .srt file, which is read back to
    # store its content in the database
    stable_whisper.results_to_sentence_srt(result, subtitles_file)

    with open(subtitles_file) as f:
        text_subtitles = f.read()

    insert_data_sql(_conn, 'subtitle', videoid, 'en', text_subtitles)

    return videoid
    


In [None]:
def extract_keywords(_conn, _videoid):
    """Function to extract the top 50 keywords of the English transcription

    The keywords (1 or 2 words long) are extracted with KeyBERT on top of the
    Universal Sentence Encoder and inserted one row per keyword, highest
    score first.

    Keyword arguments:
    _conn: Connection to the database
    _videoid: id of the video to extract the keywords
    Return: Nothing
    """

    if not _conn.is_connected():
        return

    # Use the connection received as argument, not the notebook global.
    cursor = _conn.cursor(buffered=True)
    query = """SELECT videoid, languageid, transcription
                    FROM ironrep.transcriptions
                    WHERE videoid = %s and languageid = %s"""
    cursor.execute(query, [int(_videoid), 'en'])

    trans_table = cursor.fetchall()
    if not trans_table:
        print('No video transcription to extract')
        return

    max_ngram_size = 2
    deduplication_threshold = 0.1
    numOfKeywords = 50

    # Loading the encoder is expensive: do it once, not once per row.
    embedding_model = tensorflow_hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
    kw_model = KeyBERT(model=embedding_model)

    for transcription in trans_table:
        # NOTE(review): KeyBERT appears to use `diversity` only together with
        # use_mmr=True - confirm whether deduplication is really applied.
        extracted_keyword = kw_model.extract_keywords(
            transcription[2],
            keyphrase_ngram_range=(1, max_ngram_size),
            top_n=numOfKeywords,
            diversity=deduplication_threshold,
        )
        for keyw, _ in sorted(extracted_keyword, key=lambda x: x[1], reverse=True):
            insert_data_sql(_conn, 'keywords', _videoid, 'en', keyw)

In [None]:
def create_summary(_conn, _videoid):
    """Function to create one summary of the text transcripted in english

    Only the first 250 space-separated words of the transcription are sent to
    the summarization model, and the summary is limited with max_length=250.

    Keyword arguments:
    _conn: the connection to the database
    _videoid: the id of the video to summarize
    Return: Nothing
    """
    if not _conn.is_connected():
        return

    # Use the connection received as argument, not the notebook global.
    cursor = _conn.cursor(buffered=True)
    query = """SELECT videoid, languageid, transcription
                    FROM ironrep.transcriptions
                    WHERE videoid = %s and languageid = %s"""
    cursor.execute(query, [int(_videoid), 'en'])

    trans_table = cursor.fetchall()
    if not trans_table:
        print('No video summary to create')
        return

    hub_model_id = "mrm8488/flan-t5-large-finetuned-openai-summarize_from_feedback"
    max_input_words = 250
    max_summary_length = 250

    # Building the pipeline loads the model: do it once, not once per row.
    summarizer = pipeline("summarization", model=hub_model_id)

    for transcription in trans_table:
        text_to_summarize = ' '.join(transcription[2].split(' ')[:max_input_words])
        summary = summarizer(text_to_summarize, max_length=max_summary_length)
        insert_data_sql(_conn, 'summary', _videoid, 'en', summary[0]['summary_text'])

In [None]:
def translate_from_en(text_en,to_language):
    """Function to translate a text from English to any language

    Texts longer than 2000 characters are split in chunks of at most 2000
    characters (never breaking a word), translated chunk by chunk and joined
    with single spaces. Every chunk is translated with Google first and with
    the default translator of ``translators`` as a fallback; if both fail the
    chunk is returned untranslated, so this function is best-effort and does
    not raise on translation errors.

    Keyword arguments:
    text_en: text in english
    to_language: the code of the language to translate (es,zh, ...)
    Return: text translated; text_en unchanged when to_language is empty or
            when text_en is empty/blank
    """

    if to_language == '':
        print('Need to include the language to translate.')
        return text_en

    # Nothing to translate: avoid a pointless remote call.
    if not text_en or not text_en.strip():
        return text_en

    from_language = 'en'
    max_chunk_chars = 2000

    def translate_chunk(chunk):
        """Translate one chunk, falling back to the untranslated text."""
        try:
            return tss.google(chunk, from_language, to_language)
        except Exception:
            try:
                # Keyword arguments on purpose: the second positional
                # parameter of translate_text is the translator name.
                return ts.translate_text(chunk,
                                         from_language=from_language,
                                         to_language=to_language)
            except Exception:
                return chunk

    if len(text_en) > max_chunk_chars:
        # textwrap.wrap normalises whitespace, so line breaks inside a long
        # text are replaced by spaces.
        chunks = textwrap.wrap(text_en, max_chunk_chars, break_long_words=False)
        return ' '.join(translate_chunk(chunk) for chunk in chunks)

    return translate_chunk(text_en)

In [None]:
def translate_subtitles(_conn, _videoid):
    """Function to translate the English subtitles of a video
       to all the languages predefined in the configuration

    Only the text rows of the subtitles are translated. Blank rows, cue
    numbers and timing rows ("00:00:01,000 --> 00:00:04,000") are copied
    untouched, so the translator cannot alter the structure of the file.
    One row per language is inserted in ironrep.subtitles.

    Keyword arguments:
    _conn: connection to the database
    _videoid: Id of the video to translate
    Return: Nothing
    """
    if not _conn.is_connected():
        return

    # Use the connection received as argument, not the notebook global.
    cursor_conf = _conn.cursor(buffered=True)
    query_conf = """SELECT languages_subtitles, temp_directory 
                        FROM ironrep.configuration
                        LIMIT 1;"""
    cursor_conf.execute(query_conf)
    conf_table = cursor_conf.fetchall()
    if not conf_table:
        print('No configuration found.')
        return

    #We discard 'en' language since this approach will work from English, not multilingual option.
    languages = [code.strip() for code in (conf_table[0][0] or '').split(',')]
    languages = [code for code in languages if code and code != 'en']

    cursor = _conn.cursor(buffered=True)
    query = """SELECT videoid, languageid, subtitles
                    FROM ironrep.subtitles
                    WHERE videoid = %s and languageid = %s"""
    cursor.execute(query, [int(_videoid), 'en'])
    subt_table = cursor.fetchall()
    if not subt_table:
        print('No subtitles to translate.')
        return

    timing_row = re.compile(r'^\d{2}:\d{2}:\d{2},\d{3}\s*-->\s*\d{2}:\d{2}:\d{2},\d{3}')

    def translate_row(row, lang):
        """Translate a text row; structural rows are returned unchanged."""
        stripped = row.strip()
        if not stripped or stripped.isdigit() or timing_row.match(stripped):
            return row
        return translate_from_en(row, lang)

    for lang in languages:
        for sub in subt_table:
            # Built in memory and per row: no temporary file is needed and
            # rows of different subtitles are never mixed.
            translated = [translate_row(row, lang) for row in sub[2].split('\n')]
            insert_data_sql(_conn, 'subtitle', _videoid, lang, '\n'.join(translated))

In [None]:
def translate_transcriptions(_conn, _videoid):
    """Function to translate the transcriptions in English 
       to all the languages predefined in the configuration

    One row per language is inserted in ironrep.transcriptions.

    Keyword arguments:
    _conn: connection to the database
    _videoid: Id of the video to translate
    Return: Nothing
    """

    if not _conn.is_connected():
        return

    # Use the connection received as argument, not the notebook global.
    cursor_conf = _conn.cursor(buffered=True)
    query_conf = """SELECT languages_subtitles, temp_directory 
                        FROM ironrep.configuration
                        LIMIT 1;"""
    cursor_conf.execute(query_conf)
    conf_table = cursor_conf.fetchall()
    if not conf_table:
        print('No configuration found.')
        return

    #We discard 'en' language since this approach will work from English, not multilingual option.
    languages = [code.strip() for code in (conf_table[0][0] or '').split(',')]
    languages = [code for code in languages if code and code != 'en']

    cursor = _conn.cursor(buffered=True)
    query = """SELECT videoid, languageid, transcription
                    FROM ironrep.transcriptions
                    WHERE videoid = %s and languageid = %s"""
    cursor.execute(query, [int(_videoid), 'en'])

    transc_table = cursor.fetchall()
    if not transc_table:
        print('No video transcription to translate')
        return

    for lang in languages:
        for trans in transc_table:
            trans_translated = translate_from_en(trans[2], lang)
            insert_data_sql(_conn, 'transcription', _videoid, lang, trans_translated)

            

In [None]:
def translate_keywords(_conn, _videoid):
    """Function to translate the keywords in English 
       to all the languages predefined in the configuration

    Every English keyword row is translated on its own and inserted in
    ironrep.keywords, once per language.

    Keyword arguments:
    _conn: connection to the database
    _videoid: Id of the video to translate
    Return: Nothing
    """

    if not _conn.is_connected():
        return

    # Use the connection received as argument, not the notebook global.
    cursor_conf = _conn.cursor(buffered=True)
    query_conf = """SELECT languages_subtitles, temp_directory 
                        FROM ironrep.configuration
                        LIMIT 1;"""
    cursor_conf.execute(query_conf)
    conf_table = cursor_conf.fetchall()
    if not conf_table:
        print('No configuration found.')
        return

    #We discard 'en' language since this approach will work from English, not multilingual option.
    languages = [code.strip() for code in (conf_table[0][0] or '').split(',')]
    languages = [code for code in languages if code and code != 'en']

    cursor = _conn.cursor(buffered=True)
    query = """SELECT videoid, languageid, keywords
                    FROM ironrep.keywords
                    WHERE videoid = %s and languageid = %s"""
    cursor.execute(query, [int(_videoid), 'en'])

    keyw_table = cursor.fetchall()
    if not keyw_table:
        print('No keywords to translate')
        return

    for lang in languages:
        for keyw in keyw_table:
            keyw_translated = translate_from_en(keyw[2], lang)
            insert_data_sql(_conn, 'keywords', _videoid, lang, keyw_translated)

            

In [None]:
def translate_summary(_conn, _videoid):
    """Function to translate the summary in English 
       to all the languages predefined in the configuration

    One row per language is inserted in ironrep.summaries.

    Keyword arguments:
    _conn: connection to the database
    _videoid: Id of the video to translate
    Return: Nothing
    """

    if not _conn.is_connected():
        return

    # Use the connection received as argument, not the notebook global.
    cursor_conf = _conn.cursor(buffered=True)
    query_conf = """SELECT languages_subtitles, temp_directory 
                        FROM ironrep.configuration
                        LIMIT 1;"""
    cursor_conf.execute(query_conf)
    conf_table = cursor_conf.fetchall()
    if not conf_table:
        print('No configuration found.')
        return

    #We discard 'en' language since this approach will work from English, not multilingual option.
    languages = [code.strip() for code in (conf_table[0][0] or '').split(',')]
    languages = [code for code in languages if code and code != 'en']

    cursor = _conn.cursor(buffered=True)
    query = """SELECT videoid, languageid, summary
                    FROM ironrep.summaries
                    WHERE videoid = %s and languageid = %s"""
    cursor.execute(query, [int(_videoid), 'en'])

    summ_table = cursor.fetchall()
    if not summ_table:
        print('No summary to translate')
        return

    for lang in languages:
        for summ in summ_table:
            summ_translated = translate_from_en(summ[2], lang)
            insert_data_sql(_conn, 'summary', _videoid, lang, summ_translated)

            

In [None]:
def video_player(_conn,_videoid, _langid, _position = 0):
    """Function to launch the video with subtitles

    The subtitles are written to "play_subtitle.srt" inside the configured
    temporary directory, then the configured player command is launched and
    this function waits until the player exits.

    The player command is a template in which {videoparam}, {subtitleparam}
    and {positionparam} are replaced. The template is split into arguments
    first and the placeholders are substituted afterwards, so paths with
    spaces or quotes are passed to the player intact and never go through a
    shell.

    Keyword arguments:
    _conn: connection object
    _videoid: the id of the video
    _langid: the id of the language to use for the subtitles
    _position: time in seconds to start the video
    Return: None
    """
    import shlex
    import subprocess

    cursor = _conn.cursor()

    query = """SELECT video_player, temp_directory
                        FROM ironrep.configuration 
                        LIMIT 1;"""
    cursor.execute(query)
    conf_table = cursor.fetchall()
    if not conf_table:
        print('No configuration found.')
        return
    player_template, temp_directory = conf_table[0][0], conf_table[0][1]

    query = """SELECT subtitles
                    FROM ironrep.subtitles
                WHERE videoid = %s
                    AND languageid = %s"""
    cursor.execute(query, [int(_videoid), _langid])
    subt_table = cursor.fetchall()
    if not subt_table:
        print(f'No subtitles found in language {_langid}')
        return

    query = """SELECT video_path
                    FROM ironrep.videos
                    WHERE id = %s"""
    cursor.execute(query, [int(_videoid)])
    video_table = cursor.fetchall()
    if not video_table:
        print(f'No video found with id {_videoid}')
        return
    video_path = video_table[0][0]

    # utf-8 so subtitles in any language can be written on every platform
    subtitle_path = temp_directory + "/play_subtitle.srt"
    with open(Path(subtitle_path), "w", encoding='utf-8') as f:
        f.write(subt_table[0][0])

    replacements = {
        '{videoparam}': video_path,
        '{subtitleparam}': subtitle_path,
        '{positionparam}': str(_position),
    }

    def fill(argument):
        """Substitute the placeholders inside one command argument."""
        for placeholder, value in replacements.items():
            argument = argument.replace(placeholder, value)
        return argument

    # NOTE(review): shlex.split uses POSIX quoting rules; a template that
    # relies on shell features (pipes, redirections) is not supported.
    command = [fill(argument) for argument in shlex.split(player_template)]

    try:
        subprocess.run(command)
    except OSError as error:
        print(f'Could not launch the video player: {error}')

In [None]:
def search_pos_video(_subitles,_text):
    """Function to locate the positions of the subtitles
       where the text to search is located.
       This function is used to find text inside a video

    The search is case-insensitive and the text is matched literally (regex
    characters such as "(", "$" or "." have no special meaning). For every
    occurrence the second-to-last timestamp found before it is reported: the
    last two timestamps before a subtitle text are the start and the end of
    its cue, so this is the start time of the cue. Occurrences preceded by
    fewer than two timestamps are skipped.

    Keyword arguments:
    _subitles: the subtitles of the video (SRT text)
    _text: text to find
    Return: a list of tuples with the position (whole seconds) and the
            timestamp as "HH:MM:SS,mmm", one per occurrence
    """

    # An empty pattern would match at every character of the subtitles.
    if not _text:
        return []

    timestamp_re = re.compile(r'(\d{2}:\d{2}:\d{2},\d{3})')
    # IGNORECASE on the original text keeps match offsets valid; lower-casing
    # the subtitles first can change the length of some characters.
    text_re = re.compile(re.escape(_text), re.IGNORECASE)

    positions_final = []
    for match in text_re.finditer(_subitles):
        timestamps = timestamp_re.findall(_subitles[:match.start()])
        if len(timestamps) < 2:
            continue
        cue_start = timestamps[-2]
        hours, minutes, seconds = cue_start.split(':')
        total_seconds = (int(hours) * 3600
                         + int(minutes) * 60
                         + int(float(seconds.replace(',', '.'))))
        positions_final.append((total_seconds, cue_start))
    return positions_final

In [None]:
# End-to-end run of the pipeline for one recording. A timestamp is printed
# before every stage so the duration of each one can be read from the output.
from datetime import datetime
print(f'transcribe: {datetime.now()}')
transcribe(connection, '/home/roque/01. IronHack/00. Data Analytics/03. Hackshow/recordings/GMT20230327-140629_Recording_1760x900.mp4')

# NOTE(review): the video id is hard-coded as 78 in every call below; it is
# not taken from the transcribe() call above - confirm it matches the row
# that was just inserted before re-running this cell.
print(f'transcribe translation: {datetime.now()}')
translate_transcriptions(connection, 78)

print(f'summary: {datetime.now()}')
create_summary(connection, 78)

print(f'summary translation: {datetime.now()}')
translate_summary(connection, 78)

print(f'keywords: {datetime.now()}')
extract_keywords(connection, 78)

print(f'kwywords translation: {datetime.now()}')
translate_keywords(connection, 78)

print(f'translation subtitles: {datetime.now()}')
translate_subtitles(connection, 78)
print(f'translation subtitles end: {datetime.now()}')

# The string below is disabled code (subtitles translation for videos 9 to 11).
# Being the last expression of the cell, it is also the cell's output value.
'''for vid in range(9,12):
    print(f'translation subtitles video {vid}: {datetime.now()}')
    translate_subtitles(connection, vid)'''

In [None]:
# Scratch cell: prints the current timestamp with a placeholder label.
from datetime import datetime
print(f'dad {datetime.now()}')