In [0]:
# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
# Replace every non-alphanumeric character of the current user name with "_"
# so the value can be embedded in a table name and in a filesystem path.
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
etl_query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .drop("_rescued_data") # discard the rescued-data column so it is not written to the table
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

# toTable() only starts the query and returns its handle right away. Block
# until the availableNow run has finished, so that cells which read
# `table_name` afterwards (e.g. under "Run All") see the loaded data instead
# of racing the still-running stream.
etl_query.awaitTermination()

# Keep the query handle as the cell's displayed result.
etl_query

<pyspark.sql.streaming.query.StreamingQuery at 0x7fbe4c433370>

In [0]:
# Post-load sanity check: the target table must contain at least one row.
# Count the rows through the DataFrame API and fail loudly when there are none.
loaded_data_count = spark.table(table_name).count()
assert loaded_data_count > 0, "Data load failed, table is empty."

In [0]:
from pyspark.sql.functions import input_file_name, current_timestamp

# Give this preview query its own checkpoint / schema-tracking directory.
# `checkpoint_path` belongs to the query that populates `table_name`; deleting
# or reusing it here would wipe that query's progress and share one checkpoint
# between two different streaming queries.
preview_checkpoint_path = f"{checkpoint_path}_file_metadata_preview"

# Clear out state from the previous demo execution of this preview
dbutils.fs.rm(preview_checkpoint_path, True)

# Read data using Structured Streaming and add source file path and processing time as metadata.
# The source path is taken from the `_metadata.file_path` column, the same way
# the ingestion cell derives `source_file` (uses `col` imported at the top of the notebook).
streamingDF = (spark.readStream
               .format("cloudFiles")
               .option("cloudFiles.format", "json")
               .option("cloudFiles.schemaLocation", preview_checkpoint_path)
               .load(file_path)
               .withColumn("source_file", col("_metadata.file_path"))
               .withColumn("processing_time", current_timestamp()))

# Create or replace a temporary view
streamingDF.createOrReplaceTempView("file_metadata_view")

# Execute query and display the results
# Note: a streaming DataFrame cannot be shown with show(); the rows are emitted
# by starting a streaming write (here to the console sink) over the view.
query = (spark.sql("SELECT * FROM file_metadata_view")
         .writeStream
         .format("console")
         .option("checkpointLocation", preview_checkpoint_path)
         .trigger(availableNow=True)
         .start())

# Block until the availableNow run has processed the available files.
query.awaitTermination()

In [0]:
# Load the ingested table as a batch DataFrame for inspection.
df = spark.table(table_name)

In [0]:
# Render the DataFrame read from the table as this cell's output.
display(df)

action,time,source_file,processing_time
Open,1469567953,/databricks-datasets/structured-streaming/events/file-18.json,2023-11-19T21:48:25.451Z
Close,1469567955,/databricks-datasets/structured-streaming/events/file-18.json,2023-11-19T21:48:25.451Z
Close,1469567956,/databricks-datasets/structured-streaming/events/file-18.json,2023-11-19T21:48:25.451Z
Close,1469567958,/databricks-datasets/structured-streaming/events/file-18.json,2023-11-19T21:48:25.451Z
Open,1469567958,/databricks-datasets/structured-streaming/events/file-18.json,2023-11-19T21:48:25.451Z
Close,1469567959,/databricks-datasets/structured-streaming/events/file-18.json,2023-11-19T21:48:25.451Z
Open,1469567960,/databricks-datasets/structured-streaming/events/file-18.json,2023-11-19T21:48:25.451Z
Close,1469567967,/databricks-datasets/structured-streaming/events/file-18.json,2023-11-19T21:48:25.451Z
Open,1469567971,/databricks-datasets/structured-streaming/events/file-18.json,2023-11-19T21:48:25.451Z
Close,1469567973,/databricks-datasets/structured-streaming/events/file-18.json,2023-11-19T21:48:25.451Z


Databricks visualization. Run in Databricks to view.