### DATA EXTRACTION AND PREPARATION

In [None]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import rank
# NOTE(review): this rebinds the builtin `sum` to PySpark's column aggregate for
# the rest of the notebook; any plain Python sum(...) call below resolves to the
# PySpark function instead.
from pyspark.sql.functions import sum
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import concat, col, lit

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import datetime
from dateutil.rrule import rrule, MONTHLY
from dateutil.relativedelta import relativedelta
import itertools
import statsmodels
from statsmodels.tsa.seasonal import seasonal_decompose
import statistics

# Silences every warning for the whole session, including pandas deprecation
# and SettingWithCopy warnings that would otherwise flag issues in later cells.
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Pull the whole dnd.cftex_v2 table into driver memory as a pandas DataFrame.
# NOTE(review): SELECT * brings every column to the driver although only a few
# are used below; the later dropna() inspects all of them, so narrowing this
# query would change which rows survive — confirm before optimizing.
data = spark.sql("SELECT * FROM dnd.cftex_v2").toPandas()

In [None]:
# Reduce the raw extract to dry 20'/40' containers with positive detention days
# and free time, then bucket the pricing date by calendar month.
# NOTE(review): dropna() considers every column returned by the SELECT *, so rows
# with nulls in columns not used below are discarded too — confirm intended.
main_data = data.dropna()
main_data = main_data[
    (main_data['CONTAINER_TYPE_CD'] == 'DRY')
    & main_data['CONTAINER_SIZE_CD'].isin(['20', '40'])
]
main_data = main_data[
    (main_data['CDET_Days'] > 0)
    & (main_data['Std_FreeTimeGranted'] > 0)
    # 'NULL' is a literal string sentinel, distinct from the real nulls dropped above.
    & (main_data['CONSIGNEE_CUSTOMER_CD'] != 'NULL')
]

# Keep only the columns the aggregation needs. .copy() yields an independent
# frame so the column assignments below never operate on a filtered view.
main_data = main_data[
    ['DIPLA_CITY_CD', 'Spot_YN', 'CONTAINER_SIZE_CD', 'PRICE_CALC_DT', 'CommoditySubType_Dsc', 'CDET_Days']
].copy()
main_data['CONTAINER_SIZE_CD'] = pd.to_numeric(main_data['CONTAINER_SIZE_CD'])
main_data = main_data.sort_values(
    by=['DIPLA_CITY_CD', 'Spot_YN', 'CONTAINER_SIZE_CD', 'PRICE_CALC_DT', 'CommoditySubType_Dsc']
)
# Month of the pricing date as a zero-padded string ('01'..'12'). The .dt
# accessor requires PRICE_CALC_DT to arrive as a datetime64 column.
main_data['PRICE_CALC_DT'] = main_data['PRICE_CALC_DT'].dt.strftime('%m')
main_data = main_data.rename(columns={'PRICE_CALC_DT': 'PRICE_CALC_MONTH'})

In [None]:
# Mean detention days per (port, spot flag, container size, month, commodity).
# The column is selected explicitly: GroupBy.mean's first positional parameter
# is `numeric_only`, not a column name.
avg_tt = (
    main_data
    .groupby(
        ['DIPLA_CITY_CD', 'Spot_YN', 'CONTAINER_SIZE_CD', 'PRICE_CALC_MONTH', 'CommoditySubType_Dsc'],
        as_index=False,
    )['CDET_Days']
    .mean()
)
# pandas .round() rounds halves to even (NumPy semantics).
avg_tt['CDET_Days'] = avg_tt['CDET_Days'].round()
avg_tt = avg_tt.rename(columns={'CDET_Days': 'Avg_CDET_Days'})

In [None]:
# Add an 'Others' commodity row per (port, spot flag, size, month): the
# unweighted mean of that group's already-rounded per-commodity averages.
avg_tt_others = (
    avg_tt
    .groupby(['DIPLA_CITY_CD', 'Spot_YN', 'CONTAINER_SIZE_CD', 'PRICE_CALC_MONTH'], as_index=False)['Avg_CDET_Days']
    .mean()
)
avg_tt_others['Avg_CDET_Days'] = avg_tt_others['Avg_CDET_Days'].round()
avg_tt_others['CommoditySubType_Dsc'] = 'Others'
# NOTE(review): if CommoditySubType_Dsc already contains a real 'Others' value,
# each group ends up with two 'Others' rows — confirm the source never uses it.
avg_tt = pd.concat([avg_tt, avg_tt_others], axis=0, ignore_index=True)

In [None]:
avg_tt = avg_tt.sort_values(['DIPLA_CITY_CD', 'Spot_YN', 'CONTAINER_SIZE_CD', 'PRICE_CALC_MONTH', 'CommoditySubType_Dsc'])
avg_tt = avg_tt.rename(columns={
    'DIPLA_CITY_CD': 'DIPLA_CD',
    'CONTAINER_SIZE_CD': 'Cont_Size',
    'PRICE_CALC_MONTH': 'Pcd_Month',
    'CommoditySubType_Dsc': 'Commodity',
    'Avg_CDET_Days': 'TT_Pred',
})
# Pcd_Month holds month numbers 1-12 and fits int8. TT_Pred is an averaged day
# count with no guaranteed upper bound; a float->int8 cast of a value above 127
# is undefined in NumPy and silently yields garbage, so use int16.
avg_tt = avg_tt.astype({'Pcd_Month': np.int8, 'TT_Pred': np.int16})
avg_tt['Cont_Type'] = 'Dry'
avg_tt = avg_tt[['Pcd_Month', 'DIPLA_CD', 'Spot_YN', 'Cont_Type', 'Cont_Size', 'Commodity', 'TT_Pred']]
avg_tt.reset_index(inplace=True, drop=True)
avg_tt.head()

Unnamed: 0,Pcd_Month,DIPLA_CD,Spot_YN,Cont_Type,Cont_Size,Commodity,TT_Pred
0,1,AEAMN,N,Dry,20,Appliances and kitchenware,13
1,1,AEAMN,N,Dry,20,Beverages,13
2,1,AEAMN,N,Dry,20,Chemicals,4
3,1,AEAMN,N,Dry,20,Foodstuff,10
4,1,AEAMN,N,Dry,20,Furniture,13


In [None]:
# Commodity name -> code lookup from the MARS commodity master (Delta table).
commodity = spark.read.format("delta").load("dbfs:/mnt/Gen2_prod_cleansed_gcss/mars_commodities/")
commodity.createOrReplaceTempView("commodity")
commodity = spark.sql(""" select distinct commodity.code, name from commodity  order by 2""")

commodity = commodity.toPandas()
commodity = commodity.rename(columns={'name': 'Commodity', 'code': 'Commodity_CD'})
# Keep only 4-character codes. .str.len() gives NaN for null codes (which then
# compare False) instead of raising TypeError as len(None) would.
commodity = commodity[commodity['Commodity_CD'].str.len().eq(4)]
commodity = commodity.drop_duplicates()

# Attach Commodity_CD by commodity name; unmatched names (e.g. 'Others') get NaN.
# NOTE(review): de-duplication is on (code, name) pairs only; if one name maps to
# several 4-character codes, this left join duplicates those avg_tt rows.
avg_tt = avg_tt.merge(commodity, on='Commodity', how='left')
avg_tt.tail()

Unnamed: 0,Pcd_Month,DIPLA_CD,Spot_YN,Cont_Type,Cont_Size,Commodity,TT_Pred,Commodity_CD
351331,12,ZAPLZ,Y,Dry,40,Plastic and rubber,5,32
351332,12,ZAPLZ,Y,Dry,40,Sports goods,3,38
351333,12,ZAPLZ,Y,Dry,40,Textiles and apparel,3,41
351334,12,ZAPLZ,Y,Dry,40,Vehicles,2,46
351335,12,ZAPLZ,Y,Dry,40,Wood,3,47


In [None]:
# Every per-commodity row is a regular (non-default) prediction.
avg_tt['Default'] = 'N'

# Port-level defaults: mean TT_Pred per discharge port across all months, spot
# flags, sizes and commodities (including the 'Others' commodity rows).
avg_tt1 = avg_tt.groupby('DIPLA_CD', as_index=False)['TT_Pred'].mean()

# Global fallback keyed as port 'Others': the unweighted mean of the port means.
avg_tt2 = avg_tt1.assign(DIPLA_CD='Others').groupby('DIPLA_CD', as_index=False)['TT_Pred'].mean()

# pd.concat replaces DataFrame.append (deprecated in pandas 1.4, removed in 2.0).
avg_tt1 = pd.concat([avg_tt1, avg_tt2], ignore_index=True)
avg_tt1['TT_Pred'] = avg_tt1['TT_Pred'].round()
avg_tt1['Default'] = 'Y'

# ignore_index=True gives the combined frame a unique RangeIndex. Without it the
# index repeats (0..N-1 followed by 0..k), and any label-aligned assignment back
# into avg_tt would pull values from the wrong rows into the default rows.
avg_tt = pd.concat([avg_tt, avg_tt1], ignore_index=True)

# Default rows carry no month or container size; encode those as 0.
tt_numeric_cols = ['Pcd_Month', 'Cont_Size', 'TT_Pred']
avg_tt[tt_numeric_cols] = avg_tt[tt_numeric_cols].fillna(0)
# Remaining gaps (Spot_YN/Cont_Type/Commodity/Commodity_CD on default rows and
# commodity names without a code match) become empty strings.
avg_tt = avg_tt.fillna('')
avg_tt[tt_numeric_cols] = avg_tt[tt_numeric_cols].astype(int)

avg_tt['Operator'] = 'MAEU'
avg_tt = avg_tt[['Operator', 'Pcd_Month', 'DIPLA_CD', 'Spot_YN', 'Cont_Type', 'Cont_Size',
                 'Commodity', 'Commodity_CD', 'TT_Pred', 'Default']].copy()
# The synthetic 'Others' commodity has no master-data code; label it explicitly.
avg_tt.loc[avg_tt['Commodity'] == 'Others', 'Commodity_CD'] = 'Others'
avg_tt = avg_tt.rename(columns={'DIPLA_CD': 'Dipla_CD'})
avg_tt.head()

Unnamed: 0,Operator,Pcd_Month,Dipla_CD,Spot_YN,Cont_Type,Cont_Size,Commodity,Commodity_CD,TT_Pred,Default
0,MAEU,1,AEAMN,N,Dry,20,Appliances and kitchenware,14,13,N
1,MAEU,1,AEAMN,N,Dry,20,Beverages,2,13,N
2,MAEU,1,AEAMN,N,Dry,20,Chemicals,16,4,N
3,MAEU,1,AEAMN,N,Dry,20,Foodstuff,6,10,N
4,MAEU,1,AEAMN,N,Dry,20,Furniture,20,13,N


In [None]:
# Convert the finished pandas table into a Spark DataFrame (used by the JDBC
# write further down) and render it in the notebook.
avg_df_main_spark = spark.createDataFrame(data=avg_tt)
display(avg_df_main_spark)

Operator,Pcd_Month,Dipla_CD,Spot_YN,Cont_Type,Cont_Size,Commodity,Commodity_CD,TT_Pred,Default
MAEU,1,AEAMN,N,Dry,20,Appliances and kitchenware,0014,13,N
MAEU,1,AEAMN,N,Dry,20,Beverages,0002,13,N
MAEU,1,AEAMN,N,Dry,20,Chemicals,0016,4,N
MAEU,1,AEAMN,N,Dry,20,Foodstuff,0006,10,N
MAEU,1,AEAMN,N,Dry,20,Furniture,0020,13,N
MAEU,1,AEAMN,N,Dry,20,Meat,0008,7,N
MAEU,1,AEAMN,N,Dry,20,Metal,0023,18,N
MAEU,1,AEAMN,N,Dry,20,Miscellaneous manufactured materials,0025,18,N
MAEU,1,AEAMN,N,Dry,20,"Ores, slag and ash",0027,10,N
MAEU,1,AEAMN,N,Dry,20,Others,Others,12,N


In [None]:
# NOTE(review): this cell repeats an earlier cell verbatim — it rebuilds the same
# Spark DataFrame from avg_tt and displays it again.
avg_df_main_spark = spark.createDataFrame(data=avg_tt)
display(avg_df_main_spark)

Operator,Pcd_Month,Dipla_CD,Spot_YN,Cont_Type,Cont_Size,Commodity,Commodity_CD,TT_Pred,Default
MAEU,1,AEAMN,N,Dry,20,Appliances and kitchenware,0014,13,N
MAEU,1,AEAMN,N,Dry,20,Beverages,0002,13,N
MAEU,1,AEAMN,N,Dry,20,Chemicals,0016,4,N
MAEU,1,AEAMN,N,Dry,20,Foodstuff,0006,10,N
MAEU,1,AEAMN,N,Dry,20,Furniture,0020,12,N
MAEU,1,AEAMN,N,Dry,20,Metal,0023,18,N
MAEU,1,AEAMN,N,Dry,20,Miscellaneous manufactured materials,0025,18,N
MAEU,1,AEAMN,N,Dry,20,"Ores, slag and ash",0027,10,N
MAEU,1,AEAMN,N,Dry,20,Others,Others,12,N
MAEU,1,AEAMN,N,Dry,20,Paper,0028,10,N


In [None]:
# Row count of the Spark DataFrame that is written to SQL Server below.
avg_df_main_spark.count()

351886

In [None]:
# NOTE(review): repeats the previous count cell; the differing recorded outputs
# presumably come from separate notebook runs.
avg_df_main_spark.count()

339301

In [None]:
# --- Target SQL Server connection settings ---------------------------------
sqlschemaName = 'ctpprod.'        # schema prefix, concatenated directly with the table name
dwDatabase = 'cftex_pricing'
dwServer = 'sqlserveridadndprod'  # short name; '.database.windows.net' is appended when building the URL
dwJdbcPort = '1433'

# Extra JDBC connection properties.
# NOTE(review): trustServerCertificate=true disables server-certificate
# validation; also, the sqlDwUrlSmall URL built in the next cell does not
# append these options at all.
dwJdbcExtraOptions = 'encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;'


In [None]:
# Credentials (redacted in this copy of the notebook).
# NOTE(review): credentials are hard-coded, embedded in the URL below, and passed
# again as user/password options in the write cell; consider reading them from a
# secret store and keeping them out of the URL.
dwUser = '****'
dwPass = '****'
dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

# Minimal JDBC URL: host, port, database and credentials only —
# dwJdbcExtraOptions is intentionally not appended here.
sqlDwUrlSmall = "".join((
    "jdbc:sqlserver://", dwServer, ".database.windows.net:", dwJdbcPort,
    ";database=", dwDatabase,
    ";user=", dwUser,
    ";password=", dwPass,
))
sqlDwhTbl = "seasonality"

In [None]:
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType
from pyspark.sql.types import BooleanType
from pyspark.sql.types import IntegerType

# Narrow the numeric columns to 32-bit IntegerType before the JDBC write
# (pandas int64 columns presumably arrive in Spark as LongType). Every other
# column is passed through unchanged and column order is preserved.
avg_df_main_spark = avg_df_main_spark.select(*[
    col(name).cast(IntegerType()).alias(name) if name in ("Pcd_Month", "Cont_Size", "TT_Pred") else col(name)
    for name in avg_df_main_spark.columns
])

In [None]:
# Overwrite the target SQL Server table over JDBC. With mode("overwrite"),
# truncate=true makes Spark TRUNCATE the existing table instead of dropping and
# recreating it, so the table definition is kept.
# NOTE(review): "BEST_EFFORT" and "tabLock" look like options of the Microsoft
# Spark connector for SQL Server (format "com.microsoft.sqlserver.jdbc.spark",
# spelled reliabilityLevel="BEST_EFFORT" and tableLock="true" there); the generic
# "jdbc" source does not interpret them — confirm whether they have any effect.
(
    avg_df_main_spark.write
    .format("jdbc")
    .mode("overwrite")
    .option("truncate", "true")
    .option("BEST_EFFORT", "true")
    .option("tabLock", "true")
    .option("url", sqlDwUrlSmall)
    .option("dbtable", sqlschemaName + sqlDwhTbl)
    .option("user", dwUser)
    .option("password", dwPass)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .save()
)