-
Notifications
You must be signed in to change notification settings - Fork 2
/
cost_report_generator.py
315 lines (250 loc) · 12.6 KB
/
cost_report_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
import json
import logging
import os
from datetime import datetime
from functools import reduce
from typing import List, Tuple, Dict
import plotly.graph_objs as go
from plotly.offline import plot
from costreport import consts, data_utils
from costreport.app_config import AppConfig
from costreport.consts import OUTPUT_DIR, ItemType, ReportItemName
from costreport.cost_client import AwsCostClient
from costreport.date_utils import get_today, get_months_back, get_days_back, get_first_day_next_month, \
format_datetime, TIME_FORMAT
from costreport.intermediate_data import IntermediateData, IntermediateSimpleResult, IntermediateComplexResults
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def _load_config():
with open('../configuration.json', 'r') as c:
configuration = c.read()
return json.loads(configuration)
class DataSeries:
    """
    One named series of chart values.

    ``name`` labels the series and ``values`` holds its y-values; the list is
    stored by reference (not copied), so later appends by the caller are
    visible through this object.
    """

    def __init__(self, name, values: List[int]):
        self.name, self.values = name, values
class ItemDefinition:
    """
    Description of a single report item (a chart or a plain value).

    :param item_name: name of the item
    :param item_type: kind of item; stored on the instance as ``chart_type``
    :param x: x-axis values for charts; for value items the first entry is
        the value that gets displayed
    :param y: data series of the chart, ``None`` when the item has no series
    :param group: optional name of the group the item belongs to
    """

    def __init__(self, item_name, item_type: ItemType, x: List[str], y: List[DataSeries] = None, group=None):
        # identification
        self.item_name = item_name
        self.group = group
        # note: the constructor argument is ``item_type`` but the attribute
        # is called ``chart_type``
        self.chart_type = item_type
        # payload
        self.x = x
        self.y = y
class ChartPlotter:
    """
    Turns an ``ItemDefinition`` into the content that is embedded in the report.

    Chart items (BAR, STACK, LINE) are rendered with plotly into an HTML div;
    VALUE items are returned as-is.
    """

    @staticmethod
    def _get_value_div(chart_def: ItemDefinition) -> str:
        """
        Return the content of a VALUE item, which is the first entry of ``x``.

        :param chart_def: item definition holding the value in ``x[0]``
        :return: the first x entry, unchanged
        """
        return chart_def.x[0]

    @staticmethod
    def _get_chart_div(chart_def: ItemDefinition) -> str:
        """
        Render a BAR, STACK or LINE item as a plotly HTML div.

        :param chart_def: item definition; every series in ``y`` becomes one
            trace plotted against the shared ``x`` values
        :return: the markup produced by ``plot(..., output_type='div')``
        :raises ValueError: when the chart type is not BAR, STACK or LINE
        """
        chart_type = chart_def.chart_type
        if chart_type in (ItemType.BAR, ItemType.STACK):
            trace_cls = go.Bar
        elif chart_type == ItemType.LINE:
            # go.Line is deprecated in plotly; go.Scatter is the trace class
            # to use for line charts.
            trace_cls = go.Scatter
        else:
            raise ValueError(f'unsupported chart type {chart_type}')

        # ``y`` defaults to None on ItemDefinition; treat that as "no series"
        # instead of failing while iterating over None.
        data = [trace_cls(name=series.name, x=chart_def.x, y=series.values)
                for series in chart_def.y or []]

        fig = go.Figure(data=data)
        # TODO: configurable
        fig.update_layout(template="plotly_dark")
        if chart_type == ItemType.STACK:
            # STACK uses bar traces, stacked on top of each other
            fig.update_layout(barmode='stack')
        return plot(fig, output_type='div')

    def get_div(self, chart_def: ItemDefinition) -> str:
        """
        Render the given item according to its type.

        :param chart_def: item definition to render
        :return: chart div for BAR/LINE/STACK items, raw value for VALUE items
        :raises ValueError: when the item type is not supported
        """
        chart_type = chart_def.chart_type
        if chart_type == ItemType.VALUE:
            return self._get_value_div(chart_def)
        if chart_type in (ItemType.BAR, ItemType.LINE, ItemType.STACK):
            return self._get_chart_div(chart_def)
        raise ValueError(f'unsupported chart type {chart_type}')
class CostReporter:
    """
    Fetches cost data through the cost client and collects report items.

    Every ``generate_*`` method requests one data set, stores what later steps
    need in the intermediate results and appends one or more
    ``ItemDefinition`` objects to ``item_defs``.
    """

    def __init__(self, exec_time: datetime, config: AppConfig, cost_client: AwsCostClient):
        """
        :param exec_time: execution time shown in the report
        :param config: application configuration (accounts, periods, tags, ...)
        :param cost_client: client used to request cost data
        """
        self.exec_time = exec_time
        self.config = config
        self.cost_client = cost_client
        self._intermediate_results: IntermediateData = IntermediateData()
        # report data items definitions
        self.item_defs = []
        # exist_ok avoids the race between checking for the directory and
        # creating it
        os.makedirs(OUTPUT_DIR, exist_ok=True)

    def generate(self):
        """
        Run all report steps in order and finish with the post actions.

        The post actions read the forecast and the monthly totals, so they
        have to run after the steps that produce them.
        """
        logger.info('fetching data and creating data items')
        self.generate_current_date()
        self.generate_current_month_forecast()
        self.generate_daily_report()
        self.generate_monthly_report()
        self.generate_services_report()
        self.generate_tag_reports()
        self.exec_post_actions()

    def exec_post_actions(self):
        """
        Add the forecast percentage item, computed from the forecast and the
        second-to-last monthly total.

        Nothing is added (a warning is logged) when fewer than two monthly
        totals are available.
        """
        logger.info('executing post actions')
        month_totals: IntermediateComplexResults = self._intermediate_results.get(
            ReportItemName.MONTHLY_TOTAL_COST.value)
        if len(month_totals.values) < 2:
            logger.warning('not enough monthly totals to calculate the forecast percentage')
            return
        # the last entry presumably belongs to the current, still open month
        # (the monthly request ends today), hence the second-to-last one
        last_closed_month = month_totals.values[-2]
        forecast = self._intermediate_results.get(ReportItemName.FORECAST.value).value
        forecast_per = data_utils.calc_percentage(forecast, last_closed_month)
        self.item_defs.append(ItemDefinition(ReportItemName.FORECAST_PER.value,
                                             ItemType.VALUE,
                                             [forecast_per]))

    @staticmethod
    def _collect_amounts(cost_results, filtered_keys=None, accounts=None) -> Tuple[List[str], Dict[str, List[float]]]:
        """
        Collect the unblended cost per group key, aligned with the time periods.

        :param cost_results: list of per-period results, each with a
            ``TimePeriod`` (``Start``/``End``) and a list of ``Groups``
        :param filtered_keys: group keys to leave out
        :param accounts: optional mapping of group key to display name
        :return: list of ``<start>_<end>`` period identifiers and a dict of
            amounts by key; every list of amounts has one entry per period,
            with 0.0 for periods in which the key does not occur
        """
        excluded = filtered_keys or ()
        periods = []
        amounts = {}
        for index, result in enumerate(cost_results):
            period = result['TimePeriod']
            periods.append(f"{period['Start']}_{period['End']}")
            for grp in result.get('Groups') or ():
                key = grp['Keys'][0]
                if key in excluded:
                    continue
                # map account id to name:
                if accounts and key in accounts:
                    key = accounts[key]
                values = amounts.setdefault(key, [])
                # back-fill the periods in which this key was missing so that
                # values[i] always belongs to periods[i]
                values.extend([0.0] * (index - len(values)))
                amount = float(grp['Metrics']['UnblendedCost']['Amount'])
                if len(values) > index:
                    # several groups ended up under the same key in this
                    # period (e.g. two ids mapped to one name): add them up
                    values[index] += amount
                else:
                    values.append(amount)
        for values in amounts.values():
            values.extend([0.0] * (len(periods) - len(values)))
        return periods, amounts

    @staticmethod
    def _groups_total(groups) -> int:
        """Return the rounded sum of the unblended cost of all given groups."""
        return round(sum(float(grp['Metrics']['UnblendedCost']['Amount']) for grp in groups))

    def parse_result(self, cost_results) -> Tuple[List, Dict]:
        """
        returns list of identifiers and dict of values by key

        :param cost_results: list of per-period cost results
        :return: period identifiers and a dict of ``DataSeries`` by key
            (account ids are replaced by the configured account names)
        """
        x_values, amounts = self._collect_amounts(cost_results, accounts=self.config.accounts)
        return x_values, {key: DataSeries(key, values) for key, values in amounts.items()}

    def create_item_definition(self,
                               cost_results,
                               item_name,
                               chart_type: ItemType,
                               filtered_keys=None,
                               group=None) -> ItemDefinition:
        """
        Build a chart item from grouped cost results.

        :param cost_results: list of per-period cost results
        :param item_name: name of the created item
        :param chart_type: type of the created item
        :param filtered_keys: group keys that must not appear in the chart
        :param group: optional group name of the item
        :return: item definition with one data series per remaining key
        """
        # TODO: support case when no groups are available (use the period's
        #  'Total' -> 'UnblendedCost' -> 'Amount' instead)
        x_values, amounts = self._collect_amounts(cost_results, filtered_keys, self.config.accounts)
        return ItemDefinition(item_name,
                              chart_type,
                              x_values,
                              [DataSeries(key, values) for key, values in amounts.items()],
                              group=group)

    def generate_current_date(self):
        """Add the formatted execution time as a value item."""
        item_name = ReportItemName.CURRENT_DATE.value
        exec_time_str = format_datetime(self.exec_time, TIME_FORMAT)
        self._intermediate_results.add(item_name, IntermediateSimpleResult(exec_time_str))
        self.item_defs.append(ItemDefinition(item_name,
                                             ItemType.VALUE,
                                             [exec_time_str]))

    def generate_monthly_report(self):
        """
        Add the stacked monthly cost chart (grouped by linked account) and
        store the total of every month in the intermediate results.
        """
        item_name = ReportItemName.MONTHLY_COST.value
        results = self.cost_client.request_cost_and_usage(
            start=get_months_back(self.config.periods.monthly_report_months_back),
            end=get_today(),
            request_name=item_name,
            group_by_dimensions=['LINKED_ACCOUNT'])
        identifiers, values = self.parse_result(results)
        self._intermediate_results.add(item_name, IntermediateComplexResults(identifiers, values))
        chart_def: ItemDefinition = self.create_item_definition(cost_results=results,
                                                                item_name=item_name,
                                                                chart_type=ItemType.STACK,
                                                                group="charts")
        # calculate months total, keyed by the end date of the period
        totals = {r['TimePeriod']['End']: self._groups_total(r['Groups']) for r in results}
        self._intermediate_results.add(ReportItemName.MONTHLY_TOTAL_COST.value,
                                       IntermediateComplexResults(list(totals.keys()),
                                                                  list(totals.values())))
        self.item_defs.append(chart_def)

    def generate_daily_report(self):
        """
        Add the daily cost bar chart (grouped by linked account) plus one
        value item per account with the cost of the second-to-last day.
        """
        item_name = ReportItemName.DAILY_COST.value
        results = self.cost_client.request_cost_and_usage(
            start=get_days_back(self.config.periods.daily_report_days_back),
            end=get_today(),
            request_name=item_name,
            group_by_dimensions=['LINKED_ACCOUNT'],
            granularity='DAILY')
        identifiers, values = self.parse_result(results)
        self._intermediate_results.add(item_name, IntermediateComplexResults(identifiers, values))
        item_def: ItemDefinition = self.create_item_definition(cost_results=results,
                                                               item_name=item_name,
                                                               chart_type=ItemType.BAR,
                                                               group="charts")
        self.item_defs.append(item_def)

        if len(results) < 2:
            logger.warning('not enough daily results to report the cost per account')
            return
        # NOTE(review): the second-to-last day is used, presumably because
        # the most recent entry is still incomplete - confirm
        final_day = results[-2]
        final_day_date = final_day['TimePeriod']['Start']
        accounts = self.config.accounts
        for group in final_day['Groups']:
            # the cost is truncated to whole dollars
            cost = int(float(group['Metrics']['UnblendedCost']['Amount']))
            account_id = group["Keys"][0]
            # fall back to the account id when no name is configured
            account_name = (accounts.get(account_id) if accounts else None) or account_id
            self.item_defs.append(ItemDefinition(f'{account_name}({final_day_date})',
                                                 ItemType.VALUE,
                                                 [f'${cost}'],
                                                 group='Account Cost'))

    def generate_services_report(self):
        """
        Add the daily cost line chart grouped by service, without the
        services listed in ``config.filtered_services``.
        """
        item_name = ReportItemName.SERVICES_COST.value
        results = self.cost_client.request_cost_and_usage(
            start=get_days_back(self.config.periods.services_report_days_back),
            end=get_today(),
            request_name=item_name,
            granularity='DAILY',
            group_by_dimensions=['SERVICE'])
        identifiers, values = self.parse_result(results)
        self._intermediate_results.add(item_name, IntermediateComplexResults(identifiers, values))
        item_def: ItemDefinition = self.create_item_definition(cost_results=results,
                                                               item_name=item_name,
                                                               chart_type=ItemType.LINE,
                                                               filtered_keys=self.config.filtered_services,
                                                               group="charts")
        self.item_defs.append(item_def)

    def generate_tag_reports(self):
        """Add one daily cost line chart for every configured resource tag."""
        for tag in self.config.resource_tags or ():
            logger.info('generating cost report for tag %s', tag)
            item_name = f"'{tag}' Resources Cost"
            results = self.cost_client.request_cost_and_usage(
                start=get_days_back(self.config.periods.tags_report_days_back),
                end=get_today(),
                request_name=item_name,
                granularity='DAILY',
                group_by_tags=[tag])
            item_def: ItemDefinition = self.create_item_definition(cost_results=results,
                                                                   item_name=item_name,
                                                                   chart_type=ItemType.LINE,
                                                                   group="charts")
            self.item_defs.append(item_def)

    def generate_current_month_forecast(self):
        """
        Request the cost forecast from today until the first day of the next
        month, store it in the intermediate results and add it as a value item.
        """
        item_name = ReportItemName.FORECAST.value
        forecast = self.cost_client.get_monthly_cost_forecast(get_today().isoformat(),
                                                              get_first_day_next_month().isoformat())
        self._intermediate_results.add(item_name, IntermediateSimpleResult(forecast))
        self.item_defs.append(ItemDefinition(item_name,
                                             ItemType.VALUE,
                                             [f'${forecast}']))

    def post_processing(self):
        """
        generate additional data items based on existing data items.

        Not implemented yet; ``generate`` currently runs ``exec_post_actions``.
        :return:
        """
        ...