In [3]:
import time
import requests
import urllib.request
from urllib.parse import urlencode
from urllib.request import urlopen
import json
import pandas as pd
import io
from datetime import datetime, timedelta
import psycopg2
from sqlalchemy import create_engine

from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.hooks.base import BaseHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.hooks.http_hook import HttpHook

In [2]:
# url = 'https://github.com/zakonreal/etc./blob/main/user_order_log.csv?raw=true'
# user_order_log = pd.read_csv(url, index_col=0)
# user_order_log = user_order_log.drop_duplicates(ignore_index = True)
# if 'status' not in user_order_log: user_order_log['status'] = 'shipped' 

In [None]:
def _insert_in_batches(conn, cur, frame, insert_sql, label, n_batches=100):
    """Insert every row of ``frame`` with ``insert_sql``, committing after each batch.

    Args:
        conn: open psycopg2 connection (used for ``commit``).
        cur: cursor created from ``conn``.
        frame: DataFrame whose columns, in order, match the column list in
            ``insert_sql``.
        insert_sql: INSERT statement containing a single ``%s`` placeholder
            where the VALUES list goes (the form ``execute_values`` expects).
        label: text printed next to the running row offset as a progress hint.
        n_batches: approximate number of batches to split the frame into.
    """
    # Imported locally so this helper does not rely on a new file-level import.
    from psycopg2.extras import execute_values

    # Cast to object and replace missing values with None so they are sent as
    # SQL NULL rather than a float NaN.
    cleaned = frame.astype(object).where(frame.notna(), None)
    rows = list(cleaned.itertuples(index=False, name=None))

    batch_size = len(rows) // n_batches + 1
    # range() stops before len(rows), so an empty trailing batch (which would
    # produce an invalid "VALUES ;" statement) can never be sent.
    for start in range(0, len(rows), batch_size):
        print(label, start, end='\r')
        execute_values(cur, insert_sql, rows[start:start + batch_size], page_size=batch_size)
        conn.commit()


def upload_data():
    """Download the three source CSV files and bulk-load them into the ``stage`` schema.

    Loads ``customer_research.csv``, ``user_orders_log.csv`` and
    ``user_activity_log.csv`` into ``stage.customer_research``,
    ``stage.user_order_log`` and ``stage.user_activity_log`` respectively.
    Duplicate rows are dropped before loading, and the ``id`` column of the two
    log files is not inserted.

    Returns:
        int: ``200`` once all three tables have been loaded.
    """
    # NOTE(review): connection settings (including the password) are hard-coded;
    # consider moving them to an Airflow connection or environment variables.
    conn = psycopg2.connect(host="localhost", port = 5432, database="postgres", user="postgres", password="postgres84", options="-c search_path=production")
    try:
        cur = conn.cursor()
        try:
            #get custom_research
            url_1 = 'https://github.com/zakonreal/etc./blob/main/customer_research.csv?raw=true'
            df_customer_research = pd.read_csv(url_1, index_col=0)
            df_customer_research = df_customer_research.drop_duplicates(ignore_index = True)
            insert_cr = "insert into stage.customer_research (date_id,category_id,geo_id,sales_qty,sales_amt) VALUES %s"
            _insert_in_batches(conn, cur, df_customer_research, insert_cr, 'df_customer_research')

            #get order log
            url_2 = 'https://github.com/zakonreal/etc./blob/main/user_orders_log.csv?raw=true'
            df_order_log = pd.read_csv(url_2, index_col=0)
            df_order_log = df_order_log.drop_duplicates(ignore_index = True)
            insert_uol = "insert into stage.user_order_log (date_time, city_id, city_name, customer_id, first_name, last_name, item_id, item_name, quantity, payment_amount) VALUES %s"
            _insert_in_batches(conn, cur, df_order_log.drop(columns = ['id']), insert_uol, 'df_order_log')

            #get activity log
            url_3 = 'https://github.com/zakonreal/etc./blob/main/user_activity_log.csv?raw=true'
            df_activity_log = pd.read_csv(url_3, index_col=0)
            df_activity_log = df_activity_log.drop_duplicates(ignore_index = True)
            insert_ual = "insert into stage.user_activity_log (date_time, action_id, customer_id, quantity) VALUES %s"
            _insert_in_batches(conn, cur, df_activity_log.drop(columns = ['id']), insert_ual, 'df_activity_log')
        finally:
            # Release the cursor even when a download or insert fails.
            cur.close()
    finally:
        conn.close()

    return 200

In [None]:
def upload_data_inc(filename, pg_table, pg_schema, date=None):
    """Download one CSV file and append its de-duplicated rows to a Postgres table.

    Args:
        filename: name of the CSV file in the source GitHub repository.
        pg_table: target table name.
        pg_schema: schema that contains ``pg_table``.
        date: accepted so that callers passing a ``date`` keyword (as the DAG's
            ``op_kwargs`` do) no longer fail with ``TypeError``; it is currently
            not used to filter the data.
    """
    url = f'https://github.com/zakonreal/etc./blob/main/{filename}?raw=true'

    df = pd.read_csv(url, index_col=0)
    df = df.drop_duplicates(ignore_index = True)

    # NOTE(review): credentials are hard-coded in the URL; consider sourcing
    # them from an Airflow connection or environment variables.
    engine = create_engine('postgresql://postgres:postgres84@localhost:5432/postgres')
    try:
        df.to_sql(pg_table, engine, schema=pg_schema, if_exists='append', index=False)
    finally:
        # Close pooled connections so repeated task runs do not leak them.
        engine.dispose()

In [None]:
# Airflow Jinja macro; rendered to the run's date (YYYY-MM-DD) inside templated fields.
business_dt = '{{ ds }}'
# PostgresOperator expects the *ID* of an Airflow connection (a string), not a live
# psycopg2 connection object; opening a connection here would also hit the
# database on every DAG parse.
# NOTE(review): 'postgres_default' is the provider's default connection ID —
# confirm it (or substitute the right ID) is configured in Airflow to point at
# the target database.
postgres_conn_id = 'postgres_default'

In [None]:
# Default task arguments shared by every operator in the DAG:
# no e-mail notifications and no automatic retries.
args = dict(
    owner="student",
    email=['student@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=0,
)

In [None]:
# DAG 'customer_retention': loads the staging tables, refreshes the mart
# dimensions and the sales fact table, then rebuilds the weekly
# customer-retention mart from scratch.
# NOTE(review): start_date/end_date are computed from datetime.today() each time
# the file is parsed; Airflow recommends static dates — confirm this is intended.
with DAG(
        'customer_retention',
        default_args=args,
        description='Provide default dag',
        catchup=True,
        start_date=datetime.today() - timedelta(days=8),
        end_date=datetime.today() - timedelta(days=1),
) as dag:

    # Full load of the three base CSV files into the stage schema.
    load_data = PythonOperator(
        task_id='get_data',
        python_callable = upload_data,
        )

    # Incremental loads. upload_data_inc takes only filename/pg_table/pg_schema,
    # so no 'date' keyword is passed to it.
    upload_customer_research_inc = PythonOperator(
        task_id='upload_customer_research_inc',
        python_callable=upload_data_inc,
        op_kwargs={'filename': 'customer_research_inc.csv',
                   'pg_table': 'customer_research',
                   'pg_schema': 'stage'})

    # Uses upload_data_inc like its sibling tasks; the previously referenced
    # upload_data_to_staging is not defined in this notebook.
    upload_user_order_log_inc = PythonOperator(
        task_id='upload_user_order_log_inc',
        python_callable=upload_data_inc,
        op_kwargs={'filename': 'user_orders_log_inc.csv',
                   'pg_table': 'user_order_log',
                   'pg_schema': 'stage'})

    upload_user_activity_log_inc = PythonOperator(
        task_id='upload_user_activity_log_inc',
        python_callable=upload_data_inc,
        op_kwargs={'filename': 'user_activity_log_inc.csv',
                   'pg_table': 'user_activity_log',
                   'pg_schema': 'stage'})


    # Dimension refreshes read stage.user_order_log — the table the loaders
    # above actually write to.
    update_d_item_table = PostgresOperator(
        task_id='update_d_item',
        postgres_conn_id=postgres_conn_id,
        sql='''insert into mart.d_item (item_id, item_name)
                    select item_id, item_name from stage.user_order_log
                    where item_id not in (select item_id from mart.d_item)
                    group by item_id, item_name''')

    update_d_customer_table = PostgresOperator(
        task_id='update_d_customer',
        postgres_conn_id=postgres_conn_id,
        sql= ''' insert into mart.d_customer (customer_id, first_name, last_name, city_id)
                    select customer_id, first_name, last_name, max(city_id) from stage.user_order_log
                    where customer_id not in (select customer_id from mart.d_customer)
                    group by customer_id, first_name, last_name''')

    update_d_city_table = PostgresOperator(
        task_id='update_d_city',
        postgres_conn_id=postgres_conn_id,
        sql='''insert into mart.d_city (city_id, city_name)
                    select city_id, city_name from stage.user_order_log
                    where city_id not in (select city_id from mart.d_city)
                    group by city_id, city_name''')

    # The run date is injected through the Jinja '{{ds}}' macro, so no DB-API
    # `parameters` are needed. date_time is cast to a date so that rows carrying
    # a time-of-day component still match the run date.
    update_f_sales = PostgresOperator(
        task_id='update_f_sales',
        postgres_conn_id=postgres_conn_id,
        sql="""INSERT INTO mart.f_sales (date_id, item_id, customer_id, quantity, payment_amount, status)
                    SELECT DISTINCT dc.date_id, item_id, customer_id, quantity, payment_amount AS amount, status
                    FROM stage.user_order_log
                    JOIN mart.d_calendar dc ON dc.day_num = EXTRACT(DAY FROM date_time)
                    AND dc.month_num = EXTRACT(MONTH FROM date_time)
                    AND dc.year_num = EXTRACT(YEAR FROM date_time)
                    WHERE date_time::date = '{{ds}}' """)

    # The retention mart is rebuilt in full on every run: truncate, then re-aggregate.
    clear_f_customer_retention = PostgresOperator(
        task_id='clear_f_customer_retention',
        postgres_conn_id=postgres_conn_id,
        sql="truncate table mart.f_customer_retention;")

    update_f_customer_retention = PostgresOperator(
        task_id='update_f_customer_retention',
        postgres_conn_id=postgres_conn_id,
        sql="""INSERT INTO mart.f_customer_retention (period_name, 
                                                             period_id, 
                                                             returning_customers_count, 
                                                             new_customers_count, 
                                                             refunded_customer_count, 
                                                             customers_refunded,
                                                             new_customers_revenue,
                                                             returning_customers_revenue)
                        SELECT 
                            'weekly'                                                          AS period_name,
                            week_num                                                          AS period_id,
                            SUM(CASE WHEN num_transactions > 1 THEN 1 ELSE 0 END)             AS returning_customers_count,
                            SUM(CASE WHEN num_transactions = 1 THEN 1 ELSE 0 END)             AS new_customers_count,
                            SUM(CASE WHEN num_returns > 0 THEN 1 ELSE 0 END)                  AS refunded_customer_count,
                            SUM(num_returns)                                                  AS customers_refunded,
                            SUM(CASE WHEN num_transactions = 1 THEN total_revenue ELSE 0 END) AS new_customers_revenue,
                            SUM(CASE WHEN num_transactions > 1 THEN total_revenue ELSE 0 END) AS returning_customers_revenue
                        FROM
                            (
                            SELECT 
                                dc.week_num,
                                customer_id,
                                COUNT(*)                                                                              AS num_transactions,
                                SUM(CASE WHEN status = 'refunded' THEN 1 ELSE 0 END)                                  AS num_returns,
                                SUM(CASE WHEN status = 'refunded' THEN (-1) * payment_amount ELSE payment_amount END) AS total_revenue
                            FROM 
                                mart.f_sales fds
                            JOIN mart.d_calendar dc ON dc.date_id = fds.date_id
                            GROUP BY
                                    dc.week_num,
                                    customer_id
                            ) as customers_orders
                        GROUP BY
                            period_name,
                            period_id;"""
    )

    # Staging loads run sequentially, the three dimension updates fan out in
    # parallel, and the fact/retention steps run after all of them finish.
    (
            load_data
            >> upload_customer_research_inc
            >> upload_user_order_log_inc
            >> upload_user_activity_log_inc
            >> [update_d_item_table, update_d_city_table, update_d_customer_table]
            >> update_f_sales 
            >> clear_f_customer_retention
            >> update_f_customer_retention
    )