In [0]:
from pyspark.sql.functions import col, avg, round, count
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

In [0]:
# Check if the directory is already mounted
existing_mounts = dbutils.fs.mounts()
target_mount_point = "/mnt/ecommerce"

if any(mount.mountPoint == target_mount_point for mount in existing_mounts):
    print(f"The directory '{target_mount_point}' is already mounted.")
else:
    # Configuration settings
    configs = {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": "insert_client_id_here",
        "fs.azure.account.oauth2.client.secret": "insert_client_secret_here",
        "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/1bd1ff73-37f8-4b18-8ead-26f958d2f66d/oauth2/token"
    }

    # Mount the directory if not already mounted
    dbutils.fs.mount(
        source="abfss://e-commerce-data@ecommerceleo.dfs.core.windows.net",
        mount_point=target_mount_point,
        extra_configs=configs
    )
    print(f"Mounted the directory '{target_mount_point}'.")


The directory '/mnt/ecommerce' is already mounted.


In [0]:
%fs
ls "/mnt/ecommerce"

path,name,size,modificationTime
dbfs:/mnt/ecommerce/raw-data/,raw-data/,0,1692225575000
dbfs:/mnt/ecommerce/transformed-data/,transformed-data/,0,1692225582000


In [0]:
spark

In [0]:
# first way and less efficient way to query you tables
customers = spark.read.format('csv').option('header','true').load('/mnt/ecommerce/raw-data/customers.csv')
customers.show()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                   09790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                   01151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                   08775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In [0]:
# more efficent (less verbose) way to query your tables
geolocation = spark.read.csv('/mnt/ecommerce/raw-data/geolocation.csv', header=True, inferSchema=True)
geolocation.show()

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1037| -23.54562128115268| -46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535| -46.64482029837157|       sao paulo|               SP|
|                       1046| -23.54612896641469| -46.64295148361138|       sao paulo|               SP|
|                       1041|  -23.5443921648681| -46.63949930627844|       sao paulo|               SP|
|                       1035|-23.541577961711493| -46.64160722329613|       sao paulo|               SP|
|                       1012|-23.547762303364266| -46.63536053788448|       são paulo|               SP|
|                       1047|-23.546273112412678| -46.6

In [0]:
# reading all the csv files
customers = spark.read.csv('/mnt/ecommerce/raw-data/customers.csv', header=True, inferSchema=True)
geolocation = spark.read.csv('/mnt/ecommerce/raw-data/geolocation.csv', header=True, inferSchema=True)
order_items = spark.read.csv('/mnt/ecommerce/raw-data/order_items.csv', header=True, inferSchema=True)
order_payments = spark.read.csv('/mnt/ecommerce/raw-data/order_payments.csv', header=True, inferSchema=True)
order_reviews = spark.read.csv('/mnt/ecommerce/raw-data/order_reviews.csv', header=True, inferSchema=True)
orders = spark.read.csv('/mnt/ecommerce/raw-data/orders.csv', header=True, inferSchema=True)
products = spark.read.csv('/mnt/ecommerce/raw-data/products.csv', header=True, inferSchema=True)
sellers = spark.read.csv('/mnt/ecommerce/raw-data/sellers.csv', header=True, inferSchema=True)
product_category_name_translation = spark.read.csv('/mnt/ecommerce/raw-data/product_category_name_translation.csv', header=True, inferSchema=True)

In [0]:
customers.show(5)

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
+--------------------+--------------------+------------------------+--------------------+--------------+
only showing top 5 rows



In [0]:
# checking if the schema is aligned with the data types
customers.printSchema()
# seems fine for this one

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [0]:
geolocation.show(5)

+---------------------------+-------------------+------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|   geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+------------------+----------------+-----------------+
|                       1037| -23.54562128115268|-46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535|-46.64482029837157|       sao paulo|               SP|
|                       1046| -23.54612896641469|-46.64295148361138|       sao paulo|               SP|
|                       1041|  -23.5443921648681|-46.63949930627844|       sao paulo|               SP|
|                       1035|-23.541577961711493|-46.64160722329613|       sao paulo|               SP|
+---------------------------+-------------------+------------------+----------------+-----------------+
only showing top 5 rows



In [0]:
# checking if the schema is aligned with the data types
geolocation.printSchema()
# seems fine for this one

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [0]:
order_items.show(5)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|        18.14|
+--------------------+-------------+------------

In [0]:
# checking if the schema is aligned with the data types
order_items.printSchema() 
# seems fine for this one

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [0]:
order_payments.show(5)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
+--------------------+------------------+------------+--------------------+-------------+
only showing top 5 rows



In [0]:
# checking if the schema is aligned with the data types
order_payments.printSchema()
# seems fine for this one

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [0]:
order_reviews.show(5)

+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|           review_id|            order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|7bc2406110b926393...|73fc7af87114b3971...|           4|                null|                  null| 2018-01-18 00:00:00|    2018-01-18 21:46:59|
|80e641a11e56f04c1...|a548910a1c6147796...|           5|                null|                  null| 2018-03-10 00:00:00|    2018-03-11 03:05:13|
|228ce5500dc1d8e02...|f9e4b658b201a9f2e...|           5|                null|                  null| 2018-02-17 00:00:00|    2018-02-18 14:36:24|
|e64fb393e7b32834b...|658677c97b385a9be...|           5|                null|  Recebi bem antes ...| 2017-04-21 00:00:00|   

In [0]:
# checking if the schema is aligned with the data types
order_reviews.printSchema()
# there are datatype problems with some columns

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)



In [0]:
# fixing the datatype of some columns
order_reviews = order_reviews.withColumn('review_score', col('review_score').cast(IntegerType()))\
                             .withColumn('review_creation_date', col('review_creation_date').cast('timestamp'))\
                             .withColumn('review_answer_timestamp', col('review_answer_timestamp').cast('timestamp'))
order_reviews.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: timestamp (nullable = true)
 |-- review_answer_timestamp: timestamp (nullable = true)



In [0]:
# I'm having problems with Synapse where it will not accept the current unfiltered table. I will delete the rows that don't have the "review_score" to filter the incomplete rows.
order_reviews.count()

Out[28]: 104162

In [0]:
order_reviews = order_reviews.filter(col('review_score').isNotNull())
order_reviews.count()

Out[41]: 99227

In [0]:
orders.show(5)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [0]:
# checking if the schema is aligned with the data types
orders.printSchema()
# seems fine for this one

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [0]:
products.show(5)

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [0]:
# checking if the schema is aligned with the data types
products.printSchema()
# seems fine for this one

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



In [0]:
sellers.show(5)

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
+--------------------+----------------------+-----------------+------------+
only showing top 5 rows



In [0]:
# checking if the schema is aligned with the data types
sellers.printSchema()
# seems fine for this one

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



In [0]:
product_category_name_translation.show(5)

+---------------------+-----------------------------+
|product_category_name|product_category_name_english|
+---------------------+-----------------------------+
|         beleza_saude|                health_beauty|
| informatica_acess...|         computers_accesso...|
|           automotivo|                         auto|
|      cama_mesa_banho|               bed_bath_table|
|     moveis_decoracao|              furniture_decor|
+---------------------+-----------------------------+
only showing top 5 rows



In [0]:
# checking if the schema is aligned with the data types
product_category_name_translation.printSchema()
# seems fine for this one

root
 |-- product_category_name: string (nullable = true)
 |-- product_category_name_english: string (nullable = true)



In [0]:
# checking the states that has the biggest amount of customers
customers_by_state = customers.groupBy("customer_state").count().orderBy(col("count").desc())
customers_by_state.show()

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SP|41746|
|            RJ|12852|
|            MG|11635|
|            RS| 5466|
|            PR| 5045|
|            SC| 3637|
|            BA| 3380|
|            DF| 2140|
|            ES| 2033|
|            GO| 2020|
|            PE| 1652|
|            CE| 1336|
|            PA|  975|
|            MT|  907|
|            MA|  747|
|            MS|  715|
|            PB|  536|
|            PI|  495|
|            RN|  485|
|            AL|  413|
+--------------+-----+
only showing top 20 rows



In [0]:
# on the "order_payments" table, find the average payment of the payment type and the number of times it was used, from top to bottom
average_payments = (
    order_payments
    .groupBy("payment_type")
    .agg(count("*").alias("count"), round(avg("payment_value"), 2).alias("avg_payment"))
    .filter(col("payment_type") != "not_defined")
    .orderBy(col("avg_payment").desc())
    .orderBy(col("count").desc())
)
average_payments.show()

+------------+-----+-----------+
|payment_type|count|avg_payment|
+------------+-----+-----------+
| credit_card|76795|     163.32|
|      boleto|19784|     145.03|
|     voucher| 5775|       65.7|
|  debit_card| 1529|     142.57|
+------------+-----+-----------+



In [0]:
# writing the transformed tables to the "transformed_data" folder on the data lake
customers.write.option('header','true').mode('overwrite').csv('/mnt/ecommerce/transformed-data/customers')
geolocation.write.option('header','true').mode('overwrite').csv('/mnt/ecommerce/transformed-data/geolocation')
order_items.write.option('header','true').mode('overwrite').csv('/mnt/ecommerce/transformed-data/order_items')
order_payments.write.option('header','true').mode('overwrite').csv('/mnt/ecommerce/transformed-data/order_payments')
order_reviews.write.option('header','true').mode('overwrite').csv('/mnt/ecommerce/transformed-data/order_reviews')
orders.write.option('header','true').mode('overwrite').csv('/mnt/ecommerce/transformed-data/orders')
products.write.option('header','true').mode('overwrite').csv('/mnt/ecommerce/transformed-data/products')
sellers.write.option('header','true').mode('overwrite').csv('/mnt/ecommerce/transformed-data/sellers')
product_category_name_translation.write.option('header','true').mode('overwrite').csv('/mnt/ecommerce/transformed-data/product_category_name_translation')