# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.2X
%number_of_workers 10

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the already-running SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; it is used below for Data Catalog reads.
glueContext = GlueContext(sc)
# SparkSession handle used for spark.read calls further down.
spark = glueContext.spark_session
# NOTE(review): `job` is created but never initialised or committed in the visible notebook — confirm it is needed.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.38.1 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.2X
Previous number of workers: 5
Setting new number of workers to: 10
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::243249644484:role/dev-use1-sc-forecast-mwaa-glue-athena-role
Trying to create a Glue session for the kernel.
Worker Type: G.2X
Number of Workers: 10
Session ID: 7938e91d-519b-4fad-b49c-0855187e2f29
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
Waitin

In [2]:
# Session-level Spark SQL tuning.
# These three keys are runtime SQL options, so they are applied to the live
# session through `spark.conf.set`. Mutating `sparkContext._conf` after the
# context has started only changes a Python-side SparkConf object and is not
# picked up by the running session.
# NOTE(review): with these values actually in effect, shuffles use 16
# partitions and broadcast joins are disabled — re-check job runtime.
for _conf_key, _conf_value in [
    ("spark.sql.files.maxPartitionBytes", "500mb"),  # max bytes packed into one input partition
    ("spark.sql.shuffle.partitions", "16"),          # partitions produced by joins/aggregations
    ("spark.sql.autoBroadcastJoinThreshold", "-1"),  # -1 disables automatic broadcast joins
]:
    spark.conf.set(_conf_key, _conf_value)

<pyspark.conf.SparkConf object at 0x7fcb2faa79d0>


In [3]:
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import os
from pyspark.sql.window import Window
from pyspark.sql.functions import collect_list
from datetime import datetime
from pyspark.sql.types import IntegerType, FloatType, DoubleType
import itertools




In [4]:
def convert_to_timestamp(value):
    """Format an HHMMSS-packed number as an ``HH:MM:SS`` string.

    Parameters
    ----------
    value : int-like or None
        Time of day packed as ``hours * 10000 + minutes * 100 + seconds``
        (e.g. ``180000`` -> ``"18:00:00"``). Anything accepted by ``int()``
        is allowed, so float, Decimal or numeric-string inputs also work.

    Returns
    -------
    str or None
        Zero-padded ``HH:MM:SS`` text, or ``None`` when ``value`` is ``None``
        so that a SQL NULL passed through the UDF stays NULL instead of
        raising inside the Python worker.
    """
    if value is None:
        return None
    # The ``:02d`` format spec only accepts true ints (not float/Decimal/str).
    value = int(value)
    hours, remainder = divmod(value, 10000)
    minutes, seconds = divmod(remainder, 100)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

# Spark UDF wrapper around convert_to_timestamp, declared to return StringType.
convert_to_timestamp_udf = udf(convert_to_timestamp, StringType())




In [5]:
def join_lag_name(names):
    """Build a name by joining the truthy entries of ``names`` with ``_``.

    Falsy entries (``None``, ``''``, ``0`` ...) are skipped; every kept entry
    is converted with ``str`` before joining.
    """
    parts = []
    for item in names:
        if not item:
            continue
        parts.append(str(item))
    return "_".join(parts)




### Path - Params

In [6]:
# Params
# NOTE(review): the start/end/order dates below are not referenced anywhere
# else in the visible notebook — confirm whether they are still needed.
start_date, start_dttm = '2023-01-01', '2023-01-01 00:00:00'
end_date, end_dttm = '2023-10-01', '2023-10-01 00:00:00'

order_start_date = '2022-12-19'

# Paths
schema = 's3://'
output_bucket_name = 'ogunes-promise'
output_project_name = 'EDD'

# Common <schema><bucket>/<project> prefix shared by both output locations.
_project_root = os.path.join(schema, output_bucket_name, output_project_name)

# Intermediate snapshot that is written and then re-read further down.
tableau_data_path = os.path.join(_project_root, 'reporting/data', '2023-11-13_v3')

# Snapshot of the fully enriched frame, also written and re-read further down.
enrich_data_path = os.path.join(_project_root, 'enrich', '2023-11-13_v4')




In [7]:
# Fulfillment centers kept in the analysis (used to filter the route/EDD history).
core_fc = [
    'AVP1', 'AVP2', 'CFC1', 'CLT1', 'DAY1', 'DFW1', 'EFC3',
    'MCI1', 'MCO1', 'MDT1', 'PHX1', 'RNO1', 'WFC2', 'BNA1',
]
# Centers tagged FC_TYPE 'G2' downstream; every other center is tagged 'G1'.
G2 = [
    'AVP2',
    'MCI1',
    'RNO1',
    'BNA1',
]




### Feature Selection and Prep for Analysis

In [8]:
# Read the shipment table from the Glue Data Catalog and convert the
# resulting DynamicFrame straight into a Spark DataFrame.
data = (
    glueContext.create_dynamic_frame
    .from_catalog(
        database='shipment_inspecter_table_promise',
        table_name='shipment_transaction_for_edd_estimation_v2',
    )
    .toDF()
)




In [9]:
# Product lookup; joined to `data` on product_key below to derive UOM_flag.
uom = spark.read.parquet('s3://qzhao-promise/ML_feature/product_uom_flag/')




In [10]:
# Inspect column names and types of `data`.
data.printSchema()

root
 |-- ORDER_ID: decimal(38,0) (nullable = true)
 |-- SHIPMENT_TRACKING_NUMBER: string (nullable = true)
 |-- PRODUCT_KEY: decimal(38,0) (nullable = true)
 |-- ORDER_PACKAGE_COUNT: decimal(38,0) (nullable = true)
 |-- SHIPMENT_QUANTITY: decimal(38,0) (nullable = true)
 |-- SHIPMENT_COUNT_OF_ITEMS_IN_BOX: decimal(38,0) (nullable = true)
 |-- SHIPMENT_COUNT_OF_UNIQUE_ITEMS_IN_BOX: decimal(38,0) (nullable = true)
 |-- CARRIER_CODE: string (nullable = true)
 |-- FFMCENTER_NAME: string (nullable = true)
 |-- RELEASE_DTTM_EST: timestamp (nullable = true)
 |-- SHIPMENT_SHIPPED_DATE: string (nullable = true)
 |-- SHIPMENT_SHIPPED_DTTM_EST: timestamp (nullable = true)
 |-- WAREHOUSE_ACTUAL_SHIP_DTTM_EST: timestamp (nullable = true)
 |-- SHIPMENT_DELIVERY_DATE: string (nullable = true)
 |-- SHIPMENT_DELIVERY_DTTM_EST: timestamp (nullable = true)
 |-- CUSTOMER_POSTCODE: string (nullable = true)
 |-- SHIPMENT_SHIP_ROUTE: string (nullable = true)
 |-- SHIPMENT_ESTIMATED_DELIVERY_DATE: date (null

In [12]:
# Row count of `data`; count() is an action and triggers a full Spark job.
data.count()

108967392


In [11]:
# Attach the product lookup; the left join keeps rows whose product has no match.
uom_join_condition = uom.product_key == data.PRODUCT_KEY
data = data.join(uom, uom_join_condition, how='left')  # F.broadcast

# 1 when PRODUCT_SHIPPABLE_UOM is true, 0 otherwise (NULL also maps to 0).
is_shippable_uom = F.col('PRODUCT_SHIPPABLE_UOM') == True
data = data.withColumn('UOM_flag', F.when(is_shippable_uom, 1).otherwise(0))




In [27]:
# sr_edd.filter((F.col('FFMCENTER_NAME') == 'AVP1') & 
#               (F.col('RELEASE_DATE') == '2022-12-29') & 
#               (F.col('CUSTOMER_POSTCODE') == '08360')).show()

+--------------+-----------------+------------+-------------------+------------+------+------+----------+
|FFMCENTER_NAME|CUSTOMER_POSTCODE|RELEASE_DATE|SHIPMENT_SHIP_ROUTE|CARRIER_CODE|CUTOFF|ADJTNT|NEXTADJTNT|
+--------------+-----------------+------------+-------------------+------------+------+------+----------+
|          AVP1|             8360|  2022-12-29|            BRRS_MX|       FDXGD|180000|     1|         4|
+--------------+-----------------+------------+-------------------+------------+------+------+----------+


In [31]:
# dd = spark.read.parquet('s3://qzhao-promise/shipment_route_EDD_history/2022-12-29/')
# dd.filter((F.col('FCName') == 'AVP1') & 
#               (F.col('DATE') == '2022-12-29') & 
#                (F.col('Zip5') == '08360')).show()

+----------+---------+------+-----+-----------+----+------+---+------+----------+----+-------+-----------------+
|      DATE|DayOfWeek|FCName| MODE|ORSItemType|Zip5|Cutoff|TNT|AdjTNT|NextAdjTNT|Zone|RouteID|__index_level_0__|
+----------+---------+------+-----+-----------+----+------+---+------+----------+----+-------+-----------------+
|2022-12-29|        3|  AVP1|FDXGD|          N|8360|180000|  1|     1|         4|   2|BRRS_MX|           549524|
|2022-12-29|        3|  AVP1|FDXHD|          N|8360|180000|  1|     1|         2|   2|BRRS_MX|           549532|
+----------+---------+------+-----+-----------+----+------+---+------+----------+----+-------+-----------------+


In [32]:
# dd.filter((F.col('FCName') == 'AVP2') & 
#               (F.col('DATE') == '2022-12-29') & 
#                (F.col('Zip5') == '08360')).show()

+----------+---------+------+-----+-----------+----+------+---+------+----------+----+-------+-----------------+
|      DATE|DayOfWeek|FCName| MODE|ORSItemType|Zip5|Cutoff|TNT|AdjTNT|NextAdjTNT|Zone|RouteID|__index_level_0__|
+----------+---------+------+-----+-----------+----+------+---+------+----------+----+-------+-----------------+
|2022-12-29|        3|  AVP2|FDXGD|          N|8360|170000|  1|     1|         4|   1|QBAR_PM|           553570|
|2022-12-29|        3|  AVP2|FDXHD|          N|8360|170000|  1|     1|         2|   1|QBAR_PM|           553577|
+----------+---------+------+-----+-----------+----+------+---+------+----------+----+-------+-----------------+


In [12]:
# Collapse to one row per distinct combination of the shipment attributes
# below; the remaining values are reduced with max().
shipment_keys = [
    'ORDER_ID',
    'SHIPMENT_TRACKING_NUMBER',
    'SHIPMENT_COUNT_OF_ITEMS_IN_BOX',
    'CARRIER_CODE',
    'FFMCENTER_NAME',
    'RELEASE_DTTM_EST',
    'SHIPMENT_SHIPPED_DTTM_EST',
    'SHIPMENT_SHIPPED_DATE',
    'SHIPMENT_DELIVERY_DTTM_EST',
    'SHIPMENT_DELIVERY_DATE',
    'CUSTOMER_POSTCODE',
    'SHIPMENT_SHIP_ROUTE',
    'SHIPMENT_ESTIMATED_DELIVERY_DATE',
    'STD_ACTUAL',
    'STD_V0',
    'ACTUAL_ZONE',
    'INITIAL_DELIVERY_ATTEMPT_DTTM_EST',
    'INITIAL_SHIPMENT_DELIVERY_ATTEMPT_DATE',
    'WAREHOUSE_EXPECTED_WEIGHT',
]

# height * length * width of the expected package dimensions.
package_volume = (
    F.col('WAREHOUSE_EXPECTED_HEIGHT')
    * F.col('WAREHOUSE_EXPECTED_LENGTH')
    * F.col('WAREHOUSE_EXPECTED_WIDTH')
)

data = data.groupBy(*shipment_keys).agg(
    F.max('UOM_flag').alias('UOM_flag'),
    F.max(package_volume).alias('package_size'),
)




In [13]:
# customer state
zip_to_state = spark.read.csv(
    's3://ogunes-promise/EDD/geolocation/uszips.csv', header=True)

# Keep only the zip -> state columns, renamed to line up with `data`.
state_lookup = zip_to_state.select(
    F.col('zip').alias('CUSTOMER_POSTCODE'),
    F.col('state_id').alias('CUSTOMER_STATE'),
)
data = data.join(state_lookup, on='CUSTOMER_POSTCODE', how='left')




In [14]:
# fc shipping scan dow
# F.dayofweek yields 1 (Sunday) .. 7 (Saturday); subtracting 1 gives 0 (Sunday) .. 6 (Saturday).
zero_based_dow = F.dayofweek(data["SHIPMENT_SHIPPED_DATE"]) - 1
data = data.withColumn("SHIPPED_DOW", zero_based_dow)




In [15]:
# std_actual based on attempted date
# Whole days from the ship timestamp to the first delivery attempt.
days_to_first_attempt = F.datediff(
    F.col("INITIAL_DELIVERY_ATTEMPT_DTTM_EST"),
    F.col("SHIPMENT_SHIPPED_DTTM_EST"),
)
data = data.withColumn("STD_ACTUAL_VATTEMPTED", days_to_first_attempt)




In [16]:
# Fall back to STD_ACTUAL wherever the attempt-based value is NULL.
data = data.withColumn(
    'STD_ACTUAL_VATTEMPTED',
    F.coalesce(F.col('STD_ACTUAL_VATTEMPTED'), F.col('STD_ACTUAL')),
)




In [17]:
# A missing STD_V0 defaults to 1.
data = data.withColumn(
    'STD_V0',
    F.coalesce(F.col('STD_V0'), F.lit(1)),
)




In [18]:
# Drop rows that have no ACTUAL_ZONE.
data = data.filter(F.col('ACTUAL_ZONE').isNotNull())




In [19]:
# get lag features
# Number the distinct ship dates 1..N in ascending order; "index" is the
# ordering key of the rolling windows built below.
# NOTE(review): the index counts dates present in the data, not calendar
# days — confirm no ship date is missing from the period.
w = Window.orderBy("SHIPMENT_SHIPPED_DATE")
calendar = (
    data.select('SHIPMENT_SHIPPED_DATE')
    .dropDuplicates()
    .withColumn("index", F.row_number().over(w))
)

# Inner join on the date attaches the index to every row.
data = data.join(calendar, on='SHIPMENT_SHIPPED_DATE')




In [20]:
# get lag features
index_col = 'index'   # ordering column of the rolling window
operation = 'avg'     # name of the pyspark.sql.functions aggregate to apply
features = ['STD_ACTUAL_VATTEMPTED']
# (start, end) offsets passed to rangeBetween, relative to the current row's index.
ma_ranges = [(-28, -1), (-7, -1)]
# One entry per set of window partition columns.
groupby_features_list = [
    ['SHIPPED_DOW', 'FFMCENTER_NAME', 'CUSTOMER_STATE'],
    ['SHIPPED_DOW', 'SHIPMENT_SHIP_ROUTE'],
    ['SHIPPED_DOW', 'ACTUAL_ZONE'],
]




In [21]:
# The aggregate is the same for every feature/window, so look it up once.
aggregate = getattr(F, operation)

for groupby_features in groupby_features_list:

    data = data.repartition(*groupby_features)
    group_suffix = [str(gb_) for gb_ in groupby_features]

    for column, ma_range in itertools.product(features, ma_ranges):
        lag_pair = (column, ma_range)
        print(lag_pair)

        # <operation>_<start>_<end>_<feature>_<partition columns...>
        new_col_name = join_lag_name(
            [operation] + [str(ma_) for ma_ in ma_range] + [column] + group_suffix
        )
        print(new_col_name)

        ma_window = (
            Window.partitionBy(groupby_features)
            .orderBy(index_col)
            .rangeBetween(*ma_range)
        )
        sql_function = aggregate(F.col(column))

        # Rolling aggregate over the window; a NULL result is replaced by 1.
        data = data.withColumn(
            new_col_name,
            F.coalesce(sql_function.over(ma_window), F.lit(1)),
        )

('STD_ACTUAL_VATTEMPTED', (-28, -1))
avg_-28_-1_STD_ACTUAL_VATTEMPTED_SHIPPED_DOW_FFMCENTER_NAME_CUSTOMER_STATE
('STD_ACTUAL_VATTEMPTED', (-7, -1))
avg_-7_-1_STD_ACTUAL_VATTEMPTED_SHIPPED_DOW_FFMCENTER_NAME_CUSTOMER_STATE
('STD_ACTUAL_VATTEMPTED', (-28, -1))
avg_-28_-1_STD_ACTUAL_VATTEMPTED_SHIPPED_DOW_SHIPMENT_SHIP_ROUTE
('STD_ACTUAL_VATTEMPTED', (-7, -1))
avg_-7_-1_STD_ACTUAL_VATTEMPTED_SHIPPED_DOW_SHIPMENT_SHIP_ROUTE
('STD_ACTUAL_VATTEMPTED', (-28, -1))
avg_-28_-1_STD_ACTUAL_VATTEMPTED_SHIPPED_DOW_ACTUAL_ZONE
('STD_ACTUAL_VATTEMPTED', (-7, -1))
avg_-7_-1_STD_ACTUAL_VATTEMPTED_SHIPPED_DOW_ACTUAL_ZONE


In [22]:
# add cutoff, adj tnt, next adj tnt
# Route/EDD history from the Glue Data Catalog, converted to a Spark DataFrame.
sr_edd = (
    glueContext.create_dynamic_frame
    .from_catalog(
        database='shipment_inspecter_table_promise',
        table_name='shipment_route_EDD_history',
    )
    .toDF()
)




In [23]:
# Keep only the core fulfillment centers and cache the filtered history.
sr_edd = sr_edd.filter(F.col('FCName').isin(core_fc)).cache()

# Per (DATE, FCName, RouteID, Zip5, MODE) keep the single row with the smallest AdjTNT.
window_spec = (
    Window
    .partitionBy(['DATE', 'FCName', 'RouteID', 'Zip5', 'MODE'])
    .orderBy(F.col("AdjTNT"))
)
sr_edd = (
    sr_edd
    .withColumn("row_number", F.row_number().over(window_spec))
    .filter(F.col('row_number') == 1)
    .drop('row_number')
)

# Rename to the column names used by `data` so the join keys line up.
sr_edd_renames = [
    ('FCName', 'FFMCENTER_NAME'),
    ('Zip5', 'CUSTOMER_POSTCODE'),
    ('DATE', 'RELEASE_DATE'),
    ('RouteID', 'SHIPMENT_SHIP_ROUTE'),
    ('MODE', 'CARRIER_CODE'),
    ('Cutoff', 'CUTOFF'),
    ('AdjTNT', 'ADJTNT'),
    ('NextAdjTNT', 'NEXTADJTNT'),
]
sr_edd = sr_edd.select(
    *[F.col(source).alias(target) for source, target in sr_edd_renames]
).dropDuplicates()

# First 10 characters of the release timestamp's string form.
data = data.withColumn(
    "RELEASE_DATE",
    F.substring(F.col("RELEASE_DTTM_EST"), 1, 10),
)

sr_edd_join_keys = [
    'RELEASE_DATE',
    'FFMCENTER_NAME',
    'CUSTOMER_POSTCODE',
    'SHIPMENT_SHIP_ROUTE',
    'CARRIER_CODE',
]
data = data.join(sr_edd, on=sr_edd_join_keys, how='left')

# Drop rows without a cutoff, then exact duplicate rows.
data = data.filter(F.col('CUTOFF').isNotNull()).dropDuplicates()
# HHMMSS-packed cutoff rendered as an 'HH:MM:SS' string.
data = data.withColumn("CUTOFF_TIMESTAMP", convert_to_timestamp_udf("CUTOFF"))




In [24]:
# add fc type
is_g2_center = F.col('FFMCENTER_NAME').isin(G2)
data = data.withColumn(
    'FC_TYPE',
    F.when(is_g2_center, F.lit('G2')).otherwise(F.lit('G1')),
)




In [25]:
# Echo the output location before writing to it.
tableau_data_path

's3://ogunes-promise/EDD/reporting/data/2023-11-13_v3'


In [26]:
# Persist `data` as parquet in 20 partitions, replacing whatever is already at the path.
data.repartition(20).write.parquet(tableau_data_path, mode='overwrite')




In [27]:
# Reload the snapshot just written so later cells start from the stored parquet files.
data = spark.read.parquet(tableau_data_path)




In [28]:
# Row count of the reloaded frame.
data.count()

41669048


In [33]:
# Number of distinct (ORDER_ID, SHIPMENT_TRACKING_NUMBER) pairs; equal to data.count() when each pair occurs once.
data.select('ORDER_ID', 'SHIPMENT_TRACKING_NUMBER').dropDuplicates().count()

41669048


### Feature Engineering - Model Input Generation

In [29]:
# add flag for orders with multiple shipments
# The stored value is a count, not a 0/1 flag: the number of non-null
# tracking numbers that share the row's ORDER_ID.
w = Window.partitionBy('ORDER_ID')
packages_in_order = F.count('SHIPMENT_TRACKING_NUMBER').over(w)
data = data.withColumn('PACKAGE_COUNT_BY_ORDER', packages_in_order)




In [30]:
# after cpt flag
def _as_hms(time_text):
    """Round-trip an 'HH:mm:ss' string column through a timestamp and back to 'HH:mm:ss' text."""
    return F.date_format(F.to_timestamp(time_text, 'HH:mm:ss'), "HH:mm:ss")


# CPT = cutoff + 5 hours for G2 centers, cutoff + 3 hours for every other center.
cutoff_ts = F.col('CUTOFF_TIMESTAMP')
data = data.withColumn(
    'CPT',
    F.when(
        F.col('FFMCENTER_NAME').isin(G2),
        cutoff_ts + F.expr('INTERVAL 5 HOURS'),
    ).otherwise(
        cutoff_ts + F.expr('INTERVAL 3 HOURS')
    ),
)

# Characters 12-19 of each value's string form are taken as the time-of-day part.
data = data.withColumn("CPT", F.substring(F.col("CPT"), 12, 8))
data = data.withColumn("CPT", _as_hms(F.col("CPT")))

data = data.withColumn(
    "SHIPMENT_SHIPPED_TS",
    F.substring(F.col("SHIPMENT_SHIPPED_DTTM_EST"), 12, 8),
)
data = data.withColumn("SHIPMENT_SHIPPED_TS", _as_hms(F.col("SHIPMENT_SHIPPED_TS")))

# Both sides are 'HH:mm:ss' strings, so this is a string comparison.
shipped_after_cpt = F.col('SHIPMENT_SHIPPED_TS') > F.col('CPT')
data = data.withColumn(
    'IS_AFTER_CPT',
    F.when(shipped_after_cpt, F.lit(1)).otherwise(F.lit(0)),
)

# Override: a CPT before 04:00:00 combined with a ship time after 04:00:00 is flagged 0.
early_cpt_late_ship = (
    (F.col('CPT') < '04:00:00')
    & (F.col('SHIPMENT_SHIPPED_TS') > '04:00:00')
)
data = data.withColumn(
    'IS_AFTER_CPT',
    F.when(early_cpt_late_ship, F.lit(0)).otherwise(F.col('IS_AFTER_CPT')),
)




In [31]:
# is holiday
holiday_path = os.path.join(
    schema, output_bucket_name, output_project_name,
    'holiday', 'FdxHolidaysWeb.csv')
# header=True only; no schema inference is requested.
holiday = spark.read.csv(holiday_path, header=True)

def user_defined_timestamp(date_col):
    """Convert an ``MM/DD/YY`` date string to ISO ``YYYY-MM-DD``.

    Parameters
    ----------
    date_col : str or None
        Date text in ``%m/%d/%y`` form; surrounding whitespace is ignored.

    Returns
    -------
    str or None
        The date as ``YYYY-MM-DD``, or ``None`` when the input is ``None`` or
        blank, so empty CSV cells stay NULL instead of failing the UDF.

    Raises
    ------
    ValueError
        If a non-blank value does not match ``%m/%d/%y``.
    """
    if date_col is None:
        return None
    text = date_col.strip()
    if not text:
        return None
    _date = datetime.strptime(text, '%m/%d/%y')
    return _date.strftime('%Y-%m-%d')
user_defined_timestamp_udf = F.udf(user_defined_timestamp, StringType())

# Rewrite the holiday dates via the UDF so they can be joined on SHIPMENT_SHIPPED_DATE.
holiday = holiday.withColumn("Date", user_defined_timestamp_udf('Date'))

holiday_lookup = holiday.select(
    F.col('Date').alias('SHIPMENT_SHIPPED_DATE'),
    F.col('Holiday'),
    F.col('Note'),
)
data = data.join(holiday_lookup, 'SHIPMENT_SHIPPED_DATE', 'left')

# A non-null Note takes precedence over a non-null Holiday; rows with neither are 'regular'.
data = data.withColumn(
    "HOLIDAY_FLAG",
    F.when(F.col('Note').isNotNull(), F.lit('before_holiday'))
     .when(F.col('Holiday').isNotNull(), F.lit('holiday'))
     .otherwise(F.lit('regular')),
)




In [32]:
# friday saturday tag
# SHIPPED_DOW is dayofweek - 1, i.e. 5 = Friday and 6 = Saturday.
dow = F.col('SHIPPED_DOW')
data = data.withColumn(
    "SHIPPED_FRI_SAT_TAG",
    F.when(dow == 5, F.lit('friday'))
     .when(dow == 6, F.lit('saturday'))
     .otherwise(F.lit('regular')),
)




In [33]:
# time till next CPT
# Parse both 'HH:mm:ss' strings into timestamps so they can be subtracted.
for time_col in ("CPT", "SHIPMENT_SHIPPED_TS"):
    data = data.withColumn(time_col, F.to_timestamp(F.col(time_col), "HH:mm:ss"))

# A missing CPT is treated as 00:00:00.
midnight = F.to_timestamp(F.lit("00:00:00"), "HH:mm:ss")
data = data.withColumn("CPT", F.coalesce(F.col('CPT'), midnight))

# Difference in seconds, expressed in hours with two decimals.
seconds_till_cpt = F.col("CPT").cast("long") - F.col("SHIPMENT_SHIPPED_TS").cast("long")
data = data.withColumn("HOURS_TILL_CPT", F.round(seconds_till_cpt / 3600, 2))

# A negative gap is shifted forward by 24 hours.
hours = F.col('HOURS_TILL_CPT')
data = data.withColumn(
    "HOURS_TILL_CPT",
    F.when(hours < 0, hours + 24).otherwise(hours),
)

data = data.withColumn("HOURS_TILL_CPT", F.round(F.col('HOURS_TILL_CPT'), 2))




In [34]:
# distance mi
geo = (
    glueContext.create_dynamic_frame
    .from_catalog(
        database='shipment_inspecter_table_promise',
        table_name='geo_attributes',
    )
    .toDF()
)

# Left-pad destination zips with '0' to 5 characters.
geo = geo.withColumn('dest_zip', F.lpad('dest_zip', 5, '0'))

distance_lookup = geo.select(
    F.col('dest_zip').alias('CUSTOMER_POSTCODE'),
    F.col('fc_code').alias('FFMCENTER_NAME'),
    F.col('distance_mi').alias('DISTANCE_MI'),
)
data = data.join(
    distance_lookup,
    on=['CUSTOMER_POSTCODE', 'FFMCENTER_NAME'],
    how='left',
)




In [35]:
# arc range
# Distance bands: <=250 -> 1, <=500 -> 2, <=1000 -> 3, <=1500 -> 4,
# <=2000 -> 5, <=3000 -> 6, anything else -> 7.
# NOTE(review): a NULL DISTANCE_MI matches no band and therefore also gets 7 — confirm this is intended.
distance = F.col('DISTANCE_MI')
arc_range = F.when(distance <= 250, F.lit(1))
for band, upper_bound in enumerate([500, 1000, 1500, 2000, 3000], start=2):
    arc_range = arc_range.when(distance <= upper_bound, F.lit(band))
data = data.withColumn('ARC_RANGE', arc_range.otherwise(F.lit(7)))




In [36]:
# customer zip3
# First three characters of the postcode.
zip3 = F.col('CUSTOMER_POSTCODE').substr(1, 3)
data = data.withColumn('CUSTOMER_ZIP3', zip3)




In [37]:
# routes
routes_path = os.path.join(
    schema, output_bucket_name, output_project_name, 'routes', 'routes.csv')
routes = (
    spark.read.csv(routes_path, header=True)
    .select('FFMCENTER_NAME', 'SHIPMENT_SHIP_ROUTE', 'RouteType')
    .dropDuplicates()
)

# Keep one RouteType per (center, route): the first one in ascending RouteType order.
window_spec = (
    Window
    .partitionBy(['FFMCENTER_NAME', 'SHIPMENT_SHIP_ROUTE'])
    .orderBy(['RouteType'])
)
routes = (
    routes
    .withColumn("row_number", F.row_number().over(window_spec))
    .filter(F.col('row_number') == 1)
    .drop('row_number')
)




In [38]:
# Attach RouteType; shipments on a (center, route) pair missing from `routes` keep a NULL RouteType.
route_join_keys = ['FFMCENTER_NAME', 'SHIPMENT_SHIP_ROUTE']
data = data.join(routes, on=route_join_keys, how='left')




In [40]:
# A missing RouteType becomes 'Others'.
data = data.withColumn(
    'RouteType',
    F.coalesce(F.col('RouteType'), F.lit('Others')),
)




In [41]:
# update data types
integer_columns = [
    "ACTUAL_ZONE",
    "STD_ACTUAL",
    "STD_V0",
    "STD_ACTUAL_VATTEMPTED",
]
# Rolling-average features produced by the lag-feature loop.
double_columns = [
    "avg_-28_-1_STD_ACTUAL_VATTEMPTED_SHIPPED_DOW_FFMCENTER_NAME_CUSTOMER_STATE",
    "avg_-7_-1_STD_ACTUAL_VATTEMPTED_SHIPPED_DOW_FFMCENTER_NAME_CUSTOMER_STATE",
    "avg_-28_-1_STD_ACTUAL_VATTEMPTED_SHIPPED_DOW_SHIPMENT_SHIP_ROUTE",
    "avg_-7_-1_STD_ACTUAL_VATTEMPTED_SHIPPED_DOW_SHIPMENT_SHIP_ROUTE",
    "avg_-28_-1_STD_ACTUAL_VATTEMPTED_SHIPPED_DOW_ACTUAL_ZONE",
    "avg_-7_-1_STD_ACTUAL_VATTEMPTED_SHIPPED_DOW_ACTUAL_ZONE",
]

for column_name in integer_columns:
    data = data.withColumn(column_name, F.col(column_name).cast(IntegerType()))

for column_name in double_columns:
    data = data.withColumn(column_name, F.col(column_name).cast(DoubleType()))




In [42]:
# Inspect the schema after the type casts above.
data.printSchema()

root
 |-- FFMCENTER_NAME: string (nullable = true)
 |-- SHIPMENT_SHIP_ROUTE: string (nullable = true)
 |-- CUSTOMER_POSTCODE: string (nullable = true)
 |-- SHIPMENT_SHIPPED_DATE: string (nullable = true)
 |-- RELEASE_DATE: string (nullable = true)
 |-- CARRIER_CODE: string (nullable = true)
 |-- ORDER_ID: decimal(38,0) (nullable = true)
 |-- SHIPMENT_TRACKING_NUMBER: string (nullable = true)
 |-- SHIPMENT_COUNT_OF_ITEMS_IN_BOX: decimal(38,0) (nullable = true)
 |-- RELEASE_DTTM_EST: timestamp (nullable = true)
 |-- SHIPMENT_SHIPPED_DTTM_EST: timestamp (nullable = true)
 |-- SHIPMENT_DELIVERY_DTTM_EST: timestamp (nullable = true)
 |-- SHIPMENT_DELIVERY_DATE: string (nullable = true)
 |-- SHIPMENT_ESTIMATED_DELIVERY_DATE: date (nullable = true)
 |-- STD_ACTUAL: integer (nullable = true)
 |-- STD_V0: integer (nullable = true)
 |-- ACTUAL_ZONE: integer (nullable = true)
 |-- INITIAL_DELIVERY_ATTEMPT_DTTM_EST: timestamp (nullable = true)
 |-- INITIAL_SHIPMENT_DELIVERY_ATTEMPT_DATE: string (n

In [8]:
# Echo the output location before writing to it.
enrich_data_path

's3://ogunes-promise/EDD/enrich/2023-11-13_v4'


In [45]:
# Persist `data` as parquet in 20 partitions, replacing whatever is already at the path.
data.repartition(20).write.parquet(enrich_data_path, mode='overwrite')




In [9]:
# Reload the enriched snapshot; the model-prep cells below start from this frame.
data = spark.read.parquet(enrich_data_path)




#### Prep For Model

In [11]:
# Rename every column to its lower-case form (column order is unchanged).
cols = [column_name.lower() for column_name in data.columns]
data = data.toDF(*cols)




In [12]:
# Confirm the lower-cased column names.
data.printSchema()

root
 |-- ffmcenter_name: string (nullable = true)
 |-- shipment_ship_route: string (nullable = true)
 |-- customer_postcode: string (nullable = true)
 |-- shipment_shipped_date: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- carrier_code: string (nullable = true)
 |-- order_id: decimal(38,0) (nullable = true)
 |-- shipment_tracking_number: string (nullable = true)
 |-- shipment_count_of_items_in_box: decimal(38,0) (nullable = true)
 |-- release_dttm_est: timestamp (nullable = true)
 |-- shipment_shipped_dttm_est: timestamp (nullable = true)
 |-- shipment_delivery_dttm_est: timestamp (nullable = true)
 |-- shipment_delivery_date: string (nullable = true)
 |-- shipment_estimated_delivery_date: date (nullable = true)
 |-- std_actual: integer (nullable = true)
 |-- std_v0: integer (nullable = true)
 |-- actual_zone: integer (nullable = true)
 |-- initial_delivery_attempt_dttm_est: timestamp (nullable = true)
 |-- initial_shipment_delivery_attempt_date: string (n

In [14]:
# remove outliers
anormality_threshold = 11

# Keep rows with 0 <= std_actual_vattempted < anormality_threshold (NULLs are dropped too).
vattempted = F.col('std_actual_vattempted')
data = data.filter((vattempted >= 0) & (vattempted < anormality_threshold))




In [51]:
# col selection
col_target = [
    'std_actual',
    'std_actual_vattempted',
]
col_order = [
    'order_id',
    'shipment_tracking_number',
    'ffmcenter_name',
    'fc_type',
    'uom_flag',
    'carrier_code',
    'customer_postcode',
    'customer_zip3',
    'customer_state',
    'shipment_delivery_date',
    'initial_shipment_delivery_attempt_date',
    'shipment_shipped_date',
    'holiday_flag',
    'shipped_fri_sat_tag',
    'shipped_dow',
    'is_after_cpt',
    'hours_till_cpt',
    'warehouse_expected_weight',
    'package_size',
    'adjtnt',
    'nextadjtnt',
    'std_v0',
]
col_route = [
    'shipment_ship_route',
    'arc_range',
    'distance_mi',
    'actual_zone',
    'routetype',
    'avg_-28_-1_std_actual_vattempted_shipped_dow_shipment_ship_route',
    'avg_-7_-1_std_actual_vattempted_shipped_dow_shipment_ship_route',
    'avg_-28_-1_std_actual_vattempted_shipped_dow_actual_zone',
    'avg_-7_-1_std_actual_vattempted_shipped_dow_actual_zone',
    'avg_-28_-1_std_actual_vattempted_shipped_dow_ffmcenter_name_customer_state',
    'avg_-7_-1_std_actual_vattempted_shipped_dow_ffmcenter_name_customer_state',
]

# Column order of the output frame: order-level, then route-level, then targets.
selected_columns = col_order + col_route + col_target
df_train_val_test = data.select(*selected_columns)




In [52]:
# one-hot encoding
# Columns expanded into 0/1 indicator columns by onehotencoding().
cols_to_encode = [
    'ffmcenter_name',
    'fc_type',
    'holiday_flag',
    'shipped_fri_sat_tag',
    'carrier_code',
    'shipped_dow',
    'routetype',
]




In [53]:
def onehotencoding(feature, df):
    """Append one 0/1 indicator column per distinct value of ``feature``.

    Parameters
    ----------
    feature : str
        Name of the column to encode.
    df : pyspark.sql.DataFrame
        Frame containing ``feature``.

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` plus one column named ``<feature>_<value>`` for every distinct
        non-null value of ``feature``; the column is 1 on rows where
        ``feature`` equals that value and 0 on all other rows.
    """
    print(feature)

    # Read the levels from the frame being encoded (not from a notebook
    # global) and collect them as plain Python values so each one can be used
    # directly as a Spark literal in the comparison below.
    levels = [row[0] for row in df.select(feature).distinct().collect()]
    # NULL is not a level: it cannot be sorted with the others and never
    # compares equal to anything.
    feature_list = sorted(level for level in levels if level is not None)
    feature_dict = {value: index for index, value in enumerate(feature_list)}

    print(feature_dict)

    for value in feature_list:
        # Compare against the level itself. Comparing against its position in
        # feature_dict (as before) made every indicator 0 for string features.
        df = df.withColumn(
            feature + '_' + str(value),
            F.when(F.col(feature) == value, 1).otherwise(0))

    return df




In [54]:
# Encode the listed columns one after another, threading the growing frame through.
for feature_name in cols_to_encode:
    df_train_val_test = onehotencoding(feature_name, df_train_val_test)

ffmcenter_name
{'AVP1': 0, 'AVP2': 1, 'BNA1': 2, 'CFC1': 3, 'CLT1': 4, 'DAY1': 5, 'DFW1': 6, 'EFC3': 7, 'MCI1': 8, 'MCO1': 9, 'MDT1': 10, 'PHX1': 11, 'RNO1': 12, 'WFC2': 13}
fc_type
{'G1': 0, 'G2': 1}
holiday_flag
{'before_holiday': 0, 'holiday': 1, 'regular': 2}
shipped_fri_sat_tag
{'friday': 0, 'regular': 1, 'saturday': 2}
carrier_code
{'FDXGD': 0, 'FDXHD': 1, 'ONTRGD': 2}
shipped_dow
{0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6}
routetype
{'CS': 0, 'Direct': 1, 'Others': 2, 'Preload': 3, 'RSF': 4}


In [56]:
# Dates
# Consecutive windows: each one starts on the date where the previous one ends.
train_start_date = '2023-01-01'
train_end_date = '2023-05-01'

val_start_date = train_end_date
val_end_date = '2023-06-01'

test_start_date = val_end_date
test_end_date = '2023-08-01'

# Version tag used as the output sub-folder name.
exp_date_string = '20231113_v4'




In [57]:
# Output Paths
output_folder_path = os.path.join(
    schema,
    output_bucket_name,
    output_project_name,
    'ml_train_val_test_set',
    exp_date_string,
)

# Train and validation are written as one combined set; no separate
# train-only or validation-only paths are produced.
train_val_folder = f'train_val_{train_start_date[:10]}_{val_end_date[:10]}'
training_validation_path = os.path.join(output_folder_path, train_val_folder)
print(training_validation_path)

test_folder = f'test_{test_start_date[:10]}_{test_end_date[:10]}'
test_path = os.path.join(output_folder_path, test_folder)
print(test_path)

s3://ogunes-promise/EDD/ml_train_val_test_set/20231113_v4/train_val_2023-01-01_2023-06-01
s3://ogunes-promise/EDD/ml_train_val_test_set/20231113_v4/test_2023-06-01_2023-08-01


In [58]:
# Splits are cut on the delivery date, compared as strings against the date constants.
delivery_date = F.col('shipment_delivery_date')

# Train + validation: train_start_date <= delivery date < test_start_date.
in_train_val_window = (
    (delivery_date < test_start_date)
    & (delivery_date >= train_start_date)
)
df_train_val = df_train_val_test.filter(in_train_val_window)
df_train_val.write.mode('overwrite').parquet(training_validation_path)

# Test: test_start_date <= delivery date < test_end_date.
in_test_window = (
    (delivery_date >= test_start_date)
    & (delivery_date < test_end_date)
)
df_test = df_train_val_test.filter(in_test_window)
df_test.write.mode('overwrite').parquet(test_path)




In [59]:
# Row count of the combined train + validation split.
df_train_val.count()

21510416


In [60]:
# Row count of the full model-input frame (before splitting).
df_train_val_test.count()

41642944


In [76]:
# Number of distinct tracking numbers; equal to the row count above when each shipment appears once.
df_train_val_test.select('shipment_tracking_number').dropDuplicates().count()

41642944
