In [None]:
# Mount Google Drive inside the Colab filesystem; a later cell loads the
# fastText language-identification model from a path below this mount point.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
# install necessary libraries
!pip install wikipedia
!pip install Whoosh
!pip install transformers
!pip install sentencepiece
!pip install transformers[sentencepiece]
!pip install fasttext
!pip install pycountry
!pip install sacremoses


In [None]:
# import all needed libraries
import wikipedia
import codecs 
from IPython.core.display import display, HTML
from whoosh.index import * 
from whoosh.fields import *
from whoosh import qparser
import glob
import random
from transformers import BertForQuestionAnswering
import torch
from transformers import BertTokenizer
from transformers import pipeline
import fasttext
from pycountry import languages
import spacy

In [None]:
# first attempt at combining the IR module with the Wikipedia search, including some initial tests
class IR(object):
    """Whoosh-backed retriever over a list of text passages.

    On construction a fresh index is created in the local ``index`` directory
    and every passage is stored under its list position as document id.
    """

    def __init__(self, passages):
        """Index ``passages`` (an iterable of strings)."""
        # Imported locally so the class does not depend on `os` leaking into
        # the namespace through one of the notebook's star imports.
        import os

        schema = Schema(id=ID(stored=True, unique=True),
                        text=TEXT(analyzer=analysis.StemmingAnalyzer()))
        # Materialise once: hits are mapped back to passages by position.
        self.passages = list(passages)
        os.makedirs("index", exist_ok=True)
        ix = create_in("index", schema)
        # Used as a context manager the writer commits on success and cancels
        # on error, so a failed add_document does not leave the index locked.
        with ix.writer() as writer:
            for ind, passage_text in enumerate(self.passages):
                writer.add_document(id=str(ind), text=passage_text)
        self.ix = ix

    def retrieve_documents(self, query, topk):
        """Rank the indexed passages against ``query``.

        Query terms are OR-combined. Returns ``(text, scores)``: two parallel
        lists holding at most ``topk`` passages and their retrieval scores,
        best hit first.
        """
        scores = []
        text = []
        parser = qparser.QueryParser("text", self.ix.schema, group=qparser.OrGroup)
        q = parser.parse(query)
        # Open exactly one searcher and read the hits while it is still open,
        # so nothing is leaked and no closed reader is accessed.
        with self.ix.searcher() as searcher:
            for hit in searcher.search(q, limit=topk):
                scores.append(hit.score)
                text.append(self.passages[int(hit['id'])])
        return text, scores

def get_article(question):
    """Return the top-1 retrieval result among the Wikipedia search hits for ``question``.

    The titles returned by ``wikipedia.search`` are indexed with ``IR`` and
    ranked against the question itself; the ``(text, scores)`` pair of the
    single best hit is handed back unchanged.
    """
    return IR(wikipedia.search(question)).retrieve_documents(question, 1)

# Try the retrieval on a few sample questions ...
for sample_question in ("Who was george washington?",
                        "Who is Barack Obama?",
                        "What frequency has visible light?"):
    print(get_article(sample_question))
# ... and show the Wikipedia summary of the best hit for the last one.
top_titles = get_article("What frequency has visible light?")[0]
print(wikipedia.summary(top_titles[0]))

In [None]:
# spaCy smoke test: print the named entities found in a few sample questions
nlp = spacy.load("en_core_web_sm")
nlp.get_pipe("ner").labels
for sample_text in (u"Where is spain",
                    u"What is NLP?",
                    u"What is the BERT language model?",
                    u"Who was George Washington and what did he have to do with elvis presley?"):
    doc = nlp(sample_text)
    for ent in doc.ents:
        print(ent.text, ent.start_char, ent.end_char, ent.label_)

In [None]:
# All necessary modules and submodules

# information retrieval module
class IR(object):
    """Whoosh-backed retriever over a list of text passages.

    On construction a fresh index is created in the local ``index`` directory
    and every passage is stored under its list position as document id.
    """

    def __init__(self, passages):
        """Index ``passages`` (an iterable of strings)."""
        # Imported locally so the class does not depend on `os` leaking into
        # the namespace through one of the notebook's star imports.
        import os

        schema = Schema(id=ID(stored=True, unique=True),
                        text=TEXT(analyzer=analysis.StemmingAnalyzer()))
        # Materialise once: hits are mapped back to passages by position.
        self.passages = list(passages)
        os.makedirs("index", exist_ok=True)
        ix = create_in("index", schema)
        # Used as a context manager the writer commits on success and cancels
        # on error, so a failed add_document does not leave the index locked.
        with ix.writer() as writer:
            for ind, passage_text in enumerate(self.passages):
                writer.add_document(id=str(ind), text=passage_text)
        self.ix = ix

    def retrieve_documents(self, query, topk):
        """Rank the indexed passages against ``query``.

        Query terms are OR-combined. Returns ``(text, scores)``: two parallel
        lists holding at most ``topk`` passages and their retrieval scores,
        best hit first.
        """
        scores = []
        text = []
        parser = qparser.QueryParser("text", self.ix.schema, group=qparser.OrGroup)
        q = parser.parse(query)
        # Open exactly one searcher and read the hits while it is still open,
        # so nothing is leaked and no closed reader is accessed.
        with self.ix.searcher() as searcher:
            for hit in searcher.search(q, limit=topk):
                scores.append(hit.score)
                text.append(self.passages[int(hit['id'])])
        return text, scores

# question answering module
class QA(object):
    """Extractive question answering with a SQuAD-finetuned BERT model."""

    MODEL_NAME = 'bert-large-uncased-whole-word-masking-finetuned-squad'
    # only the first MAX_TOKENS tokens of "[CLS] query [SEP] passage [SEP]" are used
    MAX_TOKENS = 512

    def __init__(self):
        self.model = BertForQuestionAnswering.from_pretrained(self.MODEL_NAME)
        self.tokenizer = BertTokenizer.from_pretrained(self.MODEL_NAME)

    def answer_question(self, query, passage):
        """Extract the best-scoring answer span for ``query`` from ``passage``.

        Returns ``(answer, confidence)`` where ``answer`` is the string
        ``'Answer: "<span>"'`` and ``confidence`` is the mean of the start and
        end logit of the chosen span. When the truncated input leaves no
        passage token to choose from, ``('Answer: ""', float('-inf'))`` is
        returned instead of raising.
        """
        no_answer = ('Answer: ""', float('-inf'))

        # encode inputs, only use the first MAX_TOKENS tokens
        input_ids = self.tokenizer.encode(query, passage)[:self.MAX_TOKENS]
        sep_id = self.tokenizer.sep_token_id
        if sep_id not in input_ids:
            # the question alone filled the window, no passage token survived
            return no_answer
        tokens = self.tokenizer.convert_ids_to_tokens(input_ids)
        num_seg_a = input_ids.index(sep_id) + 1
        num_seg_b = len(input_ids) - num_seg_a
        segment_ids = [0] * num_seg_a + [1] * num_seg_b

        # inference only: no autograd graph needed
        with torch.no_grad():
            outputs = self.model(torch.tensor([input_ids]),
                                 token_type_ids=torch.tensor([segment_ids]),
                                 return_dict=True)

        # Candidate positions are the passage tokens without the final token
        # of the input (normally the closing [SEP]).
        last = len(input_ids) - 1
        start_scores = outputs.start_logits[0][num_seg_a:last]
        end_scores = outputs.end_logits[0][num_seg_a:last]
        n_candidates = start_scores.shape[0]
        if n_candidates == 0:
            return no_answer

        # pair_scores[s, e] = start logit of s + end logit of e; spans that
        # would end before they start are masked out.
        pair_scores = start_scores.unsqueeze(1) + end_scores.unsqueeze(0)
        positions = torch.arange(n_candidates, device=pair_scores.device)
        ends_before_start = positions.unsqueeze(1) > positions.unsqueeze(0)
        flat_scores = pair_scores.masked_fill(ends_before_start, float('-inf')).flatten()
        # argmax returns the first maximum; searching the reversed tensor keeps
        # the previous tie-breaking (the last best span in scan order wins).
        best = flat_scores.numel() - 1 - int(torch.argmax(flat_scores.flip(0)))
        rel_start, rel_end = divmod(best, n_candidates)
        max_score = flat_scores[best].item()
        answer_start = num_seg_a + rel_start
        answer_end = num_seg_a + rel_end

        # build answer from tokens: glue word pieces, drop stray separators
        answer = tokens[answer_start]
        for token in tokens[answer_start + 1:answer_end + 1]:
            if token[0:2] == '##':
                answer += token[2:]
            elif token != '[SEP]':
                answer += ' ' + token
        # return answer plus confidence score
        return 'Answer: "' + answer + '"', max_score / 2

# Wrapper class for IR and QA modules, combined with the wikipedia pipeline
class QA_pipeline(object):
    """Wikipedia-backed question answering: find a fitting article, then run QA on its summary."""

    NO_INFO_MESSAGE = "There is no Information on this topic available, please ask something else"
    # spaCy entity labels that are used directly as Wikipedia search term
    ENTITY_LABELS = ("GPE", "PERSON", "ORG")

    def __init__(self):
        self.qa = QA()
        self.nlp = spacy.load("en_core_web_sm")

    def _fetch_summary(self, search_term, question, verbose=False):
        """Search Wikipedia for ``search_term`` and return the summary of the hit ranked best against ``question``.

        Returns ``(passage, True)`` on success and ``(NO_INFO_MESSAGE, False)``
        when the search is empty or the summary cannot be fetched.
        """
        candidates = wikipedia.search(search_term)
        if not candidates:
            # nothing found: report it instead of failing on candidates[0]
            return self.NO_INFO_MESSAGE, False
        ordered_candidates = IR(candidates).retrieve_documents(question, 1)
        if verbose:
            print(candidates)
            print(ordered_candidates)
        # fall back to Wikipedia's own top hit when the IR module matched nothing
        candidate = ordered_candidates[0][0] if ordered_candidates[0] else candidates[0]
        if verbose:
            print("result after retrieval: " + candidate)
        try:
            # The candidate already is an exact title taken from the search
            # results, so title auto-suggestion is switched off: it could
            # otherwise rewrite the title to a different article.
            passage = wikipedia.summary(candidate, auto_suggest=False)
        except Exception:
            # e.g. disambiguation or missing page; KeyboardInterrupt still propagates
            return self.NO_INFO_MESSAGE, False
        if verbose:
            print(passage)
        return passage, True

    def get_best_candidate(self, question, verbose=False):
        """Return ``(passage, success)`` for the Wikipedia article best matching ``question``.

        If the first entity spaCy finds is a place, person or organisation,
        Wikipedia is searched for that entity; otherwise for the whole
        question. In both cases the search hits are re-ranked against the
        question with the IR module.
        """
        doc = self.nlp(question)
        # if a usable entity has been found, go down this path
        if doc.ents:
            entity = doc.ents[0]
            if entity.label_ in self.ENTITY_LABELS:
                if verbose:
                    print("initial result of spacy: " + entity.text)
                return self._fetch_summary(entity.text, question, verbose=verbose)
        # otherwise compare wikipedia search results with the question itself
        return self._fetch_summary(question, question, verbose=verbose)

    def answer_question(self, query, verbose=False):
        """Answer ``query``; returns ``(answer, confidence)`` or ``(error message, 0)``."""
        passage, success = self.get_best_candidate(query, verbose=verbose)
        if success:
            return self.qa.answer_question(query, passage)
        return passage, 0

In [None]:
# language identification and translation components as well as final wrapper class
class FastTextIdentifier(object):
    """Language identification with a fastText model."""

    # predicted labels that are skipped because they are not in the ISO format
    # the rest of the pipeline works with
    FORBIDDEN_LANGUAGES = frozenset([
        'als', 'arz', 'ast', 'azb', 'bar', 'bcl', 'bh', 'bpy', 'bxr', 'cbk', 'ceb', 'ckb',
        'diq', 'dsb', 'dty', 'eml', 'frr', 'gom', 'hif', 'hsb', 'ilo', 'jbo', 'krc', 'lez',
        'lmo', 'lrc', 'mai', 'mhr', 'min', 'mrj', 'mwl', 'myv', 'mzn', 'nah', 'nap', 'nds',
        'new', 'pam', 'pfl', 'pms', 'pnb', 'rue', 'sah', 'scn', 'sco', 'tyv', 'vec', 'vep',
        'vls', 'war', 'wuu', 'xal', 'xmf', 'yue'])

    def __init__(self, file_path, k=5):
        # load fasttext model and define a k
        self.identifier = fasttext.load_model(file_path)
        self.k = k

    def predict_language(self, language_sample, verbose=False):
        """Identify the language of ``language_sample``.

        The top ``k`` predictions are requested when ``verbose`` is set (and
        printed), otherwise the top 2. Forbidden labels are dropped.

        Returns ``(language_code, score, success)`` for the best remaining
        prediction, where ``score`` belongs to that very prediction. If no
        prediction remains, ``(None, 0.0, False)`` is returned.
        """
        # fastText predicts one line at a time and rejects embedded newlines
        sample = language_sample.replace("\n", " ")
        predictions, pred_score = self.identifier.predict(sample, k=self.k if verbose else 2)
        # keep language and score paired so that filtering cannot misalign them
        candidates = [(label.split("__")[-1], float(score))
                      for label, score in zip(predictions, pred_score)]
        candidates = [(lang, score) for lang, score in candidates
                      if lang not in self.FORBIDDEN_LANGUAGES]
        if not candidates:
            return None, 0.0, False
        if verbose:
            for order, (lang, score) in enumerate(candidates, start=1):
                try:
                    record = languages.get(alpha_2=lang)
                except LookupError:
                    record = None
                # fall back to the bare code when pycountry does not know it
                name = record.name if record is not None else lang
                print("{order}. language_detected: {lang} wit a score of {score}".format(order=order, lang=name, score=score))

        best_language, best_score = candidates[0]
        return best_language, best_score, True



class TwoWayTranslator(object):
    """Translates between a source language and English, loading the models lazily.

    For every language a pair of Helsinki-NLP opus-mt pipelines (to English
    and back) is created on first use and cached. Languages for which the
    pipelines cannot be created are remembered and not retried.
    """

    NO_MODEL_MESSAGE = "Translation was not possible, no Language Model exists for this Language"

    def __init__(self):
        self.language_list = []          # languages with loaded pipelines
        self.unknown_language_list = []  # languages whose pipelines failed to load
        self.pipeline_dict = {}          # language -> pipeline into English
        self.back_pipeline_dict = {}     # language -> pipeline from English

    def get_language_model(self, source_language):
        """Load both translation pipelines for ``source_language``.

        Returns ``True`` when both directions are available, ``False``
        otherwise. Only ordinary errors count as "not available";
        ``KeyboardInterrupt`` and the like propagate to the caller.
        """
        # prepare parameters
        model = "Helsinki-NLP/opus-mt-{src}-{dst}".format(src=source_language, dst="en")
        model_back = "Helsinki-NLP/opus-mt-{src}-{dst}".format(src="en", dst=source_language)
        translation_direction = "translation_{src}_to_{dst}".format(src=source_language, dst="en")
        translation_direction_back = "translation_{src}_to_{dst}".format(src="en", dst=source_language)
        # try to get translation pipelines
        try:
            translator = pipeline(translation_direction, model=model, tokenizer=model)
            back_translator = pipeline(translation_direction_back, model=model_back, tokenizer=model_back)
        except Exception:
            if source_language not in self.unknown_language_list:
                self.unknown_language_list.append(source_language)
            return False
        self.pipeline_dict[source_language] = translator
        self.back_pipeline_dict[source_language] = back_translator
        if source_language not in self.language_list:
            self.language_list.append(source_language)
        return True

    def translate(self, text, source_language, to_english):
        """Translate ``text`` into English (``to_english=True``) or from English back into ``source_language``.

        Returns ``(translation, True)`` on success and
        ``(NO_MODEL_MESSAGE, False)`` when no model exists for the language.
        """
        if source_language in self.unknown_language_list:
            return self.NO_MODEL_MESSAGE, False
        if source_language not in self.language_list:
            if not self.get_language_model(source_language):
                return self.NO_MODEL_MESSAGE, False
        translators = self.pipeline_dict if to_english else self.back_pipeline_dict
        return translators[source_language](text)[0]["translation_text"], True


# final wrapper class containing all modules
class MT_QA_Pipeline(object):
    """Multilingual question answering: identify language, translate, answer, translate back."""

    # big model works a lot better
    DEFAULT_MODEL_PATH = 'drive/My Drive/Colab Notebooks/nlp-appl2-project/models/lid.176.bin'
    UNIDENTIFIED_MESSAGE = "Language could not be identified for certain, Please try a different language or mean of phrasing it"

    def __init__(self, identification_threshold=0.40, model_path=None):
        """Set up all sub-modules.

        identification_threshold: minimum language-identification score a
            question needs in order to be processed.
        model_path: location of the fastText language-identification model;
            defaults to ``DEFAULT_MODEL_PATH``.
        """
        if model_path is None:
            model_path = self.DEFAULT_MODEL_PATH
        self.fasttxt = FastTextIdentifier(model_path, k=6)
        self.qa_system = QA_pipeline()
        self.two_way_translator = TwoWayTranslator()
        self.thresh = identification_threshold

    def answer_question(self, query, verbose=False):
        """Answer ``query`` in the language it was asked in.

        Returns the answer string, or an explanatory message when the
        language cannot be identified or no translation model exists.
        """
        source_language, score, success = self.fasttxt.predict_language(query, verbose=verbose)
        # a failed identification is treated like a score below the threshold
        if not success or score < self.thresh:
            return self.UNIDENTIFIED_MESSAGE
        is_english = source_language == 'en'
        if is_english:
            english_question = query
        else:
            english_question, success = self.two_way_translator.translate(query, source_language, to_english=True)
            if verbose:
                print(english_question)
            if not success:
                # english_question holds the translator's error message here
                return english_question

        english_answer, _ = self.qa_system.answer_question(english_question, verbose=verbose)
        if verbose:
            print(english_answer)
        if is_english:
            return english_answer
        answer, _ = self.two_way_translator.translate(english_answer, source_language, to_english=False)
        return answer

In [None]:
# Build the complete pipeline once; the input loop in the next cell reuses this instance.
mt_qa_module = MT_QA_Pipeline()



In [None]:
# input loop for questions; it can be stopped with the safe word "stop"
while (question := input()) != "stop":
    print(mt_qa_module.answer_question(question))
# Who was George Washington?
# Who was Barack Obama?
# What frequency has visible light?
# When was the first car built? --> the summary lookup for "car" returns the article on "cat" (possibly caused by the wikipedia library's title auto-suggestion - TODO confirm)
# What is NLP?
# What is the BERT language model?
# How many people live in london?
# How many people live in paris?
# How many people live in france?
# What language is spoken in france?
# What language is spoken in Nigeria?
# What are the symptoms of Covid19?
# what is Pernicious anemia?
# What language is spoken on the Bahamas?
# What frequency has visible light? 
# Wer war George Washington?
# Wer ist Barack Obama?
# Wer ist Barack Hussein Obama II?
# Wo ist Barack Obama geboren worden?
# ¿Quien es Barack Obama?
# ¿Quien es Stephen King?
# ¿Que es la pelicula mas famosa de Steven Spielberg?
# ¿Que es el libro mas famoso de Hemmingway?
# Wann wurde Deutschland wiedervereinigt?
# Was ist der Sinn des Lebens?
# ¿Que es el sentido de la vida?