In [1]:
import pandas as pd
import numpy as np
import os
import socket
import datetime
import itertools
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.types import ArrayType, FloatType, StringType, BooleanType, DateType, DoubleType, IntegerType
from pyspark.sql.functions import udf, lit, array_contains, col, when, concat_ws, to_timestamp, from_unixtime, unix_timestamp
from pyspark.sql import functions as f
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml.feature import StopWordsRemover
import pickle
import re

In [2]:
SparkContext.version

<property at 0x7f1adcb74f18>

In [3]:
from subprocess import call
# Configure environment (note: does NOT work with java-11)
os.environ["JAVA_HOME"] = "/java_home"
os.environ['SPARK_HOME'] = '/spark_home'
os.environ['HADOOP_CONF_DIR'] = '/hadoop_dir'
os.environ['YARN_CONF_DIR'] = '/yarn_dir'
os.environ['HADOOP_OPTS'] = '/hadoop_opts'
os.environ['_JAVA_OPTIONS'] = '/java_options'


In [4]:
# folder configs
main_folder = 'main_folder_name'
time_now = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
debug_flag = False

In [5]:
# Spark config
config = SparkConf().setAll([('spark.executor.memory', '10g'),
                             ('spark.executor.cores', '2'),
                             ('spark.yarn.executor.memoryOverhead','5000')])

In [6]:
start_time = datetime.datetime.now()
import pyspark
from pyspark.sql import SparkSession
sc = pyspark.SparkContext(conf=config)
spark = SparkSession.builder.appName('hive').getOrCreate()
stop_time = datetime.datetime.now()
print("Run time: " + str(stop_time-start_time))
# running spark 2.4.0

Run time: 0:00:12.335996


In [8]:
if debug_flag: sc.getConf().getAll()

In [9]:
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

In [53]:
# this is a Impala database that is already located in a Hadoop environment 
# You could build your own set of files and load them in Spark without Impala
sqlContext.sql("use db_panjiva")
dates_table = sqlContext.sql('SELECT * FROM panjivausimportdates')

In [54]:
imports = sqlContext.sql("""Select panjivarecordid,
volumeteu,
arrivaldate,
portofunlading,
concountry
from panjivausimport 
where concountry = 'United States' OR concountry = 'None'""")
dates_table = dates_table.join(imports, how='inner',on='panjivarecordid')
dates_table.printSchema()

root
 |-- panjivarecordid: long (nullable = true)
 |-- sourcecountry: string (nullable = true)
 |-- direction: string (nullable = true)
 |-- datayear: long (nullable = true)
 |-- xfcreationdate: timestamp (nullable = true)
 |-- panjivadataitemid: long (nullable = true)
 |-- dataitemvalue: timestamp (nullable = true)
 |-- volumeteu: double (nullable = true)
 |-- arrivaldate: timestamp (nullable = true)
 |-- portofunlading: string (nullable = true)
 |-- concountry: string (nullable = true)



In [55]:
dates_table = dates_table.withColumn('load_timestamp', f.unix_timestamp(col('xfcreationdate')))
dates_table = dates_table.withColumn('year', f.year(col('arrivaldate')))
dates_table = dates_table.withColumn('month', f.month(col('arrivaldate'))+1) #moving everything one month forward
dates_table = dates_table.withColumn('day', lit(int("01")))

In [56]:
dates_table =(dates_table.withColumn("year", when(f.col("month") > 12, (f.col("year")+1)) #moving month "13" forward to the next year
                                      .otherwise(f.col("year"))))

dates_table = (dates_table.withColumn("month", when(f.col("month") > 12, f.col("month")-12) #changing month "13 to "1"
                                      .otherwise(f.col("month"))))
#into a date
cols=["year","month","day"]
dates_table = (dates_table.withColumn("data_timestamp",concat_ws("-",*cols))
            .withColumn('data_timestamp', f.unix_timestamp(col('data_timestamp'), "yyyy-MM-dd")))

In [58]:
#list of interested ports
li =['The Port of Los Angeles, Los Angeles, California',
     "Port of Tacoma, Tacoma, Washington",
      "Port of Seattle, Seattle, Washington",
      'Port of Long Beach, Long Beach, California',
      "New York/Newark Area, Newark, New Jersey",
      "New York, New York",
      "Georgia Ports Authority, Savannah, Georgia",
      "Houston, Houston, Texas",
     "Port of Virginia, Norfolk, Virginia",
     "Port of Oakland, Oakland, California",
    "The Port of Charleston, Charleston, South Carolina"]


In [59]:
dates_table = dates_table.filter(dates_table.portofunlading.isin(li))

In [60]:
dates_table.select('portofunlading').distinct().sort(col('portofunlading').asc()).count()

11

In [61]:
dates_table.select('portofunlading').distinct().sort(col('portofunlading').asc()).show()

+--------------------+
|      portofunlading|
+--------------------+
|Georgia Ports Aut...|
|Houston, Houston,...|
|  New York, New York|
|New York/Newark A...|
|Port of Long Beac...|
|Port of Oakland, ...|
|Port of Seattle, ...|
|Port of Tacoma, T...|
|Port of Virginia,...|
|The Port of Charl...|
|The Port of Los A...|
+--------------------+



In [63]:
dates_table = dates_table.withColumn('date_diff', f.round(col('load_timestamp') - col('data_timestamp'))/(60*60*24))

date_cutoff_names = []
for i in list(range(-31,31)) + [40,50,60]: #1-30, 40, 50, 60
    dates_table = dates_table.withColumn('date_cutoff_' + str(i), col('date_diff') <= i)

In [66]:
# creating folders with time stamp inside for tracking jobs
program_name = 'teu_delay'
if not os.path.exists(main_folder + '/' + program_name):
    os.mkdir(main_folder + '/' + program_name)
if not os.path.exists(main_folder + '/' + program_name + '/' + time_now):
    current_program = main_folder + '/' + program_name + '/' + time_now
    os.mkdir(current_program)
   

In [67]:
for delay_i in list(range(-31,31)) + [40,50,60] : 
    col_name = 'date_cutoff_' + str(delay_i) 
    #print(col_name)
    
    working_data_agg = (dates_table
                        .withColumn('date_cutoff', lit(str(delay_i)))
                        .groupBy('year','month','date_cutoff',str(col_name), 'portofunlading')
                        .agg(f.round(f.expr('sum(volumeteu)'),3).alias('sum_teu'))
                        )

    #getting teu total for month
    working_data_total = (working_data_agg
                            .groupBy('year','month','date_cutoff', 'portofunlading')
                            .agg(f.round(f.expr('sum(sum_teu)'),3).alias('sum_teu_total')))
    
    #getting percent delay per month and joining with true monthly
    working_data_percent = (working_data_total
                             .join(working_data_agg, how='left',on=['year','month','date_cutoff', 'portofunlading'])
                             .filter(col(col_name) == False)
                             .withColumn("date_delay_percent",(1-(col("sum_teu")/col('sum_teu_total')))*100) 
                             .withColumnRenamed(col_name, 'date_cutoff_num'))
    
    #exporting dataframe into folder
    working_data_percent.toPandas().reset_index(drop = True).to_csv(os.path.join(current_program + '/' + program_name + 
                                                                                   '-pull_' +  str(col_name) +  '.csv')) 
