In [1]:
"""main wrapper for IPC safety stock reset"""

import os
import sys
import argparse

import pandas as pd
import datetime as dt
from dateutil.tz import gettz
from ast import literal_eval

In [2]:
# Add the directory four levels up to sys.path so that zeno_etl_libs can be
# imported from the local folder (the path is relative to the working directory)
sys.path.append('../../../..')

In [3]:
from zeno_etl_libs.helper.aws.s3 import S3
from zeno_etl_libs.db.db import DB, PostGre
from zeno_etl_libs.helper import helper
from zeno_etl_libs.logger import get_logger

In [4]:
from zeno_etl_libs.utils.ipc.forecast_reset import ipc_forecast_reset

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
from zeno_etl_libs.utils.warehouse.wh_intervention.store_portfolio_consolidation import stores_ss_consolidation

In [6]:
from zeno_etl_libs.utils.ipc.goodaid_substitution import update_ga_ss

In [7]:
from zeno_etl_libs.utils.ipc.npi_exclusion import omit_npi_drugs

In [8]:
from zeno_etl_libs.utils.ipc.post_processing import post_processing

In [9]:
from zeno_etl_libs.utils.ipc.doid_update_ss import doid_update

# main

In [10]:
def _now_ist_str():
    """Return the current Asia/Kolkata time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return dt.datetime.now(tz=gettz('Asia/Kolkata')).strftime('%Y-%m-%d %H:%M:%S')


def _stamp_and_write(df, table_name, date_column, reset_date, s3, rs_db_write,
                     write_schema, logger):
    """Add date/audit columns to `df`, align it to `table_name` and write it.

    The frame is stamped in place (as the original inline code did), its
    column names are switched from underscores to hyphens, and the columns are
    reordered to the order reported by `helper.get_table_info`.

    Returns:
        The reordered frame that was handed to `s3.write_df_to_db`.
    """
    stamp = _now_ist_str()  # one value so created-at and updated-at always agree
    df[date_column] = dt.datetime.strptime(reset_date, '%Y-%m-%d').date()
    df['created-at'] = stamp
    df['created-by'] = 'etl-automation'
    df['updated-at'] = stamp
    df['updated-by'] = 'etl-automation'
    df.columns = [c.replace('_', '-') for c in df.columns]

    table_info = helper.get_table_info(db=rs_db_write, table_name=table_name,
                                       schema=write_schema)
    df = df[list(table_info['column_name'])]  # required column order

    logger.info(f"Writing to table: {table_name}")
    s3.write_df_to_db(df=df, table_name=table_name, db=rs_db_write,
                      schema=write_schema)
    return df


def _concat_or_empty(frames, ignore_index=False):
    """Concatenate the collected frames; no frames yields an empty DataFrame."""
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=ignore_index)


def main(debug_mode, reset_stores, reset_date, type_list, reset_store_ops,
         goodaid_ss_flag, ga_inv_weight, rest_inv_weight, top_inv_weight,
         chronic_max_flag, wh_gen_consolidation, v5_active_flag, v6_active_flag,
         v6_type_list, v6_ptr_cut_off, v3_active_flag,
         omit_npi, corrections_selling_probability_cutoff,
         corrections_cumulative_probability_cutoff, drug_type_list_v4,
         rs_db_read, rs_db_write, read_schema, write_schema, logger):
    """Run the IPC safety-stock reset for every store in `reset_stores`.

    For each store the forecast pipeline (`ipc_forecast_reset`) is run, the
    optional adjustments are applied, `post_processing` computes the order
    value, and - only when `debug_mode == 'N'` - the results are written to
    the `ipc-forecast`, `ipc-safety-stock` and `ipc-abc-xyz-class` tables and
    pushed through `doid_update`.

    Args:
        debug_mode: 'N' enables the DB writes; any other value skips them.
        reset_stores: iterable of store ids to process.
        reset_date: reset date as a 'YYYY-MM-DD' string.
        type_list: forwarded to `ipc_forecast_reset` and `doid_update`.
        reset_store_ops: currently unused (the ops rescheduling step is disabled).
        goodaid_ss_flag: 'Y' runs `update_ga_ss` using `ga_inv_weight`,
            `rest_inv_weight` and `top_inv_weight`.
        chronic_max_flag, v5_active_flag, v6_active_flag, v6_type_list,
        v6_ptr_cut_off, drug_type_list_v4,
        corrections_selling_probability_cutoff,
        corrections_cumulative_probability_cutoff: forwarded unchanged to
            `ipc_forecast_reset`.
        wh_gen_consolidation: 'Y' runs `stores_ss_consolidation`.
        v3_active_flag: 'Y' turns the corrections flag on.
        omit_npi: 'Y' runs `omit_npi_drugs`.
        rs_db_read, rs_db_write: DB handles used for reads / writes.
        read_schema, write_schema: schema names used for reads / writes.
        logger: logger exposing `info` and `exception`.

    Returns:
        Tuple `(order_value_all, new_drug_entries, missed_entries)` of
        DataFrames. Any exception is logged and swallowed; whatever was
        collected before the failure is still returned.
    """
    s3 = S3()
    logger.info(f"Debug Mode: {debug_mode}")
    status = 'Failed'
    corrections_flag = (v3_active_flag == 'Y')

    # Per-store results are collected in lists and concatenated once at the
    # end: DataFrame.append no longer exists in pandas >= 2.0.
    order_value_frames = []
    new_drug_frames = []
    missed_frames = []

    logger.info("Forecast pipeline starts...")
    try:
        for store_id in reset_stores:
            logger.info("IPC SS calculation started for store id: " + str(store_id))

            # RUNNING FORECAST PIPELINE AND SAFETY STOCK CALC
            drug_class, weekly_fcst, safety_stock_df, df_corrections, \
                df_corrections_111, drugs_max_to_lock_ipcv6, \
                drug_rejects_ipcv6 = ipc_forecast_reset(
                    store_id, type_list, reset_date, corrections_flag,
                    corrections_selling_probability_cutoff,
                    corrections_cumulative_probability_cutoff,
                    rs_db_read, read_schema,
                    drug_type_list_v4=drug_type_list_v4,
                    v5_active_flag=v5_active_flag,
                    v6_active_flag=v6_active_flag,
                    v6_type_list=v6_type_list,
                    v6_ptr_cut_off=v6_ptr_cut_off,
                    chronic_max_flag=chronic_max_flag,
                    logger=logger)

            # WAREHOUSE GENERIC SKU CONSOLIDATION
            if wh_gen_consolidation == 'Y':
                safety_stock_df, consolidation_log = stores_ss_consolidation(
                    safety_stock_df, rs_db_read, read_schema,
                    min_column='safety_stock', ss_column='reorder_point',
                    max_column='order_upto_point')

            # GOODAID SAFETY STOCK MODIFICATION
            if goodaid_ss_flag == 'Y':
                safety_stock_df, good_aid_ss_log = update_ga_ss(
                    safety_stock_df, store_id, rs_db_read, read_schema,
                    ga_inv_weight, rest_inv_weight,
                    top_inv_weight, substition_type=['generic'],
                    min_column='safety_stock', ss_column='reorder_point',
                    max_column='order_upto_point', logger=logger)

            # OMIT NPI DRUGS
            if omit_npi == 'Y':
                safety_stock_df = omit_npi_drugs(safety_stock_df, store_id,
                                                 reset_date, rs_db_read,
                                                 read_schema, logger)

            # POST PROCESSING AND ORDER VALUE CALCULATION
            drug_class, weekly_fcst, safety_stock_df, \
                order_value = post_processing(store_id, drug_class, weekly_fcst,
                                              safety_stock_df, rs_db_read,
                                              read_schema, logger)
            order_value_frames.append(order_value)

            # WRITING TO RS-DB
            if debug_mode == 'N':
                logger.info("Writing table to RS-DB")

                # writing table ipc-forecast
                weekly_fcst = weekly_fcst.rename(
                    columns={'date': 'week_begin_dt', 'fcst': 'point_forecast',
                             'std': 'forecast_deviation'})
                weekly_fcst = _stamp_and_write(
                    weekly_fcst, 'ipc-forecast', 'forecast_date', reset_date,
                    s3, rs_db_write, write_schema, logger)

                # writing table ipc-safety-stock
                safety_stock_df = _stamp_and_write(
                    safety_stock_df, 'ipc-safety-stock', 'reset_date',
                    reset_date, s3, rs_db_write, write_schema, logger)

                # writing table ipc-abc-xyz-class
                drug_class = _stamp_and_write(
                    drug_class, 'ipc-abc-xyz-class', 'reset_date', reset_date,
                    s3, rs_db_write, write_schema, logger)

                # to write ipc v6 tables ...

                # UPLOADING MIN, SS, MAX in DOI-D
                logger.info("Updating new SS to DrugOrderInfo-Data")
                # the written frame has hyphenated names; query() needs underscores
                safety_stock_df.columns = [c.replace('-', '_')
                                           for c in safety_stock_df.columns]
                ss_data_upload = safety_stock_df.query('order_upto_point > 0')[
                    ['store_id', 'drug_id', 'safety_stock', 'reorder_point',
                     'order_upto_point']]
                ss_data_upload.columns = ['store_id', 'drug_id', 'corr_min',
                                          'corr_ss', 'corr_max']
                new_drug_entries_str, missed_entries_str = doid_update(
                    ss_data_upload, type_list, rs_db_write, write_schema, logger)
                # NOTE(review): assumes doid_update returns two DataFrames - confirm
                new_drug_frames.append(new_drug_entries_str)
                missed_frames.append(missed_entries_str)

                logger.info("All writes to RS-DB completed!")

                # NOTE: the OPS ORACLE rescheduling step that used
                # `reset_store_ops` is disabled, so that argument is unused.

            else:
                logger.info("Writing to RS-DB skipped")

        status = 'Success'
        logger.info(f"IPC code execution status: {status}")

    except Exception as error:
        # Best-effort job: log the failure and still return partial results.
        logger.exception(error)
        logger.info(f"IPC code execution status: {status}")

    order_value_all = _concat_or_empty(order_value_frames, ignore_index=True)
    new_drug_entries = _concat_or_empty(new_drug_frames)
    missed_entries = _concat_or_empty(missed_frames)

    return order_value_all, new_drug_entries, missed_entries

In [11]:
# Parameter passing

In [12]:
# Target environment name for this run
env = 'stage'

In [13]:
# Publish the chosen environment through the process environment
# (presumably read by zeno_etl_libs when it builds connections - confirm)
os.environ["env"] = env

In [14]:
# --- run control -----------------------------------------------------------
email_to = 'vivek.revi@zeno.health'
debug_mode = 'N'            # 'N' => main() writes its results to the DB
reset_date = 'YYYY-MM-DD'   # placeholder; replaced by today's date further down
reset_stores = [2]
exclude_stores = [52, 60, 92, 243, 281]

# --- JOB EXCLUSIVE PARAMS: GoodAid safety-stock substitution -----------------
goodaid_ss_flag = 'Y'
ga_inv_weight = 0.5
rest_inv_weight = 0.0
top_inv_weight = 1

# --- optional pipeline steps -------------------------------------------------
chronic_max_flag = 'N'
wh_gen_consolidation = 'Y'
omit_npi = 'N'

# --- IPC version switches ----------------------------------------------------
v3_active_flag = 'N'
v5_active_flag = 'N'
v6_active_flag = 'N'
v6_type_list = ['ethical', 'generic', 'others']
v6_ptr_cut_off = 400

# --- dict-literal strings; parsed with literal_eval in the next cell ---------
corrections_selling_probability_cutoff = (
    "{'ma_less_than_2': 0.40, 'ma_more_than_2' : 0.40}"
)
corrections_cumulative_probability_cutoff = (
    "{'ma_less_than_2':0.50,'ma_more_than_2':0.63}"
)
drug_type_list_v4 = (
    "{'generic':'{0:[0,0,0], 1:[0,0,1], 2:[0,1,2],3:[1,2,3]}',"
    "'ethical':'{0:[0,0,0], 1:[0,0,1], 2:[0,1,2],3:[1,2,3]}',"
    "'others':'{0:[0,0,0], 1:[0,1,2], 2:[0,1,2],3:[1,2,3]}'}"
)

In [15]:
# EVALUATE REQUIRED JSON PARAMS
# Each parameter arrives as a dict-literal string. The isinstance guards keep
# this cell re-runnable: once a value has been parsed into a dict, passing it
# to literal_eval again would raise ValueError.
if isinstance(corrections_selling_probability_cutoff, str):
    corrections_selling_probability_cutoff = literal_eval(
        corrections_selling_probability_cutoff)
if isinstance(corrections_cumulative_probability_cutoff, str):
    corrections_cumulative_probability_cutoff = literal_eval(
        corrections_cumulative_probability_cutoff)
if isinstance(drug_type_list_v4, str):
    drug_type_list_v4 = literal_eval(drug_type_list_v4)

In [16]:
# Logger, one read-only and one read-write DB handle (built in that order),
# and the schema used for both reading and writing
logger = get_logger()
rs_db_read, rs_db_write = DB(read_only=True), DB(read_only=False)
read_schema = write_schema = 'prod2-generico'

connecting with secrets manager for getting secrets


In [17]:
# open RS connections: reader first, then writer
for rs_conn in (rs_db_read, rs_db_write):
    rs_conn.open_connection()

InterfaceError: ('communication error', TimeoutError(60, 'Operation timed out'))

In [None]:
# The 'YYYY-MM-DD' placeholder means "use the current date"
# NOTE(review): date.today() follows the machine's local timezone, whereas the
# audit timestamps in main() use Asia/Kolkata - confirm this is intended.
reset_date = (dt.date.today().strftime("%Y-%m-%d")
              if reset_date == 'YYYY-MM-DD' else reset_date)

In [None]:
# Drug types covered by the reset, kept as a SQL-style tuple string.
# (The former `if 0: pass / else:` wrapper was dead code and has been removed;
# the assigned values are unchanged.)
type_list = (
    "('ethical', 'ayurvedic', 'generic', 'discontinued-products', "
    "'banned', 'general', 'high-value-ethical', 'baby-product',"
    " 'surgical', 'otc', 'glucose-test-kit', 'category-2', "
    "'category-1', 'category-4', 'baby-food', '', 'category-3')"
)
# No ops-schedule table is supplied in this notebook run
reset_store_ops = None

In [None]:
""" calling the main function """
# Keyword arguments guard against silently mis-ordering the 25 parameters
order_value_all, new_drug_entries, missed_entries = main(
    debug_mode=debug_mode,
    reset_stores=reset_stores,
    reset_date=reset_date,
    type_list=type_list,
    reset_store_ops=reset_store_ops,
    goodaid_ss_flag=goodaid_ss_flag,
    ga_inv_weight=ga_inv_weight,
    rest_inv_weight=rest_inv_weight,
    top_inv_weight=top_inv_weight,
    chronic_max_flag=chronic_max_flag,
    wh_gen_consolidation=wh_gen_consolidation,
    v5_active_flag=v5_active_flag,
    v6_active_flag=v6_active_flag,
    v6_type_list=v6_type_list,
    v6_ptr_cut_off=v6_ptr_cut_off,
    v3_active_flag=v3_active_flag,
    omit_npi=omit_npi,
    corrections_selling_probability_cutoff=corrections_selling_probability_cutoff,
    corrections_cumulative_probability_cutoff=corrections_cumulative_probability_cutoff,
    drug_type_list_v4=drug_type_list_v4,
    rs_db_read=rs_db_read,
    rs_db_write=rs_db_write,
    read_schema=read_schema,
    write_schema=write_schema,
    logger=logger,
)

In [None]:
# close RS connections: reader first, then writer
for rs_conn in (rs_db_read, rs_db_write):
    rs_conn.close_connection()

In [None]:
# SEND EMAIL ATTACHMENTS
logger.info("Sending email attachments..")
# TODO: sending the email is not implemented yet; only the log line above runs.