-
Notifications
You must be signed in to change notification settings - Fork 0
/
search_engine_interface.py
197 lines (153 loc) · 6.95 KB
/
search_engine_interface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
from reader import ReadFile
from configuration import ConfigClass
from parser_module import Parse
from indexer import Indexer
from searcher import Searcher
import concurrent.futures
import timeit
import traceback
import pandas as pd
class search_engine_interface:
    """Build, merge, load and query the index of a tweet search engine.

    The engine owns one ``Parse`` instance, one ``Indexer`` instance and an
    optional pre-computed model (``self._model``).  Index construction splits
    the corpus into three partitions, indexes each partition in a separate
    process and merges the three partial indexes into ``self._indexer``.

    Throughout this class an inverted-index entry is a mutable sequence whose
    elements 0 and 1 are summed when entries are merged and whose element 2
    receives the line number of the term inside ``posting_file.txt``.
    NOTE(review): elements 0 and 1 are presumably frequency counters; confirm
    against ``Indexer``.
    """

    def __init__(self, config=None):
        """Store *config* and create the parser and indexer for this engine.

        Input:
            config - configuration object forwarded to ``Indexer``; may be None.
        """
        self._config = config
        self._parser = Parse()
        self._indexer = Indexer(config)
        self._model = None

    def build_index_from_parquet(self, fn):
        """
        Reads parquet file and passes it to the parser, then indexer.
        The rows are split into three partitions that are indexed in separate
        processes; the partial results are then merged into self._indexer and
        the merged postings are written to "posting_file.txt".
        Input: fn - path to parquet file
        Output: No output, just modifies the internal _indexer object.
        """
        # Configure the path.
        # NOTE(review): this ConfigClass instance is not used afterwards in
        # this method; kept in case ConfigClass has construction side effects.
        config = ConfigClass()
        config.corpusPath = fn

        # Load every row of the parquet file as a plain Python list.
        df = pd.read_parquet(fn, engine="pyarrow")
        corpus = df.values.tolist()

        # The last partition absorbs the remainder of the integer division.
        one_third = len(corpus) // 3
        partitions = (
            corpus[:one_third],
            corpus[one_third:2 * one_third],
            corpus[2 * one_third:],
        )

        with concurrent.futures.ProcessPoolExecutor() as executor:
            # Submit everything first so the three partitions run concurrently,
            # then block on the results in submission order.
            futures = [
                executor.submit(self.process_index, partition, number)
                for number, partition in enumerate(partitions, start=1)
            ]
            indexer1, indexer2, indexer3 = [future.result() for future in futures]

        self._indexer.inverted_idx = self.merge_inverted_index(
            indexer1.inverted_idx, indexer2.inverted_idx, indexer3.inverted_idx)
        self.three_way_external_merge(
            self._indexer.inverted_idx,
            indexer1.postingDict, indexer2.postingDict, indexer3.postingDict)
        self.merge_indexer(indexer1, indexer2, indexer3)

    def merge_indexer(self, *indexers):
        """Copy the ``tweet_index`` entries of every given indexer into self._indexer.

        When the same document id appears in several indexers, the entry of
        the last indexer wins.
        """
        for indexer in indexers:
            self._indexer.tweet_index.update(indexer.tweet_index)

    def merge_inverted_index(self, *inverted_index_dicts):
        """Merge several inverted-index dicts into one new dict.

        For a term present in more than one dict, elements 0 and 1 of its
        entries are summed; the remaining elements come from the first dict
        that contained the term.

        Input: inverted_index_dicts - dicts mapping term -> list entry.
        Output: a new dict; the input dicts and their entries are not modified.
        """
        super_dict = {}
        for inverted_index_dict in inverted_index_dicts:
            for term, value in inverted_index_dict.items():
                entry = super_dict.get(term)
                if entry is None:
                    # Copy the entry so the in-place sums below never leak
                    # back into the per-process index that supplied it.
                    super_dict[term] = list(value)
                else:
                    entry[0] += value[0]
                    entry[1] += value[1]
        return super_dict

    def process_index(self, documents_list, num_thread):
        """Parse and index one partition of the corpus with a fresh Indexer.

        Input:
            documents_list - the documents (rows) of this partition.
            num_thread - number identifying the partition; passed to
                         ``Indexer.set_thread`` and used in the log output.
        Output: the Indexer that received the documents.  If a document raises
                an exception, the traceback is printed, the remaining documents
                of the partition are skipped and the partially filled indexer
                is still returned.
        """
        config = ConfigClass()
        parser = Parse()
        indexer = Indexer(config)
        indexer.set_thread(num_thread)
        print("Number of tweet in the process {} : {}".format(num_thread, len(documents_list)))

        start = timeit.default_timer()
        try:
            # Parse every document of the partition and feed it to the indexer.
            for document in documents_list:
                indexer.add_new_doc(parser.parse_doc(document))
        except Exception:
            print("Problem with process {}".format(num_thread))
            # print_exc() writes the traceback itself and returns None, so its
            # result must not be passed to print().
            traceback.print_exc()
        stop = timeit.default_timer()
        print("Time of indexer and posting of process {} : ".format(num_thread), stop - start)
        return indexer

    def three_way_external_merge(self, inverted_index, posting_file1, posting_file2, posting_file3):
        """Merge three posting dicts into "posting_file.txt", folding case variants.

        Terms are visited in sorted order.  For each term the postings of the
        three dicts are concatenated.  If the term starts with an uppercase
        character and its lowercase form is also indexed, the postings and the
        counters (elements 0 and 1) are folded into the lowercase form and the
        term is removed from *inverted_index*; otherwise one line
        "<term>: <postings> " is written and element 2 of the term's entry is
        set to that line's 1-based number.

        Input:
            inverted_index - dict term -> list entry; modified in place.
            posting_file1/2/3 - dicts term -> list of postings; the lists of
                                lowercase forms may be extended in place.
        Output: None.  Any exception is printed and ends the merge early.
        """
        posting_dicts = (posting_file1, posting_file2, posting_file3)
        try:
            line = 1
            with open("posting_file.txt", 'w', encoding="cp437", errors='ignore') as file:
                # sorted() snapshots the keys, so deleting entries inside the
                # loop is safe.  ASCII uppercase sorts before lowercase, hence
                # a capitalised term is visited before its lowercase form and
                # its postings are folded in before that form is written.
                for term in sorted(inverted_index):
                    postings = []
                    for posting_dict in posting_dicts:
                        postings.extend(posting_dict.get(term, ()))

                    lower_term = None
                    target = None
                    # Slicing instead of indexing keeps an empty term from
                    # raising and aborting the whole merge.
                    if term[:1].isupper():
                        lower_term = term.lower()
                        # Skip terms that are their own lowercase form (some
                        # uppercase characters have no lowercase mapping) and
                        # lowercase forms without an inverted-index entry:
                        # folding in those cases would drop the term.
                        if lower_term != term and lower_term in inverted_index:
                            # The first posting dict that knows the lowercase
                            # form receives the folded postings.
                            target = next(
                                (d for d in posting_dicts if lower_term in d), None)

                    if target is not None:
                        target[lower_term].extend(postings)
                        inverted_index[lower_term][0] += inverted_index[term][0]
                        inverted_index[lower_term][1] += inverted_index[term][1]
                        del inverted_index[term]
                    else:
                        file.write("{}: {} \n".format(term, postings))
                        inverted_index[term][2] = line
                        line += 1
        except Exception:
            # Best effort: report the failure and leave what was merged so far.
            traceback.print_exc()

    def search(self, query):
        """
        Executes a query over an existing index and returns the number of
        relevant docs and an ordered list of search results.
        Input: query - string.
        Output: A tuple containing the number of relevant search results, and
                a list of tweet_ids where the first element is the most relevant
                and the last is the least relevant result.

        NOTE(review): this method currently has no body besides this
        docstring, so it returns None instead of the documented tuple.
        """

    def query_expansion(self, query_as_list):
        """Placeholder for query expansion; currently does nothing and returns None."""
        pass

    def add_similar_word_to_query(self, query_as_list, query_expansion):
        """Placeholder for adding similar words to a query; currently does nothing."""
        pass

    def add_hashtag_entities(self, query_as_list):
        """Build hashtag and decapitalised variants of the given query terms.

        For every non-empty term the result contains "#<term lowercased>" and,
        when the term starts with an uppercase character, the term with only
        its first character lowercased.  The original terms themselves are not
        part of the result.  Empty terms are skipped.

        Input: query_as_list - list of term strings.
        Output: list of the generated variants, in input order.
        """
        result = []
        for term in query_as_list:
            if not term:
                # An empty term has no first character and no useful hashtag.
                continue
            result.append('#{}'.format(term.lower()))
            if term[0].isupper():
                result.append(term[0].lower() + term[1:])
        return result

    # DO NOT MODIFY THIS SIGNATURE
    # You can change the internal implementation as you see fit.
    def load_index(self, fn):
        """
        Loads a pre-computed index (or indices) so we can answer queries.
        Input:
            fn - file name of pickled index.
        """
        self._indexer.load_index(fn)

    # DO NOT MODIFY THIS SIGNATURE
    # You can change the internal implementation as you see fit.
    def load_precomputed_model(self, model_dir=None):
        """
        Loads a pre-computed model (or models) so we can answer queries.
        This is where you would load models like word2vec, LSI, LDA, etc. and
        assign to self._model, which is passed on to the searcher at query time.
        """
        pass
if __name__ == '__main__':
    # Manual smoke test: build the engine and show the hashtag/decapitalised
    # variants generated for a sample query.
    # To index a corpus first, call s.build_index_from_parquet(<parquet path>).
    s = search_engine_interface(ConfigClass())
    # Print the returned variants explicitly instead of relying on output
    # produced inside the method.
    print(s.add_hashtag_entities(['covid', 'LESS', 'Dangerous']))