In [0]:
# https://learn.microsoft.com/en-us/azure/databricks/delta-live-tables/load

In [0]:
# Configure OAuth (service principal) access to the ADLS Gen2 account for this Spark session.
# Access is granted through file- or folder-level ACLs on the Data Lake filesystem (container);
# RBAC assignments on the top-level Azure Data Lake resource are not required.
# https://docs.databricks.com/storage/azure-storage.html
_account_host = "adls04.dfs.core.windows.net"
_secret_scope = "myscope"

# Authentication mode and token provider for the storage account.
spark.conf.set(f"fs.azure.account.auth.type.{_account_host}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{_account_host}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)

# Client id and secret are read from the Databricks secret scope, never hard-coded.
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{_account_host}",
    dbutils.secrets.get(_secret_scope, key="clientid"),
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.secret.{_account_host}",
    dbutils.secrets.get(_secret_scope, key="clientsecret"),
)

# The token endpoint embeds the tenant id, which is also stored as a secret.
_tenant_id = dbutils.secrets.get(_secret_scope, key="tenantid")
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{_account_host}",
    f"https://login.microsoftonline.com/{_tenant_id}/oauth2/token",
)

In [0]:
# Ingestion settings: current user, storage locations, and Auto Loader options.

# Current user name with every non-alphanumeric character replaced by "_".
username = spark.sql(
    "SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')"
).first()[0]
table_name = f"{username}_nytaxi"

# Checkpoint, schema-tracking and output locations share one folder in the ADLS container.
_adls_root = "abfss://unity-catalog@adls04.dfs.core.windows.net/jsondata"
checkpoint_path = f"{_adls_root}/_checkpoint"
schema_location = f"{_adls_root}/_schematracking"
target_path = f"{_adls_root}/output"

# Input: sample NYC taxi JSON files from the Databricks datasets mount.
source_path = "dbfs:/databricks-datasets/nyctaxi/sample/json/"

# Optional reset — uncomment to drop the table and clear previous state before a re-run.
# spark.sql(f"DROP TABLE IF EXISTS {table_name}")
# dbutils.fs.rm(checkpoint_path, True)
# dbutils.fs.rm(target_path, True)
# dbutils.fs.rm(schema_location, True)

# Options passed to the `cloudFiles` (Auto Loader) reader.
options = {
    "cloudFiles.format": "json",
    "cloudFiles.inferColumnTypes": True,
    # NOTE(review): `cloudFiles.inferSchema` may not be a recognised Auto Loader
    # option — confirm against the options reference before relying on it.
    "cloudFiles.inferSchema": True,
    "cloudFiles.schemaLocation": schema_location,
}

In [0]:
import dlt

# UC-enabled DLT bronze ingestion with Auto Loader schema inference.
# The table is published under the configured `table_name` string. The function
# has its own identifier so that defining it does not rebind (shadow) that
# variable and the table is not published under the literal name "table_name".
@dlt.table(name=table_name, table_properties={"quality": "bronze"})
def bronze_nytaxi():
    """Read the source JSON files as a stream with the `cloudFiles` format.

    Returns:
        The streaming DataFrame obtained by loading `source_path` with the
        shared `options` dictionary.
    """
    return (
        spark.readStream.format("cloudFiles")
        # `options` already contains cloudFiles.schemaLocation, so it is not set again.
        .options(**options)
        .load(source_path)
    )

Name,Type
DOLocationID,bigint
PULocationID,bigint
RatecodeID,bigint
VendorID,bigint
congestion_surcharge,double
extra,double
fare_amount,double
improvement_surcharge,double
mta_tax,double
passenger_count,bigint
