##Olist Store Data Transformation in Azure Databricks :

**Azure Data Lake Storage (Landing Container) -> Databricks -> Azure Data Lake Storage (Transient Container)**

In this notebook, we extracted data from Azure Data Lake storage (Landing Container) into Databricks cluster, run transformations on the data in Databricks cluster and then load the transformed data into Azure Data Lake Storage (Transient Container).

##Import Packages

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd



In [0]:
# Creating Spark Session
spark=SparkSession.builder.appName('Dataframe').getOrCreate()
spark

##Mount a Azure Data Lake Storage container

In [0]:
# Define the variables used for creating connection strings
adlsAccountName = "adlsolist51"
adlsContainerName = "olistcontainer"
mountPoint = "/mnt/"

# Application (Client) Secret Key
authenticationKey = dbutils.secrets.get(scope="SecretScope51",key="SecretKey") 

# Application (Client) ID
applicationId = dbutils.secrets.get(scope= "SecretScope51",key="ClientID")
 
# Directory (Tenant) ID
tenandId = dbutils.secrets.get(scope="SecretScope51",key= "TenantID")

endpoint = "https://login.microsoftonline.com/" + tenandId + "/oauth2/token"
source = "abfss://" + adlsContainerName + "@" + adlsAccountName + ".dfs.core.windows.net/"
 
# Connecting using Service Principal secrets and OAuth
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": applicationId,
           "fs.azure.account.oauth2.client.secret": authenticationKey,
           "fs.azure.account.oauth2.client.endpoint": endpoint}
 
# Mount ADLS Storage to DBFS only if the directory is not already mounted
if not any(mount.mountPoint == mountPoint for mount in dbutils.fs.mounts()):
  dbutils.fs.mount(
    source = source,
    mount_point = mountPoint,
    extra_configs = configs)


In [0]:
# List the contents of directory 
dbutils.fs.ls('/mnt/Landing Datalake')

Out[7]: [FileInfo(path='dbfs:/mnt/Landing Datalake/dboolist_customers_dataset.csv', name='dboolist_customers_dataset.csv', size=9033957, modificationTime=1670940842000),
 FileInfo(path='dbfs:/mnt/Landing Datalake/dboolist_geolocation_dataset.csv', name='dboolist_geolocation_dataset.csv', size=61273883, modificationTime=1670940864000),
 FileInfo(path='dbfs:/mnt/Landing Datalake/dboolist_order_items_dataset.csv', name='dboolist_order_items_dataset.csv', size=15438671, modificationTime=1670940853000),
 FileInfo(path='dbfs:/mnt/Landing Datalake/dboolist_order_payments_dataset.csv', name='dboolist_order_payments_dataset.csv', size=5777138, modificationTime=1670940848000),
 FileInfo(path='dbfs:/mnt/Landing Datalake/dboolist_orders_dataset.csv', name='dboolist_orders_dataset.csv', size=17654914, modificationTime=1670940856000),
 FileInfo(path='dbfs:/mnt/Landing Datalake/dboolist_products_dataset.csv', name='dboolist_products_dataset.csv', size=2379446, modificationTime=1670940840000),
 FileIn

In [0]:
%fs
ls "mnt/Landing Datalake"

path,name,size,modificationTime
dbfs:/mnt/Landing Datalake/dboolist_customers_dataset.csv,dboolist_customers_dataset.csv,9033957,1670940842000
dbfs:/mnt/Landing Datalake/dboolist_geolocation_dataset.csv,dboolist_geolocation_dataset.csv,61273883,1670940864000
dbfs:/mnt/Landing Datalake/dboolist_order_items_dataset.csv,dboolist_order_items_dataset.csv,15438671,1670940853000
dbfs:/mnt/Landing Datalake/dboolist_order_payments_dataset.csv,dboolist_order_payments_dataset.csv,5777138,1670940848000
dbfs:/mnt/Landing Datalake/dboolist_orders_dataset.csv,dboolist_orders_dataset.csv,17654914,1670940856000
dbfs:/mnt/Landing Datalake/dboolist_products_dataset.csv,dboolist_products_dataset.csv,2379446,1670940840000
dbfs:/mnt/Landing Datalake/dboolist_sellers_dataset.csv,dboolist_sellers_dataset.csv,174703,1670940838000
dbfs:/mnt/Landing Datalake/dboproduct_category_name_translation.csv,dboproduct_category_name_translation.csv,2613,1670940838000


##Reading Dataset

In [0]:
customers=spark.read.csv("dbfs:/mnt/Landing Datalake/dboolist_customers_dataset.csv",header=True ,inferSchema=True)
geolocation=spark.read.csv("dbfs:/mnt/Landing Datalake/dboolist_geolocation_dataset.csv",header=True ,inferSchema=True)
order_items=spark.read.csv("dbfs:/mnt/Landing Datalake/dboolist_order_items_dataset.csv",header=True ,inferSchema=True)
order_payments = spark.read.csv("dbfs:/mnt/Landing Datalake/dboolist_order_payments_dataset.csv",header=True ,inferSchema=True) 
orders=spark.read.csv("dbfs:/mnt/Landing Datalake/dboolist_orders_dataset.csv",header=True ,inferSchema=True)
product=spark.read.csv("dbfs:/mnt/Landing Datalake/dboolist_products_dataset.csv",header=True ,inferSchema=True)
sellers=spark.read.csv("dbfs:/mnt/Landing Datalake/dboolist_sellers_dataset.csv",header=True ,inferSchema=True)
product_category=spark.read.csv("dbfs:/mnt/Landing Datalake/dboproduct_category_name_translation.csv",header=True)
 

##Data Transformation

**Olist order payments options in Brazil**

In [0]:
order_payments.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [0]:
print(order_payments.count(),len(order_payments.columns))


103886 5


In [0]:
#checking null and missing values 
order_payments.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in  order_payments.columns]).show()

+--------+------------------+------------+--------------------+-------------+
|order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------+------------------+------------+--------------------+-------------+
|       0|                 0|           0|                   0|            0|
+--------+------------------+------------+--------------------+-------------+



In [0]:
#Displaying payment options 
order_payments =order_payments.filter(order_payments.payment_type != 'not_defined')
order_payments_data=order_payments.groupBy('payment_type').count().sort(desc('count'))
display(order_payments_data)

payment_type,count
credit_card,76795
boleto,19784
voucher,5775
debit_card,1529


**Brazilian zip codes and its lat/lng coordinates**

In [0]:
geolocation.show()

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1037| -23.54562128115268| -46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535| -46.64482029837157|       sao paulo|               SP|
|                       1046| -23.54612896641469| -46.64295148361138|       sao paulo|               SP|
|                       1041|  -23.5443921648681| -46.63949930627844|       sao paulo|               SP|
|                       1035|-23.541577961711493| -46.64160722329613|       sao paulo|               SP|
|                       1012|-23.547762303364266| -46.63536053788448|       são paulo|               SP|
|                       1047|-23.546273112412678| -46.6

In [0]:
#Checking null and missing values in geolocation dataframe
geolocation.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in geolocation.columns]).show()

+---------------------------+---------------+---------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+---------------+---------------+----------------+-----------------+
|                          0|              0|              0|               0|                0|
+---------------------------+---------------+---------------+----------------+-----------------+



**Olist Order data**

In [0]:
orders.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [0]:
#Checking missing and null values
from pyspark.sql import functions  as F
orders.select(*[
    (
        F.count(F.when((F.isnan(c) | F.col(c).isNull()), c)) if t not in ("timestamp", "date")
        else F.count(F.when(F.col(c).isNull(), c))
    ).alias(c)
    for c, t in  orders.dtypes if c in  orders.columns
]).show()

+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



In [0]:
order_data=orders.na.drop()
order_data.count()

Out[131]: 96461

In [0]:
from pyspark.sql import functions  as F
order_data.select(*[
    (
        F.count(F.when((F.isnan(c) | F.col(c).isNull()), c)) if t not in ("timestamp", "date")
        else F.count(F.when(F.col(c).isNull(), c))
    ).alias(c)
    for c, t in  order_data.dtypes if c in  order_data.columns
]).show()


+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|                0|                           0|                            0|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



**Customers and its order delivery locations data**

In [0]:
customers.show()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In [0]:
customers.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in customers.columns]).show()

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



**Data about the items purchased within each order**

In [0]:
order_items.show()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51| 199.9|        18.14|
|00048cc3ae777c65d...|            1|ef92

In [0]:
from pyspark.sql import functions  as F
order_items.select(*[
    (
        F.count(F.when((F.isnan(c) | F.col(c).isNull()), c)) if t not in ("timestamp", "date")
        else F.count(F.when(F.col(c).isNull(), c))
    ).alias(c)
    for c, t in  order_items.dtypes if c in  order_items.columns
]).show()


+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
|       0|            0|         0|        0|                  0|    0|            0|
+--------+-------------+----------+---------+-------------------+-----+-------------+



**Data about the products sold by Olist**

In [0]:
product.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [0]:
product=product.withColumnRenamed('product_name_lenght','product_name_length')\
               .withColumnRenamed('product_description_lenght','product_description_length')
display(product)

product_id,product_category_name,product_name_length,product_description_length,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
1e9e8ef04dbcff4541ed26657ea517e5,perfumaria,40.0,287.0,1.0,225,16,10,14
3aa071139cb16b67ca9e5dea641aaa2f,artes,44.0,276.0,1.0,1000,30,18,20
96bd76ec8810374ed1b65e291975717f,esporte_lazer,46.0,250.0,1.0,154,18,9,15
cef67bcfe19066a932b7673e239eb23d,bebes,27.0,261.0,1.0,371,26,4,26
9dc1a7de274444849c219cff195d0b71,utilidades_domesticas,37.0,402.0,4.0,625,20,17,13
41d3672d4792049fa1779bb35283ed13,instrumentos_musicais,60.0,745.0,1.0,200,38,5,11
732bd381ad09e530fe0a5f457d81becb,cool_stuff,56.0,1272.0,4.0,18350,70,24,44
2548af3e6e77a690cf3eb6368e9ab61e,moveis_decoracao,56.0,184.0,2.0,900,40,8,40
37cc742be07708b53a98702e77a21a02,eletrodomesticos,57.0,163.0,1.0,400,27,13,17
8c92109888e8cdf9d66dc7e463025574,brinquedos,36.0,1156.0,1.0,600,17,10,12


In [0]:
product.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in product.columns]).show()


+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_name|product_name_length|product_description_length|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|         0|                  610|                610|                       610|               610|               2|                2|                2|               2|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+



In [0]:
product_data=product.na.drop()
product_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in product_data.columns]).show()

+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|         0|                    0|                  0|                         0|                 0|               0|                0|                0|               0|
+----------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+



**Product category english translation name**

In [0]:
product_category.show()

+---------------------+-----------------------------+
|product_category_name|product_category_name_english|
+---------------------+-----------------------------+
|         beleza_saude|                health_beauty|
| informatica_acess...|         computers_accesso...|
|           automotivo|                         auto|
|      cama_mesa_banho|               bed_bath_table|
|     moveis_decoracao|              furniture_decor|
|        esporte_lazer|               sports_leisure|
|           perfumaria|                    perfumery|
| utilidades_domest...|                   housewares|
|            telefonia|                    telephony|
|   relogios_presentes|                watches_gifts|
|    alimentos_bebidas|                   food_drink|
|                bebes|                         baby|
|            papelaria|                   stationery|
| tablets_impressao...|         tablets_printing_...|
|           brinquedos|                         toys|
|       telefonia_fixa|     

In [0]:
product_category.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in  product_category.columns]).show()


+---------------------+-----------------------------+
|product_category_name|product_category_name_english|
+---------------------+-----------------------------+
|                    0|                            0|
+---------------------+-----------------------------+



In [0]:
#Finding which category most sold.
df_products = order_items.join(product_data,['product_id'])
df_products = df_products.join(product_category,['product_category_name'])
df_products = df_products.groupBy('product_category_name').count().orderBy('count',ascending=False).where("count > 500")
df_products = df_products.join(product_category, on=['product_category_name'], how='left') 
df_products = df_products .select('product_category_name','product_category_name_english','count')
df_products.show()

+---------------------+-----------------------------+-----+
|product_category_name|product_category_name_english|count|
+---------------------+-----------------------------+-----+
|                bebes|                         baby| 3064|
|     moveis_decoracao|              furniture_decor| 8334|
| construcao_ferram...|         construction_tool...|  929|
|           cool_stuff|                   cool_stuff| 3796|
|     eletrodomesticos|              home_appliances|  771|
|             pet_shop|                     pet_shop| 1947|
|      casa_construcao|            home_construction|  604|
| livros_interesse_...|         books_general_int...|  553|
| instrumentos_musi...|          musical_instruments|  680|
|         beleza_saude|                health_beauty| 9670|
|    moveis_escritorio|             office_furniture| 1691|
|          eletronicos|                  electronics| 2767|
|     malas_acessorios|          luggage_accessories| 1092|
| informatica_acess...|         computer

In [0]:
df_products.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_products.columns]).show()

+---------------------+-----------------------------+-----+
|product_category_name|product_category_name_english|count|
+---------------------+-----------------------------+-----+
|                    0|                            0|    0|
+---------------------+-----------------------------+-----+



In [0]:
#Displaying total item in particular product category 
product_data_final=df_products.drop(col('product_category_name'))
product_data_final =product_data_final.select(col("product_category_name_english").alias("Product_Category"), col("count").alias("Total_Item"))
product_data_final=product_data_final.orderBy(col("Total_Item").desc())
product_data_final.show()

+--------------------+----------+
|    Product_Category|Total_Item|
+--------------------+----------+
|      bed_bath_table|     11115|
|       health_beauty|      9670|
|      sports_leisure|      8641|
|     furniture_decor|      8334|
|computers_accesso...|      7827|
|          housewares|      6964|
|       watches_gifts|      5991|
|           telephony|      4545|
|        garden_tools|      4347|
|                auto|      4235|
|                toys|      4117|
|          cool_stuff|      3796|
|           perfumery|      3419|
|                baby|      3064|
|         electronics|      2767|
|          stationery|      2517|
|fashion_bags_acce...|      2031|
|            pet_shop|      1947|
|    office_furniture|      1691|
|      consoles_games|      1137|
+--------------------+----------+
only showing top 20 rows



**Data about sellers and is location**

In [0]:
sellers.show()

+--------------------+----------------------+-----------------+------------+
|           seller_id|seller_zip_code_prefix|      seller_city|seller_state|
+--------------------+----------------------+-----------------+------------+
|3442f8959a84dea7e...|                 13023|         campinas|          SP|
|d1b65fc7debc3361e...|                 13844|       mogi guacu|          SP|
|ce3ad9de960102d06...|                 20031|   rio de janeiro|          RJ|
|c0f3eea2e14555b6f...|                  4195|        sao paulo|          SP|
|51a04a8a6bdcb23de...|                 12914|braganca paulista|          SP|
|c240c4061717ac180...|                 20920|   rio de janeiro|          RJ|
|e49c26c3edfa46d22...|                 55325|           brejao|          PE|
|1b938a7ec6ac5061a...|                 16304|        penapolis|          SP|
|768a86e36ad6aae3d...|                  1529|        sao paulo|          SP|
|ccc4bbb5f32a6ab2b...|                 80310|         curitiba|          PR|

In [0]:
sellers.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sellers.columns]).show()


+---------+----------------------+-----------+------------+
|seller_id|seller_zip_code_prefix|seller_city|seller_state|
+---------+----------------------+-----------+------------+
|        0|                     0|          0|           0|
+---------+----------------------+-----------+------------+



##Load data  into Azure Data Lake Storage (Transient Container)

In [0]:
output_container_path="dbfs:/mnt/Transient Datalake"

adls_output_folder1 ="dbfs:/mnt/Transient Datalake/olist_order_payments_dataset.csv"

# write csv file in transient container
order_payments.coalesce(1).write.mode("overwrite")\
.option("header", "true")\
.format("com.databricks.spark.csv")\
.save(adls_output_folder1)

# Move file to root location
files1 = dbutils.fs.ls(adls_output_folder1)
output_file1 = [x for x in files1 if x.name.startswith("part-")]
dbutils.fs.mv(output_file1[0].path,output_container_path)
 

In [0]:
output_container_path="dbfs:/mnt/Transient Datalake"

adls_output_folder2 ="dbfs:/mnt/Transient Datalake/olist_geolocation_dataset.csv"

geolocation.coalesce(1).write.mode("overwrite")\
.option("header", "true")\
.format("com.databricks.spark.csv")\
.save(adls_output_folder2)

files2 = dbutils.fs.ls(adls_output_folder2)
output_file2 = [x for x in files2 if x.name.startswith("part-")]
dbutils.fs.mv(output_file2[0].path,output_container_path)

Out[174]: True

In [0]:
output_container_path="dbfs:/mnt/Transient Datalake"

adls_output_folder3 ="dbfs:/mnt/Transient Datalake/olist_orders_dataset.csv"

order_data.coalesce(1).write.mode("overwrite")\
.option("header", "true")\
.format("com.databricks.spark.csv")\
.save(adls_output_folder3)

files3 = dbutils.fs.ls(adls_output_folder3)
output_file3 = [x for x in files3 if x.name.startswith("part-")]
dbutils.fs.mv(output_file3[0].path,output_container_path)

Out[175]: True

In [0]:
output_container_path="dbfs:/mnt/Transient Datalake"

adls_output_folder4 ="dbfs:/mnt/Transient Datalake/olist_order_items_dataset.csv"

order_items.coalesce(1).write.mode("overwrite")\
.option("header", "true")\
.format("com.databricks.spark.csv")\
.save(adls_output_folder4)

files4 = dbutils.fs.ls(adls_output_folder4)
output_file4 = [x for x in files4 if x.name.startswith("part-")]
dbutils.fs.mv(output_file4[0].path,output_container_path)

Out[176]: True

In [0]:
output_container_path="dbfs:/mnt/Transient Datalake"

adls_output_folder5 ="dbfs:/mnt/Transient Datalake/olist_customers_dataset.csv"

customers.coalesce(1).write.mode("overwrite")\
.option("header", "true")\
.format("com.databricks.spark.csv")\
.save(adls_output_folder5)

files5 = dbutils.fs.ls(adls_output_folder5)
output_file5= [x for x in files5 if x.name.startswith("part-")]
dbutils.fs.mv(output_file5[0].path,output_container_path)

Out[177]: True

In [0]:
output_container_path="dbfs:/mnt/Transient Datalake"

adls_output_folder6 ="dbfs:/mnt/Transient Datalake/olist_products_dataset.csv"

product_data.coalesce(1).write.mode("overwrite")\
.option("header", "true")\
.format("com.databricks.spark.csv")\
.save(adls_output_folder6)

files6 = dbutils.fs.ls(adls_output_folder6)
output_file6= [x for x in files6 if x.name.startswith("part-")]
dbutils.fs.mv(output_file6[0].path,output_container_path)

Out[178]: True

In [0]:
output_container_path="dbfs:/mnt/Transient Datalake"

adls_output_folder7 = "dbfs:/mnt/Transient Datalake/dboolist_sellers_dataset.csv"

sellers.coalesce(1).write.mode("overwrite")\
.option("header", "true")\
.format("com.databricks.spark.csv")\
.save(adls_output_folder7)

files7 = dbutils.fs.ls(adls_output_folder7)
output_file7 = [x for x in files7 if x.name.startswith("part-")]
dbutils.fs.mv(output_file7[0].path,output_container_path)

Out[179]: True

In [0]:
output_container_path="dbfs:/mnt/Transient Datalake"

adls_output_folder8 = "dbfs:/mnt/Transient Datalake/product_category_name_translation.csv"

product_category.coalesce(1).write.mode("overwrite")\
.option("header", "true")\
.format("com.databricks.spark.csv")\
.save(adls_output_folder8 )

files8 = dbutils.fs.ls(adls_output_folder8)
output_file8 = [x for x in files8 if x.name.startswith("part-")]
dbutils.fs.mv(output_file8[0].path,output_container_path)

Out[180]: True

In [0]:
output_container_path="dbfs:/mnt/Transient Datalake"

adls_output_folder9 = "dbfs:/mnt/Transient Datalake/product_category_aggr.csv"

product_data_final.coalesce(1).write.mode("overwrite")\
.option("header", "true")\
.format("com.databricks.spark.csv")\
.save(adls_output_folder9)

files9 = dbutils.fs.ls(adls_output_folder9)
output_file9 = [x for x in files9 if x.name.startswith("part-")]
dbutils.fs.mv(output_file9[0].path,output_container_path)

Out[87]: True

##Unmount Azure Data Lake Storage Container

In [0]:
dbutils.fs.unmount("/mnt/dbdemo01")