In [1]:
from pyspark import SparkContext
sc = SparkContext("local", "MyApp")

23/11/25 16:14:04 WARN Utils: Your hostname, me-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.42.129 instead (on interface ens33)
23/11/25 16:14:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/25 16:14:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/25 16:14:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [16]:
# Create the SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
                    .appName('Shipment data ETL') \
                    .getOrCreate()

In [60]:
df_customers = spark.read.csv('Customers.csv', header=True, inferSchema=True)
df_shipments = spark.read.csv('Shipments.csv', header=True, inferSchema=True)
df_shipments = df_shipments.withColumnRenamed("Reached.on.Time_Y.N","Reached_ontime")
df_customers.printSchema()
df_shipments.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)

root
 |-- ShipmentID: integer (nullable = true)
 |-- Warehouse_block: string (nullable = true)
 |-- Mode_of_Shipment: string (nullable = true)
 |-- Customer_care_calls: integer (nullable = true)
 |-- Customer_rating: integer (nullable = true)
 |-- Cost_of_the_Product: integer (nullable = true)
 |-- Product_importance: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Discount_offered: integer (nullable = true)
 |-- Weight_in_gms: integer (nullable = true)
 |-- Reached_ontime: integer (nullable = true)
 |-- Customer_id: integer (nullable = true)
 |-- Delivery_start: date (nullable = true)
 |-- Delivery_end: date (nullable = true)



In [42]:
print((df_shipments.count(), len(df_shipments.columns)))

(10999, 14)


In [41]:
print((df_customers.count(), len(df_customers.columns)))

(500, 3)


In [61]:
# cleaning operation on shipments
df_shipments = df_shipments.filter(df_shipments.Cost_of_the_Product>0)

In [62]:
# cleaning operations on customers
df_customers = df_customers.filter((df_customers.Age>0) & (df_customers.Age<99))
df_customers = df_customers.filter(df_customers.Gender.isin(['M', 'F']))

In [88]:
# cache dataframes for repeated use
df_shipments.cache()
df_customers.cache()

df_shipments.createOrReplaceTempView('shipments')
df_customers.createOrReplaceTempView('customers')

23/11/25 17:34:22 WARN CacheManager: Asked to cache already cached data.
23/11/25 17:34:22 WARN CacheManager: Asked to cache already cached data.


In [66]:
# WarehouseBlockDim table

WarehouseBlockDim = spark.sql(
    """
    select
    Warehouse_block as WarehouseBlock,
    sum
    (
    case 
        when Reached_ontime = 1 then 1
        else 0
    end
    ) as TotalEarlyShipments,
    sum
    (
    case 
        when Reached_ontime = 0 then 1
        else 0
    end
    ) as TotalLateShipments
    from shipments
    group by Warehouse_block
    """
)
WarehouseBlockDim.show()

+--------------+-------------------+------------------+
|WarehouseBlock|TotalEarlyShipments|TotalLateShipments|
+--------------+-------------------+------------------+
|             F|               2194|              1472|
|             B|               1104|               729|
|             D|               1096|               738|
|             C|               1094|               739|
|             A|               1075|               758|
+--------------+-------------------+------------------+



In [87]:
# ShipmentEmbarkMonth table
WarehouseBlockDim = spark.sql(
    """
    select
    year(Delivery_start) as Year,
    month(Delivery_start) as Month,
    sum
    (
    case
        when Mode_of_Shipment = 'Flight' then 1
        else 0
    end
    ) as TotalFlightShipments,
    sum
    (
    case
        when Mode_of_Shipment = 'Ship' then 1
        else 0
    end
    ) as TotalShipShipments,
    sum
    (
    case
        when Mode_of_Shipment = 'Road' then 1
        else 0
    end
    ) as TotalRoadShipments
    
    from shipments
    group by Year, Month
    """
)
WarehouseBlockDim.show()

+----+-----+--------------------+------------------+------------------+
|Year|Month|TotalFlightShipments|TotalShipShipments|TotalRoadShipments|
+----+-----+--------------------+------------------+------------------+
|2022|   10|                 163|               663|               143|
|2022|    2|                 128|               555|               149|
|2022|    7|                 155|               654|               154|
|2022|   11|                 146|               621|               137|
|2022|    3|                 144|               649|               147|
|2022|    1|                 140|               636|               149|
|2022|    5|                 158|               617|               150|
|2022|    6|                 149|               622|               135|
|2022|    9|                 149|               654|               150|
|2022|    4|                 139|               566|               133|
|2022|   12|                 133|               596|            

In [98]:
shipment_customer = spark.sql(
    """
    select 
    s.*,
    c.age
    from shipments as s left join customers as c
    on s.Customer_id = c.id
    """
)
shipment_customer.show(1)
shipment_customer.cache()
shipment_customer.createOrReplaceTempView('shipment_customer')

+----------+---------------+----------------+-------------------+---------------+-------------------+------------------+------+----------------+-------------+--------------+-----------+--------------+------------+---+
|ShipmentID|Warehouse_block|Mode_of_Shipment|Customer_care_calls|Customer_rating|Cost_of_the_Product|Product_importance|Gender|Discount_offered|Weight_in_gms|Reached_ontime|Customer_id|Delivery_start|Delivery_end|age|
+----------+---------------+----------------+-------------------+---------------+-------------------+------------------+------+----------------+-------------+--------------+-----------+--------------+------------+---+
|         1|              D|          Flight|                  4|              2|                177|               low|     F|              44|         1233|             1|        403|    2022-05-18|  2022-06-12| 47|
+----------+---------------+----------------+-------------------+---------------+-------------------+------------------+------+-

In [97]:
# ShipmentModeDim
ShipmentModeDim = spark.sql(
    """
    select
    Mode_of_Shipment as ModeOfShipment,
    sum
    (
    case 
        when Reached_ontime = 1 then 1
        else 0
    end
    ) as TotalEarlyShipments,
    sum
    (
    case 
        when Reached_ontime = 0 then 1
        else 0
    end
    ) as TotalLateShipments
    from shipment_customer
    group by ModeOfShipment
    """
)
ShipmentModeDim.show()

+--------------+-------------------+------------------+
|ModeOfShipment|TotalEarlyShipments|TotalLateShipments|
+--------------+-------------------+------------------+
|          Road|               1035|               725|
|          Ship|               4459|              3003|
|        Flight|               1069|               708|
+--------------+-------------------+------------------+



In [101]:
#CustomerDim
CustomerDim = spark.sql(
    """
    select
    Customer_id as CustomerID,
    Gender,
    age as Age,
    avg(Customer_care_calls) as AvgCalls,
    avg(Customer_rating) as AvgRaRating,
    count(*) as TotalPurchases
    from shipment_customer
    group by CustomerID, Gender, age
    """
)
CustomerDim.show(10)

[Stage 126:>                                                        (0 + 1) / 1]

+----------+------+---+-----------------+------------------+--------------+
|CustomerID|Gender|Age|         AvgCalls|       AvgRaRating|TotalPurchases|
+----------+------+---+-----------------+------------------+--------------+
|       260|     M| 31|4.071428571428571|3.2857142857142856|            14|
|       253|     M| 25|              4.0|              2.25|            12|
|       117|     M| 19|4.076923076923077| 3.230769230769231|            13|
|       423|     F| 46|              5.0|             3.125|             8|
|       407|     M| 23|4.285714285714286|2.9285714285714284|            14|
|        65|     M| 35|            4.625|               3.0|             8|
|       375|     F| 35|4.111111111111111|               2.0|             9|
|       297|     M| 30|4.176470588235294|3.2941176470588234|            17|
|       163|     M| 27|              3.8|               2.0|             5|
|       335|     M| 40|              4.0|3.8181818181818183|            11|
+----------+

                                                                                

In [108]:
# Shipment table
Shipment = spark.sql(
    """
    select 
    ShipmentId,
    customer_id as CustomerID,
    warehouse_block as WarehouseBlock,
    mode_of_shipment as ModeOfShipment,
    customer_care_calls as CustomerCareCalls,
    Customer_rating as CustomerRating,
    cost_of_the_product as Cost,
    product_importance as ProductImportance,
    discount_offered as DiscountOffered,
    weight_in_gms as WeightInGrams,
    delivery_start as ShipmentStartDate,
    delivery_end as ShipmentEndDate,
    reached_ontime as ReachedOnTime
    from shipment_customer
    """
)
Shipment.show(10)

+----------+----------+--------------+--------------+-----------------+--------------+----+-----------------+---------------+-------------+-----------------+---------------+-------------+
|ShipmentId|CustomerID|WarehouseBlock|ModeOfShipment|CustomerCareCalls|CustomerRating|Cost|ProductImportance|DiscountOffered|WeightInGrams|ShipmentStartDate|ShipmentEndDate|ReachedOnTime|
+----------+----------+--------------+--------------+-----------------+--------------+----+-----------------+---------------+-------------+-----------------+---------------+-------------+
|         1|       403|             D|        Flight|                4|             2| 177|              low|             44|         1233|       2022-05-18|     2022-06-12|            1|
|         2|       430|             F|        Flight|                4|             5| 216|              low|             59|         3088|       2022-10-17|     2022-11-07|            1|
|         3|        34|             A|        Flight|       