In [0]:
# Change to test dev pipeline1

# Static configuration: the catalog is fixed, and CSVs are read from a folder
# relative to the notebook's working directory.
catalog_name = "jet_engine_predictive_maintenance"
data_folder_path = "./data"

# Notebook parameter: an "env" text widget (default "dev") selects the target
# environment, e.g. "dev" or "prod".
dbutils.widgets.text("env", "dev", "Environment")
env = dbutils.widgets.get("env")

# The schema is named after the environment (the widget value is already a string).
schema_name = env

print(
    f"Running for environment: '{env}'",
    f"Target schema: '{catalog_name}.{schema_name}'",
    sep="\n",
)


# Make sure the target catalog and schema exist before any table is written.
# The schema name is qualified with the catalog so it is created inside
# `catalog_name` (where the tables are saved) rather than in whatever catalog
# happens to be current for the session.
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}")

# Ingest every CSV file in the data folder as a table named after the file.
import os  # imported here so this section does not rely on an earlier cell

abs_data_folder_path = os.path.abspath(data_folder_path)

# Sorted so tables are created in a deterministic order from run to run.
all_files = sorted(os.listdir(abs_data_folder_path))
csv_files = [f for f in all_files if f.endswith('.csv')]

if not csv_files:
    print("No CSV files found in the specified directory.")
else:
    for csv_file in csv_files:
        file_path = os.path.join(abs_data_folder_path, csv_file)
        # Table name = file name without its extension, with hyphens replaced
        # by underscores.
        table_name = os.path.splitext(csv_file)[0].replace('-', '_')
        full_table_path = f"{catalog_name}.{schema_name}.{table_name}"

        # Read the CSV into a Spark DataFrame
        # option("header", "true"): Uses the first row as column headers
        # option("inferSchema", "true"): Automatically detects data types
        # The 'file:' prefix is needed to read from local repo files.
        df = (
            spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load(f"file:{file_path}")
        )

        # Save the DataFrame as a table in Unity Catalog
        # mode("overwrite"): Replaces the table if it already exists
        df.write.mode("overwrite").saveAsTable(full_table_path)

        print(f"✅ Successfully created table '{table_name}'.")

print("\n--- Ingestion complete! ---")