-
Notifications
You must be signed in to change notification settings - Fork 1
/
classify_dlp.py
163 lines (141 loc) · 5.71 KB
/
classify_dlp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from google.cloud import dlp
from google.cloud import storage
from google.cloud import pubsub
import os
# ----------------------------
# User-configurable Constants

# Project ID used by this module for the DLP job parent, the Pub/Sub topic
# path and the BigQuery findings table. It is defined exactly once: an
# earlier duplicate assignment was always overwritten by this value and
# never took effect.
PROJECT_ID = '[PROJECT_ID_FOR_DLP_FINDINGS]'

# The bucket the to-be-scanned files are uploaded to (quarantine/staging).
STAGING_BUCKET = '[YOUR_QUARANTINE_BUCKET]'

# The bucket to move "sensitive" files to.
SENSITIVE_BUCKET = '[YOUR_SENSITIVE_DATA_BUCKET]'

# The bucket to move "non sensitive" files to.
NONSENSITIVE_BUCKET = '[YOUR_NON_SENSITIVE_DATA_BUCKET]'

# Pub/Sub topic to notify once the DLP job completes.
PUB_SUB_TOPIC = '[PUB/SUB_TOPIC]'

# The minimum likelihood (enum name) required before returning a match.
# For more info visit: https://cloud.google.com/dlp/docs/likelihood
MIN_LIKELIHOOD = 'POSSIBLE'

# The maximum number of findings to report (0 = server maximum).
MAX_FINDINGS = 0

# The infoTypes of information to match.
# For more info visit: https://cloud.google.com/dlp/docs/concepts-infotypes
INFO_TYPES = [
    'FIRST_NAME', 'PHONE_NUMBER', 'EMAIL_ADDRESS', 'US_SOCIAL_SECURITY_NUMBER'
]

# BigQuery dataset and table (inside PROJECT_ID) that findings are saved to.
DATASET_ID = '[DATASET_ID_FOR_DLP_FINDINGS]'
TABLE_ID = '[TABLE_ID_FOR_DLP_FINDINGS]'

# End of User-configurable Constants
# ----------------------------------
# Initialize the Google Cloud client libraries.
# These run once when the module is imported; create_DLP_job and resolve_DLP
# read the resulting module-level objects instead of building their own.
# NOTE: the next line rebinds the name `dlp` from the imported module to a
# client instance, so the module itself is no longer reachable as `dlp`
# after this point.
dlp = dlp.DlpServiceClient()
storage_client = storage.Client()
# NOTE(review): neither Pub/Sub client is referenced by the two functions
# defined below in this file - confirm whether they are still needed.
publisher = pubsub.PublisherClient()
subscriber = pubsub.SubscriberClient()
def create_DLP_job(data, done):
    """Start a DLP inspection job for a newly uploaded file.

    Triggered by new files uploaded to the designated Cloud Storage
    quarantine/staging bucket. The job inspects that single file for the
    configured INFO_TYPES and is given three actions: publish to the
    Pub/Sub topic, save findings to the BigQuery table, and publish a
    summary to CSCC.

    Args:
        data: The Cloud Storage event; its 'name' entry is the uploaded
            file's name.
        done: Second argument passed by the trigger; not used here.

    Returns:
        None. Debug information is printed to the log. A failure while
        creating the job is printed rather than raised.
    """
    uploaded_name = data['name']
    print('Function triggered for file [{}]'.format(uploaded_name))

    # The API expects each infoType as a {'name': ...} dictionary.
    wanted_info_types = [{'name': type_name} for type_name in INFO_TYPES]

    # Full resource id of the project that owns the job.
    parent = dlp.project_path(PROJECT_ID)

    # What to scan and whom to tell when the scan is over.
    scan_url = 'gs://{bucket_name}/{file_name}'.format(
        bucket_name=STAGING_BUCKET, file_name=uploaded_name)
    topic_path = 'projects/{project_id}/topics/{topic_id}'.format(
        project_id=PROJECT_ID, topic_id=PUB_SUB_TOPIC)

    notify_action = {'pub_sub': {'topic': topic_path}}
    save_action = {
        'save_findings': {
            'output_config': {
                'table': {
                    'project_id': PROJECT_ID,
                    'dataset_id': DATASET_ID,
                    'table_id': TABLE_ID,
                },
            },
        },
    }
    cscc_action = {'publish_summary_to_cscc': {}}

    inspect_job = {
        'inspect_config': {
            'info_types': wanted_info_types,
            'min_likelihood': MIN_LIKELIHOOD,
            'limits': {'max_findings_per_request': MAX_FINDINGS},
        },
        'storage_config': {
            'cloud_storage_options': {'file_set': {'url': scan_url}},
        },
        'actions': [notify_action, save_action, cscc_action],
    }

    # Hand the job to the DLP API; it is processed asynchronously from here.
    try:
        dlp.create_dlp_job(parent, inspect_job)
        print('Job created by create_DLP_job')
    except Exception as e:
        print(e)
def _object_name_from_gcs_url(url):
    """Return the object name from a 'gs://<bucket>/<object>' URL.

    Everything after the bucket is kept, so an object stored under a
    "folder" prefix keeps it ('gs://b/dir/a.txt' -> 'dir/a.txt'). If the
    URL has no 'gs://' scheme or no object part, the last path component
    is returned instead.
    """
    scheme = 'gs://'
    if url.startswith(scheme):
        _bucket, _sep, object_name = url[len(scheme):].partition('/')
        if object_name:
            return object_name
    return os.path.basename(url)


def resolve_DLP(data, context):
    """Move a scanned file out of the staging bucket based on DLP results.

    Listens to the Pub/Sub notification emitted for a job created by
    create_DLP_job. It loads the job, recovers the scanned file's object
    name from the job's own configuration, and moves the file to the
    sensitive bucket when the job reports any infoType statistics, or to
    the non-sensitive bucket otherwise. "Move" is a copy followed by a
    delete of the staging copy.

    Args:
        data: The Cloud Pub/Sub event; data['attributes']['DlpJobName'] is
            the name of the finished DLP job.
        context: Second argument passed by the trigger; not used here.

    Returns:
        None. Debug information is printed to the log.
    """
    # Get the targeted DLP job name that is created by create_DLP_job.
    job_name = data['attributes']['DlpJobName']
    print('Received pub/sub notification from DLP job: {}'.format(job_name))

    # Get the DLP job details by the job_name.
    job = dlp.get_dlp_job(job_name)
    print('Job Name:{name}\nStatus:{status}'.format(
        name=job.name, status=job.state))
    # NOTE(review): job.state is only logged, never checked. If a
    # notification can arrive for a job that did not finish successfully,
    # its empty statistics would route the file to the non-sensitive
    # bucket - confirm and add a guard if so.

    # Fetch the file's location in Cloud Storage from the original job
    # config. See the definition of "JSON Output" in "Limiting Cloud
    # Storage Scans": https://cloud.google.com/dlp/docs/inspecting-storage
    file_path = (
        job.inspect_details.requested_options.job_config.storage_config
        .cloud_storage_options.file_set.url)
    # Keep the whole object name: taking only the basename would lose any
    # "folder" prefix and the blob lookup below would miss the file.
    file_name = _object_name_from_gcs_url(file_path)

    info_type_stats = job.inspect_details.result.info_type_stats
    source_bucket = storage_client.get_bucket(STAGING_BUCKET)
    source_blob = source_bucket.blob(file_name)

    if info_type_stats:
        # Found at least one kind of sensitive data.
        for stat in info_type_stats:
            print('Found {stat_cnt} instances of {stat_type_name}.'.format(
                stat_cnt=stat.count, stat_type_name=stat.info_type.name))
        print('Moving item to sensitive bucket')
        destination_bucket_name = SENSITIVE_BUCKET
    else:
        # No sensitive data found.
        print('Moving item to non-sensitive bucket')
        destination_bucket_name = NONSENSITIVE_BUCKET

    # Copy to the chosen bucket under the same object name, then remove the
    # copy left in the quarantine bucket.
    destination_bucket = storage_client.get_bucket(destination_bucket_name)
    source_bucket.copy_blob(source_blob, destination_bucket, file_name)
    source_blob.delete()
    print('{} Finished'.format(file_name))