In [None]:
# this is include zeno_etl_libs in the python search path on the run time
import sys
sys.path.append('./../..')

In [None]:
import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta
import boto3
import base64
from botocore.exceptions import ClientError
import time

In [None]:
from zeno_etl_libs.db.db import DB
from zeno_etl_libs.config import set_env_config, EnvNames
from zeno_etl_libs.helper import helper
from zeno_etl_libs.helper.aws.s3 import S3

In [None]:
# !pip install redshift_connector

### Basic Setup

In [None]:
"""
Environment: Activate the env as per the requirement
"""
# env = EnvNames.production
# env = EnvNames.staging
env = EnvNames.development

In [None]:
config = set_env_config(env=env)

In [None]:
config.secrets

In [None]:
db = DB(secrets=config.secrets)

In [None]:
cursor = db.open_connection()

In [None]:
cursor

In [None]:
s3 = S3(aws_access_key_id=config.secrets['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=config.secrets['AWS_SECRET_ACCESS_KEY_ID'])

## Take the new records from bills and insert them into bills-metadata table

In [None]:
bill_metadata_table = "bills-1-metadata-dummay"

In [None]:
query = f'''
     insert
        into
        "prod2-generico"."{bill_metadata_table}" (id,
        "patient-id",
        "zippin-serial",
        "store-id",
        "doctor-id",
        "promo-code-id",
        "promo-flag",
        "promo-discount",
        "payment-method",
        "redeemed-points",
        "created-by",
        "created-at",
        "updated-at",
        "bill-date",
        "bill-year",
        "bill-month",
        "bill-day",
        "etl-status"
        )
    select
        b.id,
        b."patient-id",
        b."zippin-serial",
        b."store-id",
        b."doctor-id",
        b."promo-code-id",
        case when b."promo-code-id" is null then false else true end ,
        b."promo-discount",
        b."payment-method",
        b."redeemed-points",
        b."created-by",
        b."created-at",
        getdate(),
        trunc(b."created-at"),
        extract(year from b."created-at"),
        extract(month from b."created-at"),
        extract(day from b."created-at"),
        'updating'
    from
        "prod2-generico"."bills-1" b
    left join "prod2-generico"."{bill_metadata_table}" bm on
        bm.id = b.id
    where
        bm.id is null
        and date(b."created-at") between '2021-12-01' and '2021-12-31'
        and (bm."etl-status" != 'updated'
        or bm."etl-status" is null)
'''

In [None]:
db.execute(query, params=None)

# Take the effect of below tables
- "bills-1"
- "patients-store-orders"
- "bills-items-1"
- "inventory-1"
- "drugs"


In [None]:
query = f"""
    update
        "prod2-generico"."{bill_metadata_table}" bm2
    set
        "etl-status" = 'updating',
        "updated-at" = current_timestamp
    from
        (
        select
            f.id
        from
            "prod2-generico"."{bill_metadata_table}" bm
        inner join
            "prod2-generico"."bills-1" f on
            bm.id = f.id
        inner join "prod2-generico"."bill-items-1" a on
            bm."id" = a."bill-id"
        inner join "prod2-generico"."inventory-1" b on
            a."inventory-id" = b."id"
        inner join "prod2-generico".drugs d on
            b."drug-id" = d.id
        left join "prod2-generico"."patients-store-orders" pso on
            bm.id = NVL(pso."bill-id" , 0)
        where
            ((bm."updated-at" < f."updated-at")
                or
        (bm."updated-at" < a."updated-at")
                    or
        (bm."updated-at" < b."updated-at")
                        or
        (bm."updated-at" < d."updated-at")
                            or
        (bm."updated-at" < pso."updated-at"))) ab
    where 
        bm2.id = ab.id;
"""

In [None]:
db.execute(query, params=None)

## Considering only updated bills

In [None]:
query = f'''
 select
    id,
    "patient-id",
    "zippin-serial",
    "store-id",
    "doctor-id",
    "promo-code-id",
    "promo-discount",
    "payment-method",
    "redeemed-points",
    "created-by",
    "created-at",
    "updated-at",
    "bill-date",
    "bill-year",
    "bill-month",
    "bill-day",
    "promo-flag",
    "digital-payment-flag",
    "etl-status"
from
    "prod2-generico"."{bill_metadata_table}" bm3
where
    "etl-status" = 'updating'
'''

In [None]:
db.execute(query, params=None)

In [None]:
changed_bills: pd.DataFrame = cursor.fetch_dataframe()

In [None]:
len(changed_bills)

In [None]:
changed_bills['digital-payment-flag'] = np.where(
    changed_bills['payment-method'].isin(['', ' ', 'cash', 'cheque']), False, True)

In [None]:
# changed_bills['promo-flag'] = np.where(
#     changed_bills['promo-code-id'].isin([None, '', 0, 'NA']), False, True)

In [None]:
changed_bills.head(1).transpose()

## Min bill date logic to get month difference and month rank

In [None]:
query = f'''    
    select
        bm.id,
        bm."patient-id",
        bm."created-at" ,
        row_number () over (partition by bm."patient-id"
    order by
        "created-at" asc) as row_num,
        bm."bill-year", 
        bm."bill-month", 
        bm."bill-date",
        bm."store-id"
    from
        "prod2-generico"."{bill_metadata_table}" bm
    inner join 
     (
        select
            "patient-id"
        from
            "prod2-generico"."{bill_metadata_table}" 
        where
            "etl-status" = 'updating'
        group by
            "patient-id") p on
        bm."patient-id" = p."patient-id"
'''

In [None]:
db.execute(query, params=None)

In [None]:
numbered_bills: pd.DataFrame = cursor.fetch_dataframe()

In [None]:
first_bill = numbered_bills[numbered_bills['row_num'] == 1].rename(
    columns={"created-at":"min-created-at"})[['patient-id', 'min-created-at']]

In [None]:
first_bill.head(2)

## Month bill rank

In [None]:
# bill_rank_month = numbered_bills.copy()

In [None]:
bill_rank_month = numbered_bills.sort_values(by=['patient-id', 'bill-year', 'bill-month', 'bill-date']).copy()

In [None]:
bill_rank_month['month-bill-rank'] = bill_rank_month.groupby(['patient-id', 'bill-year', 'bill-month']).cumcount() + 1

In [None]:
bill_rank_month.head(2)

In [None]:
bill_rank_month_min = bill_rank_month[bill_rank_month['month-bill-rank'] == 1][
    ['patient-id', 'bill-year', 'bill-month', 'bill-date', 'store-id']].rename(
    columns={'bill-date': 'min-bill-date-in-month', 'store-id': 'store-id-month'})

In [None]:
bill_rank_month_min.head(2)

## PR, HD, Ecomm flags

In [None]:
query = f"""
    select
        bm.id,
        bool_or(case when pso."patient-request-id" is null then false else true end) as "pr-flag",
        bool_or(case when pso."order-type" = 'delivery' then true else false end) as "hd-flag",
        bool_or(case when pso."order-source" = 'zeno' then true else false end) as "ecom-flag"
    from
        "prod2-generico"."{bill_metadata_table}" bm
    left join "prod2-generico"."patients-store-orders" pso on
        pso."bill-id" = bm.id
    where
        bm."etl-status" = 'updating'
    group by
        bm.id
"""

In [None]:
db.execute(query, params=None)

In [None]:
pr_hd_ecom_bills: pd.DataFrame = cursor.fetch_dataframe()

In [None]:
pr_hd_ecom_bills.head(1)

## Doctor Data

In [None]:
query = f"""
    select
        bm.id ,
        d."name" as "doctor-name"
    from
        "prod2-generico"."{bill_metadata_table}" bm
    left join "prod2-generico".doctors d on
        bm."doctor-id" = d.id
    where
        bm."etl-status" = 'updating'
"""

In [None]:
db.execute(query, params=None)

In [None]:
doctors: pd.DataFrame = cursor.fetch_dataframe()

In [None]:
# doctors

## bill item, drug, inventory data

In [None]:
query = f"""
    select
        bm.id ,
        bi."inventory-id",
        bi."quantity",
        bi."rate",
        i."drug-id" ,
        i."purchase-rate" ,
        i.mrp ,
        i.ptr ,
        i.expiry ,
        d."drug-name" ,
        d."type" ,
        d."drug-name",
        d."type" as "drug-type",
        d.category as "drug-category",
        d."repeatability-index" ,
        d.composition ,
        d.schedule ,
        d."company-id" ,
        d.company ,
        d.pack
    from
        "prod2-generico"."{bill_metadata_table}" bm
    inner join "prod2-generico"."bill-items-1" bi on
        bm.id = bi."bill-id"
    inner join "prod2-generico"."inventory-1" i on
        bi."inventory-id" = i.id
    inner join "prod2-generico".drugs d on
        i."drug-id" = d.id
    where
        bm."etl-status" = 'updating';
"""

In [None]:
db.execute(query=query)

In [None]:
item_drug_inv: pd.DataFrame = cursor.fetch_dataframe()

In [None]:
item_drug_inv.head()

In [None]:
# Measured fields
item_drug_inv['total-spend'] = item_drug_inv['rate'].astype('float') * item_drug_inv['quantity'].astype('float')
item_drug_inv['total-mrp-value'] = item_drug_inv['mrp'].astype('float') * item_drug_inv['quantity'].astype('float')
item_drug_inv['total-purchase-rate-value'] = item_drug_inv['purchase-rate'].astype('float') * item_drug_inv['quantity'].astype('float')
item_drug_inv['total-ptr-value'] = item_drug_inv['ptr'].astype('float') * item_drug_inv['quantity'].astype('float')

In [None]:
# Quantity fields
item_drug_inv['quantity-generic'] = np.where(item_drug_inv['drug-type'] == 'generic', item_drug_inv['quantity'], 0)
item_drug_inv['quantity-goodaid'] = np.where(item_drug_inv['company'] == 'GOODAID', item_drug_inv['quantity'], 0)
item_drug_inv['quantity-ethical'] = np.where(item_drug_inv['drug-type'] == 'ethical', item_drug_inv['quantity'], 0)
item_drug_inv['quantity-others-type'] = np.where(~item_drug_inv['drug-type'].isin(['generic', 'ethical']), 
                                                 item_drug_inv['quantity'], 0)
item_drug_inv['quantity-chronic'] = np.where(item_drug_inv['drug-category'] == 'chronic',
                                             item_drug_inv['quantity'], 0)

item_drug_inv['quantity-repeatable'] = np.where(((item_drug_inv['repeatability-index'] >= 80) | (
            (item_drug_inv['drug-category'] == 'chronic') & (item_drug_inv['repeatability-index'] >= 40))),
                                           item_drug_inv['quantity'], 0)

In [None]:
# Spend columns
item_drug_inv['spend-generic'] = np.where(item_drug_inv['drug-type'] == 'generic',
                                     item_drug_inv['total-spend'], 0)

item_drug_inv['spend-goodaid'] = np.where(item_drug_inv['company'] == 'GOODAID',
                                 item_drug_inv['total-spend'], 0)

item_drug_inv['spend-ethical'] = np.where(item_drug_inv['drug-type'] == 'ethical',
                                 item_drug_inv['total-spend'], 0)

item_drug_inv['spend-others-type'] = np.where(~item_drug_inv['drug-type'].isin(['generic', 'ethical']),
                                         item_drug_inv['total-spend'], 0)

In [None]:
item_drug_inv.head()

In [None]:
# aggregation at bill level
bills_level_data = item_drug_inv.groupby(['id']).agg({'total-spend': 'sum',
                                              'total-mrp-value': 'sum',
                                              'total-purchase-rate-value': 'sum',
                                              'total-ptr-value': 'sum',
                                              'spend-generic': 'sum',
                                              'spend-goodaid': 'sum',
                                              'spend-ethical': 'sum',
                                              'spend-others-type': 'sum',
                                              'drug-id': 'nunique',
                                              'quantity': 'sum',
                                              'quantity-generic': 'sum',
                                              'quantity-goodaid': 'sum',
                                              'quantity-ethical': 'sum',
                                              'quantity-others-type': 'sum',
                                              'quantity-chronic': 'sum',
                                              'quantity-repeatable': 'sum'}).reset_index()

In [None]:
bills_level_data = bills_level_data.rename(columns={'drug-id': 'num-drugs',
                                        'quantity': 'total-quantity'})

In [None]:
# Patient is generic or not
bills_level_data['is-generic'] = np.where(bills_level_data['quantity-generic'] > 0, 1, 0)

In [None]:
# Patient is GOODAID or not
bills_level_data['is-goodaid'] = np.where(bills_level_data['quantity-goodaid'] > 0, 1, 0)

# Patient is ethical or not
bills_level_data['is-ethical'] = np.where(bills_level_data['quantity-ethical'] > 0, 1, 0)

# Patient is Others type or not
bills_level_data['is-others-type'] = np.where(bills_level_data['quantity-others-type'] > 0, 1, 0)

In [None]:
# Patient is RX or not
bills_level_data['is-rx'] = np.where((bills_level_data['quantity-generic'] + 
                                      bills_level_data['quantity-ethical']) > 0, 1, 0)

In [None]:
# Patient is chronic or not
bills_level_data['is-chronic'] = np.where(bills_level_data['quantity-chronic'] > 0, 1, 0)

# Patient is repeatable or not
bills_level_data['is-repeatable'] = np.where(bills_level_data['quantity-repeatable'] > 0, 1, 0)

## Merging data

### month difference data

In [None]:
transformed_bills = pd.DataFrame()
transformed_bills = changed_bills.merge(first_bill, how='inner', on=['patient-id'])

In [None]:
transformed_bills.head(1).transpose()

In [None]:
transformed_bills['month-diff'] = helper.month_diff(
    transformed_bills['created-at'], transformed_bills['min-created-at'])

In [None]:
transformed_bills = transformed_bills.drop(columns=['min-created-at'])

### PR, HD flags Data

In [None]:
transformed_bills = transformed_bills.merge(pr_hd_ecom_bills, how="left", left_on='id', right_on='id')

In [None]:
transformed_bills.head(2)

### Doctor Data

In [None]:
transformed_bills = transformed_bills.merge(doctors, how="left", left_on='id', right_on='id')

In [None]:
transformed_bills.head(2)

### Drug and inventory  data

In [None]:
transformed_bills = transformed_bills.merge(bills_level_data, how="left", left_on='id', right_on='id')

In [None]:
# transformed_bills.columns

### Month bill rank

In [None]:
transformed_bills = transformed_bills.merge(bill_rank_month[['id', 'month-bill-rank']], how='left', on=['id'])

### Month bill rank min date

In [None]:
transformed_bills = transformed_bills.merge(
    bill_rank_month_min[['patient-id', 'bill-year',  'bill-month', 'min-bill-date-in-month', 'store-id-month']], 
    how='left', on=['patient-id', 'bill-year',  'bill-month']
)

### Normalise date

In [None]:
transformed_bills['normalized-date'] = transformed_bills['created-at'].dt.date.values.astype(
    'datetime64[M]').astype('datetime64[D]')

In [None]:
transformed_bills['normalized-date'] = transformed_bills['normalized-date'].dt.date

### Final column selection

In [None]:
table_info = helper.get_table_info(db=db, table_name=bill_metadata_table, schema='prod2-generico')

In [None]:
"""correcting the column order"""
transformed_bills = transformed_bills[table_info['column_name']]

## Updating the data in the target table using temp table

In [None]:
helper.drop_table(db=db, table_name=bill_metadata_table.replace('-', '_')+"_temp")

In [None]:
""" Creating temp table """
bills1_temp = helper.create_temp_table(db=db, table=bill_metadata_table)

In [None]:
bills1_temp

In [None]:
# """Fill NaN with None (inside table it will be NULL) """
# transformed_bills['pr-flag'] = transformed_bills['pr-flag'].replace({np.nan: None}) 
# transformed_bills['hd-flag'] = transformed_bills['hd-flag'].replace({np.nan: None}) 
# transformed_bills['ecom-flag'] = transformed_bills['ecom-flag'].replace({np.nan: None})

In [None]:
# fillna(-1)
transformed_bills['promo-code-id'] = transformed_bills['promo-code-id'].fillna(-1).astype('int64')
transformed_bills['total-quantity'] = transformed_bills['total-quantity'].fillna(-1).astype('int64')
transformed_bills['quantity-generic'] = transformed_bills['quantity-generic'].fillna(-1).astype('int64')
transformed_bills['quantity-goodaid'] = transformed_bills['quantity-goodaid'].fillna(-1).astype('int64')
transformed_bills["quantity-ethical"] = transformed_bills["quantity-ethical"].fillna(-1).astype('int64')
transformed_bills["quantity-chronic"] = transformed_bills["quantity-chronic"].fillna(-1).astype('int64')
transformed_bills["quantity-repeatable"] = transformed_bills["quantity-repeatable"].fillna(-1).astype('int64')
transformed_bills["quantity-others-type"] = transformed_bills["quantity-others-type"].fillna(-1).astype('int64')


# transformed_bills['promo-code-id'] = transformed_bills['promo-code-id'].fillna(-1).astype('int64')
# transformed_bills['promo-min-purchase'] = transformed_bills['promo-min-purchase'].fillna(-1).astype('int64')
# transformed_bills['campaign-id'] = transformed_bills['campaign-id'].fillna(-1).astype('int64')
# transformed_bills['promo-code'] = 'NA'
# transformed_bills['promo-code-type'] = 'NA'
# transformed_bills['promo-eligibility'] = 'NA'
# transformed_bills['campaign-name'] = 'NA'
# transformed_bills['promo-discount-type'] = 'NA'

In [None]:
# transformed_bills.isnull().sum()
# transformed_bills.info()

In [None]:
ts = time.time()
s3.write_df_to_db(df=transformed_bills, table_name=bills1_temp, db=db)
print(f"total time: {time.time()-ts}")

In [None]:
transformed_bills

In [None]:
# Updating the Destination table using temp table
target = bill_metadata_table
source = bills1_temp
query = """
    update "prod2-generico"."%s" t
        set "month-diff" = s."month-diff",
            "pr-flag" = s."pr-flag",
            "hd-flag" = s."hd-flag",
            "ecom-flag" = s."ecom-flag",
            "doctor-name" = s."doctor-name",
            "total-spend" = s."total-spend", 
            "spend-generic" = s."spend-generic", 
            "spend-goodaid" = s."spend-goodaid", 
            "spend-ethical" = s."spend-ethical", 
            "spend-others-type" = s."spend-others-type", 
            "num-drugs" = s."num-drugs",
            "quantity-generic" = s."quantity-generic", 
            "quantity-goodaid" = s."quantity-goodaid", 
            "quantity-ethical" = s."quantity-ethical", 
            "quantity-chronic" = s."quantity-chronic", 
            "quantity-repeatable" = s."quantity-repeatable", 
            "quantity-others-type" = s."quantity-others-type", 
            "is-generic" = s."is-generic",
            "is-goodaid" = s."is-goodaid", 
            "is-ethical" = s."is-ethical", 
            "is-chronic" = s."is-chronic", 
            "is-repeatable" = s."is-repeatable", 
            "is-others-type" = s."is-others-type",
            "is-rx" = s."is-rx",
            "total-quantity" = s."total-quantity",
            "total-mrp-value" = s."total-mrp-value", 
            "total-purchase-rate-value" = s."total-purchase-rate-value", 
            "total-ptr-value" = s."total-ptr-value", 
            -- "promo-flag" = s."promo-flag",
            "digital-payment-flag" = s."digital-payment-flag",
            "zippin-serial" = s."zippin-serial",
            "month-bill-rank" = s."month-bill-rank",
            "min-bill-date-in-month" = s."min-bill-date-in-month",
            "store-id-month" = s."store-id-month",
            "normalized-date" = s."normalized-date",
            "etl-status" = 'updated'
    from "%s" s
    where t.id = s.id;
""" %(target, source)

In [None]:
db.execute(query=query)

## Update the patients-metadata etl-status

In [None]:
query = f"""
    update
        "prod2-generico"."patients-metadata-2" t
    set
        "etl-status" = 'updating'
    from
        {bills1_temp} s
    where
        t.id = s."patient-id"
        and s."etl-status" = 'updating';
"""

In [None]:
db.execute(query=query)

# Closing the DB conneciton

In [None]:
db.close_connection()