## How to test a DAG script in the testing environment

1. In a terminal, go to the folder that contains your DAG script.
2. Copy the file to your local machine with this command: scp dag_epc_channel_breakdown.py sshaukat@10.42.16.129:~/ContextLogic/clroot/wish_airflow/dags/
3. Then SSH into your local machine: ssh 10.42.16.129
4. Once you are logged in, go to the Airflow folder: cd ContextLogic/clroot/wish_airflow/
5. Run: fab deploy_update
6. It may ask you to download and install fab; follow the instructions it shows.
7. Then go to http://airflow-testing.i.wish.com/admin/ — your DAG should be available there for testing.
8. It usually takes 5-10 minutes for the DAG to show up.

## update_default_args()

In [1]:
from cl.utils.airflow_base.utilities import AirflowUtilities
# Usage: AirflowUtilities.update_default_args(**overrides)
#
# Returns the default Airflow `default_args` dict shown below with every keyword
# argument applied on top of it (dict.update semantics: keys you pass override
# the defaults, keys that are not in the defaults are added).
#
# Reference copy of the library implementation. It is a classmethod of
# AirflowUtilities and depends on names (`c`, `cls`) that are not defined in
# this notebook, so it is kept as a comment rather than executed:
#
#     @classmethod
#     def update_default_args(cls, **kwargs):
#         """
#         :return: Dict of default args updated with given kwargs
#         """
#         default = {
#             "email": c.AIRFLOW_EMAIL_GROUP,
#             "email_on_failure": False,
#             "email_on_retry": False,
#             "on_failure_callback": cls.email_on_failure,
#             "on_retry_callback": cls.email_on_failure,
#             "on_success_callback": cls.on_success_callback,
#             "depends_on_past": c.DEFAULT_DEPENDS_ON_PAST,
#             "retries": c.DEFAULT_RETRIES,
#             "retry_delay": c.DEFAULT_RETRY_DELAY,
#             "priority_weight": c.DEFAULT_PRIORITY_WEIGHT,
#         }
#
#         default.update(kwargs)
#         return default
#
# NOTE(review): on_retry_callback reuses cls.email_on_failure — presumably so
# retries are also emailed; confirm this is intended.

SyntaxError: invalid syntax (utilities.py, line 125)

In [None]:
# example:
from datetime import datetime, timedelta

# NOTE: POOL, QUEUE and DEBUG_MODE must be defined/imported in the DAG file;
# they are not defined in the cells shown here.
default_args = AirflowUtilities.update_default_args(
    owner="sshaukat",
    # Date the DAG is scheduled from; use a fixed date in the past (at least
    # yesterday) so the DAG is eligible to run.
    start_date=datetime(2019, 6, 4),
    # No end_date needed: you can pause/disable the DAG from the web UI instead.
    pool=POOL.TESTING if DEBUG_MODE else POOL.PROD_DATA,
    queue=QUEUE.TESTING if DEBUG_MODE else QUEUE.PROD_DATA,
    # depends_on_past=True would make e.g. today's run depend on yesterday's run.
    depends_on_past=False,
)

# OR (alternative example with every override spelled out):

DEFAULT_ARGS = AirflowUtilities.update_default_args(
    owner="bwirakesuma",
    start_date=datetime.strptime("2018-09-16 12:00:00 AM", "%Y-%m-%d %I:%M:%S %p"),
    queue=QUEUE.PROD_DATA,
    pool=POOL.PROD_DATA,
    email="airflow-exceptions@contextlogic.com",
    depends_on_past=False,
    retries=2,
    retry_delay=timedelta(minutes=5),
    priority_weight=1,
)

## DAG()

In [None]:
from airflow.models import DAG

In [None]:
## example:
dag = DAG(
    dag_id="EPCSubsidyBreakdown",
    schedule_interval="@daily",
    # catchup=False: the scheduler does not backfill runs for intervals that
    # were missed (e.g. while the DAG was disabled); it only schedules the latest.
    catchup=False,
    default_args=default_args,
)

In [None]:
## example:
def build_query(self, **kwargs):
    """Build the TD/Hive INSERT query for one DAG run (example callback).

    Intended as the ``build_query`` callback of ``TDQueryOperator``; ``self``
    is presumably the operator instance — this function only reads
    ``self.td_db`` and ``self.target`` from it.

    :param kwargs: must contain ``run_date`` (a ``datetime``).
    :return: the query string with every placeholder filled in.
    """
    base_date = kwargs["run_date"]
    # Historical window (from/to) the query covers, relative to the run date.
    # For a fixed window use e.g.:
    #     end_date = base_date
    #     start_date = datetime(2019, 4, 1)
    end_date = base_date - timedelta(days=60)
    start_date = end_date - timedelta(days=150)

    ## build SQL/Hive query below (results go into {db}.{target}):
    td_query = """
    INSERT INTO TABLE {db}.{target}
            SELECT  TD_TIME_FORMAT(td_date_trunc('week',CAST(coalesce(delivered,user_confirmed_delivery) AS bigint),'America/Los_Angeles'),'yyyy-MM-dd') AS week,
                    -- ... further columns elided in this example ...
                    sum(case when wsp.m_transaction_id is not null then 1 else 0 end) as orders

            from sweeper.merch_merchanttransaction_dump wsp

            left join (select channel, tracking_id from sweeper.merch_wishpost_events) b
                on wsp.tracking_id = b.tracking_id

            where wsp.confirmed_shipped is not null and coalesce(delivered,user_confirmed_delivery) >= {start_date}
                  and td_time_range(order_time,'{start_date_str}','{end_date_str}')

            group by 1,2,3,4,5
    """.format(
        # Unix seconds; assumes delivered/user_confirmed_delivery hold epoch
        # timestamps (they are CAST to bigint above) — confirm. If the column
        # held dates instead, pass start_date.date() here.
        start_date=int(start_date.timestamp()),
        start_date_str=start_date.strftime("%Y-%m-%d"),
        end_date_str=end_date.strftime("%Y-%m-%d"),
        db=self.td_db,
        target=self.target,
    )

    return td_query
    

In [None]:
## another example with embedded queries

def build_query(self, **kwargs):
    """Build the INSERT query joining product impressions with transactions.

    All time windows are anchored at the Unix timestamp of ``run_date`` minus
    two days. The impression subquery sums impressions per product/country over
    several windows and ranks products per country by 7-day impressions; the
    transaction subquery aggregates ratings, GMV and order counts per
    (product_id, dest_country) over the preceding 90 days. The outer query
    joins the two, keeps rank <= 10000 and inserts into
    ``self.td_db``.``self.target``.

    :param kwargs: must contain ``run_date`` (a ``datetime``).
    :return: the fully formatted query string.
    """
    anchor_date = kwargs["run_date"] - timedelta(days=2)
    latest_contest = get_latest_sweeper_table("contest")
    # Same epoch value is shared by both subqueries.
    anchor_ts = int(anchor_date.timestamp())

    # Per-product, per-country impression totals plus a 7-day rank.
    impressions_sql = """
        SELECT first.cid,
               first.name,
               first.merchant_name,
               first.mid,
               first.bd_rep,
               first.country_code,
               first.categories,
               first.subcategories,
               first.imps_90D as imps_90D,
               first.imps_30D as imps_30D,
               first.imps_30D_60D as imps_30D_60D,
               first.imps_7D as imps_7D,
               first.imps_7D_14D as imps_7D_14D,
               RANK() OVER(partition BY country_code ORDER BY imps_7D DESC) AS imp_rank_7D

         FROM (SELECT  cid,
                    c.name,
                    cm.display_name as merchant_name,
                    cm.id as mid,
                    cm.bd_rep,
                    country_code,
                    categories,
                    subcategories,
                    SUM(case when x1.start_time >= {end_time} - 90*86400 and end_time < {end_time}
                             then count
                             else 0 end) as imps_90D,
                    SUM(case when x1.start_time >= {end_time} - 30*86400 and end_time < {end_time}
                             then count
                             else 0 end) as imps_30D,
                    SUM(case when x1.start_time >= {end_time} - 60*86400 and end_time < {end_time} - 30*86400
                             then count
                             else 0 end) as imps_30D_60D,
                    SUM(case when x1.start_time >= {end_time} - 7*86400 and end_time < {end_time}
                             then count
                             else 0 end) as imps_7D,
                    SUM(case when x1.start_time >= {end_time} - 14*86400 and end_time < {end_time} - 7*86400
                             then count
                             else 0 end) as imps_7D_14D

            FROM sweeper.new_daily_country_segmented_impressions x1
            LEFT OUTER JOIN sweeper.{latest_contest_table} c
                ON x1.cid = c.`_id`
            LEFT OUTER JOIN sweeper.commerce_merchant cm
                ON c.merchant_id = cm.id
            LEFT OUTER JOIN sweeper.merch_product_tags mpt
                ON x1.cid = mpt.product_id
            WHERE start_time >= {end_time} - 90*86400
                AND x1.time >= {end_time} - 90*86400
            GROUP BY 1,2,3,4,5,6,7,8) first
    """.format(end_time=anchor_ts, latest_contest_table=latest_contest)

    # Per-product, per-destination-country rating / GMV / order aggregates.
    transactions_sql = """
        SELECT  product_id,
                dest_country,
                SUM(case when rating < 3 then 1 else 0 end) as low_rating,
                SUM(case when rating is not null then 1 else 0 end) as rating_count,
                AVG(rating) as avg_rating,
                SUM(case when order_time >= {end_time} - 90*86400 and order_time < {end_time}
                    then gmv
                    else 0 end) as gmv_90D,
                SUM(case when order_time >= {end_time} - 30*86400 and order_time < {end_time}
                    then gmv
                    else 0 end) as gmv_30D,
                SUM(case when order_time >= {end_time} - 60*86400 and order_time < {end_time} - 30*86400
                    then gmv
                    else 0 end) as gmv_30D_60D,
                SUM(case when order_time >= {end_time} - 7*86400 and order_time < {end_time}
                    then gmv
                    else 0 end) as gmv_7D,
                SUM(case when order_time >= {end_time} - 14*86400 and order_time < {end_time} - 7*86400
                    then gmv
                    else 0 end) as gmv_7D_14D,
                SUM(case when order_time >= {end_time} - 90*86400 and order_time < {end_time}
                    then 1
                    else 0 end) as orders_90D,
                SUM(case when order_time >= {end_time} - 30*86400 and order_time < {end_time}
                    then 1
                    else 0 end) as orders_30D,
                SUM(case when order_time >= {end_time} - 60*86400 and order_time < {end_time} - 30*86400
                    then 1
                    else 0 end) as orders_30D_60D,
                SUM(case when order_time >= {end_time} - 7*86400 and order_time < {end_time}
                    then 1
                    else 0 end) as orders_7D,
                SUM(case when order_time >= {end_time} - 14*86400 and order_time < {end_time} - 7*86400
                    then 1
                    else 0 end) as orders_7D_14D
        FROM sweeper.merch_merchanttransaction mt
            WHERE order_time >= {end_time} - 90*86400
        GROUP BY 1,2
     """.format(end_time=anchor_ts)

    # Outer query: join both subqueries and keep the top-ranked products.
    return """
             INSERT INTO TABLE {db}.{target}
                    SELECT  y.*,
                            z.low_rating,
                            z.rating_count,
                            z.avg_rating,
                            z.gmv_90D,
                            z.gmv_30D,
                            z.gmv_30D_60D,
                            z.gmv_7D,
                            z.gmv_7D_14D,
                            z.orders_90D,
                            z.orders_30D,
                            z.orders_30D_60D,
                            z.orders_7D,
                            z.orders_7D_14D
                    FROM ({impression_query}) y
                    LEFT OUTER JOIN ({m_txn_query}) z
                        ON y.cid = z.product_id
                        AND y.country_code = z.dest_country
                    WHERE imp_rank_7D <= 10000

    """.format(
        db=self.td_db,
        target=self.target,
        impression_query=impressions_sql,
        m_txn_query=transactions_sql,
    )

## TDQueryOperator()

In [None]:
from cl.utils.airflow_base.operators import TDQueryOperator
"""
        Runs a TD Query which inserts data into a TD table.
        This task does not return any results to upstream tasks.

        :param target: results td table name [required]
        :param td_db: results td database name [default: sweeper]

        :param build_query: build_query(self, **kwargs) [required]
        :param pre_process: pre_process(self, **kwargs)
        :param post_process: post_process(self, **kwargs)
"""

In [None]:
## example:
def prep_fact_table(self, **kwargs):
    """pre_process hook: call table_delete, then table_create, on the target table.

    Both helpers receive (self.td_db, self.target); extra kwargs are ignored.
    """
    database, table = self.td_db, self.target
    table_delete(database, table)
    table_create(database, table)

task1 = TDQueryOperator(
    task_id="EPCSubsidyBreakdown",
    dag=dag,
    build_query=build_query,
    pre_process=prep_fact_table,
    # Pass the marker either as marker=MARKER (as here) or as a mapping,
    # e.g. marker={"TDMarker": TDMarker}.
    marker=MARKER,
    ## table name (results TD table inside td_db)
    target="daily_epc_subsidy_breakdown",
    td_db="sweeper",
    ## True = OVERWRITE, False = Append
    truncate=True,
)

## TDToRedshiftLoadUnionOperator()

In [None]:
from cl.utils.airflow_base.operators import TDToRedshiftLoadUnionOperator
"""
        :param distall: boolean if distall [default: False]
        :param time_key:
        :param suffix: redshift
        :param lookback: redshift lookback
        :param lookforward: redshift lookforward
        :param source_db: td database [default: sweeper]
        :param schema_name: redshift schema [default: public]
        :param sortkey_field: redshift sort key
        :param partition_key: redshift partition key
        :param cluster: redshift cluster [default: wish-platform]
        :param table_name: redshift table name [required]
        :param fields_list: list of redshift fields [required]
"""

In [None]:
## example:
union_load_task = TDToRedshiftLoadUnionOperator(
    task_id="TDRedshiftUnion",
    dag=dag,
    marker=MARKER,
    source_db="sweeper",
    # Redshift schema to load into -- the schema you see in Tableau, e.g. "logistics"
    schema_name="logistics",
    table_name="daily_epc_subsidy_breakdown",
    # list of fields matching the predefined Redshift table schema
    fields_list=FIELDS_LIST,
    ## True = OVERWRITE, False = Append
    truncate=True,
    # upstream task(s); presumably this load waits for task1 to finish
    dependencies=[task1],
    # c.RS_WISH_PLATFORM is the usual default cluster (wish-platform)
    cluster=c.RS_WISH_PLATFORM,
)

## TableauWorkbookRefreshOperator

In [None]:
from cl.utils.airflow_base.operators import TableauWorkbookRefreshOperator
"""
   Refreshes a Tableau data extract
   :param workbook_name: full name of tableau workbook to refresh
"""

In [None]:
## example:
tableau_load_task = TableauWorkbookRefreshOperator(
    task_id="EPCDashboard_TableauExtract",
    dag=dag,
    # Required: full name of the Tableau workbook to refresh. Several tables
    # can feed the same Tableau workbook.
    workbook_name="EPC Dashboard",
    marker=MARKER,
    dependencies=[union_load_task],
)