ADD logic to handle multiple gcs file patterns
mesmacosta committed Nov 4, 2019
1 parent 91ac5e1 commit b5fc918
Showing 7 changed files with 218 additions and 65 deletions.
55 changes: 39 additions & 16 deletions src/datacatalog_fileset_enricher/datacatalog_fileset_enricher.py
@@ -73,31 +73,21 @@ def enrich_datacatalog_fileset_entry(self, entry_group_id, entry_id, tag_fields=
logging.info('')
logging.info('===> Get Entry from DataCatalog...')
entry = self.__dacatalog_helper.get_entry(entry_group_id, entry_id)
file_pattern = entry.gcs_fileset_spec.file_patterns[0]
file_patterns = list(entry.gcs_fileset_spec.file_patterns)

logging.info('==== DONE ==================================================')
logging.info('')

# Split the file pattern into bucket_name and file_regex.
parsed_gcs_pattern = self.__storage_filter.parse_gcs_file_pattern(file_pattern)
parsed_gcs_patterns = self.__storage_filter.parse_gcs_file_patterns(file_patterns)

bucket_name = parsed_gcs_pattern['bucket_name']

# Get the execution time right before retrieving the files, so the stats are accurate
execution_time = pd.Timestamp.utcnow()
# If we have a wildcard on the bucket_name, we have to retrieve all buckets from the project
if '*' in bucket_name:
dataframe, filtered_buckets_stats = self.__storage_filter. \
create_filtered_data_for_multiple_buckets(bucket_name, parsed_gcs_pattern[
"file_regex"], bucket_prefix)

else:
dataframe, filtered_buckets_stats = self.__storage_filter. \
create_filtered_data_for_single_bucket(bucket_name,
parsed_gcs_pattern["file_regex"])
dataframe, filtered_buckets_stats = self.__create_dataframe_for_parsed_gcs_patterns(
parsed_gcs_patterns,
bucket_prefix)

logging.info('===> Generate Fileset statistics...')
stats = GCStorageStatsSummarizer.create_stats_from_dataframe(dataframe, file_pattern,
stats = GCStorageStatsSummarizer.create_stats_from_dataframe(dataframe, file_patterns,
filtered_buckets_stats,
execution_time,
bucket_prefix)
@@ -109,3 +99,36 @@
self.__dacatalog_helper.create_tag_from_stats(entry, stats, tag_fields)
logging.info('==== DONE ==================================================')
logging.info('')

def __create_dataframe_for_parsed_gcs_patterns(self, parsed_gcs_patterns, bucket_prefix):
dataframe = None
filtered_buckets_stats = []
for parsed_gcs_pattern in parsed_gcs_patterns:

bucket_name = parsed_gcs_pattern['bucket_name']

# If we have a wildcard on the bucket_name,
# we have to retrieve all buckets from the project
if '*' in bucket_name:
aux_dataframe, inner_filtered_buckets_stats = self.__storage_filter. \
create_filtered_data_for_multiple_buckets(bucket_name, parsed_gcs_pattern[
"file_regex"], bucket_prefix)
if dataframe is not None:
dataframe = dataframe.append(aux_dataframe)
else:
dataframe = aux_dataframe
# We are dealing with a list of buckets so we extend it
filtered_buckets_stats.extend(inner_filtered_buckets_stats)

else:
aux_dataframe, inner_filtered_buckets_stats = self.__storage_filter. \
create_filtered_data_for_single_bucket(bucket_name,
parsed_gcs_pattern["file_regex"])
if dataframe is not None:
dataframe = dataframe.append(aux_dataframe)
else:
dataframe = aux_dataframe
# We are dealing with a list of buckets so we extend it
filtered_buckets_stats.extend(inner_filtered_buckets_stats)

return dataframe, filtered_buckets_stats
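
The new __create_dataframe_for_parsed_gcs_patterns helper accumulates a single DataFrame and a single bucket-stats list across all file patterns. A minimal standalone sketch of that accumulate-and-extend loop, using a hypothetical fetch_for_pattern callable in place of the storage-filter calls (pd.concat is used here because DataFrame.append is removed in newer pandas):

import pandas as pd

def combine_pattern_results(parsed_gcs_patterns, fetch_for_pattern):
    """Accumulate per-pattern DataFrames and bucket stats into single results."""
    dataframe = None
    filtered_buckets_stats = []
    for pattern in parsed_gcs_patterns:
        aux_dataframe, inner_stats = fetch_for_pattern(
            pattern['bucket_name'], pattern['file_regex'])
        # Start the combined DataFrame on the first pattern, append afterwards.
        if dataframe is None:
            dataframe = aux_dataframe
        else:
            dataframe = pd.concat([dataframe, aux_dataframe], ignore_index=True)
        # Each pattern contributes a list of bucket stats, so extend rather than append.
        filtered_buckets_stats.extend(inner_stats)
    return dataframe, filtered_buckets_stats

# Stand-in usage with made-up data:
fake_fetch = lambda bucket, regex: (
    pd.DataFrame({'name': [f'gs://{bucket}/a.csv'], 'size': [10]}),
    [{'bucket_name': bucket, 'files': 1}])
df, stats = combine_pattern_results(
    [{'bucket_name': 'bucket_a', 'file_regex': '.*\\.csv'},
     {'bucket_name': 'bucket_b', 'file_regex': '.*\\.csv'}],
    fake_fetch)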
2 changes: 1 addition & 1 deletion src/datacatalog_fileset_enricher/datacatalog_helper.py
@@ -21,7 +21,7 @@ class DataCatalogHelper:
__MANUALLY_CREATED_FILESET_ENTRIES_SEARCH_QUERY = \
'not name:crawler AND projectId=$project_id AND type=fileset'
__LOCATION = 'us-central1'
__TAG_TEMPLATE = 'fileset_enricher_tag_template'
__TAG_TEMPLATE = 'fileset_enricher_findings'

def __init__(self, project_id):
self.__datacatalog = datacatalog_v1beta1.DataCatalogClient()
17 changes: 11 additions & 6 deletions src/datacatalog_fileset_enricher/gcs_storage_filter.py
@@ -89,9 +89,14 @@ def convert_str_to_usable_regex(cls, plain_str):
return plain_str.replace('*', '.*')

@classmethod
def parse_gcs_file_pattern(cls, gcs_file_pattern):
re_match = re.match(cls.__FILE_PATTERN_REGEX, gcs_file_pattern)
if re_match:
bucket_name, gcs_file_pattern = re_match.groups()
return {'bucket_name': cls.convert_str_to_usable_regex(bucket_name),
'file_regex': cls.convert_str_to_usable_regex(gcs_file_pattern)}
def parse_gcs_file_patterns(cls, gcs_file_patterns):
parsed_gcs_patterns = []
for gcs_file_pattern in gcs_file_patterns:
re_match = re.match(cls.__FILE_PATTERN_REGEX, gcs_file_pattern)
if re_match:
bucket_name, gcs_file_pattern = re_match.groups()
parsed_gcs_patterns.append({'bucket_name':
cls.convert_str_to_usable_regex(bucket_name),
'file_regex':
cls.convert_str_to_usable_regex(gcs_file_pattern)})
return parsed_gcs_patterns
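
The __FILE_PATTERN_REGEX constant is not part of this diff; assuming it captures the bucket and object parts of a gs://bucket/path pattern, the new plural method behaves roughly like this sketch:

import re

# Assumption: the real __FILE_PATTERN_REGEX is defined elsewhere in the class.
FILE_PATTERN_REGEX = r'^gs://(.*?)/(.*)$'

def parse_gcs_file_patterns(gcs_file_patterns):
    parsed_gcs_patterns = []
    for gcs_file_pattern in gcs_file_patterns:
        re_match = re.match(FILE_PATTERN_REGEX, gcs_file_pattern)
        if re_match:
            bucket_name, file_pattern = re_match.groups()
            parsed_gcs_patterns.append({
                # convert_str_to_usable_regex simply swaps '*' for '.*'
                'bucket_name': bucket_name.replace('*', '.*'),
                'file_regex': file_pattern.replace('*', '.*'),
            })
    return parsed_gcs_patterns

# parse_gcs_file_patterns(['gs://my_bucket*/*.csv'])
# -> [{'bucket_name': 'my_bucket.*', 'file_regex': '.*.csv'}]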
62 changes: 42 additions & 20 deletions src/datacatalog_fileset_enricher/gcs_storage_stats_summarizer.py
@@ -1,8 +1,11 @@
class GCStorageStatsSummarizer:

@classmethod
def create_stats_from_dataframe(cls, dataframe, prefix, filtered_buckets_stats,
def create_stats_from_dataframe(cls, dataframe, file_patterns, filtered_buckets_stats,
execution_time, bucket_prefix):

buckets_found, files_by_bucket = cls.__process_bucket_stats(filtered_buckets_stats)

if dataframe is not None:
size = dataframe['size']
time_created = dataframe['time_created']
@@ -19,25 +22,18 @@ def create_stats_from_dataframe(cls, dataframe, prefix, filtered_buckets_stats,
'max_updated': time_updated.max(),
'created_files_by_day': cls.__get_daily_stats(time_created, 'time_created'),
'updated_files_by_day': cls.__get_daily_stats(time_updated, 'time_updated'),
'prefix': prefix,
'files_by_bucket': cls.__get_files_by_bucket(filtered_buckets_stats),
'prefix': cls.__get_prefix(file_patterns),
'files_by_bucket': files_by_bucket,
'files_by_type': cls.__get_files_by_type(dataframe),
'buckets_found': len(filtered_buckets_stats),
'buckets_found': buckets_found,
'execution_time': execution_time,
'bucket_prefix': bucket_prefix
}
else:
buckets_found = 0
for bucket_stats in filtered_buckets_stats:
# This placeholder controls if the prefix was created with a non existent bucket
bucket_not_found = bucket_stats.get('bucket_not_found')
if not bucket_not_found:
buckets_found += 1

stats = {
'count': 0,
'prefix': prefix,
'files_by_bucket': cls.__get_files_by_bucket(filtered_buckets_stats),
'prefix': cls.__get_prefix(file_patterns),
'files_by_bucket': files_by_bucket,
'buckets_found': buckets_found,
'execution_time': execution_time,
'bucket_prefix': bucket_prefix
@@ -53,13 +49,6 @@ def __get_daily_stats(cls, series, timestamp_column):
value += f'{day} [count: {count}], '
return value[:-2]

@classmethod
def __get_files_by_bucket(cls, filtered_buckets_stats):
value = ''
for bucket_stats in filtered_buckets_stats:
value += f'{bucket_stats["bucket_name"]} [count: {bucket_stats["files"]}], '
return value[:-2]

@classmethod
def __get_files_by_type(cls, dataframe):
series = dataframe['name']
@@ -69,10 +58,43 @@ def __get_files_by_type(cls, dataframe):
value += f'{file_type} [count: {count}], '
return value[:-2]

@classmethod
def __get_prefix(cls, file_patterns):
value = ''
for file_pattern in file_patterns:
value += f'{file_pattern}, '
return value[:-2]

@classmethod
def __extract_file_type(cls, file_name):
file_type_at = file_name.rfind('.')
if file_type_at != -1:
return file_name[file_type_at+1:]
else:
return 'unknown_file_type'

@classmethod
def __process_bucket_stats(cls, filtered_buckets_stats):
processed_bucket_stats_dict = {}

# Consolidate repeated buckets in case we have more than one file_pattern
for bucket_stats in filtered_buckets_stats:
bucket_not_found = bucket_stats.get('bucket_not_found')
if not bucket_not_found:
bucket_name = bucket_stats['bucket_name']
bucket_files_sum = processed_bucket_stats_dict.get(bucket_name)
bucket_files_count = bucket_stats['files']
if not bucket_files_sum:
bucket_files_sum = 0

bucket_files_sum += bucket_files_count

processed_bucket_stats_dict[bucket_name] = bucket_files_sum

files_by_bucket = ''
for bucket_name, files_sum in processed_bucket_stats_dict.items():
files_by_bucket += f'{bucket_name} [count: {files_sum}], '

files_by_bucket = files_by_bucket[:-2]
buckets_found = len(processed_bucket_stats_dict.keys())
return buckets_found, files_by_bucket
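
For a quick illustration of the consolidation in __process_bucket_stats: entries from different patterns that matched the same bucket collapse into a single count, and buckets flagged as not found are skipped (the values below are made up):

sample_stats = [
    {'bucket_name': 'landing_zone', 'files': 10},
    {'bucket_name': 'landing_zone', 'files': 5},
    {'bucket_name': 'missing_bucket', 'files': 0, 'bucket_not_found': True},
]
# __process_bucket_stats(sample_stats) would return:
#   buckets_found   -> 1
#   files_by_bucket -> 'landing_zone [count: 15]'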
