In [0]:
# Look up the mount points that already exist so this cell is safe to re-run.
mount_points = [m.mountPoint for m in dbutils.fs.mounts()]

if "/mnt/deliverycenterm" in mount_points:
    print("Directory already mounted.")
else:
    # OAuth client-credentials (ClientCredsTokenProvider) settings for the ABFS driver.
    # NOTE(review): the client id, client secret, tenant (inside the token endpoint) and the
    # container / storage account (inside the source URI) are blank in this copy. Fill them from
    # a secret scope (dbutils.secrets.get) instead of pasting credentials into the notebook.
    configs = {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": "",
        "fs.azure.account.oauth2.client.secret": '',
        "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com//oauth2/token",
    }

    dbutils.fs.mount(
        source="abfss://@.dfs.core.windows.net",
        mount_point="/mnt/deliverycenterm",
        extra_configs=configs,
    )


Directory already mounted.


In [0]:
# Echo the notebook-provided SparkSession (it is not created in this notebook; later cells use it via spark.read).
spark

In [0]:
# List the top-level entries under the mount point (same listing as `%fs ls`).
display(dbutils.fs.ls("/mnt/deliverycenterm"))

path,name,size,modificationTime
dbfs:/mnt/deliverycenterm/raw-data/,raw-data/,0,1714540655000
dbfs:/mnt/deliverycenterm/transformed-data/,transformed-data/,0,1714540665000


In [0]:
from pyspark.sql.functions import col,regexp_replace,to_date, to_timestamp,expr,unix_timestamp
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType, TimestampType
from pyspark.sql import DataFrame

In [0]:
# Every raw table lives in the same mounted folder as <table_name>.csv.
RAW_DATA_DIR = "/mnt/deliverycenterm/raw-data"


def _read_raw_csv(table_name: str):
    """
    Reads ``<RAW_DATA_DIR>/<table_name>.csv`` into a Spark DataFrame.

    The first line is treated as the header and column types are inferred from the
    data, i.e. the same reader options every raw table is loaded with.

    Args:
        table_name (str): File name without the ``.csv`` extension, e.g. ``"orders"``.

    Returns:
        DataFrame: The loaded table.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"{RAW_DATA_DIR}/{table_name}.csv")
    )


channels = _read_raw_csv("channels")
deliveries = _read_raw_csv("deliveries")
drivers = _read_raw_csv("drivers")
# NOTE(review): hub_city later shows U+FFFD in place of an accented letter, which suggests
# hubs.csv is not UTF-8; passing its real encoding via .option("encoding", ...) would keep
# the accent instead of needing the regexp_replace repair — confirm the file's encoding.
hubs = _read_raw_csv("hubs")
orders = _read_raw_csv("orders")
payments = _read_raw_csv("payments")
stores = _read_raw_csv("stores")

In [0]:
# Preview the first rows of the raw channels table.
channels.show()

+----------+--------------+------------+
|channel_id|  channel_name|channel_type|
+----------+--------------+------------+
|         1|   OTHER PLACE| OWN CHANNEL|
|         2|   PHONE PLACE| OWN CHANNEL|
|         3|   WHATS PLACE| OWN CHANNEL|
|         4|    FACE PLACE| OWN CHANNEL|
|         5|    FOOD PLACE| MARKETPLACE|
|         6|   STORE PLACE| OWN CHANNEL|
|         7|  BERLIN PLACE| OWN CHANNEL|
|         8|  MADRID PLACE| OWN CHANNEL|
|         9|   THINK PLACE| OWN CHANNEL|
|        10|  LISBON PLACE| OWN CHANNEL|
|        11|   SUPER PLACE| OWN CHANNEL|
|        12|     ALL PLACE| MARKETPLACE|
|        13|VELOCITY PLACE| MARKETPLACE|
|        15|    EATS PLACE| MARKETPLACE|
|        17|   SHOPP PLACE| MARKETPLACE|
|        20|  MUNICH PLACE| MARKETPLACE|
|        21|  LONDON PLACE| MARKETPLACE|
|        23|  ATCHIN PLACE| MARKETPLACE|
|        24|    FULL PLACE| MARKETPLACE|
|        25|      ON PLACE| MARKETPLACE|
+----------+--------------+------------+
only showing top

In [0]:
# Inspect the column types Spark inferred for channels (loaded with inferSchema=true).
channels.printSchema()

root
 |-- channel_id: integer (nullable = true)
 |-- channel_name: string (nullable = true)
 |-- channel_type: string (nullable = true)



In [0]:
# Function to print the null count of every column of a DataFrame
def count_nulls(df: DataFrame):
    """
    Prints the number of null values in every column of the given DataFrame.

    All per-column counts are computed in a single aggregation, so Spark scans
    the data once instead of launching a separate job for each column.

    Args:
        df (DataFrame): The PySpark DataFrame to profile.
    """
    from pyspark.sql import functions as F

    # Nothing to report for a DataFrame without columns.
    if not df.columns:
        return

    # sum() over zero rows is null; coalesce so an empty DataFrame reports 0 rather than None.
    null_counts = (
        df.select(
            [
                F.coalesce(F.sum(col(column).isNull().cast("int")), F.lit(0)).alias(column)
                for column in df.columns
            ]
        )
        .first()
        .asDict()
    )

    # Print the null counts for each column, in the DataFrame's column order
    for column, count in null_counts.items():
        print(f"Null count in {column}: {count}")




In [0]:
# Function to print the number of rows in a DataFrame
def count_rows(df: DataFrame):
    """
    Prints how many rows the given DataFrame contains.

    Args:
        df (DataFrame): The PySpark DataFrame whose rows are counted.
    """
    print("Number of rows in the DataFrame:", df.count())

In [0]:
# Function to drop the rows whose value in a given column is null
def remove_rows_with_null(df: DataFrame, column_name: str):
    """
    Returns a copy of the DataFrame without the rows where ``column_name`` is null.

    Args:
        df (DataFrame): The PySpark DataFrame to filter.
        column_name (str): Column whose null values cause a row to be dropped.

    Returns:
        DataFrame: Only the rows that have a non-null value in ``column_name``.
    """
    has_value = df[column_name].isNotNull()
    return df.where(has_value)

In [0]:
# Per-column null counts for channels.
count_nulls(channels)

Null count in channel_id: 0
Null count in channel_name: 0
Null count in channel_type: 0


In [0]:
# Preview the first rows of the raw deliveries table.
deliveries.show()

+-----------+-----------------+---------+------------------------+---------------+
|delivery_id|delivery_order_id|driver_id|delivery_distance_meters|delivery_status|
+-----------+-----------------+---------+------------------------+---------------+
|    2174658|         68413340|     8378|                    5199|      DELIVERED|
|    2174660|         68414309|     2473|                     410|      DELIVERED|
|    2174661|         68416230|     7615|                    3784|      DELIVERED|
|    2174663|         68412721|     8378|                    5714|      DELIVERED|
|    2174675|         68414018|    10463|                    3746|      DELIVERED|
|    2174680|         68415103|    16430|                    3924|      DELIVERED|
|    2174693|         68416643|    14513|                    2489|      DELIVERED|
|    2174695|         68417783|     null|                    2564|      CANCELLED|
|    2174696|         68415457|     9996|                     340|      DELIVERED|
|   

In [0]:
# Inspect the column types Spark inferred for deliveries.
deliveries.printSchema()

root
 |-- delivery_id: integer (nullable = true)
 |-- delivery_order_id: integer (nullable = true)
 |-- driver_id: integer (nullable = true)
 |-- delivery_distance_meters: integer (nullable = true)
 |-- delivery_status: string (nullable = true)



In [0]:
# Profile deliveries before cleaning: per-column null counts, then the total row count.
count_nulls(deliveries)
count_rows(deliveries)

Null count in delivery_id: 0
Null count in delivery_order_id: 0
Null count in driver_id: 15886
Null count in delivery_distance_meters: 73
Null count in delivery_status: 0
Number of rows in the DataFrame: 378843


In [0]:
# Drop deliveries that have no driver_id, then re-profile the table.
# NOTE(review): the deliveries preview shows a CANCELLED delivery with a null driver_id,
# so this filter likely removes cancelled deliveries too — confirm that is intended.
deliveries = remove_rows_with_null(deliveries, "driver_id")
count_nulls(deliveries)
count_rows(deliveries)

Null count in delivery_id: 0
Null count in delivery_order_id: 0
Null count in driver_id: 0
Null count in delivery_distance_meters: 29
Null count in delivery_status: 0
Number of rows in the DataFrame: 362957


In [0]:
# Drop the remaining deliveries without a delivery_distance_meters value, then re-profile.
deliveries = remove_rows_with_null(deliveries, "delivery_distance_meters")
count_nulls(deliveries)
count_rows(deliveries)

Null count in delivery_id: 0
Null count in delivery_order_id: 0
Null count in driver_id: 0
Null count in delivery_distance_meters: 0
Null count in delivery_status: 0
Number of rows in the DataFrame: 362928


In [0]:
# Preview the first rows of the raw drivers table.
drivers.show()

+---------+------------+-----------------+
|driver_id|driver_modal|      driver_type|
+---------+------------+-----------------+
|      133|     MOTOBOY|LOGISTIC OPERATOR|
|      138|     MOTOBOY|        FREELANCE|
|      140|     MOTOBOY|        FREELANCE|
|      143|       BIKER|        FREELANCE|
|      148|     MOTOBOY|        FREELANCE|
|      165|     MOTOBOY|        FREELANCE|
|      172|     MOTOBOY|        FREELANCE|
|      174|       BIKER|        FREELANCE|
|      187|       BIKER|        FREELANCE|
|      196|       BIKER|        FREELANCE|
|      202|       BIKER|        FREELANCE|
|      210|     MOTOBOY|        FREELANCE|
|      217|     MOTOBOY|LOGISTIC OPERATOR|
|      223|     MOTOBOY|        FREELANCE|
|      224|       BIKER|        FREELANCE|
|      225|     MOTOBOY|LOGISTIC OPERATOR|
|      228|     MOTOBOY|        FREELANCE|
|      231|     MOTOBOY|LOGISTIC OPERATOR|
|      243|     MOTOBOY|LOGISTIC OPERATOR|
|      245|     MOTOBOY|        FREELANCE|
+---------+

In [0]:
# Inspect the column types Spark inferred for drivers.
drivers.printSchema()

root
 |-- driver_id: integer (nullable = true)
 |-- driver_modal: string (nullable = true)
 |-- driver_type: string (nullable = true)



In [0]:
# Profile drivers: per-column null counts, then the total row count.
count_nulls(drivers)
count_rows(drivers)

Null count in driver_id: 0
Null count in driver_modal: 0
Null count in driver_type: 0
Number of rows in the DataFrame: 4824


In [0]:
# Preview the first rows of the raw hubs table (note the garbled character in hub_city).
hubs.show()

+------+----------------+--------------+---------+------------+-------------+
|hub_id|        hub_name|      hub_city|hub_state|hub_latitude|hub_longitude|
+------+----------------+--------------+---------+------------+-------------+
|     2|   BLUE SHOPPING|  PORTO ALEGRE|       RS| -30.0474148|    -51.21351|
|     3|  GREEN SHOPPING|  PORTO ALEGRE|       RS| -30.0374149|    -51.20352|
|     4|    RED SHOPPING|  PORTO ALEGRE|       RS| -30.0219481|  -51.2083816|
|     5|   FUNK SHOPPING|RIO DE JANEIRO|       RJ| -23.0007498|   -43.318282|
|     8| GOLDEN SHOPPING|RIO DE JANEIRO|       RJ|  -22.921475|   -43.234774|
|    13|HIP HOP SHOPPING|RIO DE JANEIRO|       RJ| -22.8858199|  -43.2792183|
|    16| PEOPLE SHOPPING|RIO DE JANEIRO|       RJ| -23.0174723|  -43.4799389|
|    17|  SMALL SHOPPING|     S�O PAULO|       SP| -23.5920041|  -46.6365035|
|    18|   STAR SHOPPING|RIO DE JANEIRO|       RJ| -22.9454948|  -43.1821807|
|    20| PURPLE SHOPPING|RIO DE JANEIRO|       RJ|  -22.996848| 

In [0]:
# Inspect the column types Spark inferred for hubs.
hubs.printSchema()

root
 |-- hub_id: integer (nullable = true)
 |-- hub_name: string (nullable = true)
 |-- hub_city: string (nullable = true)
 |-- hub_state: string (nullable = true)
 |-- hub_latitude: double (nullable = true)
 |-- hub_longitude: double (nullable = true)



In [0]:

# hub_city is not decoded cleanly: the accented letter in "SÃO PAULO" arrives as the Unicode
# replacement character U+FFFD ("S\ufffdO PAULO"). Replace that character with a plain "A" so
# the city reads "SAO PAULO". The escape is spelled out because the raw character is invisible
# and easily corrupted by editors.
# NOTE(review): this rewrites U+FFFD in *every* hub_city value, not only São Paulo; reading
# hubs.csv with its real encoding would preserve the accents instead.
hubs = hubs.withColumn("hub_city", regexp_replace("hub_city", "\ufffd", "A"))

# Show the resulting DataFrame
hubs.show()


+------+----------------+--------------+---------+------------+-------------+
|hub_id|        hub_name|      hub_city|hub_state|hub_latitude|hub_longitude|
+------+----------------+--------------+---------+------------+-------------+
|     2|   BLUE SHOPPING|  PORTO ALEGRE|       RS| -30.0474148|    -51.21351|
|     3|  GREEN SHOPPING|  PORTO ALEGRE|       RS| -30.0374149|    -51.20352|
|     4|    RED SHOPPING|  PORTO ALEGRE|       RS| -30.0219481|  -51.2083816|
|     5|   FUNK SHOPPING|RIO DE JANEIRO|       RJ| -23.0007498|   -43.318282|
|     8| GOLDEN SHOPPING|RIO DE JANEIRO|       RJ|  -22.921475|   -43.234774|
|    13|HIP HOP SHOPPING|RIO DE JANEIRO|       RJ| -22.8858199|  -43.2792183|
|    16| PEOPLE SHOPPING|RIO DE JANEIRO|       RJ| -23.0174723|  -43.4799389|
|    17|  SMALL SHOPPING|     SAO PAULO|       SP| -23.5920041|  -46.6365035|
|    18|   STAR SHOPPING|RIO DE JANEIRO|       RJ| -22.9454948|  -43.1821807|
|    20| PURPLE SHOPPING|RIO DE JANEIRO|       RJ|  -22.996848| 

In [0]:
# Profile hubs after the hub_city repair: per-column null counts, then the total row count.
count_nulls(hubs)
count_rows(hubs)

Null count in hub_id: 0
Null count in hub_name: 0
Null count in hub_city: 0
Null count in hub_state: 0
Null count in hub_latitude: 0
Null count in hub_longitude: 0
Number of rows in the DataFrame: 32


In [0]:
# Preview the first rows of the raw payments table.
payments.show()

+----------+----------------+--------------+-----------+--------------+--------------+
|payment_id|payment_order_id|payment_amount|payment_fee|payment_method|payment_status|
+----------+----------------+--------------+-----------+--------------+--------------+
|   4427917|        68410055|        118.44|        0.0|       VOUCHER|          PAID|
|   4427918|        68410055|        394.81|        7.9|        ONLINE|          PAID|
|   4427941|        68412721|        206.95|       5.59|        ONLINE|          PAID|
|   4427948|        68413340|          58.8|       1.59|        ONLINE|          PAID|
|   4427955|        68414018|          45.8|       0.92|        ONLINE|          PAID|
|   4427956|        68414309|         106.8|       2.88|        ONLINE|          PAID|
|   4427961|        68414512|          57.8|       1.56|        ONLINE|          PAID|
|   4427963|        68414563|          26.9|        0.4|        ONLINE|          PAID|
|   4427975|        68415103|         115.5

In [0]:
# Inspect the column types Spark inferred for payments.
payments.printSchema()

root
 |-- payment_id: integer (nullable = true)
 |-- payment_order_id: integer (nullable = true)
 |-- payment_amount: double (nullable = true)
 |-- payment_fee: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_status: string (nullable = true)



In [0]:
# Profile payments: per-column null counts, then the total row count.
# NOTE(review): the output reports nulls in payment_fee, and none of the cleaning cells here
# touch payments — confirm whether those nulls should be filled, dropped, or kept as-is.
count_nulls(payments)
count_rows(payments)

Null count in payment_id: 0
Null count in payment_order_id: 0
Null count in payment_amount: 0
Null count in payment_fee: 175
Null count in payment_method: 0
Null count in payment_status: 0
Number of rows in the DataFrame: 400834


In [0]:
# Preview the first rows of the raw orders table.
orders.show()

+--------+--------+----------+----------------+-----------------+------------+------------+------------------+-------------------+------------------+--------------------+-----------------+-------------------+------------------+--------------------+---------------------+-------------------+----------------------+--------------------------+-----------------------+----------------------+---------------------+
|order_id|store_id|channel_id|payment_order_id|delivery_order_id|order_status|order_amount|order_delivery_fee|order_delivery_cost|order_created_hour|order_created_minute|order_created_day|order_created_month|order_created_year|order_moment_created|order_moment_accepted| order_moment_ready|order_moment_collected|order_moment_in_expedition|order_moment_delivering|order_moment_delivered|order_moment_finished|
+--------+--------+----------+----------------+-----------------+------------+------------+------------------+-------------------+------------------+--------------------+----------

In [0]:
# Inspect the column types Spark inferred for orders (the order_moment_* columns load as strings).
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- channel_id: integer (nullable = true)
 |-- payment_order_id: integer (nullable = true)
 |-- delivery_order_id: integer (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_amount: double (nullable = true)
 |-- order_delivery_fee: double (nullable = true)
 |-- order_delivery_cost: double (nullable = true)
 |-- order_created_hour: integer (nullable = true)
 |-- order_created_minute: integer (nullable = true)
 |-- order_created_day: integer (nullable = true)
 |-- order_created_month: integer (nullable = true)
 |-- order_created_year: integer (nullable = true)
 |-- order_moment_created: string (nullable = true)
 |-- order_moment_accepted: string (nullable = true)
 |-- order_moment_ready: string (nullable = true)
 |-- order_moment_collected: string (nullable = true)
 |-- order_moment_in_expedition: string (nullable = true)
 |-- order_moment_delivering: string (nullable = true)
 |-

In [0]:
# Profile orders: per-column null counts, then the total row count.
# NOTE(review): these counts are taken while order_moment_* are still strings; the later
# to_timestamp conversion may add nulls for values that do not match its pattern.
count_nulls(orders)
count_rows(orders)

Null count in order_id: 0
Null count in store_id: 0
Null count in channel_id: 0
Null count in payment_order_id: 0
Null count in delivery_order_id: 0
Null count in order_status: 0
Null count in order_amount: 0
Null count in order_delivery_fee: 0
Null count in order_delivery_cost: 7205
Null count in order_created_hour: 0
Null count in order_created_minute: 0
Null count in order_created_day: 0
Null count in order_created_month: 0
Null count in order_created_year: 0
Null count in order_moment_created: 0
Null count in order_moment_accepted: 9461
Null count in order_moment_ready: 25106
Null count in order_moment_collected: 42894
Null count in order_moment_in_expedition: 67429
Null count in order_moment_delivering: 25316
Null count in order_moment_delivered: 349398
Null count in order_moment_finished: 15599
Number of rows in the DataFrame: 368999


In [0]:
# The split-out order_created_* date parts (presumably derived from order_moment_created) are dropped.
columns_to_drop = [
    f"order_created_{part}" for part in ("hour", "minute", "day", "month", "year")
]
orders = orders.drop(*columns_to_drop)

# Echo the remaining column names
orders.columns


Out[62]: ['order_id',
 'store_id',
 'channel_id',
 'payment_order_id',
 'delivery_order_id',
 'order_status',
 'order_amount',
 'order_delivery_fee',
 'order_delivery_cost',
 'order_moment_created',
 'order_moment_accepted',
 'order_moment_ready',
 'order_moment_collected',
 'order_moment_in_expedition',
 'order_moment_delivering',
 'order_moment_delivered',
 'order_moment_finished']

In [0]:
from pyspark.sql.functions import to_timestamp

# Spark datetime pattern of the raw order_moment_* strings:
# month/day/4-digit year, 12-hour clock hour, minutes, seconds, AM/PM marker.
ORDER_MOMENT_FORMAT = "M/d/yyyy h:mm:ss a"

# Define the columns to convert
columns_to_convert = [
    "order_moment_created",
    "order_moment_accepted",
    "order_moment_ready",
    "order_moment_collected",
    "order_moment_in_expedition",
    "order_moment_delivering",
    "order_moment_delivered",
    "order_moment_finished"
]

# Only parse columns that are still strings. Re-running this cell would otherwise feed the
# already-parsed timestamps back through to_timestamp with a pattern they no longer match.
column_types = dict(orders.dtypes)
string_moment_columns = [c for c in columns_to_convert if column_types.get(c) == "string"]

# Convert all columns in one select(), keeping the original column order. Calling withColumn()
# in a loop adds one projection per call, which the PySpark docs warn can bloat the query plan.
orders = orders.select(
    *[
        to_timestamp(orders[c], ORDER_MOMENT_FORMAT).alias(c) if c in string_moment_columns else orders[c]
        for c in orders.columns
    ]
)

# Show the DataFrame after conversion
orders.select(columns_to_convert).show()


+--------------------+---------------------+-------------------+----------------------+--------------------------+-----------------------+----------------------+---------------------+
|order_moment_created|order_moment_accepted| order_moment_ready|order_moment_collected|order_moment_in_expedition|order_moment_delivering|order_moment_delivered|order_moment_finished|
+--------------------+---------------------+-------------------+----------------------+--------------------------+-----------------------+----------------------+---------------------+
| 2021-01-01 00:01:36|                 null|               null|                  null|                      null|                   null|                  null|                 null|
| 2021-01-01 00:04:26|                 null|               null|                  null|                      null|                   null|                  null|                 null|
| 2021-01-01 00:13:07|                 null|               null|                

In [0]:
# Minutes between an order being ready and being collected (collected - ready).
# The result is null whenever either timestamp is null.
orders = orders.withColumn(
    "order_metric_collected_time",
    (unix_timestamp("order_moment_collected") - unix_timestamp("order_moment_ready")) / 60,
)

orders.select("order_metric_collected_time").show()

+---------------------------+
|order_metric_collected_time|
+---------------------------+
|                       null|
|                       null|
|                       null|
|                       null|
|                       null|
|                       null|
|                       null|
|                       null|
|          6.633333333333334|
|                       null|
|                       null|
|                       null|
|                       null|
|                       null|
|                       null|
|                       null|
|                       null|
|        0.26666666666666666|
|          7.033333333333333|
|         10.216666666666667|
+---------------------------+
only showing top 20 rows



In [0]:
# Minutes from order creation to order completion (finished - created).
# The result is null whenever either timestamp is null.
orders = orders.withColumn(
    "order_metric_cycle_time",
    (unix_timestamp("order_moment_finished") - unix_timestamp("order_moment_created")) / 60,
)
orders.select("order_metric_cycle_time").show()

+-----------------------+
|order_metric_cycle_time|
+-----------------------+
|                   null|
|                   null|
|                   null|
|                   null|
|                   null|
|                   null|
|                   null|
|                   null|
|     2424.7166666666667|
|                   null|
|                   null|
|                   null|
|                   null|
|                   null|
|                   null|
|                   null|
|                   null|
|     120.41666666666667|
|                  77.05|
|      51.81666666666667|
+-----------------------+
only showing top 20 rows



In [0]:
# Collect the cleaned Spark tables into pandas DataFrames so they can be written with to_csv.
# toPandas() loads every row into the driver's memory.
# NOTE(review): drivers is not part of this conversion.
orders, channels, deliveries, hubs, payments, stores = [
    frame.toPandas()
    for frame in (orders, channels, deliveries, hubs, payments, stores)
]

In [0]:
# Write the transformed orders (with the two metric columns) without the pandas index.
orders.to_csv(path_or_buf='/dbfs/mnt/deliverycenterm/transformed-data/orders.csv', index=False)

In [0]:
# Write the remaining cleaned tables next to orders.csv, without the pandas index.
TRANSFORMED_DATA_DIR = "/dbfs/mnt/deliverycenterm/transformed-data"

for table_name, frame in [
    ("channels", channels),
    ("deliveries", deliveries),
    ("hubs", hubs),
    ("payments", payments),
    ("stores", stores),
]:
    frame.to_csv(f"{TRANSFORMED_DATA_DIR}/{table_name}.csv", index=False)

# drivers is loaded and profiled but is not included in the toPandas() cell, so without this
# line no drivers.csv reaches transformed-data. It is still a Spark DataFrame here.
drivers.toPandas().to_csv(f"{TRANSFORMED_DATA_DIR}/drivers.csv", index=False)