In [8]:
import pyspark
from delta import *
from pyspark.sql.types import *
from delta.tables import *
from pyspark.sql.functions import *
from notebookutils import mssparkutils

StatementMeta(, b7a42965-6d17-4f77-8815-daff4672aecd, 10, Finished, Available, Finished)

In [9]:
# Fabric workspace / lakehouse / schema this notebook reads from and writes to.
# OneLake URIs have the form
#   abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/<path>
workspace_name = 'Fabriclearn'
lakehouse_name = 'LH_test_WWI'
schema_name = 'dbo'
lakehouse_root = f"abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/{lakehouse_name}.Lakehouse"

# Staging area: one sub-folder of Parquet files per source table.
main_directory = f"{lakehouse_root}/Files/TempFolder"
# Path for tables: managed Delta tables live under Tables/<schema>/ at the
# lakehouse root, not under Files/.
delta_table_path = f"{lakehouse_root}/Tables/{schema_name}"

StatementMeta(, b7a42965-6d17-4f77-8815-daff4672aecd, 11, Finished, Available, Finished)

In [10]:
# Each sub-folder of the staging directory holds the Parquet export of one
# source table; keep only directories so stray files are ignored.
folders = [entry for entry in mssparkutils.fs.ls(main_directory) if entry.isDir]

num_folders = len(folders)
print(f"Number of folders: {num_folders}")

StatementMeta(, b7a42965-6d17-4f77-8815-daff4672aecd, 12, Finished, Available, Finished)

Number of folders: 31


In [12]:
# Peek at the first two discovered table folders.
folders[:2]

StatementMeta(, b7a42965-6d17-4f77-8815-daff4672aecd, 14, Finished, Available, Finished)

[FileInfo(path=abfss://Fabriclearn@onelake.dfs.fabric.microsoft.com/LH_test_WWI.Lakehouse/Files/TempFolder/Application.Cities, name=Application.Cities, size=0),
 FileInfo(path=abfss://Fabriclearn@onelake.dfs.fabric.microsoft.com/LH_test_WWI.Lakehouse/Files/TempFolder/Application.Countries, name=Application.Countries, size=0)]

In [11]:
def process_parquet_and_create_delta_table(latest_file, lakehouse_name, folder_full_name,
                                           schema=None, workspace_name="Fabriclearn"):
    """Load one Parquet file into the Delta table ``<lakehouse>.<schema>.<table>``.

    If no Delta table exists at the table's OneLake location, any table registered
    under that name is dropped and the table is created from the file with
    ``CREATE TABLE ... AS SELECT``. Otherwise the existing Delta table's contents
    are replaced by the file's rows in a single overwrite commit
    (full truncate-and-load).

    Args:
        latest_file: File entry (as returned by ``mssparkutils.fs.ls``) whose
            ``.path`` is the Parquet file to load.
        lakehouse_name: Lakehouse name; first part of the three-part table name
            and the ``<name>.Lakehouse`` folder in the OneLake path.
        folder_full_name: Target table name (source folder name, dots replaced).
        schema: Target schema. Defaults to the notebook-level ``schema_name``,
            read at call time.
        workspace_name: Fabric workspace owning the lakehouse (the abfss
            container). Must match the workspace used in ``main_directory``.
    """
    if schema is None:
        schema = schema_name  # notebook-level config cell

    qualified_name = f"{lakehouse_name}.{schema}.{folder_full_name}"
    # Backtick-quote each identifier part so unusual folder names still parse.
    quoted_name = f"`{lakehouse_name}`.`{schema}`.`{folder_full_name}`"
    view_name = f"tmpSrc_{folder_full_name}"

    # Read the Parquet file once; the DataFrame feeds both branches below.
    source_df = spark.read.parquet(latest_file.path)
    source_df.createOrReplaceTempView(view_name)

    # OneLake layout: the *workspace* is the abfss container and the lakehouse
    # is the "<name>.Lakehouse" folder; tables sit under Tables/<schema>/<table>.
    delta_table_loc = (
        f"abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/"
        f"{lakehouse_name}.Lakehouse/Tables/{schema}/{folder_full_name}"
    )

    if not DeltaTable.isDeltaTable(spark, delta_table_loc):
        try:
            # Remove any stale (e.g. non-Delta) table registered under this name.
            spark.sql(f"DROP TABLE IF EXISTS {quoted_name}")
            print(f"Cleared Target Location for {qualified_name}")
        except Exception as e:
            # Best effort: report and still attempt the CREATE below.
            print(f"Error dropping the table for {qualified_name}: {e}")

        spark.sql(f"CREATE TABLE {quoted_name} AS SELECT * FROM `{view_name}`")
    else:
        print(f"Performing full truncate and load for {folder_full_name}.")
        # One overwrite commit instead of DELETE followed by MERGE ON 1==1:
        # readers never observe an empty table, and a failure part-way cannot
        # leave the table truncated. Delta schema enforcement still rejects an
        # incompatible source schema.
        source_df.write.format("delta").mode("overwrite").save(delta_table_loc)


StatementMeta(, b7a42965-6d17-4f77-8815-daff4672aecd, 13, Finished, Available, Finished)

In [13]:
# Load the newest Parquet file of every staging folder into its Delta table.
for folder in folders:
    if not folder.isDir:  # only directories represent source tables
        continue

    table_path = folder.path
    # Dots are not valid in table names: "Application.Cities" -> "Application_Cities".
    folder_full_name = folder.name.replace('.', '_')

    print('-----------------------------------------------')
    print(f"Processing table: {folder_full_name}")
    print('-----------------------------------------------')

    # Newest Parquet file by modification time; on a tie the first listed wins.
    parquet_files = [entry for entry in mssparkutils.fs.ls(table_path)
                     if entry.name.endswith(".parquet")]
    latest_file = max(parquet_files, key=lambda entry: entry.modifyTime, default=None)

    if latest_file is None:
        latest_timestamp = None
        print(f"No Parquet file found in the folder: {folder_full_name}")
        continue

    latest_timestamp = latest_file.modifyTime
    print(f"FOund latest file {latest_file} ")
    print()
    print(f"latest time {latest_timestamp}")

    process_parquet_and_create_delta_table(latest_file, lakehouse_name, folder_full_name)





StatementMeta(, b7a42965-6d17-4f77-8815-daff4672aecd, 15, Finished, Available, Finished)

-----------------------------------------------
Processing table: Application_Cities
-----------------------------------------------
FOund latest file FileInfo(path=abfss://Fabriclearn@onelake.dfs.fabric.microsoft.com/LH_test_WWI.Lakehouse/Files/TempFolder/Application.Cities/data_92cb0915-b87c-4544-a14d-c049138d652d_cfed3bcd-2b9c-4aac-9a23-f0cda2eeab64.parquet, name=data_92cb0915-b87c-4544-a14d-c049138d652d_cfed3bcd-2b9c-4aac-9a23-f0cda2eeab64.parquet, size=1176075) 

latest time 1733209800639
Cleared Target Location for LH_test_WWI.dbo.Application_Cities
-----------------------------------------------
Processing table: Application_Countries
-----------------------------------------------
FOund latest file FileInfo(path=abfss://Fabriclearn@onelake.dfs.fabric.microsoft.com/LH_test_WWI.Lakehouse/Files/TempFolder/Application.Countries/data_d9ce90ee-df3b-44f3-a97d-e0a88971fdf7_90b5118d-37f7-4906-8459-0393f0925af2.parquet, name=data_d9ce90ee-df3b-44f3-a97d-e0a88971fdf7_90b5118d-37f7-4906-84

In [16]:
# Root of the managed tables in the dbo schema of the lakehouse.
lakehouse_tables_path = "abfss://Fabriclearn@onelake.dfs.fabric.microsoft.com/LH_test_WWI.Lakehouse/Tables/dbo"

# Every entry under Tables/dbo is a candidate table folder.
tables = mssparkutils.fs.ls(lakehouse_tables_path)

delta_table_count = 0
for table in tables:
    table_path = table.path
    try:
        is_delta = DeltaTable.isDeltaTable(spark, table_path)
    except Exception as e:
        # Best effort: report the entry that could not be checked and move on.
        print(f"Error checking table {table.path}: {str(e)}")
        continue
    if is_delta:
        delta_table_count += 1

print(f"Total number of Delta tables: {delta_table_count}")


StatementMeta(, b7a42965-6d17-4f77-8815-daff4672aecd, 18, Finished, Available, Finished)

Total number of Delta tables: 2


In [17]:
# Sanity check: row count of one of the freshly loaded tables.
row_count_sql = "SELECT count(*) FROM LH_test_WWI.dbo.application_cities"
df = spark.sql(row_count_sql)
df.show()

StatementMeta(, b7a42965-6d17-4f77-8815-daff4672aecd, 19, Finished, Available, Finished)

+--------+
|count(1)|
+--------+
|   37940|
+--------+

