diff --git a/task/bq2bq/executor/bumblebee/bigquery_service.py b/task/bq2bq/executor/bumblebee/bigquery_service.py
index ad1a319..23b209f 100644
--- a/task/bq2bq/executor/bumblebee/bigquery_service.py
+++ b/task/bq2bq/executor/bumblebee/bigquery_service.py
@@ -32,7 +32,8 @@ def transform_load(self,
                        source_project_id=None,
                        destination_table=None,
                        write_disposition=None,
-                       create_disposition=CreateDisposition.CREATE_NEVER):
+                       create_disposition=CreateDisposition.CREATE_NEVER,
+                       allow_field_addition=False):
         pass
 
     @abstractmethod
@@ -94,7 +95,8 @@ def transform_load(self,
                        source_project_id=None,
                        destination_table=None,
                        write_disposition=None,
-                       create_disposition=CreateDisposition.CREATE_NEVER):
+                       create_disposition=CreateDisposition.CREATE_NEVER,
+                       allow_field_addition=False):
         if query is None or len(query) == 0:
             raise ValueError("query must not be Empty")
 
@@ -103,6 +105,11 @@ def transform_load(self,
         query_job_config.write_disposition = write_disposition
         query_job_config.use_legacy_sql = False
         query_job_config.labels = self.labels
+        if allow_field_addition:
+            query_job_config.schema_update_options = [
+                bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
+                bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION
+            ]
 
         if destination_table is not None:
             table_ref = TableReference.from_string(destination_table)
@@ -160,6 +167,7 @@ def create_bigquery_service(task_config: TaskConfigFromEnv, labels, writer):
     credentials = _get_bigquery_credentials()
     default_query_job_config = QueryJobConfig()
     default_query_job_config.priority = task_config.query_priority
+    default_query_job_config.allow_field_addition = task_config.allow_field_addition
     client = bigquery.Client(project=task_config.execution_project, credentials=credentials, default_query_job_config=default_query_job_config)
     bigquery_service = BigqueryService(client, labels, writer)
     return bigquery_service
@@ -214,7 +222,7 @@ def execute_query(self, query):
         return []
 
     def transform_load(self, query, source_project_id=None, destination_table=None, write_disposition=None,
-                       create_disposition=CreateDisposition.CREATE_NEVER):
+                       create_disposition=CreateDisposition.CREATE_NEVER, allow_field_addition=False):
         log = """ transform and load with config :
         {}
         {}
diff --git a/task/bq2bq/executor/bumblebee/config.py b/task/bq2bq/executor/bumblebee/config.py
index 9760b94..7762399 100644
--- a/task/bq2bq/executor/bumblebee/config.py
+++ b/task/bq2bq/executor/bumblebee/config.py
@@ -125,6 +125,7 @@ def __init__(self):
         self._timezone = _validate_timezone_exist(get_env_config("TIMEZONE", default="UTC"))
         self._use_spillover = _bool_from_str(get_env_config("USE_SPILLOVER", default="true"))
         self._concurrency = _validate_greater_than_zero(int(get_env_config("CONCURRENCY", default=1)))
+        self._allow_field_addition = _bool_from_str(get_env_config("ALLOW_FIELD_ADDITION", default="false"))
 
     @property
     def destination_project(self) -> str:
@@ -138,6 +139,10 @@ def execution_project(self) -> str:
     def destination_dataset(self) -> str:
         return self._destination_dataset
 
+    @property
+    def allow_field_addition(self) -> bool:
+        return self._allow_field_addition
+
     @property
     def destination_table_name(self) -> str:
         return self._destination_table_name
@@ -342,6 +347,7 @@ def __init__(self, raw_properties):
         self._use_spillover = _bool_from_str(self._get_property_or_default("USE_SPILLOVER", "true"))
         self._concurrency = _validate_greater_than_zero(int(self._get_property_or_default("CONCURRENCY", 1)))
 
+        self._allow_field_addition = _bool_from_str(self._get_property_or_default("ALLOW_FIELD_ADDITION", "false"))
 
     @property
     def sql_type(self) -> str:
@@ -402,6 +408,10 @@ def concurrency(self) -> int:
     def filter_expression(self) -> str:
         return self._filter_expression
 
+    @property
+    def allow_field_addition(self) -> bool:
+        return self._allow_field_addition
+
     def print(self):
         logger.info("task config:\n{}".format(
             "\n".join([
diff --git a/task/bq2bq/executor/bumblebee/loader.py b/task/bq2bq/executor/bumblebee/loader.py
index a8a0133..5950259 100644
--- a/task/bq2bq/executor/bumblebee/loader.py
+++ b/task/bq2bq/executor/bumblebee/loader.py
@@ -14,34 +14,37 @@ def load(self, query):
 
 
 class PartitionLoader(BaseLoader):
 
-    def __init__(self, bigquery_service, destination: str, load_method: LoadMethod, partition: datetime):
+    def __init__(self, bigquery_service, destination: str, load_method: LoadMethod, partition: datetime, allow_field_addition=False):
         self.bigquery_service = bigquery_service
         self.destination_name = destination
         self.load_method = load_method
         self.partition_date = partition
+        self.allow_field_addition = allow_field_addition
 
     def load(self, query):
         partition_date_str = self.partition_date.strftime("%Y%m%d")
         load_destination = "{}${}".format(self.destination_name, partition_date_str)
-
-
         write_disposition = self.load_method.write_disposition
+        allow_field_addition = self.allow_field_addition
         return self.bigquery_service.transform_load(query=query,
                                                     write_disposition=write_disposition,
-                                                    destination_table=load_destination)
+                                                    destination_table=load_destination,
+                                                    allow_field_addition=allow_field_addition)
 
 
 class TableLoader(BaseLoader):
 
-    def __init__(self, bigquery_service, destination: str, load_method: LoadMethod):
+    def __init__(self, bigquery_service, destination: str, load_method: LoadMethod, allow_field_addition=False):
         self.bigquery_service = bigquery_service
         self.full_table_name = destination
         self.load_method = load_method
+        self.allow_field_addition = allow_field_addition
 
     def load(self, query):
         return self.bigquery_service.transform_load(query=query,
                                                     write_disposition=self.load_method.write_disposition,
-                                                    destination_table=self.full_table_name)
+                                                    destination_table=self.full_table_name,
+                                                    allow_field_addition=self.allow_field_addition)
 
 
 class DMLLoader(BaseLoader):
diff --git a/task/bq2bq/executor/bumblebee/transformation.py b/task/bq2bq/executor/bumblebee/transformation.py
index 836d39c..32afe02 100644
--- a/task/bq2bq/executor/bumblebee/transformation.py
+++ b/task/bq2bq/executor/bumblebee/transformation.py
@@ -219,7 +219,8 @@ def __init__(self, bigquery_service: BigqueryService,
         self.execution_time = execution_time
 
     def transform(self):
-        loader = TableLoader(self.bigquery_service, self.task_config.destination_table, self.task_config.load_method)
+        loader = TableLoader(self.bigquery_service, self.task_config.destination_table, self.task_config.load_method,
+                             self.task_config.allow_field_addition)
         logger.info("create transformation for table")
 
         task = PartitionTransformation(self.task_config,
@@ -411,7 +412,8 @@ def transform(self):
 
             logger.info("create transformation for partition: {}".format(destination_partition))
             task_loader = PartitionLoader(self.bigquery_service, self.task_config.destination_table,
-                                          self.task_config.load_method, destination_partition)
+                                          self.task_config.load_method, destination_partition,
+                                          self.task_config.allow_field_addition)
 
             task = PartitionTransformation(self.task_config,
                                            task_loader,
diff --git a/task/bq2bq/executor/example.py b/task/bq2bq/executor/example.py
index 3a014d0..a7d1b9b 100644
--- a/task/bq2bq/executor/example.py
+++ b/task/bq2bq/executor/example.py
@@ -1,5 +1,6 @@
 from bumblebee.bq2bq import bq2bq
 from datetime import datetime, timezone
+import os
 
 
 execution_date = datetime.utcnow()
@@ -202,4 +203,17 @@ def replace_all():
     )
 
 
-replace_all()
+def allow_field_addition():
+    bq2bq(
+        DEFAULT_PATH + "/samples/tasks/allow_field_addition/basic/properties.cfg",
+        DEFAULT_PATH + "/samples/tasks/allow_field_addition/basic/query.sql",
+        None,
+        datetime(2021, 9, 1, 1),
+        datetime(2021, 9, 2, 1),
+        execution_date,
+        False
+    )
+
+
+DEFAULT_PATH = os.path.dirname(os.path.realpath(__file__))
+allow_field_addition()
\ No newline at end of file
diff --git a/task/bq2bq/executor/samples/tasks/allow_field_addition/basic/properties.cfg b/task/bq2bq/executor/samples/tasks/allow_field_addition/basic/properties.cfg
new file mode 100644
index 0000000..bd61971
--- /dev/null
+++ b/task/bq2bq/executor/samples/tasks/allow_field_addition/basic/properties.cfg
@@ -0,0 +1,13 @@
+[DESTINATION]
+PROJECT="g-data-gojek-id-mart"
+DATASET="playground"
+TABLE="allow_field_addition_test"
+
+[TRANSFORMATION]
+WINDOW_SIZE = 720h
+WINDOW_OFFSET = 0
+WINDOW_TRUNCATE_UPTO = M
+
+[LOAD]
+LOAD_METHOD="REPLACE"
+ALLOW_FIELD_ADDITION=TRUE
\ No newline at end of file
diff --git a/task/bq2bq/executor/samples/tasks/allow_field_addition/basic/query.sql b/task/bq2bq/executor/samples/tasks/allow_field_addition/basic/query.sql
new file mode 100644
index 0000000..b2e5f43
--- /dev/null
+++ b/task/bq2bq/executor/samples/tasks/allow_field_addition/basic/query.sql
@@ -0,0 +1,6 @@
+select
+    "hakai" as hakai,
+    "rasengan" as rasengan,
+    "over" as overs,
+    "allow_field_addition" as test_column,
+    TIMESTAMP ('2021-09-01T01:02:03') as `event_timestamp`
diff --git a/task/bq2bq/executor/tests/test_config.py b/task/bq2bq/executor/tests/test_config.py
index 4c96fbe..ec87c06 100644
--- a/task/bq2bq/executor/tests/test_config.py
+++ b/task/bq2bq/executor/tests/test_config.py
@@ -117,7 +117,7 @@ def setUp(self):
     def test_localise_datetime(self):
         tzname = "Asia/Jakarta"
         start_time = datetime(2019, 1, 1)
-        localised_start_time = localise_datetime(start_time,tzname)
+        localised_start_time = localise_datetime(start_time, tzname)
         expected_start_time = self.timezone.localize(datetime(year=2019, month=1, day=1))
 
         self.assertEqual(expected_start_time,localised_start_time)
@@ -194,6 +194,20 @@ def test_empty_destination_exception(self):
 
         self.assertEqual("config 'PROJECT' must be provided", str(ex.exception))
 
+    def test_allow_field_addition(self):
+        self.set_vars_with_default()
+        os.environ['ALLOW_FIELD_ADDITION'] = 'true'
+
+        config = TaskConfigFromEnv()
+        self.assertEqual(True, config.allow_field_addition)
+        del os.environ['ALLOW_FIELD_ADDITION']
+
+    def test_allow_field_addition_should_be_false_by_default(self):
+        self.set_vars_with_default()
+
+        config = TaskConfigFromEnv()
+        self.assertEqual(False, config.allow_field_addition)
+
 
 class TestTaskFiles(TestCase):
 
@@ -227,8 +241,8 @@ def test_task_files_with_spillover_query(self):
         fs.exist.assert_has_calls([call(query_sql_file)])
         fs.read.assert_has_calls([call(query_sql_file)])
 
-        self.assertEqual(task_files.query,"content")
-        self.assertEqual(task_files.spillover_query,"content")
+        self.assertEqual(task_files.query, "content")
+        self.assertEqual(task_files.spillover_query, "content")
 
 
 class TestLoadMethod(TestCase):
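# --- Note (not part of the patch): both config classes above parse the flag from a
# string ("false" by default from the environment, ALLOW_FIELD_ADDITION=TRUE in the
# sample properties.cfg). The helpers below are only an illustrative stand-in for
# that parsing; the real _bool_from_str / get_env_config live in bumblebee.config
# and may differ in detail.
import os


def get_env_config(name, default=None):
    # environment variable wins, otherwise fall back to the string default
    return os.environ.get(name, default)


def bool_from_str(value):
    # accept the spellings used in this change: 'true' in env vars, TRUE in properties.cfg
    return str(value).strip().strip('"').lower() in ("true", "1", "yes")


os.environ["ALLOW_FIELD_ADDITION"] = "true"
assert bool_from_str(get_env_config("ALLOW_FIELD_ADDITION", default="false")) is True

del os.environ["ALLOW_FIELD_ADDITION"]
assert bool_from_str(get_env_config("ALLOW_FIELD_ADDITION", default="false")) is False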
diff --git a/task/bq2bq/executor/tests/test_transformation.py b/task/bq2bq/executor/tests/test_transformation.py
index ea9673b..54167ea 100644
--- a/task/bq2bq/executor/tests/test_transformation.py
+++ b/task/bq2bq/executor/tests/test_transformation.py
@@ -49,8 +49,9 @@ def test_partition_transform_execute(self, BigqueryServiceMock):
         """
 
         bigquery_service.transform_load.assert_called_with(query=final_query,
-                                                            write_disposition=WriteDisposition.WRITE_TRUNCATE,
-                                                            destination_table="bq_project.playground_dev.abcd$20190101")
+                                                           write_disposition=WriteDisposition.WRITE_TRUNCATE,
+                                                           destination_table="bq_project.playground_dev.abcd$20190101",
+                                                           allow_field_addition=False)
 
     @mock.patch("bumblebee.bigquery_service.BigqueryService")
     def test_table_transform(self, BigqueryServiceMock):
@@ -73,6 +74,7 @@ def test_table_transform(self, BigqueryServiceMock):
 LOAD_METHOD="REPLACE"
         """
         set_vars_with_default()
+
         task_config = TaskConfigFromEnv()
         localized_start_time = localise_datetime(datetime(2019, 1, 2), task_config.timezone)
         localized_end_time = localise_datetime(datetime(2019, 1, 3), task_config.timezone)
@@ -87,7 +89,8 @@ def test_table_transform(self, BigqueryServiceMock):
 
         bigquery_service.transform_load.assert_called_with(query=final_query,
                                                            write_disposition=WriteDisposition.WRITE_TRUNCATE,
-                                                           destination_table="bq_project.playground_dev.abcd")
+                                                           destination_table="bq_project.playground_dev.abcd",
+                                                           allow_field_addition=False)
 
     @mock.patch("bumblebee.bigquery_service.BigqueryService")
     def test_single_partition_transform_1d_window_0_offset_without_spillover(self, BigqueryServiceMock):
@@ -109,7 +112,8 @@ def test_single_partition_transform_1d_window_0_offset_without_spillover(self, B
 
         bigquery_service.transform_load.assert_called_with(query=final_query,
                                                            write_disposition=WriteDisposition.WRITE_TRUNCATE,
-                                                           destination_table="bq_project.playground_dev.abcd$20190101")
+                                                           destination_table="bq_project.playground_dev.abcd$20190101",
+                                                           allow_field_addition=False)
 
     @mock.patch("bumblebee.bigquery_service.BigqueryService")
     def test_single_partition_transform_2d_window_24h_offset_without_spillover(self, BigqueryServiceMock):
@@ -130,7 +134,8 @@ def test_single_partition_transform_2d_window_24h_offset_without_spillover(self,
 
         bigquery_service.transform_load.assert_called_with(query=final_query,
                                                            write_disposition=WriteDisposition.WRITE_TRUNCATE,
-                                                           destination_table="bq_project.playground_dev.abcd$20190104")
+                                                           destination_table="bq_project.playground_dev.abcd$20190104",
+                                                           allow_field_addition=False)
 
     @mock.patch("bumblebee.bigquery_service.BigqueryService")
     def test_single_partition_transform_7d_window_without_spillover(self, BigqueryServiceMock):
@@ -152,6 +157,7 @@ def test_single_partition_transform_7d_window_without_spillover(self, BigquerySe
 [LOAD]
 LOAD_METHOD="REPLACE"
         """
+        set_vars_with_default()
 
         task_config = TaskConfigFromEnv()
         localized_start_time = localise_datetime(datetime(2019, 1, 3), task_config.timezone)
@@ -167,7 +173,8 @@ def test_single_partition_transform_7d_window_without_spillover(self, BigquerySe
 
         bigquery_service.transform_load.assert_called_with(query=final_query,
                                                            write_disposition=WriteDisposition.WRITE_TRUNCATE,
-                                                           destination_table="bq_project.playground_dev.abcd$20190103")
+                                                           destination_table="bq_project.playground_dev.abcd$20190103",
+                                                           allow_field_addition=False)
 
     @mock.patch("bumblebee.bigquery_service.BigqueryService")
     def test_single_partition_transform_2d_with_spillover(self, BigqueryServiceMock):
@@ -189,9 +196,11 @@ def test_single_partition_transform_2d_with_spillover(self, BigqueryServiceMock)
         final_query_2 = """\nselect count(1) from table where date >= '2019-01-04' and date < '2019-01-05'"""
 
         calls = [call(query=final_query_1, write_disposition=WriteDisposition.WRITE_TRUNCATE,
-                      destination_table="bq_project.playground_dev.abcd$20190103"),
+                      destination_table="bq_project.playground_dev.abcd$20190103",
+                      allow_field_addition=False),
                  call(query=final_query_2, write_disposition=WriteDisposition.WRITE_TRUNCATE,
-                      destination_table="bq_project.playground_dev.abcd$20190104")]
+                      destination_table="bq_project.playground_dev.abcd$20190104",
+                      allow_field_addition=False)]
 
         bigquery_service.transform_load.assert_has_calls(calls, any_order=True)
         self.assertEqual(len(bigquery_service.transform_load.call_args_list), len(calls))
@@ -231,6 +240,31 @@ def test_execute_dry_run(self, BigqueryServiceMock):
 
         bigquery_service.transform_load.assert_not_called()
 
+    @mock.patch("bumblebee.bigquery_service.BigqueryService")
+    def test_allow_field_addition(self, BigqueryServiceMock):
+        query = """select count(1) from table where date >= '__dstart__' and date < '__dend__'"""
+
+        set_vars_with_default()
+        os.environ['ALLOW_FIELD_ADDITION'] = 'true'
+        task_config = TaskConfigFromEnv()
+        del os.environ['ALLOW_FIELD_ADDITION']
+
+        localized_start_time = localise_datetime(datetime(2019, 1, 1), task_config.timezone)
+        localized_end_time = localise_datetime(datetime(2019, 1, 2), task_config.timezone)
+        localized_execution_time = localise_datetime(datetime(2019, 1, 1), task_config.timezone)
+
+        bigquery_service = BigqueryServiceMock()
+        task = TableTransformation(bigquery_service, task_config, query, localized_start_time,
+                                   localized_end_time, False, localized_execution_time)
+        task.transform()
+
+        final_query = """select count(1) from table where date >= '2019-01-01' and date < '2019-01-02'"""
+        bigquery_service.transform_load.assert_called_with(query=final_query,
+                                                           write_disposition=WriteDisposition.WRITE_TRUNCATE,
+                                                           destination_table="bq_project.playground_dev.abcd",
+                                                           allow_field_addition=True)
+
+
 class TestTransformation(TestCase):
 
     @mock.patch("bumblebee.bigquery_service.BigqueryService")
@@ -290,7 +324,8 @@ def get_table_mock(table_name):
         final_query = """select count(1) from table where date >= '2019-02-01' and date < '2019-02-02'"""
         bigquery_service.transform_load.assert_called_with(query=final_query,
                                                            write_disposition=WriteDisposition.WRITE_APPEND,
-                                                           destination_table="bq_project.playground_dev.abcd")
+                                                           destination_table="bq_project.playground_dev.abcd",
+                                                           allow_field_addition=False)
 
     @mock.patch("bumblebee.bigquery_service.BigqueryService")
     def test_table_transform_with_merge_load_method_and_non_partitioned_destination(self, BigqueryServiceMock):
@@ -326,7 +361,8 @@ def get_table_mock(table_name):
         final_query = """select count(1) from table where date >= '2019-01-03' and date < '2019-01-04'\n"""
         bigquery_service.transform_load.assert_called_with(query=final_query,
                                                            write_disposition=WriteDisposition.WRITE_TRUNCATE,
-                                                           destination_table="bq_project.playground_dev.abcd")
+                                                           destination_table="bq_project.playground_dev.abcd",
+                                                           allow_field_addition=False)
 
     @mock.patch("bumblebee.bigquery_service.BigqueryService")
     def test_should_run_partition_task_on_field(self, BigqueryServiceMock):
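# --- Note (not part of the patch): a hedged sketch of the partition path exercised
# by the tests above. Per the loader.py change, PartitionLoader decorates the
# destination with a $YYYYMMDD suffix and forwards allow_field_addition unchanged
# to BigqueryService.transform_load. The service and load_method are mocks, the
# table name is a placeholder, and it assumes the executor's bumblebee package is
# importable.
from datetime import datetime
from unittest import mock

from bumblebee.loader import PartitionLoader

bigquery_service = mock.Mock()
load_method = mock.Mock(write_disposition="WRITE_TRUNCATE")  # stand-in for the task's load method

loader = PartitionLoader(bigquery_service,
                         destination="example-project.playground.demo_table",  # placeholder
                         load_method=load_method,
                         partition=datetime(2021, 9, 1),
                         allow_field_addition=True)
loader.load("select 1 as a")

# The partition date becomes a $YYYYMMDD table decorator and the flag passes through.
bigquery_service.transform_load.assert_called_once_with(
    query="select 1 as a",
    write_disposition="WRITE_TRUNCATE",
    destination_table="example-project.playground.demo_table$20210901",
    allow_field_addition=True)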