Skip to content

Commit

Permalink
Merge pull request #515 from nettorta/develop
Browse files Browse the repository at this point in the history
Common utils moved to common package
  • Loading branch information
fomars committed Feb 27, 2018
2 parents 12e0bd0 + 41cf720 commit 4f89cc3
Show file tree
Hide file tree
Showing 16 changed files with 39 additions and 490 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
'pip>=8.1.2',
'matplotlib>=1.5.3', 'seaborn>=0.7.1',
'pyyaml>=3.12', 'cerberus>=1.1', 'influxdb>=5.0.0',
'netort>=0.0.3'
],
setup_requires=[
'pytest-runner', 'flake8',
Expand Down
19 changes: 5 additions & 14 deletions yandextank/aggregator/tank_aggregator.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
""" Core module to calculate aggregate data """
import json
import logging

import queue as q

from pkg_resources import resource_string

from .aggregator import Aggregator, DataPoller
from .chopper import TimeChopper
from yandextank.common.interfaces import AggregateResultListener, StatsReader
from yandextank.common.util import Drain, Chopper

from netort.data_processing import Drain, Chopper, get_nowait_from_queue

logger = logging.getLogger(__name__)

Expand All @@ -21,16 +22,6 @@ def on_aggregated_data(self, data, stats):
logger.info("Stats:\n%s", json.dumps(stats, indent=2))


def get_from_queue(queue):
data = []
for _ in range(queue.qsize()):
try:
data.append(queue.get_nowait())
except q.Empty:
break
return data


class TankAggregator(object):
"""
Plugin that manages aggregation and stats collection
Expand Down Expand Up @@ -91,8 +82,8 @@ def _collect_data(self, end=False):
"""
Collect data, cache it and send to listeners
"""
data = get_from_queue(self.results)
stats = get_from_queue(self.stats)
data = get_nowait_from_queue(self.results)
stats = get_nowait_from_queue(self.stats)
logger.debug("Data timestamps:\n%s" % [d.get('ts') for d in data])
logger.debug("Stats timestamps:\n%s" % [d.get('ts') for d in stats])
for item in data:
Expand Down
8 changes: 4 additions & 4 deletions yandextank/aggregator/tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import json

import numpy as np
import pytest

from queue import Queue
from conftest import MAX_TS, random_split

from yandextank.aggregator import TankAggregator
from yandextank.aggregator.aggregator import Aggregator, DataPoller
from yandextank.aggregator.chopper import TimeChopper
from yandextank.plugins.Phantom.reader import string_to_df

from conftest import MAX_TS, random_split
from yandextank.common.util import Drain
from netort.data_processing import Drain

from yandextank.plugins.Phantom.reader import string_to_df

AGGR_CONFIG = TankAggregator.load_config()

Expand Down

0 comments on commit 4f89cc3

Please sign in to comment.