**This notebook demonstrates how to load multiple CSV data files into Delta bronze tables using PySpark in a loop. It is intended as a reference for reading every file in a landing directory (a Unity Catalog volume) and processing each one with PySpark.**

In [0]:
%sql
-- Ensure the bronze schema exists before the ingestion loop writes raw_* tables into it.
-- (CREATE SCHEMA and CREATE DATABASE are aliases in Databricks SQL.)
CREATE SCHEMA IF NOT EXISTS learning_catalog.football_bronze;

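**Step 4:** Loop through files in the landing volume  
In this step, we iterate over all files in the landing directory. Only `.csv` files are loaded: each one is read with PySpark, tagged with `loaded_timestamp` and `source_file` metadata columns, and written to a Delta table named `raw_<file name without extension>` in the bronze schema. Errors are reported per file, so one bad file does not stop the rest of the run.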

In [0]:
import os # Import os module
import re

from pyspark.sql.functions import current_timestamp, lit

# Define the base paths: where raw CSVs land and which schema receives the bronze tables.
input_path = "/Volumes/learning_catalog/raw_data/landing/"
bronze_schema_name = "learning_catalog.football_bronze"


def _bronze_table_name(filename: str, schema_name: str) -> str:
    """Return the fully qualified bronze table name for a landing file.

    The extension is stripped case-insensitively (``Matches.CSV`` -> ``Matches``)
    and every character that is not an ASCII letter, digit or underscore is
    replaced with ``_`` so the result is a valid unquoted identifier, e.g.
    ``"premier-league 2023.csv"`` -> ``"<schema_name>.raw_premier_league_2023"``.
    Plain names such as ``"matches.csv"`` map to ``"<schema_name>.raw_matches"``.
    """
    stem, _extension = os.path.splitext(filename)
    table_suffix = re.sub(r"[^0-9A-Za-z_]", "_", stem)
    return f"{schema_name}.raw_{table_suffix}"


def _load_csv_to_bronze(source_path: str, filename: str, table_name: str) -> None:
    """Read one CSV file with Spark and overwrite ``table_name`` as a Delta table.

    Adds two metadata columns: ``loaded_timestamp`` (time of the load) and
    ``source_file`` (the originating file name). Any Spark error propagates
    to the caller.
    """
    # In production, define the schema explicitly with StructType to avoid
    # inference errors; for this test system the schema is simply inferred.
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("mode", "PERMISSIVE")
        .load(source_path)
    )

    # Per the Spark CSV options docs, `_corrupt_record` is only present when a
    # user-supplied schema declares it (option `columnNameOfCorruptRecord`); with
    # an inferred schema the else-branch is the expected path.
    if "_corrupt_record" in df.columns:
        print(f"Corrupt records found in {filename}! Inspecting...")
        df.filter("_corrupt_record IS NOT NULL").show()
    else:
        print(f"No corrupt records found in {filename}.")

    # Bronze best practice: record when and from which file each row was loaded.
    df_with_metadata = (
        df.withColumn("loaded_timestamp", current_timestamp())
        .withColumn("source_file", lit(filename))  # same literal on every row
    )

    # Full refresh: replace both data and schema. Re-inferred column types can
    # change between runs, which would make a schema *merge* fail.
    (
        df_with_metadata.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(table_name)
    )


# List the landing directory once (returns FileInfo objects) and process the
# entries in path order so repeated runs behave the same way.
file_list = dbutils.fs.ls(input_path)
failed_files = []

for file_info in sorted(file_list, key=lambda info: info.path):
    filename = os.path.basename(file_info.path)

    # Only CSV files are ingested. Skipping everything else (including
    # sub-directories) here guarantees the load below never reuses the path or
    # table name left over from a previous iteration.
    if not filename.lower().endswith(".csv"):
        continue

    full_input_path = file_info.path
    full_table_name = _bronze_table_name(filename, bronze_schema_name)
    print(f"Processing file: {full_input_path} -> table: {full_table_name}")

    try:
        _load_csv_to_bronze(full_input_path, filename, full_table_name)
        print(f"Successfully created Bronze table '{full_table_name}'")
    except Exception as e:
        # Best-effort ingestion: report the failure and continue with the other files.
        print(f"Error processing {filename}: {e}")
        failed_files.append(filename)

if failed_files:
    print(f"\n{len(failed_files)} file(s) failed: {', '.join(failed_files)}")

print("\nIngestion process complete!")

In [0]:
%sql
-- Verify the ingestion: list every table now present in the bronze schema.
-- (FROM and IN are interchangeable in SHOW TABLES.)
SHOW TABLES FROM learning_catalog.football_bronze;