/
recommender.py
220 lines (189 loc) · 6.5 KB
/
recommender.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import jubatus
import jubatus.embedded
from .base import GenericSchema, BaseDataset, BaseService, GenericConfig
from .compat import *
class Schema(GenericSchema):
    """
    Schema for Recommender service.

    In addition to the generic column handling inherited from
    ``GenericSchema``, this schema tracks which column (if any) carries
    the row ID.
    """

    # Column type code that marks the row-ID column in a mapping.
    ID = 'i'

    def __init__(self, mapping, fallback=None):
        """
        Resolve the ID column from ``mapping`` / ``fallback`` and then
        initialize the generic schema.
        """
        # The ID key is resolved before the base initializer runs.
        # NOTE(review): the trailing ``True`` presumably marks the ID column
        # as optional -- confirm against GenericSchema._get_unique_mapping.
        self._id_key = self._get_unique_mapping(
            mapping, fallback, self.ID, 'ID', True)
        super(Schema, self).__init__(mapping, fallback)

    def transform(self, row):
        """
        Convert ``row`` into a ``(row_id, datum)`` pair.

        ``row_id`` is the value stored under the ID key converted with
        ``unicode_t``, or ``None`` when the row has no such value.  The
        ID key is handed to ``_transform_as_datum`` in its third argument.
        """
        raw_id = row.get(self._id_key, None)
        row_id = None if raw_id is None else unicode_t(raw_id)
        datum = self._transform_as_datum(row, None, [self._id_key])
        return (row_id, datum)
class Dataset(BaseDataset):
    """
    Dataset for Recommender service.
    """

    @classmethod
    def _predict(cls, row):
        """
        Delegate to ``Schema.predict`` for the given ``row``.

        NOTE(review): the meaning of the ``False`` flag is defined by
        ``Schema.predict`` (inherited); confirm there.
        """
        predicted = Schema.predict(row, False)
        return predicted
class Recommender(BaseService):
    """
    Recommender service.

    All row-oriented methods are generators.  They iterate ``dataset``,
    which must produce ``(idx, (row_id, datum))`` items, and yield one
    ``(idx, row_id, result)`` tuple per item.  Because they are generators,
    nothing runs (including argument validation and client creation) until
    the returned generator is iterated.
    """

    @classmethod
    def name(cls):
        """Return the service name."""
        return 'recommender'

    @classmethod
    def _client_class(cls):
        """Return the client class used to talk to the service."""
        return jubatus.recommender.client.Recommender

    @classmethod
    def _embedded_class(cls):
        """Return the embedded implementation class of the service."""
        return jubatus.embedded.Recommender

    def clear_row(self, dataset):
        """
        Removes the given rows from the recommendation table.

        Raises ``RuntimeError`` on the first item whose row ID is ``None``.
        """
        client = self._client()
        for (idx, (row_id, _datum)) in dataset:
            if row_id is None:
                raise RuntimeError('dataset must have `id`.')
            yield (idx, row_id, client.clear_row(row_id))

    def update_row(self, dataset):
        """
        Update data points to the recommender model using the given dataset.

        Raises ``RuntimeError`` on the first item whose row ID is ``None``.
        """
        client = self._client()
        for (idx, (row_id, datum)) in dataset:
            if row_id is None:
                raise RuntimeError('datasets must have `id`')
            yield (idx, row_id, client.update_row(row_id, datum))

    def complete_row_from_id(self, dataset):
        """
        Returns data points from the row id in the recommender model,
        with missing value completed by predicted value.

        Raises ``RuntimeError`` on the first item whose row ID is ``None``.
        """
        client = self._client()
        for (idx, (row_id, _datum)) in dataset:
            if row_id is None:
                raise RuntimeError('Non ID-based datasets must use `complete_row_from_datum`')
            yield (idx, row_id, client.complete_row_from_id(row_id))

    def complete_row_from_datum(self, dataset):
        """
        Returns data points from the datum in the recommender model,
        with missing value completed by predicted value.

        The row ID is passed through to the output unchanged and may be
        ``None``.
        """
        client = self._client()
        for (idx, (row_id, datum)) in dataset:
            yield (idx, row_id, client.complete_row_from_datum(datum))

    def similar_row_from_id(self, dataset, size=10):
        """
        Returns similar data points from the row id in the recommender model.

        ``size`` is forwarded to the client for every row.  Raises
        ``RuntimeError`` on the first item whose row ID is ``None``.
        """
        client = self._client()
        for (idx, (row_id, _datum)) in dataset:
            if row_id is None:
                raise RuntimeError('Non ID-based datasets must use `similar_row_from_datum`')
            yield (idx, row_id, client.similar_row_from_id(row_id, size))

    def similar_row_from_id_and_score(self, dataset, score=0.8):
        """
        Returns rows which are most similar to the row id and have a greater
        similarity score than score.

        Raises ``RuntimeError`` on the first item whose row ID is ``None``.
        """
        client = self._client()
        for (idx, (row_id, _datum)) in dataset:
            if row_id is None:
                raise RuntimeError('Non ID-based datasets must use `similar_row_from_datum_and_score`')
            yield (idx, row_id, client.similar_row_from_id_and_score(row_id, score))

    def similar_row_from_id_and_rate(self, dataset, rate=0.1):
        """
        Returns the top rate of all the rows which are most similar to the
        row id.  For example, return the top 10% of all the rows when 0.1 is
        specified as rate.  The rate must be in (0, 1].

        Raises ``ValueError`` for an out-of-range ``rate`` and
        ``RuntimeError`` on the first item whose row ID is ``None``; both
        are raised only once the generator is iterated.
        """
        if 1.0 < rate or rate <= 0.0:
            raise ValueError('rate must be in (0, 1], but {}'.format(rate))

        client = self._client()
        for (idx, (row_id, _datum)) in dataset:
            if row_id is None:
                raise RuntimeError('Non ID-based datasets must use `similar_row_from_datum_and_rate`')
            yield (idx, row_id, client.similar_row_from_id_and_rate(row_id, rate))

    def similar_row_from_datum(self, dataset, size=10):
        """
        Returns similar data points from the datum in the recommender model.

        ``size`` is forwarded to the client for every row.  The row ID is
        passed through to the output unchanged and may be ``None``.
        """
        client = self._client()
        for (idx, (row_id, datum)) in dataset:
            yield (idx, row_id, client.similar_row_from_datum(datum, size))

    def similar_row_from_datum_and_score(self, dataset, score=0.8):
        """
        Returns rows which are most similar to row and have a greater
        similarity score than score.

        The row ID is passed through to the output unchanged and may be
        ``None``.
        """
        client = self._client()
        for (idx, (row_id, datum)) in dataset:
            yield (idx, row_id, client.similar_row_from_datum_and_score(datum, score))

    def similar_row_from_datum_and_rate(self, dataset, rate=0.1):
        """
        Returns the top rate of all the rows which are most similar to row.
        For example, return the top 10% of all the rows when 0.1 is specified
        as rate.  The rate must be in (0, 1].

        Raises ``ValueError`` for an out-of-range ``rate``; the check runs
        only once the generator is iterated.
        """
        if 1.0 < rate or rate <= 0.0:
            raise ValueError('rate must be in (0, 1], but {}'.format(rate))

        client = self._client()
        for (idx, (row_id, datum)) in dataset:
            yield (idx, row_id, client.similar_row_from_datum_and_rate(datum, rate))

    def decode_row(self, dataset):
        """
        Returns data points in the row id.

        Raises ``RuntimeError`` on the first item whose row ID is ``None``.
        """
        client = self._client()
        for (idx, (row_id, _datum)) in dataset:
            if row_id is None:
                raise RuntimeError('Each data in datasets must has `row_id`')
            yield (idx, row_id, client.decode_row(row_id))
class Config(GenericConfig):
    """
    Configuration to run Recommender service.
    """

    @classmethod
    def methods(cls):
        """Return the list of method names this configuration accepts."""
        return ['lsh', 'euclid_lsh', 'minhash', 'inverted_index',
                'inverted_index_euclid', 'nearest_neighbor_recommender']

    @classmethod
    def _default_method(cls):
        """Return the method used when none is specified."""
        return 'lsh'

    @classmethod
    def _default_parameter(cls, method):
        """
        Return the default parameter set for ``method``.

        Returns ``None`` for the inverted-index methods (they take no
        parameters) and a freshly built dict for every other known method.

        Raises ``RuntimeError`` when ``method`` is not one of the known
        method names.
        """
        if method in ('inverted_index', 'inverted_index_euclid'):
            return None

        # Single-name branches use ``==``: ``method in ('minhash')`` is a
        # substring test against a plain string (parentheses alone do not
        # make a tuple), which would accept values such as '' or 'hash'.
        if method == 'minhash':
            return {
                'hash_num': 128,
            }

        if method in ('lsh', 'euclid_lsh'):
            return {
                'hash_num': 128,
                'threads': -1,
            }

        if method == 'nearest_neighbor_recommender':
            return {
                'method': 'euclid_lsh',
                'parameter': {
                    'threads': -1,  # use number of logical CPU cores
                    'hash_num': 128,
                },
            }

        raise RuntimeError('unknown method: {0}'.format(method))