This repository has been archived by the owner on Nov 2, 2021. It is now read-only.
/
adjust.py
64 lines (50 loc) · 1.84 KB
/
adjust.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""Adjust ETL task."""
from argparse import Namespace
from typing import Dict, Any, List, Tuple
import pandas as pd
from tasks import base
import numpy as np
from utils.config import get_configs, get_arg_parser
# Default keyword arguments forwarded to get_arg_parser() when this module
# is executed as a script.
DEFAULTS: Dict[str, Any] = {
    "rm": True,
}
class AdjustEtlTask(base.EtlTask):
    """ETL task to compute Adjust from events."""

    def __init__(
        self,
        args: Namespace,
        sources: Dict[str, Any],
        schema: List[Tuple[str, np.generic]],
        destinations: Dict[str, Any],
    ):
        """Initialize Adjust ETL task.

        :param args: args passed from command line,
            see `get_arg_parser()`
        :param sources: data source to be extracted,
            specified in task config, see `configs/*.py`
        :param schema: the target schema to load to.
        :param destinations: destinations to load data to,
            specified in task config, see `configs/*.py`
        """
        super().__init__(args, sources, schema, destinations, "staging", "adjust")

    def transform_adjust_trackers(self, adjust_trackers):
        """Stamp Adjust tracker data with the current UTC execution time.

        Adds (or overwrites) an ``execution_date`` column holding the current
        UTC time as a timezone-naive ``datetime64[ns]`` value, identical on
        every row. The input object is modified in place and also returned.

        :param adjust_trackers: extracted Adjust tracker data; presumably a
            pandas DataFrame (it must support column assignment and
            ``.astype`` on the column) -- confirm against the base task.
        :return: the same object, with ``execution_date`` set.
        """
        # `pd.datetime` was deprecated in pandas 1.0 and removed in 2.0, so
        # take the timestamp from pandas directly. The timezone is dropped so
        # the column stays tz-naive UTC, as `datetime.utcnow()` produced.
        execution_date = pd.Timestamp.now(tz="UTC").tz_localize(None)
        adjust_trackers["execution_date"] = execution_date
        # Pin the dtype explicitly so the loaded column has nanosecond
        # resolution regardless of the pandas version's default unit.
        adjust_trackers["execution_date"] = adjust_trackers["execution_date"].astype(
            "datetime64[ns]"
        )
        return adjust_trackers
def main(args: Namespace):
    """Build the Adjust ETL task from parsed arguments and run it.

    The config name starts out empty, becomes ``"debug"`` when ``args.debug``
    is truthy, and is overridden by ``args.config`` whenever that is truthy.

    :param args: args passed from command line, see `get_arg_parser()`
    """
    debug_enabled = args.debug
    explicit_config = args.config

    # An explicit config always wins over the debug shortcut.
    if explicit_config:
        selected_config = explicit_config
    elif debug_enabled:
        selected_config = "debug"
    else:
        selected_config = ""

    task_configs = get_configs("adjust", selected_config)
    AdjustEtlTask(
        args,
        task_configs.SOURCES,
        task_configs.SCHEMA,
        task_configs.DESTINATIONS,
    ).run()
if __name__ == "__main__":
    # Script entry point: parse the command line using the module defaults
    # and hand the parsed arguments to main().
    cli_parser = get_arg_parser(**DEFAULTS)
    cli_args = cli_parser.parse_args()
    main(cli_args)