Skip to content

Commit

Permalink
aggr - reports
Browse files Browse the repository at this point in the history
  • Loading branch information
Signorini committed Dec 3, 2018
1 parent b6272af commit 7950c17
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 90 deletions.
26 changes: 18 additions & 8 deletions app/libs/makeAggregation.py
@@ -1,10 +1,21 @@

import sys
import pandas as pd
from app.services.aggregator.factoryAggr import FactoryAggr
from app.services.mappers.aggregator import mapperA


def make_aggregation(data):
def view_label(result):
return {
'label': result.index.tolist(),
'data': result.tolist()
}


def view_dict(result):
return result.to_dict()


def make_aggregation(data, view='dict'):
aggr = {}

df = pd.DataFrame(data)
Expand All @@ -13,11 +24,10 @@ def make_aggregation(data):
for mapp in mapperA():
result = factory.run(mapp)

if isinstance(result, pd.Series):
if isinstance(result, pd.Series) and len(result) > 0:
entity = mapp.uniqueField()
aggr[entity] = {
'label': result.index.tolist(),
'data': result.tolist()
}

return aggr
mth = "view_%s" % view
aggr[entity] = getattr(sys.modules[__name__], mth)(result)

return aggr
62 changes: 57 additions & 5 deletions app/services/aggregator/aggr.py
@@ -1,16 +1,68 @@
import pandas as pd
from pydash.objects import get

class Aggr(object):

def __init__(self, field, sub=''):
class Aggregator(object):
def __init__(self, field, lens=None, sublens='_id'):
self._field = field
self._sub = sub
self._lens = lens
self._sublens = sublens

self._result = []
self._transf = []
self._df = None

def aggregate(self, df):

self._tmp_dataframe = df \
.dropna() \
.apply(self.transformData)

if "stack" in self._transf:
self._tmp_dataframe = self._tmp_dataframe.stack() \
.reset_index(level=1, drop=True)

self.groupCount()

def groupAggrCount(self):
self._result = self._tmp_dataframe \
.groupby(self._sub) \
.agg({self._aggrk: self._aggr}) \
.get(self._aggrk)

def groupCount(self):
self._result = self._tmp_dataframe \
.groupby(self._tmp_dataframe) \
.count()

def transformData(self, data):

if isinstance(data, dict):
data = get(data, self._lens)

if isinstance(data, list):
self._transf.append("stack")
data = map(self.reducev, data)
return pd.Series(data)

return data

def reducev(self, data):

if isinstance(data, dict):
return get(data, self._sublens)

return data

def getField(self):
return self._field

def uniqueField(self):
return "%s_%s" % (self._field, self._sub)
arr = [self._field]
if self._lens:
arr += self._lens.split(".")

return "_".join(arr)

def getResult(self):
return self._result
return self._result
16 changes: 0 additions & 16 deletions app/services/aggregator/aggrDict.py

This file was deleted.

7 changes: 0 additions & 7 deletions app/services/aggregator/aggrImmutable.py

This file was deleted.

29 changes: 0 additions & 29 deletions app/services/aggregator/aggrListObj.py

This file was deleted.

2 changes: 1 addition & 1 deletion app/services/aggregator/factoryAggr.py
@@ -1,4 +1,5 @@

from app.libs.logger import logger

class FactoryAggr(object):

Expand All @@ -11,5 +12,4 @@ def run(self, cls):
if field in self._dataframe:
df = self._dataframe[field]
cls.aggregate(df)

return cls.getResult()
60 changes: 37 additions & 23 deletions app/services/mappers/aggregator.py
@@ -1,28 +1,42 @@

from app.services.aggregator.aggrDict import AggrDict
from app.services.aggregator.aggrListObj import AggrListObj
from app.services.aggregator.aggrImmutable import AggrImmutable
from app.services.aggregator.aggr import Aggregator

def mapperA():
return [
AggrImmutable("family"),
AggrImmutable("size"),
AggrImmutable("provider"),
AggrDict("datacenters", sub="name"),
AggrDict("datacenters", sub="provider"),
AggrDict("datacenters", sub="instance"),
AggrDict("datacenters", sub="region"),
AggrDict("os", sub="base", aggrk="base"),
AggrListObj("regions"),
AggrListObj("zones"),
AggrListObj("deps", sub="name"),
AggrListObj("services", sub="name"),
AggrListObj("deploy", sub="type"),
AggrListObj("contacts", sub="channel"),
AggrListObj("applications", sub="name"),
AggrListObj("system", sub="name"),
AggrListObj("clients", sub="name"),
AggrListObj("entry", sub="name"),
AggrListObj("auth", sub="type"),
AggrListObj("tags", sub="key"),
Aggregator("datacenters", lens="name"),
Aggregator("datacenters", lens="provider"),
Aggregator("datacenters", lens="instance"),
Aggregator("datacenters", lens="region"),
Aggregator("provider"),
Aggregator("size"),

Aggregator("servers", lens="hostname"),
Aggregator("servers", lens="datacenters.name"),
Aggregator("servers", lens="datacenters.provider"),
Aggregator("servers", lens="datacenters.instance"),
Aggregator("servers", lens="datacenters.region"),
Aggregator("os", lens="base"),

Aggregator("applications", lens="name"),
Aggregator("applications", lens="family"),
Aggregator("applications", lens="datacenters.name"),
Aggregator("applications", lens="datacenters.provider"),
Aggregator("family"),

Aggregator("system", lens="name"),
Aggregator("systems", lens="name"),
Aggregator("systems", lens="clients", sublens="name"),

Aggregator("clients", lens="name"),

Aggregator("regions"),
Aggregator("zones"),
Aggregator("deps", sublens="name"),
Aggregator("services", sublens="name"),
Aggregator("deploy", sublens="type"),
Aggregator("contacts", sublens="channel"),

Aggregator("entry", sublens="name"),
Aggregator("auth", sublens="type"),
Aggregator("tags", sublens="key"),
]
2 changes: 1 addition & 1 deletion app/tasks/upload_json.py
Expand Up @@ -26,7 +26,7 @@ def task_upload(report_id, owner_user, name, result):
webhook_id.append(str(tt))

prefetch = DataFrame(result[:50], False).getHeaders()
aggr = make_aggregation(result)
aggr = make_aggregation(result, view='label')

notification_id = task_notification.delay(report_id=report_id, msg=id, status='finished',
more={'columns': prefetch, 'aggr': aggr})
Expand Down

0 comments on commit 7950c17

Please sign in to comment.