<a href="https://colab.research.google.com/github/lavanaythakral/IAI-Pipeline-demo/blob/main/Final_demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Documentation

Code by: Lavanay Thakral

---

This Colab is a demo for my undergraduate thesis project. Because the code is a little difficult to navigate, this introduction gives a basic overview of how it works:

*   The web app demo uses this Colab as its backend, with ngrok exposing it through a public HTTPS URL.
*   SSLify forces HTTPS, which Chrome requires before it allows microphone access.
*   The flow is as follows:
    1.   Text input: sent to the server as a POST request.
    2.   Audio input (microphone): converted to text using webkitSpeechRecognition, then sent to the server as a POST request.
    3.   The input is converted into a sentence embedding using a BERT model (bert-as-service). This embedding is compared against the cached questions, using cosine similarity as the metric.
    4.   If the similarity is above a threshold, the cached response and video are returned by the server.
    5.   Otherwise, a TF-IDF vectorizer extracts the most relevant context from the book, in the hope that the answer lies in that context, and BERT for QnA extracts the answer from it.
    6.   If BERT returns no answer, we try to fetch the answer-box result of a Google search through a search API.
    7.   If the search does not yield an answer box, we improvise by passing the input as a prompt to GPT-2 for generation.
    8.   We take the response from step 5, 6, or 7 and synthesize audio from this text using Google Text-to-Speech.
    9.   Using the Wav2Lip model, the speech audio, and a driving face video of Dr. Kalam, we animate a clip of Dr. Kalam speaking the response.
    10.  Finally, this video is updated on the web application.

*  We believe the model mimics Dr. Kalam's style, but we do not claim, or take responsibility for, its accuracy. In particular, we do not claim that the model is robust enough to say exactly what Dr. Kalam would have said.
*  Projects like this raise many ethical issues. The authors have discussed these in detail in an article.
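
The fallback order in steps 3 to 7 above can be sketched as a simple cascade. This is only a minimal illustration, not the notebook's actual code: `answer_with_fallbacks` and the toy stage functions are hypothetical stand-ins for the cache lookup, BERT QnA, answer-box search, and GPT-2 generation.

```python
from typing import Callable, List, Optional, Tuple

# A stage takes the user's text and returns an answer, or None when it has nothing to offer.
Stage = Tuple[str, Callable[[str], Optional[str]]]


def answer_with_fallbacks(text: str, stages: List[Stage]) -> Tuple[str, str]:
    """Try each stage in order; return (answer, source) from the first that succeeds."""
    for source, stage in stages:
        answer = stage(text)
        if answer:  # None or "" means "no answer here, fall through"
            return answer, source
    raise LookupError("no stage produced an answer")


# Toy stand-ins (hypothetical) for the real components.
toy_cache = {"who are you?": "I am a demo."}
stages: List[Stage] = [
    ("cache", lambda q: toy_cache.get(q.lower())),  # steps 3-4: cached response
    ("qna", lambda q: None),                        # step 5: extractive QnA found nothing
    ("search", lambda q: None),                     # step 6: no answer box
    ("gpt2", lambda q: "An improvised reply."),     # step 7: generation always answers
]

print(answer_with_fallbacks("Who are you?", stages))
print(answer_with_fallbacks("What is AI?", stages))
```

Keeping the stages in an ordered list makes the priority explicit: cheaper, more reliable sources are tried first, and generation is the last resort.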






#Dependencies

In [None]:
%tensorflow_version 1.x
import tensorflow as tf
print(tf.__version__)

TensorFlow 1.x selected.
1.15.2


In [None]:
!pip install flask
!pip install flask_ngrok
!pip install flask_sslify

In [None]:
!git clone https://github.com/lavanaythakral/Kalam-AI-2.git

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install google-cloud-texttospeech

In [None]:
cd Kalam-AI-2

/content/Kalam-AI-2


In [None]:
!pip install -r requirements.txt

In [None]:
!apt-get install espeak
!apt-get install portaudio19-dev python-pyaudio

In [None]:
!pip install PyAudio

In [None]:
%tensorflow_version 1.x
import gpt_2_simple as gpt2
from google.colab import drive
import scipy
import numpy

Copying Fine tuned GPT2 model to workspace

In [None]:
gpt2.copy_checkpoint_from_gdrive(run_name='run1_topical_token')

Initializing credentials for Google Text2Speech

In [None]:
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/content/drive/MyDrive/Thesis project/My First Project-aa638edce601.json' 
!echo $GOOGLE_APPLICATION_CREDENTIALS

/content/drive/MyDrive/Thesis project/My First Project-aa638edce601.json


Copying Wav2Lip models from Drive

In [None]:
!cp -ri "/content/drive/My Drive/Thesis project/Wav2Lip/Wav2Lip/wav2lip_gan.pth" /content/Kalam-AI-2/Wav2Lip/checkpoints/
!wget "https://www.adrianbulat.com/downloads/python-fan/s3fd-619a316812.pth" -O "/content/Kalam-AI-2/Wav2Lip/face_detection/detection/sfd/s3fd.pth"

Copying static files for the Web app from drive

In [None]:
!cp -r /content/drive/MyDrive/Webapputils/static /content/Kalam-AI-2

Initializing BERT for Q/A, BERT For NSP and Fine-tuned GPT2

In [None]:
%tensorflow_version 1.x
from transformers import AutoTokenizer, AutoModelForQuestionAnswering, BertForNextSentencePrediction, BertTokenizer
import gpt_2_simple as gpt2

sess = gpt2.start_tf_sess()
gpt2.load_gpt2(sess, run_name='run1_topical_token')

print("GPT2 loaded")

tokenizer = AutoTokenizer.from_pretrained("deepset/bert-large-uncased-whole-word-masking-squad2")
model = AutoModelForQuestionAnswering.from_pretrained("deepset/bert-large-uncased-whole-word-masking-squad2")

print("BERT For Q/A downloaded")

model_nsp = BertForNextSentencePrediction.from_pretrained('bert-base-cased')
tokenizer_nsp = BertTokenizer.from_pretrained('bert-base-cased')

print("BERT NSP downloaded")

Loading Cache responses

In [None]:
import pandas as pd
df = pd.read_csv('/content/drive/MyDrive/Thesis project/Thesis project/datasets/database_new1.csv')
index_database = {}
data_questions = list(df['Question'])
for idx,row in df.iterrows():
  index_database[row['Question'].lower()] = [row['Index'],row['Answer'],row['Source']]

#Script Functions.
(These could be moved into Python scripts for convenience. Here, I have kept all the functions in the Colab itself.)

In [None]:
# -*- coding: utf-8 -*-
import re
alphabets= "([A-Za-z])"
prefixes = "(Mr|St|Mrs|Ms|Dr)[.]"
suffixes = "(Inc|Ltd|Jr|Sr|Co)"
starters = "(Mr|Mrs|Ms|Dr|He\s|She\s|It\s|They\s|Their\s|Our\s|We\s|But\s|However\s|That\s|This\s|Wherever)"
acronyms = "([A-Z][.][A-Z][.](?:[A-Z][.])?)"
websites = "[.](com|net|org|io|gov)"

def split_into_sentences(text):
    text = " " + text + "  "
    text = text.replace("\n"," ")
    text = re.sub(prefixes,"\\1<prd>",text)
    text = re.sub(websites,"<prd>\\1",text)
    if "Ph.D" in text: text = text.replace("Ph.D.","Ph<prd>D<prd>")
    text = re.sub("\s" + alphabets + "[.] "," \\1<prd> ",text)
    text = re.sub(acronyms+" "+starters,"\\1<stop> \\2",text)
    text = re.sub(alphabets + "[.]" + alphabets + "[.]" + alphabets + "[.]","\\1<prd>\\2<prd>\\3<prd>",text)
    text = re.sub(alphabets + "[.]" + alphabets + "[.]","\\1<prd>\\2<prd>",text)
    text = re.sub(" "+suffixes+"[.] "+starters," \\1<stop> \\2",text)
    text = re.sub(" "+suffixes+"[.]"," \\1<prd>",text)
    text = re.sub(" " + alphabets + "[.]"," \\1<prd>",text)
    if "”" in text: text = text.replace(".”","”.")
    if "\"" in text: text = text.replace(".\"","\".")
    if "!" in text: text = text.replace("!\"","\"!")
    if "?" in text: text = text.replace("?\"","\"?")
    text = text.replace(".",".<stop>")
    text = text.replace("?","?<stop>")
    text = text.replace("!","!<stop>")
    text = text.replace("<prd>",".")
    sentences = text.split("<stop>")
    sentences = sentences[:-1]
    sentences = [s.strip() for s in sentences]
    return sentences

Grammar correction scripts

In [None]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Simple grammar checker

This grammar checker will fix grammar mistakes using Ginger.
"""

import sys
import urllib.parse
import urllib.request
from urllib.error import HTTPError
from urllib.error import URLError
import json


class ColoredText:
    """Colored text class"""
    colors = ['black', 'red', 'green', 'orange', 'blue', 'magenta', 'cyan', 'white']
    color_dict = {}
    for i, c in enumerate(colors):
        color_dict[c] = (i + 30, i + 40)

    @classmethod
    def colorize(cls, text, color=None, bgcolor=None):
        """Colorize text
        @param cls Class
        @param text Text
        @param color Text color
        @param bgcolor Background color
        """
        c = None
        bg = None
        gap = 0
        if color is not None:
            try:
                c = cls.color_dict[color][0]
            except KeyError:
                print("Invalid text color:", color)
                return(text, gap)

        if bgcolor is not None:
            try:
                bg = cls.color_dict[bgcolor][1]
            except KeyError:
                print("Invalid background color:", bgcolor)
                return(text, gap)

        s_open, s_close = '', ''
        if c is not None:
            s_open = '\033[%dm' % c
            gap = len(s_open)
        if bg is not None:
            s_open += '\033[%dm' % bg
            gap = len(s_open)
        # Only emit the reset code when a color or background was actually applied
        if c is not None or bg is not None:
            s_close = '\033[0m'
            gap += len(s_close)
        return('%s%s%s' % (s_open, text, s_close), gap)


def get_ginger_url(text):
    """Get URL for checking grammar using Ginger.
    @param text English text
    @return URL
    """
    API_KEY = "6ae0c3a0-afdc-4532-a810-82ded0054236"

    scheme = "http"
    netloc = "services.gingersoftware.com"
    path = "/Ginger/correct/json/GingerTheText"
    params = ""
    query = urllib.parse.urlencode([
        ("lang", "US"),
        ("clientVersion", "2.0"),
        ("apiKey", API_KEY),
        ("text", text)])
    fragment = ""

    return(urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment)))


def get_ginger_result(text):
    """Get a result of checking grammar.
    @param text English text
    @return result of grammar check by Ginger
    """
    url = get_ginger_url(text)

    # Re-raise instead of calling quit(), which would stop the kernel and the server
    try:
        response = urllib.request.urlopen(url)
    except HTTPError as e:
        print("HTTP Error:", e.code)
        raise
    except URLError as e:
        print("URL Error:", e.reason)
        raise

    try:
        result = json.loads(response.read().decode('utf-8'))
    except ValueError:
        print("Value Error: Invalid server response.")
        raise

    return(result)


def fix(original_text):
    """main function"""
    # original_text = " ".join(sys.argv[1:])
    # if len(original_text) > 600:
    #     print("You can't check more than 600 characters at a time.")
    #     quit()
    fixed_text = original_text
    results = get_ginger_result(original_text)
    print(results)
    # Correct grammar
    if(not results["LightGingerTheTextResult"]):
        print("Good English :)")
        # Nothing to fix: return the text unchanged instead of exiting the process
        return original_text

    # Incorrect grammar
    color_gap, fixed_gap = 0, 0
    for result in results["LightGingerTheTextResult"]:
        if(result["Suggestions"]):
            from_index = result["From"] + color_gap
            to_index = result["To"] + 1 + color_gap
            suggest = result["Suggestions"][0]["Text"]

            # Colorize text
            colored_incorrect = ColoredText.colorize(original_text[from_index:to_index], 'red')[0]
            colored_suggest, gap = ColoredText.colorize(suggest)

            original_text = original_text[:from_index] + colored_incorrect + original_text[to_index:]
            fixed_text = fixed_text[:from_index-fixed_gap] + colored_suggest + fixed_text[to_index-fixed_gap:]
            # fixed_text = fixed_text[:from_index-fixed_gap] + suggest + fixed_text[to_index-fixed_gap:]

            color_gap += gap
            fixed_gap += to_index-from_index-len(suggest)

    print("from: " + original_text)
    print("to:   " + fixed_text)
    return fixed_text

In [None]:
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

Fetching the answer-box result of a Google search (through the Zenserp API)

In [None]:
#fetch_google
import yake
import requests

API_KEY = '4e1eb040-18e7-11eb-a969-4d03007837e9'

def keys(query):
  kw_extractor = yake.KeywordExtractor()
  keywords = kw_extractor.extract_keywords(query)
  txt = "A P J Abdul Kalam"
  for kw in keywords:
    # print(kw[1])
    txt += " "
    txt += kw[1]
  return txt,keywords

def get_ans(question):
  headers = { 'apikey': API_KEY }
  params = (
    ("q",question),
    ("device","desktop"),
    ("gl","IN"),
    ("hl","en"),
    ("location","Navi Mumbai,Maharashtra,India"),
    ("num","50"),
  )

  response = requests.get('https://app.zenserp.com/api/v2/search', headers=headers, params=params)
  data = response.json()
  if 'answer_box' in data.keys():
    res = data['answer_box']['answer']
  else:
    res = "No clue"
  return res

def master(query):
  txt,keywords = keys(query)
  # print(txt)
  res = get_ans(txt)
  return res

BERT for QNA

In [None]:
#qnautils
import torch
import pandas as pd
import nltk
nltk.download('vader_lexicon')
from nltk import wordpunct_tokenize, WordNetLemmatizer, sent_tokenize, pos_tag
from nltk.corpus import stopwords as sw, wordnet as wn
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
import spacy
nlp = spacy.load("en_core_web_sm")
# from fetch_google import *

def get_data():
  df = pd.read_csv('WOF_split_into_sentences.csv')
  sentences = list(df['Sentences'])
  print("Number of sentences in the dataframe : ",len(sentences))
  return df

def get_context_from_data(query,df):
  sentences = list(df['Sentences'])
  vectorizer = TfidfVectorizer(stop_words='english')
  X = vectorizer.fit_transform(sentences)
  X = normalize(X)
  print("Data vectorization completed")
  Question = vectorizer.transform([query])
  Question = normalize(Question)
  cosineSimilarities = cosine_similarity(Question, X).flatten()
  idx = cosineSimilarities.argsort()[::-1][:20]
  temp = ""
  # print(query)
  for i in idx:                                                                             
    if(cosineSimilarities[i] != 0):
      temp = temp + sentences[i]
  print("Context has been extracted")
  return temp

def qna(questions,df,model,tokenizer):
  answers = []
  for question in questions:
    print("Looking for answer for question : ", question)
    text = get_context_from_data(question,df)
    inputs = tokenizer(question, text, add_special_tokens=True, return_tensors="pt")
    input_ids = inputs["input_ids"].tolist()[0]
    text_tokens = tokenizer.convert_ids_to_tokens(input_ids)
    answer_start_scores, answer_end_scores = model(**inputs).values()
    print("_________",answer_start_scores)
    answer_start = torch.argmax(answer_start_scores)  # Get the most likely beginning of answer with the argmax of the score
    answer_end = torch.argmax(answer_end_scores) + 1  # Get the most likely end of answer with the argmax of the score
    answer = tokenizer.convert_tokens_to_string(tokenizer.convert_ids_to_tokens(input_ids[answer_start:answer_end]))
    print(f"Question: {question}")
    print(f"Answer: {answer}")
    print("Answer found\n")
    answers.append(answer)
  return answers

def Fetching_answers(test_questions,df,model,tokenizer):
  answers = qna(test_questions,df,model,tokenizer)
  for idx,ans in enumerate(answers):
    if ans == '[CLS]' or ans == '':
      res = master(test_questions[idx])
      if res == 'No clue':
        answers[idx] = 'PASS' 
      else:
        answers[idx] = res
  return answers


def ner(sentence):
  doc = nlp(sentence)
  entities = []
  for ent in doc.ents:
    entities.append([ent.text,ent.label_])
  return entities

def entities(sentences,answers):
  for idx,sen in enumerate(sentences):
    if(answers[idx] == 'PASS' and len(ner(sen)) == 0):
      print(sen)
      answers[idx] = 'GPT2'
  return answers

def phase_one_end(questions,df,model,tokenizer):
  ans = Fetching_answers(questions,df,model,tokenizer)
  final = entities(questions,ans)
  return final


[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
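
The check for `'[CLS]'` or an empty string in `Fetching_answers` follows from how the answer span is cut out of the token list in `qna`. The sketch below illustrates this with made-up tokens and one-hot scores; `extract_span` and `peak` are hypothetical helpers, and tokens are joined with spaces rather than with the tokenizer. It assumes the usual SQuAD 2.0 convention that a model points at `[CLS]` when it finds no answer.

```python
from typing import List


def argmax(scores: List[float]) -> int:
    """Index of the largest score (first one wins on ties)."""
    return max(range(len(scores)), key=scores.__getitem__)


def extract_span(tokens: List[str], start_scores: List[float], end_scores: List[float]) -> str:
    """Cut tokens[start:end] the same way qna() does: argmax start, argmax end + 1."""
    start = argmax(start_scores)
    end = argmax(end_scores) + 1
    return " ".join(tokens[start:end])


# Made-up question + context, already tokenized.
tokens = ["[CLS]", "who", "wrote", "it", "?", "[SEP]", "kalam", "wrote", "it", "[SEP]"]


def peak(i: int, n: int = len(tokens)) -> List[float]:
    """Toy score vector with a single peak at position i."""
    return [1.0 if j == i else 0.0 for j in range(n)]


found = extract_span(tokens, peak(6), peak(6))      # start and end both on "kalam"
no_answer = extract_span(tokens, peak(0), peak(0))  # both heads point at [CLS]
crossed = extract_span(tokens, peak(7), peak(5))    # end before start: empty slice

print(repr(found), repr(no_answer), repr(crossed))
```

Both failure modes (`"[CLS]"` and `""`) carry no usable answer, which is why the pipeline moves on to the search fallback in those cases.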


GPT2 Generation

In [None]:
#improvutils
import gpt_2_simple as gpt2
from torch.nn.functional import softmax
from transformers import BertForNextSentencePrediction,BertTokenizer
import nltk
nltk.download('vader_lexicon')
from nltk.sentiment.vader import SentimentIntensityAnalyzer
sid = SentimentIntensityAnalyzer()
import re

def generate_candidates(input,sess):
	print("GPT2 generating for :",input)
	generated_text = gpt2.generate(sess,
		length=100,
		run_name='run1_topical_token',
		return_as_list=True,
		temperature=0.7,
		prefix=input,
		nsamples=15,
		truncate = '.',
		batch_size=5,
		top_k = 5,
		include_prefix = False)

	def clean(input_st, sub):
		return input_st.replace(sub, '').lstrip()

	cleaned = []
	for text in generated_text:
		cleaned.append(re.sub(r"^\W+", "",clean(text,'<|endoftext|>')))

	return cleaned


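def top_result(seq_A,seq_B,model,tokenizer):
	# Pick the candidate that BERT NSP scores as the most likely continuation of seq_A
	response = seq_B[0]
	max_prob = -1
	for seq in seq_B:
		encoded = tokenizer.encode_plus(seq_A, text_pair=seq, return_tensors='pt')
		seq_relationship_logits = model(**encoded)[0]
		probs = softmax(seq_relationship_logits, dim=1)
		# Class 0 of the NSP head means "seq is a continuation of seq_A"
		if probs[0][0] > max_prob:
			max_prob = probs[0][0]
			response = seq
		if max_prob >= 0.97:
			# Confident enough: stop early
			return response
	# No candidate reached 0.97. Return the best one seen rather than -1,
	# because the caller passes the result to the sentiment analyzer as text.
	return response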


def master_GPT2(inp,model,tokenizer,sess):
  generation_cleaned = generate_candidates(inp,sess)
  candidates = []
  for gen in generation_cleaned:
    if len(ner(gen)) == 0:
      candidates.append(gen)
  print(candidates)
  
  res = top_result(inp,candidates,model,tokenizer)
  polarity = sid.polarity_scores(res)['compound']
  return res,polarity

def regeneration(inp,words,model,tokenizer,sess):
  flg = 1
  resp = ""
  pol = ""
  
  while(flg == 1):
    flg = 0
    resp,pol = master_GPT2(inp,model,tokenizer,sess)
    txt,keywords = keys(resp)
    print(resp,pol)
    if pol > 0.0:
      polarity = 'positive'
    elif pol < 0.0:
      polarity = 'negative'
    else:
      polarity = 'neutral'
    for x in keywords:
      # Each yake keyword is a tuple; keys() above reads the keyword text from index 1
      if x[1] in words.keys() and polarity != words[x[1]]:
        flg = 1
  return resp


[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!




Google Text2Speech

In [None]:
def synthesize_text(text):
    """Synthesizes speech from the input string of text."""
    from google.cloud import texttospeech

    client = texttospeech.TextToSpeechClient()

    input_text = texttospeech.SynthesisInput(text=text)

    # Note: the voice can also be specified by name.
    # Names of voices can be retrieved with client.list_voices().
    voice = texttospeech.VoiceSelectionParams(
        language_code="en-US",
        name="en-US-Wavenet-J",
        ssml_gender=texttospeech.SsmlVoiceGender.MALE,
    )

    audio_config = texttospeech.AudioConfig(
        audio_encoding=texttospeech.AudioEncoding.MP3
    )

    response = client.synthesize_speech(
        request={"input": input_text, "voice": voice, "audio_config": audio_config}
    )

    # The response's audio_content is binary.
    with open('t2s.mp3', "wb") as out:
        out.write(response.audio_content)
        print('Audio content written to file "t2s.mp3"')


#Bert as a service
Used to convert questions into embeddings, so that an incoming question can be matched against the closest question in the cache.

In [None]:
!pip install bert-serving-client
!pip install -U bert-serving-server[http]

In [None]:
!wget https://storage.googleapis.com/bert_models/2018_10_18/uncased_L-12_H-768_A-12.zip
!unzip uncased_L-12_H-768_A-12.zip
!nohup bert-serving-start -model_dir=./uncased_L-12_H-768_A-12 > out.file 2>&1 &

In [None]:
data_questions

In [None]:
from bert_serving.client import BertClient
bc = BertClient(check_length = False)
question_bert_embeddings = bc.encode(data_questions)

In [None]:
from numpy import dot
from numpy.linalg import norm
from sklearn.metrics.pairwise import cosine_similarity

def find_similar_question(sentence,data_questions,question_embeddings):
  sentence_embedding = bc.encode([sentence])
  maxm = -1
  id = 0
  for j,question in enumerate(question_embeddings):
    dist = dot(question, sentence_embedding[0])/(norm(question)*norm(sentence_embedding[0]))
    if(dist > maxm):
      maxm = dist
      id = j
  return data_questions[id],maxm
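
To make the matching rule concrete, here is a dependency-free sketch of the same nearest-question lookup on toy 3-dimensional vectors. The helper names `cosine` and `best_match` are hypothetical, and the vectors are made up (real BERT embeddings are much longer); the 0.5 threshold is the one used by the server below.

```python
import math
from typing import List, Sequence, Tuple


def cosine(u: Sequence[float], v: Sequence[float]) -> float:
    """Cosine similarity: dot(u, v) / (|u| * |v|), or 0.0 if either vector is all zeros."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    if norm_u == 0.0 or norm_v == 0.0:
        return 0.0
    return dot / (norm_u * norm_v)


def best_match(query_vec: Sequence[float], cached_vecs: List[Sequence[float]]) -> Tuple[int, float]:
    """Return (index, similarity) of the cached vector closest to the query."""
    scores = [cosine(query_vec, v) for v in cached_vecs]
    best = max(range(len(scores)), key=scores.__getitem__)
    return best, scores[best]


# Toy "embeddings" of two cached questions.
cached = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]

idx, score = best_match([0.9, 0.1, 0.0], cached)    # points almost along the first vector
use_cache = score > 0.5                             # same threshold as the server

far_idx, far_score = best_match([0.0, 0.0, 1.0], cached)  # orthogonal to both
print(idx, round(score, 3), use_cache, far_score)
```

A query that is orthogonal to every cached vector scores 0 and falls below the threshold, so the pipeline would move on to the QnA stage for it.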

#Server

In [None]:
from flask import Flask, render_template, request, jsonify
from flask_ngrok import run_with_ngrok
from flask_sslify import SSLify

In [None]:
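import time
times = []  # [input text, seconds taken] for each request

# process() below uses `data`, which is not defined elsewhere in this notebook.
# Assumption: it is the sentence dataframe returned by get_data().
data = get_data()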

app = Flask(__name__)
sslify = SSLify(app)
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
run_with_ngrok(app)  

@app.route('/') # default route
def new():
  return render_template('index_final.html') 

@app.route('/process', methods = ['POST'])
def process():
  curr = time.time()
  threshold_score = 0.5
  
  name = request.get_data().decode('utf-8')
  print("The text is", name)
  
  result = ""
  video_filename = ""
  source = ""
  cached = True
  
  if name.lower() in index_database:
    print("Response found in the cache")
    video_filename = str(index_database[name.lower()][0]) + ".mp4"
    result = index_database[name.lower()][1]
    source = index_database[name.lower()][2]
    video_filename = "/static/Video_cache2/" + video_filename

  else:
    similarity_response = find_similar_question(name,data_questions,question_bert_embeddings)
    sim_qs = similarity_response[0]
    sim_score = similarity_response[1]
    print(sim_score)
    
    if (sim_score > threshold_score):
      print("Similar response found in the cache")
      video_filename = str(index_database[sim_qs.lower()][0]) + ".mp4"
      result = index_database[sim_qs.lower()][1]
      source = index_database[sim_qs.lower()][2]
      video_filename = "/static/Video_cache2/" + video_filename

    else:
      cached = False
      answers = phase_one_end([name],data,model,tokenizer)
      if (answers[0] != 'GPT2' and answers[0] != 'PASS' and answers[0] != '[CLS]'):
        print("Response extracted")
        result = answers[0]
        source = "Fact"
      else:
        print("Response generated")
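        # `query` was undefined here; the user's text is held in `name`.
        # NOTE: `words` (keyword -> expected polarity) is not defined in this notebook
        # and must be supplied before this branch can run.
        result = regeneration(name,words,model_nsp,tokenizer_nsp,sess)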
        source = "GPT2"

  sen = split_into_sentences(result)
  
  if (len(sen) == 0):
    result = result + "."
  else:
    result = sen[0]
  
  result = fix(result)
  result = ansi_escape.sub('', result)
  
  print(result)
  print(source)

  if (cached == False):
    !rm -f /content/Kalam-AI-2/static/result_voice.mp4
    synthesize_text(result)
    !cd Wav2Lip && python inference.py --checkpoint_path checkpoints/wav2lip_gan.pth --face "/static/Final-speaking_111.mp4" --audio "t2s.mp3" 
    !cp -ru "/content/Kalam-AI-2/Wav2Lip/results/result_voice.mp4" "/content/Kalam-AI-2/static"
    video_filename = "/static/result_voice.mp4"
  
  times.append([name,time.time() - curr])
  return jsonify(result = result, vid = video_filename, source = source)

In [None]:
app.run()
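
For reference, a client could talk to the `/process` route roughly as follows. This is a sketch under assumptions: the base URL is a placeholder for whatever address ngrok prints, and `build_process_request` and `parse_process_response` are hypothetical helpers. The route itself, the raw-text request body, and the `result`, `vid`, and `source` keys come from the server code above.

```python
import json
import urllib.request


def build_process_request(base_url: str, text: str) -> urllib.request.Request:
    """Build a POST to /process with the question as the raw UTF-8 request body."""
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/process",
        data=text.encode("utf-8"),
        headers={"Content-Type": "text/plain; charset=utf-8"},
        method="POST",
    )


def parse_process_response(body: bytes) -> dict:
    """Decode the JSON reply and keep the three fields the server returns."""
    payload = json.loads(body.decode("utf-8"))
    return {key: payload[key] for key in ("result", "vid", "source")}


# Placeholder URL: replace with the HTTPS address printed by ngrok.
req = build_process_request("https://example.ngrok.io", "Who are you?")
print(req.get_method(), req.full_url)

# Sending it requires the server to be running:
# with urllib.request.urlopen(req) as resp:
#     reply = parse_process_response(resp.read())
#     print(reply["result"], reply["vid"], reply["source"])
```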