In [127]:
!pip install rank_bm25

python(32268) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [128]:
import json
from typing import List, Dict, Callable, Union
import re
from functools import partial

from rank_bm25 import BM25Okapi
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer

from tangent_cft.tangent_cft_parser import TangentCFTParser
from tangent_cft.tangent_cft_back_end import TangentCFTBackEnd
from tangent_cft.wiki_data_reader import WikiDataReader

# Dataset preprocessing

In [5]:
def parse_and_save(save_path: str, dataset_path: str, slt: bool = True, train: bool = True):
    """Parse a formula dataset with ``WikiDataReader`` and dump it to a JSON file.

    Each entry of the saved JSON object maps a key returned by the reader to
    one string of the form ``<formula string>[NGRAMS]<tuple>[SEP]<tuple>...``.

    Args:
        save_path: Destination JSON file. Missing parent directories are created.
        dataset_path: Location handed to the reader.
        slt: Forwarded to ``WikiDataReader(read_slt=...)``.
        train: If true the data is read with ``get_collection``, otherwise
            with ``get_query``.
    """
    import os  # local import: the notebook's import cell does not provide ``os``

    wiki_reader = WikiDataReader(read_slt=slt)
    read = wiki_reader.get_collection if train else wiki_reader.get_query
    # The reader is asked twice: first for the tuple lists, then for the formula strings.
    tuples = read(dataset_path, tuples=True)
    formula_strings = read(dataset_path, tuples=False)

    combined = {
        k: formula_strings[k] + '[NGRAMS]' + '[SEP]'.join(tuples[k])
        for k in tuples
    }

    # open(..., 'w') does not create directories; without this a missing
    # output folder is only reported after the whole dataset has been parsed.
    out_dir = os.path.dirname(save_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    print('Saving to {}'.format(save_path))
    with open(save_path, 'w') as f:
        json.dump(combined, f, indent=2)

In [7]:
# Parse the training corpus and the test queries, once per reader mode
# (slt=False -> *_opt.json, slt=True -> *_slt.json).
parse_and_save(
    'ParsedDataset/train_opt.json',
    '../NTCIR-12_MathIR_Wikipedia_Corpus/MathTagArticles',
    slt=False,
    train=True,
)
parse_and_save(
    'ParsedDataset/train_slt.json',
    '../NTCIR-12_MathIR_Wikipedia_Corpus/MathTagArticles',
    slt=True,
    train=True,
)
parse_and_save(
    'ParsedDataset/test_opt.json',
    'TestQueries',
    slt=False,
    train=False,
)
parse_and_save(
    'ParsedDataset/test_slt.json',
    'TestQueries',
    slt=True,
    train=False,
)

Saving to ParsedDataset/test_opt.json
Saving to ParsedDataset/test_slt.json


# TF-IDF

In [14]:
# Read back the training corpus written by parse_and_save (slt=False and slt=True variants).
with open('ParsedDataset/train_opt.json', mode='r') as f:
    parsed_train_opt = json.loads(f.read())
with open('ParsedDataset/train_slt.json', mode='r') as f:
    parsed_train_slt = json.loads(f.read())

In [15]:
# Read back the test queries written by parse_and_save (slt=False and slt=True variants).
with open('ParsedDataset/test_opt.json', mode='r') as f:
    parsed_test_opt = json.loads(f.read())
with open('ParsedDataset/test_slt.json', mode='r') as f:
    parsed_test_slt = json.loads(f.read())

In [22]:
class TFIDFSearch:
    """Rank corpus entries against a query using TF-IDF vectors.

    The class wraps a ``TfidfVectorizer`` (composition). It deliberately does
    not inherit from it: the vectorizer's own ``__init__`` was never invoked
    on the instance, so inherited estimator methods could not work.

    Attributes are set in ``__init__``:
        doc_idx: numpy array of the corpus keys, in corpus order.
        vectorizer: the fitted ``TfidfVectorizer``.
        embeddings: result of ``fit_transform`` over the corpus values.
    """

    def __init__(self, corpus: Dict[str, str], analyzer: Callable[[str], List[str]]):
        """Fit the vectorizer on the corpus values.

        Args:
            corpus: Mapping of document id to document text.
            analyzer: Callable turning one document text into its tokens;
                passed to ``TfidfVectorizer(analyzer=...)``.
        """
        self.doc_idx = np.array(list(corpus.keys()))
        self.vectorizer = TfidfVectorizer(analyzer=analyzer)
        self.embeddings = self.vectorizer.fit_transform(corpus.values())

    def rank(self, query: str, top_n: int = 1000) -> Dict[str, List[Union[str, float]]]:
        """Return the ``top_n`` best-scoring documents for ``query``.

        The score of a document is the dot product of the query vector with
        that document's vector.

        Returns:
            ``{'ids': [...], 'scores': [...]}`` with plain ``str`` ids and
            ``float`` scores, ordered from highest to lowest score.
        """
        query_embedding = self.vectorizer.transform([query])
        scores = query_embedding.dot(self.embeddings.T).toarray().flatten()
        ranked_indices = np.argsort(scores)[::-1][:top_n]
        # tolist() converts numpy scalars to native str/float, matching the annotation.
        return {
            'ids': self.doc_idx[ranked_indices].tolist(),
            'scores': scores[ranked_indices].tolist(),
        }

In [20]:
def search_queries(queries: Dict[str, str], searcher, top_n: int = 1000) -> Dict[str, Dict[str, float]]:
    """Run every query through ``searcher.rank`` and collect the scores.

    Args:
        queries: Mapping of query id to query text.
        searcher: Object exposing ``rank(query, top_n=...)`` that returns a
            dict with parallel ``'ids'`` and ``'scores'`` sequences.
        top_n: Maximum number of results requested per query.

    Returns:
        Mapping of query id to ``{document id: score}``.
    """
    query_results = {}
    for query_id, query in queries.items():
        # Forward top_n: it was previously accepted here but silently ignored.
        ranked = searcher.rank(query, top_n=top_n)
        query_results[query_id] = dict(zip(ranked['ids'], ranked['scores']))
    return query_results

In [26]:
def node_type_val_extractor(parsed_formula: str) -> List[str]:
    """Return the node tokens found in the part of the string before ``[NGRAMS]``.

    A token is the text that follows a ``[`` up to (not including) the next
    ``,``, ``]`` or ``[``; it is returned unchanged.
    """
    tree_part, _, _ = parsed_formula.partition('[NGRAMS]')
    token_pattern = re.compile(r'(?<=\[)(.*?)(?=[,\]\[])')
    return token_pattern.findall(tree_part)

In [49]:
def node_vonly_extractor(parsed_formula: str) -> List[str]:
    """Return node tokens without their leading ``<char>!`` prefix.

    Only the part of the string before ``[NGRAMS]`` is scanned. After a ``[``
    the pattern skips an optional ``+``, an optional ``!``, one character and
    a ``!``, then captures the text up to the next ``,``, ``]`` or ``[``.
    """
    tree_part, _, _ = parsed_formula.partition('[NGRAMS]')
    value_pattern = re.compile(r'(?<=[\[])\+?!?.!(.*?)(?=[,\]\[])')
    return value_pattern.findall(tree_part)

In [86]:
def node_ngram_extractor(parsed_formula: str) -> List[str]:
    """Build ``"<first> <second>"`` features from the tuples after ``[NGRAMS]``.

    Tuples are separated by ``[SEP]`` and their fields by tabs. A tuple is
    skipped when either of its first two fields is ``'-'`` or ``'!'``, or when
    it has fewer than two fields.

    Raises:
        IndexError: if ``parsed_formula`` contains no ``[NGRAMS]`` marker.
    """
    skipped = ('-', '!')
    tuple_section = parsed_formula.split('[NGRAMS]')[1]
    result = []
    for t in tuple_section.split('[SEP]'):
        elements = t.split('\t')
        # An empty tuple section (or a trailing '[SEP]') splits into a single
        # field, so elements[1] would raise IndexError without this guard.
        if len(elements) < 2:
            continue
        if elements[0] not in skipped and elements[1] not in skipped:
            result.append(' '.join(elements[:2]))
    return result

In [115]:
def formula_feature_extractor(
        parsed_formula: str,
        with_type: bool = True,
        nodes: bool = True,
        tuples: int = 0) -> List[str]:
    """Turn one ``<tree>[NGRAMS]<tuples>`` string into a list of features.

    Args:
        parsed_formula: String with exactly one ``[NGRAMS]`` marker; the tuples
            after it are separated by ``[SEP]`` and their fields by tabs.
        with_type: Only affects node features. If true each node token is
            kept as written; otherwise its leading ``<char>!`` prefix is dropped.
        nodes: Whether to emit node features from the tree part.
        tuples: If non-zero, emit one feature per tuple made of its first
            ``tuples`` fields joined by a space.

    Returns:
        Node features (if requested) followed by tuple features (if requested).

    Raises:
        ValueError: if the string does not split into exactly two parts on
            ``[NGRAMS]``.
    """
    formula_tree_string, formula_tuples = parsed_formula.split('[NGRAMS]')
    features = []
    if nodes:
        if with_type:
            node_pattern = r'(?<=\[)(.*?)(?=[,\]\[])'
        else:
            node_pattern = r'(?<=[\[])\+?!?.!(.*?)(?=[,\]\[])'
        features.extend(re.findall(node_pattern, formula_tree_string))
    if tuples:
        for t in formula_tuples.split('[SEP]'):
            # ''.split('[SEP]') yields [''], so a formula stored without
            # tuples would otherwise contribute a spurious '' feature.
            if not t:
                continue
            features.append(' '.join(t.split('\t')[:tuples]))
    return features
    

In [118]:
# Spot-check the node features (with_type=False) extracted for test query '18'.
formula_feature_extractor(
    parsed_formula=parsed_test_slt['18'],
    with_type=False,
    nodes=True,
    tuples=0,
)

['P',
 'divide',
 'p',
 '()1x1',
 'N',
 'n',
 'x',
 '1',
 'p',
 'x',
 'n',
 'x',
 'x',
 'N',
 'n',
 '()1x1',
 'N',
 'n',
 'x',
 'x',
 'x',
 'i']

In [119]:
# TF-IDF, opt data, node features with_type=True, no tuple features -> run 4.
tfidf_searcher1 = TFIDFSearch(
    corpus=parsed_train_opt,
    analyzer=partial(formula_feature_extractor, with_type=True, nodes=True, tuples=0),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_opt, searcher=tfidf_searcher1),
    'Retrieval_Results/tfidf_opt_node_tv.tsv',
    run_id=4,
)

In [120]:
# TF-IDF, opt data, node features with_type=False, no tuple features -> run 5.
tfidf_searcher2 = TFIDFSearch(
    corpus=parsed_train_opt,
    analyzer=partial(formula_feature_extractor, with_type=False, nodes=True, tuples=0),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_opt, searcher=tfidf_searcher2),
    'Retrieval_Results/tfidf_opt_node_v.tsv',
    run_id=5,
)

In [121]:
# TF-IDF, slt data, node features with_type=True, no tuple features -> run 6.
tfidf_searcher3 = TFIDFSearch(
    corpus=parsed_train_slt,
    analyzer=partial(formula_feature_extractor, with_type=True, nodes=True, tuples=0),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_slt, searcher=tfidf_searcher3),
    'Retrieval_Results/tfidf_slt_node_tv.tsv',
    run_id=6,
)

In [122]:
# TF-IDF, slt data, node features with_type=False, no tuple features -> run 7.
tfidf_searcher4 = TFIDFSearch(
    corpus=parsed_train_slt,
    analyzer=partial(formula_feature_extractor, with_type=False, nodes=True, tuples=0),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_slt, searcher=tfidf_searcher4),
    'Retrieval_Results/tfidf_slt_node_v.tsv',
    run_id=7,
)

In [123]:
# TF-IDF, opt data, tuple features only (first 2 fields of each tuple) -> run 8.
tfidf_searcher5 = TFIDFSearch(
    corpus=parsed_train_opt,
    analyzer=partial(formula_feature_extractor, with_type=True, nodes=False, tuples=2),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_opt, searcher=tfidf_searcher5),
    'Retrieval_Results/tfidf_opt_ngrams_tv.tsv',
    run_id=8,
)

In [124]:
# TF-IDF, slt data, tuple features only (first 2 fields of each tuple) -> run 9.
tfidf_searcher6 = TFIDFSearch(
    corpus=parsed_train_slt,
    analyzer=partial(formula_feature_extractor, with_type=True, nodes=False, tuples=2),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_slt, searcher=tfidf_searcher6),
    'Retrieval_Results/tfidf_slt_ngrams_tv.tsv',
    run_id=9,
)

In [125]:
# TF-IDF, opt data, node features (with_type=True) plus 2-field tuple features -> run 10.
tfidf_searcher7 = TFIDFSearch(
    corpus=parsed_train_opt,
    analyzer=partial(formula_feature_extractor, with_type=True, nodes=True, tuples=2),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_opt, searcher=tfidf_searcher7),
    'Retrieval_Results/tfidf_opt_nodes_ngrams_tv.tsv',
    run_id=10,
)

In [126]:
# TF-IDF, slt data, node features (with_type=True) plus 2-field tuple features -> run 11.
tfidf_searcher8 = TFIDFSearch(
    corpus=parsed_train_slt,
    analyzer=partial(formula_feature_extractor, with_type=True, nodes=True, tuples=2),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_slt, searcher=tfidf_searcher8),
    'Retrieval_Results/tfidf_slt_nodes_ngrams_tv.tsv',
    run_id=11,
)

# BM25

In [130]:
class BM25Search:
    """Rank corpus entries against a query using ``BM25Okapi``.

    Attributes are set in ``__init__``:
        doc_idx: numpy array of the corpus keys, in corpus order.
        analyzer: callable used to tokenize both documents and queries.
        _bm25: ``BM25Okapi`` index built over the tokenized corpus.
    """

    def __init__(self, corpus: Dict[str, str], analyzer: Callable[[str], List[str]]):
        """Tokenize every corpus value with ``analyzer`` and index the result.

        Args:
            corpus: Mapping of document id to document text.
            analyzer: Callable turning one text into its list of tokens.
        """
        self.doc_idx = np.array(list(corpus.keys()))
        self.analyzer = analyzer
        parsed_corpus = [self.analyzer(ex) for ex in corpus.values()]
        self._bm25 = BM25Okapi(parsed_corpus)

    def rank(self, query: str, top_n: int = 1000) -> Dict[str, List[Union[str, float]]]:
        """Return the ``top_n`` best-scoring documents for ``query``.

        Returns:
            ``{'ids': [...], 'scores': [...]}`` with plain ``str`` ids and
            ``float`` scores, ordered from highest to lowest score. This is
            the same shape ``TFIDFSearch.rank`` returns.
        """
        tokenized_query = self.analyzer(query)
        scores = np.asarray(self._bm25.get_scores(tokenized_query))
        # Separate name for the index array so the top_n parameter is not rebound.
        ranked_indices = np.argsort(scores)[::-1][:top_n]
        # tolist() yields native lists instead of numpy arrays, as annotated.
        return {
            'ids': self.doc_idx[ranked_indices].tolist(),
            'scores': scores[ranked_indices].tolist(),
        }

In [131]:
# BM25, opt data, node features with_type=True, no tuple features -> run 12.
bm25_searcher1 = BM25Search(
    corpus=parsed_train_opt,
    analyzer=partial(formula_feature_extractor, with_type=True, nodes=True, tuples=0),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_opt, searcher=bm25_searcher1),
    'Retrieval_Results/bm25_opt_node_tv.tsv',
    run_id=12,
)

In [132]:
# BM25, opt data, node features with_type=False, no tuple features -> run 13.
bm25_searcher2 = BM25Search(
    corpus=parsed_train_opt,
    analyzer=partial(formula_feature_extractor, with_type=False, nodes=True, tuples=0),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_opt, searcher=bm25_searcher2),
    'Retrieval_Results/bm25_opt_node_v.tsv',
    run_id=13,
)

In [133]:
# BM25, opt data, tuple features only (first 2 fields of each tuple) -> run 14.
bm25_searcher3 = BM25Search(
    corpus=parsed_train_opt,
    analyzer=partial(formula_feature_extractor, with_type=True, nodes=False, tuples=2),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_opt, searcher=bm25_searcher3),
    'Retrieval_Results/bm25_opt_ngrams_tv.tsv',
    run_id=14,
)

In [134]:
# BM25, opt data, node features (with_type=True) plus 2-field tuple features -> run 15.
bm25_searcher4 = BM25Search(
    corpus=parsed_train_opt,
    analyzer=partial(formula_feature_extractor, with_type=True, nodes=True, tuples=2),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_opt, searcher=bm25_searcher4),
    'Retrieval_Results/bm25_opt_nodes_ngrams_tv.tsv',
    run_id=15,
)

In [135]:
# BM25, slt data, node features with_type=True, no tuple features -> run 16.
bm25_searcher5 = BM25Search(
    corpus=parsed_train_slt,
    analyzer=partial(formula_feature_extractor, with_type=True, nodes=True, tuples=0),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_slt, searcher=bm25_searcher5),
    'Retrieval_Results/bm25_slt_node_tv.tsv',
    run_id=16,
)

In [136]:
# BM25, slt data, node features with_type=False, no tuple features -> run 17.
bm25_searcher6 = BM25Search(
    corpus=parsed_train_slt,
    analyzer=partial(formula_feature_extractor, with_type=False, nodes=True, tuples=0),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_slt, searcher=bm25_searcher6),
    'Retrieval_Results/bm25_slt_node_v.tsv',
    run_id=17,
)

In [137]:
# BM25, slt data, tuple features only (first 2 fields of each tuple) -> run 18.
bm25_searcher7 = BM25Search(
    corpus=parsed_train_slt,
    analyzer=partial(formula_feature_extractor, with_type=True, nodes=False, tuples=2),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_slt, searcher=bm25_searcher7),
    'Retrieval_Results/bm25_slt_ngrams_tv.tsv',
    run_id=18,
)

In [138]:
# BM25, slt data, node features (with_type=True) plus 2-field tuple features -> run 19.
bm25_searcher8 = BM25Search(
    corpus=parsed_train_slt,
    analyzer=partial(formula_feature_extractor, with_type=True, nodes=True, tuples=2),
)
TangentCFTBackEnd.create_result_file(
    search_queries(queries=parsed_test_slt, searcher=bm25_searcher8),
    'Retrieval_Results/bm25_slt_nodes_ngrams_tv.tsv',
    run_id=19,
)