In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType
from pyspark.sql.functions import ceil,col,split,lit,sum,bround
from pyspark.sql import functions as sf
from pyspark.sql import Window
from pyspark.sql.functions import min, max
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import HiveContext
import pyspark.sql.functions as F
import pyspark.sql.functions as f
from pyspark.sql.functions import when
from pyspark.sql.functions import desc, regexp_replace
from pyspark.sql.functions import *

from functools import reduce
from pyspark.sql import DataFrame

# Ingest Data

## Read Data

In [None]:
# Reuse the SparkContext the hosting environment exposes as `sc` (pyspark shell,
# Databricks), or start one, so this cell also works on a fresh kernel where `sc`
# has never been defined.
sc = SparkContext.getOrCreate()
sqlContext = HiveContext(sc)

# Inventory extract: a headerless, comma-delimited CSV, so Spark names the columns
# _c0, _c1, ... and loads every value as a string (renamed and cast further below).
inventory_raw_data = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='false', delimiter=',')
    .load('Inventory.csv')
)

## Rename Columns

In [None]:
#Inventory
# Keep the positional CSV columns _c0 .. _c66 (Inventory.csv is read with header='false')
# and rename each one to its business name, e.g. _c0 -> DC, _c66 -> Vendor_Hi.
# NOTE(review): "..." stands in for the elided column list / rename calls and is not
# valid Python as written. The last line also ends with a line-continuation backslash;
# if the cell ends right after it, Python may report a SyntaxError — TODO drop that "\".
inventory_raw_data = inventory_raw_data.select(col("_c0"), ... col("_c66"))\
.withColumnRenamed("_c0", "DC") \
...
.withColumnRenamed("_c66", "Vendor_Hi") \

## Modify Schema

In [None]:
# Inventory
# Cast the string columns produced by the headerless CSV read to their real types,
# e.g. DC and Vendor_Hi to IntegerType.
# NOTE(review): the "..." line is a placeholder for the remaining casts and is not valid
# Python as written; the chain also ends with a trailing "\" — TODO remove it.
inventory_raw_data = inventory_raw_data.withColumn("DC",inventory_raw_data.DC.cast(IntegerType()))\
...
.withColumn("Vendor_Hi",inventory_raw_data.Vendor_Hi.cast(IntegerType()))\

In [None]:
# FR03
# The FR03 slot extract is delivered as two headerless CSV files that are read the same way.
def _read_fr03_slot_file(path):
    """Load one headerless, comma-delimited FR03 slot file (columns arrive as _c0, _c1, ...)."""
    return (sqlContext.read
            .format('com.databricks.spark.csv')
            .options(header='false', delimiter=',')
            .load(path))


fr03_raw_data_1 = _read_fr03_slot_file('slot_1.csv')
fr03_raw_data_2 = _read_fr03_slot_file('slot_2.csv')

# Stack the files row-wise. unionAll matches columns by position, so both files are
# assumed to share one column layout.
fr03_raw_data = reduce(DataFrame.unionAll, [fr03_raw_data_1, fr03_raw_data_2])

In [None]:
# FR03: keep the positional columns _c0 .. _c35 of the combined slot files and give
# them names, e.g. _c0 -> DC_fr03, _c35 -> Date.
# NOTE(review): "..." is a placeholder for the elided columns/renames (not valid Python
# as written), and the chain ends with a trailing "\" — TODO remove it.
fr03_raw_data = fr03_raw_data.select(col("_c0"), ...col("_c35"))\
.withColumnRenamed("_c0", "DC_fr03") \
    ...
.withColumnRenamed("_c35", "Date")\

In [None]:
# Select a subset of the FR03 columns (DC_fr03 ... Date; middle of the list elided as "...").
# NOTE(review): the join that builds inventory_fr03_raw_data uses fr03_raw_data, not this
# selection, so fr03_raw_data_select appears unused by that join — confirm intent.
fr03_raw_data_select = fr03_raw_data.select(col("DC_fr03"), ...col("Date"))

In [None]:
# Cast the FR03 DC key to int so it has the same type as the inventory DC column.
# col() resolves against fr03_raw_data_select itself instead of borrowing a Column
# object from the parent DataFrame fr03_raw_data.
fr03_raw_data_select = fr03_raw_data_select.withColumn("DC_fr03", col("DC_fr03").cast(IntegerType()))

# Transform Data

## Inventory

In [None]:
# Attach FR03 slot attributes to every inventory row. A slot matches when location,
# DC and warehouse all agree; inventory rows with no match are kept (left join) with
# nulls in the FR03 columns.
# NOTE(review): this joins the full fr03_raw_data, whose DC_fr03 is still the raw CSV
# string (Spark casts implicitly for the comparison), not fr03_raw_data_select — confirm.
slot_match = (
    (inventory_raw_data["Slot_Address"] == fr03_raw_data["LOCATION_fr03"])
    & (inventory_raw_data["DC"] == fr03_raw_data["DC_fr03"])
    & (inventory_raw_data["WH"] == fr03_raw_data["WH_fr03"])
)
inventory_fr03_raw_data = inventory_raw_data.join(fr03_raw_data, slot_match, how="left")


In [None]:
# Replace a missing product cycle class with the explicit bucket "No_Class".
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "CYCLE_CLASS", F.coalesce(col("CYCLE_CLASS"), lit("No_Class")))


In [None]:
# ------------------ CALCULATING NUMBER OF LAYERS ------------------
# Layers per product = quantity in slot / DC Ti, then rounded up to whole layers.
# A zero or null DC_Ti makes the ratio null (Spark's non-ANSI division semantics).
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "Cal_Number_of_Layer", col("Qty_in_Slot") / col("DC_Ti"))

# Create the rounded column under its final name directly instead of renaming the
# auto-generated ceil() column: that generated name is a Spark implementation detail
# (typically "CEIL(...)"), and withColumnRenamed silently does nothing when the old
# name isn't found (e.g. under spark.sql.caseSensitive=true), which would leave every
# later reference to Adj_Cal_Number_of_Layer broken.
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "Adj_Cal_Number_of_Layer", ceil(col("Cal_Number_of_Layer")))

In [None]:
# ------------------ CALCULATING NUMBER OF POSITIONS (DOUBLE/TRIPLE...) ------------------
# Most rows carry a value in either SelPOS (presumably the pick position count) or
# ResPOS (presumably the reserve count), so their sum is the location's position count.
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "Number_of_Positions", col("SelPOS") + col("ResPOS"))

# Drop abnormal rows with zero positions. Rows whose sum is null (either input null)
# are dropped as well, because a null comparison never satisfies the filter.
inventory_fr03_raw_data = inventory_fr03_raw_data.where(col("Number_of_Positions") != 0)

In [None]:
# ------------------ CALCULATING %CUBE UTILIZATION ------------------
# NOTE(review): product_width, product_length, pallet_width and pallet_length are not
# defined anywhere in this notebook, so this cell raises NameError on a fresh kernel
# unless they were set elsewhere — TODO define them in a configuration cell.
CUBIC_INCHES_PER_CUBIC_FOOT = 12 ** 3  # assumes the dimension columns are in inches

inventory_fr03_raw_data = (
    inventory_fr03_raw_data
    # Assumed product and pallet footprints, attached as constant columns.
    .withColumn("Product_Width_Assumed", lit(product_width))
    .withColumn("Product_Length_Assumed", lit(product_length))
    .withColumn("Pallet_Width_Assumed", lit(pallet_width))
    .withColumn("Pallet_Length_Assumed", lit(pallet_length))
    # Product cube: product height x assumed footprint x rounded-up layer count.
    .withColumn("Product_Cube_in_Feet",
                col("Prod_Height") * col("Product_Width_Assumed") * col("Product_Length_Assumed")
                * col("Adj_Cal_Number_of_Layer") / CUBIC_INCHES_PER_CUBIC_FOOT)
    # Pallet cube: pallet height x assumed pallet footprint.
    .withColumn("Pallet_Cube_in_Feet",
                col("Pallet_Height") * col("Pallet_Width_Assumed") * col("Pallet_Length_Assumed")
                / CUBIC_INCHES_PER_CUBIC_FOOT)
    # Location cube: location height x width x depth x ResPOS.
    # NOTE(review): only ResPOS is used here, so rows whose value sits in SelPOS (ResPOS 0)
    # get a zero location cube — confirm whether Number_of_Positions was intended.
    .withColumn("Location_Cube_in_Feet",
                col("Loc_Height") * col("WDTH") * col("DPTH")
                * col("ResPOS") / CUBIC_INCHES_PER_CUBIC_FOOT)
)

In [None]:
# Pallets per location = rounded-up layer count / DC Hi (presumably layers per pallet),
# then rounded up to whole pallets.
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "Number_of_Pallets_per_Location", col("Adj_Cal_Number_of_Layer") / col("DC_Hi"))

# Named directly rather than renamed from ceil()'s auto-generated column name, which is
# a Spark implementation detail (typically "CEIL(...)"); withColumnRenamed silently
# no-ops when the old name doesn't match, e.g. under spark.sql.caseSensitive=true.
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "Adj_Number_of_Pallets_per_Location", ceil(col("Number_of_Pallets_per_Location")))

In [None]:
# %Cube utilization = (product cube + pallet cube * pallets in location) / location cube * 100.
# A zero location cube yields null under Spark's non-ANSI division semantics.
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "Per_Cube_Utl",
    (col("Product_Cube_in_Feet") + col("Pallet_Cube_in_Feet") * col("Adj_Number_of_Pallets_per_Location"))
    / col("Location_Cube_in_Feet") * 100)

# Rounded to 2 decimals under its final name. F.round is spelled out because a bare
# `round` only resolves to Spark's function while the star import shadows the builtin,
# and renaming the auto-generated "round(..., 2)" column depends on Spark's naming.
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "Adj_Per_Cube_Utl", F.round(col("Per_Cube_Utl"), 2))

In [None]:
# Average layers per position: the rounded-up layer count spread over the location's
# positions, then rounded up to whole layers.
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "item_avg_number_layer_per_case", col("Adj_Cal_Number_of_Layer") / col("Number_of_Positions"))

# Named directly rather than renamed from ceil()'s auto-generated column name, which is
# a Spark implementation detail (typically "CEIL(...)"); withColumnRenamed silently
# no-ops when the old name doesn't match, e.g. under spark.sql.caseSensitive=true.
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "Adj_item_avg_number_layer_per_case", ceil(col("item_avg_number_layer_per_case")))

# Stacked height: rounded layers per position x product height, plus the pallet height.
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "item_actual_height_pallet_included",
    col("Adj_item_avg_number_layer_per_case") * col("Prod_Height") + col("Pallet_Height"))

# Rows with a zero product cube get a height of 0 rather than just the pallet height.
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "Adj_item_actual_height_pallet_included",
    when(col("Product_Cube_in_Feet") == 0, 0).otherwise(col("item_actual_height_pallet_included")))

In [None]:
# 1 when the rounded layers per position equals DC_Hi (a full pallet), else 0.
# A null on either side makes the comparison null, which falls through to 0.
is_full_pallet = col("Adj_item_avg_number_layer_per_case") == col("DC_Hi")
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "Is_full_pallet_case", when(is_full_pallet, lit(1)).otherwise(lit(0)))

# 1 when the stacked height (pallet included) exceeds the location height, else 0.
# The "hight" spelling is kept: the column name is part of the saved table's schema.
is_taller_than_location = col("Adj_item_actual_height_pallet_included") > col("Loc_Height")
inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn(
    "Is_item_hight_gt_loc_height", when(is_taller_than_location, lit(1)).otherwise(lit(0)))

# TODO: disabled "1 or 2 layers while DC_Hi > 2" flag — re-enable or delete.
# inventory_fr03_raw_data = inventory_fr03_raw_data.withColumn("Is_1_or_2_layer", \
#               when(((inventory_fr03_raw_data["Adj_item_avg_number_layer_per_case"] ==1) | (inventory_fr03_raw_data["Adj_item_avg_number_layer_per_case"] ==2)) & (inventory_fr03_raw_data["DC_Hi"] >2) , lit(1)).otherwise(lit(0)))

In [None]:
# Row-level Ti/Hi mismatch flags (1 = DC value differs from the vendor value, else 0).
# A comparison involving a null is null, which when() treats as "no mismatch".
ti_differs = col("DC_Ti") != col("Vendor_Ti")
hi_differs = col("DC_Hi") != col("Vendor_Hi")

inventory_fr03_raw_data = (
    inventory_fr03_raw_data
    .withColumn("Is_Ti_Mismatch", when(ti_differs, lit(1)).otherwise(lit(0)))
    .withColumn("Is_Hi_Mismatch", when(hi_differs, lit(1)).otherwise(lit(0)))
    .withColumn("Is_Ti_or_Hi_Mismatch", when(ti_differs | hi_differs, lit(1)).otherwise(lit(0)))
)

In [None]:
# Roll the row-level mismatch flags up to one row per (Vendor_Name, SEGA_Item):
# each output flag is 1 if at least one of that item's rows was flagged, else 0.
tihi_flag_columns = [
    # (row-level flag column, aggregated output column)
    ("Is_Ti_or_Hi_Mismatch", "Ti_or_Hi_Mismatch"),
    ("Is_Ti_Mismatch", "Ti_Mismatch"),
    ("Is_Hi_Mismatch", "Hi_Mismatch"),
]

tihi_df = inventory_fr03_raw_data.groupBy("Vendor_Name", "SEGA_Item").agg(
    *[when(F.sum(flag_column) >= 1, lit(1)).otherwise(lit(0)).alias(output_column)
      for flag_column, output_column in tihi_flag_columns]
)

In [None]:
# Flag reserve slots that are in an aisle where the same item has no pick slot:
# off_aisle = 1 for such reserve rows, 0 for every other row (the column always exists).
# Rows with a null Item_ID or Aisle are never flagged, since they can't be matched.
# This runs as one distributed join rather than a driver-side loop with one Spark job
# per item, and each row's flag is computed independently, so no earlier flag is overwritten.

# Distinct (item, aisle) pairs that have at least one pick slot.
pick_item_aisles = (
    inventory_fr03_raw_data
    .filter(col("Slot_Type") == "pick")
    .select(col("Item_ID").alias("_pick_Item_ID"), col("Aisle").alias("_pick_Aisle"))
    .distinct()
    .withColumn("_has_pick_slot", lit(1))
)

# pick_item_aisles is distinct on (item, aisle), so the left join adds no extra rows.
inventory_fr03_raw_data = (
    inventory_fr03_raw_data
    .join(
        pick_item_aisles,
        (col("Item_ID") == col("_pick_Item_ID")) & (col("Aisle") == col("_pick_Aisle")),
        "left",
    )
    .withColumn(
        "off_aisle",
        when(
            (col("Slot_Type") == "reserve")
            & col("Item_ID").isNotNull()
            & col("Aisle").isNotNull()
            & col("_has_pick_slot").isNull(),
            lit(1),
        ).otherwise(lit(0)),
    )
    # Drop one column at a time: multi-column drop() is not available in Spark 1.x.
    .drop("_pick_Item_ID")
    .drop("_pick_Aisle")
    .drop("_has_pick_slot")
)

In [None]:
# Persist both results as metastore tables in MyDatabase, replacing any earlier run's output.
inventory_fr03_raw_data.write.saveAsTable("MyDatabase.inventory_joined_with_fr03", mode="overwrite")
tihi_df.write.saveAsTable("MyDatabase.tihi", mode="overwrite")

In [None]:
%sql
-- Sanity check: list the tables in MyDatabase to confirm the two saveAsTable writes landed.
SHOW TABLES IN MyDatabase;