In [0]:
# List what currently sits under the landing-zone mount
landing_entries = dbutils.fs.ls('mnt/landing/')
landing_entries

[FileInfo(path='dbfs:/mnt/landing/Location/', name='Location/', size=0, modificationTime=1728919981000),
 FileInfo(path='dbfs:/mnt/landing/Payment/', name='Payment/', size=0, modificationTime=1728919981000),
 FileInfo(path='dbfs:/mnt/landing/PaymentMethod/', name='PaymentMethod/', size=0, modificationTime=1728919971000),
 FileInfo(path='dbfs:/mnt/landing/PaymentStatus/', name='PaymentStatus/', size=0, modificationTime=1728919976000),
 FileInfo(path='dbfs:/mnt/landing/Request/', name='Request/', size=0, modificationTime=1728919978000),
 FileInfo(path='dbfs:/mnt/landing/Trip/', name='Trip/', size=0, modificationTime=1728919984000),
 FileInfo(path='dbfs:/mnt/landing/User/', name='User/', size=0, modificationTime=1728920560000),
 FileInfo(path='dbfs:/mnt/landing/VehicleMakes/', name='VehicleMakes/', size=0, modificationTime=1728919979000),
 FileInfo(path='dbfs:/mnt/landing/Vehicles/', name='Vehicles/', size=0, modificationTime=1728919978000)]

In [0]:
# List what currently sits under the bronze-layer mount
bronze_entries = dbutils.fs.ls('mnt/bronze/')
bronze_entries

[FileInfo(path='dbfs:/mnt/bronze/Location/', name='Location/', size=0, modificationTime=1728928680000),
 FileInfo(path='dbfs:/mnt/bronze/Payment/', name='Payment/', size=0, modificationTime=1728928695000),
 FileInfo(path='dbfs:/mnt/bronze/PaymentMethod/', name='PaymentMethod/', size=0, modificationTime=1728928699000),
 FileInfo(path='dbfs:/mnt/bronze/PaymentStatus/', name='PaymentStatus/', size=0, modificationTime=1728928702000),
 FileInfo(path='dbfs:/mnt/bronze/Request/', name='Request/', size=0, modificationTime=1728928704000),
 FileInfo(path='dbfs:/mnt/bronze/Trip/', name='Trip/', size=0, modificationTime=1728928709000),
 FileInfo(path='dbfs:/mnt/bronze/User/', name='User/', size=0, modificationTime=1728928717000),
 FileInfo(path='dbfs:/mnt/bronze/VehicleMakes/', name='VehicleMakes/', size=0, modificationTime=1728928722000),
 FileInfo(path='dbfs:/mnt/bronze/Vehicles/', name='Vehicles/', size=0, modificationTime=1728928724000)]

In [0]:
# Derive the table names from the folder names under the landing root.
# Only directories are kept: a stray file sitting directly under the landing
# root would otherwise turn into a table name and make the loader fail, since
# the loader reads <name>/<name>.parquet for every entry in this list.
table_names = [
    entry.name.rstrip('/')  # directory names are listed with a trailing '/'
    for entry in dbutils.fs.ls('mnt/landing/')
    if entry.isDir()
]


In [0]:
# Display the table names derived from the landing-zone folder listing
table_names

['Location',
 'Payment',
 'PaymentMethod',
 'PaymentStatus',
 'Request',
 'Trip',
 'User',
 'VehicleMakes',
 'Vehicles']

In [0]:
# Function to load the landing-zone Parquet files into DataFrames
def load_parquet_files(table_names, landing_location="/mnt/landing"):
    """
    Load one Parquet file per table name into a Spark DataFrame.

    Each table is read from ``<landing_location>/<name>/<name>.parquet`` using
    the notebook-level ``spark`` session.

    Args:
        table_names (list): Table (folder) names to load.
        landing_location (str): Base path holding one folder per table.
            Defaults to "/mnt/landing"; a trailing slash is tolerated.

    Returns:
        dict: Maps each table name to its DataFrame. The key is the part of
        the name before the first ".parquet", so plain names are used as-is.
    """
    # Normalise once so the default and a user-supplied "/path/" build the same URLs
    base_path = landing_location.rstrip("/")

    dataframes = {}
    for file in table_names:
        # Key under the bare table name (drops a ".parquet" suffix if one was passed)
        df_name = file.split(".parquet")[0]
        path = f"{base_path}/{file}/{file}.parquet"
        dataframes[df_name] = spark.read.format('parquet').load(path)

    return dataframes




In [0]:

# Read every landing-zone table into a dict of DataFrames keyed by table name
dataframes = load_parquet_files(table_names)

# Expose each DataFrame as a notebook-level variable named after its table
globals().update(dataframes)



In [0]:
# Preview every loaded table: a header line, its first five rows, then its schema
for table, frame in dataframes.items():
    print(f"Showing first 5 rows of {table}:")
    frame.show(5)
    frame.printSchema()

Showing first 5 rows of Location:
+----------+------------+-----------+
|LocationID|   Longitude|   Latitude|
+----------+------------+-----------+
|     76173|-73.99945068|40.72192383|
|    567607|-73.77745056|40.64664841|
|    498838|-73.95517731|40.76498795|
|    138392|-73.99216461|40.72513962|
|    532266|-73.94660187|40.77573395|
+----------+------------+-----------+
only showing top 5 rows

root
 |-- LocationID: integer (nullable = true)
 |-- Longitude: decimal(12,8) (nullable = true)
 |-- Latitude: decimal(12,8) (nullable = true)

Showing first 5 rows of Payment:
+---------+---------------+---------------+
|PaymentID|PaymentMethodID|PaymentStatusID|
+---------+---------------+---------------+
|        6|              6|              4|
|        6|              6|              2|
|        3|              3|              1|
|        6|              6|              4|
|        6|              6|              1|
+---------+---------------+---------------+
only showing top 5 rows

r

In [0]:
from pyspark.sql import *
from pyspark.sql.functions import *
def add_date_columns(dataframes):
    """
    Add audit columns to every DataFrame in ``dataframes``.

    The dictionary is updated in place: each value is replaced by a DataFrame
    that carries three extra columns:

        _processing_date: current timestamp truncated to the second.
        _input_filename: path of the source file (``_metadata.file_path``).
        _input_file_modification_date: ``_metadata.file_modification_time``.

    Both file columns come from the hidden ``_metadata`` column, so the
    DataFrames are expected to have been read from files.

    Args:
        dataframes (dict): Maps a name to a DataFrame.

    Returns:
        None. Callers read the results back from ``dataframes``.
    """
    processing_date = date_trunc('second', current_timestamp())

    for df_name, df in dataframes.items():
        # TODO: optionally stamp a `_pipeline_run_id` column taken from the
        # notebook widget of the same name.
        # The file path is taken from `_metadata.file_path` rather than
        # input_file_name(): per the Databricks docs the latter is deprecated,
        # and this keeps both file columns on the same metadata source.
        # Replacing the value of an existing key while iterating is safe
        # because the dictionary does not change size.
        dataframes[df_name] = (
            df.withColumn("_processing_date", processing_date)
            .withColumn("_input_filename", col("_metadata.file_path"))
            .withColumn("_input_file_modification_date", col("_metadata.file_modification_time"))
        )



In [0]:

# Stamp the extra date/file columns onto every DataFrame (the dict is updated in place)
add_date_columns(dataframes)
# Re-point the notebook-level variables at the updated DataFrames
globals().update(dataframes)


In [0]:
from delta.tables import DeltaTable

def write_to_delta(dataframes, bronze_location="/mnt/bronze"):
    """
    Write every DataFrame to a Delta table under ``bronze_location``.

    Each DataFrame goes to ``<bronze_location>/<name>/``. If that path already
    holds a Delta table the rows are appended; otherwise the table is created
    (written in "overwrite" mode).

    Note that appending is not idempotent: calling this again with the same
    DataFrames adds the same rows a second time.

    Args:
        dataframes (dict): Maps a table name to the DataFrame to write.
        bronze_location (str): Base path where the Delta tables are stored.
            Defaults to "/mnt/bronze"; a trailing slash is tolerated.
    """
    base_path = bronze_location.rstrip("/")

    for df_name, df in dataframes.items():
        delta_table_path = f"{base_path}/{df_name}/"

        # Only the save mode differs between the two cases, so pick it first
        # and issue a single write call below.
        if DeltaTable.isDeltaTable(spark, delta_table_path):
            print(f"Table {df_name} already exists, appending data.")
            write_mode = "append"
        else:
            print(f"Creating new table for {df_name}.")
            write_mode = "overwrite"

        df.write.mode(write_mode).format("delta").save(delta_table_path)


In [0]:
# Persist every DataFrame to the bronze layer. Tables that already exist are
# appended to, so re-running this cell writes the current rows again.
write_to_delta(dataframes)


Table Location already exists, appending data.
Table Payment already exists, appending data.
Table PaymentMethod already exists, appending data.
Table PaymentStatus already exists, appending data.
Table Request already exists, appending data.
Table Trip already exists, appending data.
Table User already exists, appending data.
Table VehicleMakes already exists, appending data.
Table Vehicles already exists, appending data.


In [0]:
# Register each bronze Delta location as an external table in the metastore.
# The schema setup does not depend on the table being registered, so it runs
# once up front instead of once per loop iteration.
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze")
# Set the current database to 'bronze'
spark.sql("USE bronze")

for file in table_names:
    # Bare table name, shared by the qualified table name and the storage path
    base_name = file.split('.parquet')[0]
    table_name = f"bronze.{base_name}"
    delta_table_path = f"/mnt/bronze/{base_name}/"
    print(f"Creating external table: {table_name}")
    spark.sql(f"CREATE EXTERNAL TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{delta_table_path}'")


Creating external table: bronze.Location
Creating external table: bronze.Payment
Creating external table: bronze.PaymentMethod
Creating external table: bronze.PaymentStatus
Creating external table: bronze.Request
Creating external table: bronze.Trip
Creating external table: bronze.User
Creating external table: bronze.VehicleMakes
Creating external table: bronze.Vehicles


In [0]:
# List the tables registered under the 'bronze' schema
bronze_tables = spark.sql("SHOW TABLES IN bronze")
bronze_tables.show()


+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
|  bronze|     location|      false|
|  bronze|      payment|      false|
|  bronze|paymentmethod|      false|
|  bronze|paymentstatus|      false|
|  bronze|      request|      false|
|  bronze|         trip|      false|
|  bronze|         user|      false|
|  bronze| vehiclemakes|      false|
|  bronze|     vehicles|      false|
+--------+-------------+-----------+



In [0]:

# Sanity checks: active schema, registered tables, then one sample row per table
print("Current Schema:")
current_schema = spark.sql("SELECT current_database()")
current_schema.show()

print("Tables in bronze schema:")
spark.sql("SHOW TABLES IN bronze").show()

# Pull a single row from two of the tables to confirm they can be queried
for sample_table in ("location", "payment"):
    spark.sql(f"SELECT * FROM bronze.{sample_table} LIMIT 1").show()
spark.sql("SELECT * FROM bronze.payment LIMIT 1").show()





Current Schema:
+----------------+
|current_schema()|
+----------------+
|          bronze|
+----------------+

Tables in bronze schema:
+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
|  bronze|     location|      false|
|  bronze|      payment|      false|
|  bronze|paymentmethod|      false|
|  bronze|paymentstatus|      false|
|  bronze|      request|      false|
|  bronze|         trip|      false|
|  bronze|         user|      false|
|  bronze| vehiclemakes|      false|
|  bronze|     vehicles|      false|
+--------+-------------+-----------+

+----------+------------+-----------+-------------------+--------------------+-----------------------------+
|LocationID|   Longitude|   Latitude|   _processing_date|     _input_filename|_input_file_modification_date|
+----------+------------+-----------+-------------------+--------------------+-----------------------------+
|     76173|-73.99945068|40.72192383|2024-10-14 17:58:

In [0]:
# Row count per _processing_date value in bronze.location
rows_per_processing_date = spark.sql("SELECT _processing_date, count(*) cnt FROM bronze.location GROUP BY _processing_date")
rows_per_processing_date.show()

+-------------------+-------+
|   _processing_date|    cnt|
+-------------------+-------+
|2024-10-14 17:58:01|1148663|
|2024-10-14 18:20:42|1148663|
|2024-10-15 00:11:46|1148663|
+-------------------+-------+



In [0]:
# Peek at bronze.location; show() without arguments prints only the first rows
location_rows = spark.sql("select * from bronze.location")
location_rows.show()

+----------+------------+-----------+-------------------+--------------------+-----------------------------+
|LocationID|   Longitude|   Latitude|   _processing_date|     _input_filename|_input_file_modification_date|
+----------+------------+-----------+-------------------+--------------------+-----------------------------+
|     76173|-73.99945068|40.72192383|2024-10-14 17:58:01|dbfs:/mnt/landing...|          2024-10-14 15:47:38|
|    567607|-73.77745056|40.64664841|2024-10-14 17:58:01|dbfs:/mnt/landing...|          2024-10-14 15:47:38|
|    498838|-73.95517731|40.76498795|2024-10-14 17:58:01|dbfs:/mnt/landing...|          2024-10-14 15:47:38|
|    138392|-73.99216461|40.72513962|2024-10-14 17:58:01|dbfs:/mnt/landing...|          2024-10-14 15:47:38|
|    532266|-73.94660187|40.77573395|2024-10-14 17:58:01|dbfs:/mnt/landing...|          2024-10-14 15:47:38|
|    144884|-73.99170685|40.76995850|2024-10-14 17:58:01|dbfs:/mnt/landing...|          2024-10-14 15:47:38|
|     60322|-74.001

In [0]:

# Dictionary to store the DeltaTable objects, keyed by table name
delta_tables = {}

for file in table_names:
    file_name = file.split('.parquet')[0]  # Bare table name (drops a ".parquet" suffix if present)
    table_name = f"bronze.{file_name}"  # Define the table name in the bronze schema
    # Store the DeltaTable object in the dictionary using the table name as the key
    delta_table = DeltaTable.forName(spark, table_name)
    delta_tables[file_name] = delta_table

    # Compact and vacuum the table only if no vacuum was started in the last 30 days.
    # history(30) would return the latest 30 commits rather than 30 days of
    # history, so the window is applied on the commit timestamp instead.
    recent_vacuums = delta_table.history().filter(
        "operation = 'VACUUM START' "
        "AND timestamp >= current_timestamp() - INTERVAL 30 DAYS"
    )
    if recent_vacuums.count() == 0:
        # optimize() only returns a builder; executeCompaction() is what
        # actually rewrites the small files.
        delta_table.optimize().executeCompaction()
        delta_table.vacuum()  # Default retention = 7 days



In [0]:
# Total number of rows currently stored in bronze.location
location_row_count = spark.sql("SELECT COUNT(*) FROM bronze.location")
location_row_count.show()


+--------+
|count(1)|
+--------+
| 3445989|
+--------+

