In [0]:
Spark – Bronze: the code below writes with spark.write.format("delta").mode("append").saveAsTable(...), which makes it a plain Spark batch job, not a DLT (Delta Live Tables) pipeline.

from pyspark.sql.functions import current_timestamp
import requests

# Unity Catalog addresses tables as <catalog>.<schema>.<table>. A two-part name
# is resolved as <schema>.<table> inside the session's current catalog, so the
# schema is spelled out to make the names below genuinely fully qualified.
CATALOG = "bronze"
# NOTE(review): "default" mirrors the schema of bronze.default.api_users used
# further down in this notebook — confirm it is the intended target schema.
SCHEMA = "default"

# Fully qualified table names (<catalog>.<schema>.<table>)
API_TBL = f"{CATALOG}.{SCHEMA}.bronze_api_data"
ADLS_TBL = f"{CATALOG}.{SCHEMA}.bronze_adls_data"
VOLUME_TBL = f"{CATALOG}.{SCHEMA}.bronze_volumes_data"


# API Source → Bronze
# Fetch the JSON payload over HTTP, stamp each row with the ingestion time,
# and append the result to the Bronze API table.
url = "https://api.example.com/data"
# Bound the call so an unreachable endpoint fails fast instead of hanging the job.
response = requests.get(url, timeout=30)
# Stop on 4xx/5xx responses rather than parsing an error body and appending it to Bronze.
response.raise_for_status()
data = response.json()
df_api = spark.createDataFrame(data).withColumn("ingestion_date", current_timestamp())
df_api.write.format("delta").mode("append").saveAsTable(API_TBL)


# ADLS Source → Bronze
# Read the Parquet data under the ADLS folder, add the ingestion timestamp,
# and append the rows to the Bronze ADLS table.
adls_path = "abfss://container@storageaccount.dfs.core.windows.net/folder/"
df_adls = spark.read.parquet(adls_path).withColumn("ingestion_date", current_timestamp())
df_adls.write.saveAsTable(ADLS_TBL, format="delta", mode="append")


# Volumes / Local File → Bronze
# Read the CSV file (first line is the header), add the ingestion timestamp,
# and append the rows to the Bronze volumes table.
volume_csv_path = "/mnt/volumes/data.csv"
df_volume = spark.read.csv(volume_csv_path, header=True).withColumn(
    "ingestion_date", current_timestamp()
)
df_volume.write.saveAsTable(VOLUME_TBL, format="delta", mode="append")


In [0]:
import requests
from pyspark.sql.functions import current_timestamp

# Pull the sample users payload and preview it with an ingestion timestamp.
url = "https://jsonplaceholder.typicode.com/users"

# Bound the call so a blocked or unreachable endpoint fails fast instead of hanging the cell.
response = requests.get(url, timeout=30)
# Surface HTTP errors here instead of building a DataFrame from an error body.
response.raise_for_status()
data = response.json()

df_api = (
    spark.createDataFrame(data)
    .withColumn("ingestion_date", current_timestamp())
)
display(df_api)

# Issue: the cluster cannot access the public internet (DNS is blocked), so the request above fails.

Workaround: download https://jsonplaceholder.typicode.com/users locally -> upload the file to Volumes/DBFS -> ingest it into Bronze from there.

In [0]:
from pyspark.sql.functions import current_timestamp, col

# Input JSON file under /Volumes and the Bronze table the rows are appended to.
INPUT_PATH = "/Volumes/source/default/users/users.json"
BRONZE_TBL = "bronze.default.api_users"

# Read with the multiline option so a JSON record may span several lines.
df_raw = spark.read.option("multiline", "true").json(INPUT_PATH)

# (source field, cast type, output column) — nested address/company fields are
# flattened into prefixed top-level columns.
_BRONZE_COLUMNS = [
    ("id", "int", "id"),
    ("name", "string", "name"),
    ("username", "string", "username"),
    ("email", "string", "email"),
    ("phone", "string", "phone"),
    ("website", "string", "website"),
    ("address.street", "string", "address_street"),
    ("address.city", "string", "address_city"),
    ("address.zipcode", "string", "address_zipcode"),
    ("company.name", "string", "company_name"),
]

df_bronze = df_raw.select(
    *[col(source).cast(dtype).alias(target) for source, dtype, target in _BRONZE_COLUMNS]
).withColumn("ingestion_date", current_timestamp())
display(df_bronze)

# Append the typed, flattened rows to the Bronze Delta table.
df_bronze.write.saveAsTable(BRONZE_TBL, format="delta", mode="append")


In [0]:
%sql
-- Preview the rows currently stored in the Bronze users table.
SELECT *
FROM bronze.default.api_users;