# common.py
import os
import json
import urllib.parse
import logging
import functools
import webbrowser
from string import Template
from typing import Dict
from pkg_resources import resource_string
from importlib.metadata import entry_points
from functools import cached_property
import yaml
import requests
from botocore.exceptions import ClientError, NoCredentialsError, CredentialRetrievalError
from cid import utils
from cid.base import CidBase
from cid.plugin import Plugin
from cid.utils import get_parameter, get_parameters, set_parameters, unset_parameter, get_yesno_parameter, cid_print, isatty, merge_objects, IsolatedParameters
from cid.helpers.account_map import AccountMap
from cid.helpers import Athena, S3, IAM, CUR, ProxyCUR, Glue, QuickSight, Dashboard, Dataset, Datasource, csv2view, Organizations, CFN
from cid.helpers.quicksight.template import Template as CidQsTemplate
from cid._version import __version__
from cid.export import export_analysis
from cid.logger import set_cid_logger
from cid.exceptions import CidError, CidCritical
from cid.commands import InitQsCommand
logger = logging.getLogger(__name__)
class Cid():
def __init__(self, **kwargs) -> None:
self.base: CidBase = None
# Defined resources
self.resources = dict()
self.dashboards = dict()
self.plugins = self.__loadPlugins()
self._clients = dict()
self._visited_views = [] # Views updated in the current session
self.qs_url = 'https://{region}.quicksight.{domain}/sn/dashboards/{dashboard_id}'
self.all_yes = kwargs.get('yes')
self.verbose = kwargs.get('verbose')
# save main parameters to global parameters but do not override parameters that were set from outside
set_parameters({key: val for key, val in kwargs.items() if key.replace('_', '-') not in get_parameters()}, self.all_yes)
self._logger = None
self.catalog_urls = [
'https://raw.githubusercontent.com/aws-samples/aws-cudos-framework-deployment/main/dashboards/catalog.yaml',
]
def aws_login(self):
params = {
'profile_name': None,
'region_name': None,
'aws_access_key_id': None,
'aws_secret_access_key': None,
'aws_session_token': None
}
for key in params.keys():
value = get_parameters().get(key.replace('_', '-'), '<NO VALUE>')
if value != '<NO VALUE>':
params[key] = value
if get_parameters().get('region'):
params['region_name'] = get_parameters().get('region') # use region as a synonym of region_name
print('Checking AWS environment...')
try:
self.base = CidBase(session=utils.get_boto_session(**params))
if self.base.session.profile_name:
print(f'\tprofile name: {self.base.session.profile_name}')
logger.info(f'AWS profile name: {self.base.session.profile_name}')
self.qs_url_params = {
'account_id': self.base.account_id,
'region': self.base.session.region_name,
'domain': self.base.domain,
}
except (NoCredentialsError, CredentialRetrievalError):
raise CidCritical('Error: Not authenticated, please check AWS credentials')
except ClientError as e:
raise CidCritical(f'ClientError: {e}')
print(f'\taccountId: {self.base.account_id}\n\tAWS userId: {self.base.username}')
logger.info(f'AWS accountId: {self.base.account_id}')
logger.info(f'AWS userId: {self.base.username}')
print(f'\tRegion: {self.base.session.region_name}')
logger.info(f'AWS region: {self.base.session.region_name}')
print('\n')
@cached_property
def qs(self) -> QuickSight:
return QuickSight(self.base.session, resources=self.resources)
@cached_property
def athena(self) -> Athena:
return Athena(self.base.session, resources=self.resources)
@cached_property
def glue(self) -> Glue:
return Glue(self.base.session)
@cached_property
def iam(self) -> IAM:
return IAM(self.base.session)
@cached_property
def cfn(self) -> CFN:
return CFN(self.base.session)
@cached_property
def organizations(self) -> Organizations:
return Organizations(self.base.session)
@cached_property
def s3(self) -> S3:
return S3(self.base.session)
@cached_property
def cur1(self):
""" get/create a cur1 """
return self.get_cur('1')
@cached_property
def cur2(self):
""" get/create a cur2 """
return self.get_cur('2')
def get_cur(self, target_cur_version):
""" get a cur """
cur_version = self.cur.version
if cur_version != target_cur_version or get_parameters().get('use-cur-proxy'):
return ProxyCUR(self.cur, target_cur_version=target_cur_version)
return self.cur
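# Note (editorial sketch): cur1/cur2 return the customer's CUR unchanged when its version
# already matches; otherwise (or when --use-cur-proxy is set) they wrap it in a ProxyCUR
# targeting the requested version. Assuming ProxyCUR exposes the same interface as CUR,
# a caller could do, for example:
#
#   table = self.cur2.table_name   # CUR 2.x-style access, proxied if the source CUR is 1.x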
@property
def cur(self) -> CUR:
'''can return any CUR (1 or 2) that the customer provides'''
if not self._clients.get('cur'):
while True:
try:
_cur = CUR(self.athena, self.glue)
print('Checking if CUR is enabled and available...')
if not _cur.metadata:
raise CidCritical("Error: please ensure CUR is enabled; if it is, allow it some time to propagate")
cid_print(f'\tAthena table: {_cur.table_name}')
cid_print(f"\tResource IDs: {'yes' if _cur.has_resource_ids else 'no'}")
if not _cur.has_resource_ids:
raise CidCritical("Error: CUR has to be created with Resource IDs")
cid_print(f"\tSavingsPlans: {'yes' if _cur.has_savings_plans else 'no'}")
cid_print(f"\tReserved Instances: {'yes' if _cur.has_reservations else 'no'}")
cid_print('\n')
self._clients['cur'] = _cur
break
except CidCritical:
if not utils.isatty():
raise # do not allow CUR creation in lambda
cid_print(f'CUR not found in {self.athena.DatabaseName}. If you have an S3 bucket with CUR in this account, you can create a CUR table with a Crawler.')
self.create_cur_table()
return self._clients['cur']
def create_or_update_account_map(self, name):
account_map = AccountMap(
self.base.session,
self.athena,
self.cur, # can be any CUR. But it is only needed for trends and dummy
)
return account_map.create_or_update(name)
def command(func):
''' a decorator that ensures we are logged in to an AWS account and that additional resource files are loaded
'''
@functools.wraps(func)
def wrap(self, *args, **kwargs):
self.all_yes = self.all_yes or kwargs.get('yes') # Flag params need special treatment
if kwargs.get('verbose'): # Count params need special treatment
self.verbose = (self.verbose or 0) + kwargs.get('verbose')
set_parameters(kwargs, all_yes=self.all_yes)
logger.debug(json.dumps(get_parameters()))
if not self._logger:
self._logger = set_cid_logger(
verbosity=self.verbose,
log_filename=get_parameters().get('log_filename', 'cid.log')
)
logger.info(f'Initializing CID {__version__} for {func.__name__}')
if not self.base:
self.aws_login()
self.load_resources()
return func(self, *args, **kwargs)
return wrap
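# Usage sketch (illustrative, not from this file): commands below are wrapped so that their
# CLI kwargs are registered as global parameters and an AWS session plus resource files are
# guaranteed to exist before the body runs:
#
#   @command
#   def my_command(self, **kwargs):        # hypothetical command
#       print(self.base.account_id)        # self.base is set by aws_login() in the wrapper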
def __loadPlugins(self) -> dict:
try:
_entry_points = entry_points().get('cid.plugins')
except Exception: # fallback for newer Python versions where entry_points() no longer supports .get()
_entry_points = [ep for ep in entry_points() if ep.group == 'cid.plugins']
plugins = dict()
print('Loading plugins...')
logger.info(f'Located {len(_entry_points)} plugin(s)')
for ep in _entry_points:
if ep.value in plugins.keys():
logger.info(f'Plugin {ep.value} already loaded, skipping')
continue
logger.info(f'Loading plugin: {ep.name} ({ep.value})')
plugin = Plugin(ep.value)
print(f"\t{ep.name} loaded")
plugins.update({ep.value: plugin})
try:
resources = plugin.provides()
if ep.value != 'cid.builtin.core':
resources.get('views', {}).pop('account_map', None) # protect account_map from being overridden
self.resources = merge_objects(self.resources, resources, depth=1)
except AttributeError:
logger.warning(f'Failed to load {ep.name}')
print('\n')
logger.info('Finished loading plugins')
return plugins
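# Plugins are discovered via the 'cid.plugins' entry point group. A third-party plugin would
# register itself roughly like this in its packaging metadata (hypothetical package name):
#
#   [project.entry-points."cid.plugins"]
#   my_dashboards = "my_cid_plugin.resources"
#
# The referenced module is loaded through Plugin() and its provides() result is merged into
# self.resources; non-core plugins cannot override the account_map view.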
def resources_with_global_parameters(self, resources):
""" render resources with global parameters """
params = self.get_template_parameters(self.resources.get('parameters', {}))
def _recursively_process_strings(item, str_func):
""" recursively update elements of a dict """
if isinstance(item, str):
return str_func(item)
elif isinstance(item, dict):
res = {}
for key, value in item.items():
res[_recursively_process_strings(key, str_func)] = _recursively_process_strings(value, str_func)
return res
elif isinstance(item, list):
return [_recursively_process_strings(value, str_func) for value in item]
return item
def _str_func(text):
return Template(text).safe_substitute(params)
return _recursively_process_strings(resources, _str_func)
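# Illustrative example (hypothetical parameter name): if the global parameters resolve to
# {'cur_database': 'cid_cur'}, a resource string such as
#   'SELECT * FROM ${cur_database}.summary_view'
# is rendered to 'SELECT * FROM cid_cur.summary_view'. Unknown placeholders are left
# untouched because Template.safe_substitute never raises on missing keys.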
def getPlugin(self, plugin) -> dict:
return self.plugins.get(plugin)
def get_definition(self, type: str, name: str=None, id: str=None, noparams: bool=False) -> dict:
""" return resource definition that matches parameters
:noparams: do not process parameters as they may not exist by this time
"""
res = None
if type not in ['dashboard', 'dataset', 'view', 'schedule', 'crawler']:
raise ValueError(f'{type} is not a valid definition type')
if type in ['dataset', 'view', 'schedule', 'crawler'] and name:
res = self.resources.get(f'{type}s').get(name)
elif type in ['dashboard']:
for definition in self.resources.get(f'{type}s').values():
if name is not None and definition.get('name') != name:
continue
if id is not None and definition.get('dashboardId') != id:
continue
res = definition
break
# template
if isinstance(res, dict) and not noparams:
name = name or res.get('name')
params = self.get_template_parameters(res.get('parameters', {}), param_prefix=f'{type}-{name}-')
# FIXME: can be recursive?
for key, value in res.items():
if isinstance(value, str):
res[key] = Template(value).safe_substitute(params)
return res
@command
def export(self, **kwargs):
export_analysis(self.qs, self.athena, glue=self.glue)
def track(self, action, dashboard_id):
""" Send dashboard_id and account_id to adoption tracker """
method = {'created':'PUT', 'updated':'PATCH', 'deleted': 'DELETE'}.get(action, None)
if not method:
logger.debug(f"This will not fail the deployment. Logging action {action} is not supported. This issue will be ignored")
return
endpoint = 'https://okakvoavfg.execute-api.eu-west-1.amazonaws.com/'
payload = {
'dashboard_id': dashboard_id,
'account_id': self.base.account_id,
action + '_via': 'Lambda' if os.environ.get('AWS_EXECUTION_ENV', '').startswith('AWS_Lambda') else 'CID',
}
try:
res = requests.request(
method=method,
url=endpoint,
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
if res.status_code != 200:
logger.debug(f"This will not fail the deployment. There has been an issue logging action {action} for dashboard {dashboard_id} and account {self.base.account_id}: server did not respond with a 200 response, actual status: {res.status_code}, response data: {res.text}. This issue will be ignored")
except Exception as e:
logger.debug(f"Issue logging action {action} for dashboard {dashboard_id}, due to an exception: {str(e)}. This issue will be ignored")
def get_page(self, source):
resp = requests.get(source, timeout=10, headers={'User-Agent': 'cid'})
resp.raise_for_status()
return resp
def load_resources(self):
''' load additional resources from command line parameters
'''
if get_parameters().get('catalog'):
self.catalog_urls = get_parameters().get('catalog').split(',')
for catalog_url in self.catalog_urls:
self.load_catalog(catalog_url)
if get_parameters().get('resources'):
source = get_parameters().get('resources')
self.load_resource_file(source)
self.resources = self.resources_with_global_parameters(self.resources)
def load_resource_file(self, source):
''' load additional resources from resource file
'''
logger.debug(f'Loading resources from {source}')
resources = {}
try:
if source.startswith('https://'):
resources = yaml.safe_load(self.get_page(source).text)
if not isinstance(resources, dict):
raise CidCritical(f'Failed to load {source}. Got {type(resources)} ({repr(resources)[:150]}...)')
else:
with open(source, encoding='utf-8') as file_:
resources = yaml.safe_load(file_)
except Exception as exc:
logger.warning(f'Failed to load resources from {source}: {exc}')
return
resources.get('views', {}).pop('account_map', None) # Exclude account map as it is a special view
self.resources = merge_objects(self.resources, resources, depth=1)
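# Expected resource file shape (minimal sketch; keys inferred from how self.resources is
# used throughout this module, real files may carry more fields):
#
#   dashboards:
#     MyDashboard:                 # hypothetical entry
#       dashboardId: my-dashboard
#       category: Advanced
#       dependsOn: {datasets: [...]}
#   datasets: {...}
#   views: {...}                   # an 'account_map' view here is always dropped
#   parameters: {...}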
def load_catalog(self, catalog_url):
''' load additional resources from catalog
'''
try:
if 'https://' in catalog_url:
text = self.get_page(catalog_url).text
else:
with open(catalog_url, encoding='utf-8') as catalog_file:
text = catalog_file.read()
catalog = yaml.safe_load(text)
except (requests.exceptions.RequestException, yaml.error.MarkedYAMLError) as exc:
logger.warning(f'Failed to load a catalog url: {exc}')
logger.debug(exc, exc_info=True)
return
for resource_ref in catalog.get('Resources', []):
url = urllib.parse.urljoin(catalog_url, resource_ref.get("Url"))
self.load_resource_file(url)
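# Minimal catalog shape (sketch inferred from the parsing above):
#
#   Resources:
#     - Url: dashboards/example/resources.yaml   # hypothetical path
#
# Each Url is resolved with urllib.parse.urljoin against catalog_url, so both absolute URLs
# and paths relative to the catalog location work.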
def get_template_parameters(self, parameters: dict, param_prefix: str='', others: dict=None):
""" Get template parameters. """
params = get_parameters()
for key, value in parameters.items():
logger.debug(f'reading template parameter: {key} / {value}')
prefix = '' if isinstance(value, dict) and value.get('global') else param_prefix
if isinstance(value, str):
params[key] = value
elif isinstance(value, dict) and value.get('type') == 'cur.tag_and_cost_category_fields':
params[key] = get_parameter(
param_name=prefix + key,
message=f"Required parameter: {key} ({value.get('description')})",
choices=self.cur.tag_and_cost_category_fields + ["'none'"],
)
elif isinstance(value, dict) and value.get('type') == 'athena':
if get_parameters().get(prefix + key): # priority to user input
params[key] = get_parameters().get(prefix + key)
else:
if 'query' not in value:
raise CidCritical(f'Failed fetching parameter {prefix}{key}: parameter with type Athena must have query value.')
query = value['query']
try:
res_list = self.athena.query(query)
except (self.athena.client.exceptions.ClientError, CidError, CidCritical) as exc:
raise CidCritical(f'Failed fetching parameter {prefix}{key}: {exc}.') from exc
if not res_list:
raise CidCritical(f'Failed fetching parameter {prefix}{key}, {value}. Athena returns empty results. {value.get("error")}')
elif len(res_list) == 1:
params[key] = '-'.join(res_list[0])
else:
options = ['-'.join(res) for res in res_list]
default = value.get('default')
params[key] = get_parameter(
param_name=prefix + key,
message=f"Required parameter: {key} ({value.get('description')})",
choices=options,
default=default if default in options else None,
)
elif isinstance(value, dict):
params[key] = value.get('value')
while params[key] is None:
if value.get('silentDefault') is not None and get_parameters().get(key) is None:
params[key] = value.get('silentDefault')
else:
params[key] = get_parameter(
param_name=prefix + key,
message=f"Required parameter: {key} ({value.get('description')})",
default=value.get('default'),
template_variables=dict(account_id=self.base.account_id),
)
else:
raise CidCritical(f'Unknown parameter type for "{key}". Must be a string or a dict with value or with default key')
return merge_objects(params, others or {}, depth=1)
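# Parameter definition sketch (illustrative names; the recognized shapes follow the
# branches above):
#
#   parameters:
#     cur_table_name:                      # plain dict -> interactive get_parameter prompt
#       default: cur
#       description: Athena table with CUR
#     tag_field:                           # choices built from CUR tag/cost-category columns
#       type: cur.tag_and_cost_category_fields
#       description: Tag or Cost Category column
#     current_database:                    # value resolved by running an Athena query
#       type: athena
#       query: SHOW DATABASES
#       description: Athena database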
@command
def deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs):
""" Deploy Dashboard Command"""
self._deploy(dashboard_id, recursive, update, **kwargs)
def ensure_subscription(self):
for _ in range(3):
try:
return self.qs.ensure_subscription()
except CidCritical as exc:
if 'QuickSight is not activated' in str(exc):
self.init_qs()
unset_parameter('enable-quicksight-enterprise') # in case if customer answered no
else:
raise
else:
raise CidCritical('QuickSight is not activated. Please open https://quicksight.aws.amazon.com/ and activate ENTERPRISE subscription.')
def _deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs):
""" Deploy Dashboard """
self.ensure_subscription()
self.qs.pre_discover()
dashboard_id = dashboard_id or get_parameters().get('dashboard-id')
category_filter = [cat for cat in get_parameters().get('category', '').upper().split(',') if cat]
if not dashboard_id:
standard_categories = ['Foundational', 'Advanced', 'Additional'] # Show these categories first
all_categories = set([f"{dashboard.get('category', 'Other')}" for dashboard in self.resources.get('dashboards').values()])
non_standard_categories = [cat for cat in all_categories if cat not in standard_categories]
categories = standard_categories + sorted(non_standard_categories)
dashboard_options = {}
for category in categories:
if category_filter and category.upper() not in category_filter:
continue
dashboard_options[f'{category.upper()}'] = '[category]'
counter = 0
for dashboard in self.resources.get('dashboards').values():
if dashboard.get('deprecationNotice'):
continue
if dashboard.get('category', 'Other') == category:
check = '✓' if dashboard.get('dashboardId') in self.qs.dashboards else ' '
dashboard_options[f" {check}[{dashboard.get('dashboardId')}] {dashboard.get('name')}"] = dashboard.get('dashboardId')
counter += 1
if not counter: # remove empty categories
del dashboard_options[f'{category.upper()}']
while True:
dashboard_id = get_parameter(
param_name='dashboard-id',
message="Please select a dashboard to deploy",
choices=dashboard_options,
)
if dashboard_id == '[category]':
unset_parameter('dashboard-id')
continue
break
if not dashboard_id:
print('No dashboard selected')
return
# Get selected dashboard definition
dashboard_definition = self.get_definition("dashboard", id=dashboard_id)
dashboard = None
try:
dashboard = self.qs.discover_dashboard(dashboard_id)
except CidCritical:
pass
if not dashboard_definition:
if isinstance(dashboard, Dashboard):
dashboard_definition = dashboard.definition
else:
raise ValueError(f'Cannot find dashboard with id={dashboard_id} in resources file.')
definition_dependency_datasets = dashboard_definition.get('dependsOn', {}).get('datasets', [])
required_datasets_names = [dsname for dsname in definition_dependency_datasets]
ds_map = definition_dependency_datasets if isinstance(definition_dependency_datasets, dict) else {}
dashboard_datasets = dashboard.datasets if dashboard else {}
for name, id in list(dashboard_datasets.items()): # copy items to allow deletion while iterating
if id not in self.qs.datasets:
logger.info(f'Removing unknown dataset "{name}" ({id}) from dashboard {dashboard_id}')
del dashboard_datasets[name]
if dashboard_definition.get('templateId'):
# Get QuickSight template details
try:
source_template = self.qs.describe_template(
template_id=dashboard_definition.get('templateId'),
account_id=dashboard_definition.get('sourceAccountId'),
region=dashboard_definition.get('region', 'us-east-1')
)
except CidError as exc:
raise CidCritical(exc) # Cannot proceed without a valid template
dashboard_definition['sourceTemplate'] = source_template
print(f'\nLatest template: {source_template.arn}/version/{source_template.version}')
elif dashboard_definition.get('data'):
data = dashboard_definition.get('data')
params = self.get_template_parameters(dashboard_definition.get('parameters', dict()))
if isinstance(data, dict):
#TODO: need to apply template to data structure as well
data = yaml.safe_dump(data)
if isinstance(data, str):
data = Template(data).safe_substitute(params)
dashboard_definition['definition'] = yaml.safe_load(data)
elif dashboard_definition.get('file'):
raise NotImplementedError('File option is not implemented')
else:
raise CidCritical('Definition of dashboard resource must contain data or templateId')
compatible = self.check_dashboard_version_compatibility(dashboard_id)
if not recursive and compatible is False:
if not get_yesno_parameter(
param_name='confirm-recursive',
message='This is a major update and requires a recursive action. This could lead to the loss of dataset customization. Continue anyway?',
default='yes'):
return
logger.info("Switch to recursive mode")
recursive = True
if recursive:
self.create_datasets(required_datasets_names, dashboard_datasets, recursive=recursive, update=update)
# Find datasets for template or definition
if not dashboard_definition.get('datasets'):
dashboard_definition['datasets'] = {}
for dataset_name in required_datasets_names:
dataset = None
# First try to find the dataset with the id
try:
dataset = self.qs.describe_dataset(id=dataset_name)
except Exception as exc:
logger.debug(f'Failed to describe_dataset {dataset_name} {exc}')
if isinstance(dataset, Dataset):
logger.debug(f'Found dataset {dataset_name} with id match = {dataset.arn}')
dashboard_definition['datasets'][dataset_name] = dataset.arn
else:
# Then search dataset by name.
# This is not ideal as there can be several with the same name,
# but if dataset is created manually we cannot use id.
matching_datasets = []
for ds in self.qs.datasets.values():
if not isinstance(ds, Dataset) or ds.name != dataset_name:
continue
if dashboard_definition.get('templateId'):
# For templates we can additionally verify dataset fields
dataset_fields = {col.get('Name'): col.get('Type') for col in ds.columns}
src_fields = source_template.datasets.get(ds_map.get(dataset_name, dataset_name) )
required_fields = {col.get('Name'): col.get('DataType') for col in src_fields}
unmatched = {}
for field_name, field_type in required_fields.items():
if field_name not in dataset_fields or dataset_fields[field_name] != field_type:
unmatched.update({field_name: {'expected': field_type, 'found': dataset_fields.get(field_name)}})
logger.debug(f'unmatched_fields={unmatched}')
if unmatched:
logger.warning(f'Found Dataset "{dataset_name}" ({ds.id}) but it is missing required fields. {unmatched}')
else:
matching_datasets.append(ds)
else:
# for definitions datasets we do not have any possibility to check if dataset with a given name matches
matching_datasets.append(ds)
if not matching_datasets:
reco = ''
logger.warning(f'Dataset {dataset_name} is not found')
if utils.exec_env()['shell'] == 'lambda':
# We are in lambda
reco = 'You can try deleting existing dataset and re-run.'
else:
# We are in command line mode
reco = 'Please retry with --update "yes" --force --recursive flags.'
raise CidCritical(f'Failed to find a Dataset "{dataset_name}" with required fields. ' + reco)
elif len(matching_datasets) >= 1:
if len(matching_datasets) > 1:
# FIXME: propose a choice?
logger.warning(
f'Found {len(matching_datasets)} datasets with the name "{dataset_name}":'
f' {str([ds.id for ds in matching_datasets])}'
)
ds = matching_datasets[0]
print(f'Using dataset {dataset_name}: {ds.id}')
dashboard_definition['datasets'][dataset_name] = ds.arn
# Update datasets to the mapping name if needed
# Dashboard definition must contain names that are specific to template.
dashboard_definition['datasets'] = {ds_map.get(name, name): arn for name, arn in dashboard_definition['datasets'].items() }
logger.debug(f"datasets: {dashboard_definition['datasets']}")
_url = self.qs_url.format(dashboard_id=dashboard_id, **self.qs_url_params)
dashboard = self.qs.describe_dashboard(DashboardId=dashboard_id)
if isinstance(dashboard, Dashboard):
if update:
return self.update_dashboard(dashboard_id, dashboard_definition)
else:
print(f'Dashboard {dashboard_id} exists. See {_url}')
return dashboard_id
print(f'Deploying dashboard {dashboard_id}')
try:
dashboard = self.qs.create_dashboard(dashboard_definition)
print(f"\n#######\n####### Congratulations!\n####### {dashboard_definition.get('name')} is available at: {_url}\n#######")
self.track('created', dashboard_id)
except self.qs.client.exceptions.ResourceExistsException:
print('error, already exists')
print(f"#######\n####### {dashboard_definition.get('name')} is available at: {_url}\n#######")
except Exception as e:
# Catch exception and dump a reason
logger.debug(e, exc_info=True)
print(f'failed with an error message: {e}')
self.delete(dashboard_id)
raise CidCritical(f'Deploy failed: {e}')
if get_yesno_parameter(
param_name=f'share-with-account',
message=f'Share this dashboard with everyone in the account?',
default='yes'):
set_parameters({'share-method': 'account'})
self.share(dashboard_id)
return dashboard_id
@command
def open(self, dashboard_id, **kwargs):
"""Open QuickSight dashboard in browser"""
aws_execution_env = os.environ.get('AWS_EXECUTION_ENV', '')
if aws_execution_env == 'CloudShell' or aws_execution_env.startswith('AWS_Lambda'):
print(f"Operation is not supported in {aws_execution_env}")
return dashboard_id
if not dashboard_id:
dashboard_id = self.qs.select_dashboard(force=True)
dashboard = self.qs.discover_dashboard(dashboard_id)
logger.info('Getting dashboard status...')
if not dashboard:
logger.error(f'{dashboard_id} is not deployed.')
return None
if dashboard.version.get('Status') not in ['CREATION_SUCCESSFUL', 'UPDATE_IN_PROGRESS', 'UPDATE_SUCCESSFUL']:
cid_print(json.dumps(dashboard.version.get('Errors'), indent=4, sort_keys=True, default=str))
cid_print(f'Dashboard {dashboard_id} is unhealthy, please check errors above.')
logger.info('Opening dashboard in browser...')
webbrowser.open(self.qs_url.format(dashboard_id=dashboard_id, **self.qs_url_params))
return dashboard_id
@command
def status(self, dashboard_id, **kwargs):
"""Check QuickSight dashboard status"""
next_selection = None
while next_selection != 'exit':
if not dashboard_id:
if not self.qs.dashboards:
print('No deployed dashboards found')
return
dashboard_id = self.qs.select_dashboard(force=True)
if not dashboard_id:
print('No dashboard selected')
return
dashboard = self.qs.discover_dashboard(dashboard_id)
if dashboard is not None:
dashboard.display_status()
dashboard.display_url(self.qs_url, **self.qs_url_params)
with IsolatedParameters():
next_selection = get_parameter(
param_name=f'{dashboard.id}',
message="Please make a selection",
choices={
'[◀] Back': 'back',
'[↗] Open': 'open',
'[◴] Refresh datasets': 'refresh',
'[↺] Update dashboard': 'update',
'[✕] Exit': 'exit',
}
)
if next_selection == 'open':
self.open(dashboard.id, **kwargs)
elif next_selection == 'refresh':
dashboard.refresh_datasets()
elif next_selection == 'update':
if dashboard.latest:
if not get_yesno_parameter(
param_name=f'redeploy-{dashboard.id}',
message=f'\nThe selected dashboard {dashboard.id} is already on the latest version.\nDo you want to re-deploy it?',
default='no'):
logger.info(f'Not re-deploying {dashboard.id} as it is on latest version.\n')
continue
recursive = get_parameter(
param_name='recursive',
message=f'\nRecursively update the Datasets and Views in addition to the Dashboard update?\nATTENTION: This could lead to the loss of dataset customization.\nRecursive update?',
choices={
'[→] Simple Update (only dashboard)': 'simple',
'[⇶] Recursive Update (dashboard and all dependencies)': 'recursive',
}
) == 'recursive'
logger.info(f'Updating dashboard: {dashboard.id} with Recursive = {recursive}')
self._deploy(dashboard_id, recursive=recursive, update=True)
logger.info('Rediscover dashboards after update')
self.qs.discover_dashboards(refresh_overrides=[dashboard.id])
self.qs.clear_dashboard_selection()
dashboard_id = None
else:
cid_print('not deployed.')
@command
def delete(self, dashboard_id, **kwargs):
"""Delete QuickSight dashboard"""
if not dashboard_id:
if not self.qs.dashboards:
print('No deployed dashboards')
return
dashboard_id = self.qs.select_dashboard(force=True)
if not dashboard_id:
return
if self.qs.dashboards and dashboard_id in self.qs.dashboards:
datasets = self.qs.discover_dashboard(dashboard_id).datasets # save for later
else:
dashboard_definition = self.get_definition("dashboard", id=dashboard_id)
datasets = {d: None for d in (dashboard_definition or {}).get('dependsOn', {}).get('datasets', [])}
try:
# Execute query
print('Deleting dashboard')
self.qs.delete_dashboard(dashboard_id=dashboard_id)
print(f'Dashboard {dashboard_id} deleted')
self.track('deleted', dashboard_id)
except self.qs.client.exceptions.ResourceNotFoundException:
print('not found')
except Exception as e:
# Catch exception and dump a reason
logger.debug(e, exc_info=True)
print(f'failed with an error message: {e}')
return dashboard_id
print('Processing dependencies')
for dataset_name, dataset_id in datasets.items():
self.delete_dataset(name=dataset_name, id=dataset_id)
return dashboard_id
def delete_dataset(self, name: str, id: str=None):
if name not in self.resources['datasets']:
logger.info(f'Dataset {name} is not managed by CID. Skipping.')
print(f'Dataset {name} is not managed by CID. Skipping.')
return False
for dataset in list(self.qs._datasets.values()) if self.qs._datasets else []:
if dataset.id == id or dataset.name == name:
# Check if dataset is used in some other dashboard
for dashboard in (self.qs.dashboards or {}).values():
if dataset.id in dashboard.datasets.values():
cid_print(f'Dataset {dataset.name} ({dataset.id}) is still used by dashboard "{dashboard.id}". Skipping.')
return False
else: #not used
# try to get the database name from the dataset (might need this for later)
schema = next(iter(dataset.schemas), None) # FIXME: manage choice if multiple data sources
if schema:
logger.debug(f'Picking the first of dataset databases: {dataset.schemas}')
self.athena.DatabaseName = schema
if get_yesno_parameter(
param_name=f'confirm-{dataset.name}',
message=f'Delete QuickSight Dataset {dataset.name}?',
default='no'):
print(f'Deleting dataset {dataset.name} ({dataset.id})')
self.qs.delete_dataset(dataset.id)
else:
cid_print(f'Skipping dataset {dataset.name}')
return False
if not dataset.datasources:
continue
datasources = dataset.datasources
athena_datasource = self.qs.datasources.get(datasources[0])
if athena_datasource and not get_parameters().get('athena-workgroup'):
self.athena.WorkGroup = athena_datasource.AthenaParameters.get('WorkGroup')
break
logger.debug(f'Cannot find QuickSight DataSource {datasources[0]}. So cannot define Athena WorkGroup')
continue
else:
logger.info(f'Dataset not found for deletion: {name} ({id})')
for view_name in list(set(self.resources['datasets'][name].get('dependsOn', {}).get('views', []))):
self.delete_view(view_name)
return True
def delete_view(self, view_name):
if view_name not in self.resources['views']:
logger.info(f'View {view_name} is not managed by CID. Skipping.')
return False
logger.info(f'Deleting view "{view_name}"')
definition = self.get_definition("view", name=view_name)
if not definition:
logger.info(f'Definition not found for view: "{view_name}"')
return False
for dashboard in (self.qs.dashboards or {}).values():
if view_name in dashboard.views:
print(f'View {view_name} is used by dashboard "{dashboard.id}". Skipping')
return False
self.athena.discover_views([view_name])
if view_name not in self.athena._metadata.keys():
print(f'Table for deletion not found: {view_name}')
else:
if definition.get('type', '') == 'Glue_Table':
print(f'Deleting table: {view_name}')
self.athena.delete_table(view_name)
else:
print(f'Deleting view: {view_name}')
self.athena.delete_view(view_name)
# manage dependencies
for dependency_view in list(set(definition.get('dependsOn', {}).get('views', []))):
self.delete_view(dependency_view)
return True
@command
def cleanup(self, **kwargs):
"""Delete unused resources (QuickSight datasets not used in Dashboards)"""
self.qs.pre_discover()
self.qs.discover_datasets()
references = {}
for dashboard in self.qs.dashboards.values():
for dataset_id in dashboard.datasets.values():
if dataset_id not in references:
references[dataset_id] = []
references[dataset_id].append(dashboard.id)
for dataset in list(self.qs._datasets.values()):
if dataset.id in references:
cid_print(f'Dataset {dataset.name} ({dataset.id}) is in use ({", ".join(references[dataset.id])})')
continue
if get_yesno_parameter(f'confirm-delete-dataset-{dataset.id}',
message=f'Delete dataset "{dataset.name}" (not used in dashboards, but can be used in analysis)?',
default='no',
):
logger.info(f'Deleting dataset {dataset.name} ({dataset.id})')
self.qs.delete_dataset(dataset.id)
cid_print(f'Deleted dataset {dataset.name} ({dataset.id})')
@command
def share(self, dashboard_id, **kwargs):
"""Share resources (QuickSight datasets, dashboards)"""
self._share(dashboard_id, **kwargs)
def _share(self, dashboard_id, **kwargs):
"""Share resources (QuickSight datasets, dashboards)"""
if not dashboard_id:
if not self.qs.dashboards:
print('No deployed dashboards found')
return
dashboard_id = self.qs.select_dashboard(force=True)
if not dashboard_id:
return
else:
# Describe dashboard by the ID given, no discovery
self.qs.discover_dashboard(dashboard_id)
dashboard = self.qs.discover_dashboard(dashboard_id)
if dashboard is None:
print('not deployed.')
return
share_methods = {
'Shared Folder (except datasource)': 'folder',
'Specific User only': 'user',
'Everyone in this account': 'account',
}
share_method = get_parameter(
param_name='share-method',
message="Please select sharing method",
choices=share_methods,
)
if share_method == 'folder':
folder = None
folder_methods = {
'Select Existing folder': 'existing',
'Create New folder': 'new'
}
folder_method = get_parameter(
param_name='folder-method',
message="Please select folder method",
choices=folder_methods,
)
if folder_method == 'existing':
try:
folder = self.qs.select_folder()
except self.qs.client.exceptions.AccessDeniedException:
# If the user is not allowed to select a folder, prompt for it
print('\nYou are not allowed to select a folder, please enter the folder ID')
while not folder:
folder_id = get_parameter(
param_name='folder-id',
message='Please enter the folder Id to use'
)
folder = self.qs.describe_folder(folder_id)
print(f'Selected folder {folder.get("Name")} ({folder.get("FolderId")})')
elif folder_method == 'new' or not folder:
# If the user is allowed to select a folder but none exists, prompt to create one
if folder_method != 'new':
print("No folders found, creating one...")
while not folder:
try:
folder_name = get_parameter(
param_name='folder-name',
message='Please enter the folder name to create'
)
folder_permissions_tpl = Template(resource_string(
package_or_requirement='cid.builtin.core',
resource_name=f'data/permissions/folder_permissions.json',
).decode('utf-8'))
columns_tpl = {
'PrincipalArn': self.qs.get_principal_arn()
}
folder_permissions = json.loads(folder_permissions_tpl.safe_substitute(columns_tpl))
folder = self.qs.create_folder(folder_name, **folder_permissions)
except self.qs.client.exceptions.AccessDeniedException:
raise CidError('You are not allowed to create folder, unable to proceed')
self.qs.create_folder_membership(folder.get('FolderId'), dashboard.id, 'DASHBOARD')
for _id in dashboard.datasets.values():
self.qs.create_folder_membership(folder.get('FolderId'), _id, 'DATASET')
print(f'Sharing complete')
elif share_method in ['account', 'user']:
if share_method == 'account':
principal_arn = f"arn:{self.base.partition}:quicksight:{self.qs.identityRegion}:{self.qs.account_id}:namespace/default"
template_filename = 'data/permissions/dashboard_permissions_namespace.json'
elif share_method == 'user':
template_filename = 'data/permissions/dashboard_permissions.json'
print('Fetching QuickSight users. Duration will scale with the number of users.')
user = self.qs.select_user()
while not user:
user_name = get_parameter(
param_name='quicksight-user',
message='Please enter the user name to share with'
)
user = self.qs.describe_user(user_name)
if not user:
print(f'QuickSight user {user_name} was not found')
unset_parameter('quicksight-user')
principal_arn = user.get('Arn')
# Update Dashboard permissions
columns_tpl = {
'PrincipalArn': principal_arn
}
dashboard_permissions_tpl = Template(resource_string(
package_or_requirement='cid.builtin.core',
resource_name=template_filename,
).decode('utf-8'))
dashboard_permissions = json.loads(dashboard_permissions_tpl.safe_substitute(columns_tpl))
dashboard_params = {
"GrantPermissions": [
dashboard_permissions
]
}
if share_method == 'account':
dashboard_params.update({
"GrantLinkPermissions": [
dashboard_permissions
]
})
logger.info(f'Sharing dashboard {dashboard.name} ({dashboard.id})')
try:
self.qs.update_dashboard_permissions(DashboardId=dashboard.id, **dashboard_params)
logger.info(f'Shared dashboard {dashboard.name} ({dashboard.id})')
except self.qs.client.exceptions.AccessDeniedException:
logger.error('An error occurred (AccessDeniedException) when calling the UpdateDashboardPermissions operation')
# Update DataSet permissions
if share_method == 'account':