In [1]:
import re
import spacy
import operator
import numpy as np
from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize, word_tokenize
import tensorflow as tf
from sentence_transformers import SentenceTransformer
from rank_bm25 import *
import fitz
from datasets import load_dataset

C:\Users\Melvin\anaconda3\lib\site-packages\numpy\.libs\libopenblas.QVLO2T66WEPI7JZ63PS3HMOHFEY472BC.gfortran-win_amd64.dll
C:\Users\Melvin\anaconda3\lib\site-packages\numpy\.libs\libopenblas.WCDJNK7YVMPZQ2ME2ZZHJJRJ3JIKNDB7.gfortran-win_amd64.dll


In [2]:
#Deutsche Stopwörter und Lemmatizer laden
german_stopwords = stopwords.words('german')
lemmatizer = spacy.load('de_core_news_sm')

In [3]:
#BERT Modell aus Huggingface laden
bert_model = SentenceTransformer("svalabs/bi-electra-ms-marco-german-uncased")



In [4]:
# Preprocessing der PDF-Texte
def BERTpreprocessing(text):
    sentenceDictionary = dict()
    sentences = sent_tokenize(text, language='german')
    for i in range(len(sentences)):
        sentence = sentences[i]
        cleanedSentence = cleanData(sentence)
        sentenceVector = bert_model.encode(cleanedSentence)
        sentenceDictionary[i] = [sentence,cleanedSentence,sentenceVector]
    return sentenceDictionary

# BM25 spezifisches Preprocessing
def BMPreprocessing(text):
    text = cleanData(text)
    text = removeStopwords(text)
    text = lemmatize(text)
    text = word_tokenize(text, language='german')
    return text

def removeStopwords(text):
    return ' '.join([word for word in text.split() if word not in german_stopwords])

def lemmatize(text):
    doc = lemmatizer(text)
    return ' '.join([x.lemma_ for x in doc]) 

def cleanData(text):
    text = text.lower()
    text = re.sub(r"\s+[a-zA-Z]\s+", ' ', text)
    text = re.sub(r'\s+', ' ', text)
    text = re.sub('[^a-zA-ZäöüÄÖÜß]', ' ', text)
    return text

In [5]:
# Evaluierung des Modells anhand GermanDPR
dataset = load_dataset("deepset/germandpr")

# Anfragen extrahieren
queriesDPR = dataset['test']['question']

# Kontextkorpus bilden
contextsDPR = []
for i in range(len(dataset['test'])):
    contextsDPR.extend([dataset['test']['positive_ctxs'][i]['text'],
                    dataset['test']['negative_ctxs'][i]['text'],
                    dataset['test']['hard_negative_ctxs'][i]['text']])

filteredContextsDPR = []
for listItem in contextsDPR:
    if len(listItem) == 0:
        continue
    elif len(listItem) == 1:
        filteredContextsDPR.append(listItem[0])
    else:
        tempList = []
        for item in listItem:
            tempList.append(item)
        filteredContextsDPR.extend(tempList)
        
# Deduplizieren
filteredContextsDPR = list(set(filteredContextsDPR))

# Richtige Antworten extrahieren
correctPassagesDPR = []
for i in range(len(dataset['test'])):
    correctPassagesDPR.append(filteredContextsDPR.index(dataset['test']['positive_ctxs'][i]['text'][0]))



  0%|          | 0/2 [00:00<?, ?it/s]

In [6]:
#Daten für BM25 bereinigen
BM25_EvaluationData, i = dict(), 0
for text in filteredContextsDPR:
    BM25_EvaluationData[i] = BMPreprocessing(text)
    i+=1

#Daten für BERT bereinigen
BERT_EvaluationData, i = dict(), 0
for text in filteredContextsDPR:
    BERT_EvaluationData[i] = BERTpreprocessing(text)
    i+=1

In [7]:
#BM25 Modell initialisieren
bm25 = BM25Okapi(list(BM25_EvaluationData.values()))

In [10]:
#Kosinus-Ähnlichkeit
def cosineSimilarity(vector1, vector2):
    return np.dot(vector1,vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2))

#BERT Voraussage
def get_BERT_Prediction_Eval(query, docIds=None):
    queryEncoded = bert_model.encode(query)
    bertScores = dict()
    
    for docID, sentenceDictionary in BERT_EvaluationData.items():
        if docIds != None:
            if docID not in docIds:
                continue
        scores,docBestSentences = [], dict()
        for sentID, values in sentenceDictionary.items():
            similarity = cosineSimilarity(queryEncoded,values[2])
            docBestSentences[sentID] = similarity
            scores.append(similarity)
        docBestSentences = dict(sorted(docBestSentences.items(), key=operator.itemgetter(1),reverse=True))
        docSimilarity = np.mean(sorted(scores,reverse=True)[:5])
        bertScores[docID] = np.mean(sorted(scores,reverse=True)[:5])
    return dict(sorted(bertScores.items(), key=operator.itemgetter(1),reverse=True))

#BM25 Voraussage
def get_BM25_Prediction_Eval(query):
    query= cleanData(query)
    query = removeStopwords(query)
    query = lemmatize(query)
    query = word_tokenize(query.lower(), language='german')
    
    doc_scores = bm25.get_scores(query)
    bm25Scores = dict()
    for i in range(len(doc_scores)):
        #if doc_scores[i] != 0:
        bm25Scores[i] = doc_scores[i]
    return dict(sorted(bm25Scores.items(), key=operator.itemgetter(1),reverse=True))

# Kombinierte Voraussage mit Theta als Schwellwert und k1 als Gewichtubngsparameter
def get_Combined_Prediction_Eval(query,theta,k1):
    bertScores = get_BERT_Prediction_Eval(query)
    bm25Scores = get_BM25_Prediction_Eval(query)
    combinedScores = dict()
    
    if len(bm25Scores) == 0 and len(bertScores) == 0:
        return dict()
    if len(bm25Scores) == 0:
         return bertScores
    if len(bertScores) == 0:
        return bm25Scores
    
    if len(bm25Scores) == 1:
        for docId, score in bm25Scores.items():
            bm25Scores[docId] = 1
    else:
        minBM25,maxBM25 = np.min(list(bm25Scores.values())), np.max(list(bm25Scores.values())) 
        
        #BM25 normalisieren
        for docId, score in bm25Scores.items():
            bm25Scores[docId] = (score - minBM25) / (maxBM25 - minBM25)
            
    if len(bertScores) == 1:
        for docId, score in bertScores.items():
            bertScores[docId] = 1
    else:
        minBert,maxBert = np.min(list(bertScores.values())), np.max(list(bertScores.values())) 
        
       #BERT normalisieren
        for docId, score in bertScores.items():
            bertScores[docId] = (score - minBert) / (maxBert - minBert)
        
    #Kombinieren mit k1 Gewichtung
    for docId in BM25_EvaluationData.keys():
        bm25Score, bertScore = 0,0
        if docId in bm25Scores:
            bm25Score = bm25Scores[docId]  
        if docId in bertScores:
            bertScore = bertScores[docId]
        combined = k1 * bertScore + (1-k1)* bm25Score 
        if combined > theta:
            combinedScores[docId] = k1 * bertScore + (1-k1)* bm25Score 
    resultScores = dict(sorted(combinedScores.items(), key=operator.itemgetter(1),reverse=True))
    return resultScores

In [11]:
#Methode zur Erzeugung aller Voraussagen
def getPredictions():
    predicted = []
    relevant = []
    for i in range(len(queriesDPR)):
        prediction = get_Combined_Prediction_Eval(queriesDPR[i],0,0.7)
        if len(prediction) != 0:
            predicted.append(list(prediction.keys()))
            relevant.append(correctPassagesDPR[i]) 
    return predicted, relevant

predicted, relevant = getPredictions()

  bm25Scores[docId] = (score - minBM25) / (maxBM25 - minBM25)



In [12]:
# Recall@k
def recallEval(predicted, relevant,k):
    count = 0
    for i in range(k):
        if predicted[i] == relevant:
            count +=1
    return count 

# Average Recall@k
def averageRecall(predicted, relevant,k=10):
    averageRecall = 0
    for i in range(len(predicted)):
        averageRecall += recallEval(predicted[i],relevant[i],k)
    return averageRecall / len(relevant)


# Exact Matches (first is hit)
def EM(predicted, relevant):
    em = 0
    for i in range(len(predicted)):
        if predicted[i][0] == relevant[i]:
            em+=1
    return em /len(relevant)

#Precision@k
def precisionEval(predicted, truth, k):
    summe,count = 0, 0
    for i in range(k+1):
        if int(predicted[i]) == truth:
            count+=1
    summe+= count/ (i+1)
    return summe    

# AP@k
def average_precisionEval(predicted,relevant):
    averageSum = 0
    scores = dict()
    for i in range(0,len(predicted)):
        summe = 0  
        if int(predicted[i]) == relevant:
            summe+= precisionEval(predicted, relevant, i)
        averageSum += summe
    return averageSum

#MAP
def mean_average_precisionEval(predicted,relevant):
    summe = 0
    for i in range(0,len(predicted)):
        summe += average_precisionEval(predicted[i],relevant[i])
    return summe/len(predicted)

#RR
def reciprocal_rankEval(predicted,relevant):
    for i in range(0, len(predicted)):
        if int(predicted[i]) == relevant:
            return 1/(i+1)
    return 0

#MRR
def mean_reciprocal_rankEval(predicted,relevant):
    summe = 0
    for i in range(0,len(predicted)):
        rec_rank = reciprocal_rankEval(predicted[i],relevant[i])
        summe += rec_rank
    return summe/len(predicted)

In [13]:
def getMetrics(predicted, relevant):
    em = EM(predicted, relevant)
    ar = averageRecall(predicted, relevant,10)
    return em, ar
getMetrics(predicted, relevant)

(0.3583984375, 0.9296875)