In [0]:
from pyspark.sql.functions import current_timestamp

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

# Reuse the active Spark session (or create one) under a descriptive app name.
spark = (
    SparkSession.builder
        .appName("Superstore Bronze Ingestion")
        .getOrCreate()
)

# Batch-read the raw CSV folder once so Spark can infer the column names and
# types. Only the resulting schema is kept; it is handed to the streaming
# reader further down.
# NOTE(review): the schema is re-inferred on every run, so it may drift as new
# files land — consider pinning an explicit schema. TODO confirm.
df = (
    spark.read
        .format("csv")
        .options(
            header="true",
            inferSchema="true",
            multiLine="true",                  # allow values (e.g. product names) that span lines
            quote='"',                         # double quotes wrap text values
            escape='"',                        # embedded quotes are escaped with a double quote
            ignoreLeadingWhiteSpace="true",
            ignoreTrailingWhiteSpace="true",
        )
        .load("/Volumes/superstore_databricks_dbt/source/raw/superstore/")
)

# Keep the schema under its own name; `df` itself is rebound later in this cell.
df_schema = df.schema


# Column names for the bronze table: surrounding whitespace trimmed and inner
# spaces replaced with underscores, in the same order as the inferred schema.
bronze_columns = [name.strip().replace(" ", "_") for name in df_schema.fieldNames()]

# One timestamp expression shared by both audit columns.
load_ts = current_timestamp()

# Streaming read of the same raw folder, using the schema inferred by the
# batch read and the same CSV parsing rules (minus inferSchema).
df = (
    spark.readStream
        .format("csv")
        .options(
            header="true",
            multiLine="true",
            quote='"',
            escape='"',
            ignoreLeadingWhiteSpace="true",
            ignoreTrailingWhiteSpace="true",
        )
        .schema(df_schema)
        .load("/Volumes/superstore_databricks_dbt/source/raw/superstore/")
        .toDF(*bronze_columns)                  # positional rename of every column
        .withColumn("date_added", load_ts)
        .withColumn("date_updated", load_ts)
)


# Append the streamed rows to the bronze Delta table, then stop.
# - The checkpoint directory records which source files were already ingested,
#   so re-running this cell only picks up new files.
# - availableNow=True replaces the deprecated once=True trigger: it processes
#   everything currently available and then terminates the query.
# - awaitTermination() blocks this cell until the load has finished, so cells
#   that run afterwards (e.g. the DESCRIBE below) see the completed table
#   under "Run All" instead of racing the background query.
(
    df.writeStream
        .outputMode("append")
        .format("delta")
        .option("checkpointLocation", "/Volumes/superstore_databricks_dbt/bronze/checkpoint")
        .trigger(availableNow=True)
        .toTable("superstore_databricks_dbt.bronze.superstore")
        .awaitTermination()
)


In [0]:
%sql
-- List the column names and data types of the bronze table.
DESCRIBE TABLE superstore_databricks_dbt.bronze.superstore