forked from globusonline/agamemnon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
elasticsearch.py
138 lines (121 loc) · 5.17 KB
/
elasticsearch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from pyes.es import ES
from pyes import exceptions
from pyes import query
from itertools import groupby
from operator import itemgetter
class FullTextSearch(object):
    """Maintain and query Elasticsearch full-text indices for graph nodes.

    Wraps a ``pyes`` connection and keeps a cache of index mappings so that
    node create/modify/delete events can be mirrored into the appropriate
    indices, and search hits can be resolved back into datastore nodes.

    NOTE(review): methods that load nodes (``search_index``,
    ``populate_index``) read ``self.datastore``, which is expected to be
    assigned externally after construction — confirm against callers.
    """

    def __init__(self, server, settings=None):
        """Connect to *server* and set up index-creation settings.

        :param server: Elasticsearch server address, passed to ``pyes.ES``.
        :param settings: optional index settings dict; when omitted, a
            keyword-tokenized lowercase nGram analyzer (1-30 chars) is used
            so that substring queries match indexed values.
        """
        self.conn = ES(server)
        # Referenced by search_index/populate_index; assigned externally.
        # Avoid clobbering it if a subclass set it before calling us.
        self.datastore = getattr(self, 'datastore', None)
        if settings:
            self.settings = settings
        else:
            self.settings = {
                'index': {
                    'analysis': {
                        'analyzer': {
                            'ngram_analyzer': {
                                'tokenizer': 'keyword',
                                'filter': ['lowercase', 'filter_ngram'],
                                'type': 'custom'
                            }
                        },
                        'filter': {
                            'filter_ngram': {
                                'type': 'nGram',
                                'max_gram': 30,
                                'min_gram': 1
                            }
                        }
                    }
                }
            }
        self.refresh_index_cache()

    def search_index_text(self, query_string, fields="_all", **args):
        """Full-text search with a plain query string.

        :param query_string: text to match.
        :param fields: field name(s) to search; defaults to all fields.
        :param args: forwarded to :meth:`search_index`.
        :return: list of matching datastore nodes, in relevance order.
        """
        q = query.TextQuery(fields, query_string)
        return self.search_index(q, **args)

    def search_index(self, query, indices=None, num_results=None, node_type=None):
        """Run a pyes *query* and resolve the hits to datastore nodes.

        :param query: a pyes query object.
        :param indices: optional list of index names to search.
        :param num_results: optional cap on the number of hits used.
        :param node_type: optional doc type restriction.
        :return: nodes in the original hit (relevance) order.
        """
        results = self.conn.search(
            query=query, indices=indices, doc_types=node_type)
        meta_list = [r.get_meta() for r in results[0:num_results]]
        # Fetch nodes grouped by type to reduce the number of db calls.
        node_dict = {}
        type_key = itemgetter('type')
        for node_type_value, grouped in groupby(
                sorted(meta_list, key=type_key), key=type_key):
            ids = [meta['id'] for meta in grouped]
            for node in self.datastore.get_nodes(node_type_value, ids):
                node_dict[(node.type, node.key)] = node
        # Return nodes in the original (relevance) order of the hits.
        return [node_dict[(meta['type'], meta['id'])] for meta in meta_list]

    def create_index(self, type, indexed_variables, index_name):
        """Create *index_name* (if missing) and map *indexed_variables*.

        Each field is mapped as an ngram-analyzed string, then all existing
        nodes of *type* are bulk-loaded into the new index.
        """
        self.conn.create_index_if_missing(index_name, self.settings)
        mapping = {}
        for arg in indexed_variables:
            mapping[arg] = {'boost': 1.0,
                            'analyzer': 'ngram_analyzer',
                            'type': 'string',
                            'term_vector': 'with_positions_offsets'}
        index_settings = {'index_analyzer': 'ngram_analyzer',
                          'search_analyzer': 'standard',
                          'properties': mapping}
        self.conn.put_mapping(str(type), index_settings, [index_name])
        self.refresh_index_cache()
        self.populate_index(type, index_name)

    def refresh_index_cache(self):
        """Reload the index->mapping cache; empty when no index exists."""
        try:
            self.indices = self.conn.get_mapping()
        except exceptions.IndexMissingException:
            self.indices = {}

    def delete_index(self, index_name):
        """Drop *index_name* (if it exists) and refresh the mapping cache."""
        self.conn.delete_index_if_exists(index_name)
        self.refresh_index_cache()

    def populate_index(self, type, index_name):
        """Index every currently existing node of *type* into *index_name*."""
        # All instances of a type hang off its reference node.
        ref_node = self.datastore.get_reference_node(type)
        node_list = [rel.target_node for rel in ref_node.instance.outgoing]
        for node in node_list:
            key = node.key
            index_dict = self.populate_index_document(node, index_name)
            # Delete any stale document first; absence is not an error.
            try:
                self.conn.delete(index_name, type, key)
            except exceptions.NotFoundException:
                pass
            self.conn.index(index_dict, index_name, type, key)
        self.conn.refresh([index_name])

    def on_create(self, node):
        """Index a newly created *node* in every index covering its type."""
        type_indices = self.get_indices_of_type(node.type)
        for index_name in type_indices:
            index_dict = self.populate_index_document(node, index_name)
            self.conn.index(index_dict, index_name, node.type, node.key)
            self.conn.refresh([index_name])

    def on_delete(self, node):
        """Remove *node* from every index covering its type."""
        type_indices = self.get_indices_of_type(node.type)
        for index_name in type_indices:
            try:
                self.conn.delete(index_name, node.type, node.key)
                self.conn.refresh([index_name])
            except exceptions.NotFoundException:
                # Never indexed (or already removed) -- nothing to do.
                pass

    def on_modify(self, node):
        """Re-index *node* after modification in every index of its type.

        BUGFIX: the delete and the re-index were previously inside one
        ``try`` block, so a node missing from the index (NotFoundException
        on delete) was never re-indexed. The ``try`` now covers only the
        delete; the index/refresh always run.
        """
        type_indices = self.get_indices_of_type(node.type)
        for index_name in type_indices:
            index_dict = self.populate_index_document(node, index_name)
            try:
                self.conn.delete(index_name, node.type, node.key)
            except exceptions.NotFoundException:
                # Not indexed yet; fall through and index it now.
                pass
            self.conn.index(index_dict, index_name, node.type, node.key)
            self.conn.refresh([index_name])

    def get_indices_of_type(self, type):
        """Return the names of cached indices that map documents of *type*."""
        type_indices = [
            key for key, value in self.indices.items()
            if type in value
        ]
        return type_indices

    def populate_index_document(self, node, index_name):
        """Build the document dict for *node* from the index's mapped fields."""
        indexed_variables = self.indices[index_name][node.type]['properties'].keys()
        index_dict = {
            field: node[field] for field in indexed_variables
        }
        return index_dict