### Importación de librerias

In [None]:
import xml.etree.ElementTree as ET
import pandas as pd
import numpy as np
import os, re
from gensim.parsing.porter import PorterStemmer
from nltk.corpus import stopwords

__paths to change__

In [None]:
# input variables
documents_path = './datos/docs-raw-texts/'
queries_path = './datos/queries-raw-texts/'
output_path = './salida/'
relevance_judgements_path = 'relevance-judgments.tsv'
results_file_path = './salida/'

### Read documents methods

In [None]:
def get_documents(path: str) -> list:
    """
    read raw text from naf documents located in the directory path
    """
    data = []
    for file in sorted(os.listdir(path)):
        if file.endswith(".naf"):
            tree = ET.parse(path + file)
            text = tree.find('raw').text
            header = tree.find('nafHeader')
            if header:
                desc = header.find('fileDesc')
                if desc:
                    title = desc.attrib.get('title')
                    text = title + ' ' + text if title else text
            data.append(text)
    return data

In [None]:
def remove_stopwords(document: str) -> list:
    """
    remove the english stop words from data
    """
    lower = document.lower()
    words = lower.split(' ')
    stop_words = stopwords.words('english')
    return [word for word in words if word not in stop_words]

In [None]:
def remove_nonlatin(document: str) -> str:
    """
    replace problematic characters
    """
    document = re.sub('\n', ' ', document)
    document = re.sub('[^a-zA-Z]|[0-9]', ' ', document)
    document = re.sub('\s+', ' ', document)
    return document

In [None]:
def preprocessing(document: str) -> list:
    """
    clean data by removing non-latin characters
    stem data sentences
    remove stop words from a document
    """
    porter = PorterStemmer()
    document = remove_nonlatin(document)
    document = porter.stem_sentence(document)
    document = remove_stopwords(document)
    return document

### indexes and doc-term matrix

In [None]:
def get_words_index(documents: pd.Series) -> pd.Index:
    """
    return a sorted index of every word in the texts
    """
    # get all words in all documents
    words = set()
    for document in documents:
        words.update(set(document))
    # sort the words
    sorted_words = sorted(list(words))
    # get index of sorted words
    words_frame = pd.DataFrame(sorted_words, columns=['data'])
    words_index = words_frame.set_index('data').index
    return words_index

In [None]:
def get_index_word(word: str, words_index: pd.Index) -> int:
    """
    return the provided word index
    """
    try: return words_index.get_loc(word)
    except: return -1

In [None]:
def get_doc_term(documents: pd.DataFrame, words_index: pd.Index) -> list:
    """
    return the document term matrix that indicate how many terms repeats in each document
    """
    doc_term = [[0]*len(documents) for _ in range(len(words_index))]
    for doc_index, document in documents.iterrows():
        for word in document.filtered:
            word_index = get_index_word(word, words_index)
            if word_index != -1:
                doc_term[word_index][doc_index] += 1
    return doc_term

## Representación vectorial ponderada tf.idf

In [None]:
def get_tf(doc_term: list) -> list:
    """
    return the ft score from each word in all the documents
    """
    return [[1 + np.log10(doc) if doc > 0 else 0 for doc in word] for word in doc_term]

In [None]:
def get_idf(doc_term: list) -> list:
    """
    return the idf score from each word in the entire collection
    """
    word_num = len(doc_term)
    return [np.log10(word_num/sum([1 if doc > 0 else 0 for doc in word])) for word in doc_term]

In [None]:
def get_tfidf(doc_term: list) -> list:
    """
    ponderate the tf-idf scores multiping them
    """
    tf = get_tf(doc_term)
    idf = get_idf(doc_term)
    return [[tf_scr * idf[i] for tf_scr in words] for i, words in enumerate(tf)]

In [None]:
doc_term = [
    [157, 73, 0, 0, 0, 0],
    [4, 157, 0, 1, 0, 0],
    [232, 227, 0, 2, 1, 1],
    [0, 1, 0, 0, 0, 0, 0],
    [57, 0, 0, 0, 0, 0],
    [2, 0, 3, 5, 5, 1],
    [2, 0, 1, 1, 1, 0]
]

In [None]:
tfidf = get_tfidf(doc_term)
tfidf

In [None]:
# Step 1: get documents
data = get_documents(documents_path)
documents = pd.DataFrame(data, columns=['data'])
documents

In [None]:
# Step 2: apply the preprocessing method
documents['filtered'] = documents.data.apply(preprocessing)
documents.head()

In [None]:
# Step 3: get word-index, doc-term, and the tfidf index
words_index = get_words_index(documents.filtered)
doc_term = get_doc_term(documents, words_index)
tfidf = get_tfidf(doc_term)

## similitud del coseno

In [None]:
def similitud_coseno(vector1:list,vector2:list) -> float:
    """
    Calcula la similitud entre dos vectores de documentos de acuerdo al ángulo entre estos
    haciendo uso de la función coseno

    Se hace uso de las funciones de numpy para poder calcular el coseno entre dos vectores,
    que por definición es el producto punto entre el vector1 y el vector2, dividido entre el 
    producto de las normas de ambos vectores

    Parameters
    ----------
    vector1 : list
        vector del primer documento a comparar
    vector2 : list
        vector del segundo documento a comparar

    Returns
    -------
    float
        La similitud coseno entre vector1 y vector2
    """
    producto_punto = np.dot(vector1,vector2)
    norma_1 = np.linalg.norm(vector1)
    norma_2 = np.linalg.norm(vector2)
    return producto_punto/(norma_1*norma_2)



## Procesamiento

In [None]:
"""
Se leen los documentos, se cargan en un dataFrame, se preprocesan para obtener los
tokens del corpus de los documentos.
"""
datos = get_documents(documents_path)
documentos = pd.DataFrame(datos,columns=['Documento'])
documentos['filtrada']=documentos['Documento'].apply(preprocessing)
doc_proc= documentos
doc_proc.filtrada = doc_proc.filtrada.apply(np.unique)
doc_proc.head()

In [None]:
"""
Se crea un diccionario a partir de los tokens obtenidos en todos
los documentos cargados anteriormente, donde la llave es el token 
y el valor es un id del token.
"""
dictionary = {}
for i in range(len(doc_proc)):
    for j in range(len(doc_proc.iloc[i]['filtrada'])):
        if doc_proc.iloc[i]['filtrada'][j] not in dictionary:
            dictionary[doc_proc.iloc[i]['filtrada'][j]] = len(dictionary)
dictionary_size = len(dictionary)
dictionary

In [None]:
def doc_to_vector(doc):
    """
    Transforma un documento a un vector binario de acuerdo al 
    diccionario generado a partir del conjunto total de documentos

    Parameters
    ----------
    doc : list
        vector de tokens del documento a convertir a vector binario

    Returns
    -------
    list
        documento como vector binario
    """

    vector = np.zeros(dictionary_size)
    for token in doc:
        if token in dictionary:
            vector[dictionary[token]] = 1
    return vector

In [None]:
"""
A partir del diccionario de palabras generado se convierte el documento a 
vector
"""
doc_proc['doc_vector'] = doc_proc.filtrada.apply(doc_to_vector)
doc_proc

In [None]:
"""
Se leen y se cargan las queries en dataframes 
"""
datos_querry = get_documents(queries_path)
queries = pd.DataFrame(datos_querry,columns=['Query'])
queries

In [None]:
"""
Se preprocessan las queries cargadas para obtener los tokens de estas
"""
queries['filtrada'] = queries.Query.apply(preprocessing)
quer_proc = queries
quer_proc.filtrada = quer_proc.filtrada.apply(np.unique)
quer_proc.head()

In [None]:
"""
Se convierten las queries a vectores
"""
quer_proc['query_vector'] = quer_proc.filtrada.apply(doc_to_vector)
quer_proc

In [None]:
def similitud_coseno_docs(query_vector:list) -> str:
    """
    Calcula la similitud coseno para cada uno de los documentos con respecto
    a una query realizada. A partir de los resultados obtenidos, elimina los
    que sean igual a 0, se ordenan descendentement y se genera el string 
    resultante en el formato especificado para el output de RRDV

    Parameters
    ----------
    query_vector : list
        documento como vector de la query respectiva

    Returns
    -------
    str 
        string resultante en el formato especificado para el output de RRDV con
        los documentos recuperados clasficados - ordenados por el puntaje de 
        similitud del coseno.
    """

    similitud = doc_proc['doc_vector'].apply(lambda x: similitud_coseno(x,query_vector))
    similitud = similitud[similitud>0]
    similitud = similitud.sort_values(ascending=False)
    output = ''
    for index, value in similitud.items():
        if output == '':
            output += f'd{index}:{value}'
        else:
            output += f',d{index}:{value}'
    return output

    


In [None]:
"""
Se le aplica la funcion de similitud coseno a cada una de las queries 
sobre todos los documentos. El resultado es escrito en el archivo de
salida
"""
q = quer_proc.query_vector.apply(similitud_coseno_docs)
f = open(output_path+"RRDV-consultas_resultados.txt", "w")
for i in range(len(q)):
    f.write(f'q{i+1} {q[i]}\n')
f.close()

## Evaluación de resultados

Usando las funciones descritas en `metricas.ipynb`:

In [None]:
def precision_at_k(relevance_query: list, k: int) -> float:
    if k > 0 and k <= len(relevance_query):
        return sum(relevance_query[:k]) / k
    else:
        return None

def recall_at_k(relevance_query: list, number_relevant_docs:int, k: int) -> float:
    if k > 0 and k <= len(relevance_query):
        return sum(relevance_query[:k]) / number_relevant_docs
    else:
        return None

def average_precision(relevance_query:list) -> float :
    relevant_docs = np.sum(relevance_query)
    last_recall = 0
    precisions = np.array([])
    for i in range(1,len(relevance_query)+1):
        recall = recall_at_k(relevance_query,relevant_docs,i)
        if recall > last_recall:
            last_recall = recall 
            precisions= np.append(precisions,precision_at_k(relevance_query,i))
    return np.average(precisions)

def mean_average_precision(list_vectors: list)-> float:
    average_precisions = list(map(average_precision, list_vectors))
    return np.average(average_precisions)

def dcg_at_k(rel, k):
    import math
    result = 0
    for i in range(k):
        discount_factor = 1/math.log(max([i+1, 2]), 2)
        gain = + (rel[i]*discount_factor)
        result = result + gain 
    return result
    
def ndcg_at_k(rel, k):
    DCG = dcg_at_k(rel, k)
    IDCG = dcg_at_k(sorted(rel, reverse=True), k)
    result = DCG/IDCG
    return result

In [None]:
def get_relevance_judgements(path: str) -> list:
    """
    Transforma el documento de juicios de relevancia a una lista de diccionarios por consulta

    Parameters
    ----------
    path : str
        Ubicación del archivo (incluyendo nombre del archivo)

    Returns
    -------
    list
        Lista de diccionarios, un diccionario representa los documentos relevantes para una consulta
    """
    relevance_dicts = []
    for line in open(path, "r"):
        line_proc = line.split("\n")[0]
        docs_str = line_proc.split("\t")[1]
        docs_list = docs_str.split(",")
        current_relevance_dict = {}
        for doc in docs_list:
            doc_split = doc.split(":")
            doc_id = doc_split[0].split("d")[1]
            doc_relevance = doc_split[1]
            current_relevance_dict[doc_id] = doc_relevance
        relevance_dicts.append(current_relevance_dict)
    return relevance_dicts

In [None]:
def get_result_relevances(path: str, relevance_lists: list) -> list:
    """
    Retorna una lista donde los resultados de cada consulta se representan como una lista
    con la relevancia entera.

    Parameters
    ----------
    path : str
        Ubicación del archivo de resultados (incluyendo nombre del archivo)
    relevance_lists : list
        Lista de diccionarios obtenida con la función get_relevance_judgements()

    Returns
    -------
    list
        Lista de listas, cada lista representa los resultados para una consulta con relevancia entera
    """
    res = []
    for i, line in enumerate(open(path, "r")):
        line_proc = line.split("\n")[0]
        if len(line_proc.split("\t")) == 0:
            docs_str = line_proc.split("\t")[1]
        else:
            docs_str = line_proc.split(" ")[1]
        docs_list = docs_str.split(",")
        current_relevance_list = []
        for doc in docs_list:
            doc_split = doc.split(":")
            doc_id = doc_split[0].split("d")[1]
            if doc_id in relevance_lists[i]:
                current_relevance_list.append(int(relevance_lists[i][doc_id]))
            else:
                current_relevance_list.append(0)
        res.append(current_relevance_list)
    return res

In [None]:
def get_binary_result_relevances(path: str, relevance_lists: list) -> list:
    """
    Retorna una lista donde los resultados de cada consulta se representan como una lista
    con la relevancia binaria.

    Parameters
    ----------
    path : str
        Ubicación del archivo de resultados (incluyendo nombre del archivo)
    relevance_lists : list
        Lista de diccionarios obtenida con la función get_relevance_judgements()

    Returns
    -------
    list
        Lista de listas, cada lista representa los resultados para una consulta con relevancia binaria
    """
    res = []
    for i, line in enumerate(open(path, "r")):
        line_proc = line.split("\n")[0]
        if len(line_proc.split("\t")) == 0:
            docs_str = line_proc.split("\t")[1]
        else:
            docs_str = line_proc.split(" ")[1]
        docs_list = docs_str.split(",")
        current_relevance_list = []
        if docs_list[0] != '':
            for doc in docs_list:
                doc_split = doc.split(":")
                doc_id = doc_split[0].split("d")[1]
                if doc_id in relevance_lists[i]:
                    current_relevance_list.append(1)
                else:
                    current_relevance_list.append(0)
        res.append(current_relevance_list)
    return res

In [None]:
def precision_list(binary_result_relevances: list, relevance_lists: list) -> list:
    """
    Retorna una lista con el valor de P@M para cada consulta.

    Parameters
    ----------
    binary_result_relevances : list
        Lista de relevancias binarias para cada consulta
    relevance_lists : list
        Lista de diccionarios obtenida con la función get_relevance_judgements()

    Returns
    -------
    list
        Lista de float con los valores de P@M por consulta
    """
    p_list = []
    for i, query in enumerate(binary_result_relevances):
        p_list.append(precision_at_k(query,len(relevance_lists[i])))
    return p_list

In [None]:
def recall_list(binary_result_relevances: list, relevance_lists: list) -> list:
    """
    Retorna una lista con el valor de R@M para cada consulta.

    Parameters
    ----------
    binary_result_relevances : list
        Lista de relevancias binarias para cada consulta
    relevance_lists : list
        Lista de diccionarios obtenida con la función get_relevance_judgements()

    Returns
    -------
    list
        Lista de float con los valores de R@M por consulta
    """
    r_list = []
    for i, query in enumerate(binary_result_relevances):
        r_list.append(recall_at_k(query,len(relevance_lists[i]),len(relevance_lists[i])))
    return r_list

In [None]:
def ndcg_list(result_relevances: list, relevance_lists: list) -> list:
    """
    Retorna una lista con el valor de NDCG@M para cada consulta.

    Parameters
    ----------
    result_relevances : list
        Lista de relevancias enteras para cada consulta
    relevance_lists : list
        Lista de diccionarios obtenida con la función get_relevance_judgements()

    Returns
    -------
    list
        Lista de float con los valores de NDCG@M por consulta
    """
    ndcg_l = []
    for i, query in enumerate(result_relevances):
        ndcg_l.append(ndcg_at_k(query,len(relevance_lists[i])))
    return ndcg_l

In [None]:
"""
Se obtienen los juicios de relevancia desde el archivo
"""
relevance_dicts = get_relevance_judgements(relevance_judgements_path)
relevance_dicts

In [None]:
"""
Se obtiene la lista de relevancias binarias para las consultas
"""
binary_result_relevances = get_binary_result_relevances(output_path+"RRDV-consultas_resultados.txt", relevance_dicts)
binary_result_relevances

In [None]:
"""
Se obtiene la lista de relevancias enteras para las consultas
"""
result_relevances = get_result_relevances(output_path+"RRDV-consultas_resultados.txt", relevance_dicts)
result_relevances

In [None]:
"""
Se calculan las métricas para todas las consultas. El resultado es escrito en el archivo de
salida para métricas
"""
precision = precision_list(binary_result_relevances, relevance_dicts)
recall = recall_list(binary_result_relevances, relevance_dicts)
ndcg = ndcg_list(result_relevances, relevance_dicts)
f = open(output_path+"RRDV-metricas.txt", "w")
for i in range(len(precision)):
    f.write(f'q{i+1} P@M:{precision[i]}, R@M:{recall[i]}, NDCG@M:{ndcg[i]}\n')
f.write(f'\nMAP:{mean_average_precision(binary_result_relevances)}')
f.close()