# OUTBOUND 
#### DATA: UNION OF CANCELLED ORDERS AND ORDERS THAT WERE FULLY OR PARTIALLY SHIPPED 
#### GRANULARITY LEVEL: ORDER

## Parameters

In [1]:
# Parameter Cell
# Output object name and the source instance the layer paths are built for.
object_name = "OUT_ORD_SHIP_CANC"
instance_name = "czpoh_2"

# Common storage root shared by every layer path below.
_storage_root = 'abfss://production@dscglblceedadlsppldnsazr.dfs.core.windows.net'

# The config location is instance-independent; bronze, silver and gold are
# scoped to `instance_name`.
config_path = '{}/config'.format(_storage_root)
bronze_path = '{}/bronze/blueyonder_wms/{}'.format(_storage_root, instance_name)
silver_path = '{}/silver/{}'.format(_storage_root, instance_name)
gold_path = '{}/gold/{}'.format(_storage_root, instance_name)

## Data Calculation Parameters

In [2]:
# Settings for deriving the KPI-relevance flag of cancelled orders:
#   NEW_COLUMN    - name of the flag column that is produced
#   SOURCE_COLUMN - column whose value is inspected
#   VALUE_O_LIST  - source values that yield flag 0
#   VALUE_1_LIST  - source values that yield flag 1
#   DEFAULT_VALUE - flag used when the source value is in neither list
# NOTE: 'VALUE_O_LIST' is spelled with the letter "O"; the lookup further down
# in this notebook uses exactly that spelling, so it must not be "corrected" here alone.
cond_dict_KPI_FLG = {
    'NEW_COLUMN': 'DW_KPI_RELEVANT_FLAG',
    'SOURCE_COLUMN': 'USR_ID',
    'VALUE_O_LIST': [],
    'VALUE_1_LIST': ['NOUSER', 'slInAdapter_111'],
    'DEFAULT_VALUE': 0,
}

In [3]:
# Column order of the final output; applied with `.select(column_order)` after
# the shipped and cancelled orders have been unioned.
column_order = [
    # order identification and reference date
    'WH_ID', 'CLIENT_ID', 'DW_REFFERENCE_DATE', 'ORDNUM', 'ADDDTE',
    # classification
    'DW_CHANNEL', 'DW_CATEGORY', 'DW_SUBCATEGORY', 'ORDTYP', 'CPOTYP',
    # line counts and quantities
    'DW_NBR_LINES', 'DW_NBR_CANCELLED_LINES',
    'DW_SUM_ORDQTY', 'DW_SUM_HOST_ORDQTY', 'DW_SUM_SHPQTY',
    # KPI and status flags
    'DW_KPI_RELEVANT_FLAG', 'DW_KPI_RESULT', 'DW_CANCELLED_FLAG', 'DW_SHIPPED_FLAG',
    # cancellation details
    'REACOD_CMNT', 'ORDLIN_CHG_REACOD', 'USR_ID',
    # event dates and partition column
    'DW_CANCELLED_DATE', 'DW_DISPATCH_DATE', 'DW_PARTITION',
]

## Imports and Configuration

In [4]:
## Settings needed to make timestamps below 1900-01-01 work:
## each of the Parquet rebase options listed here is set to "CORRECTED".
## NOTE(review): there is no entry for datetimeRebaseModeInWrite - confirm that
## this is intentional.
for _rebase_option in (
    "spark.sql.legacy.parquet.int96RebaseModeInRead",
    "spark.sql.legacy.parquet.int96RebaseModeInWrite",
    "spark.sql.legacy.parquet.datetimeRebaseModeInRead",
):
    spark.conf.set(_rebase_option, "CORRECTED")

In [5]:
import delta.tables as DT
import pyspark.sql.functions as F
import pyspark.sql.types as T
from datetime import datetime
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

##  Loading Source Tables from Bronze Layer

In [6]:
def _read_bronze(table_name, row_filter):
    """Load one Delta table from the bronze layer and filter its rows.

    Parameters
    ----------
    table_name : str
        Folder name of the table below `bronze_path`.
    row_filter : str
        SQL predicate handed to `DataFrame.where`.

    Returns
    -------
    pyspark.sql.DataFrame
        The (lazily evaluated) filtered table.
    """
    return (
        spark.read
        .format("delta")
        .load(f"{bronze_path}/{table_name}")
        .where(row_filter)
    )


## ORDER related tables
# Only the 'OCAN' and 'SCMPL' activity codes are kept from ORDACT.
df_ORDACT = _read_bronze("ORDACT", "DW_DELETED_FLAG = FALSE AND ACTCOD IN ('OCAN','SCMPL')")

# Orders without CPOTYP are excluded.
df_ORD = _read_bronze("ORD", 'DW_DELETED_FLAG = FALSE AND CPOTYP IS NOT NULL')

# Missing quantities are replaced with 0 so they take part in the sums computed later.
df_ORD_LINE = (
    _read_bronze("ORD_LINE", 'DW_DELETED_FLAG = FALSE')
    .fillna({'ORDQTY': 0, 'SHPQTY': 0, 'HOST_ORDQTY': 0})
)

# SHIPMENT tables
df_SHIPMENT_LINE = _read_bronze("SHIPMENT_LINE", 'DW_DELETED_FLAG = FALSE')

# Only shipments with SHPSTS = "C" are kept.
df_SHIPMENT = _read_bronze("SHIPMENT", 'DW_DELETED_FLAG = FALSE AND SHPSTS = "C"')

# TRLR DISPATCH related tables
df_STOP = _read_bronze("STOP", 'DW_DELETED_FLAG = FALSE')

df_CAR_MOVE = _read_bronze("CAR_MOVE", 'DW_DELETED_FLAG = FALSE')

# Only trailers with TRLR_COD = "SHIP" and TRLR_STAT = "D" are kept.
df_TRLR = _read_bronze("TRLR", 'DW_DELETED_FLAG = FALSE AND TRLR_COD = "SHIP" AND TRLR_STAT = "D"')

## GETTING DATA WITH CANCELLED ORDERS

#### 1. Getting from ORD_LINE all orders that have all lines cancelled (granularity: order)

       



In [7]:
## Orders whose lines are all cancelled: ORD_LINE is aggregated per order and an
## order is kept only when its line count equals the sum of CANCELLED_FLG.
df_ORD_ALL_LINES_CANC = (
    df_ORD_LINE
    .groupBy('ORDNUM', 'WH_ID', 'CLIENT_ID')
    .agg(
        F.count("*").cast(T.LongType()).alias('DW_NBR_LINES'),
        F.sum('CANCELLED_FLG').cast(T.LongType()).alias('DW_NBR_CANCELLED_LINES'),
        F.sum('HOST_ORDQTY').cast(T.LongType()).alias('DW_SUM_HOST_ORDQTY'),
        F.sum('ORDQTY').cast(T.LongType()).alias('DW_SUM_ORDQTY'),
    )
    .where('DW_NBR_LINES = DW_NBR_CANCELLED_LINES')
)

In [8]:
# Count: 114523
# Count distinct ordnum: 114523
# df_ORD_ALL_LINES_CANC.count()
# display(df_ORD_ALL_LINES_CANC.select(F.countDistinct('ORDNUM')))

#### 2. Getting from ORDACT the row of the last order cancellation (granularity: order)
The last order cancellation is the ORDACT row with max(TRNDTE) where ORDACT.ACTCOD = 'OCAN'. <br>


In [9]:
## Last 'OCAN' (order cancellation) row per order.
##
## Exactly one ORDACT row is selected per (ORDNUM, WH_ID, CLIENT_ID): the one
## with the latest TRNDTE, ties broken by the highest ORDACT_ID.
## Taking max(ORDACT_ID) and max(TRNDTE) independently and matching on both
## returns no row at all for an order whose two maxima sit on different rows;
## the inner join on this frame further down would then silently drop the order.
_order_key_cols = ['ORDNUM', 'WH_ID', 'CLIENT_ID']

_latest_cancel_first = (
    Window
    .partitionBy(*_order_key_cols)
    .orderBy(F.col('TRNDTE').desc(), F.col('ORDACT_ID').desc())
)

## Full ORDACT row of the last 'OCAN' per order.
## Rows without TRNDTE are skipped: they cannot supply a cancellation date.
df_ORDACT_last_cancel = (
    df_ORDACT
    .where('ACTCOD = "OCAN"')
    .where(F.col('TRNDTE').isNotNull())
    .withColumn('_CANCEL_RANK', F.row_number().over(_latest_cancel_first))
    .where(F.col('_CANCEL_RANK') == 1)
    .select('ORDNUM', 'WH_ID', 'CLIENT_ID', 'TRNDTE', 'USR_ID',
            'REACOD_CMNT', 'ORDLIN_CHG_REACOD', 'ORDACT_ID')
)

## Key (order + ORDACT_ID + TRNDTE) of the selected row, kept under its
## original name and column order.
df_ORDACT_last_cancel_key = df_ORDACT_last_cancel.select(
    'ORDNUM', 'WH_ID', 'CLIENT_ID', 'ORDACT_ID', 'TRNDTE'
)

In [10]:
## count: 122828 - df_ORDACT_last_cancel.count()
## count: 122828 - df_ORDACT_last_cancel_key.count()
# print('all',df_ORDACT_last_cancel.count())
# print('key',df_ORDACT_last_cancel_key.count())

## 3. Joining orders with all lines cancelled to ord and ordact to add needed columns

In [11]:
# Fully cancelled orders enriched with ORD header columns and with the last
# cancellation row from ORDACT. Both joins are inner joins on the order key,
# so an order missing on either side is not part of the result.
_canc_join_key = ['ORDNUM', 'WH_ID', 'CLIENT_ID']

df_ORD_CANC = (
    df_ORD
    .select('ORDNUM', 'WH_ID', 'CLIENT_ID', 'CPOTYP', 'ADDDTE', 'ORDTYP')
    .join(df_ORD_ALL_LINES_CANC, _canc_join_key, how='inner')
    .join(df_ORDACT_last_cancel, _canc_join_key, how='inner')
)

In [12]:
## count 114523 df_ORD_CANC.count()
## count 10, df_ORD_CANC.where('ORDACT_ID IS NULL').count()
##  df_ORD_CANC.count()
## df_ORD_CANC.where('ORDACT_ID IS NULL').count()
## count of orders that still have a shipment line associated with a shipment which does not have shipment status Cancelled (7)
## display(df_ORD_CANC.join(df_SHIPMENT_LINE,['ORDNUM'], how='inner').join(df_SHIPMENT,['SHIP_ID'], how='inner'))

### ISSUES with data quality
## Issues: 10 orders have all lines cancelled but don't have 'OCAN' action in ordact
## Issues: 7 orders are associated with shipment_line on shipment with shpsts <> B

### 4. Select columns for cancelled orders

In [13]:
## Output columns for cancelled orders.
## The derived expressions are named first; the select below only lists them.

# Day of the cancellation (TRNDTE shifted by 0 days); feeds the reference date and the partition.
_canc_day = F.date_sub(F.col('TRNDTE'), 0)

# KPI_RELEVANT_FLAG: driven by cond_dict_KPI_FLG (0-list, 1-list, default).
_canc_kpi_source = F.col(cond_dict_KPI_FLG['SOURCE_COLUMN'])
_canc_kpi_relevant = (
    F.when(_canc_kpi_source.isin(cond_dict_KPI_FLG['VALUE_O_LIST']), F.lit(0))
    .when(_canc_kpi_source.isin(cond_dict_KPI_FLG['VALUE_1_LIST']), F.lit(1))
    .otherwise(F.lit(cond_dict_KPI_FLG['DEFAULT_VALUE']))
    .alias(cond_dict_KPI_FLG['NEW_COLUMN'])
)

# DW_CHANNEL / DW_CATEGORY / DW_SUBCATEGORY: all derived from CPOTYP.
_canc_cpotyp = F.col("CPOTYP")
_canc_channel = (
    F.when(_canc_cpotyp.isin(["40", "45"]), "B2C")
    .when(_canc_cpotyp.isin(["50", "66", "30", "20"]), "B2B")
    .otherwise('NA')
    .alias("DW_CHANNEL")
)
_canc_category = (
    F.when(_canc_cpotyp.isin(["66", "30", "20"]), "PRIO")
    .when(_canc_cpotyp.isin(["50"]), "NORMAL")
    .otherwise('NA')
    .alias("DW_CATEGORY")
)
_canc_subcategory = (
    F.when(_canc_cpotyp == "66", "Two-Step-Cross-Docking")
    .when(_canc_cpotyp == "30", "Replenishment")
    .when(_canc_cpotyp == "20", "KEP-Deliveries")
    .otherwise('NA')
    .alias("DW_SUBCATEGORY")
)

df_ORD_CANC_final = df_ORD_CANC.select(
    'ORDNUM',
    'WH_ID',
    'CLIENT_ID',
    'DW_SUM_ORDQTY',
    'DW_SUM_HOST_ORDQTY',
    F.lit(0).cast(T.LongType()).alias('DW_SUM_SHPQTY'),  # shipped quantity is fixed to 0 here
    'ADDDTE',
    'USR_ID',
    'REACOD_CMNT',
    'ORDLIN_CHG_REACOD',
    'ORDTYP',
    'DW_NBR_LINES',
    'DW_NBR_CANCELLED_LINES',
    'CPOTYP',
    F.col('TRNDTE').alias('DW_CANCELLED_DATE'),
    F.lit(None).alias('DW_DISPATCH_DATE'),  # no dispatch date on the cancelled side
    F.lit(1).alias('DW_CANCELLED_FLAG'),
    F.lit(0).alias('DW_SHIPPED_FLAG'),
    _canc_day.alias('DW_REFFERENCE_DATE'),
    _canc_kpi_relevant,
    _canc_channel,
    _canc_category,
    _canc_subcategory,
    F.lit(0).alias('DW_KPI_RESULT'),
    F.date_format(_canc_day, 'yQQQ').alias('DW_PARTITION'),  # YearQuarter, Example:'2020Q3'
)

## Getting data for orders that were fully or partially shipped

### 1. All orders that have some lines that were not cancelled

In [14]:
# Orders with at least one line that was not cancelled.
# SHPQTY is capped at HOST_ORDQTY per line before the per-order sums are built.
_capped_shpqty = (
    F.when(F.col('SHPQTY') > F.col('HOST_ORDQTY'), F.col('HOST_ORDQTY'))
    .otherwise(F.col('SHPQTY'))
)

df_ORD_VALID_LINES = (
    df_ORD_LINE
    .withColumn('SHPQTY', _capped_shpqty)
    .groupBy('ORDNUM', 'WH_ID', 'CLIENT_ID')
    .agg(
        F.count("*").cast(T.LongType()).alias('DW_NBR_LINES'),
        F.sum('CANCELLED_FLG').cast(T.LongType()).alias('DW_NBR_CANCELLED_LINES'),
        F.sum('HOST_ORDQTY').cast(T.LongType()).alias('DW_SUM_HOST_ORDQTY'),
        F.sum('ORDQTY').cast(T.LongType()).alias('DW_SUM_ORDQTY'),
        F.sum('SHPQTY').cast(T.LongType()).alias('DW_SUM_SHPQTY'),
    )
    .where('DW_NBR_LINES <> DW_NBR_CANCELLED_LINES')
)

In [15]:
## orders with at least one line valid count, 7560410
## df_ORD_VALID_LINES.count()

## 2. Getting from ORDACT when the last shipment completion ('SCMPL') happened

In [16]:
## Latest TRNDTE of the 'SCMPL' activity per order, exposed as DISPATCH_DTE.
df_ORDACT_last_scmpl = (
    df_ORDACT
    .where('ACTCOD = "SCMPL"')
    .groupBy('ORDNUM', 'WH_ID', 'CLIENT_ID')
    .agg(F.max('TRNDTE').alias('DISPATCH_DTE'))
)



In [17]:
## count: 7475220: df_ORDERS_SHIPPED.count()
## count of distinct ordnums:7475220 df_ORDERS_SHIPPED.select(F.countDistinct('ORDNUM')) 
## df_ORDERS_SHIPPED.count()
## display(df_ORDERS_SHIPPED.select(F.countDistinct('ORDNUM')))

### 3. Join ord, orders with at least one line that is not cancelled and orders that were fully/partially dispatched

In [18]:
# Orders with at least one non-cancelled line that also have an 'SCMPL' entry
# in ORDACT, enriched with ORD header columns. Both joins are inner joins on
# the order key.
_shp_join_key = ['ORDNUM', 'WH_ID', 'CLIENT_ID']

df_ORD_SHIPPED = (
    df_ORD
    .select('ORDNUM', 'WH_ID', 'CLIENT_ID', 'CPOTYP', 'ADDDTE', 'ORDTYP')
    .join(df_ORD_VALID_LINES, _shp_join_key, how='inner')
    .join(df_ORDACT_last_scmpl, _shp_join_key, how='inner')
)

### 4. Select columns and add final calculations

In [19]:
## Output columns for fully or partially shipped orders.
## The derived expressions are named first; the select below only lists them.

# Day of the dispatch (DISPATCH_DTE shifted by 0 days); feeds the reference date and the partition.
_shp_day = F.date_sub(F.col('DISPATCH_DTE'), 0)

# DW_CHANNEL / DW_CATEGORY / DW_SUBCATEGORY: all derived from CPOTYP.
_shp_cpotyp = F.col("CPOTYP")
_shp_channel = (
    F.when(_shp_cpotyp.isin(["40", "45"]), "B2C")
    .when(_shp_cpotyp.isin(["50", "66", "30", "20"]), "B2B")
    .otherwise('NA')
    .alias("DW_CHANNEL")
)
_shp_category = (
    F.when(_shp_cpotyp.isin(["66", "30", "20"]), "PRIO")
    .when(_shp_cpotyp.isin(["50"]), "NORMAL")
    .otherwise('NA')
    .alias("DW_CATEGORY")
)
_shp_subcategory = (
    F.when(_shp_cpotyp == "66", "Two-Step-Cross-Docking")
    .when(_shp_cpotyp == "30", "Replenishment")
    .when(_shp_cpotyp == "20", "KEP-Deliveries")
    .otherwise('NA')
    .alias("DW_SUBCATEGORY")
)

# DW_KPI_RESULT: 1 when the summed host-ordered quantity equals the summed shipped quantity, else 0.
_shp_kpi_result = (
    F.when(F.col("DW_SUM_HOST_ORDQTY") == F.col("DW_SUM_SHPQTY"), 1)
    .otherwise(0)
    .alias('DW_KPI_RESULT')
)

df_ORD_SHIPPED_final = df_ORD_SHIPPED.select(
    'ORDNUM',
    'WH_ID',
    'CLIENT_ID',
    'DW_SUM_ORDQTY',
    'DW_SUM_HOST_ORDQTY',
    'DW_SUM_SHPQTY',
    'ADDDTE',
    # cancellation details are left empty on the shipped side
    F.lit(None).alias('USR_ID'),
    F.lit(None).alias('REACOD_CMNT'),
    F.lit(None).alias('ORDLIN_CHG_REACOD'),
    'ORDTYP',
    'DW_NBR_LINES',
    'DW_NBR_CANCELLED_LINES',
    'CPOTYP',
    F.lit(None).alias('DW_CANCELLED_DATE'),
    F.col('DISPATCH_DTE').alias('DW_DISPATCH_DATE'),
    F.lit(0).alias('DW_CANCELLED_FLAG'),
    F.lit(1).alias('DW_SHIPPED_FLAG'),
    _shp_day.alias('DW_REFFERENCE_DATE'),
    F.lit(1).alias(cond_dict_KPI_FLG['NEW_COLUMN']),  # KPI relevant flag is always 1 here
    _shp_channel,
    _shp_category,
    _shp_subcategory,
    _shp_kpi_result,
    F.date_format(_shp_day, 'yQQQ').alias('DW_PARTITION'),  # YearQuarter, Example:'2020Q3'
)

## Union of Cancelled and Shipped Orders

In [20]:
# Stack shipped and cancelled orders (columns are matched by name), then put
# the columns into the output order.
df_ORD_CANC_SHIP = (
    df_ORD_SHIPPED_final
    .unionByName(df_ORD_CANC_final)
    .select(column_order)
)

## Saving to Silver

In [21]:
# Overwrite the silver Delta table (data and schema), partitioned by DW_PARTITION.
#
# The frame is repartitioned by the partition column instead of `.coalesce(1)`:
# a drastic coalesce adds no shuffle and can therefore pull the upstream
# joins/aggregations into a single task as well. Repartitioning by DW_PARTITION
# sends each partition value to one task, which avoids splitting a partition
# folder across many small files while the upstream work and the write itself
# stay parallel.
(
    df_ORD_CANC_SHIP
    .repartition('DW_PARTITION')
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy('DW_PARTITION')
    .save(f"{silver_path}/{object_name}")
)

In [22]:
# df_ORD_CANC_SHIP.count()

In [23]:
# display(df_ORD_CANC_SHIP.where('ORDNUM = "11002275736"'))

In [24]:
## start: 3:37:45 end 3:41:21 3 mins; row count 7587075