In [16]:

import csv
import json
import pymongo
import re
import os
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, DoubleType, TimestampType, IntegerType, ArrayType
from pyspark.sql.functions import col, udf, lag, lit, when, unix_timestamp, count, avg, median, first, to_timestamp
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from geopy import distance
import numpy as np
import pickle

In [2]:
# Local Spark session shared by every cell below: all local cores, a 2 GB
# driver, and eager evaluation so DataFrames render directly in the notebook.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.repl.eagerEval.enabled", "true")
    .config("spark.driver.memory", "2g")
    .master("local[*]")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/23 00:56:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Lookup table: two-letter country code -> country name.
# The keys are matched against the first two characters of the port codes
# (departure_port / arrival_port), and the values are compared with the
# "Country of Origin" / "Country of Destination" columns of the Volza data,
# so the spellings here must match that dataset exactly.
# NOTE(review): the codes look like ISO 3166-1 alpha-2 plus the extra "XZ"
# entry for international waters — confirm against the port-code source.
countries_dict = {
    "AF": "Afghanistan",
    "AX": "Åland Islands",
    "AL": "Albania",
    "DZ": "Algeria",
    "AS": "American Samoa",
    "AD": "Andorra",
    "AO": "Angola",
    "AI": "Anguilla",
    "AQ": "Antarctica",
    "AG": "Antigua and Barbuda",
    "AR": "Argentina",
    "AM": "Armenia",
    "AW": "Aruba",
    "AU": "Australia",
    "AT": "Austria",
    "AZ": "Azerbaijan",
    "BS": "Bahamas",
    "BH": "Bahrain",
    "BD": "Bangladesh",
    "BB": "Barbados",
    "BY": "Belarus",
    "BE": "Belgium",
    "BZ": "Belize",
    "BJ": "Benin",
    "BM": "Bermuda",
    "BT": "Bhutan",
    "BO": "Bolivia",
    "BQ": "Bonaire, Sint Eustatius and Saba",
    "BA": "Bosnia and Herzegovina",
    "BW": "Botswana",
    "BR": "Brazil",
    "IO": "British Indian Ocean Territory",
    "BN": "Brunei Darussalam",
    "BG": "Bulgaria",
    "BF": "Burkina Faso",
    "BI": "Burundi",
    "KH": "Cambodia",
    "CM": "Cameroon",
    "CA": "Canada",
    "CV": "Cape Verde",
    "KY": "Cayman Islands",
    "CF": "Central African Republic",
    "TD": "Chad",
    "CL": "Chile",
    "CN": "China",
    "CX": "Christmas Island",
    "CC": "Cocos (Keeling) Islands",
    "CO": "Colombia",
    "KM": "Comoros",
    "CG": "Congo",
    "CD": "Congo, The Democratic Republic of the",
    "CK": "Cook Islands",
    "CR": "Costa Rica",
    "CI": "Côte d'Ivoire",
    "HR": "Croatia",
    "CU": "Cuba",
    "CW": "Curaçao",
    "CY": "Cyprus",
    "CZ": "Czech Republic",
    "DK": "Denmark",
    "DJ": "Djibouti",
    "DM": "Dominica",
    "DO": "Dominican Republic",
    "EC": "Ecuador",
    "EG": "Egypt",
    "SV": "El Salvador",
    "GQ": "Equatorial Guinea",
    "ER": "Eritrea",
    "EE": "Estonia",
    "SZ": "Eswatini",
    "ET": "Ethiopia",
    "FK": "Falkland Islands (Malvinas)",
    "FO": "Faroe Islands",
    "FJ": "Fiji",
    "FI": "Finland",
    "FR": "France",
    "GF": "French Guiana",
    "PF": "French Polynesia",
    "TF": "French Southern Territories",
    "GA": "Gabon",
    "GM": "Gambia",
    "GE": "Georgia",
    "DE": "Germany",
    "GH": "Ghana",
    "GI": "Gibraltar",
    "GR": "Greece",
    "GL": "Greenland",
    "GD": "Grenada",
    "GP": "Guadeloupe",
    "GU": "Guam",
    "GT": "Guatemala",
    "GG": "Guernsey",
    "GN": "Guinea",
    "GW": "Guinea-Bissau",
    "GY": "Guyana",
    "HT": "Haiti",
    "HM": "Heard Island and McDonald Islands",
    "VA": "Holy See (Vatican City State)",
    "HN": "Honduras",
    "HK": "Hong Kong",
    "HU": "Hungary",
    "IS": "Iceland",
    "IN": "India",
    "ID": "Indonesia",
    "XZ": "Installations in International Waters",
    "IR": "Iran",
    "IQ": "Iraq",
    "IE": "Ireland",
    "IM": "Isle of Man",
    "IL": "Israel",
    "IT": "Italy",
    "JM": "Jamaica",
    "JP": "Japan",
    "JE": "Jersey",
    "JO": "Jordan",
    "KZ": "Kazakhstan",
    "KE": "Kenya",
    "KI": "Kiribati",
    "KP": "Korea, Democratic People's Republic of",
    "KR": "Korea, Republic of",
    "KW": "Kuwait",
    "KG": "Kyrgyzstan",
    "LA": "Lao People's Democratic Republic",
    "LV": "Latvia",
    "LB": "Lebanon",
    "LS": "Lesotho",
    "LR": "Liberia",
    "LY": "Libya",
    "LI": "Liechtenstein",
    "LT": "Lithuania",
    "LU": "Luxembourg",
    "MO": "Macao",
    "MG": "Madagascar",
    "MW": "Malawi",
    "MY": "Malaysia",
    "MV": "Maldives",
    "ML": "Mali",
    "MT": "Malta",
    "MH": "Marshall Islands",
    "MQ": "Martinique",
    "MR": "Mauritania",
    "MU": "Mauritius",
    "YT": "Mayotte",
    "MX": "Mexico",
    "FM": "Micronesia, Federated States of",
    "MD": "Moldova",
    "MC": "Monaco",
    "MN": "Mongolia",
    "ME": "Montenegro",
    "MS": "Montserrat",
    "MA": "Morocco",
    "MZ": "Mozambique",
    "MM": "Myanmar",
    "NA": "Namibia",
    "NR": "Nauru",
    "NP": "Nepal",
    "NL": "Netherlands",
    "NC": "New Caledonia",
    "NZ": "New Zealand",
    "NI": "Nicaragua",
    "NE": "Niger",
    "NG": "Nigeria",
    "NU": "Niue",
    "NF": "Norfolk Island",
    "MK": "North Macedonia",
    "MP": "Northern Mariana Islands",
    "NO": "Norway",
    "OM": "Oman",
    "PK": "Pakistan",
    "PW": "Palau",
    "PS": "Palestine",
    "PA": "Panama",
    "PG": "Papua New Guinea",
    "PY": "Paraguay",
    "PE": "Peru",
    "PH": "Philippines",
    "PN": "Pitcairn",
    "PL": "Poland",
    "PT": "Portugal",
    "PR": "Puerto Rico",
    "QA": "Qatar",
    "RE": "Reunion",
    "RO": "Romania",
    "RU": "Russia",
    "RW": "Rwanda",
    "BL": "Saint Barthélemy",
    "SH": "Saint Helena, Ascension and Tristan Da Cunha",
    "KN": "Saint Kitts and Nevis",
    "LC": "Saint Lucia",
    "MF": "Saint Martin (French Part)",
    "PM": "Saint Pierre and Miquelon",
    "VC": "Saint Vincent and the Grenadines",
    "WS": "Samoa",
    "SM": "San Marino",
    "ST": "Sao Tome and Principe",
    "SA": "Saudi Arabia",
    "SN": "Senegal",
    "RS": "Serbia",
    "SC": "Seychelles",
    "SL": "Sierra Leone",
    "SG": "Singapore",
    "SX": "Sint Maarten (Dutch Part)",
    "SK": "Slovakia",
    "SI": "Slovenia",
    "SB": "Solomon Islands",
    "SO": "Somalia",
    "ZA": "South Africa",
    "GS": "South Georgia and the South Sandwich Islands",
    "SS": "South Sudan",
    "ES": "Spain",
    "LK": "Sri Lanka",
    "SD": "Sudan",
    "SR": "Suriname",
    "SJ": "Svalbard and Jan Mayen",
    "SE": "Sweden",
    "CH": "Switzerland",
    "SY": "Syrian Arab Republic",
    "TW": "Taiwan",
    "TJ": "Tajikistan",
    "TZ": "Tanzania",
    "TH": "Thailand",
    "TL": "Timor-Leste",
    "TG": "Togo",
    "TK": "Tokelau",
    "TO": "Tonga",
    "TT": "Trinidad and Tobago",
    "TN": "Tunisia",
    "TR": "Turkey",
    "TM": "Turkmenistan",
    "TC": "Turks and Caicos Islands",
    "TV": "Tuvalu",
    "UG": "Uganda",
    "UA": "Ukraine",
    "AE": "United Arab Emirates",
    "GB": "United Kingdom",
    "US": "United States",
    "UM": "United States Minor Outlying Islands",
    "UY": "Uruguay",
    "UZ": "Uzbekistan",
    "VU": "Vanuatu",
    "VE": "Venezuela",
    "VN": "Vietnam",
    "VG": "Virgin Islands, British",
    "VI": "Virgin Islands, U.S.",
    "WF": "Wallis and Futuna",
    "EH": "Western Sahara",
    "YE": "Yemen",
    "ZM": "Zambia",
    "ZW": "Zimbabwe"
}

# Spark map-literal column built from countries_dict. create_map expects its
# arguments flattened as key1, value1, key2, value2, ... so each (code, name)
# pair is unrolled into two literals.
map_col = F.create_map(
    [
        F.lit(token)
        for code_name_pair in countries_dict.items()
        for token in code_name_pair
    ]
)


In [8]:
# Location of the pipe-separated port-events itinerary file. It can be
# overridden with the PORT_EVENTS_ITINERARY_CSV environment variable so the
# notebook is not tied to one machine's home directory; when the variable is
# unset the original absolute path is used, so existing runs behave the same.
port_events_itinerary_path = os.environ.get(
    "PORT_EVENTS_ITINERARY_CSV",
    "/Users/harshdeepsingh/ASU/Lab_V2/GSN/data/port_events_itinerary.csv",
)

# Volza magnesium records: default (comma) separator, first row is the header.
volza_df = spark.read.csv('magnesium/magnesium.csv', header=True)
# china_ais_df = spark.read.csv('data/china-ais-sample.csv', header=True, sep='|')
# Port-call itinerary: pipe-separated, first row is the header.
port_events_itinerary = spark.read.csv(port_events_itinerary_path, sep='|', header=True)

In [9]:
# The first two characters of each port code are used as the country code
# (e.g. "CNLGY" -> "CN").
departure_country_code = F.substring("departure_port", 1, 2)
arrival_country_code = F.substring("arrival_port", 1, 2)

country_codes = (
    port_events_itinerary
    .withColumn("origin_country_code", departure_country_code)
    .withColumn("destination_country_code", arrival_country_code)
)

# The CSV was read without a schema, so "Date" arrives as a string; cast it to
# a timestamp so it can be compared with arrival times later on.
volza_df = volza_df.withColumn("Date", F.col("Date").cast("timestamp"))

In [10]:
# Optional filter, currently disabled: keep only itineraries where China is the
# origin or the destination and the two country codes differ. Swap the two
# assignments below to enable it.
# intinerary_df = country_codes.filter((col("origin_country_code") == "CN") | (col("destination_country_code") == "CN")).filter(col("origin_country_code") != col("destination_country_code"))
# With the filter disabled, every itinerary row is carried forward unchanged.
intinerary_df = country_codes

In [14]:
# Optional: restrict the Volza records to shipments with China as origin or
# destination. Comment this cell out to keep every country.
involves_china = (
    (col("Country of Origin") == "China")
    | (col("Country of Destination") == "China")
)
china_volza_df = volza_df.where(involves_china).orderBy("Date")
volza_df = china_volza_df

In [12]:

# Parse arrival_time as a timestamp and translate both country codes into
# country names via the map_col lookup (codes missing from the map yield null).
intinerary_df = (
    intinerary_df
    .withColumn("arrival_time", col("arrival_time").cast("timestamp"))
    .withColumn("origin_country", map_col[col("origin_country_code")])
    .withColumn("destination_country", map_col[col("destination_country_code")])
)

In [13]:
# Preview the enriched itinerary: first 20 rows, long cell values truncated.
intinerary_df.show(n=20, truncate=True)

+---------+--------------------+------------+------------+--------------+--------------------+--------------+-------------------+------------------------+--------------+-------------------+
|     mmsi|        arrival_time|arrival_port|arrival_area|departure_port|      departure_time|departure_area|origin_country_code|destination_country_code|origin_country|destination_country|
+---------+--------------------+------------+------------+--------------+--------------------+--------------+-------------------+------------------------+--------------+-------------------+
|100000449| 2022-12-21 02:38:57|       CNLGY|     Longyan|         CNSHD|2022-12-16T14:04:...|        Shidao|                 CN|                      CN|         China|              China|
|201100143| 2022-10-18 12:46:53|       ITPMO|     Palermo|         NOBGO|2022-09-29T12:08:...|        Bergen|                 NO|                      IT|        Norway|              Italy|
|201100143|2022-10-31 21:42:...|       ALDRZ|     

In [15]:
# Match each port arrival to the Volza records for the same country pair whose
# Date falls within the day before the arrival: Date <= arrival_time <= Date + 1 day.
arrival_within_one_day = intinerary_df["arrival_time"].between(
    volza_df["Date"], volza_df["Date"] + timedelta(days=1)
)
same_destination = intinerary_df["destination_country"] == volza_df["Country of Destination"]
same_origin = intinerary_df["origin_country"] == volza_df["Country of Origin"]

final_df = intinerary_df.join(
    volza_df,
    arrival_within_one_day & same_destination & same_origin,
    how="inner",
)

# Number of distinct matched rows (final_df itself is left un-deduplicated).
print(final_df.dropDuplicates().count())
# final_df = final_df.select("mmsi","arrival_time","arrival_port","origin_country","destination_country","Country of Origin","Country of Destination","Date")

23/12/22 22:10:21 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/12/22 22:10:21 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Date, HS Code, Product Description, Consignee, Notify Party Name, Shipper, Std. Quantity, Std. Unit, Standard Unit Rate INR, Value, Country of Origin, Country of Destination, Port of Destination, Quantity, Unit, Unit Rate $, Actual Duty, Port of Origin, Shipment Mode, Measurment, Bill of Entry No, Port of Delivery, Container TEU, Freight Term, Marks Number, HS Product Description, Gross Weight, Consignee Address, Shipper Address, Notify Party Address, Country Name
 Schema: _c0, Date, HS Code, Product Description, Consignee, Notify Party Name, Shipper, Std. Quantity, Std. Unit, Standard Unit Rate INR, Value, Country of Origin, Country of Destination, Port of Destination, Quantity, Unit, Unit Rate $, Actual Duty, Port

4756


                                                                                

In [12]:
# Persist the matched rows ordered by arrival time as one pipe-separated CSV
# part file (coalesce(1)), replacing any previous output at the same path.
(
    final_df
    .orderBy("arrival_time")
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("sep", "|")
    .option("header", True)
    .csv("data/itinerary_volza_full.csv")
)

23/12/22 02:00:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Date, HS Code, Product Description, Consignee, Notify Party Name, Shipper, Std. Quantity, Std. Unit, Standard Unit Rate INR, Value, Country of Origin, Country of Destination, Port of Destination, Quantity, Unit, Unit Rate $, Actual Duty, Port of Origin, Shipment Mode, Measurment, Bill of Entry No, Port of Delivery, Container TEU, Freight Term, Marks Number, HS Product Description, Gross Weight, Consignee Address, Shipper Address, Notify Party Address, Country Name
 Schema: _c0, Date, HS Code, Product Description, Consignee, Notify Party Name, Shipper, Std. Quantity, Std. Unit, Standard Unit Rate INR, Value, Country of Origin, Country of Destination, Port of Destination, Quantity, Unit, Unit Rate $, Actual Duty, Port of Origin, Shipment Mode, Measurment, Bill of Entry No, Port of Delivery, Container TEU, Freight Term, Marks Number, HS Product Description, Gross Weight, Consignee Address, Shipp

23/12/22 04:09:19 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 550338 ms exceeds timeout 120000 ms
23/12/22 04:09:19 WARN SparkContext: Killing executors is not supported by current scheduler.
23/12/22 04:14:05 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.B

### Add Price

In [63]:
# Joined AIS/Volza records (pipe-separated) and the magnesium price history
# (comma-separated); both files carry a header row.
ais_volza_df = spark.read.options(header=True, sep="|").csv("magnesium/ais-volza-magnesium.csv")
mag_price_df = spark.read.options(header=True).csv("magnesium/magnesium_price.csv")

In [64]:
# Normalise the price data: parse "Date" (e.g. "Dec 21, 2022") into a timestamp
# column "fDate", strip thousands separators from "Price" and store it as a
# double in "Magnesium Spot Price", then drop the raw columns that are no
# longer needed.
date_format = "MMM dd, yyyy"
parsed_date = to_timestamp(col("Date"), date_format).cast(TimestampType())
price_without_commas = F.regexp_replace("Price", ",", "")

mag_price_df = (
    mag_price_df
    .withColumn("fDate", parsed_date)
    .withColumn("formatted_price", price_without_commas)
    .withColumn("Magnesium Spot Price", col("formatted_price").cast("double"))
    .drop("formatted_price", "Price", "Change %", "Vol.", "Low", "High", "Open", "Date")
)

In [73]:
# Attach the spot price for each record's Date. The left join keeps records
# that have no price quote for that day (their price is null).
same_date = ais_volza_df["Date"] == mag_price_df["fDate"]
price_volza_df = (
    ais_volza_df
    .join(mag_price_df, on=same_date, how="left")
    .drop("fDate")
)
price_volza_df.count()

31849

In [72]:
# Write the price-enriched records as one pipe-separated CSV part file,
# replacing any previous output in the "ais_volza_price" directory.
(
    price_volza_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("sep", "|")
    .option("header", True)
    .csv("ais_volza_price")
)