In [8]:
# Install the azure-storage-blob package into the environment of the running Jupyter kernel.
# The leading `!` is IPython shell-escape syntax (not valid in a plain .py script);
# `{sys.executable}` expands to the kernel's own interpreter, so pip installs into
# that interpreter's environment rather than whichever `pip` is first on PATH.
import sys
!{sys.executable} -m pip install azure-storage-blob

import pyarrow.parquet as pq
import pandas as pd
from io import BytesIO
import os, uuid
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
import traceback


# ABS params
# SECURITY NOTE(review): the fallback URL below embeds a signed SAS token (`sig=`)
# with `sp=rwdl` permissions and an `se=2021-11-30` expiry. Secrets should not live
# in a notebook: rotate this token and supply the URL through the DCRO_SAS_URL
# environment variable instead. The literal is kept only so existing runs behave
# exactly as before when the variable is not set.
sas_url = os.environ.get(
    'DCRO_SAS_URL',
    "https://bvzdtlbxhptci.blob.core.windows.net/lrr-dev-aggregated-order-projections?se=2021-11-30T00%3A00%3A00Z&sp=rwdl&sv=2019-02-02&sr=c&sig=JWxcQoJBRaMpxNpBH5/GhF9lXfa5MpDkNKlXPddTHFA%3D",
)
order_place_date_folder = '2021-09-24'
timestamp_folder = '1635253732'#'1632466340'

# File system params
isLocalFileInput = True
isLocalFileOutput = True
# Local file paths
input_folder_path = '/Users/1011425/Release/DCROFuncTests/dcroengineinput/' + timestamp_folder
output_folder_path = '/Users/1011425/Desktop/' + (timestamp_folder + '-filtered')
# Remote file paths
input_folder_path_remote = 'dcro-input/' + order_place_date_folder + '/' + timestamp_folder
# NOTE(review): this "remote" output path points at a local Desktop folder - confirm intent.
output_folder_path_remote = '/Users/1011425/Desktop/' + (timestamp_folder + '-filtered')

# Filter params
# Each filter is a flag plus the set of IDs it applies to.
filterByItems = False
items = {'55613'}

filterByItemExternalCodes = False
item_external_codes = {'20080137004'}

filterByOrderGroups = True
ordergroups = {'2816'}



# Driver: filter the master data first, then filter every dependent parquet
# file against it. Any failure is reported (message + traceback) rather than
# propagated, so the cell always finishes.
# NOTE(review): the generateFiltered* helpers are defined further down in this
# same cell. On a fresh kernel the first call below raises NameError, which the
# handler only prints; the definitions should be moved above this block (or
# into an earlier cell).
try:
    # exist_ok avoids the check-then-create race, and makedirs also creates
    # any missing parent folders instead of failing.
    os.makedirs(output_folder_path, exist_ok=True)

    # 1. Master data parquet
    filtered_master_data = generateFilteredMaster()
    # 2. AOP parquet
    generateFilteredAOP(filtered_master_data)
    # 3. Calendar times parquet
    generateFilteredCalendarTimes(filtered_master_data)
    # 4. Safety Stock parquet
    generateFilteredSS(filtered_master_data)
    # 5. Vendor Minimums parquet
    generateFilteredVendorMins(filtered_master_data)
    # 6. Transmode Minimums parquet
    generateFilteredTransMins(filtered_master_data)
    # 7. SchedRcpts parquet
    generateFilteredSchedRcpts(filtered_master_data)
    # 8. Item Conversions parquet
    generateFilteredItemConversions(filtered_master_data)
    # 9. Buy Guide parquet
    generateFilteredBuyGuides(filtered_master_data)
    # 10. SchedRcptsBySupplier parquet
    generateFilteredSchedRcptsBySupplier(filtered_master_data)
    # 11. AVO parquet
    generateFilteredAVO(filtered_master_data)

except Exception as ex:
    # Top-level boundary: report and stop; steps after the failing one are skipped.
    print('Exception:')
    print(ex)
    print(traceback.format_exc())
    
def generateFilteredMaster():
    """Filter the master data down to the configured order groups and save it.

    Reads ``masterdata.parquet`` from the input folder, determines the seed
    set of order groups (``PP_PUG_ID``) from the first enabled filter, lets
    ``getFilteredRows`` expand that set in place, and writes the matching
    master rows to ``masterdata.parquet`` in the output folder.

    Filter precedence (first enabled flag wins):
      1. ``filterByItems``             - rows whose ``PP_P_ID`` is in ``items``
      2. ``filterByItemExternalCodes`` - rows whose ``P_EXTERNAL_CODE`` is in
         ``item_external_codes``
      3. ``filterByOrderGroups``       - the ``ordergroups`` set itself

    Previously ``filterByItems`` filtered on external codes, while ``items``
    and ``filterByItemExternalCodes`` were never consulted.

    Returns:
        The filtered master DataFrame.

    Raises:
        ValueError: if none of the three filter flags is enabled (this used
            to surface as an unbound-local error on ``filteredOGs``).
    """
    master_path = input_folder_path+'/batch0/snd-data/masterdata.parquet'
    master_data = getDataFramesForFile(master_path)

    # NOTE(review): the configured IDs are strings; isin() only matches if the
    # parquet columns hold strings as well - confirm against the data.
    if filterByItems:
        seed_rows = master_data[master_data.PP_P_ID.isin(items)]
        filteredOGs = set(seed_rows.PP_PUG_ID)
    elif filterByItemExternalCodes:
        seed_rows = master_data[master_data.P_EXTERNAL_CODE.isin(item_external_codes)]
        filteredOGs = set(seed_rows.PP_PUG_ID)
    elif filterByOrderGroups:
        # Copy so the expansion below does not mutate the module-level config set.
        filteredOGs = set(ordergroups)
    else:
        raise ValueError(
            'No filter enabled: set filterByItems, filterByItemExternalCodes '
            'or filterByOrderGroups to True'
        )

    # Expands filteredOGs in place with order groups that share an
    # item/destination pair with an already selected group.
    getFilteredRows(master_data, filteredOGs)
    filtered_master_data = master_data[master_data.PP_PUG_ID.isin(filteredOGs)]
    filtered_master_data.to_parquet(output_folder_path+'/masterdata.parquet')
    return filtered_master_data
    
def generateFilteredAOP(filtered_master_data):
    """Write the aggregated order projections that match the filtered master data.

    Reads ``aggregated_order_projections.parquet`` from the input folder and
    keeps the rows whose ``P_EXTERNAL_CODE`` appears among the master rows'
    ``P_EXTERNAL_CODE`` values and whose ``SUPPLIER_EXTERNAL_CODE`` appears
    among the master rows' ``L_EXTERNAL_CODE_TARGET`` values. The result is
    written to ``aggregated_order_projections.parquet`` in the output folder.

    Args:
        filtered_master_data: master DataFrame providing the
            ``P_EXTERNAL_CODE`` and ``L_EXTERNAL_CODE_TARGET`` columns.
    """
    aop_path = input_folder_path+'/batch0/lsf-output-data/aggregated_order_projections.parquet'
    aop_data = getDataFramesForFile(aop_path)
    # Build the key sets straight from the columns: iterrows() is slow and
    # does not preserve column dtypes across a row.
    item_codes = {str(value) for value in filtered_master_data['P_EXTERNAL_CODE']}
    dest_codes = {str(value) for value in filtered_master_data['L_EXTERNAL_CODE_TARGET']}
    keep = aop_data.P_EXTERNAL_CODE.isin(item_codes) & aop_data.SUPPLIER_EXTERNAL_CODE.isin(dest_codes)
    aop_data[keep].to_parquet(output_folder_path+'/aggregated_order_projections.parquet')

def generateFilteredAVO(filtered_master_data):
    """Write the additional vendor orders that match the filtered master data.

    Reads ``additional_vendor_orders.parquet`` from the input folder and keeps
    the rows whose ``PP_P_ID`` and ``PP_L_ID_TARGET`` both appear among the
    corresponding master columns. The result is written to
    ``additional_vendor_orders.parquet`` in the output folder.

    Args:
        filtered_master_data: master DataFrame providing the ``PP_P_ID`` and
            ``PP_L_ID_TARGET`` columns.
    """
    # NOTE(review): unlike the other inputs this path has no '/batch0'
    # segment - confirm that is where the file really lives.
    avo_path = input_folder_path+'/snd-data/additional_vendor_orders.parquet'
    avo_data = getDataFramesForFile(avo_path)
    # Build the key sets straight from the columns: iterrows() is slow and
    # does not preserve column dtypes across a row.
    item_ids = {str(value) for value in filtered_master_data['PP_P_ID']}
    dest_ids = {str(value) for value in filtered_master_data['PP_L_ID_TARGET']}
    keep = avo_data.PP_P_ID.isin(item_ids) & avo_data.PP_L_ID_TARGET.isin(dest_ids)
    avo_data[keep].to_parquet(output_folder_path+'/additional_vendor_orders.parquet')
    
def generateFilteredCalendarTimes(filtered_master_data):
    """Write the procurement calendar times referenced by the filtered master data.

    Reads ``procurementcalendartimes.parquet`` from the input folder and keeps
    the rows whose ``PC_ID`` appears among the master rows' ``PUG_PC_ID``
    values. The result is written to ``procurementcalendartimes.parquet`` in
    the output folder.

    Args:
        filtered_master_data: master DataFrame providing the ``PUG_PC_ID`` column.
    """
    calendars_path = input_folder_path+'/batch0/snd-data/procurementcalendartimes.parquet'
    calendars_data = getDataFramesForFile(calendars_path)
    # Build the key set straight from the column: iterrows() is slow and
    # does not preserve column dtypes across a row.
    calendar_ids = {str(value) for value in filtered_master_data['PUG_PC_ID']}
    keep = calendars_data.PC_ID.isin(calendar_ids)
    calendars_data[keep].to_parquet(output_folder_path+'/procurementcalendartimes.parquet')
    
def generateFilteredSS(filtered_master_data):
    """Write the safety stock rows that match the filtered master data.

    Reads ``safetystock.parquet`` from the input folder and keeps the rows
    whose ``P_EXTERNAL_CODE`` appears among the master rows'
    ``P_EXTERNAL_CODE`` values and whose ``SUPPLIER_EXTERNAL_CODE`` appears
    among the master rows' ``L_EXTERNAL_CODE_TARGET`` values. The result is
    written to ``safetystock.parquet`` in the output folder.

    Args:
        filtered_master_data: master DataFrame providing the
            ``P_EXTERNAL_CODE`` and ``L_EXTERNAL_CODE_TARGET`` columns.
    """
    ss_path = input_folder_path+'/batch0/safety_stock_output/safetystock.parquet'
    ss_data = getDataFramesForFile(ss_path)
    # Build the key sets straight from the columns: iterrows() is slow and
    # does not preserve column dtypes across a row.
    item_codes = {str(value) for value in filtered_master_data['P_EXTERNAL_CODE']}
    dest_codes = {str(value) for value in filtered_master_data['L_EXTERNAL_CODE_TARGET']}
    keep = ss_data.P_EXTERNAL_CODE.isin(item_codes) & ss_data.SUPPLIER_EXTERNAL_CODE.isin(dest_codes)
    ss_data[keep].to_parquet(output_folder_path+'/safetystock.parquet')
    
def generateFilteredVendorMins(filtered_master_data):
    """Write the vendor minimums of the order groups in the filtered master data.

    Reads ``purchasegroupvendorminimums.parquet`` from the input folder and
    keeps the rows whose ``PUGVM_PUG_ID`` appears among the master rows'
    ``PP_PUG_ID`` values. The result is written to
    ``purchasegroupvendorminimums.parquet`` in the output folder.

    Args:
        filtered_master_data: master DataFrame providing the ``PP_PUG_ID`` column.
    """
    vendorminimums_path = input_folder_path+'/batch0/snd-data/purchasegroupvendorminimums.parquet'
    vendorminimums_data = getDataFramesForFile(vendorminimums_path)
    # Build the key set straight from the column: iterrows() is slow and
    # does not preserve column dtypes across a row.
    order_group_ids = {str(value) for value in filtered_master_data['PP_PUG_ID']}
    keep = vendorminimums_data.PUGVM_PUG_ID.isin(order_group_ids)
    vendorminimums_data[keep].to_parquet(output_folder_path+'/purchasegroupvendorminimums.parquet')
    
def generateFilteredTransMins(filtered_master_data):
    """Write the transport equipment capacities used by the filtered master data.

    Reads ``transportequipmentcapacity.parquet`` from the input folder and
    keeps the rows whose ``TREQC_TREQ_ID`` appears among the master rows'
    ``PUG_TREQ_ID`` values. The result is written to
    ``transportequipmentcapacity.parquet`` in the output folder.

    Args:
        filtered_master_data: master DataFrame providing the ``PUG_TREQ_ID`` column.
    """
    transmodeminimums_path = input_folder_path+'/batch0/snd-data/transportequipmentcapacity.parquet'
    transmodeminimums_data = getDataFramesForFile(transmodeminimums_path)
    # Build the key set straight from the column: iterrows() is slow and
    # does not preserve column dtypes across a row.
    transmode_ids = {str(value) for value in filtered_master_data['PUG_TREQ_ID']}
    keep = transmodeminimums_data.TREQC_TREQ_ID.isin(transmode_ids)
    transmodeminimums_data[keep].to_parquet(output_folder_path+'/transportequipmentcapacity.parquet')
    
def generateFilteredSchedRcpts(filtered_master_data):
    """Write the scheduled receipts that match the filtered master data.

    Reads ``schedrcpts.parquet`` from the input folder and keeps the rows
    whose ``H_EDLC_P_ID`` appears among the master rows' ``PP_P_ID`` values
    and whose ``H_EDLC_L_ID_TARGET`` appears among the master rows'
    ``PP_L_ID_TARGET`` values. The result is written to ``schedrcpts.parquet``
    in the output folder.

    Args:
        filtered_master_data: master DataFrame providing the ``PP_P_ID`` and
            ``PP_L_ID_TARGET`` columns.
    """
    schedrcpt_path = input_folder_path+'/batch0/snd-data/schedrcpts.parquet'
    schedrcpt_data = getDataFramesForFile(schedrcpt_path)
    # Build the key sets straight from the columns: iterrows() is slow and
    # does not preserve column dtypes across a row.
    item_ids = {str(value) for value in filtered_master_data['PP_P_ID']}
    dest_ids = {str(value) for value in filtered_master_data['PP_L_ID_TARGET']}
    keep = schedrcpt_data.H_EDLC_P_ID.isin(item_ids) & schedrcpt_data.H_EDLC_L_ID_TARGET.isin(dest_ids)
    schedrcpt_data[keep].to_parquet(output_folder_path+'/schedrcpts.parquet')
    
def generateFilteredItemConversions(filtered_master_data):
    """Write the unit conversions of the items in the filtered master data.

    Reads ``unitconversions.parquet`` from the input folder and keeps the rows
    whose ``P_P_ID_MASTER`` appears among the master rows' ``PP_P_ID`` values.
    The result is written to ``unitconversions.parquet`` in the output folder.

    Args:
        filtered_master_data: master DataFrame providing the ``PP_P_ID`` column.
    """
    itemconversion_path = input_folder_path+'/batch0/snd-data/unitconversions.parquet'
    itemconversion_data = getDataFramesForFile(itemconversion_path)
    # Build the key set straight from the column: iterrows() is slow and
    # does not preserve column dtypes across a row.
    item_ids = {str(value) for value in filtered_master_data['PP_P_ID']}
    keep = itemconversion_data.P_P_ID_MASTER.isin(item_ids)
    itemconversion_data[keep].to_parquet(output_folder_path+'/unitconversions.parquet')

def generateFilteredBuyGuides(filtered_master_data):
    """Write the buy guide rows that match the filtered master data.

    Reads ``buy_guide_data.parquet`` from the input folder and keeps the rows
    whose ``PP_P_ID`` and ``PP_L_ID_TARGET`` both appear among the
    corresponding master columns. The selected item IDs are printed, and the
    result is written to ``buy_guide_data.parquet`` in the output folder.

    Args:
        filtered_master_data: master DataFrame providing the ``PP_P_ID`` and
            ``PP_L_ID_TARGET`` columns.
    """
    bg_path = input_folder_path+'/batch0/dcro-supplier-splits/buy_guide_data.parquet'
    bg_data = getDataFramesForFile(bg_path)
    # Build the key sets straight from the columns: iterrows() is slow and
    # does not preserve column dtypes across a row.
    item_ids = {str(value) for value in filtered_master_data['PP_P_ID']}
    dest_ids = {str(value) for value in filtered_master_data['PP_L_ID_TARGET']}
    # Diagnostic output kept from the original: shows which items are selected.
    print(item_ids)
    keep = bg_data.PP_P_ID.isin(item_ids) & bg_data.PP_L_ID_TARGET.isin(dest_ids)
    bg_data[keep].to_parquet(output_folder_path+'/buy_guide_data.parquet')
    
def generateFilteredSchedRcptsBySupplier(filtered_master_data):
    """Write the per-supplier scheduled receipts that match the filtered master data.

    Reads ``schedrcpts_supplier_data.parquet`` from the input folder and keeps
    the rows whose ``H_EDLC_P_ID`` appears among the master rows' ``PP_P_ID``
    values and whose ``H_EDLC_L_ID_TARGET`` appears among the master rows'
    ``PP_L_ID_TARGET`` values. The result is written to
    ``schedrcpts_supplier_data.parquet`` in the output folder.

    Args:
        filtered_master_data: master DataFrame providing the ``PP_P_ID`` and
            ``PP_L_ID_TARGET`` columns.
    """
    srs_path = input_folder_path+'/batch0/dcro-supplier-splits/schedrcpts_supplier_data.parquet'
    srs_data = getDataFramesForFile(srs_path)
    # Build the key sets straight from the columns: iterrows() is slow and
    # does not preserve column dtypes across a row.
    item_ids = {str(value) for value in filtered_master_data['PP_P_ID']}
    dest_ids = {str(value) for value in filtered_master_data['PP_L_ID_TARGET']}
    keep = srs_data.H_EDLC_P_ID.isin(item_ids) & srs_data.H_EDLC_L_ID_TARGET.isin(dest_ids)
    srs_data[keep].to_parquet(output_folder_path+'/schedrcpts_supplier_data.parquet')
        
def getDataFramesForFile(filepath):
    """Load a parquet file as a DataFrame from local disk or from blob storage.

    ``isLocalFileInput`` selects the source. Every caller in this cell builds
    ``filepath`` from the local ``input_folder_path``; for remote reads that
    prefix is swapped for ``input_folder_path_remote`` so the blob name is
    relative to the container rather than a local absolute path. Paths that do
    not start with ``input_folder_path`` are passed through unchanged.

    Args:
        filepath: path of the parquet file, normally rooted at
            ``input_folder_path``.

    Returns:
        The DataFrame produced by the selected reader.
    """
    if isLocalFileInput:
        return getDataFramesForLocalFile(filepath)

    # Map the local folder prefix onto the remote one; previously the local
    # path itself was requested as the blob name.
    if filepath.startswith(input_folder_path):
        filepath = input_folder_path_remote + filepath[len(input_folder_path):]
    return getDataFramesForRemoteFile(filepath)
    
def getDataFramesForLocalFile(filepath):
    """Read a local parquet file with the pyarrow engine and report success.

    Args:
        filepath: location of the parquet file on the local file system.

    Returns:
        The DataFrame loaded from ``filepath``.
    """
    frame = pd.read_parquet(filepath, engine='pyarrow')
    # Progress line so the cell output shows which inputs were loaded.
    message = 'Successfully read ' + filepath
    print(message)
    return frame
    
def getDataFramesForRemoteFile(filepath):
    """Download a parquet blob from the configured container and load it.

    The container is addressed through the module-level ``sas_url``. Before
    connecting, the hard-coded HTTP(S) proxy is exported through the usual
    environment variables; note that this changes ``os.environ`` for the whole
    process, not just for this call.

    Args:
        filepath: blob name inside the container.

    Returns:
        The DataFrame parsed from the downloaded blob.
    """
    proxy = 'http://bywww.blue-yonder.org:8888'
    # Both spellings are set because different HTTP stacks read different ones.
    for variable in ('http_proxy', 'HTTP_PROXY', 'https_proxy', 'HTTPS_PROXY'):
        os.environ[variable] = proxy

    container = ContainerClient.from_container_url(sas_url)
    blob_client = container.get_blob_client(blob=filepath)
    stream = BytesIO()
    blob_client.download_blob().readinto(stream)
    # readinto() leaves the cursor at the end of the buffer; rewind so the
    # parser starts from the first byte regardless of how it reads the stream.
    stream.seek(0)
    return pd.read_parquet(stream, engine='pyarrow')
        

def getFilteredRows(master_data, filteredOGs):
    """Expand ``filteredOGs`` in place with all transitively linked order groups.

    Two order groups are linked when master rows of both share the same
    ``PP_P_ID`` / ``PP_L_ID_TARGET`` pair. Starting from the groups already in
    ``filteredOGs``, linked groups are added round by round until a round
    discovers nothing new.

    Args:
        master_data: DataFrame with ``PP_PUG_ID``, ``PP_P_ID`` and
            ``PP_L_ID_TARGET`` columns.
        filteredOGs: set of order group IDs; mutated in place.

    Returns:
        None. The result is delivered through ``filteredOGs``.
    """
    pending = set(filteredOGs)
    while pending:
        seed_rows = master_data[master_data.PP_PUG_ID.isin(pending)]
        filteredOGs.update(pending)
        pending.clear()

        # Collect each distinct (item, destination) pair once, so the master
        # data is scanned per pair instead of per row. Reading the columns
        # directly also avoids iterrows(), which does not preserve dtypes.
        # Values are compared in their str() form, as before.
        pairs = {
            (str(item), str(dest))
            for item, dest in zip(seed_rows['PP_P_ID'], seed_rows['PP_L_ID_TARGET'])
        }
        for item, dest in pairs:
            shares_pair = master_data.PP_P_ID.isin([item]) & master_data.PP_L_ID_TARGET.isin([dest])
            for order_group in master_data.loc[shares_pair, 'PP_PUG_ID']:
                if order_group not in filteredOGs:
                    pending.add(order_group)
            


Successfully read /Users/1011425/Release/DCROFuncTests/dcroengineinput/1635253732/batch0/snd-data/masterdata.parquet
Successfully read /Users/1011425/Release/DCROFuncTests/dcroengineinput/1635253732/batch0/lsf-output-data/aggregated_order_projections.parquet
Successfully read /Users/1011425/Release/DCROFuncTests/dcroengineinput/1635253732/batch0/snd-data/procurementcalendartimes.parquet
Successfully read /Users/1011425/Release/DCROFuncTests/dcroengineinput/1635253732/batch0/safety_stock_output/safetystock.parquet
Successfully read /Users/1011425/Release/DCROFuncTests/dcroengineinput/1635253732/batch0/snd-data/purchasegroupvendorminimums.parquet
Successfully read /Users/1011425/Release/DCROFuncTests/dcroengineinput/1635253732/batch0/snd-data/transportequipmentcapacity.parquet
Successfully read /Users/1011425/Release/DCROFuncTests/dcroengineinput/1635253732/batch0/snd-data/schedrcpts.parquet
Successfully read /Users/1011425/Release/DCROFuncTests/dcroengineinput/1635253732/batch0/snd-data