In [1]:
import pandas as pd
import sqlalchemy as sa
from datetime import datetime, timedelta
from glob import glob

In [None]:
from EXT_ATH import process_date

In [5]:
# Data-quality query template: per order date and seller NP, counts rows whose
# key columns are NULL (or masked with an 'XXX' pattern), plus total and
# cancelled order counts.
#
# `{date_val}` is a placeholder that must be replaced with a 'YYYY-MM-DD' date
# before execution; the query then selects the rows created in that date's
# calendar month. The filter compares BOTH month and year so that the same
# month of a different year is not mixed into the result.
# NOTE(review): the substitution is presumably done by process_date — confirm
# that it replaces every occurrence of the placeholder.
dq_sql = """select
	current_date as curr_date,
	date(date_parse("O_Created Date & Time", '%Y-%m-%dT%H:%i:%s')) as ord_date,
	"seller np name" AS seller_np,
    SUM(CASE WHEN "Fulfillment Id" IS NULL THEN 1 ELSE 0 END) AS null_fulfilment_id,
    SUM(CASE WHEN "Network Transaction Id" IS NULL THEN 1 ELSE 0 END) AS null_net_tran_id,
    SUM(CASE WHEN "Qty" IS NULL THEN 1 ELSE 0 END) AS null_qty,
    SUM(CASE WHEN "Item Fulfillment Id" IS NULL THEN 1 ELSE 0 END) AS null_itm_fulfilment_id,
    SUM(CASE WHEN "Delivery Pincode" IS NULL OR "Delivery Pincode" LIKE '%XXX%' THEN 1 ELSE 0 END) AS null_del_pc,
    SUM(CASE WHEN "O_Created Date & Time" IS NULL THEN 1 ELSE 0 END) AS null_created_date_time,
    SUM(CASE WHEN "Domain" IS NULL THEN 1 ELSE 0 END) AS null_domain,
    SUM(CASE WHEN "Delivery City" IS NULL OR "Delivery City" LIKE '%XXX%' THEN 1 ELSE 0 END) AS null_del_cty,
    SUM(CASE WHEN ("Order Status"='Cancelled') AND ("Cancellation Code" IS NULL  OR "Cancellation Code" LIKE '%Item Out of Stock%' OR "Cancellation Code" LIKE '%std:011%')THEN 1 ELSE 0 END) AS null_cans_code,
    SUM(CASE WHEN "Order Status"='Cancelled' AND "F_Cancelled At Date & Time" IS NULL THEN 1 ELSE 0 END) AS null_cans_dt_time,
    SUM(CASE WHEN "Order Status" IS NULL THEN 1 ELSE 0 END) AS null_ord_stats,
    SUM(CASE WHEN "Fulfillment Status" IS NULL THEN 1 ELSE 0 END) AS null_fulfil_status,
    SUM(CASE WHEN "Item Category" IS NULL THEN 1 ELSE 0 END) AS null_itm_cat,
    SUM(CASE WHEN "Item Consolidated Category" IS NULL THEN 1 ELSE 0 END) AS null_cat_cons,
    SUM(CASE WHEN "Seller Pincode" IS NULL OR "Seller Pincode" LIKE '%XXXX%' THEN 1 ELSE 0 END) AS null_sell_pincode,
    SUM(CASE WHEN "Provider id" IS NULL THEN 1 ELSE 0 END) AS null_prov_id,
    SUM(CASE WHEN "item id" IS NULL THEN 1 ELSE 0 END) AS null_itm_id,
    SUM(CASE WHEN "seller np name" IS NULL THEN 1 ELSE 0 END) AS null_sell_np,
    SUM(CASE WHEN "network order id" IS NULL THEN 1 ELSE 0 END) AS null_net_ord_id,
    SUM(CASE WHEN "seller city" IS NULL THEN 1 ELSE 0 END) AS null_sell_cty,
    count(*) as total_orders,
    SUM(CASE WHEN "Order Status" = 'Cancelled' THEN 1 ELSE 0 END) AS total_canceled_orders
FROM default.nhm_order_fulfillment_subset_v1
where
	extract(month from date(date_parse("O_Created Date & Time", '%Y-%m-%dT%H:%i:%s'))) = extract(month from date('{date_val}'))
	AND extract(year from date(date_parse("O_Created Date & Time", '%Y-%m-%dT%H:%i:%s'))) = extract(year from date('{date_val}'))
GROUP by
	date(date_parse("O_Created Date & Time", '%Y-%m-%dT%H:%i:%s')),
	"seller np name";"""

In [6]:
def get_raw_results(results):
    """Convert a raw query-result payload into a pandas DataFrame.

    Parameters
    ----------
    results : dict
        Mapping with a ``"rows"`` list. Each row is a dict whose ``"Data"``
        entry is a list of cell dicts. The first row supplies the column
        names; every following row is a data row.
        NOTE(review): this looks like the Athena GetQueryResults row layout —
        confirm against process_date.

    Returns
    -------
    pandas.DataFrame
        One column per header cell, one row per data row. Every value is the
        cell's ``"VarCharValue"`` string; a cell without that key becomes the
        empty string ``''``. An empty ``"rows"`` list yields an empty
        DataFrame instead of raising ``IndexError``.
    """
    rows = results["rows"]
    if not rows:
        # No header row at all: nothing to name the columns after.
        return pd.DataFrame()

    header, *body = rows
    columns = [cell["VarCharValue"] for cell in header["Data"]]
    data = [[cell.get("VarCharValue", "") for cell in row["Data"]] for row in body]
    return pd.DataFrame(columns=columns, data=data)

In [7]:
def list_dates(start_date, period='days', end_date=None):
    """List 'YYYY-MM-DD' date strings from ``start_date`` through ``end_date``.

    Parameters
    ----------
    start_date : str
        First date of the sequence, formatted ``'%Y-%m-%d'``.
    period : {'days', 'months'}
        Step size. ``'days'`` advances one calendar day at a time.
        ``'months'`` advances one calendar month at a time, keeping the day of
        month of ``start_date`` and clamping it to the length of shorter
        months (Jan 31 -> Feb 28/29 -> Mar 31), so no month is ever skipped.
    end_date : str, optional
        Last date to include (inclusive), formatted ``'%Y-%m-%d'``.
        Defaults to today's local date.

    Returns
    -------
    list of str
        The dates in ascending order; empty when ``start_date`` is after
        ``end_date``.

    Raises
    ------
    ValueError
        If ``period`` is not ``'days'`` or ``'months'``, or a date string does
        not match ``'%Y-%m-%d'``.
    """
    # Reject unknown periods up front: without a step the loop would never end.
    if period not in ('days', 'months'):
        raise ValueError(f"period must be 'days' or 'months', got {period!r}")

    first = datetime.strptime(start_date, '%Y-%m-%d').date()
    if end_date is None:
        last = datetime.now().date()
    else:
        last = datetime.strptime(end_date, '%Y-%m-%d').date()

    date_list = []

    if period == 'days':
        current = first
        while current <= last:
            date_list.append(current.strftime("%Y-%m-%d"))
            current += timedelta(days=1)
        return date_list

    # period == 'months': walk (year, month) pairs and rebuild each date from
    # the original day of month, so a clamped day (e.g. Feb 28) does not stick.
    year, month = first.year, first.month
    while True:
        # The day before the first of the following month is this month's last day.
        month_after = datetime(year + month // 12, month % 12 + 1, 1)
        days_in_month = (month_after - timedelta(days=1)).day
        current = first.replace(year=year, month=month, day=min(first.day, days_in_month))
        if current > last:
            break
        date_list.append(current.strftime("%Y-%m-%d"))
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return date_list
    

In [8]:
# First date of the backfill window ('YYYY-MM-DD'); list_dates walks from here up to today.
start_date = '2024-05-01'
# One date string per monthly step; iterated by the export loop further down.
dates_between = list_dates(start_date, period="months")

In [10]:
# Prefix of the parquet file names written by the export loop. This is NOT the
# source table name: the export loop passes that to process_date as a literal.
tbl_name = "od_dq_nhm"

In [None]:
# For every date in dates_between: run the data-quality query, convert the raw
# result into a DataFrame and write it to one parquet file per date.
# The top-level `await` relies on IPython/Jupyter running cells inside an event loop.
for date_val in dates_between:
    print(f"Processing {date_val}")
    # NOTE(review): process_date is imported from EXT_ATH and is not visible here;
    # presumably it fills the {date_val} placeholder of raw_query with `date` and
    # returns the raw result rows — confirm.
    results = await process_date(tbl_name="nhm_order_fulfillment_subset_v1",date=date_val,raw_query=dq_sql)
    # Every column of df holds strings (get_raw_results reads only "VarCharValue").
    df = get_raw_results(results)
    # NOTE(review): hard-coded absolute Windows path; the target directory is
    # assumed to exist already — consider a configurable output directory.
    df.to_parquet(f"D:\\DATA_DUMP\\DATA_QUALITY\\{tbl_name}_{date_val}.parquet", index=False)

In [None]:
# Maps a SQL file name to a short string key.
# NOTE(review): the consumer of this mapping is not visible in this part of the
# notebook — confirm what the value ("dq_main") identifies.
sql_mapping = {
  "base_od_dq_nhm.sql": "dq_main",
  
}