Implement daily cleanup and schema check.
elin-moco committed Oct 5, 2019
1 parent a375f92 commit e74d792
Showing 8 changed files with 170 additions and 51 deletions.
1 change: 1 addition & 0 deletions dags/__init__.py
@@ -0,0 +1 @@
"""Airflow DAGs."""
3 changes: 3 additions & 0 deletions sql/cleanup_mango_core.sql
@@ -0,0 +1,3 @@
DELETE `{project}.{dataset}.{dest}` WHERE app_name='Zerda'
AND submission_date >= DATE '{start_date}'
AND submission_date < DATE_ADD(DATE '{start_date}', INTERVAL '1' DAY)
6 changes: 6 additions & 0 deletions sql/cleanup_mango_events.sql
@@ -0,0 +1,6 @@
DELETE
`{project}.{dataset}.{dest}`
WHERE
normalized_app_name='Zerda' AND
DATE(submission_timestamp) >= DATE '{start_date}' AND
DATE(submission_timestamp) < DATE_ADD(DATE '{start_date}', INTERVAL '1' DAY)
3 changes: 3 additions & 0 deletions sql/cleanup_revenue_bukalapak.sql
@@ -0,0 +1,3 @@
DELETE `{project}.{dataset}.{dest}` WHERE source='bukalapak'
AND DATE(utc_datetime) >= DATE '{start_date}'
AND DATE(utc_datetime) < DATE_ADD(DATE '{start_date}', INTERVAL '1' DAY)
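These cleanup templates are rendered by the new daily_cleanup step in tasks/bigquery.py before each append-mode load, so re-running a day first deletes that day's rows. A minimal sketch of the rendering, with hypothetical project/dataset/table values (the real ones come from the task config):

# Sketch only: params are hypothetical; pathlib stands in for the repo's read_string helper.
from pathlib import Path

params = {"project": "my-project", "dataset": "my_dataset", "dest": "revenue_bukalapak"}
qstring = Path("sql/cleanup_revenue_bukalapak.sql").read_text()
print(qstring.format(**params, start_date="2019-10-04"))
# DELETE `my-project.my_dataset.revenue_bukalapak` WHERE source='bukalapak'
# AND DATE(utc_datetime) >= DATE '2019-10-04'
# AND DATE(utc_datetime) < DATE_ADD(DATE '2019-10-04', INTERVAL '1' DAY)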
7 changes: 7 additions & 0 deletions tasks/adjust.py
@@ -1,6 +1,9 @@
"""Adjust ETL task."""
from argparse import Namespace
from typing import Dict, Any, List, Tuple

import pandas as pd

from tasks import base
import numpy as np

@@ -34,6 +37,10 @@ def __init__(
def transform_adjust_trackers(self, adjust_trackers):
"""Transform Adjust data."""
# transform here
adjust_trackers["execution_date"] = pd.datetime.utcnow()
adjust_trackers["execution_date"] = adjust_trackers["execution_date"].astype(
"datetime64[ns]"
)
return adjust_trackers


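The added lines stamp every Adjust tracker row with the job's execution time and coerce the column to datetime64[ns]. A standalone sketch on a toy DataFrame (in older pandas, pd.datetime is an alias for the standard datetime module, so pd.datetime.utcnow() is datetime.datetime.utcnow()):

import datetime
import pandas as pd

# Hypothetical tracker rows, just to illustrate the transform above.
adjust_trackers = pd.DataFrame({"tracker": ["abc123", "def456"], "installs": [10, 7]})
adjust_trackers["execution_date"] = datetime.datetime.utcnow()
adjust_trackers["execution_date"] = adjust_trackers["execution_date"].astype("datetime64[ns]")
print(adjust_trackers.dtypes)  # execution_date is datetime64[ns]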
178 changes: 133 additions & 45 deletions tasks/bigquery.py
@@ -1,7 +1,11 @@
"""BigQuery Etl Tasks."""
import datetime
import logging
from argparse import Namespace
from typing import Dict, Callable, Optional

from google.cloud.exceptions import NotFound

import utils.config
from google.cloud import bigquery

@@ -10,21 +14,49 @@
log = logging.getLogger(__name__)

DEFAULTS = {}
FILETYPES = {
"csv": bigquery.SourceFormat.CSV,
"jsonl": bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
}


class BqTask:
"""Base class for BigQuery ETL."""

def __init__(self, config: Dict, date: datetime.datetime):
self.date = (date - datetime.timedelta(days=1)).strftime(
utils.config.DEFAULT_DATE_FORMAT
)
# default to load data 1 day behind
self.date = (
date
- datetime.timedelta(
days=1 if "days_behind" not in config else config["days_behind"]
)
).strftime(utils.config.DEFAULT_DATE_FORMAT)
self.config = config
self.client = bigquery.Client(config["params"]["project"])

def is_write_append(self):
# default write append=true
return "append" not in self.config or self.config["append"]

def create_schema(self):
def does_table_exist(self):
try:
dataset = self.client.dataset(self.config["params"]["dataset"])
table_ref = dataset.table(self.config["params"]["dest"])
self.client.get_table(table_ref)
return True
except NotFound:
return False

def does_routine_exist(self, routine_id):
try:
dataset = self.client.dataset(self.config["params"]["dataset"])
routine_ref = dataset.routine(routine_id)
self.client.get_routine(routine_ref)
return True
except NotFound:
return False

def create_schema(self, check_exists=False):
udfs = []
if "udf" in self.config:
udfs += [
@@ -37,6 +69,8 @@ def create_schema(self):
for x in self.config["udf_js"]
]
for udf, qstring in udfs:
if check_exists and self.does_routine_exist(udf):
continue
qstring = qstring % (
self.config["params"]["project"],
self.config["params"]["dataset"],
@@ -56,7 +90,11 @@ def drop_schema(self):
for udf in udfs:
self.client.delete_routine(
"%s.%s.%s"
% (self.config["params"]["project"], self.config["params"]["dataset"], udf),
% (
self.config["params"]["project"],
self.config["params"]["dataset"],
udf,
),
not_found_ok=True,
)
self.client.delete_table(
@@ -73,32 +111,52 @@ def drop_schema(self):
def daily_run(self):
assert False, "daily_run not implemented."

def daily_cleanup(self, d):
if self.is_write_append() and "cleanup_query" in self.config:
qstring = read_string("sql/{}.sql".format(self.config["cleanup_query"]))
qparams = self.get_query_params(d)
query_job = self.client.query(qstring.format(**qparams))
query_job.result()
log.info("Done cleaning up.")

def get_query_params(self, d):
return {**self.config["params"], "start_date": d}


# https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json
class BqGcsTask(BqTask):
"""BigQuery ETL via GCS."""

def __init__(self, config: Dict, date: datetime):
super().__init__(config, date)

def create_schema(self):
self.daily_run()
def create_schema(self, check_exists=False):
if check_exists and self.does_table_exist():
return
# load a file to create schema
self.run_query(self.date, True)

def daily_run(self):
self.daily_cleanup(self.date)
self.run_query(self.date)

def run_query(self, date, autodetect=False):
dataset_ref = self.client.dataset(self.config["params"]["dataset"])
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = (
bigquery.WriteDisposition.WRITE_APPEND
if self.is_write_append()
else bigquery.WriteDisposition.WRITE_TRUNCATE
)
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
# don't autodetect after the schema is created; it can cause STRING/INTEGER type errors
job_config.autodetect = autodetect
job_config.source_format = FILETYPES[self.config["filetype"]]
if "partition_field" in self.config:
job_config.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field=self.config["partition_field"],
)
# FIXME: handle date here
uri = "gs://%s" % self.config["params"]["src"]
uri = "gs://%s" % self.config["params"]["src"].format(start_date=date)

load_job = self.client.load_table_from_uri(
uri,
@@ -119,15 +177,22 @@ def daily_run(self):

# https://cloud.google.com/bigquery/docs/tables
# https://cloud.google.com/bigquery/docs/writing-results
class BqTableTask(BqTask):
class BqQueryTask(BqTask):
"""BigQuery ETL via query result."""

def __init__(self, config: Dict, date: datetime):
super().__init__(config, date)

def create_schema(self):
def create_schema(self, check_exists=False):
super().create_schema(check_exists)
if check_exists and self.does_table_exist():
return
# Run an empty query to create schema
# FIXME: use LIMIT 0
self.run_query("1970-01-01")

def daily_run(self):
self.daily_cleanup(self.date)
self.run_query(self.date)

def run_query(self, date):
@@ -147,41 +212,41 @@ def run_query(self, date):
type_=bigquery.TimePartitioningType.DAY,
field=self.config["partition_field"],
)
qparams = {**self.config["params"], **self.config["params"], "start_date": date}
qparams = self.get_query_params(date)
query = self.client.query(qstring.format(**qparams), job_config=job_config)
query.result()


# https://cloud.google.com/bigquery/docs/views
class BqViewTask(BqTask):
"""BigQuery ETL via view."""

def __init__(self, config: Dict, date: datetime.datetime):
super().__init__(config, date)
super().__init__(config, date - datetime.timedelta(days=1))

def create_schema(self):
super().create_schema()
def create_schema(self, check_exists=False):
super().create_schema(check_exists)
if check_exists and self.does_table_exist():
return
qstring = read_string("sql/{}.sql".format(self.config["query"]))
shared_dataset_ref = self.client.dataset(self.config["params"]["dataset"])
view_ref = shared_dataset_ref.table(self.config["params"]["dest"])
view = bigquery.Table(view_ref)
qparams = {
**self.config["params"],
**self.config["params"],
"start_date": self.date,
}
qparams = self.get_query_params(self.date)
view.view_query = qstring.format(**qparams)
view = self.client.create_table(view) # API request

log.info("Successfully created view at {}".format(view.full_table_id))


def get_task_by_config(config: Dict, date: datetime.datetime):
def get_task(config: Dict, date: datetime.datetime):
assert "type" in config, "Task type is required in BigQuery config."
if config["type"] == "gcs":
return BqGcsTask(config, date)
elif config["type"] == "view":
return BqViewTask(config, date)
elif config["type"] == "table":
return BqTableTask(config, date)
return BqQueryTask(config, date)
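For context, here is a hypothetical config illustrating the keys the task classes above read (type, filetype, days_behind, append, cleanup_query, partition_field, and the params block); the real configs come from utils.config.get_configs and will differ:

import datetime

# Hypothetical example only; values are placeholders, not the project's actual configs.
REVENUE_EXAMPLE = {
    "type": "gcs",                                 # handled by BqGcsTask
    "filetype": "jsonl",                           # key into FILETYPES above
    "days_behind": 1,                              # load data one day behind (the default)
    "append": True,                                # WRITE_APPEND; False means WRITE_TRUNCATE
    "cleanup_query": "cleanup_revenue_bukalapak",  # renders sql/cleanup_revenue_bukalapak.sql
    "partition_field": "utc_datetime",
    "params": {
        "project": "my-project",
        "dataset": "my_dataset",
        "dest": "revenue_bukalapak",
        "src": "my-bucket/revenue/{start_date}/*.jsonl",
    },
}

task = get_task(REVENUE_EXAMPLE, datetime.datetime(2019, 10, 5))
task.create_schema(check_exists=True)  # skipped when the table already exists
task.daily_run()                       # deletes the 2019-10-04 rows, then re-appends them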


def main(args: Namespace):
@@ -195,52 +260,75 @@ def main(args: Namespace):
if args.config:
config_name = args.config
cfgs = utils.config.get_configs("bigquery", config_name)
init(args, cfgs)
# daily_run(args.date, cfgs)
backfill("2019-09-01", "2019-10-03", cfgs)
# init(args, cfgs)
daily_run_lastest(args.date, cfgs)
# backfill("2019-09-01", "2019-10-03", cfgs)


def backfill(start, end, configs: Optional[Callable]):
for d in get_date_range(start, end):
for d in get_date_range_from_string(start, end):
daily_run(d, configs)


def daily_run_lastest(d: datetime, configs: Optional[Callable]):
channel_mapping_task = get_task(configs.CHANNEL_MAPPING, d)
channel_mapping_task.daily_run()
daily_run(d, configs)
backfill_dates = get_date_range(
datetime.datetime.now() - datetime.timedelta(days=8),
datetime.datetime.now() - datetime.timedelta(days=1),
)
for d in backfill_dates:
revenue_bukalapak_task = get_task(configs.REVENUE_BUKALAPAK, d)
revenue_bukalapak_task.daily_run()


def daily_run(d: datetime, configs: Optional[Callable]):
print(d)
core_task = get_task_by_config(configs.MANGO_CORE, d)
core_task = get_task(configs.MANGO_CORE, d)
core_task.daily_run()
events_task = get_task_by_config(configs.MANGO_EVENTS, d)
events_task = get_task(configs.MANGO_EVENTS, d)
events_task.daily_run()
revenue_bukalapak_task = get_task(configs.REVENUE_BUKALAPAK, d)
revenue_bukalapak_task.daily_run()


def init(args, configs: Optional[Callable]):
core_task = get_task_by_config(configs.MANGO_CORE, args.date)
events_task = get_task_by_config(configs.MANGO_EVENTS, args.date)
unnested_events_task = get_task_by_config(configs.MANGO_EVENTS_UNNESTED, args.date)
feature_events_task = get_task_by_config(
configs.MANGO_EVENTS_FEATURE_MAPPING, args.date
)
channel_mapping_task = get_task_by_config(configs.CHANNEL_MAPPING, args.date)
user_channels_task = get_task_by_config(configs.USER_CHANNELS, args.date)
core_task = get_task(configs.MANGO_CORE, args.date)
events_task = get_task(configs.MANGO_EVENTS, args.date)
unnested_events_task = get_task(configs.MANGO_EVENTS_UNNESTED, args.date)
feature_events_task = get_task(configs.MANGO_EVENTS_FEATURE_MAPPING, args.date)
google_rps_task = get_task(configs.GOOOGLE_RPS, datetime.datetime(2018, 1, 1))
channel_mapping_task = get_task(configs.CHANNEL_MAPPING, args.date)
user_channels_task = get_task(configs.USER_CHANNELS, args.date)
revenue_bukalapak_task = get_task(configs.REVENUE_BUKALAPAK, args.date)
if args.dropschema:
core_task.drop_schema()
events_task.drop_schema()
unnested_events_task.drop_schema()
feature_events_task.drop_schema()
channel_mapping_task.drop_schema()
user_channels_task.drop_schema()
google_rps_task.drop_schema()
revenue_bukalapak_task.drop_schema()
if args.createschema:
core_task.create_schema()
events_task.create_schema()
unnested_events_task.create_schema()
feature_events_task.create_schema()
channel_mapping_task.create_schema()
user_channels_task.create_schema()
core_task.create_schema(args.checkchema)
events_task.create_schema(args.checkchema)
unnested_events_task.create_schema(args.checkchema)
feature_events_task.create_schema(args.checkchema)
channel_mapping_task.create_schema(args.checkchema)
user_channels_task.create_schema(args.checkchema)
google_rps_task.create_schema(args.checkchema)
revenue_bukalapak_task.create_schema(args.checkchema)


def get_date_range(start: str, end: str):
def get_date_range_from_string(start: str, end: str):
starttime = datetime.datetime.strptime(start, utils.config.DEFAULT_DATE_FORMAT)
endtime = datetime.datetime.strptime(end, utils.config.DEFAULT_DATE_FORMAT)
return get_date_range(starttime, endtime)


def get_date_range(starttime, endtime):
return [
starttime + datetime.timedelta(days=x)
for x in range(0, (endtime - starttime).days)
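The date helpers above produce a half-open range at day granularity, so the end date itself is excluded. A quick runnable sketch (the helper is restated only to keep the example self-contained):

import datetime

def get_date_range(starttime, endtime):
    return [
        starttime + datetime.timedelta(days=x)
        for x in range(0, (endtime - starttime).days)
    ]

days = get_date_range(datetime.datetime(2019, 9, 1), datetime.datetime(2019, 9, 4))
print([d.strftime("%Y-%m-%d") for d in days])
# ['2019-09-01', '2019-09-02', '2019-09-03'] -- 2019-09-04 is excluded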
2 changes: 1 addition & 1 deletion tests/tasks/test_bigquery.py
@@ -36,7 +36,7 @@ def test_BqTableTask(client, to_delete):
dataset = client.create_dataset(configs.SELECT_TABLE["params"]["dataset"])
to_delete.extend([dataset])

task = tasks.bigquery.get_task_by_config(
task = tasks.bigquery.get_task(
configs.SELECT_TABLE, datetime.datetime(2005, 7, 14, 12, 30)
)
task.daily_run()
