optimized count, maximum, minimum in es adapter
fixes #39

With this we get a tremendous performance boost, since these reductions are
done in the storage layer and we don't need to pull thousands of data points
all the way into the **flume** runtime engine just to calculate the same
things.
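
As a rough sketch of what gets pushed down, the optimized `reduce(count=count())` read now issues something like the following request instead of scanning every document (the client, index name, and time field here are placeholders for illustration):

```python
from elasticsearch import Elasticsearch

# illustrative client/index names only
client = Elasticsearch(['localhost'], port=9200)

query = {
    # let ES compute the reduction server-side instead of streaming
    # every document back into the flume runtime
    'aggs': {
        'count': {'value_count': {'field': '_type'}},
    },
    # keep a single hit around so the result still has a timestamp
    'size': 1,
    'sort': ['time'],
}

response = client.search(index='test', body=query)
print(response['aggregations']['count']['value'])
```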

As an example of the performance boost:

```bash
rlgomes@t460s> flume "emit(limit=100000, start='1970-01-01') |
put(count=count()) | write('elastic', index='test', batch=1024)"
2016-09-06 21:39:50 $?=0 pwd=/home/rlgomes/workspace/python/flume
venv=env branch=(no duration=38.256s
rlgomes@t460s> flume --no-optimize "read('elastic', index='test') |
reduce(count=count()) | write('stdio')"
{"count": 100000, "time": "1970-01-01T00:00:00.000Z"}
2016-09-06 21:41:26 $?=0 pwd=/home/rlgomes/workspace/python/flume
venv=env branch=(no duration=72.940s
rlgomes@t460s> flume "read('elastic', index='test') |
reduce(count=count()) | write('stdio')"
{"count": 100000, "time": "1970-01-01T00:00:00.000Z"}
2016-09-06 21:42:07 $?=0 pwd=/home/rlgomes/workspace/python/flume
venv=env branch=(no duration=.473s
rlgomes@t460s> flume --no-optimize "read('elastic', index='test') |
reduce(max=maximum('count')) | write('stdio')"
{"max": 100000, "time": "1970-01-01T00:00:00.000Z"}
2016-09-06 21:43:21 $?=0 pwd=/home/rlgomes/workspace/python/flume
venv=env branch=(no duration=57.814s
rlgomes@t460s> flume "read('elastic', index='test') |
reduce(max=maximum('count')) | write('stdio')"
{"max": 100000.0, "time": "1970-01-01T00:00:00.000Z"}
2016-09-06 21:44:17 $?=0 pwd=/home/rlgomes/workspace/python/flume
venv=env branch=(no duration=.439s
rlgomes@t460s> flume --no-optimize "read('elastic', index='test') |
reduce(min=minimum('count')) | write('stdio')"
{"time": "1970-01-01T00:00:00.000Z", "min": 1}
2016-09-06 21:45:16 $?=0 pwd=/home/rlgomes/workspace/python/flume
venv=env branch=(no duration=44.500s
rlgomes@t460s> flume "read('elastic', index='test') |
reduce(min=minimum('count')) | write('stdio')"
{"min": 1.0, "time": "1970-01-01T00:00:00.000Z"}
2016-09-06 21:45:24 $?=0 pwd=/home/rlgomes/workspace/python/flume
venv=env branch=(no duration=.381s
```

The above example uses just 100K data points, and yet the optimized reads are
anywhere from 117x to 154x faster with the new optimizations.
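For the runs above that works out to roughly 72.940s / 0.473s ≈ 154x for `count`, 57.814s / 0.439s ≈ 132x for `maximum`, and 44.500s / 0.381s ≈ 117x for `minimum`.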
rlgomes committed Sep 7, 2016
1 parent 501b4c1 commit 87e0075
Showing 7 changed files with 424 additions and 151 deletions.
7 changes: 1 addition & 6 deletions docs/adapters/index.md
@@ -71,9 +71,4 @@ Have a look at how we implemented the `optimize` method for
[elastic](https://github.com/rlgomes/flume/blob/master/flume/adapters/elastic/node.py)
and
[stdio](https://github.com/rlgomes/flume/blob/master/flume/adapters/stdio.py)
to get a simple view of how optimization can be done. We'll have more optimizations
around doing counts and filters coming soon...




to get a simple view of how optimization can be done.
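
As a rough illustration of the pattern the docs describe (not the exact elastic implementation shown in the next file), an adapter's `optimize` hook inspects the downstream node and absorbs work the storage engine can do natively. A minimal sketch, with names modeled on the elastic adapter but purely illustrative:

```python
class sketch_adapter(object):
    """
    minimal sketch of an adapter optimize hook; illustrative only
    """

    def __init__(self):
        self.limit = None
        self.aggs = None

    def optimize(self, proc):
        # absorb `head N` by limiting how much we read from storage
        if proc.name == 'head':
            self.limit = proc.howmany
            proc.remove_node()

        # absorb a single count() reduction as a storage-side aggregation
        if proc.name == 'reduce' and len(proc.fields) == 1:
            for (name, value) in proc.fields.items():
                if value.name() == 'count':
                    self.aggs = {name: {'value_count': {'field': '_type'}}}
                    proc.remove_node()
```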
110 changes: 86 additions & 24 deletions flume/adapters/elastic/node.py
@@ -5,7 +5,9 @@
import json
import re

from elasticsearch import Elasticsearch, helpers
import elasticsearch

from elasticsearch import helpers
from elasticsearch.exceptions import RequestError

from flume import logger
@@ -40,6 +42,7 @@ def __init__(self,
self.clients = {}

self.limit = None
self.aggs = None

def _get_elasticsearch(self, host, port):
"""
@@ -48,7 +51,7 @@ def _get_elasticsearch(self, host, port):
key = '%s:%s' % (host, port)

if key not in self.clients:
self.clients[key] = Elasticsearch([host], port=port)
self.clients[key] = elasticsearch.Elasticsearch([host], port=port)

return self.clients[key]

@@ -59,43 +62,102 @@ def optimize(self, proc):
self.limit = proc.howmany
proc.remove_node()

# reduce optimizations
if proc.name == 'reduce':
# currently only handling a single count reducer optimization
if len(proc.fields) == 1:
self.aggs = {}

for (name, value) in proc.fields.items():
if value.name() == 'count':
# we can have es calculate this aggregation for us
self.aggs[name] = {
'value_count': {
'field': '_type'
}
}

elif value.name() == 'maximum':
# we can have es calculate this aggregation for us
self.aggs[name] = {
'max': {
'field': value.fieldname
}
}

elif value.name() == 'minimum':
# we can have es calculate this aggregation for us
self.aggs[name] = {
'min': {
'field': value.fieldname
}
}

proc.remove_node()

def read(self):
"""
read points out of ES
"""
points = []
client = self._get_elasticsearch(self.host, self.port)

query = {
'query': filter_to_es_query(self.filter)
}
try:
if self.aggs is not None:
query = {
'aggs': self.aggs,
# get the first result so we have a timestamp for our
# resulting reduction
'size': 1
}

if self.time is not None:
query['sort'] = [self.time]
if self.time is not None:
query['sort'] = [self.time]

logger.debug('es query %s' % json.dumps(query))
logger.debug('es query %s' % json.dumps(query))

try:
count = 0
for result in helpers.scan(client,
index=self.index or '_all',
query=query,
preserve_order=True):
count += 1

point = Point(**result['_source'])
points.append(point)
response = client.search(index=self.index or '_all',
body=query)
point = Point()

if len(points) >= self.batch:
yield self.process_time_field(points, self.time)
points = []
for (name, nested_value) in response['aggregations'].items():
point[name] = nested_value['value']

if count == self.limit:
break
point[self.time] = response['hits']['hits'][0]['_source'][self.time]

if len(points) > 0:
points.append(point)
yield self.process_time_field(points, self.time)

else:
count = 0
query = {
'query': filter_to_es_query(self.filter)
}

if self.time is not None:
query['sort'] = [self.time]

logger.debug('es query %s' % json.dumps(query))

for result in helpers.scan(client,
index=self.index or '_all',
query=query,
preserve_order=True):
count += 1

point = Point(**result['_source'])
points.append(point)

if len(points) >= self.batch:
yield self.process_time_field(points, self.time)
points = []

if count == self.limit:
break

if len(points) > 0:
yield self.process_time_field(points, self.time)

except RequestError as exception:
# make time field related errors a little easier to digest instead
# of spewing the elasticsearch internal error which is a little less
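
For context, a trimmed-down version of the kind of response the aggregated query produces might look like the following (values invented to match the example run above), and `read()` collapses it into a single point:

```python
# hypothetical, trimmed-down ES response for the aggregated query
response = {
    'hits': {
        'hits': [
            {'_source': {'time': '1970-01-01T00:00:00.000Z', 'count': 1}}
        ]
    },
    'aggregations': {
        'count': {'value': 100000}
    }
}

# roughly what read() does with it: one aggregation value per reducer,
# plus the timestamp taken from the first hit
point = {name: agg['value'] for (name, agg) in response['aggregations'].items()}
point['time'] = response['hits']['hits'][0]['_source']['time']
# -> {'count': 100000, 'time': '1970-01-01T00:00:00.000Z'}
```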
13 changes: 10 additions & 3 deletions flume/cli.py
@@ -20,10 +20,16 @@

@click.command()
@click.version_option(version=VERSION)
@click.option('--debug', '-d', count=True, help='debug mode')
@click.option('--debug', '-d',
count=True,
help='debug mode')
@click.option('--optimize/--no-optimize',
default=True,
help='turns read optimizations on/off, default: on')
@click.argument('program')
def main(program=None,
debug=False):
debug=False,
optimize=True):
"""
simple command line entry point for executing flume programs
"""
@@ -47,7 +53,8 @@ def main(program=None,
for thing in dir(module):
globals()[thing] = module.__dict__[thing]

eval(program, globals(), locals()).execute(loglevel=loglevel)
eval(program, globals(), locals()).execute(loglevel=loglevel,
optimize=optimize)

if __name__ == '__main__':
main()
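
The CLI simply forwards the flag into `execute()`, so the same toggle is available when building a pipeline directly in Python. A hedged sketch, assuming the DSL nodes are importable from the `flume` package the way the CLI exposes them:

```python
from flume import *  # assumption: read, reduce, write, count are exported here

# roughly equivalent to:
#   flume --no-optimize "read('elastic', index='test') | reduce(count=count()) | write('stdio')"
(read('elastic', index='test')
 | reduce(count=count())
 | write('stdio')).execute(optimize=False)
```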
27 changes: 20 additions & 7 deletions flume/core.py
@@ -147,6 +147,8 @@ def __init__(self, *args, **kwargs):
self.stats = dici(points_pushed=0,
points_pulled=0)

self.config = dici()

self.parent = None
self.child = None

@@ -287,7 +289,8 @@ def run(self):

def execute(self,
wait=True,
loglevel=logger.WARN):
loglevel=logger.WARN,
optimize=True):

if 'inited' not in self.__dict__ or not self.inited:
raise FlumeException('node.__init__ was never used')
@@ -296,7 +299,8 @@ def execute(self,
# on it shouldn't fail
if not hasattr(self, 'outputs'):
node.init_node(self, outputs=[])


self.config.optimize = optimize
logger.setLogLevel(loglevel)

# XXX: pooling here ?
@@ -308,7 +312,9 @@
thread.start()

if self.parent:
self.parent.execute(wait=wait, loglevel=loglevel)
self.parent.execute(wait=wait,
loglevel=loglevel,
optimize=optimize)

if wait:
while thread.is_alive():
@@ -382,6 +388,9 @@ def result(self):
"""
raise FlumeException('missing implementation for result method')

def name(self):
return type(self).__name__

class splitter(node):
"""
# splitter
@@ -415,7 +424,8 @@ def loop(self):

def execute(self,
wait=True,
loglevel=logging.ERROR):
loglevel=logging.ERROR,
optimize=True):

def find_root(flume):
"""
@@ -461,14 +471,17 @@ def find_root(flume):
source=source)

forwarder.execute(wait=False,
loglevel=loglevel)
loglevel=loglevel,
optimize=optimize)

# start underlying flumes
for flume in self.flumes:
flume.execute(wait=False,
loglevel=loglevel)
loglevel=loglevel,
optimize=optimize)

# override default behavior to execute the underlying flume
node.execute(self,
wait=False,
loglevel=loglevel)
loglevel=loglevel,
optimize=optimize)
2 changes: 1 addition & 1 deletion flume/sinks/write.py
@@ -36,6 +36,6 @@ def loop(self):
buffered = buffered[self.batch:]

if len(buffered) > 0:
self.write(buffered[:self.batch])
self.write(buffered)

self.instance.eof()
2 changes: 1 addition & 1 deletion flume/sources/read.py
@@ -26,7 +26,7 @@ def __init__(self,
self.read = self.instance.read

def loop(self):
if self.child is not None:
if self.child is not None and self.config.optimize:
self.instance.optimize(self.child)

for points in self.read():
