In [None]:
#Load, Transform, Persist Pipeline

#1-Mount the data lakes
#2-Load CSVs from the landing data lake
#3-Convert the CSVs to Parquet and move them to the processing data lake
#4-Create the SQL database
#5-Create tables based on the Parquet files
#6-Specific analyses will be moved to the curated data lake and then loaded into SQL tables
#7-The Power BI application reads directly from the SQL tables through the Databricks REST API service


# Mounting Data lakes

In [None]:
# Mounting Data Lakes
# Unmount the existing mount if it exists (to avoid conflicts)
# NOTE: intentionally disabled (commented out). If re-enabled it must run after the
# configuration cell that defines bucket_name1; as written here that name is not yet defined.
# NOTE(review): unmount presumably errors when nothing is mounted at this path — confirm before enabling.
#dbutils.fs.unmount(f"/mnt/{bucket_name1}") 


In [None]:
# Service principal, tenant (directory) and storage account used to mount the data lakes.
# Only identifiers live here: the client secret is read from the Databricks secret
# scope `scope_config` under the key `key_vault` when the OAuth configs are built.
client_id = "ee65398e-3e66-4e98-b8fa-348798024231"
directory_id = "a23e0519-184a-4922-b08d-96f35c444623"
scope_config = "olist_scope1"
key_vault = "olist-secret1"
url_storage_account = "oliststorageaccount"

# One storage container per data lake zone: landing -> processing -> curated
bucket_name1, bucket_name2, bucket_name3 = "landing", "processing", "curated"

In [None]:
# OAuth client-credentials settings passed as extra_configs to every data lake mount.
# The client secret is fetched from the Databricks secret scope at run time, never hard-coded.
configs = dict([
    ("fs.azure.account.auth.type", "OAuth"),
    ("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
    ("fs.azure.account.oauth2.client.id", client_id),
    ("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope=scope_config, key=key_vault)),
    ("fs.azure.account.oauth2.client.endpoint", f"https://login.microsoftonline.com/{directory_id}/oauth2/token"),
])


In [None]:
# Mount the landing data lake.
# The mount is skipped when the mount point already exists, so the cell can be re-run
# (mounting an already-mounted point is the "device already mounted" error noted below).
landing_mount_point = f"/mnt/{bucket_name1}"
if any(mount.mountPoint == landing_mount_point for mount in dbutils.fs.mounts()):
    print(f"{landing_mount_point} is already mounted; skipping")
else:
    dbutils.fs.mount(
        source=f"abfss://{bucket_name1}@{url_storage_account}.dfs.core.windows.net/",
        mount_point=landing_mount_point,
        extra_configs=configs,
    )

In [None]:
# Verify the landing mount by listing its top-level entries
dbutils.fs.ls("/mnt/" + bucket_name1 + "/")


In [None]:
# Unmount the existing mount if it exists (to avoid conflicts)
# NOTE: disabled — the call sits inside a bare triple-quoted string, which is a no-op
# expression, so nothing is unmounted. Enabling it unconditionally would detach
# /mnt/{bucket_name2} on every run, and later cells write to that path.
"""
#Example: error device already monted
dbutils.fs.unmount(f"/mnt/{bucket_name2}")
"""

In [None]:
# Mount the processing data lake.
# The mount is skipped when the mount point already exists, so re-running the notebook
# does not fail with "device already mounted".
processing_mount_point = f"/mnt/{bucket_name2}"
if any(mount.mountPoint == processing_mount_point for mount in dbutils.fs.mounts()):
    print(f"{processing_mount_point} is already mounted; skipping")
else:
    dbutils.fs.mount(
        source=f"abfss://{bucket_name2}@{url_storage_account}.dfs.core.windows.net/",
        mount_point=processing_mount_point,
        extra_configs=configs,
    )



In [None]:
# List files in the processing data lake to verify the mount
# NOTE: disabled — the call is wrapped in a bare string literal, so this cell does nothing.
"""
dbutils.fs.ls(f"/mnt/{bucket_name2}")
"""


In [None]:
# Unmount the existing mount if it exists (to avoid conflicts)
# NOTE: disabled — the call is wrapped in a bare string literal, so nothing is unmounted.
# Enabling it unconditionally would detach /mnt/{bucket_name3}, which later cells write to.
"""
dbutils.fs.unmount(f"/mnt/{bucket_name3}")
"""


In [None]:
# Mount the curated data lake.
# The mount is skipped when the mount point already exists, so the cell is safe to re-run.
curated_mount_point = f"/mnt/{bucket_name3}"
if any(mount.mountPoint == curated_mount_point for mount in dbutils.fs.mounts()):
    print(f"{curated_mount_point} is already mounted; skipping")
else:
    dbutils.fs.mount(
        source=f"abfss://{bucket_name3}@{url_storage_account}.dfs.core.windows.net/",
        mount_point=curated_mount_point,
        extra_configs=configs,
    )


In [None]:
# Verify the curated mount by listing its top-level entries
dbutils.fs.ls("/mnt/" + bucket_name3)



# Reading CSVs from the Landing Data Lake into DataFrames

In [None]:
# Generic file reader used for the landing CSVs
def read_data(format, inferSchema, header, delimiter, path_to_load):
    """Load a file-based dataset into a Spark DataFrame.

    Args:
        format: Spark data source name passed to ``spark.read.format`` (e.g. "csv").
        inferSchema: value for the reader option "inferSchema".
        header: value for the reader option "header".
        delimiter: value for the reader option "delimiter".
        path_to_load: file or directory path handed to ``load``.

    Returns:
        The DataFrame returned by the configured reader's ``load(path_to_load)``.
    """
    reader = spark.read.format(format)
    reader = reader.options(inferSchema=inferSchema, header=header, delimiter=delimiter)
    return reader.load(path_to_load)

In [None]:
# Every entry of the landing data lake; the next cell loads them into DataFrames
df_csv_list = dbutils.fs.ls(f"/mnt/{bucket_name1}/")

# Reader settings handed to read_data.
# NOTE: `format` shadows the Python builtin of the same name for the rest of the notebook.
format, inferSchema, header, delimiter = "csv", "true", "true", ","

# Maps "df_<dataset>" names to the Spark DataFrames loaded from the landing zone
dataframes = dict()

In [None]:
# Iterate over the landing entries and store each CSV as a DataFrame in `dataframes`.
# File names like "<prefix>_<dataset>_dataset.csv" become the key "df_<dataset>".
for file_info in df_csv_list:
    file_path = file_info.path
    file_name = file_path.split('/')[-1]
    # dbutils.fs.ls also returns sub-directories (path ends with "/", so file_name is "")
    # and possibly non-CSV files; skip them instead of failing or parsing them as CSV.
    if not file_name.endswith('.csv'):
        print(f"Skipping non-CSV entry: {file_path}")
        continue
    # Drop the leading "<prefix>_" segment when present; a name without "_" keeps its full
    # stem instead of raising IndexError.
    name_parts = file_name.split('_', 1)
    base_name = name_parts[1] if len(name_parts) > 1 else name_parts[0]
    dataframe_name = "df_" + base_name.replace('_dataset.csv', '').replace('.csv', '')

    print(f"Readed the path: {file_path} and created the dataframe: {dataframe_name}")

    # Create the dataframe by calling the read_data function with provided parameters
    dataframe = read_data(format, inferSchema, header, delimiter, file_path)

    # Store the dataframe in the dictionary with the key being the dataframe name
    dataframes[dataframe_name] = dataframe

    print(f"Dataframe {dataframe_name}:")
    display(dataframes[dataframe_name])
    print(f"{dataframe_name} schema:")
    dataframes[dataframe_name].printSchema()




In [None]:
# Spot-check one loaded DataFrame (first 20 rows, long values truncated)
dataframes["df_customers"].show(n=20, truncate=True)

# Create SQL Temp Views

In [None]:

# Define the database name
database_name = 'customers_db'

# Make sure the database exists before listing it. On a fresh workspace it is only
# created further down, and SHOW TABLES on a missing database fails.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

# List all tables in the database (the result also includes session temporary views)
tables = spark.sql(f"SHOW TABLES IN {database_name}")

# Drop each persistent table to start fresh. Temporary views are skipped: they do not
# belong to the database, so "dropping" them as database_name.<view> would be a no-op
# with a misleading log line.
for row in tables.collect():
    if row.isTemporary:
        continue
    table_name = row.tableName
    drop_table_sql = f"DROP TABLE IF EXISTS {database_name}.{table_name}"
    spark.sql(drop_table_sql)
    print(f"Dropped table: {database_name}.{table_name}")


In [None]:
# Register one temp view per DataFrame, renaming "df_<name>" to "view_<name>"
items_list = list(dataframes.items())
for name, df in items_list:
    print(f"Creating view for DataFrame: {name}")
    view_name = "view_" + name.replace('df_', '')
    df.createOrReplaceTempView(view_name)
    print(f"View created: {view_name}")


In [None]:
# List all temporary views.
# spark.catalog.listTables() also returns the persistent tables of the current database,
# so keep only the entries flagged as temporary.
temp_views = [table for table in spark.catalog.listTables() if table.isTemporary]
for view in temp_views:
    print(view.name)


In [None]:
%sql
-- Query a specific view to verify
-- (the %sql magic must be the first line of the cell, so the comment uses SQL syntax)
SELECT *
FROM view_customers

# Create SQL Database

In [None]:
%sql
-- Create SQL Database
-- (the %sql magic must be the first line of the cell, so the comment uses SQL syntax)
CREATE DATABASE IF NOT EXISTS customers_db

# Create SQL Tables

In [None]:
# List of files in landing data lake
df_csv_list = dbutils.fs.ls(f"/mnt/{bucket_name1}/")

# Register a path-based CSV table in customers_db for every CSV file in the landing zone
for file_info in df_csv_list:
    file_path = file_info.path
    file_name = file_path.split('/')[-1]  # Get the file name
    # dbutils.fs.ls also lists sub-directories (file_name is then "") and other files;
    # only CSV files can back a USING CSV table.
    if not file_name.endswith('.csv'):
        print(f"Skipping non-CSV entry: {file_path}")
        continue
    # "<prefix>_<dataset>_dataset.csv" -> "<dataset>"; a name without "_" keeps its full
    # stem instead of raising IndexError.
    name_parts = file_name.split('_', 1)
    base_name = name_parts[1] if len(name_parts) > 1 else name_parts[0]
    table_name = base_name.replace('_dataset.csv', '').replace('.csv', '')

    # Create table in Spark SQL (backticks keep unusual file names valid identifiers)
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS customers_db.`{table_name}`
    USING CSV
    OPTIONS (
        path '{file_path}',
        header 'true',
        inferSchema 'true'
    )
    """

    # Execute the SQL command to create the table
    spark.sql(create_table_sql)
    print(f"Table created for {file_name} as {table_name}")


In [None]:
# Verify that the CSV-backed tables now exist in customers_db
(
    spark.sql("SHOW TABLES IN customers_db")
    .show(n=20, truncate=True)
)

In [None]:
%sql
-- Count the rows of the CSV-backed customers table
-- (the %sql magic must be the first line of the cell, so the comment uses SQL syntax)
SELECT COUNT(*) FROM customers_db.customers

In [None]:
%sql
-- Query the table to view its content
-- (without the %sql magic this cell ran as Python and failed with a SyntaxError)
SELECT *
FROM customers_db.customers

In [None]:
%sql
-- Describe the table to view its schema
-- (the %sql magic must be the first line of the cell, so the comment uses SQL syntax)
DESCRIBE customers_db.customers

In [None]:
# Load the customers table registered in customers_db as a DataFrame and preview it
df_customers_SQL = spark.read.table('customers_db.customers')
display(df_customers_SQL)

# Filtering the DataSet

In [None]:
# Show the distinct customer_state values available for filtering
df_customers_SQL.select(df_customers_SQL["customer_state"]).distinct().show()

In [None]:
from pyspark.sql.functions import col

# Keep only customers whose customer_state is "RJ".
# NOTE: this rebinds df_customers_SQL, so any later use of that name sees the filtered rows.
df_customers_SQL = df_customers_SQL.where(col("customer_state") == "RJ")

In [None]:
# Preview df_customers_SQL, which the previous cell filtered to customer_state == "RJ"
display(df_customers_SQL)

# Write Full Parquet Datasets to Processing Data lake

In [None]:
# Write every landing DataFrame as Parquet into the processing data lake.
# The target directory is built from bucket_name2 (the processing container), matching
# how the mount cells address it instead of hard-coding "/mnt/processing".
for name, df in dataframes.items():
    print(f"Writing dataset {name} in processing zone")
    name_file = name.replace('df_', '')
    # "overwrite" replaces a previous run's output, so the cell can be re-run
    df.write.mode("overwrite").parquet(f"/mnt/{bucket_name2}/{name_file}.parquet")
    print(f"File processed: {name_file}")

# Write Filtered Parquet to Processing Data Lake

In [None]:
# Persist the RJ-filtered customers as Parquet in the processing zone, replacing prior output
df_customers_SQL.write.parquet("/mnt/processing/customers_RJ.parquet", mode="overwrite")


In [None]:
# Read the RJ Parquet output back to confirm the write
df_customers_parq = spark.read.format("parquet").load("/mnt/processing/customers_RJ.parquet")
display(df_customers_parq)

In [None]:
# Expose the Parquet data as a temp view and query it with Spark SQL
df_customers_parq.createOrReplaceTempView("CustomersParquetTable")
custparkSQL = spark.sql(
    "select * from CustomersParquetTable where customer_state = 'RJ'"
)
display(custparkSQL)

# Create SQL Tables based on Parquet files at Processing Data Lake

In [None]:
# Preview the entry names in the processing data lake that the next cell turns into tables.
# Listing them directly avoids relying on a loop variable left over from an earlier cell.
[entry.name for entry in dbutils.fs.ls(f"/mnt/{bucket_name2}/")]

In [None]:
# List the entries in the processing data lake
processing_entries = dbutils.fs.ls(f"/mnt/{bucket_name2}/")

# Register one Parquet-backed table per "<name>.parquet/" directory, named "<name>_pqt"
for file_info in processing_entries:
    file_path = file_info.path
    file_name = file_info.name  # Get the entry name (directories end with "/")
    # Any other entry would keep its raw name (possibly with "/") and yield invalid SQL
    if not file_name.endswith('.parquet/'):
        print(f"Skipping non-Parquet entry: {file_path}")
        continue
    table_name = file_name.replace('.parquet/', '_pqt')

    # header/inferSchema are CSV reader options, so they are not passed to a Parquet table
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS customers_db.`{table_name}`
    USING PARQUET
    OPTIONS (
        path '{file_path}'
    )
    """

    print(create_table_sql)

    # Execute the SQL command to create the table
    spark.sql(create_table_sql)
    print(f"Table created for {file_name} as {table_name}")

In [None]:
%sql
-- Invalidate cached metadata so the Parquet-backed table reflects the latest files
refresh table customers_db.customers_RJ_pqt

In [None]:
%sql
-- Read the RJ customers back through the Parquet-backed table
select *
from customers_db.customers_RJ_pqt

In [None]:
# Re-read the RJ Parquet output and query it through a second temp view
df_customers_parq = spark.read.format("parquet").load("/mnt/processing/customers_RJ.parquet")
df_customers_parq.createOrReplaceTempView("CustomersParquetTableByState")
df_customers_by_state_parq = spark.sql(
    "select * from CustomersParquetTableByState where customer_state='RJ'"
)
display(df_customers_by_state_parq)

In [None]:
# Preview the RJ customers read from /mnt/processing/customers_RJ.parquet
display(df_customers_parq)

# Write processed CSVs to Curated Data Lake

In [None]:
# Export the RJ customers as CSV (with a header row) to the curated data lake
(
    df_customers_parq.write
    .options(header=True, delimiter=",")
    .mode("overwrite")
    .csv("/mnt/curated/customers_RJ.csv")
)

# Test Reading the CSV File in the Curated Data Lake

In [None]:
# Read the curated CSV back into df_RJ with the notebook's generic reader helper
# (same format/options as the landing CSVs: inferSchema, header, comma delimiter)
df_RJ = read_data("csv", "true", "true", ",", "/mnt/curated/customers_RJ.csv")

# Display the DataFrame
display(df_RJ)

In [None]:
%sql
-- Filtered Curated CSV: table over the CSV directory written to the curated zone
create table if not exists customers_db.customers_RJ_csv
using CSV
options (header "true", inferSchema "true")
location '/mnt/curated/customers_RJ.csv'


In [None]:
%sql
-- Invalidate cached metadata so the CSV-backed table reflects the latest files
refresh table customers_db.customers_RJ_csv

In [None]:
%sql
-- Read the curated RJ customers through the CSV-backed table
select *
from customers_db.customers_RJ_csv