In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, StructType, StringType, IntegerType, FloatType, StructField
from pyspark.sql.functions import lit, concat, col, regexp_replace, collect_list

In [30]:
ss = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("us_import_sample") \
    .getOrCreate()

In [31]:
cargo_desc_schema = StructType([
    StructField("identifier", StringType()),
    StructField("container_number", StringType()),
    StructField("description_sequence_number", IntegerType()),
    StructField("piece_count", IntegerType()),
    StructField("description_text", StringType())
])

In [32]:
cargo_desc = ss.read \
    .option("header", True) \
    .option("escape", '"') \
    .csv(
        './ams/2020/202001201500/ams__cargodesc_2020__202001201500.csv',
        schema=cargo_desc_schema
    )

In [33]:
cargo_desc.printSchema()

root
 |-- identifier: string (nullable = true)
 |-- container_number: string (nullable = true)
 |-- description_sequence_number: integer (nullable = true)
 |-- piece_count: integer (nullable = true)
 |-- description_text: string (nullable = true)



In [34]:
cargo_desc.createOrReplaceTempView("cargo_desc")

In [35]:
hazmat_schema = StructType([
    StructField("identifier", StringType()),
    StructField("container_number", StringType()),
    StructField("hazmat_sequence_number", IntegerType()),
    StructField("hazmat_code", StringType()),
    StructField("hazmat_class", StringType()),
    StructField("hazmat_code_qualifier", StringType()),
    StructField("hazmat_contact", StringType()),
    StructField("hazmat_page_number", StringType()),
    StructField("hazmat_flash_point_temperature", StringType()),
    StructField("hazmat_flash_point_temperature_negative_ind", StringType()),
    StructField("hazmat_flash_point_temperature_unit", StringType()),
    StructField("hazmat_description", StringType())
])

In [36]:
hazmat = ss.read \
    .option("header", True) \
    .option("escape", '"') \
    .csv(
        './ams/2020/202001201500/ams__hazmat_2020__202001201500.csv',
        schema=hazmat_schema
    )

In [37]:
hazmat.printSchema()

root
 |-- identifier: string (nullable = true)
 |-- container_number: string (nullable = true)
 |-- hazmat_sequence_number: integer (nullable = true)
 |-- hazmat_code: string (nullable = true)
 |-- hazmat_class: string (nullable = true)
 |-- hazmat_code_qualifier: string (nullable = true)
 |-- hazmat_contact: string (nullable = true)
 |-- hazmat_page_number: string (nullable = true)
 |-- hazmat_flash_point_temperature: string (nullable = true)
 |-- hazmat_flash_point_temperature_negative_ind: string (nullable = true)
 |-- hazmat_flash_point_temperature_unit: string (nullable = true)
 |-- hazmat_description: string (nullable = true)



In [38]:
hazmat.createOrReplaceTempView("hazmat")

In [39]:
hazmat_class_schema = StructType([
    StructField("identifier", IntegerType()),
    StructField("container_number", StringType()),
    StructField("hazmat_sequence_number", IntegerType()),
    StructField("hazmat_classification", IntegerType())
])

In [40]:
hazmat_class = ss.read \
    .option("header", True) \
    .option("escape", '"') \
    .csv(
        './ams/2020/202001201500/ams__hazmatclass_2020__202001201500.csv',
        schema=hazmat_class_schema
    )

In [41]:
hazmat_class.printSchema()

root
 |-- identifier: integer (nullable = true)
 |-- container_number: string (nullable = true)
 |-- hazmat_sequence_number: integer (nullable = true)
 |-- hazmat_classification: integer (nullable = true)



In [42]:
hazmat_class.createOrReplaceTempView("hazmat_class")

In [43]:
cargo_table = ss.sql("""
    SELECT 
        c.identifier,
        c.container_number,
        c.description_sequence_number AS sequence_number,
        c.piece_count,
        c.description_text AS description,
        h.hazmat_code,
        (CASE 
            WHEN (h.hazmat_class IS NOT NULL) THEN h.hazmat_class
            ELSE hc.hazmat_classification
        END) AS hazmat_class,
        h.hazmat_code_qualifier,
        h.hazmat_contact,
        h.hazmat_page_number,
        h.hazmat_flash_point_temperature,
        h.hazmat_flash_point_temperature_negative_ind,
        h.hazmat_flash_point_temperature_unit,
        h.hazmat_description
    FROM cargo_desc AS c
    LEFT JOIN hazmat AS h
    ON 
        c.identifier = h.identifier AND 
        c.container_number = h.container_number AND 
        c.description_sequence_number = h.hazmat_sequence_number
    LEFT JOIN hazmat_class AS hc
    ON
        c.identifier = hc.identifier AND 
        c.container_number = hc.container_number AND 
        c.description_sequence_number = hc.hazmat_sequence_number
""")

In [44]:
cargo_table.printSchema()

root
 |-- identifier: string (nullable = true)
 |-- container_number: string (nullable = true)
 |-- sequence_number: integer (nullable = true)
 |-- piece_count: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- hazmat_code: string (nullable = true)
 |-- hazmat_class: string (nullable = true)
 |-- hazmat_code_qualifier: string (nullable = true)
 |-- hazmat_contact: string (nullable = true)
 |-- hazmat_page_number: string (nullable = true)
 |-- hazmat_flash_point_temperature: string (nullable = true)
 |-- hazmat_flash_point_temperature_negative_ind: string (nullable = true)
 |-- hazmat_flash_point_temperature_unit: string (nullable = true)
 |-- hazmat_description: string (nullable = true)

