From 39f0de2f6ef40ad93656ade045d69f212bd151d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Ko=C5=82da?= Date: Tue, 7 Nov 2017 00:55:06 +0100 Subject: [PATCH] Fix for #9, BigQuery views are now automatically created --- .../table_metadata_v0_1.sql | 11 +++ gcp_census/model/model_creator.py | 87 +++++++++++++------ gcp_census/model/model_creator_handler.py | 3 +- gcp_census/model/table.py | 8 ++ gcp_census/model/view.py | 18 ++++ tests/model_creator_handler_test.py | 9 +- tests/model_creator_test.py | 79 +++++++++++++++-- tests/model_view_test.py | 14 +++ tests/test_utils.py | 6 ++ 9 files changed, 200 insertions(+), 35 deletions(-) create mode 100644 bq_schemas/bigquery_views_legacy/table_metadata_v0_1.sql create mode 100644 gcp_census/model/table.py create mode 100644 gcp_census/model/view.py create mode 100644 tests/model_view_test.py diff --git a/bq_schemas/bigquery_views_legacy/table_metadata_v0_1.sql b/bq_schemas/bigquery_views_legacy/table_metadata_v0_1.sql new file mode 100644 index 0000000..f0de3cd --- /dev/null +++ b/bq_schemas/bigquery_views_legacy/table_metadata_v0_1.sql @@ -0,0 +1,11 @@ +-- This view aggregates all data from last 2 days and deduplicates it based on table reference. +-- Deleted table can be returned by this query up to 2 days. +SELECT * FROM ( +SELECT projectId, datasetId, tableId, creationTime, lastModifiedTime, location, numBytes, numLongTermBytes, numRows, type, timePartitioning.type AS partitioningType, +row_number() OVER (PARTITION BY projectId, datasetId, tableId ORDER BY snapshotTime DESC) AS rownum, +COUNT(partition.partitionId) WITHIN RECORD AS partitionCount +FROM [bigquery.table_metadata_v0_1] +WHERE +_PARTITIONTIME BETWEEN TIMESTAMP(UTC_USEC_TO_DAY(NOW() - 48 * 60 * 60 * 1000000)) + AND TIMESTAMP(UTC_USEC_TO_DAY(CURRENT_TIMESTAMP()))) +WHERE rownum = 1 \ No newline at end of file diff --git a/gcp_census/model/model_creator.py b/gcp_census/model/model_creator.py index 8c156be..62d7223 100644 --- a/gcp_census/model/model_creator.py +++ b/gcp_census/model/model_creator.py @@ -8,6 +8,9 @@ from oauth2client.client import GoogleCredentials from google.appengine.api import app_identity +from gcp_census.model.table import Table +from gcp_census.model.view import View + class ModelCreator(object): def __init__(self, model_directory): @@ -24,18 +27,30 @@ def __init__(self, model_directory): def _create_http(): return httplib2.Http(timeout=10) - def list_models(self): + def __list_tables(self): + for table in self.__list_files('.json'): + with open(table[2]) as json_file: + json_dict = json.load(json_file) + yield Table(table[0], table[1], json_dict) + + def __list_views(self): + for view in self.__list_files('.sql'): + with open(view[2]) as view_file: + content = view_file.readlines() + yield View(view[0], view[1], content) + + def __list_files(self, extension): for group_dir in os.listdir(self.model_directory): subdirectory = os.path.join(self.model_directory, group_dir) if os.path.isdir(subdirectory): for model_file in os.listdir(subdirectory): - model_name = os.path.splitext(model_file)[0] - with open(os.path.join(self.model_directory, group_dir, - model_file)) as json_file: - json_dict = json.load(json_file) - yield Model(group_dir, model_name, json_dict) + if model_file.endswith(extension): + model_name = os.path.splitext(model_file)[0] + filename = os.path.join(self.model_directory, group_dir, + model_file) + yield group_dir, model_name, filename - def list_groups(self): + def __list_groups(self): for group_dir in os.listdir(self.model_directory): subdirectory = os.path.join(self.model_directory, group_dir) if os.path.isdir(subdirectory): @@ -44,7 +59,7 @@ def list_groups(self): def create_missing_datasets(self): project_id = self.__get_project_id() location = os.getenv('BIGQUERY_LOCATION') - for dataset_id in self.list_groups(): + for dataset_id in self.__list_groups(): self.__create_dataset_if_missing(project_id, dataset_id, location) def __create_dataset_if_missing(self, project_id, dataset_id, location): @@ -68,30 +83,60 @@ def __create_dataset_if_missing(self, project_id, dataset_id, location): else: raise e - def create_missing_models(self): + def create_missing_tables(self): project_id = self.__get_project_id() - for model in self.list_models(): + for table in self.__list_tables(): logging.debug("Creating BQ table %s:%s.%s", - project_id, model.group, model.name) + project_id, table.group, table.name) body = { 'tableReference': { 'projectId': project_id, - 'datasetId': model.group, - 'tableId': model.name + 'datasetId': table.group, + 'tableId': table.name } } - body.update(model.json_dict) + body.update(table.json_dict) try: self.service.tables().insert( - projectId=project_id, datasetId=model.group, + projectId=project_id, datasetId=table.group, body=body ).execute() logging.info("Table %s:%s.%s created successfully", project_id, - model.group, model.name) + table.group, table.name) except HttpError as e: if e.resp.status == 409: logging.info("Table %s:%s.%s already exists", project_id, - model.group, model.name) + table.group, table.name) + else: + raise e + + def create_missing_views(self): + project_id = self.__get_project_id() + for view in self.__list_views(): + logging.debug("Creating BQ view %s:%s.%s", + project_id, view.group, view.name) + body = { + 'tableReference': { + 'projectId': project_id, + 'datasetId': view.group, + 'tableId': view.name + }, + "view": { + "query": view.query + }, + "description": view.description + } + try: + self.service.tables().insert( + projectId=project_id, datasetId=view.group, + body=body + ).execute() + logging.info("View %s:%s.%s created successfully", project_id, + view.group, view.name) + except HttpError as e: + if e.resp.status == 409: + logging.info("View %s:%s.%s already exists", project_id, + view.group, view.name) else: raise e @@ -101,11 +146,3 @@ def __get_project_id(): app_identity.get_application_id()) -class Model(object): - def __init__(self, group, name, json_dict): - self.group = group - self.name = name - self.json_dict = json_dict - - def __str__(self): - return "Model(group={}, name={})".format(self.group, self.name) diff --git a/gcp_census/model/model_creator_handler.py b/gcp_census/model/model_creator_handler.py index a7c7ab4..b76fdf9 100644 --- a/gcp_census/model/model_creator_handler.py +++ b/gcp_census/model/model_creator_handler.py @@ -10,4 +10,5 @@ def __init__(self, request=None, response=None): def get(self): model_creator = ModelCreator("bq_schemas") model_creator.create_missing_datasets() - model_creator.create_missing_models() + model_creator.create_missing_tables() + model_creator.create_missing_views() diff --git a/gcp_census/model/table.py b/gcp_census/model/table.py new file mode 100644 index 0000000..79f3294 --- /dev/null +++ b/gcp_census/model/table.py @@ -0,0 +1,8 @@ +class Table(object): + def __init__(self, group, name, json_dict): + self.group = group + self.name = name + self.json_dict = json_dict + + def __str__(self): + return "Table(group={}, name={})".format(self.group, self.name) diff --git a/gcp_census/model/view.py b/gcp_census/model/view.py new file mode 100644 index 0000000..ddc7a55 --- /dev/null +++ b/gcp_census/model/view.py @@ -0,0 +1,18 @@ +class View(object): + def __init__(self, group, name, file_content): + self.group = group + self.name = name + self.file_content = file_content + + def __str__(self): + return "View(group={}, name={})".format(self.group, self.name) + + @property + def query(self): + return "".join(self.file_content) + + @property + def description(self): + desc_lines = [line[2:] for line in self.file_content + if line.startswith('--')] + return "".join(desc_lines) diff --git a/tests/model_creator_handler_test.py b/tests/model_creator_handler_test.py index e7d626a..173b5b4 100644 --- a/tests/model_creator_handler_test.py +++ b/tests/model_creator_handler_test.py @@ -27,13 +27,16 @@ def tearDown(self): patch.stopall() @patch.object(ModelCreator, 'create_missing_datasets') - @patch.object(ModelCreator, 'create_missing_models') - def test_happy_path(self, create_missing_models, create_missing_datasets): + @patch.object(ModelCreator, 'create_missing_tables') + @patch.object(ModelCreator, 'create_missing_views') + def test_happy_path(self, create_missing_views, create_missing_tables, + create_missing_datasets): # when response = self.under_test.get( '/createModels' ) # then self.assertEqual(response.status_int, 200) - create_missing_models.assert_called_once() + create_missing_views.assert_called_once() + create_missing_tables.assert_called_once() create_missing_datasets.assert_called_once() diff --git a/tests/model_creator_test.py b/tests/model_creator_test.py index 528bcfe..8220648 100644 --- a/tests/model_creator_test.py +++ b/tests/model_creator_test.py @@ -26,6 +26,8 @@ def test_should_create_dataset(self, _create_http): http_mock = Mock(wraps=HttpMockSequence([ ({'status': '200'}, test_utils.content( 'tests/json_samples/bigquery_v2_test_schema.json')), + ({'status': '200'}, test_utils.content( + 'tests/json_samples/bigquery_v2_datasets_insert_200.json')), ({'status': '200'}, test_utils.content( 'tests/json_samples/bigquery_v2_datasets_insert_200.json')) ])) @@ -37,7 +39,7 @@ def test_should_create_dataset(self, _create_http): # then calls = http_mock.mock_calls - self.assertEqual(2, len(calls)) + self.assertEqual(3, len(calls)) @patch.object(ModelCreator, '_create_http') def test_should_ignore_dataset_already_exists_error(self, _create_http): @@ -45,6 +47,8 @@ def test_should_ignore_dataset_already_exists_error(self, _create_http): http_mock = Mock(wraps=HttpMockSequence([ ({'status': '200'}, test_utils.content( 'tests/json_samples/bigquery_v2_test_schema.json')), + ({'status': '409'}, test_utils.content( + 'tests/json_samples/bigquery_v2_datasets_insert_409.json')), ({'status': '409'}, test_utils.content( 'tests/json_samples/bigquery_v2_datasets_insert_409.json')) ])) @@ -56,7 +60,7 @@ def test_should_ignore_dataset_already_exists_error(self, _create_http): # then calls = http_mock.mock_calls - self.assertEqual(2, len(calls)) + self.assertEqual(3, len(calls)) @patch.object(ModelCreator, '_create_http') def test_should_propagate_dataset_500_error(self, _create_http): @@ -79,7 +83,7 @@ def test_should_propagate_dataset_500_error(self, _create_http): self.assertEqual(500, context.exception.resp.status) @patch.object(ModelCreator, '_create_http') - def test_should_create_table(self, _create_http): + def test_should_create_tables(self, _create_http): # given http_mock = Mock(wraps=HttpMockSequence([ ({'status': '200'}, test_utils.content( @@ -95,7 +99,7 @@ def test_should_create_table(self, _create_http): under_test = ModelCreator("bq_schemas") # when - under_test.create_missing_models() + under_test.create_missing_tables() # then calls = http_mock.mock_calls @@ -118,7 +122,7 @@ def test_should_ignore_table_already_exists_error(self, _create_http): under_test = ModelCreator("bq_schemas") # when - under_test.create_missing_models() + under_test.create_missing_tables() # then calls = http_mock.mock_calls @@ -137,9 +141,72 @@ def test_should_propagate_table_500_error(self, _create_http): # when with self.assertRaises(HttpError) as context: - under_test.create_missing_models() + under_test.create_missing_tables() # then calls = http_mock.mock_calls self.assertEqual(2, len(calls)) self.assertEqual(500, context.exception.resp.status) + + @patch.object(ModelCreator, '_create_http') + def test_should_create_views(self, _create_http): + # given + http_mock = Mock(wraps=HttpMockSequence([ + ({'status': '200'}, test_utils.content( + 'tests/json_samples/bigquery_v2_test_schema.json')), + ({'status': '200'}, test_utils.content( + 'tests/json_samples/bigquery_v2_tables_insert_200.json')), + ])) + _create_http.return_value = http_mock + under_test = ModelCreator("bq_schemas") + + # when + under_test.create_missing_views() + + # then + calls = http_mock.mock_calls + self.assertEqual(2, len(calls)) + json_request = test_utils.get_body_from_http_request( + calls[1]) + self.assertTrue('description' in json_request) + self.assertTrue('query' in json_request['view']) + + @patch.object(ModelCreator, '_create_http') + def test_should_ignore_view_already_exists_error(self, _create_http): + # given + http_mock = Mock(wraps=HttpMockSequence([ + ({'status': '200'}, test_utils.content( + 'tests/json_samples/bigquery_v2_test_schema.json')), + ({'status': '409'}, test_utils.content( + 'tests/json_samples/bigquery_v2_tables_insert_409.json')) + ])) + _create_http.return_value = http_mock + under_test = ModelCreator("bq_schemas") + + # when + under_test.create_missing_views() + + # then + calls = http_mock.mock_calls + self.assertEqual(2, len(calls)) + + @patch.object(ModelCreator, '_create_http') + def test_should_propagate_view_500_error(self, _create_http): + # given + http_mock = Mock(wraps=HttpMockSequence([ + ({'status': '200'}, test_utils.content( + 'tests/json_samples/bigquery_v2_test_schema.json')), + ({'status': '500'}, '') + ])) + _create_http.return_value = http_mock + under_test = ModelCreator("bq_schemas") + + # when + with self.assertRaises(HttpError) as context: + under_test.create_missing_views() + + # then + calls = http_mock.mock_calls + self.assertEqual(2, len(calls)) + self.assertEqual(500, context.exception.resp.status) + diff --git a/tests/model_view_test.py b/tests/model_view_test.py new file mode 100644 index 0000000..9784ec1 --- /dev/null +++ b/tests/model_view_test.py @@ -0,0 +1,14 @@ +import unittest + +from gcp_census.model.view import View + + +class TestModelCreator(unittest.TestCase): + def test_should_parse_view(self): + # given + view = View('group', 'name', + ['--desc1\n', '--desc2\n', 'SELECT * FROM\n', 'table']) + + # when & then + self.assertEqual(view.description, 'desc1\ndesc2\n') + self.assertEqual(view.query, '--desc1\n--desc2\nSELECT * FROM\ntable') diff --git a/tests/test_utils.py b/tests/test_utils.py index d8df3e2..a9ba3e5 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,4 +1,5 @@ import os +import json def content(filename): @@ -6,3 +7,8 @@ def content(filename): raise Exception("File not found: {0}".format(filename)) with open(filename, 'r') as f: return f.read() + + +def get_body_from_http_request(call): + payload = call[1][2] + return json.loads(payload) if payload else None