-
Notifications
You must be signed in to change notification settings - Fork 7
/
policy_tags_manager.py
100 lines (70 loc) · 2.86 KB
/
policy_tags_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""
This application demonstrates how to manage Taxonomy and Policy Tags
in Google Cloud Data Catalog.
"""
import argparse
import logging
import sys
from google.cloud import datacatalog
# Location segment of the parent resource path under which taxonomies are created.
_CLOUD_PLATFORM_LOCATION = 'us'
class TaxonomyManager:
    """Entry point for Taxonomy operations; the work is delegated to DataCatalogFacade."""

    def __init__(self):
        # Every API interaction goes through this single facade instance.
        self.__datacatalog_facade = DataCatalogFacade()

    def create_taxonomy(self, project_id, display_name, description=None):
        """Ask the facade to create a Taxonomy.

        Args:
            project_id: GCP project that will own the Taxonomy.
            display_name: Display name given to the Taxonomy.
            description: Optional description; forwarded unchanged (``None`` when omitted).

        Returns:
            Whatever the facade's ``create_taxonomy`` call returns.
        """
        facade = self.__datacatalog_facade
        outcome = facade.create_taxonomy(project_id, display_name, description)
        return outcome
"""
API communication classes
========================================
"""
class DataCatalogFacade:
    """
    Manage Taxonomy and Policy Tags by communicating to Data Catalog's API.
    """

    def __init__(self):
        # Initialize the API client.
        self.__datacatalog = datacatalog.PolicyTagManagerClient()

    def create_taxonomy(self, project_id, display_name, description=None):
        """Create a Taxonomy.

        Args:
            project_id: GCP project the Taxonomy is created into.
            display_name: Display name assigned to the new Taxonomy.
            description: Optional description assigned to the new Taxonomy.

        Returns:
            The object returned by the client's ``create_taxonomy`` call, so
            callers can read attributes such as ``name`` from it.
        """
        # The parent resource path combines the project with the fixed
        # module-level location.
        location = datacatalog.PolicyTagManagerClient.common_location_path(
            project_id, _CLOUD_PLATFORM_LOCATION)

        taxonomy = datacatalog.Taxonomy()
        taxonomy.display_name = display_name
        taxonomy.description = description

        created_taxonomy = self.__datacatalog.create_taxonomy(parent=location, taxonomy=taxonomy)
        logging.info('===> Taxonomy created: %s', created_taxonomy.name)

        # Hand the result back: callers return this method's value, which was
        # previously always None.
        return created_taxonomy
"""
Command-line interface
========================================
"""
class PolicyTagsManagerCLI:
    """Command-line front end that maps sub-commands onto TaxonomyManager calls."""

    @classmethod
    def run(cls, argv):
        """Configure logging, parse ``argv`` and run the selected sub-command.

        Args:
            argv: Command-line arguments, without the program name.

        Raises:
            SystemExit: Raised by argparse when the arguments are invalid or
                no sub-command is given.
        """
        cls.__setup_logging()
        args = cls._parse_args(argv)
        # Each sub-parser registers its handler as the ``func`` default.
        args.func(args)

    @classmethod
    def __setup_logging(cls):
        """Configure the root logger at INFO level."""
        logging.basicConfig(level=logging.INFO)

    @classmethod
    def _parse_args(cls, argv):
        """Build the argument parser and parse ``argv``.

        Args:
            argv: Command-line arguments, without the program name.

        Returns:
            The parsed namespace; ``func`` holds the handler of the chosen
            sub-command and ``command`` holds the sub-command name.

        Raises:
            SystemExit: When arguments are invalid or the sub-command is missing.
        """
        parser = argparse.ArgumentParser(description='Manage Taxonomy and Policy Tags')

        # A sub-command is mandatory: without one the namespace has no ``func``
        # and ``run`` would fail with AttributeError instead of a usage message.
        # ``dest`` gives argparse a name to use in that error message.
        subparsers = parser.add_subparsers(dest='command')
        subparsers.required = True

        create_taxonomy_parser = subparsers.add_parser('create-taxonomy', help='Create Taxonomy')
        create_taxonomy_parser.add_argument('--display-name', help='Display name', required=True)
        create_taxonomy_parser.add_argument('--description', help='Description')
        create_taxonomy_parser.add_argument('--project-id',
                                            help='GCP Project to create the Taxonomy into',
                                            required=True)
        create_taxonomy_parser.set_defaults(func=cls.__create_taxonomy)

        return parser.parse_args(argv)

    @classmethod
    def __create_taxonomy(cls, args):
        """Handle ``create-taxonomy`` by forwarding the parsed options to TaxonomyManager."""
        TaxonomyManager().create_taxonomy(project_id=args.project_id,
                                          display_name=args.display_name,
                                          description=args.description)
"""
Main program entry point
========================================
"""
if __name__ == "__main__":
    # Pass everything after the program name; slicing is safe even when
    # sys.argv is empty, so no length check is needed.
    PolicyTagsManagerCLI.run(sys.argv[1:])