In [0]:
%python
spark.conf.set("fs.azure.account.auth.type.datawarehouse11.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.datawarehouse11.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.datawarehouse11.dfs.core.windows.net", "sv=2022-11-02&ss=bfqt&srt=sco&sp=rwdlacupiytfx&se=2025-03-12T03:18:46Z&st=2025-03-11T19:18:46Z&spr=https&sig=d6JLH8JNJ7BLcvYkjFkHKK%2Ftb04X%2BA6sq6kw9bbj%2BTk%3D")




# Data Extraction from Azure Blob Storage

In [0]:
# Every raw extract lives in the same blob container.
CONTAINER_URL = "wasbs://demo-container@datawarehouse11.blob.core.windows.net"


def read_csv_with_header(path):
    """Load a headered CSV into a Spark DataFrame with inferred column types.

    Args:
        path: Full URL of the CSV file.

    Returns:
        Spark DataFrame whose column names come from the CSV header row.
    """
    return spark.read.csv(path, inferSchema=True, header=True)


path_shipment = f"{CONTAINER_URL}/Freight_Shipments.csv"
path_border = f"{CONTAINER_URL}/Border_Crossing.csv"
path_port = f"{CONTAINER_URL}/Port_Data.csv"

df_shipment = read_csv_with_header(path_shipment)
df_border = read_csv_with_header(path_border)
df_port = read_csv_with_header(path_port)

# Preview the first 10 rows of each raw dataset (shipments, borders, ports).
for raw_preview in (df_shipment, df_border, df_port):
    raw_preview.show(10)



+----------------+----+--------------------+----+------+--------------------+-------+-----------+---------------------------+--------------------+
|Visualization ID|Mode|           Statistic|Year| Value|               Units|Percent|Fiscal Year|% Change from Previous Year|Statistic Short Name|
+----------------+----+--------------------+----+------+--------------------+-------+-----------+---------------------------+--------------------+
|        07_03_01|null|      Electric Power|1960|240.97|Million Barrels p...|   null|       1960|                       null|Petroleum Consump...|
|        07_03_01|null|          Industrial|1960|   2.7|Million Barrels p...|   null|       1960|                       null|Petroleum Consump...|
|        07_03_01|null|Residential and C...|1960|   1.7|Million Barrels p...|   null|       1960|                       null|Petroleum Consump...|
|        07_03_01|null|      Transportation|1960|   5.1|Million Barrels p...|   null|       1960|                     

# SQL Database creation

In [0]:
# Create the target database if it is missing, then make it the session's
# current database.
create_db_statement = "CREATE DATABASE IF NOT EXISTS Logistics_db"
use_db_statement = "USE Logistics_db"
spark.sql(create_db_statement)
spark.sql(use_db_statement)

Out[3]: DataFrame[]

# Shipments


## Shipments Preprocessing
### We remove all freight records whose mode of transport is not one of:
- Water
- Rail
- Air and truck-air
- Air 
- Pipeline
- Highway

In [0]:
from pyspark.sql.functions import col, regexp_extract

# Modes of transport to keep. Rows with any other Mode value, including null,
# are dropped.
SHIPMENT_MODES = ["Water", "Rail", "Air and truck-air", "Air", "Pipeline", "Highway"]

df_shipment_filtered = (
    df_shipment
    .select(
        col("Mode"),
        col("Statistic"),
        col("Year"),
        col("Value"),
        col("Units"),
        col("Statistic Short Name").alias("StatisticShortName"),
    )
    # isin() is equivalent to OR-ing one equality test per allowed mode.
    .filter(col("Mode").isin(SHIPMENT_MODES))
)

df_shipment_filtered.count()

Out[4]: 614

## Shipments Table Creation


In [0]:

# Persist the filtered shipments as Logistics_db.Shipments, overwriting any existing contents.
df_shipment_filtered.write.saveAsTable("Logistics_db.Shipments", mode="overwrite")


# Border

## Border Preprocessing
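### We drop the column 'Point', since its information can be inferred from Latitude and Longitude
We also keep only rows that have no missing values, whose State and Border contain no digits, whose Date contains a four-digit year, and whose Port Name is longer than four characters.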

In [0]:
from pyspark.sql.functions import length

from functools import reduce
from operator import and_

# Source CSV column -> output column, in output order.
# 'Point' is deliberately not listed because it is redundant with Latitude/Longitude.
BORDER_COLUMNS = {
    "Port Name": "PortName",
    "State": "State",
    "Port Code": "PortCode",
    "Border": "Border",
    "Date": "Date",
    "Measure": "Measure",
    "Value": "Value",
    "Latitude": "Latitude",
    "Longitude": "Longitude",
}

# Port names shorter than this are discarded (same as the previous `length > 4`).
# NOTE(review): this also drops genuine 4-letter crossings that appear to exist
# in this dataset (e.g. "Naco", AZ and "Roma", TX). Confirm this is intended.
PORT_NAME_MIN_LENGTH = 5

# All conditions reference the *source* column names. They are applied before
# the select so they never depend on columns the projection has renamed.
border_row_is_valid = (
    reduce(and_, [col(name).isNotNull() for name in BORDER_COLUMNS])
    # 'State' and 'Border' must not contain digits.
    & col("State").rlike("^[^0-9]*$")
    & col("Border").rlike("^[^0-9]*$")
    # 'Date' must contain at least one four-digit number.
    & col("Date").rlike(".*\\d{4}.*")
    # Parenthesized: `&` binds tighter than `>=` in Python.
    & (length(col("Port Name")) >= PORT_NAME_MIN_LENGTH)
)

df_border_filter = (
    df_border
    .filter(border_row_is_valid)
    .select([col(source).alias(target) for source, target in BORDER_COLUMNS.items()])
)
df_border_filter.count()

Out[6]: 390380

In [0]:
# Show the first 10 cleaned border-crossing rows.
df_border_filter.show(n=10)

+------------+---------+--------+----------------+--------+--------------------+------+--------+---------+
|    PortName|    State|PortCode|          Border|    Date|             Measure| Value|Latitude|Longitude|
+------------+---------+--------+----------------+--------+--------------------+------+--------+---------+
|     Jackman|    Maine|    0104|US-Canada Border|Jan 2024|              Trucks|  6556|  45.806|  -70.397|
|    Porthill|    Idaho|    3308|US-Canada Border|Apr 2024|              Trucks|    98|  49.000| -116.499|
|    San Luis|  Arizona|    2608|US-Mexico Border|Apr 2024|               Buses|    10|  32.485| -114.782|
|Willow Creek|  Montana|    3325|US-Canada Border|Jan 2024|         Pedestrians|     2|  49.000| -109.731|
|     Warroad|Minnesota|    3423|US-Canada Border|Jan 2024|Personal Vehicle ...|  9266|  48.999|  -95.377|
|    Whitlash|  Montana|    3321|US-Canada Border|Jan 2024|   Personal Vehicles|    29|  48.997| -111.258|
|      Ysleta|    Texas|    2401|US-M

## Border Table Creation


In [0]:
# Persist the cleaned border crossings as Logistics_db.Borders, overwriting any existing contents.
df_border_filter.write.saveAsTable("Logistics_db.Borders", mode="overwrite")

# Port

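## Port Preprocessing
We keep only rows in which every selected column is non-null, and rename the columns to space-free names.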

In [0]:
from functools import reduce
from operator import and_

# Source CSV column -> output column, in output order. Entries mapping a name
# to itself keep their original column name.
PORT_COLUMNS = {
    "Cargo Type": "CargoType",
    "Port ID": "PortID",
    "Port_Name": "PortName",
    "Region": "Region",
    "Reporting Year": "ReportingYear",
    "State": "State",
    "Trade Type": "TradeType",
    "Units": "Units",
    "Port Ranking": "PortRanking",
    "Percent Change": "PercentChange",
    "Volume": "Volume",
}

# Keep only rows where every selected source column is non-null. The filter
# runs on source names *before* the select, so it never depends on columns
# the projection has already renamed.
port_row_is_complete = reduce(and_, [col(name).isNotNull() for name in PORT_COLUMNS])

df_port_filtered = (
    df_port
    .filter(port_row_is_complete)
    .select([col(source).alias(target) for source, target in PORT_COLUMNS.items()])
)

df_port_filtered.count()

Out[9]: 3490

## Port Table Creation

In [0]:
# Persist the cleaned port data as Logistics_db.Ports, overwriting any existing contents.
df_port_filtered.write.saveAsTable("Logistics_db.Ports", mode="overwrite")


In [0]:
# List up to 10 distinct State values present in the cleaned port data.
df_port_filtered.select("State").dropDuplicates().show(n=10)
