From b5fc918367b4a7a272a81e1c26bcbcd53894b1a7 Mon Sep 17 00:00:00 2001 From: mesmacosta Date: Mon, 4 Nov 2019 15:08:56 -0300 Subject: [PATCH 1/2] ADD logic to handle multiple gcs file patterns --- .../datacatalog_fileset_enricher.py | 55 ++++--- .../datacatalog_helper.py | 2 +- .../gcs_storage_filter.py | 17 ++- .../gcs_storage_stats_summarizer.py | 62 +++++--- .../datacatalog_fileset_enricher_test.py | 136 +++++++++++++++--- .../gcs_storage_filter_test.py | 2 +- .../gcs_storage_stats_summarizer_test.py | 9 +- 7 files changed, 218 insertions(+), 65 deletions(-) diff --git a/src/datacatalog_fileset_enricher/datacatalog_fileset_enricher.py b/src/datacatalog_fileset_enricher/datacatalog_fileset_enricher.py index 36c0f5a..34ca9de 100644 --- a/src/datacatalog_fileset_enricher/datacatalog_fileset_enricher.py +++ b/src/datacatalog_fileset_enricher/datacatalog_fileset_enricher.py @@ -73,31 +73,21 @@ def enrich_datacatalog_fileset_entry(self, entry_group_id, entry_id, tag_fields= logging.info('') logging.info('===> Get Entry from DataCatalog...') entry = self.__dacatalog_helper.get_entry(entry_group_id, entry_id) - file_pattern = entry.gcs_fileset_spec.file_patterns[0] + file_patterns = list(entry.gcs_fileset_spec.file_patterns) logging.info('==== DONE ==================================================') logging.info('') # Split the file pattern into bucket_name and file_regex. - parsed_gcs_pattern = self.__storage_filter.parse_gcs_file_pattern(file_pattern) + parsed_gcs_patterns = self.__storage_filter.parse_gcs_file_patterns(file_patterns) - bucket_name = parsed_gcs_pattern['bucket_name'] - - # Get the execution time right before retrieving the files, so the stats are accurate execution_time = pd.Timestamp.utcnow() - # If we have a wildcard on the bucket_name, we have to retrieve all buckets from the project - if '*' in bucket_name: - dataframe, filtered_buckets_stats = self.__storage_filter. \ - create_filtered_data_for_multiple_buckets(bucket_name, parsed_gcs_pattern[ - "file_regex"], bucket_prefix) - - else: - dataframe, filtered_buckets_stats = self.__storage_filter. \ - create_filtered_data_for_single_bucket(bucket_name, - parsed_gcs_pattern["file_regex"]) + dataframe, filtered_buckets_stats = self.__create_dataframe_for_parsed_gcs_patterns( + parsed_gcs_patterns, + bucket_prefix) logging.info('===> Generate Fileset statistics...') - stats = GCStorageStatsSummarizer.create_stats_from_dataframe(dataframe, file_pattern, + stats = GCStorageStatsSummarizer.create_stats_from_dataframe(dataframe, file_patterns, filtered_buckets_stats, execution_time, bucket_prefix) @@ -109,3 +99,36 @@ def enrich_datacatalog_fileset_entry(self, entry_group_id, entry_id, tag_fields= self.__dacatalog_helper.create_tag_from_stats(entry, stats, tag_fields) logging.info('==== DONE ==================================================') logging.info('') + + def __create_dataframe_for_parsed_gcs_patterns(self, parsed_gcs_patterns, bucket_prefix): + dataframe = None + filtered_buckets_stats = [] + for parsed_gcs_pattern in parsed_gcs_patterns: + + bucket_name = parsed_gcs_pattern['bucket_name'] + + # If we have a wildcard on the bucket_name, + # we have to retrieve all buckets from the project + if '*' in bucket_name: + aux_dataframe, inner_filtered_buckets_stats = self.__storage_filter. 
\ + create_filtered_data_for_multiple_buckets(bucket_name, parsed_gcs_pattern[ + "file_regex"], bucket_prefix) + if dataframe is not None: + dataframe = dataframe.append(aux_dataframe) + else: + dataframe = aux_dataframe + # We are dealing with a list of buckets so we extend it + filtered_buckets_stats.extend(inner_filtered_buckets_stats) + + else: + aux_dataframe, inner_filtered_buckets_stats = self.__storage_filter. \ + create_filtered_data_for_single_bucket(bucket_name, + parsed_gcs_pattern["file_regex"]) + if dataframe is not None: + dataframe = dataframe.append(aux_dataframe) + else: + dataframe = aux_dataframe + # We are dealing with a list of buckets so we extend it + filtered_buckets_stats.extend(inner_filtered_buckets_stats) + + return dataframe, filtered_buckets_stats diff --git a/src/datacatalog_fileset_enricher/datacatalog_helper.py b/src/datacatalog_fileset_enricher/datacatalog_helper.py index 6f3b4ee..58d1bfa 100644 --- a/src/datacatalog_fileset_enricher/datacatalog_helper.py +++ b/src/datacatalog_fileset_enricher/datacatalog_helper.py @@ -21,7 +21,7 @@ class DataCatalogHelper: __MANUALLY_CREATED_FILESET_ENTRIES_SEARCH_QUERY = \ 'not name:crawler AND projectId=$project_id AND type=fileset' __LOCATION = 'us-central1' - __TAG_TEMPLATE = 'fileset_enricher_tag_template' + __TAG_TEMPLATE = 'fileset_enricher_findings' def __init__(self, project_id): self.__datacatalog = datacatalog_v1beta1.DataCatalogClient() diff --git a/src/datacatalog_fileset_enricher/gcs_storage_filter.py b/src/datacatalog_fileset_enricher/gcs_storage_filter.py index a25098a..b24f7ad 100644 --- a/src/datacatalog_fileset_enricher/gcs_storage_filter.py +++ b/src/datacatalog_fileset_enricher/gcs_storage_filter.py @@ -89,9 +89,14 @@ def convert_str_to_usable_regex(cls, plain_str): return plain_str.replace('*', '.*') @classmethod - def parse_gcs_file_pattern(cls, gcs_file_pattern): - re_match = re.match(cls.__FILE_PATTERN_REGEX, gcs_file_pattern) - if re_match: - bucket_name, gcs_file_pattern = re_match.groups() - return {'bucket_name': cls.convert_str_to_usable_regex(bucket_name), - 'file_regex': cls.convert_str_to_usable_regex(gcs_file_pattern)} + def parse_gcs_file_patterns(cls, gcs_file_patterns): + parsed_gcs_patterns = [] + for gcs_file_pattern in gcs_file_patterns: + re_match = re.match(cls.__FILE_PATTERN_REGEX, gcs_file_pattern) + if re_match: + bucket_name, gcs_file_pattern = re_match.groups() + parsed_gcs_patterns.append({'bucket_name': + cls.convert_str_to_usable_regex(bucket_name), + 'file_regex': + cls.convert_str_to_usable_regex(gcs_file_pattern)}) + return parsed_gcs_patterns diff --git a/src/datacatalog_fileset_enricher/gcs_storage_stats_summarizer.py b/src/datacatalog_fileset_enricher/gcs_storage_stats_summarizer.py index 13c2e26..64ee9a3 100644 --- a/src/datacatalog_fileset_enricher/gcs_storage_stats_summarizer.py +++ b/src/datacatalog_fileset_enricher/gcs_storage_stats_summarizer.py @@ -1,8 +1,11 @@ class GCStorageStatsSummarizer: @classmethod - def create_stats_from_dataframe(cls, dataframe, prefix, filtered_buckets_stats, + def create_stats_from_dataframe(cls, dataframe, file_patterns, filtered_buckets_stats, execution_time, bucket_prefix): + + buckets_found, files_by_bucket = cls.__process_bucket_stats(filtered_buckets_stats) + if dataframe is not None: size = dataframe['size'] time_created = dataframe['time_created'] @@ -19,25 +22,18 @@ def create_stats_from_dataframe(cls, dataframe, prefix, filtered_buckets_stats, 'max_updated': time_updated.max(), 'created_files_by_day': 
cls.__get_daily_stats(time_created, 'time_created'), 'updated_files_by_day': cls.__get_daily_stats(time_updated, 'time_updated'), - 'prefix': prefix, - 'files_by_bucket': cls.__get_files_by_bucket(filtered_buckets_stats), + 'prefix': cls.__get_prefix(file_patterns), + 'files_by_bucket': files_by_bucket, 'files_by_type': cls.__get_files_by_type(dataframe), - 'buckets_found': len(filtered_buckets_stats), + 'buckets_found': buckets_found, 'execution_time': execution_time, 'bucket_prefix': bucket_prefix } else: - buckets_found = 0 - for bucket_stats in filtered_buckets_stats: - # This placeholder controls if the prefix was created with a non existent bucket - bucket_not_found = bucket_stats.get('bucket_not_found') - if not bucket_not_found: - buckets_found += 1 - stats = { 'count': 0, - 'prefix': prefix, - 'files_by_bucket': cls.__get_files_by_bucket(filtered_buckets_stats), + 'prefix': cls.__get_prefix(file_patterns), + 'files_by_bucket': files_by_bucket, 'buckets_found': buckets_found, 'execution_time': execution_time, 'bucket_prefix': bucket_prefix @@ -53,13 +49,6 @@ def __get_daily_stats(cls, series, timestamp_column): value += f'{day} [count: {count}], ' return value[:-2] - @classmethod - def __get_files_by_bucket(cls, filtered_buckets_stats): - value = '' - for bucket_stats in filtered_buckets_stats: - value += f'{bucket_stats["bucket_name"]} [count: {bucket_stats["files"]}], ' - return value[:-2] - @classmethod def __get_files_by_type(cls, dataframe): series = dataframe['name'] @@ -69,6 +58,13 @@ def __get_files_by_type(cls, dataframe): value += f'{file_type} [count: {count}], ' return value[:-2] + @classmethod + def __get_prefix(cls, file_patterns): + value = '' + for file_pattern in file_patterns: + value += f'{file_pattern}, ' + return value[:-2] + @classmethod def __extract_file_type(cls, file_name): file_type_at = file_name.rfind('.') @@ -76,3 +72,29 @@ def __extract_file_type(cls, file_name): return file_name[file_type_at+1:] else: return 'unknown_file_type' + + @classmethod + def __process_bucket_stats(cls, filtered_buckets_stats): + processed_bucket_stats_dict = {} + + # Consolidate repeated buckets in case we have more than one file_pattern + for bucket_stats in filtered_buckets_stats: + bucket_not_found = bucket_stats.get('bucket_not_found') + if not bucket_not_found: + bucket_name = bucket_stats['bucket_name'] + bucket_files_sum = processed_bucket_stats_dict.get(bucket_name) + bucket_files_count = bucket_stats['files'] + if not bucket_files_sum: + bucket_files_sum = 0 + + bucket_files_sum += bucket_files_count + + processed_bucket_stats_dict[bucket_name] = bucket_files_sum + + files_by_bucket = '' + for bucket_name, files_sum in processed_bucket_stats_dict.items(): + files_by_bucket += f'{bucket_name} [count: {files_sum}], ' + + files_by_bucket = files_by_bucket[:-2] + buckets_found = len(processed_bucket_stats_dict.keys()) + return buckets_found, files_by_bucket diff --git a/tests/datacatalog_fileset_enricher/datacatalog_fileset_enricher_test.py b/tests/datacatalog_fileset_enricher/datacatalog_fileset_enricher_test.py index e101d2f..14d7132 100644 --- a/tests/datacatalog_fileset_enricher/datacatalog_fileset_enricher_test.py +++ b/tests/datacatalog_fileset_enricher/datacatalog_fileset_enricher_test.py @@ -1,3 +1,5 @@ +import pandas as pd + from unittest import TestCase from unittest.mock import patch @@ -39,13 +41,13 @@ def test_clean_up_fileset_template_and_tags_should_call_the_right_clean_up_metho 
@patch('datacatalog_fileset_enricher.gcs_storage_stats_summarizer.GCStorageStatsSummarizer.create_stats_from_dataframe') @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.create_filtered_data_for_multiple_buckets') @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.create_filtered_data_for_single_bucket') - @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.parse_gcs_file_pattern') + @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.parse_gcs_file_patterns') @patch('datacatalog_fileset_enricher.datacatalog_helper.DataCatalogHelper.get_entry') @patch('datacatalog_fileset_enricher.datacatalog_helper.DataCatalogHelper.get_manually_created_fileset_entries') def test_run_given_entry_group_id_and_entry_id_should_enrich_a_single_entry(self, get_manually_created_fileset_entries, get_entry, - parse_gcs_file_pattern, + parse_gcs_file_patterns, create_filtered_data_for_single_bucket, create_filtered_data_for_multiple_buckets, create_stats_from_dataframe, @@ -53,12 +55,12 @@ def test_run_given_entry_group_id_and_entry_id_should_enrich_a_single_entry(self ): get_entry.return_value = self.__make_fake_fileset_entry() - parse_gcs_file_pattern.return_value = { + parse_gcs_file_patterns.return_value = [{ 'bucket_name': 'my_bucket', 'file_regex': '.*' - } + }] - dataframe = {} + dataframe = pd.DataFrame() filtered_buckets_stats = {} create_filtered_data_for_single_bucket.return_value = (dataframe, filtered_buckets_stats) @@ -70,7 +72,7 @@ def test_run_given_entry_group_id_and_entry_id_should_enrich_a_single_entry(self get_manually_created_fileset_entries.assert_not_called() get_entry.assert_called_once() - parse_gcs_file_pattern.assert_called_once() + parse_gcs_file_patterns.assert_called_once() create_filtered_data_for_single_bucket.assert_called_once() create_filtered_data_for_multiple_buckets.assert_not_called() create_stats_from_dataframe.assert_called_once() @@ -80,13 +82,62 @@ def test_run_given_entry_group_id_and_entry_id_should_enrich_a_single_entry(self @patch('datacatalog_fileset_enricher.gcs_storage_stats_summarizer.GCStorageStatsSummarizer.create_stats_from_dataframe') @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.create_filtered_data_for_multiple_buckets') @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.create_filtered_data_for_single_bucket') - @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.parse_gcs_file_pattern') + @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.parse_gcs_file_patterns') + @patch('datacatalog_fileset_enricher.datacatalog_helper.DataCatalogHelper.get_entry') + @patch('datacatalog_fileset_enricher.datacatalog_helper.DataCatalogHelper.get_manually_created_fileset_entries') + def test_run_given_entry_group_id_and_entry_id_and_multiple_gcs_patterns_should_enrich_a_single_entry( + self, + get_manually_created_fileset_entries, + get_entry, + parse_gcs_file_patterns, + create_filtered_data_for_single_bucket, + create_filtered_data_for_multiple_buckets, + create_stats_from_dataframe, + create_tag_from_stats + ): + entry = self.__make_fake_fileset_entry() + + entry.gcs_fileset_spec.file_patterns.append('gs://my_bucket/*csv') + + get_entry.return_value = entry + + parse_gcs_file_patterns.return_value = [{ + 'bucket_name': 'my_bucket', + 'file_regex': '.*' + }, { + 'bucket_name': 'my_bucket', + 'file_regex': '.*csv' + }] + + dataframe = pd.DataFrame() + filtered_buckets_stats = {} + 
create_filtered_data_for_single_bucket.return_value = (dataframe, filtered_buckets_stats) + + stats = {} + create_stats_from_dataframe.return_value = stats + + datacatalog_fileset_enricher = DatacatalogFilesetEnricher('test_project') + datacatalog_fileset_enricher.run('entry_group_id', 'entry_id') + + get_manually_created_fileset_entries.assert_not_called() + get_entry.assert_called_once() + parse_gcs_file_patterns.assert_called_once() + self.assertEqual(2, create_filtered_data_for_single_bucket.call_count) + create_filtered_data_for_multiple_buckets.assert_not_called() + create_stats_from_dataframe.assert_called_once() + create_tag_from_stats.assert_called_once() + + @patch('datacatalog_fileset_enricher.datacatalog_helper.DataCatalogHelper.create_tag_from_stats') + @patch('datacatalog_fileset_enricher.gcs_storage_stats_summarizer.GCStorageStatsSummarizer.create_stats_from_dataframe') + @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.create_filtered_data_for_multiple_buckets') + @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.create_filtered_data_for_single_bucket') + @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.parse_gcs_file_patterns') @patch('datacatalog_fileset_enricher.datacatalog_helper.DataCatalogHelper.get_entry') @patch('datacatalog_fileset_enricher.datacatalog_helper.DataCatalogHelper.get_manually_created_fileset_entries') def test_run_given_bucket_with_wildcard_should_call_retrieve_multiple_buckets(self, get_manually_created_fileset_entries, get_entry, - parse_gcs_file_pattern, + parse_gcs_file_patterns, create_filtered_data_for_single_bucket, create_filtered_data_for_multiple_buckets, create_stats_from_dataframe, @@ -94,12 +145,12 @@ def test_run_given_bucket_with_wildcard_should_call_retrieve_multiple_buckets(se ): get_entry.return_value = self.__make_fake_fileset_entry() - parse_gcs_file_pattern.return_value = { + parse_gcs_file_patterns.return_value = [{ 'bucket_name': 'my_bucket*', 'file_regex': '.*' - } + }] - dataframe = {} + dataframe = pd.DataFrame() filtered_buckets_stats = {} create_filtered_data_for_multiple_buckets.return_value = (dataframe, filtered_buckets_stats) @@ -111,7 +162,7 @@ def test_run_given_bucket_with_wildcard_should_call_retrieve_multiple_buckets(se get_manually_created_fileset_entries.assert_not_called() get_entry.assert_called_once() - parse_gcs_file_pattern.assert_called_once() + parse_gcs_file_patterns.assert_called_once() create_filtered_data_for_single_bucket.assert_not_called() create_filtered_data_for_multiple_buckets.assert_called_once() create_stats_from_dataframe.assert_called_once() @@ -121,13 +172,62 @@ def test_run_given_bucket_with_wildcard_should_call_retrieve_multiple_buckets(se @patch('datacatalog_fileset_enricher.gcs_storage_stats_summarizer.GCStorageStatsSummarizer.create_stats_from_dataframe') @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.create_filtered_data_for_multiple_buckets') @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.create_filtered_data_for_single_bucket') - @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.parse_gcs_file_pattern') + @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.parse_gcs_file_patterns') + @patch('datacatalog_fileset_enricher.datacatalog_helper.DataCatalogHelper.get_entry') + @patch('datacatalog_fileset_enricher.datacatalog_helper.DataCatalogHelper.get_manually_created_fileset_entries') + def 
test_run_given_bucket_with_wildcard_and_multiple_gcs_patterns_should_call_retrieve_multiple_buckets( + self, + get_manually_created_fileset_entries, + get_entry, + parse_gcs_file_patterns, + create_filtered_data_for_single_bucket, + create_filtered_data_for_multiple_buckets, + create_stats_from_dataframe, + create_tag_from_stats + ): + entry = self.__make_fake_fileset_entry() + + entry.gcs_fileset_spec.file_patterns.append('gs://my_bucket*/*csv') + + get_entry.return_value = entry + + parse_gcs_file_patterns.return_value = [{ + 'bucket_name': 'my_bucket*', + 'file_regex': '.*' + }, { + 'bucket_name': 'my_bucket*', + 'file_regex': '.*csv' + }] + + dataframe = pd.DataFrame() + filtered_buckets_stats = {} + create_filtered_data_for_multiple_buckets.return_value = (dataframe, filtered_buckets_stats) + + stats = {} + create_stats_from_dataframe.return_value = stats + + datacatalog_fileset_enricher = DatacatalogFilesetEnricher('test_project') + datacatalog_fileset_enricher.run('entry_group_id', 'entry_id') + + get_manually_created_fileset_entries.assert_not_called() + get_entry.assert_called_once() + parse_gcs_file_patterns.assert_called_once() + create_filtered_data_for_single_bucket.assert_not_called() + self.assertEqual(2, create_filtered_data_for_multiple_buckets.call_count) + create_stats_from_dataframe.assert_called_once() + create_tag_from_stats.assert_called_once() + + @patch('datacatalog_fileset_enricher.datacatalog_helper.DataCatalogHelper.create_tag_from_stats') + @patch('datacatalog_fileset_enricher.gcs_storage_stats_summarizer.GCStorageStatsSummarizer.create_stats_from_dataframe') + @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.create_filtered_data_for_multiple_buckets') + @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.create_filtered_data_for_single_bucket') + @patch('datacatalog_fileset_enricher.gcs_storage_filter.StorageFilter.parse_gcs_file_patterns') @patch('datacatalog_fileset_enricher.datacatalog_helper.DataCatalogHelper.get_entry') @patch('datacatalog_fileset_enricher.datacatalog_helper.DataCatalogHelper.get_manually_created_fileset_entries') def test_run_given_no_entry_group_id_and_entry_id_should_enrich_multiple_entries(self, get_manually_created_fileset_entries, get_entry, - parse_gcs_file_pattern, + parse_gcs_file_patterns, create_filtered_data_for_single_bucket, create_filtered_data_for_multiple_buckets, create_stats_from_dataframe, @@ -137,12 +237,12 @@ def test_run_given_no_entry_group_id_and_entry_id_should_enrich_multiple_entries get_entry.return_value = self.__make_fake_fileset_entry() - parse_gcs_file_pattern.return_value = { + parse_gcs_file_patterns.return_value = [{ 'bucket_name': 'my_bucket*', 'file_regex': '.*' - } + }] - dataframe = {} + dataframe = pd.DataFrame() filtered_buckets_stats = {} create_filtered_data_for_multiple_buckets.return_value = (dataframe, filtered_buckets_stats) @@ -154,7 +254,7 @@ def test_run_given_no_entry_group_id_and_entry_id_should_enrich_multiple_entries get_manually_created_fileset_entries.assert_called_once() get_entry.assert_called_once() - parse_gcs_file_pattern.assert_called_once() + parse_gcs_file_patterns.assert_called_once() create_filtered_data_for_single_bucket.assert_not_called() create_filtered_data_for_multiple_buckets.assert_called_once() create_stats_from_dataframe.assert_called_once() diff --git a/tests/datacatalog_fileset_enricher/gcs_storage_filter_test.py b/tests/datacatalog_fileset_enricher/gcs_storage_filter_test.py index f67ad79..1d58784 100644 --- 
a/tests/datacatalog_fileset_enricher/gcs_storage_filter_test.py +++ b/tests/datacatalog_fileset_enricher/gcs_storage_filter_test.py @@ -149,7 +149,7 @@ def test_create_filtered_data_for_single_bucket_with_nonexistent_bucket_should_c def test_parse_gcs_file_pattern_should_split_bucket_name_and_file_pattern(self): storage_filter = StorageFilter('test_project') - parsed_gcs_file_pattern = storage_filter.parse_gcs_file_pattern('gs://my_bucket*/*') + parsed_gcs_file_pattern = storage_filter.parse_gcs_file_patterns(['gs://my_bucket*/*'])[0] self.assertEqual('my_bucket.*', parsed_gcs_file_pattern['bucket_name']) self.assertEqual('.*', parsed_gcs_file_pattern['file_regex']) diff --git a/tests/datacatalog_fileset_enricher/gcs_storage_stats_summarizer_test.py b/tests/datacatalog_fileset_enricher/gcs_storage_stats_summarizer_test.py index 93260ec..47988d3 100644 --- a/tests/datacatalog_fileset_enricher/gcs_storage_stats_summarizer_test.py +++ b/tests/datacatalog_fileset_enricher/gcs_storage_stats_summarizer_test.py @@ -13,7 +13,8 @@ def test_create_stats_from_dataframe_with_no_dataframe_should_summarize_the_buck execution_time = pd.Timestamp.utcnow() bucket_prefix = None - stats = GCStorageStatsSummarizer.create_stats_from_dataframe(dataframe, 'gs://my_bucket/*', + stats = GCStorageStatsSummarizer.create_stats_from_dataframe(dataframe, + ['gs://my_bucket/*'], filtered_buckets_stats, execution_time, bucket_prefix) @@ -30,7 +31,8 @@ def test_create_stats_from_dataframe_with_no_dataframe_with_bucket_prefix_should execution_time = pd.Timestamp.utcnow() bucket_prefix = 'my_b' - stats = GCStorageStatsSummarizer.create_stats_from_dataframe(dataframe, 'gs://my_bucket/*', + stats = GCStorageStatsSummarizer.create_stats_from_dataframe(dataframe, + ['gs://my_bucket/*'], filtered_buckets_stats, execution_time, bucket_prefix) @@ -67,7 +69,8 @@ def test_create_stats_from_dataframe_with_dataframe_should_summarize_the_bucket_ columns=['name', 'public_url', 'size', 'time_created', 'time_updated']) - stats = GCStorageStatsSummarizer.create_stats_from_dataframe(dataframe, 'gs://my_bucket/*', + stats = GCStorageStatsSummarizer.create_stats_from_dataframe(dataframe, + ['gs://my_bucket/*'], filtered_buckets_stats, execution_time, bucket_prefix) From 4ae665beb70e3d610ffa8d340907c4e8d7547ae5 Mon Sep 17 00:00:00 2001 From: mesmacosta Date: Mon, 4 Nov 2019 15:22:52 -0300 Subject: [PATCH 2/2] ADD test for no bucket stats --- .../gcs_storage_stats_summarizer.py | 4 ++++ .../gcs_storage_stats_summarizer_test.py | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/src/datacatalog_fileset_enricher/gcs_storage_stats_summarizer.py b/src/datacatalog_fileset_enricher/gcs_storage_stats_summarizer.py index 64ee9a3..80cf965 100644 --- a/src/datacatalog_fileset_enricher/gcs_storage_stats_summarizer.py +++ b/src/datacatalog_fileset_enricher/gcs_storage_stats_summarizer.py @@ -96,5 +96,9 @@ def __process_bucket_stats(cls, filtered_buckets_stats): files_by_bucket += f'{bucket_name} [count: {files_sum}], ' files_by_bucket = files_by_bucket[:-2] + + if not files_by_bucket: + files_by_bucket = 'bucket_not_found' + buckets_found = len(processed_bucket_stats_dict.keys()) return buckets_found, files_by_bucket diff --git a/tests/datacatalog_fileset_enricher/gcs_storage_stats_summarizer_test.py b/tests/datacatalog_fileset_enricher/gcs_storage_stats_summarizer_test.py index 47988d3..ac3292e 100644 --- a/tests/datacatalog_fileset_enricher/gcs_storage_stats_summarizer_test.py +++ 
b/tests/datacatalog_fileset_enricher/gcs_storage_stats_summarizer_test.py @@ -25,6 +25,24 @@ def test_create_stats_from_dataframe_with_no_dataframe_should_summarize_the_buck self.assertEqual(execution_time, stats['execution_time']) self.assertEqual(None, stats['bucket_prefix']) + def test_create_stats_from_dataframe_with_no_dataframe_and_no_bucket_stats_should_summarize_the_bucket_stats(self): + dataframe = None + filtered_buckets_stats = [] + execution_time = pd.Timestamp.utcnow() + bucket_prefix = None + + stats = GCStorageStatsSummarizer.create_stats_from_dataframe(dataframe, + ['gs://my_bucket/*'], + filtered_buckets_stats, + execution_time, + bucket_prefix) + self.assertEqual(0, stats['count']) + self.assertEqual('gs://my_bucket/*', stats['prefix']) + self.assertEqual('bucket_not_found', stats['files_by_bucket']) + self.assertEqual(0, stats['buckets_found']) + self.assertEqual(execution_time, stats['execution_time']) + self.assertEqual(None, stats['bucket_prefix']) + def test_create_stats_from_dataframe_with_no_dataframe_with_bucket_prefix_should_summarize_the_bucket_stats(self): dataframe = None filtered_buckets_stats = [{'bucket_name': 'my_bucket', 'files': 100}]
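
Note on the new parse_gcs_file_patterns contract: it now takes a list of GCS file patterns and returns one parsed dict per pattern. The following is a minimal, self-contained sketch of that behaviour; the private StorageFilter.__FILE_PATTERN_REGEX constant is not shown in this patch, so the FILE_PATTERN_REGEX below is an assumption that merely reproduces the bucket/file split asserted in gcs_storage_filter_test.py.

import re

# Assumption: stand-in for the private __FILE_PATTERN_REGEX, which this
# patch does not show. It splits 'gs://<bucket>/<file pattern>'.
FILE_PATTERN_REGEX = r'^gs://([^/]+)/(.*)$'

def convert_str_to_usable_regex(plain_str):
    # Same conversion as StorageFilter.convert_str_to_usable_regex
    return plain_str.replace('*', '.*')

def parse_gcs_file_patterns(gcs_file_patterns):
    # One parsed dict per input pattern, mirroring the new classmethod
    parsed_gcs_patterns = []
    for gcs_file_pattern in gcs_file_patterns:
        re_match = re.match(FILE_PATTERN_REGEX, gcs_file_pattern)
        if re_match:
            bucket_name, file_pattern = re_match.groups()
            parsed_gcs_patterns.append(
                {'bucket_name': convert_str_to_usable_regex(bucket_name),
                 'file_regex': convert_str_to_usable_regex(file_pattern)})
    return parsed_gcs_patterns

print(parse_gcs_file_patterns(['gs://my_bucket*/*', 'gs://my_bucket/*csv']))
# [{'bucket_name': 'my_bucket.*', 'file_regex': '.*'},
#  {'bucket_name': 'my_bucket', 'file_regex': '.*csv'}]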
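
The enricher's new __create_dataframe_for_parsed_gcs_patterns accumulates a single dataframe across all parsed patterns: the first per-pattern result seeds the accumulator, later ones are appended. A sketch of that accumulation follows, assuming each per-pattern lookup yields a pandas DataFrame; the patch uses DataFrame.append (current pandas API when this was written), while the sketch uses pd.concat, its modern equivalent.

import pandas as pd

def combine_pattern_dataframes(per_pattern_frames):
    # Mirrors the accumulation in __create_dataframe_for_parsed_gcs_patterns:
    # the first frame seeds the result, later frames are concatenated onto it.
    dataframe = None
    for aux_dataframe in per_pattern_frames:
        if dataframe is not None:
            dataframe = pd.concat([dataframe, aux_dataframe])
        else:
            dataframe = aux_dataframe
    return dataframe

frames = [pd.DataFrame({'name': ['a.csv'], 'size': [10]}),
          pd.DataFrame({'name': ['b.csv'], 'size': [20]})]
print(combine_pattern_dataframes(frames))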
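
The summarizer's __process_bucket_stats is where the multi-pattern support changes behaviour most: with several file patterns, the same bucket can appear once per pattern, so file counts are summed per bucket, entries flagged bucket_not_found are skipped, and (after PATCH 2/2) an empty result falls back to the literal placeholder 'bucket_not_found'. A self-contained sketch, with the input shape taken from the tests:

def process_bucket_stats(filtered_buckets_stats):
    # Consolidate repeated buckets across file patterns by summing file counts
    processed_bucket_stats_dict = {}
    for bucket_stats in filtered_buckets_stats:
        if bucket_stats.get('bucket_not_found'):
            continue
        bucket_name = bucket_stats['bucket_name']
        processed_bucket_stats_dict[bucket_name] = \
            processed_bucket_stats_dict.get(bucket_name, 0) + bucket_stats['files']

    files_by_bucket = ', '.join(
        f'{name} [count: {count}]'
        for name, count in processed_bucket_stats_dict.items())
    if not files_by_bucket:
        # PATCH 2/2: placeholder when no bucket matched any pattern
        files_by_bucket = 'bucket_not_found'
    return len(processed_bucket_stats_dict), files_by_bucket

print(process_bucket_stats([{'bucket_name': 'my_bucket', 'files': 100},
                            {'bucket_name': 'my_bucket', 'files': 50}]))
# (1, 'my_bucket [count: 150]')
print(process_bucket_stats([]))
# (0, 'bucket_not_found')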