# In [None]:
import re
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.S3_hook import S3Hook
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

from config import ENVIRONMENT, sh_default_args


# Suffix appended to the S3 prefix and Redshift table name used in this module:
# empty in production, "_<environment>" (lower-cased) everywhere else.
ENV_SUFFIX = f"_{ENVIRONMENT.lower()}" if ENVIRONMENT.lower() != "production" else ""

def get_data() -> pd.DataFrame:
    """Fetch every distinct (renter_id, phone_number) pair from Redshift.

    The query unions four sources of phone numbers per renter:

    * ``sh_public.spothero_user.phone_number``
    * ``pipegen.pg_rentals.phone_number``
    * ``sfdc.case.contact_phone`` (via the renter's rentals)
    * ``sfdc.case.supplied_phone`` (via the renter's rentals)

    Rentals with ``partner_id = 45`` are excluded, and the two SFDC branches
    only consider rentals whose reservation status is ``valid``,
    ``recurrence`` or ``cancelled``. NULL and empty phone numbers are
    filtered out, and the result is de-duplicated and ordered by renter id.

    Returns:
        A DataFrame with the columns ``renter_id`` and ``phone_number``.
    """
    # Plain (non-f) string: the query has no placeholders to interpolate.
    query = """
        SELECT *
        FROM
            (SELECT
              renter.id as renter_id,
              renter.phone_number as phone_number
            FROM sh_public.spothero_user as renter
            GROUP BY 1,2
            UNION ALL
            SELECT
              renter.id as renter_id,
              rental.phone_number as phone_number
            FROM sh_public.spothero_user as renter
              LEFT JOIN pipegen.pg_rentals as rental on rental.renter_id = renter.id
            WHERE NOT COALESCE(rental.partner_id = 45 , FALSE)
            GROUP BY 1,2
            UNION ALL
            SELECT
              renter.id as renter_id,
              sfdc_case.contact_phone AS phone_number
            FROM sh_public.spothero_user as renter
              LEFT JOIN pipegen.pg_rentals as rental on rental.renter_id = renter.id
              LEFT JOIN sfdc.case as sfdc_case on sfdc_case.rental_id_c = rental.rental_id
            WHERE
                NOT COALESCE(rental.partner_id = 45 , FALSE)
                AND (rental.reservation_status  IN ('valid', 'recurrence', 'cancelled'))
            GROUP BY 1,2
            UNION ALL
            SELECT
              renter.id as renter_id,
              sfdc_case.supplied_phone AS phone_number
            FROM sh_public.spothero_user as renter
              LEFT JOIN pipegen.pg_rentals as rental on rental.renter_id = renter.id
              LEFT JOIN sfdc.case as sfdc_case on sfdc_case.rental_id_c = rental.rental_id
            WHERE
                NOT COALESCE(rental.partner_id = 45 , FALSE)
                AND (rental.reservation_status  IN ('valid', 'recurrence', 'cancelled'))
            GROUP BY 1,2)
        WHERE phone_number is not null and phone_number != ''
        GROUP BY 1,2
        ORDER BY 1
        """
    # Name the connection explicitly, matching how copy_to_redshift builds its hook.
    return PostgresHook(postgres_conn_id="redshift").get_pandas_df(query)

def _format_phone_numbers(renter_phone_data: pd.DataFrame) -> pd.DataFrame:
    """Normalise the ``phone_number`` column to dash-separated digits.

    Every non-digit character is stripped, the remaining digits are grouped
    as ``...-XXX-XXX-XXXX`` and only the last 12 characters are kept. Rows
    whose phone number contains no digits at all are dropped, and duplicate
    (renter_id, phone_number) pairs are removed.

    Args:
        renter_phone_data: Frame with ``renter_id`` and ``phone_number`` columns.

    Returns:
        A new frame with the formatted numbers and a fresh 0..n-1 index.
    """
    # regex=True is explicit: recent pandas treats the pattern literally by default.
    digits = (
        renter_phone_data["phone_number"]
        .astype(str)
        .str.replace(r"[^0-9]+", "", regex=True)
    )

    # A value without any digit has no last character to re-attach and would
    # otherwise end up as the literal string "nan" in the output.
    has_digits = digits.str.len() > 0
    formatted = renter_phone_data.loc[has_digits].copy()
    digits = digits[has_digits]

    # Insert a dash before every group of three digits (counted from the right)
    # in everything but the last digit, then re-attach that last digit:
    # "1234567890" -> "123-456-789" + "0" -> "123-456-7890".
    grouped = digits.str[:-1].str.replace(
        r"(\d)(?=(\d{3})+(?!\d))", r"\1-", regex=True
    )
    # Keeping the trailing 12 characters drops any leading country-code group.
    formatted["phone_number"] = (grouped + digits.str[-1]).str[-12:]

    return formatted.drop_duplicates(["renter_id", "phone_number"]).reset_index(drop=True)


def format_numbers(**context) -> None:
    """Format the renter phone numbers and upload them to S3 as a CSV.

    Reads the raw pairs via ``get_data()``, normalises them and writes the
    result to ``renter_phone_data{ENV_SUFFIX}/RenterPhoneData.csv`` in the
    bucket named by the ``SCIENCE_S3_BUCKET`` Airflow variable, replacing any
    existing object.

    Args:
        **context: Airflow task context; accepted and ignored so the callable
            works whether or not the operator passes context keyword arguments.
    """
    renter_phone_data = _format_phone_numbers(get_data())

    s3_key = f"renter_phone_data{ENV_SUFFIX}/RenterPhoneData.csv"
    S3Hook(aws_conn_id="S3_SPOTHERO_SCIENCE_DATA").load_string(
        # index=False keeps the CSV to the two data columns (plus header row);
        # the DataFrame index is not part of the data set.
        renter_phone_data.to_csv(index=False),
        key=s3_key,
        bucket_name=Variable.get("SCIENCE_S3_BUCKET"),
        replace=True,
    )
    

def copy_to_redshift(**context) -> None:
    """Reload the ``science.renter_phone_data{ENV_SUFFIX}`` table from S3.

    Creates the table if it does not exist, truncates it, and COPYs the CSV at
    ``renter_phone_data{ENV_SUFFIX}/RenterPhoneData.csv`` from the bucket named
    by the ``SCIENCE_S3_BUCKET`` Airflow variable, using the IAM role stored in
    ``SCIENCE_REDSHIFT_COPY_ROLE``. The first CSV line is skipped as a header.

    Args:
        **context: Airflow task context; accepted and ignored so the callable
            works whether or not the operator passes context keyword arguments.
    """
    s3_key = f"renter_phone_data{ENV_SUFFIX}/RenterPhoneData.csv"
    table_name = f"renter_phone_data{ENV_SUFFIX}"
    bucket = Variable.get("SCIENCE_S3_BUCKET")
    copy_role = Variable.get("SCIENCE_REDSHIFT_COPY_ROLE")

    # NOTE(review): TRUNCATE appears to commit immediately on Redshift, so a
    # failing COPY would leave the table empty until the next successful run —
    # confirm whether that is acceptable.
    PostgresHook(postgres_conn_id="redshift").run(
        f"""
        CREATE TABLE IF NOT EXISTS science.{table_name} (
            renter_id INT,
            phone_number VARCHAR(15)
        )
        SORTKEY (renter_id);
        TRUNCATE TABLE science.{table_name};
        COPY science.{table_name}
        FROM 's3://{bucket}/{s3_key}'
        IAM_ROLE '{copy_role}'
        REGION 'us-east-1'
        CSV
        IGNOREHEADER 1;
    """
    )
    
# Nightly DAG that rebuilds the renter phone data table: one active run at a
# time, no backfill of missed schedules, and a 5 hour limit per run.
renter_phone_data_dag = DAG(
    "renter_phone_data",
    description="Creates renter phone data table nightly.",
    start_date=datetime(2021, 2, 17),
    dagrun_timeout=timedelta(hours=5),
    max_active_runs=1,
    default_args=sh_default_args,
    schedule_interval="@daily",
    catchup=False,
)

# Step 1: pull the raw numbers, format them and upload the CSV to S3.
# provide_context is not set because the callable does not read the task context.
format_numbers_task = PythonOperator(
    python_callable=format_numbers,
    task_id="format_numbers",
    dag=renter_phone_data_dag,
)

# Step 2: load the uploaded CSV into Redshift (capped at 30 minutes).
write_output = PythonOperator(
    python_callable=copy_to_redshift,
    task_id="write_output",
    execution_timeout=timedelta(minutes=30),
    dag=renter_phone_data_dag,
)

# Both tasks are already bound to the DAG through dag=..., so only the
# task-to-task ordering needs to be declared.
format_numbers_task >> write_output