In [None]:
import datetime
import sys
import os
import json
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from importlib import reload

# Make <parent of cwd>/dags/src importable (presumably where `config` and `helpers` live).
# Guarded so re-running this cell does not keep appending the same entry to sys.path.
_DAGS_SRC_DIR = os.path.join(os.path.dirname(os.getcwd()), 'dags', 'src')
if _DAGS_SRC_DIR not in sys.path:
    sys.path.append(_DAGS_SRC_DIR)


#%pip install psycopg2
import psycopg2

### SQL query templates

In [None]:
# Build the `loan` temp table from loan_details:
# - application_time is stored as text; it is turned into a timestamp by swapping its first two
#   '-'-separated fields (NOTE(review): presumably day-first -> month-first for the server's
#   DateStyle — confirm against the source data).
# - only rows whose parsed timestamp lies between '{start_date}' and '{end_date}' are kept
#   (inclusive; a bare date bound is compared as midnight of that day).
# - per loan_id, only the most recent application is kept. Ordering uses the parsed timestamp:
#   ordering the raw day-first text would pick the "latest" row lexicographically, not by time.
CREATE_TEMP_TABLE_LOAN = """
    create temp table loan as (
        select
            lower(t1.loan_id) loan_id,
            lower(t1.customer_id) customer_id,
            lower(t1.loan_status) loan_status,
            t1.application_ts application_time,
            t1.current_loan_amount,
            lower(t1.term) term,
            t1.tax_liens,
            lower(t1.purpose) purpose,
            t1.no_of_properties
        from (
            select
                row_number() over(partition by ld.loan_id order by ld.application_ts desc) rnk,
                ld.*
            from (
                select distinct
                    d.*,
                    cast(concat(split_part(d.application_time, '-', 2), '-', split_part(d.application_time, '-', 1), '-', split_part(d.application_time, '-', 3)) as timestamp) application_ts
                from loan_details d
            ) ld
            where ld.application_ts between '{start_date}' and '{end_date}'
        ) t1
        where rnk=1
    );
"""

In [None]:
# Build the `customer` temp table: deduplicated customer_details rows, restricted to customers
# that have exactly one distinct record. Customers with several distinct (conflicting) records
# are dropped entirely, as are rows whose customer_id is NULL.

CREATE_TEMP_TABLE_CUSTOMER = """
    create temp table customer as (
        select cd.*
        from (select distinct * from customer_details) cd
        where cd.customer_id in (
            select customer_id
            from (select distinct * from customer_details) dedup
            group by customer_id
            having count(*) = 1
        )
    );
"""

In [None]:
# Build the `credit` temp table: deduplicated credit_details rows, restricted to customers that
# have exactly one distinct credit record. Customers with several distinct (conflicting) records
# are dropped entirely, as are rows whose customer_id is NULL.

CREATE_TEMP_TABLE_CREDIT = """
    create temp table credit as (
        select cr.*
        from (select distinct * from credit_details) cr
        where cr.customer_id in (
            select customer_id
            from (select distinct * from credit_details) dedup
            group by customer_id
            having count(*) = 1
        )
    );
"""

In [None]:
#get loan data

GET_DATA = """
    select 
        t1.loan_id, t1.customer_id, t1.loan_status, t1.application_time, t1.current_loan_amount, t1.term, t1.tax_liens, t1.purpose, t1.no_of_properties,
        lower(t2.home_ownership) home_ownership, t2.annual_income, lower(t2.years_in_current_job) years_in_current_job, t2.months_since_last_delinquent, t2.no_of_cars, t2.no_of_children,
        t3.credit_score, t3.monthly_debt, t3.years_of_credit_history, t3.no_of_open_accounts, t3.no_of_credit_problems, t3.current_credit_balance, t3.max_open_credit, t3.bankruptcies
    from loan t1
    left join customer t2
    on t2.customer_id = t1.customer_id
    left join credit t3
    on t3.customer_id = t2.customer_id
"""

In [None]:
def save_dataset(df: pd.DataFrame, path: str) -> None:
    """
    Save a dataframe to CSV (without the index), creating the parent directory if needed.
    :param df: DataFrame to write
    :param path: str, destination file path
    :return: None
    """
    parent_dir = os.path.dirname(path)
    if parent_dir:
        # to_csv does not create missing directories (e.g. a fresh `<data>/raw` folder).
        os.makedirs(parent_dir, exist_ok=True)
    df.to_csv(path, index=False)
    print(f"[INFO] Dataset saved to {path}")

In [None]:
import config
from urllib.parse import quote

# Load DB credentials from the JSON file referenced by config; `with` closes the file handle.
with open(config.PATH_TO_CREDENTIALS, 'r') as credentials_file:
    credentials = json.load(credentials_file)

# Percent-encode the password so characters such as '@', ':' or '/' cannot corrupt the URL
# (SQLAlchemy decodes it again when parsing the URL).
engine = create_engine(
    f"postgresql://{credentials['user']}:{quote(str(credentials['password']), safe='')}"
    f"@{credentials['host']}:{credentials['port']}/{credentials['database']}"
)


In [None]:
def extract_data(start_date, end_date=None) -> pd.DataFrame:
    """
    Extracts data from the database and returns it as a pandas dataframe.

    Rebuilds the temp tables ``customer``, ``loan`` and ``credit`` and then runs ``GET_DATA``
    against them. All statements run on ONE connection inside one transaction, because
    Postgres temp tables are only visible to the session that created them.

    :param start_date: datetime.date = start date of the data to be extracted
    :param end_date: datetime.date = end date of the data to be extracted; defaults to
        today's date, evaluated at call time
    :return: pandas dataframe with the rows returned by GET_DATA
    :raises ValueError: if start_date is after end_date
    """
    if end_date is None:
        # Resolved per call: a `datetime.date.today()` default would be frozen at definition time.
        end_date = datetime.date.today()
    if start_date > end_date:
        raise ValueError("start_date must be less than or equal to end_date")

    print("[INFO] Extracting data from the database from {0} to {1} ...".format(start_date, end_date))
    # NOTE(review): uses `helpers.engine`, not the `engine` built earlier in this notebook.
    with helpers.engine.begin() as conn:
        _rebuild_temp_tables(conn, start_date, end_date)
        return pd.read_sql(text(GET_DATA), conn)


def _rebuild_temp_tables(conn, start_date, end_date):
    """Drop (if present) and recreate the customer, loan and credit temp tables on `conn`."""
    table_statements = (
        ("customer", CREATE_TEMP_TABLE_CUSTOMER),
        ("loan", CREATE_TEMP_TABLE_LOAN.format(start_date=start_date, end_date=end_date)),
        ("credit", CREATE_TEMP_TABLE_CREDIT),
    )
    for table_name, create_sql in table_statements:
        # pg_temp-qualified so a permanent table with the same name can never be dropped here.
        conn.execute(text(f"drop table if exists pg_temp.{table_name};"))
        conn.execute(text(create_sql))

In [None]:
def collect_data(start_date, end_date=None, job_id=None):
    """
    Collects data from the database and dump them in the directory of raw data `config.PATH_DIR_DATA`.

    The CSV is written to ``<config.PATH_DIR_DATA>/raw/{job_id}_{start:%Y-%m-%d}_{end:%Y-%m-%d}.csv``.

    :param start_date: datetime.date, start date of the data to be extracted
    :param end_date: datetime.date, end date of the data to be extracted; defaults to today's
        date, evaluated at call time
    :param job_id: str, job id of the data to be extracted (required despite the None default)
    :return: str path of the written CSV
    :raises AssertionError: on wrong argument types or start_date after end_date
        (assert-based, so skipped under ``python -O``)
    """
    if end_date is None:
        # Resolved per call: a `datetime.date.today()` default would be frozen at definition time.
        end_date = datetime.date.today()
    assert isinstance(start_date, datetime.date), "start_date must be a datetime.date"
    assert isinstance(end_date, datetime.date), "end_date must be a datetime.date"
    assert isinstance(job_id, str), "job_id must be a str"
    assert start_date <= end_date, "start_date must be on or before end_date"

    df = extract_data(start_date, end_date)
    raw_dir = os.path.join(config.PATH_DIR_DATA, "raw")
    os.makedirs(raw_dir, exist_ok=True)
    filename = os.path.join(raw_dir, f"{job_id}_{start_date:%Y-%m-%d}_{end_date:%Y-%m-%d}.csv")
    helpers.save_dataset(df, filename)
    return filename

In [None]:
# `helpers` is presumably resolved from the dags/src directory appended to sys.path above.
import helpers

# Extraction window for this run, plus a run identifier from helpers.generate_uuid().
start_date, end_date = datetime.date(2015, 1, 1), datetime.date(2015, 5, 31)
job_id = helpers.generate_uuid()
print("job Id:", job_id)


In [None]:
# Run the extraction for the window defined above and show where the CSV was written.
raw_csv_path = collect_data(start_date=start_date, end_date=end_date, job_id=job_id)
print(raw_csv_path)