Skip to content

Commit

Permalink
feat: add automated column addition based on query. (#56)
Browse files Browse the repository at this point in the history
* feat: add automated column addition based on query.

Co-authored-by: Ramadhan Putra <flamecrossram@gmail.com>

* fix and add some test cases

Co-authored-by: Teguh Riadi <teguh.riadi@gojek.com>
Co-authored-by: Ramadhan Putra <flamecrossram@gmail.com>
  • Loading branch information
3 people committed Oct 12, 2022
1 parent f91bd46 commit c325b86
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 25 deletions.
14 changes: 11 additions & 3 deletions task/bq2bq/executor/bumblebee/bigquery_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def transform_load(self,
source_project_id=None,
destination_table=None,
write_disposition=None,
create_disposition=CreateDisposition.CREATE_NEVER):
create_disposition=CreateDisposition.CREATE_NEVER,
allow_field_addition=False):
pass

@abstractmethod
Expand Down Expand Up @@ -94,7 +95,8 @@ def transform_load(self,
source_project_id=None,
destination_table=None,
write_disposition=None,
create_disposition=CreateDisposition.CREATE_NEVER):
create_disposition=CreateDisposition.CREATE_NEVER,
allow_field_addition=False):
if query is None or len(query) == 0:
raise ValueError("query must not be Empty")

Expand All @@ -103,6 +105,11 @@ def transform_load(self,
query_job_config.write_disposition = write_disposition
query_job_config.use_legacy_sql = False
query_job_config.labels = self.labels
if allow_field_addition:
query_job_config.schema_update_options = [
bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION
]

if destination_table is not None:
table_ref = TableReference.from_string(destination_table)
Expand Down Expand Up @@ -160,6 +167,7 @@ def create_bigquery_service(task_config: TaskConfigFromEnv, labels, writer):
credentials = _get_bigquery_credentials()
default_query_job_config = QueryJobConfig()
default_query_job_config.priority = task_config.query_priority
default_query_job_config.allow_field_addition = task_config.allow_field_addition
client = bigquery.Client(project=task_config.execution_project, credentials=credentials, default_query_job_config=default_query_job_config)
bigquery_service = BigqueryService(client, labels, writer)
return bigquery_service
Expand Down Expand Up @@ -214,7 +222,7 @@ def execute_query(self, query):
return []

def transform_load(self, query, source_project_id=None, destination_table=None, write_disposition=None,
create_disposition=CreateDisposition.CREATE_NEVER):
create_disposition=CreateDisposition.CREATE_NEVER, allow_field_addition=False):
log = """ transform and load with config :
{}
{}
Expand Down
10 changes: 10 additions & 0 deletions task/bq2bq/executor/bumblebee/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def __init__(self):
self._timezone = _validate_timezone_exist(get_env_config("TIMEZONE", default="UTC"))
self._use_spillover = _bool_from_str(get_env_config("USE_SPILLOVER", default="true"))
self._concurrency = _validate_greater_than_zero(int(get_env_config("CONCURRENCY", default=1)))
self._allow_field_addition = _bool_from_str(get_env_config("ALLOW_FIELD_ADDITION", default="false"))

@property
def destination_project(self) -> str:
Expand All @@ -138,6 +139,10 @@ def execution_project(self) -> str:
def destination_dataset(self) -> str:
return self._destination_dataset

@property
def allow_field_addition(self) -> bool:
    """Whether the BigQuery load job may add new columns to the destination schema.

    Parsed in ``__init__`` from the ``ALLOW_FIELD_ADDITION`` environment
    variable (default ``"false"``); when true, the load job is configured with
    the ALLOW_FIELD_ADDITION / ALLOW_FIELD_RELAXATION schema update options.
    """
    return self._allow_field_addition

@property
def destination_table_name(self) -> str:
return self._destination_table_name
Expand Down Expand Up @@ -342,6 +347,7 @@ def __init__(self, raw_properties):

self._use_spillover = _bool_from_str(self._get_property_or_default("USE_SPILLOVER", "true"))
self._concurrency = _validate_greater_than_zero(int(self._get_property_or_default("CONCURRENCY", 1)))
self._allow_field_addition = _bool_from_str(self._get_property_or_default("ALLOW_FIELD_ADDITION", "false"))

@property
def sql_type(self) -> str:
Expand Down Expand Up @@ -402,6 +408,10 @@ def concurrency(self) -> int:
def filter_expression(self) -> str:
return self._filter_expression

@property
def allow_field_addition(self) -> bool:
    """Whether the BigQuery load job may add new columns to the destination schema.

    Parsed in ``__init__`` from the ``ALLOW_FIELD_ADDITION`` raw property
    (default ``"false"``).
    """
    return self._allow_field_addition

def print(self):
logger.info("task config:\n{}".format(
"\n".join([
Expand Down
15 changes: 9 additions & 6 deletions task/bq2bq/executor/bumblebee/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,37 @@ def load(self, query):

class PartitionLoader(BaseLoader):

def __init__(self, bigquery_service, destination: str, load_method: LoadMethod, partition: datetime):
def __init__(self, bigquery_service, destination: str, load_method: LoadMethod, partition: datetime, allow_field_addition=False):
self.bigquery_service = bigquery_service
self.destination_name = destination
self.load_method = load_method
self.partition_date = partition
self.allow_field_addition = allow_field_addition

def load(self, query):
partition_date_str = self.partition_date.strftime("%Y%m%d")
load_destination = "{}${}".format(self.destination_name, partition_date_str)


write_disposition = self.load_method.write_disposition
allow_field_addition = self.allow_field_addition
return self.bigquery_service.transform_load(query=query,
write_disposition=write_disposition,
destination_table=load_destination)
destination_table=load_destination,
allow_field_addition=allow_field_addition)


class TableLoader(BaseLoader):

def __init__(self, bigquery_service, destination: str, load_method: LoadMethod):
def __init__(self, bigquery_service, destination: str, load_method: LoadMethod, allow_field_addition=False):
self.bigquery_service = bigquery_service
self.full_table_name = destination
self.load_method = load_method
self.allow_field_addition = allow_field_addition

def load(self, query):
return self.bigquery_service.transform_load(query=query,
write_disposition=self.load_method.write_disposition,
destination_table=self.full_table_name)
destination_table=self.full_table_name,
allow_field_addition=self.allow_field_addition)


class DMLLoader(BaseLoader):
Expand Down
6 changes: 4 additions & 2 deletions task/bq2bq/executor/bumblebee/transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ def __init__(self, bigquery_service: BigqueryService,
self.execution_time = execution_time

def transform(self):
loader = TableLoader(self.bigquery_service, self.task_config.destination_table, self.task_config.load_method)
loader = TableLoader(self.bigquery_service, self.task_config.destination_table, self.task_config.load_method,
self.task_config.allow_field_addition)
logger.info("create transformation for table")

task = PartitionTransformation(self.task_config,
Expand Down Expand Up @@ -411,7 +412,8 @@ def transform(self):

logger.info("create transformation for partition: {}".format(destination_partition))
task_loader = PartitionLoader(self.bigquery_service, self.task_config.destination_table,
self.task_config.load_method, destination_partition)
self.task_config.load_method, destination_partition,
self.task_config.allow_field_addition)

task = PartitionTransformation(self.task_config,
task_loader,
Expand Down
16 changes: 15 additions & 1 deletion task/bq2bq/executor/example.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from bumblebee.bq2bq import bq2bq
from datetime import datetime, timezone
import os

execution_date = datetime.utcnow()

Expand Down Expand Up @@ -202,4 +203,17 @@ def replace_all():
)


replace_all()
def allow_field_addition():
    """Run bq2bq against the allow_field_addition sample task.

    Uses the sample properties.cfg (which sets ALLOW_FIELD_ADDITION=TRUE) and
    query.sql under samples/tasks/allow_field_addition/basic.

    NOTE(review): this reads the module-level DEFAULT_PATH defined *below*;
    it works only because the function is invoked after that assignment.
    """
    bq2bq(
        DEFAULT_PATH + "/samples/tasks/allow_field_addition/basic/properties.cfg",
        DEFAULT_PATH + "/samples/tasks/allow_field_addition/basic/query.sql",
        None,
        datetime(2021, 9, 1, 1),
        datetime(2021, 9, 2, 1),
        execution_date,
        False  # presumably the dry-run flag — TODO confirm against bq2bq's signature
    )


# Resolved at import time; must precede the call below (see NOTE in the docstring).
DEFAULT_PATH = os.path.dirname(os.path.realpath(__file__))
allow_field_addition()
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[DESTINATION]
PROJECT="g-data-gojek-id-mart"
DATASET="playground"
TABLE="allow_field_addition_test"

[TRANSFORMATION]
WINDOW_SIZE = 720h
WINDOW_OFFSET = 0
WINDOW_TRUNCATE_UPTO = M

[LOAD]
LOAD_METHOD="REPLACE"
ALLOW_FIELD_ADDITION=TRUE
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
select
"hakai" as hakai,
"rasengan" as rasengan,
"over" as overs,
"allow_field_addition" as test_column,
TIMESTAMP ('2021-09-01T01:02:03') as `event_timestamp`
20 changes: 17 additions & 3 deletions task/bq2bq/executor/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def setUp(self):
def test_localise_datetime(self):
tzname = "Asia/Jakarta"
start_time = datetime(2019, 1, 1)
localised_start_time = localise_datetime(start_time,tzname)
localised_start_time = localise_datetime(start_time, tzname)

expected_start_time = self.timezone.localize(datetime(year=2019, month=1, day=1))
self.assertEqual(expected_start_time,localised_start_time)
Expand Down Expand Up @@ -194,6 +194,20 @@ def test_empty_destination_exception(self):

self.assertEqual("config 'PROJECT' must be provided", str(ex.exception))

def test_allow_field_addition(self):
    """ALLOW_FIELD_ADDITION=true in the environment enables the config flag."""
    self.set_vars_with_default()
    os.environ['ALLOW_FIELD_ADDITION'] = 'true'

    config = TaskConfigFromEnv()
    self.assertEqual(True, config.allow_field_addition)
    # Unset so the variable does not leak into other tests in this process.
    del os.environ['ALLOW_FIELD_ADDITION']

def test_allow_field_addition_should_be_false_by_default(self):
    """With ALLOW_FIELD_ADDITION unset, the config flag defaults to False."""
    self.set_vars_with_default()

    config = TaskConfigFromEnv()
    self.assertEqual(False, config.allow_field_addition)


class TestTaskFiles(TestCase):

Expand Down Expand Up @@ -227,8 +241,8 @@ def test_task_files_with_spillover_query(self):
fs.exist.assert_has_calls([call(query_sql_file)])
fs.read.assert_has_calls([call(query_sql_file)])

self.assertEqual(task_files.query,"content")
self.assertEqual(task_files.spillover_query,"content")
self.assertEqual(task_files.query, "content")
self.assertEqual(task_files.spillover_query, "content")


class TestLoadMethod(TestCase):
Expand Down
56 changes: 46 additions & 10 deletions task/bq2bq/executor/tests/test_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ def test_partition_transform_execute(self, BigqueryServiceMock):
"""

bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190101")
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190101",
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
def test_table_transform(self, BigqueryServiceMock):
Expand All @@ -73,6 +74,7 @@ def test_table_transform(self, BigqueryServiceMock):
LOAD_METHOD="REPLACE"
"""
set_vars_with_default()

task_config = TaskConfigFromEnv()
localized_start_time = localise_datetime(datetime(2019, 1, 2), task_config.timezone)
localized_end_time = localise_datetime(datetime(2019, 1, 3), task_config.timezone)
Expand All @@ -87,7 +89,8 @@ def test_table_transform(self, BigqueryServiceMock):

bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd")
destination_table="bq_project.playground_dev.abcd",
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
def test_single_partition_transform_1d_window_0_offset_without_spillover(self, BigqueryServiceMock):
Expand All @@ -109,7 +112,8 @@ def test_single_partition_transform_1d_window_0_offset_without_spillover(self, B

bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190101")
destination_table="bq_project.playground_dev.abcd$20190101",
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
def test_single_partition_transform_2d_window_24h_offset_without_spillover(self, BigqueryServiceMock):
Expand All @@ -130,7 +134,8 @@ def test_single_partition_transform_2d_window_24h_offset_without_spillover(self,

bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190104")
destination_table="bq_project.playground_dev.abcd$20190104",
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
def test_single_partition_transform_7d_window_without_spillover(self, BigqueryServiceMock):
Expand All @@ -152,6 +157,7 @@ def test_single_partition_transform_7d_window_without_spillover(self, BigquerySe
[LOAD]
LOAD_METHOD="REPLACE"
"""

set_vars_with_default()
task_config = TaskConfigFromEnv()
localized_start_time = localise_datetime(datetime(2019, 1, 3), task_config.timezone)
Expand All @@ -167,7 +173,8 @@ def test_single_partition_transform_7d_window_without_spillover(self, BigquerySe

bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190103")
destination_table="bq_project.playground_dev.abcd$20190103",
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
def test_single_partition_transform_2d_with_spillover(self, BigqueryServiceMock):
Expand All @@ -189,9 +196,11 @@ def test_single_partition_transform_2d_with_spillover(self, BigqueryServiceMock)
final_query_2 = """\nselect count(1) from table where date >= '2019-01-04' and date < '2019-01-05'"""

calls = [call(query=final_query_1, write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190103"),
destination_table="bq_project.playground_dev.abcd$20190103",
allow_field_addition=False),
call(query=final_query_2, write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd$20190104")]
destination_table="bq_project.playground_dev.abcd$20190104",
allow_field_addition=False)]
bigquery_service.transform_load.assert_has_calls(calls, any_order=True)
self.assertEqual(len(bigquery_service.transform_load.call_args_list), len(calls))

Expand Down Expand Up @@ -231,6 +240,31 @@ def test_execute_dry_run(self, BigqueryServiceMock):
bigquery_service.transform_load.assert_not_called()


@mock.patch("bumblebee.bigquery_service.BigqueryService")
def test_allow_field_addition(self, BigqueryServiceMock):
    """TableTransformation forwards allow_field_addition=True to transform_load
    when the task config was built with ALLOW_FIELD_ADDITION=true."""
    query = """select count(1) from table where date >= '__dstart__' and date < '__dend__'"""

    set_vars_with_default()
    os.environ['ALLOW_FIELD_ADDITION'] = 'true'
    task_config = TaskConfigFromEnv()
    # The config captures the flag at construction; unset immediately so it
    # cannot leak into other tests.
    del os.environ['ALLOW_FIELD_ADDITION']

    localized_start_time = localise_datetime(datetime(2019, 1, 1), task_config.timezone)
    localized_end_time = localise_datetime(datetime(2019, 1, 2), task_config.timezone)
    localized_execution_time = localise_datetime(datetime(2019, 1, 1), task_config.timezone)

    bigquery_service = BigqueryServiceMock()
    task = TableTransformation(bigquery_service, task_config, query, localized_start_time,
                               localized_end_time, False, localized_execution_time)
    task.transform()

    # The window placeholders must be resolved and the flag passed through.
    final_query = """select count(1) from table where date >= '2019-01-01' and date < '2019-01-02'"""
    bigquery_service.transform_load.assert_called_with(query=final_query,
                                                       write_disposition=WriteDisposition.WRITE_TRUNCATE,
                                                       destination_table="bq_project.playground_dev.abcd",
                                                       allow_field_addition=True)


class TestTransformation(TestCase):

@mock.patch("bumblebee.bigquery_service.BigqueryService")
Expand Down Expand Up @@ -290,7 +324,8 @@ def get_table_mock(table_name):
final_query = """select count(1) from table where date >= '2019-02-01' and date < '2019-02-02'"""
bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_APPEND,
destination_table="bq_project.playground_dev.abcd")
destination_table="bq_project.playground_dev.abcd",
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
def test_table_transform_with_merge_load_method_and_non_partitioned_destination(self, BigqueryServiceMock):
Expand Down Expand Up @@ -326,7 +361,8 @@ def get_table_mock(table_name):
final_query = """select count(1) from table where date >= '2019-01-03' and date < '2019-01-04'\n"""
bigquery_service.transform_load.assert_called_with(query=final_query,
write_disposition=WriteDisposition.WRITE_TRUNCATE,
destination_table="bq_project.playground_dev.abcd")
destination_table="bq_project.playground_dev.abcd",
allow_field_addition=False)

@mock.patch("bumblebee.bigquery_service.BigqueryService")
def test_should_run_partition_task_on_field(self, BigqueryServiceMock):
Expand Down

0 comments on commit c325b86

Please sign in to comment.