# Question & Answer System
by ES

import:

In [1]:
%matplotlib inline

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import sys
import json 
import re
import gensim
import string
import math
import nltk
import random
import csv
import io
import spacy
import scipy
import heapq
from gensim.summarization import bm25
from collections import Counter
from nltk import pos_tag, word_tokenize
from nltk.tree import Tree
from nltk.corpus import wordnet as wn
from nltk.corpus import stopwords



Open and load files:

In [2]:
with open('training.json','r') as f:
    training = json.load(f)

with open('documents.json','r') as f:
    documents = json.load(f)

with open('devel.json','r') as f:
    develop = json.load(f)
    
with open('testing.json','r') as f:
    testing = json.load(f)

TEXT SIMILARITY
Use bm25 to calculate the similarity between question and each paragraph in the target document. Get the paragraph with highest score.
For test set, return the question and paragraph.
For training/develop set, return the index of paragraph, index of right paragraph, and right answer as well

In [3]:
lemmatizer = nltk.stem.wordnet.WordNetLemmatizer()
def lemmatize(word):
    lemma = lemmatizer.lemmatize(word,'v')
    if lemma == word:
        lemma = lemmatizer.lemmatize(word,'n')
    return lemma

def wordPreprocess(word_list):
    stopWords = set(stopwords.words('english'))
    word_list_preprocessed = []
    for word in word_list:
        word_processed = lemmatize(word.lower())
        if word_processed not in stopWords and word_processed not in string.punctuation:
            word_list_preprocessed.append(word_processed)
    return word_list_preprocessed

documents_dict = {}
t = str(len(documents))
num = 0
for doc in documents:
    sys.stdout.write(('getting documents dict: '+'{0}/'+t+'\r').format(num + 1))
    sys.stdout.flush()
    para_list = []
    for para in doc["text"]:
        word_list = wordPreprocess(word_tokenize(para))
        para_list.append(word_list)
    documents_dict[doc["docid"]]=para_list
    num+=1

getting documents dict: 441/441

In [4]:
develop_data = []
t = str(len(develop))
num = 0
for question in develop:
    sys.stdout.write(('getting develop data: '+'{0}/'+t+'\r').format(num + 1))
    sys.stdout.flush()
    bm25Model = bm25.BM25(documents_dict[question["docid"]])
    average_idf = sum(map(lambda k: float(bm25Model.idf[k]), bm25Model.idf.keys())) / len(bm25Model.idf.keys())
    scores = bm25Model.get_scores(wordPreprocess(word_tokenize(question["question"])),average_idf)
    idx_list = heapq.nlargest(2, enumerate(scores), key=lambda x:x[1])
    idx, vals = zip(*idx_list)
    develop_data.append((question["question"],[documents[question["docid"]]["text"][idx[0]],documents[question["docid"]]["text"][idx[0]]],idx,question["answer_paragraph"],question["text"]))
    num+=1

getting develop data: 3097/3097

In [5]:
test_data = []
t = str(len(testing))
num = 0
for question in testing:
    sys.stdout.write(('getting develop data: '+'{0}/'+t+'\r').format(num + 1))
    sys.stdout.flush()
    bm25Model = bm25.BM25(documents_dict[question["docid"]])
    average_idf = sum(map(lambda k: float(bm25Model.idf[k]), bm25Model.idf.keys())) / len(bm25Model.idf.keys())
    scores = bm25Model.get_scores(word_tokenize(question["question"]),average_idf)
    idx_list = heapq.nlargest(2, enumerate(scores), key=lambda x:x[1])
    idx, vals = zip(*idx_list)
    test_data.append((question["question"],[documents[question["docid"]]["text"][idx[0]],documents[question["docid"]]["text"][idx[0]]],idx))
    num+=1

getting develop data: 3618/3618

Evaluate function:
print the acurate of similarity part

In [6]:
def tfidfAcurate(data_list):
    data_count = 0
    correct_count = 0
    for data in data_list:
        data_count+=1
        if data[2][0]==data[3] or data[2][1]==data[3]:
            correct_count +=1
    return float(correct_count)/data_count

print(tfidfAcurate(develop_data))

0.8737487891507911


Get Answer:

In [32]:
def getEntity(sentence):
    ## spacy
    nlp = spacy.load('en_core_web_sm')
    doc = nlp(sentence)
    return doc.ents

def getTextByLabel(entitys,label_list):
    answer_list = []
    for lab in label_list:
        for entity in entitys:
            for ent in entity:
                label = ent.label_
                if label == lab:
                    answer_list.append(ent.text)
    return answer_list

def getAnswer(data_list):
    nlp = spacy.load('en_core_web_sm')
    t = str(len(data_list))
    now = 0
    answer=[]
#     print(type(answer))
    id_num = 0
    count = [0,0,[0,0,0,0,0,0,0,0],[0,0,0,0],0,[0,0,0,0],0]
    useList = [0,0]
    for data in data_list:
        sys.stdout.write(('{0}/'+t+'\r').format(id_num + 1))
        sys.stdout.flush()
        paragraph_entity = [getEntity(data[1][0]),getEntity(data[1][1])]
        answer_list = list()
        
        if bool(re.search('who',data[0],re.IGNORECASE) or re.search('name',data[0],re.IGNORECASE)):
            count[0]+=1
            answer_list = getTextByLabel(paragraph_entity,["PERSON","NORP","ORG"])
        elif bool(re.search('where',data[0],re.IGNORECASE) or re.search('place',data[0],re.IGNORECASE)):
            count[1]+=1
            answer_list = getTextByLabel(paragraph_entity,["GPE","LOC","FACILITY","ORG"])
        elif bool(re.search('what',data[0],re.IGNORECASE)):
            if bool(re.search('what time',data[0],re.IGNORECASE)):
                count[2][0]+=1
                answer_list = getTextByLabel(paragraph_entity,["DATE","TIME"])
            elif bool(re.search('what year',data[0],re.IGNORECASE) or re.search('what day',data[0],re.IGNORECASE) or re.search('what date',data[0],re.IGNORECASE)):
                count[2][1]+=1
                answer_list = getTextByLabel(paragraph_entity,["DATE"])
            elif bool(re.search('what city',data[0],re.IGNORECASE) or re.search('what country',data[0],re.IGNORECASE)):
                count[2][2]+=1
                answer_list = getTextByLabel(paragraph_entity,["GPE"])
            elif bool(re.search('what value',data[0],re.IGNORECASE)):
                count[2][3]+=1
                answer_list = getTextByLabel(paragraph_entity,["QUANTITY","MONEY","CARDINAL","PERCENT"])
            elif bool(re.search('what percentage',data[0],re.IGNORECASE)):
                count[2][4]+=1
                answer_list = getTextByLabel(paragraph_entity,["PERCENT"])
            elif bool(re.search('event',data[0],re.IGNORECASE)):
                count[2][5]+=1
                answer_list = getTextByLabel(paragraph_entity,["EVENT"])
            elif bool(re.search('language',data[0],re.IGNORECASE)):
                count[2][6]+=1
                answer_list = getTextByLabel(paragraph_entity,["LANGUAGE"])
            else:
                count[2][7]+=1
                answer_list = getTextByLabel(paragraph_entity,["PRODUCT","WORK_OF_ART","EVENT","FACILITY","ORG"])
        elif bool(re.search('how',data[0],re.IGNORECASE)):
            if bool(re.search('how much',data[0],re.IGNORECASE)):
                count[3][0]+=1
                answer_list = getTextByLabel(paragraph_entity,["MONEY"])
            if bool(re.search('how many',data[0],re.IGNORECASE)):
                count[3][1]+=1
                answer_list = getTextByLabel(paragraph_entity,["QUANTITY","MONEY","CARDINAL","PERCENT"])
            if bool(re.search('how does',data[0],re.IGNORECASE) or re.search('how was',data[0],re.IGNORECASE)):
                count[3][2]+=1
                answer_list = getTextByLabel(paragraph_entity,["LAW","EVENT"])
            else:
                count[3][3]+=1
                answer_list = getTextByLabel(paragraph_entity,["QUANTITY","MONEY","CARDINAL"])
        elif bool(re.search('when',data[0],re.IGNORECASE)):
            count[4]+=1
            answer_list = getTextByLabel(paragraph_entity,["DATE","TIME"])
        elif bool(re.search('which',data[0],re.IGNORECASE)):
            if bool(re.search('which person',data[0],re.IGNORECASE)):
                count[5][0]+=1
                answer_list = getTextByLabel(paragraph_entity,["PERSON"])
            elif bool(re.search('which time',data[0],re.IGNORECASE)):
                count[5][1]+=1
                answer_list = getTextByLabel(paragraph_entity,["DATE","TIME"])
            elif bool(re.search('which country',data[0],re.IGNORECASE)):
                count[5][2]+=1
                answer_list = getTextByLabel(paragraph_entity,["NORP","ORG"])
            else:
                count[5][3]+=1
                answer_list = getTextByLabel(paragraph_entity,["FACILITY","PRODUCT","WORK_OF_ART","EVENT","ORG"])
        else:
            count[6]+=1
        for ans in answer_list:
            if ans in data[0]:
                answer_list.remove(ans)
                
        if len(answer_list)==0:
            useList[1]+=1
            
            try:
                if not len(paragraph_entity[0])==0:
                    for ent in paragraph_entity[0]:
                        if ent.text in data[0]:
                            paragraph_entity[0].remove(ent)
                    index = random.randint(0,len(paragraph_entity[0])-1) 
                    answer.append (([str(id_num)+','+paragraph_entity[0][index].text.replace(',','')],data[2]))
                else:
                    for ent in paragraph_entity[1]:
                        if ent.text in data[0]:
                            paragraph_entity[1].remove(ent)
                    index = random.randint(0,len(paragraph_entity[1])-1) 
                    answer.append (([str(id_num)+','+paragraph_entity[1][index].text.replace(',','')],data[2]))
            except:
                doc = nlp(data[1][0])
                index = random.randint(0,len(doc)-1)
                answer.append(([str(id_num)+','+doc[index].text.replace(',','')],data[2]))
        else:
            try:
                if answer_list [0] not in data[0]:
#                     print(answer_list[0])
                    answer.append(([str(id_num)+','+answer_list[0].replace(',','')],data[2]))
                elif answer_list [1] not in data[0]:
                    answer.append(([str(id_num)+','+answer_list[1].replace(',','')],data[2]))
                elif answer_list [2] not in data[0]:
                    answer.append(([str(id_num)+','+answer_list[2].replace(',','')],data[2]))
                else:
                    answer.append(([str(id_num)+','+answer_list[3].replace(',','')],data[2]))
            except:
                doc = nlp(data[1][0])
                index = random.randint(0,len(doc)-1)
                answer.append(([str(id_num)+','+doc[index].text.replace(',','')],data[2]))
            useList[0]+=1
        id_num +=1
    print("count ",count)
    print("use list ",useList)
    return answer

In [33]:
# answer = getAnswer(test_data)
answer = getAnswer(develop_data)

8/3097

KeyboardInterrupt: 

Evaluate function:
print the acurate of answer:

In [None]:
def campare(question,answer):
    if question[4].lower()==str(answer[0]).strip("'").split(',',1)[1][:-2].lower():
        return True
    else:
        print(question[0],"answer: "+question[4],"your answer: "+str(answer[0]).strip("'").split(',',1)[1][:-2])
        return False

def answerAcurate(data_list,answer):
    data_count = 0
    correct_count = 0
    results = map(campare,data_list,answer)
    for result in results:
        data_count+=1
        if result:
            correct_count +=1
    return float(correct_count)/data_count

print(answerAcurate(develop_data,answer))
    

Write function:

In [None]:
##Write into csv file
def writeFile(answer):
#     sys.stdout = io.TextIOWrapper(sys.stdout.buffer,encoding='utf8')
    out = open('answer.csv','a',newline='')
    csv_write =csv.writer(out,dialect='excel')
    csv_write.writerow(["id,answer"])
    answerID = 0
    for line in answer:
        try:
            csv_write.writerow(line[0])
        except:
            print("line "+str(answerID)+" cannot write： "+str(line[0]))
        answerID+=1
    out.close()
    
writeFile(answer)
