In [1]:
import xml.etree.ElementTree as ET
from typing import Callable
from pathlib import Path
import logging as log
import unicodedata
import string
import json
import time
import math
import csv
import re

import nltk

from stemmer import PorterStemmer


nltk.download('stopwords')
logger = log.getLogger()
logger.setLevel(log.INFO)

data_dir = Path('../data')
results_dir = Path('../RESULT')
configs_dir = Path('../configs')

def config_parser(config_file_path: Path) -> dict:
    '''Retorna dict no formato {instrução:[arquivos]}'''
    log.info(f'Lendo arquivo de configuração {config_file_path}')

    with open(config_file_path, 'r') as config_file:
        lines = config_file.read().split('\n')

    instructions = {}

    stemmer_options = ['STEMMER', 'NOSTEMMER']
    if lines[0] in stemmer_options:
        instructions['STEMMER'] = True if lines[0] == stemmer_options[0] else False
        lines = lines[1:]

    for k,v in [i.split('=') for i in lines]:
        k = k.strip()
        v = v.strip()
        if k in instructions:
            instructions[k].append(v)
        else:
            instructions[k] = [v]

    log.info(f'Configurações encontradas em {config_file_path.name}:\n{instructions}\n')
    
    return instructions


def to_csv(csv_file_path: str|Path, data: dict, headers: bool = True) -> None:
    log.info(f'Salvando dados no arquivo {csv_file_path}')

    with open(csv_file_path, 'w', newline='') as csv_file:
        writer = csv.writer(csv_file, delimiter=';')
        if headers:
            writer.writerow(list(data.keys()))

        lines = list(zip(*data.values()))
        writer.writerows(lines)

        if headers:
            log.info(f'Cabeçalho do arquivo salvo:\n{list(data.keys())}')


class TextPreprocessor:
    def __init__(self, data: list[str]):
        self.data = data

    def load(self, data: list[str]):
        self.data = data
        return self

    def to_upper(self):
        log.info(f'Convertendo letras para maiúsculas...')
        self.data = list(map(lambda x: x.upper(), self.data))
        return self
    
    def remove_accents(self):
        log.info(f'Removendo acentos...')
        self.data = list(map(lambda x: unicodedata.normalize('NFKD', x).encode('ascii', 'ignore').decode('utf-8'), self.data))
        return self
    
    def remove_stopwords(self, stopwords: list[str] = nltk.corpus.stopwords.words('english')):
        log.info(f'Removendo stopwords...')
        self.data = list(map(
            lambda x: ' '.join([ word for word in x.split() if word.lower() not in stopwords]),
            self.data))
        return self
    
    def remove_punctuation(self):
        log.info(f'Removendo pontuação...')
        self.data = list(map(lambda x: x.translate(str.maketrans('', '', string.punctuation)), self.data))
        return self
    
    def remove_escape_sequences(self):
        log.info(f'Removendo sequências de escapes...')
        self.data = list(map(lambda x: re.sub(r'\s+',' ',x).strip(), self.data))
        return self
    
    def stemming(self, stemmer, use=True):
        if use:
            log.info(f'Aplicando stemming...')
            self.data = list(map(lambda x: ' '.join([stemmer.stem(s, 0, len(s)-1) for s in x.split()]), self.data))
        return self

[nltk_data] Downloading package stopwords to /Users/yama/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


### Processador de Consultas

In [2]:
log.info('Executando processador de consultas...\n')

pc_config_file = configs_dir/'PC.CFG'
pc_config = config_parser(pc_config_file)

queries_source_file = data_dir/pc_config['LEIA'][0]
queries_dest_file = results_dir/pc_config['CONSULTAS'][0]
expected_dest_file = results_dir/pc_config['ESPERADOS'][0]


log.info(f'Lendo arquivo de consultas em {queries_source_file}')
queries = ET.parse(queries_source_file)

queries_dict = {'QueryNumber':[], 'QueryText':[]}

start = time.time()
for query in queries.findall('QUERY'):
    query_number = query.find('QueryNumber').text
    query_text = query.find('QueryText').text
    queries_dict['QueryNumber'].append(query_number)
    queries_dict['QueryText'].append(query_text)

log.info(f'Número de consultas encontradas: {len(queries_dict['QueryNumber'])}\n')


log.info('Processando texto das consultas...')
stemmer = pc_config['STEMMER']
queries_text = TextPreprocessor(queries_dict['QueryText'])

queries_dict['QueryText'] = queries_text \
                            .remove_escape_sequences() \
                            .remove_accents() \
                            .remove_punctuation() \
                            .remove_stopwords() \
                            .stemming(PorterStemmer(), use=stemmer) \
                            .to_upper() \
                            .data

end = time.time()
log.info('Processamento de texto concluído.\n')
log.info(f'Consultas processadas em {(end-start) * 1000:.2f}ms.\n')

to_csv(queries_dest_file, queries_dict)

# ------------------

def score_to_votes(score: str) -> int:
    num_votes = 0
    for digit in score:
        num_votes += int(digit)
    return num_votes

log.info(f'Criando base de dados dos resultados esperados...')

expected_dict = {'QueryNumber':[], 'DocNumber':[], 'DocVotes':[]}

for query in queries.findall('QUERY'):
    query_number = query.find('QueryNumber').text
    for item in query.find('Records'):
        expected_dict['QueryNumber'].append(query_number)
        expected_dict['DocNumber'].append(item.text)
        expected_dict['DocVotes'].append(score_to_votes(item.get('score')))

to_csv(expected_dest_file, expected_dict)

INFO:root:Executando processador de consultas...

INFO:root:Lendo arquivo de configuração ../configs/PC.CFG
INFO:root:Configurações encontradas em PC.CFG:
{'STEMMER': True, 'LEIA': ['cfquery.xml'], 'CONSULTAS': ['consultas.csv'], 'ESPERADOS': ['esperados.csv']}

INFO:root:Lendo arquivo de consultas em ../data/cfquery.xml
INFO:root:Número de consultas encontradas: 99

INFO:root:Processando texto das consultas...
INFO:root:Removendo sequências de escapes...
INFO:root:Removendo acentos...
INFO:root:Removendo pontuação...
INFO:root:Removendo stopwords...
INFO:root:Aplicando stemming...
INFO:root:Convertendo letras para maiúsculas...
INFO:root:Processamento de texto concluído.

INFO:root:Consultas processadas em 12.08ms.

INFO:root:Salvando dados no arquivo ../RESULT/consultas.csv
INFO:root:Cabeçalho do arquivo salvo:
['QueryNumber', 'QueryText']
INFO:root:Criando base de dados dos resultados esperados...
INFO:root:Salvando dados no arquivo ../RESULT/esperados.csv
INFO:root:Cabeçalho do arq

### Gerador de Lista Invertida

In [3]:
log.info('Executando gerador de lista invertida...\n')

gli_config_file = configs_dir/'GLI.CFG'
gli_config = config_parser(gli_config_file)
gli_source_files = gli_config['LEIA']
gli_dest_file = results_dir/gli_config['ESCREVA'][0]


log.info(f'Lendo diretório de documentos em /{data_dir}')

docs_dict = {'DocNumber':[], 'Abstract': []}

for file in gli_source_files:

    log.info(f'Lendo arquivo {file}...')
    docs = ET.parse(data_dir/file)

    doc_count = 0
    for doc in docs.findall('RECORD'):
        doc_number = doc.find('RECORDNUM').text
        abstract = doc.find('ABSTRACT')
        if abstract is None:
            abstract = doc.find('EXTRACT')
        if abstract is not None:
            doc_count += 1
            docs_dict['DocNumber'].append(int(doc_number.strip()))
            docs_dict['Abstract'].append(abstract.text)
    log.info(f'Documentos encontrados com "Abstract" ou "Extract": {doc_count}')

log.info(f'Total de documentos encontrados com "Abstract" ou "Extract": {len(docs_dict['DocNumber'])}\n')

log.info('Processando texto dos documentos...')
stemmer = gli_config['STEMMER']
abstracts_text = TextPreprocessor(docs_dict['Abstract'])
docs_dict['Abstract'] = abstracts_text \
                        .remove_escape_sequences() \
                        .remove_accents() \
                        .remove_punctuation() \
                        .remove_stopwords() \
                        .stemming(PorterStemmer(), use=stemmer) \
                        .to_upper() \
                        .data
log.info('Processamento de texto concluído.\n')


log.info(f'Iniciando construção da lista invertida...')

gli_dict = {}
start = time.time()
for doc_number, abstract in zip(*docs_dict.values()):
    words = abstract.split()
    for word in words:
        if word in gli_dict:
            gli_dict[word].append(doc_number)
        else:
            gli_dict[word] = [doc_number]
end = time.time()
log.info(f'Número de palavras únicas: {len(gli_dict)}')
log.info(f'Lista invertida construída em {(end-start) * 1000:.2f}ms.\n')

to_csv(gli_dest_file, {'Word':list(gli_dict.keys()), 'WordDocs':list(gli_dict.values())}, headers=False)

INFO:root:Executando gerador de lista invertida...

INFO:root:Lendo arquivo de configuração ../configs/GLI.CFG
INFO:root:Configurações encontradas em GLI.CFG:
{'STEMMER': True, 'LEIA': ['cf74.xml', 'cf75.xml', 'cf76.xml', 'cf77.xml', 'cf78.xml', 'cf79.xml'], 'ESCREVA': ['li.csv']}

INFO:root:Lendo diretório de documentos em /../data
INFO:root:Lendo arquivo cf74.xml...
INFO:root:Documentos encontrados com "Abstract" ou "Extract": 164
INFO:root:Lendo arquivo cf75.xml...
INFO:root:Documentos encontrados com "Abstract" ou "Extract": 185
INFO:root:Lendo arquivo cf76.xml...
INFO:root:Documentos encontrados com "Abstract" ou "Extract": 224
INFO:root:Lendo arquivo cf77.xml...
INFO:root:Documentos encontrados com "Abstract" ou "Extract": 195
INFO:root:Lendo arquivo cf78.xml...
INFO:root:Documentos encontrados com "Abstract" ou "Extract": 195
INFO:root:Lendo arquivo cf79.xml...
INFO:root:Documentos encontrados com "Abstract" ou "Extract": 252
INFO:root:Total de documentos encontrados com "Abstra

### Indexador

In [4]:
class Indexador:

    def __init__(self, inverted_index_path: Path|str):

        self.tf_normalization_factor_func = lambda x: max(x.values())

        log.info('Criando modelo vetorial...')
        start = time.time()
        self.vector_model_tfidfs = self._create_vector_model_tfidf(inverted_index_path)
        end = time.time()
        log.info(f'Modelo vetorial criado em {(end-start) * 1000:.2f}ms.\n')
            

    def save_vector_model_to_json(self, json_file_path: Path|str) -> None:
        log.info(f'Salvando modelo em {json_file_path}\n')
        json_model = {'vector_model_tfidf': self.vector_model_tfidfs,
                      'word_index': self.word_index,
                      'word_idfs': self.word_idfs}
        
        with open(json_file_path, 'w') as json_file:
            json.dump(json_model, json_file)

    def _create_vector_model_tfidf(self, inverted_index_path: Path|str) -> dict:

        inverted_index = self._load_inverted_index_from_csv(inverted_index_path)
        inverted_index = self._preprocess_inverted_index(inverted_index)
        self.word_index = self._create_word_index(inverted_index)
        return self._create_tfidfs(inverted_index)


    def _load_inverted_index_from_csv(self, inverted_index_path: Path|str) -> dict:
        '''Lê o arquivo da lista invertida e armazena em um dict'''
        log.info(f'Carregando lista invertida de /{inverted_index_path}')
        inverted_index_dict = {}

        with open(inverted_index_path) as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=';')
            for word, word_docs in csv_reader:
                inverted_index_dict[word] = list(map(lambda x: int(x), word_docs[1:-1].split(',')))
        return inverted_index_dict
    
    def _preprocess_inverted_index(self, inverted_index: dict) -> dict:
        '''Somente palavras com tamanho >= 2 e sem números'''

        log.info('Normalizando lista (somente palavras com tamanho >=2 e sem dígitos)...')
        words = list(inverted_index.keys())
        for word in words:
            if len(word) < 2 or any(ch.isdigit() for ch in word):
                del inverted_index[word]

        log.info(f'Número de palavras únicas: {len(inverted_index)}')
        return inverted_index
    
    def _create_word_index(self, inverted_index: dict) -> dict:
        '''Dicionário para mapear cada palavra a um índice para o vetor de representação dos documentos'''
        return dict(zip(list(inverted_index.keys()), list(range(len(inverted_index)))))
    
    
    def _create_doc_vectors_freq(self, inverted_index: dict) -> dict:
        doc_index_dict = {}
        for word, word_docs in inverted_index.items():
            for doc in word_docs:
                word_index = self.word_index[word]
                if doc not in doc_index_dict:
                    doc_index_dict[doc] = {}
                if word_index not in doc_index_dict[doc]:
                    doc_index_dict[doc][word_index] = 1
                else:
                    doc_index_dict[doc][word_index] += 1
        return doc_index_dict

    def _create_idfs(self, inverted_index: dict) -> dict:
        '''idf de cada palavra do vocabulário'''

        flat_docs = []
        for row in list(inverted_index.values()):
            flat_docs += row
        number_of_docs = len(set(flat_docs))

        log.info(f'Número de documentos: {number_of_docs}')

        idfs = {}
        for word, word_docs in inverted_index.items():
            df = len(set(word_docs))
            idf = math.log10(number_of_docs/df)
            idfs[self.word_index[word]] = idf
        return idfs
    
    def set_tf_normalization_factor_func(self, tf_normalization_factor_func: Callable[[dict], int]):
        self.tf_normaliztion_factor_func = tf_normalization_factor_func
    
    def _create_tfidfs(self, inverted_index: dict):
        log.info('Criando modelo de representação tf-idf com o seguinte formato para cada documento:\n'\
                 '{índice_palavra1: tfidf_palavra1, índice_palavra2: tfidf_palavra2, ...}')
        self.word_idfs = self._create_idfs(inverted_index)
        doc_vectors_freq = self._create_doc_vectors_freq(inverted_index)

        doc_index_tfidf = {}

        for doc, word_freqs in doc_vectors_freq.items():
            doc_index_tfidf[str(doc)] = {}
            tf_normalization_factor = self.tf_normalization_factor_func(word_freqs)
            for word, tf in word_freqs.items():
                ntf = tf/tf_normalization_factor
                doc_index_tfidf[str(doc)][str(word)] = ntf * self.word_idfs[word]
        return doc_index_tfidf
    

index_config_file = configs_dir/'INDEX.CFG'
index_config = config_parser(index_config_file)
inverted_index_path = results_dir/index_config['LEIA'][0]
vector_model_json_path = results_dir/index_config['ESCREVA'][0]

log.info('Executando indexador...\n')
idx = Indexador(inverted_index_path)
idx.save_vector_model_to_json(vector_model_json_path)

INFO:root:Lendo arquivo de configuração ../configs/INDEX.CFG
INFO:root:Configurações encontradas em INDEX.CFG:
{'LEIA': ['li.csv'], 'ESCREVA': ['modelo_vetorial.json']}

INFO:root:Executando indexador...

INFO:root:Criando modelo vetorial...
INFO:root:Carregando lista invertida de /../RESULT/li.csv
INFO:root:Normalizando lista (somente palavras com tamanho >=2 e sem dígitos)...
INFO:root:Número de palavras únicas: 7140
INFO:root:Criando modelo de representação tf-idf com o seguinte formato para cada documento:
{índice_palavra1: tfidf_palavra1, índice_palavra2: tfidf_palavra2, ...}
INFO:root:Número de documentos: 1215
INFO:root:Modelo vetorial criado em 70.72ms.

INFO:root:Salvando modelo em ../RESULT/modelo_vetorial.json



### Buscador

In [5]:
class BuscadorModeloVetorial:

    def __init__(self, vector_model_json_path: Path|str|None = None):
        
        with open(vector_model_json_path) as json_file:
            model_dict = json.load(json_file)

        self.vector_model_tfidfs = model_dict['vector_model_tfidf']
        self.word_index = model_dict['word_index']
        self.word_idfs = model_dict['word_idfs']
        log.info(f'Modelo carregado de {vector_model_json_path}\n')


    def _norm2(self, vec: dict) -> float:
        return math.sqrt(sum([x**2 for x in vec.values()]))

    def _cosine_similarity(self, vec1: dict, vec2: dict) -> float:
        vec1_norm = self._norm2(vec1)
        vec2_norm = self._norm2(vec2)

        dot_product = 0
        for w_index, w_tfidf in vec1.items():
            dot_product += w_tfidf * vec2.get(str(w_index), 0)

        return dot_product/(vec1_norm * vec2_norm)

    def find_docs(self, query: dict) -> dict:
        distances = {}
        for doc, tfidf_vector in self.vector_model_tfidfs.items():
            distances.update({doc: 1-self._cosine_similarity(query, tfidf_vector)})
        return dict(sorted(distances.items(), key=lambda x: x[1]))   
    
    def query_setup(self, query: list) -> dict:
        query_dict = {}
        for word in query:
            w_index = self.word_index.get(word)
            if w_index is not None:
                query_dict[w_index] = 1
        return query_dict
    

busca_config_file = configs_dir/'BUSCA.CFG'
busca_config = config_parser(busca_config_file)
vector_model_json_path = results_dir/busca_config['MODELO'][0]
queries_file = results_dir/busca_config['CONSULTAS'][0]

stm = 'STEMMER' if stemmer else 'NOSTEMMER'
results_file = results_dir/f'{busca_config['RESULTADOS'][0].split('.')[0]}-{stm}.csv'



log.info('Executando buscador...\n')
vm = BuscadorModeloVetorial(vector_model_json_path)

log.info(f'Carregando consultas de {queries_file}...')
queries_dict = {}
header = True
with open(queries_file) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=';')
    for query_number, query_text in csv_reader:
        if header:
            header = False
            continue
        queries_dict[query_number] = vm.query_setup(query_text.split())

log.info(f'Consultas encontradas: {len(queries_dict)}\n')

results_dict = {'Consulta': [], 'Ranking': []}

start = time.time()
for query_number, query in queries_dict.items():
    docs_ranking = vm.find_docs(query)
    result_rank = []
    for i, (doc_number, distance ) in enumerate(docs_ranking.items(), 1):
        result_rank.append((i, doc_number, distance))
    results_dict['Consulta'].append(query_number)
    results_dict['Ranking'].append(result_rank)
end = time.time()
log.info(f'Buscas realizadas em {(end-start) * 1000:.2f}ms.')
log.info(f'Tempo médio por busca {((end-start) * 1000)/len(queries_dict):.2f}ms.\n')

to_csv(results_file, results_dict, headers=False)

INFO:root:Lendo arquivo de configuração ../configs/BUSCA.CFG
INFO:root:Configurações encontradas em BUSCA.CFG:
{'MODELO': ['modelo_vetorial.json'], 'CONSULTAS': ['consultas.csv'], 'RESULTADOS': ['RESULTADOS.csv']}

INFO:root:Executando buscador...

INFO:root:Modelo carregado de ../RESULT/modelo_vetorial.json

INFO:root:Carregando consultas de ../RESULT/consultas.csv...
INFO:root:Consultas encontradas: 99

INFO:root:Buscas realizadas em 603.06ms.
INFO:root:Tempo médio por busca 6.09ms.

INFO:root:Salvando dados no arquivo ../RESULT/RESULTADOS-STEMMER.csv
