Skip to content

Commit

Permalink
elastic: fix insertion of timeseries prediction
Browse files Browse the repository at this point in the history
- with elasticsearch, we use 'doc_type', not 'measurement' (measurement is
  an InfluxDB concept)
- store prediction in appropriate index
  • Loading branch information
Vianney Bajart authored and regel committed May 25, 2018
1 parent ad0d3db commit 6566d84
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 19 deletions.
64 changes: 50 additions & 14 deletions loudml-elastic/loudml/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,21 +248,41 @@ def send_bulk(self, requests):
) as exn:
raise errors.TransportError(str(exn))

def insert_data(self,
data,
doc_type='generic',
doc_id=None,
timestamp=0,
):

def get_index_name(self, index=None, timestamp=None):
    """
    Resolve the name of the index to use

    :param index: index name or pattern; `self.index` is used when None
    :param timestamp: Unix timestamp used to build the date that replaces
        '*'; the current time is used when None
    :return: the index name, with every '*' replaced by the date formatted
        as "YYYY.MM.DD" (naive local time, as returned by
        `datetime.now()` / `datetime.fromtimestamp()`)
    """

    name = self.index if index is None else index

    # A name without wildcard is used verbatim; the timestamp is ignored
    if '*' not in name:
        return name

    if timestamp is not None:
        when = datetime.datetime.fromtimestamp(timestamp)
    else:
        when = datetime.datetime.now()

    return name.replace('*', when.strftime("%Y.%m.%d"))

def insert_data(
self,
data,
index=None,
doc_type='generic',
doc_id=None,
timestamp=None,
):
"""
Insert entry into the index
"""

index = self.get_index_name(index, timestamp)

req = {
'_index': "{}-{}".format(
self.index,
datetime.datetime.fromtimestamp(timestamp).strftime("%Y.%m.%d"),
),
'_index': index,
'_type': doc_type,
'_source': data,
}
Expand All @@ -277,9 +297,12 @@ def insert_times_data(
ts,
data,
tags=None,
measurement='generic',
index=None,
doc_type='generic',
doc_id=None,
timestamp_field='timestamp',
*args,
**kwargs
):
"""
Insert time-indexed entry
Expand All @@ -292,9 +315,15 @@ def insert_times_data(
for tag, tag_val in tags.items():
data[tag] = tag_val

self.insert_data(data, doc_type=measurement, doc_id=doc_id, timestamp=int(ts))
self.insert_data(
data,
index=index,
doc_type=doc_type,
doc_id=doc_id,
timestamp=int(ts),
)

def search(self, body, index=None, routing=None, size=0):
def search(self, body, index=None, routing=None, doc_type=None, size=0):
"""
Send search query to Elasticsearch
"""
Expand All @@ -309,6 +338,7 @@ def search(self, body, index=None, routing=None, size=0):
try:
return self.es.search(
index=index,
doc_type=doc_type,
size=size,
body=body,
params=params,
Expand Down Expand Up @@ -666,9 +696,15 @@ def get_times_data(

yield (timestamp - t0) / 1000, X, timeval

def save_timeseries_prediction(self, prediction, model):
def save_timeseries_prediction(
self,
prediction,
model,
index=None,
):
for bucket in prediction.format_buckets():
self.insert_times_data(
index=index,
doc_type='prediction_{}'.format(model.name),
ts=bucket['timestamp'],
data=bucket['predicted'],
Expand Down
14 changes: 14 additions & 0 deletions loudml-elastic/tests/test_elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,20 @@ def setUp(self):
def tearDown(self):
self.source.drop()

def test_get_index_name(self):
    # Reference instant: 2018-05-24 10:01:09 UTC
    ts = 1527156069
    resolve = self.source.get_index_name

    # Without arguments, the data source's own index is returned
    self.assertEqual(resolve(), self.index)

    # Explicit index names: only a '*' wildcard is replaced by the date.
    # NOTE(review): the expected date presumably assumes the host's local
    # timezone places `ts` on 2018-05-24 -- confirm for hosts far from UTC.
    expectations = (
        (("test",), {}, "test"),
        (("test",), {'timestamp': ts}, "test"),
        (("test-*",), {'timestamp': ts}, "test-2018.05.24"),
    )
    for args, kwargs, expected in expectations:
        self.assertEqual(resolve(*args, **kwargs), expected)

def test_get_times_data(self):
res = self.source.get_times_data(
self.model,
Expand Down
16 changes: 15 additions & 1 deletion loudml-import/loudml/import_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ def main():
required=False,
default='generic',
)
parser.add_argument(
'--doc-type',
help="Document type",
type=str,
required=False,
default='generic',
)
parser.add_argument(
'-F', '--flush',
help="Flush database",
Expand Down Expand Up @@ -86,13 +93,20 @@ def main():

source.init(**kwargs)

kwargs = {}

if arg.measurement:
kwargs['measurement'] = arg.measurement
if arg.doc_type:
kwargs['doc_type'] = arg.doc_type

i = None
for i, (ts, tag_dict, data) in enumerate(parser.run(arg.path)):
source.insert_times_data(
measurement=arg.measurement,
ts=ts,
data=data,
tags=tag_dict,
**kwargs,
)

if i == None:
Expand Down
11 changes: 10 additions & 1 deletion loudml-influx/loudml/influx.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,16 @@ def drop(self, db=None):
def insert_data(self, data):
    """
    Insert a non time-indexed entry (unsupported by this data source)

    Always raises `errors.NotImplemented`; `data` is not used.
    """
    reason = "InfluxDB is a pure time-series database"
    raise errors.NotImplemented(reason)

def insert_times_data(self, ts, data, measurement='generic', tags=None, timestamp_field=None):
def insert_times_data(
self,
ts,
data,
measurement='generic',
tags=None,
timestamp_field=None,
*args,
**kwargs
):
"""
Insert data
"""
Expand Down
9 changes: 8 additions & 1 deletion loudml/loudml/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,14 @@ def insert_data(self, data):
"""

@abstractmethod
def insert_times_data(self, ts, data, measurement, tags=None):
def insert_times_data(
self,
ts,
data,
tags=None,
*args,
**kwargs
):
"""
Insert time-indexed entry
"""
Expand Down
16 changes: 14 additions & 2 deletions loudml/loudml/faker.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ def dump_to_elastic(generator, addr, index, doc_type, tags=None, clear=False):

data.update(tags)
source.insert_times_data(
measurement=measurement,
ts=ts,
data=data,
doc_type=doc_type,
Expand Down Expand Up @@ -159,6 +158,12 @@ def main():
type=str,
default='dummy_data',
)
parser.add_argument(
'--doc-type',
help="Document type",
type=str,
default='generic',
)
parser.add_argument(
'--from',
help="From date",
Expand Down Expand Up @@ -233,6 +238,13 @@ def main():
tags=tag_dict,
)
elif arg.output == 'elastic':
pass
dump_to_elastic(
generator,
addr=arg.addr,
db=arg.database,
clear=arg.clear,
doc_type=arg.doc_type,
tags=tag_dict,
)
except errors.LoudMLException as exn:
logging.error(exn)

0 comments on commit 6566d84

Please sign in to comment.