# Session 2: Streaming, Ingestion & the Medallion Architecture

## 1. Environment Setup
We will define paths and a helper function that simulates a "growing" data source: each call writes a random sample of a public dataset into our landing directory as a new file.

In [0]:
# ==========================================
# CELL 1 (UPDATED): Setup & Corrected Data Simulation
# ==========================================
import time

# Define paths
base_dir = "/tmp/streaming_practice"
source_landing_zone = f"{base_dir}/landing_zone"
checkpoint_dir = f"{base_dir}/checkpoints"
output_gold_path = f"{base_dir}/gold_output"
autoloader_schema_dir = f"{base_dir}/autoloader_schema"

# Tables this session writes to. They must be dropped together with the files:
# otherwise a re-run keeps appending to tables whose checkpoints / loaded-file
# tracking no longer match the freshly recreated landing zone.
session_tables = ["trips_gold", "iot_devices_stream", "iot_copy_into", "iot_autoloader_table"]

# Stop streams left running by a previous run of this notebook before their
# checkpoint folders are deleted underneath them.
for active_query in spark.streams.active:
    active_query.stop()

# Clean up previous runs to start fresh
dbutils.fs.rm(base_dir, recurse=True)
for table_name in session_tables:
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")

# Dataset source
public_data_source = "dbfs:/databricks-datasets/iot/iot_devices.json"

print("Environment configured. Path:", base_dir)

def land_new_file(batch_id):
    """
    Simulate a new batch of data arriving in the landing zone.

    Draws a 10% sample of the public IoT dataset (seeded with ``batch_id`` so a
    given batch can be reproduced) and appends it to ``source_landing_zone``.

    Args:
        batch_id: label printed in the progress messages; also the sampling seed.
    """
    print(f"Processing Batch {batch_id}...")

    sampled = (
        spark.read.json(public_data_source)
        .sample(withReplacement=False, fraction=0.1, seed=batch_id)
    )

    # One partition -> exactly one new part-file per batch; 'append' keeps the
    # files of earlier batches in place so the folder "grows".
    sampled.repartition(1).write.mode("append").json(source_landing_zone)

    print(f"✅ Batch {batch_id} landed in {source_landing_zone}")

# Land the first batch of data
land_new_file(batch_id=1)

# VERIFICATION: list the landing zone. Expect one 'part-...json' data file
# alongside Spark's commit marker files (_SUCCESS, _started_*, _committed_*).
landing_listing = dbutils.fs.ls(source_landing_zone)
display(landing_listing)

Environment configured. Path: /tmp/streaming_practice
Processing Batch 1...
✅ Batch 1 landed in /tmp/streaming_practice/landing_zone


path,name,size,modificationTime
dbfs:/tmp/streaming_practice/landing_zone/_SUCCESS,_SUCCESS,0,1766033117000
dbfs:/tmp/streaming_practice/landing_zone/_committed_949506212968198380,_committed_949506212968198380,112,1766033116000
dbfs:/tmp/streaming_practice/landing_zone/_started_949506212968198380,_started_949506212968198380,0,1766033114000
dbfs:/tmp/streaming_practice/landing_zone/part-00000-tid-949506212968198380-80675950-57cc-4fba-a9e8-c70ba68a5e61-16-1-c000.json,part-00000-tid-949506212968198380-80675950-57cc-4fba-a9e8-c70ba68a5e61-16-1-c000.json,5450738,1766033116000


## 2. Processing Approaches: Batch vs. Streaming
Concept:

Reprocessing (Batch): Reads ALL files every time. Inefficient as data grows.

Structured Streaming: Reads ONLY new files since the last trigger.

In [0]:
# ==========================================
# CELL 2: Approach 1 - Reprocessing (Batch)
# ==========================================

iot_schema = "device_id LONG, device_name STRING, ip STRING, cca3 STRING, ccn3 LONG, cn STRING, km LONG, lcd STRING, timestamp LONG, battery_level LONG"

# 1. Read the data
batch_df = spark.read.schema(iot_schema).json(source_landing_zone)

# 2. Visualize
print("--- Current Data in Batch ---")
display(batch_df)

# 3. Count
print(f"Row Count: {batch_df.count()}")

# 4. Simulate arrival of NEW data (Batch 2)
print("--- Landing new file... ---")
land_new_file(2)

# 5. Proof of "Reprocessing":
# A batch job only sees the new file by listing and scanning the WHOLE directory
# again. Spark typically resolves the file list when the DataFrame is created, so
# we build a fresh DataFrame rather than reusing batch_df. Re-running the whole
# cell is unnecessary, and it would also re-land batch 2 with the same seed,
# duplicating its rows.
reread_df = spark.read.schema(iot_schema).json(source_landing_zone)
print(f"Row Count after re-reading ALL files: {reread_df.count()}")

--- Current Data in Batch ---


device_id,device_name,ip,cca3,ccn3,cn,km,lcd,timestamp,battery_level
4,sensor-pad-4mzWkz,66.39.173.154,USA,,United States,,yellow,1458444054121,6
19,meter-gauge-19eg1BpfCO,64.124.180.215,USA,,United States,,red,1458444054130,9
21,device-mac-21sjz5h,193.200.142.254,AUT,,Austria,,green,1458444054131,5
25,therm-stick-25kK6VyzIFB,24.154.45.90,USA,,United States,,green,1458444054134,4
51,device-mac-51iy02vXU,146.97.40.113,GBR,,United Kingdom,,green,1458444054153,5
61,meter-gauge-61NehO8Msi,86.198.202.1,FRA,,France,,yellow,1458444054160,7
88,sensor-pad-88nm5Thggzs,85.218.127.93,CHE,,Switzerland,,green,1458444054176,5
100,sensor-pad-1009LusgoDso,217.113.83.247,BEL,,Belgium,,yellow,1458444054182,5
101,meter-gauge-101LT6cP,66.83.105.10,USA,,United States,,green,1458444054182,4
123,device-mac-123zvY7uWFB,208.250.26.135,USA,,United States,,red,1458444054199,0


Row Count: 19688
--- Landing new file... ---
Processing Batch 2...
✅ Batch 2 landed in /tmp/streaming_practice/landing_zone


In [0]:
# ==========================================
# CELL 3: Approach 2 - Structured Streaming
# ==========================================
from pyspark.sql.functions import col, current_timestamp

# File-based streaming sources need the schema up front (Spark does not infer it
# for streams unless streaming schema inference is explicitly enabled).
json_schema = "device_id LONG, device_name STRING, ip STRING, cca3 STRING, ccn3 LONG, cn STRING, km LONG, lcd STRING, timestamp LONG, battery_level LONG"

# The reader only *describes* the source; nothing runs until a sink starts a query.
stream_reader = spark.readStream.schema(json_schema)
stream_df = stream_reader.json(source_landing_zone)

# In Databricks notebooks display() doubles as a sink that starts the query and
# renders it live. Watch the "ProcessingRate" in the dashboard: it only spikes
# when a new file lands.
display(stream_df)

# NOTE: While this cell is running, call the helper from a separate cell,
# e.g. land_new_file(3), and watch the stream update automatically.

device_id,device_name,ip,cca3,ccn3,cn,km,lcd,timestamp,battery_level
4,sensor-pad-4mzWkz,66.39.173.154,USA,,United States,,yellow,1458444054121,6
19,meter-gauge-19eg1BpfCO,64.124.180.215,USA,,United States,,red,1458444054130,9
21,device-mac-21sjz5h,193.200.142.254,AUT,,Austria,,green,1458444054131,5
25,therm-stick-25kK6VyzIFB,24.154.45.90,USA,,United States,,green,1458444054134,4
51,device-mac-51iy02vXU,146.97.40.113,GBR,,United Kingdom,,green,1458444054153,5
61,meter-gauge-61NehO8Msi,86.198.202.1,FRA,,France,,yellow,1458444054160,7
88,sensor-pad-88nm5Thggzs,85.218.127.93,CHE,,Switzerland,,green,1458444054176,5
100,sensor-pad-1009LusgoDso,217.113.83.247,BEL,,Belgium,,yellow,1458444054182,5
101,meter-gauge-101LT6cP,66.83.105.10,USA,,United States,,green,1458444054182,4
123,device-mac-123zvY7uWFB,208.250.26.135,USA,,United States,,red,1458444054199,0


In [0]:
land_new_file(batch_id=6)  # drop another file while the stream above is running

Processing Batch 6...
✅ Batch 6 landed in /tmp/streaming_practice/landing_zone


## 3. Fault Tolerance (Checkpoints & Idempotency)
Concept:

Checkpointing: Saves the state (offset) of what has been processed. If the stream crashes, it resumes from here.

Idempotency: Ensuring that re-processing the same data doesn't duplicate results. For a streaming write to Delta, the checkpoint and Delta's transactional commits together provide exactly-once output.

In [0]:
# ==========================================
# CELL 4: Checkpointing & Write-Ahead Logs
# ==========================================

# This query writes to a Delta table.
# The 'checkpointLocation' records which input files/offsets have been committed,
# so if we stop and restart this cell it won't re-process files it has already seen.
# The StreamingQuery handle is kept so the stream can be monitored or stopped
# later, e.g. iot_stream_query.status / iot_stream_query.stop().
iot_stream_query = (stream_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{checkpoint_dir}/delta_stream")
    .toTable("iot_devices_stream"))

print("Streaming to Delta table started...")

Streaming to Delta table started...


## 4. Unsupported Operations
Concept: Some operations require "global" knowledge of the data (like sorting the entire dataset), which is impossible on an unbounded stream. Spark only allows sorting a stream after an aggregation, in Complete output mode.

In [0]:
# ==========================================
# CELL 5: Unsupported Operations (THIS WILL FAIL)
# ==========================================
# Goal: Try to sort a stream without a window or aggregation.
from pyspark.sql.utils import AnalysisException

try:
    # ❌ Global Sort is not supported in Append mode
    sorted_stream = stream_df.orderBy("battery_level")

    query = sorted_stream.writeStream \
        .format("memory") \
        .queryName("failed_sort_query") \
        .start()

except AnalysisException as e:
    # Only Spark's analysis error is the expected outcome of this demo; any other
    # failure (e.g. stream_df not defined yet) propagates instead of being
    # mislabelled as "expected".
    print("❌ ERROR CAUGHT: As expected, global sorting is not supported on a stream.")
    print("-" * 30)
    print(str(e)[0:300] + "...") # Printing first 300 chars of error
else:
    # Not expected to happen; don't leave a stray query running in the background.
    query.stop()
    print("⚠️ The sorted stream unexpectedly started and has been stopped.")

❌ ERROR CAUGHT: As expected, global sorting is not supported on a stream.
------------------------------
Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;
~Sort [battery_level#73726L ASC NULLS FIRST], true
+- ~StreamingRelation DataSource(org.apache.spark.sql.SparkSession@45f54919,json,List(),Some(StructType(StructField(devi...


## 5. Advanced Methods: Windowing & Watermarking
Concept:

Windowing: Aggregating data into time buckets (e.g., "count events every 10 minutes").

Watermarking: Telling the engine how late data can be before we drop it. This limits the state size in memory.

In [0]:
# ==========================================
# CELL 6: Windowing & Watermarking
# ==========================================
from pyspark.sql.functions import window

# 'timestamp' holds epoch milliseconds: divide by 1000 to get seconds, which can
# be cast to a proper TIMESTAMP usable for windowing.
epoch_seconds = col("timestamp").cast("double") / 1000
with_event_time = stream_df.withColumn("event_time", epoch_seconds.cast("timestamp"))

# Accept events up to 10 minutes late, then count per 10-minute window and country code (cca3).
watermarked = with_event_time.withWatermark("event_time", "10 minutes")
windowed_stream = watermarked.groupBy(window("event_time", "10 minutes"), "cca3").count()

display(windowed_stream)

window,cca3,count
"List(2016-03-20T03:20:00.000+0000, 2016-03-20T03:30:00.000+0000)",TZA,30
"List(2016-03-20T03:20:00.000+0000, 2016-03-20T03:30:00.000+0000)",VEN,109
"List(2016-03-20T03:20:00.000+0000, 2016-03-20T03:30:00.000+0000)",HND,38
"List(2016-03-20T03:20:00.000+0000, 2016-03-20T03:30:00.000+0000)",PSE,16
"List(2016-03-20T03:20:00.000+0000, 2016-03-20T03:30:00.000+0000)",COG,1
"List(2016-03-20T03:20:00.000+0000, 2016-03-20T03:30:00.000+0000)",LSO,3
"List(2016-03-20T03:20:00.000+0000, 2016-03-20T03:30:00.000+0000)",KWT,72
"List(2016-03-20T03:20:00.000+0000, 2016-03-20T03:30:00.000+0000)",CAN,3607
"List(2016-03-20T03:20:00.000+0000, 2016-03-20T03:30:00.000+0000)",GUY,9
"List(2016-03-20T03:20:00.000+0000, 2016-03-20T03:30:00.000+0000)",TTO,21


## 6. Ingestion Mechanisms: COPY INTO vs Auto Loader
Concept:

COPY INTO: Best for scheduled bulk ingestion. It tracks files it has already loaded (idempotent).

Auto Loader (cloudFiles): Best for continuous streaming ingestion. It discovers new files incrementally, using either directory listing or cloud file-notification services.

In [0]:
# ==========================================
# CELL 7: Mechanism 1 - COPY INTO (SQL)
# ==========================================
# Target table for the load (left untouched if it already exists)
create_target_sql = "CREATE TABLE IF NOT EXISTS iot_copy_into (device_id LONG, device_name STRING, ip STRING, cca3 STRING, ccn3 LONG, cn STRING, km LONG, lcd STRING, timestamp LONG, battery_level LONG) USING DELTA"
spark.sql(create_target_sql)

# COPY INTO remembers which files it has already loaded ('force' = 'false'),
# so running it again only picks up files it has not seen before.
copy_into_sql = f"""
    COPY INTO iot_copy_into
    FROM '{source_landing_zone}'
    FILEFORMAT = JSON
    FORMAT_OPTIONS ('mergeSchema' = 'true')
    COPY_OPTIONS ('force' = 'false','mergeSchema' = 'true')
"""
spark.sql(copy_into_sql)

copied_rows = spark.table("iot_copy_into").count()
print("COPY INTO executed. Count:", copied_rows)

COPY INTO executed. Count: 236761


In [0]:
# ==========================================
# CELL 8: Mechanism 2 - Auto Loader (Python)
# ==========================================
# Auto Loader uses the "cloudFiles" format.
# It infers the schema and tracks it (and its evolution) under schemaLocation.

autoloader_df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", autoloader_schema_dir)
    .load(source_landing_zone))

# Keep the StreamingQuery handle so the stream can be monitored or stopped later,
# e.g. autoloader_query.stop().
autoloader_query = (autoloader_df.writeStream
    .format("delta")
    .option("checkpointLocation", f"{checkpoint_dir}/autoloader")
    .toTable("iot_autoloader_table"))

print("Auto Loader stream started...")

Auto Loader stream started...


## 1. Environment Setup
Dataset: NYC Taxi Trip Data (JSON format).

Goal: Build a pipeline that transforms raw trip data into business insights.

In [0]:
# ==========================================
# CELL 1: Setup and Configuration
# ==========================================
import pyspark.sql.functions as F
from pyspark.sql.types import *

# One DBFS folder per medallion layer, all under a shared working directory
base_path = "/tmp/medallion_practice"
bronze_path, silver_path, gold_path = (
    f"{base_path}/{layer}" for layer in ("bronze", "silver", "gold")
)

def cleanup_paths():
    """Reset the medallion lab (destructive - use with caution).

    Deletes everything under ``base_path`` and drops the taxi_bronze,
    taxi_silver and taxi_gold tables if they exist.
    """
    print(f"Cleaning up {base_path}...")
    dbutils.fs.rm(base_path, recurse=True)
    for layer in ("bronze", "silver", "gold"):
        spark.sql(f"DROP TABLE IF EXISTS taxi_{layer}")
    print("Cleanup complete.")


# Start from a clean slate
cleanup_paths()

Cleaning up /tmp/medallion_practice...
Cleanup complete.


## 2. Bronze Layer (Raw Ingestion)
Objective: Ingest the raw JSON data "as-is" into a Delta table. We want to preserve history and the original schema.

In [0]:
# ==========================================
# CELL 2: Bronze Layer - Raw Ingestion
# ==========================================

# 1. Source: NYC Taxi sample shipped with Databricks (JSON)
source_path = "dbfs:/databricks-datasets/nyctaxi/sample/json/"
print(f"Reading raw data from: {source_path}")

# 2. Raw read with schema inference (production pipelines usually pin a schema)
raw_df = spark.read.json(source_path)

# 3. Lineage metadata: when each row was ingested and which file it came from
bronze_df = (
    raw_df
    .withColumn("ingestion_timestamp", F.current_timestamp())
    .withColumn("source_file", F.input_file_name())
)

# 4. Persist as an external Delta table. A real Bronze feed would append;
#    this lab overwrites so the cell can be re-run.
(bronze_df.write
    .format("delta")
    .mode("overwrite")
    .option("path", bronze_path)
    .saveAsTable("taxi_bronze"))

print(f"Bronze layer written to: {bronze_path}")
display(spark.sql("SELECT * FROM taxi_bronze LIMIT 5"))

Reading raw data from: dbfs:/databricks-datasets/nyctaxi/sample/json/
Bronze layer written to: /tmp/medallion_practice/bronze


DOLocationID,PULocationID,RatecodeID,VendorID,congestion_surcharge,extra,fare_amount,improvement_surcharge,mta_tax,passenger_count,payment_type,store_and_fwd_flag,tip_amount,tolls_amount,total_amount,tpep_dropoff_datetime,tpep_pickup_datetime,trip_distance,pep_pickup_date_txt,ingestion_timestamp,source_file
249,144,1,2,2.5,0.5,8.5,0.3,0.5,5,2,N,0.0,0.0,12.3,2008-12-31 23:18:00,2008-12-31 23:03:32,1.64,2008-12-31,2025-12-16T07:31:12.426+0000,dbfs:/databricks-datasets/nyctaxi/sample/json/pep_pickup_date_txt=2008-12-31/part-00000-tid-2393398366526828660-757e4540-9287-4b16-907e-cdc5945edcb0-16672-1.c000.json
226,237,1,2,2.5,0.0,18.0,0.3,0.5,3,2,N,0.0,0.0,21.3,2009-01-01 09:57:25,2008-12-31 23:02:28,4.01,2008-12-31,2025-12-16T07:31:12.426+0000,dbfs:/databricks-datasets/nyctaxi/sample/json/pep_pickup_date_txt=2008-12-31/part-00000-tid-2393398366526828660-757e4540-9287-4b16-907e-cdc5945edcb0-16672-1.c000.json
16,138,1,2,0.0,1.0,32.5,0.3,0.5,1,2,N,0.0,0.0,34.3,2009-01-01 00:27:49,2008-12-31 23:55:51,9.65,2008-12-31,2025-12-16T07:31:12.426+0000,dbfs:/databricks-datasets/nyctaxi/sample/json/pep_pickup_date_txt=2008-12-31/part-00000-tid-2393398366526828660-757e4540-9287-4b16-907e-cdc5945edcb0-16672-1.c000.json
223,107,1,2,2.5,1.0,31.5,0.3,0.5,5,1,N,10.48,6.12,52.4,2009-01-01 18:23:03,2008-12-31 23:02:32,10.1,2008-12-31,2025-12-16T07:31:12.426+0000,dbfs:/databricks-datasets/nyctaxi/sample/json/pep_pickup_date_txt=2008-12-31/part-00000-tid-2393398366526828660-757e4540-9287-4b16-907e-cdc5945edcb0-16672-1.c000.json
113,113,1,2,2.5,1.0,3.0,0.3,0.5,5,2,N,0.0,0.0,7.3,2008-12-31 23:05:29,2008-12-31 23:02:38,0.2,2008-12-31,2025-12-16T07:31:12.426+0000,dbfs:/databricks-datasets/nyctaxi/sample/json/pep_pickup_date_txt=2008-12-31/part-00000-tid-2393398366526828660-757e4540-9287-4b16-907e-cdc5945edcb0-16672-1.c000.json


## 3. Silver Layer (Cleansing & Conforming)
Objective: Clean the data. We will enforce data types, filter out bad data (non-positive fares or passenger counts; rows with missing values in these columns are dropped by the same filters), and remove duplicate rows.

In [0]:
# ==========================================
# CELL 3: Silver Layer - Transformation & Cleaning
# ==========================================

# 1. Read from Bronze
df_bronze = spark.read.format("delta").load(bronze_path)

# 2. Apply Transformations
# - Cast columns to correct types (timestamps, doubles, integers)
# - Filter out trips with non-positive fares or passenger counts
#   (rows with NULL in these columns are dropped by the same filters)
# - Remove exact duplicate rows
# NOTE(review): VendorID appears to hold a numeric id (1/2 in the Gold output),
# not a name; the 'vendor_name' alias is kept because the Gold layer and the SQL
# checks refer to it.
silver_df = (
    df_bronze
    .select(
        F.col("trip_distance").cast("double"),
        F.col("fare_amount").cast("double"),
        F.col("tpep_pickup_datetime").cast("timestamp").alias("pickup_time"),
        F.col("tpep_dropoff_datetime").cast("timestamp").alias("dropoff_time"),
        F.col("passenger_count").cast("integer"),
        F.col("vendorid").alias("vendor_name"),
    )
    .filter(F.col("fare_amount") > 0)
    .filter(F.col("passenger_count") > 0)
    .dropDuplicates()
)

# 3. Write to Silver Delta Table
silver_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("path", silver_path) \
    .saveAsTable("taxi_silver")

# 4. Validate against what was actually persisted. silver_df is not cached, so
#    count()/display() on it would re-run the cast/filter/dedup pipeline over all
#    of Bronze; reading the written Delta table back avoids that extra work.
silver_written = spark.read.format("delta").load(silver_path)
print(f"Silver layer written to: {silver_path}")
print(f"Row count validation - Bronze: {df_bronze.count()} vs Silver: {silver_written.count()}")
display(silver_written.limit(5))

## 4. Gold Layer (Business Aggregations)
Objective: Create data ready for reporting. We will aggregate the data to answer: "What is the total revenue and average trip distance per Vendor?"

In [0]:
# ==========================================
# CELL 4: Gold Layer - Business Aggregations
# ==========================================

# 1. Read from Silver
df_silver = spark.read.format("delta").load(silver_path)

# 2. Per-vendor KPIs: total revenue, average trip distance and number of trips
vendor_metrics = [
    F.sum("fare_amount").alias("total_revenue"),
    F.avg("trip_distance").alias("avg_trip_distance"),
    F.count("*").alias("total_trips"),
]
per_vendor = df_silver.groupBy("vendor_name").agg(*vendor_metrics)

# Round the two decimal metrics for reporting, then rank vendors by revenue
for metric in ("total_revenue", "avg_trip_distance"):
    per_vendor = per_vendor.withColumn(metric, F.round(F.col(metric), 2))
gold_df = per_vendor.orderBy(F.col("total_revenue").desc())

# 3. Persist as the Gold Delta table
(gold_df.write
    .format("delta")
    .mode("overwrite")
    .option("path", gold_path)
    .saveAsTable("taxi_gold"))

print(f"Gold layer written to: {gold_path}")
display(gold_df)

Gold layer written to: /tmp/medallion_practice/gold


vendor_name,total_revenue,avg_trip_distance,total_trips
2,61845105.71,3.02,4539499
1,28827902.92,2.81,2156795


## 5. Validation & Analysis (SQL)
Objective: Verify the data using standard SQL, just like a Data Analyst would.

In [0]:
%sql
-- ==========================================
-- CELL 5: SQL Analysis
-- ==========================================

-- 1. Check the Gold Table.
-- A table has no inherent row order, so the revenue ranking must be requested
-- explicitly here; the orderBy used when writing the table is not a guarantee.
SELECT 
  vendor_name, 
  total_revenue, 
  total_trips,
  avg_trip_distance
FROM taxi_gold
ORDER BY total_revenue DESC;


vendor_name,total_revenue,total_trips,avg_trip_distance
2,61845105.71,4539499,3.02
1,28827902.92,2156795,2.81


In [0]:
%sql
-- 2. Time Travel / History Check
-- Every write to a Delta table is recorded as a numbered version; list them for taxi_silver.
DESC HISTORY taxi_silver;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2025-12-16T06:09:48.000+0000,2964032237259647,shyamprakash.kg@latentview.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> false, description -> null, partitionBy -> [], properties -> {})",,List(1957834268445823),1216-043958-ta3zqt5d,,WriteSerializable,False,"Map(numFiles -> 9, numOutputRows -> 6696294, numOutputBytes -> 118179251)",,Databricks-Runtime/12.2.x-scala2.12


## Summary of what you just built:
Bronze: Ingested JSON, added ingestion timestamps, saved as Delta.

Silver: Enforced types (Timestamp/Integer/Double), removed bad data (non-positive fares and passenger counts), deduplicated, saved as Delta.

Gold: Aggregated by Vendor for high-level business reporting.

# Session 3: Data Quality Dimensions

## Accuracy: Constraints and Validation

In [0]:
%sql
-- 1. Create (if needed) and switch to the lab schema; no catalog prefix such as 'main.'
CREATE SCHEMA IF NOT EXISTS practice_labs;
USE practice_labs;

-- 2. Snapshot 100 sample trips into a local table we are free to alter
CREATE OR REPLACE TABLE taxi_demo AS
SELECT
  tpep_pickup_datetime AS pickup_time,
  trip_distance,
  fare_amount
FROM samples.nyctaxi.trips
LIMIT 100;

-- 3. Quick look at the copy
SELECT * FROM taxi_demo LIMIT 5;

pickup_time,trip_distance,fare_amount
2016-02-14T16:52:13.000+0000,4.94,19.0
2016-02-04T18:44:19.000+0000,0.28,3.5
2016-02-17T17:13:57.000+0000,0.7,5.0
2016-02-18T10:36:07.000+0000,0.8,6.0
2016-02-22T14:14:41.000+0000,4.51,17.0


In [0]:
%sql
-------------------------------------------------------
-- EXERCISE 1: ADD A CHECK CONSTRAINT
-------------------------------------------------------
-- Note: The NYC Taxi sample data is from ~2016. 
-- If we set the check to '2023', it will fail because existing data violates it.
-- Let's set a constraint that validly passes (Dates after year 2000).
-- Each ADD is preceded by DROP ... IF EXISTS so the cell can be re-run:
-- adding a constraint whose name already exists on the table raises an error.

ALTER TABLE taxi_demo DROP CONSTRAINT IF EXISTS valid_date_range;
-- Typed literal: compare timestamp to timestamp instead of relying on an implicit string cast.
ALTER TABLE taxi_demo ADD CONSTRAINT valid_date_range CHECK (pickup_time > TIMESTAMP '2000-01-01');

-- Let's add a second one for Fare Amount to prevent negative fares
ALTER TABLE taxi_demo DROP CONSTRAINT IF EXISTS positive_fare;
ALTER TABLE taxi_demo ADD CONSTRAINT positive_fare CHECK (fare_amount >= 0);


-------------------------------------------------------
-- EXERCISE 2: VERIFY CONSTRAINTS
-------------------------------------------------------
-- Method A: Describe Extended (Look for 'Properties' section at the bottom)
DESCRIBE EXTENDED taxi_demo;

-- Method B: Table Properties (Cleaner view of just properties)
SHOW TBLPROPERTIES taxi_demo;

-- Method C: Describe Detail (Returns a table format, scroll right to see 'properties')
DESCRIBE DETAIL taxi_demo;


-------------------------------------------------------
-- EXERCISE 3: DROP A CONSTRAINT
-------------------------------------------------------
-- We will drop the date range constraint we created in Exercise 1
ALTER TABLE taxi_demo DROP CONSTRAINT valid_date_range;

-- Verify it is gone
SHOW TBLPROPERTIES taxi_demo;

key,value
delta.constraints.positive_fare,fare_amount >= 0
delta.minReaderVersion,1
delta.minWriterVersion,3


In [0]:
%sql
USE practice_labs;

-------------------------------------------------------
-- EXERCISE 1: CREATE TABLE WITH NOT NULL
-------------------------------------------------------
-- emp_id is mandatory (NOT NULL); emp_name and department may be left NULL.
CREATE OR REPLACE TABLE employee_demo (
  emp_id     INT NOT NULL,
  emp_name   STRING,
  department STRING
);


In [0]:
# Inspect nullability: emp_id should show nullable = false
df = spark.table("employee_demo")
df.printSchema()

root
 |-- emp_id: integer (nullable = false)
 |-- emp_name: string (nullable = true)
 |-- department: string (nullable = true)



In [0]:
%sql
-------------------------------------------------------
-- EXERCISE 2: DROP A NOT NULL CONSTRAINT (Relax the rule)
-------------------------------------------------------
-- Scenario: emp_id is no longer mandatory, so the column becomes nullable again.
ALTER TABLE employee_demo ALTER COLUMN emp_id DROP NOT NULL;


In [0]:
# emp_id should now report nullable = true
df = spark.table("employee_demo")
df.printSchema()

root
 |-- emp_id: integer (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- department: string (nullable = true)



In [0]:
%sql
-------------------------------------------------------
-- EXERCISE 3: SET A NOT NULL CONSTRAINT (Tighten the rule)
-------------------------------------------------------
-- Scenario: 'department' becomes mandatory.
-- The change is only accepted if no existing row has a NULL department;
-- the table is still empty, so it succeeds straight away.
ALTER TABLE employee_demo ALTER COLUMN department SET NOT NULL;


In [0]:
# department should now report nullable = false
df = spark.table("employee_demo")
df.printSchema()

root
 |-- emp_id: integer (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- department: string (nullable = false)



In [0]:
%python
# 1. SETUP FOR "COPY INTO"
# We need to write some Parquet files to a folder so you can practice copying them.
data = [(1, "Device A", 10.5), (2, "Device B", 12.0), (3, "Device A", -5.0)]
columns = ["device_id", "device_name", "reading"]

# Write these to a temporary folder in Databricks File System (DBFS)
source_path = "dbfs:/tmp/practice_labs/incoming_data"
spark.createDataFrame(data, columns).write.mode("overwrite").parquet(source_path)

print(f"File setup complete. Source path created at: {source_path}")

File setup complete. Source path created at: dbfs:/tmp/practice_labs/incoming_data


In [0]:
%sql
USE practice_labs;

-- Empty Delta target whose columns match the Parquet files written by the setup cell
CREATE OR REPLACE TABLE my_table (
  device_id   LONG,
  device_name STRING,
  reading     DOUBLE
) USING DELTA;

-- Dry run: VALIDATE checks up to 15 source rows and shows them,
-- but writes nothing into my_table.
COPY INTO my_table
FROM 'dbfs:/tmp/practice_labs/incoming_data'
FILEFORMAT = PARQUET
VALIDATE 15 ROWS;

device_id,device_name,reading
1,Device A,10.5
2,Device B,12.0
3,Device A,-5.0


In [0]:
%sql
-- 2. SETUP FOR "SILVER/QUARANTINE"
USE practice_labs;

-- Source table mixing valid (positive) and invalid (zero / negative) readings
CREATE OR REPLACE TABLE batch_updates (
  device_id LONG,
  timestamp TIMESTAMP,
  reading   DOUBLE
);

INSERT INTO batch_updates VALUES
  (101, current_timestamp(),  25.5),  -- valid
  (102, current_timestamp(), -10.0),  -- invalid: negative
  (103, current_timestamp(),  15.2),  -- valid
  (104, current_timestamp(),   0.0);  -- invalid: zero

-- 3. SETUP FOR "RESTORE"
-- A table with some history to roll back: three versions (0, 1, 2).
CREATE OR REPLACE TABLE restore_demo (id INT, name STRING);  -- version 0: empty table
INSERT INTO restore_demo VALUES (1, 'Alice');                -- version 1: Alice
UPDATE restore_demo SET name = 'Bob' WHERE id = 1;           -- version 2: renamed to Bob

num_affected_rows
1


In [0]:
%sql
USE practice_labs;

-- 1. Create the Silver table (Valid Data)
CREATE TABLE IF NOT EXISTS reading_silver
 (device_id LONG, timestamp TIMESTAMP, reading DOUBLE)
USING DELTA;

-- 2. Create the Quarantine table (Invalid Data)
CREATE TABLE IF NOT EXISTS reading_quarantine
 (device_id LONG, timestamp TIMESTAMP, reading DOUBLE)
USING DELTA;

-- 3. Route BAD records to quarantine: non-positive OR missing readings.
--    (A NULL reading fails both "reading <= 0" and "reading > 0", so without the
--    IS NULL test it would silently land in neither table.)
--    MERGE ... WHEN NOT MATCHED keeps the step idempotent: re-running the cell
--    does not insert the same (device_id, timestamp) twice, unlike INSERT INTO.
MERGE INTO reading_quarantine AS t
USING (
  SELECT device_id, timestamp, reading
  FROM batch_updates
  WHERE reading <= 0 OR reading IS NULL
) AS s
ON t.device_id = s.device_id AND t.timestamp = s.timestamp
WHEN NOT MATCHED THEN INSERT *;

-- 4. Route GOOD records (reading > 0) to silver, with the same idempotent pattern
MERGE INTO reading_silver AS t
USING (
  SELECT device_id, timestamp, reading
  FROM batch_updates
  WHERE reading > 0
) AS s
ON t.device_id = s.device_id AND t.timestamp = s.timestamp
WHEN NOT MATCHED THEN INSERT *;

-- 5. Verification
SELECT 'Silver' as table_name, * FROM reading_silver
UNION ALL
SELECT 'Quarantine' as table_name, * FROM reading_quarantine;

table_name,device_id,timestamp,reading
Silver,101,2025-12-18T07:41:10.044+0000,25.5
Silver,103,2025-12-18T07:41:10.044+0000,15.2
Quarantine,102,2025-12-18T07:41:10.044+0000,-10.0
Quarantine,104,2025-12-18T07:41:10.044+0000,0.0


### Streaming Data Quality Checks

In [0]:
# Streaming data-quality check: split each micro-batch of sensor readings into
# reading_silver (valid) and reading_quarantine (invalid).
from delta.tables import DeltaTable


def upsert_quarantine(mbatch_df, batchId):
    """Route one micro-batch into the Silver and Quarantine tables.

    Used as a ``foreachBatch`` callback:
    - rows with ``reading > 0`` are MERGEd into ``practice_labs.reading_silver``;
    - rows with ``reading <= 0`` or a NULL reading are MERGEd into
      ``practice_labs.reading_quarantine``.
    Both merges match on (device_id, timestamp) and only insert unmatched rows,
    so replaying a micro-batch, or seeing rows already present in the targets,
    does not create duplicates.

    Args:
        mbatch_df: the micro-batch as a regular (non-streaming) DataFrame.
        batchId: micro-batch id supplied by Structured Streaming (unused).
    """
    merge_condition = "t.device_id = s.device_id AND t.timestamp = s.timestamp"
    routes = [
        ("practice_labs.reading_silver", "reading > 0"),
        # NULL readings fail "reading > 0" too; quarantine them instead of dropping them.
        ("practice_labs.reading_quarantine", "reading <= 0 OR reading IS NULL"),
    ]
    for table_name, predicate in routes:
        # Resolve the target Delta table inside the callback via the batch's own session.
        target_table = DeltaTable.forName(mbatch_df.sparkSession, table_name)
        (target_table.alias("t")
            .merge(mbatch_df.filter(predicate).alias("s"), merge_condition)
            .whenNotMatchedInsertAll()
            .execute())


# foreachBatch needs a *streaming* source. Here it is the readings table
# (device_id, timestamp, reading) read incrementally as a Delta stream.
readings_stream_df = spark.readStream.table("practice_labs.batch_updates")

readings_dq_query = (readings_stream_df.writeStream
    .foreachBatch(upsert_quarantine)
    .outputMode("append")  # no aggregation in this query
    .option("checkpointLocation", "dbfs:/tmp/practice_labs/checkpoints/reading_dq")
    .trigger(availableNow=True)  # process everything currently available, then stop
    .start())

In [0]:
%sql
USE practice_labs;

-- 1. Inspect the Delta transaction log of restore_demo. Straight after the setup
--    cell it lists versions 0, 1 and 2; once the RESTORE cell below has run, a
--    RESTORE commit (version 3) appears at the top as well.
DESCRIBE HISTORY practice_labs.restore_demo;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2025-12-18T07:44:27.000+0000,2964032237259647,shyamprakash.kg@latentview.com,RESTORE,"Map(version -> 1, timestamp -> null)",,List(1957834268445823),1218-071325-cjkuklil,2.0,Serializable,False,"Map(numRestoredFiles -> 1, removedFilesSize -> 802, numRemovedFiles -> 1, restoredFilesSize -> 816, numOfFilesAfterRestore -> 1, tableSizeAfterRestore -> 816)",,Databricks-Runtime/12.2.x-scala2.12
2,2025-12-18T07:41:24.000+0000,2964032237259647,shyamprakash.kg@latentview.com,UPDATE,"Map(predicate -> [""(id#6562 = 1)""])",,List(1957834268445823),1218-071325-cjkuklil,1.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numRemovedBytes -> 816, numCopiedRows -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 3266, scanTimeMs -> 2118, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 802, rewriteTimeMs -> 1123)",,Databricks-Runtime/12.2.x-scala2.12
1,2025-12-18T07:41:17.000+0000,2964032237259647,shyamprakash.kg@latentview.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1957834268445823),1218-071325-cjkuklil,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 816)",,Databricks-Runtime/12.2.x-scala2.12
0,2025-12-18T07:41:14.000+0000,2964032237259647,shyamprakash.kg@latentview.com,CREATE OR REPLACE TABLE,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(1957834268445823),1218-071325-cjkuklil,,WriteSerializable,True,Map(),,Databricks-Runtime/12.2.x-scala2.12


In [0]:
%sql
-- 2. Current contents: expect 'Bob', since the UPDATE was the latest commit
SELECT id, name
FROM restore_demo;

id,name
1,Bob


In [0]:
%sql
-- 3. Roll back to version 1 (right after 'Alice' was inserted). The RESTORE is
--    itself recorded as a new commit in the table history.
RESTORE TABLE restore_demo
  TO VERSION AS OF 1;

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
816,1,1,1,802,816


In [0]:
%sql
-- 4. Confirm the rollback: the row should read 'Alice' again
SELECT id, name
FROM restore_demo;

id,name
1,Alice


## Validity: Schema Evolution:

In [0]:
%python
# Location of the Delta table used throughout the schema-evolution demo
delta_path = "/tmp/practice_labs/schema_evolution_demo"

# Remove anything a previous run left at that path (recursive delete)
dbutils.fs.rm(delta_path, recurse=True)

# Initial data: two rows with columns (id, name)
data_v1 = [
    (1, "Alice"),
    (2, "Bob"),
]
df_initial = spark.createDataFrame(data_v1, ["id", "name"])

# Persist it as a Delta table, replacing whatever is at the path
(
    df_initial.write
    .format("delta")
    .mode("overwrite")
    .save(delta_path)
)

# Verification: print the schema recorded for the new table
print("Initial table created. Schema:")
spark.read.format("delta").load(delta_path).printSchema()

Initial table created. Schema:
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [0]:
%python
# New batch carrying an `email` column that the table does not have yet
data_v2 = [(3, "Charlie", "charlie@example.com")]
df_new_col = spark.createDataFrame(data_v2, ["id", "name", "email"])

# Append to the existing table. mergeSchema=true lets the write add `email` to
# the table schema; the rows written earlier show an empty email afterwards.
(
    df_new_col.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("append")
    .save(delta_path)
)

# Verification: the schema should now end with `email`
print("Merge complete. New Schema:")
evolved_df = spark.read.format("delta").load(delta_path)
evolved_df.printSchema()
display(evolved_df)

Merge complete. New Schema:
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)



id,name,email
3,Charlie,charlie@example.com
1,Alice,
2,Bob,


In [0]:
%python
# Incompatible batch: `id` is a STRING ("ID-004") while the table stores it as a
# long, so this conflicts with the existing column type.
data_v3 = [("ID-004", "David")]
df_changed_type = spark.createDataFrame(data_v3, ["id", "name"])

# Replace data AND schema: mode("overwrite") swaps out the rows, and
# overwriteSchema=true permits the column type change (long -> string).
(
    df_changed_type.write
    .format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .save(delta_path)
)

# Verification: `id` is now a string and `email` is gone, because the schema was
# replaced wholesale rather than merged.
print("Overwrite complete. New Schema:")
overwritten_df = spark.read.format("delta").load(delta_path)
overwritten_df.printSchema()
display(overwritten_df)

Overwrite complete. New Schema:
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



id,name
ID-004,David


## Completeness:

In [0]:
%python
# 1. Write a small CSV into DBFS so input_file_name() has a real file to report.
#    overwrite=True replaces a copy left by an earlier run.
users_csv_path = "dbfs:/tmp/practice_data/users.csv"
users_csv_text = "id,name,age,salary\n1,Alice,30,50000\n2,Bob,25,60000\n3,Charlie,35,70000"
dbutils.fs.put(users_csv_path, users_csv_text, overwrite=True)

# 2. Expose the file to SQL as the temporary view 'raw_ingested_data'
users_raw_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(users_csv_path)
)
users_raw_df.createOrReplaceTempView("raw_ingested_data")

Wrote 69 bytes.


In [0]:
%sql
-- Start from a clean database. CASCADE also drops every table inside it,
-- including the batch_updates / reading_* / restore_demo tables created by
-- the earlier cells of this notebook.
DROP DATABASE IF EXISTS practice_labs CASCADE;

-- 1. Recreate it with a comment, an explicit DBFS location and a custom property
CREATE DATABASE IF NOT EXISTS practice_labs
  COMMENT 'This is a practice database for learning metadata management'
  LOCATION 'dbfs:/tmp/practice_labs_v2.db'
  WITH DBPROPERTIES (contains_pii = true);

-- 2. Inspect the database metadata, including its properties
DESCRIBE DATABASE EXTENDED practice_labs;

database_description_item,database_description_value
Catalog Name,spark_catalog
Namespace Name,practice_labs
Comment,This is a practice database for learning metadata management
Location,dbfs:/tmp/practice_labs_v2.db
Owner,root
Properties,"((contains_pii,true))"


In [0]:
%sql
USE practice_labs;

-- 1. Bronze table built from the raw_ingested_data temp view (the CSV registered
--    earlier), stamped with the load time and the originating file path.
CREATE OR REPLACE TABLE bronze_table AS
SELECT
  src.*,
  current_timestamp() AS time_of_recording,
  input_file_name()   AS source_file
FROM raw_ingested_data AS src;

-- Validate the data
SELECT * FROM bronze_table;

id,name,age,salary,time_of_recording,source_file
1,Alice,30,50000,2025-12-18T05:43:19.602+0000,dbfs:/tmp/practice_data/users.csv
2,Bob,25,60000,2025-12-18T05:43:19.602+0000,dbfs:/tmp/practice_data/users.csv
3,Charlie,35,70000,2025-12-18T05:43:19.602+0000,dbfs:/tmp/practice_data/users.csv


In [0]:
%sql
USE practice_labs;

-- 2. Self-documenting table: per-column comments, a table comment, and custom
--    table properties that tag it as PII.
CREATE TABLE IF NOT EXISTS my_pii_table (
  id   INT    COMMENT 'Unique Identification Number',
  name STRING COMMENT 'PII - Full Name',
  age  INT    COMMENT 'PII - Age in years'
)
COMMENT 'This table contains sensitive PII data'
TBLPROPERTIES ('contains_pii' = 'True', 'security_level' = 'high');

In [0]:
%sql
-- Describe the PII table. The name is qualified with its database so this cell
-- does not rely on an earlier cell having run `USE practice_labs`.
DESCRIBE EXTENDED practice_labs.my_pii_table

col_name,data_type,comment
id,int,Unique Identification Number
name,string,PII - Full Name
age,int,PII - Age in years
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,practice_labs,
Table,my_pii_table,
Created Time,Thu Dec 18 05:43:33 UTC 2025,
Last Access,UNKNOWN,


In [0]:
%sql
USE practice_labs;

-- 1. List the tables in the database ('bronze_table' and 'my_pii_table').
--    Session temp views such as raw_ingested_data are listed too (isTemporary = true).
SHOW TABLES FROM practice_labs;

database,tableName,isTemporary
practice_labs,bronze_table,False
practice_labs,my_pii_table,False
,raw_ingested_data,True


In [0]:
%sql
-- 2. Pattern search: '*' acts as the wildcard in a SHOW TABLES ... LIKE pattern
SHOW TABLES FROM practice_labs LIKE '*pii*';

database,tableName,isTemporary
practice_labs,my_pii_table,False


In [0]:
%sql
-- 3. Full table metadata: the columns with their comments, then the detailed
--    table information section (catalog, database, created time, ...).
DESC TABLE EXTENDED practice_labs.my_pii_table;

col_name,data_type,comment
id,int,Unique Identification Number
name,string,PII - Full Name
age,int,PII - Age in years
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,practice_labs,
Table,my_pii_table,
Created Time,Thu Dec 18 05:43:33 UTC 2025,
Last Access,UNKNOWN,


## Uniqueness:

In [0]:
%sql
-- 1. Target table (table_a) and its starting rows
CREATE OR REPLACE TABLE table_a (
  id    INT,
  col_1 STRING,
  col_2 STRING,
  value INT
);

INSERT INTO table_a (id, col_1, col_2, value) VALUES
  (1, 'old_val', 'safe', 100),  -- id 1 also in source; source col_1 = 'X' -> UPDATE
  (2, 'old_val', 'Y',    200),  -- id 2 also in source; target col_2 = 'Y' -> DELETE
  (5, 'static',  'safe', 500);  -- id 5 absent from source -> left untouched

-- 2. Source table (table_b) holding the incoming changes
CREATE OR REPLACE TABLE table_b (
  id    INT,
  col_1 STRING,
  col_2 STRING,
  value INT
);

INSERT INTO table_b (id, col_1, col_2, value) VALUES
  (1, 'X',      'safe', 999),  -- matches id 1; col_1 = 'X' satisfies the UPDATE clause
  (2, 'ignore', 'safe', 888),  -- matches id 2; fails the UPDATE clause, target row hits DELETE
  (3, 'new',    'safe', 300);  -- no match in target -> INSERT

num_affected_rows,num_inserted_rows
3,3


In [0]:
%sql
-- Upsert table_b into table_a, joined on id. WHEN MATCHED clauses are evaluated
-- in the order written, so a row meeting both conditions would be UPDATED.
MERGE INTO table_a AS a
USING table_b AS b
  ON a.id = b.id
WHEN MATCHED AND b.col_1 = 'X' THEN
  UPDATE SET *
WHEN MATCHED AND a.col_2 = 'Y' THEN
  DELETE
WHEN NOT MATCHED THEN
  INSERT *;

-- Expected: id 1 updated to 999, id 2 deleted, id 3 inserted, id 5 unchanged
SELECT * FROM table_a ORDER BY id;

id,col_1,col_2,value
1,X,safe,999
3,new,safe,300
5,static,safe,500


In [0]:
from pyspark.sql import Row

# (id, time, data) tuples for the deduplication examples
raw_rows = [
    (1, "10:00", "A"),
    (1, "10:00", "A"),  # full duplicate of the row above
    (1, "11:00", "B"),  # same id, different time/data
    (2, "12:00", "C"),
]
data = [Row(id=i, time=t, data=d) for i, t, d in raw_rows]

df = spark.createDataFrame(data)
original_count = df.count()
print("Original Count:", original_count)
display(df)

Original Count: 4


id,time,data
1,10:00,A
1,10:00,A
1,11:00,B
2,12:00,C


In [0]:
# 1. Whole-row deduplication: a row is removed only when every column matches
#    another row, so just the repeated (1, "10:00", "A") disappears.
dedup_df = df.distinct()
display(dedup_df)

# 2. Deduplicate on a subset by projecting to (id, time) first.
#    The `data` column is not part of this result; df.dropDuplicates(["id", "time"])
#    is the variant that keeps all columns.
dedup_selected_df = df.select("id", "time").distinct()
display(dedup_selected_df)

id,time,data
1,10:00,A
1,11:00,B
2,12:00,C


id,time
1,10:00
1,11:00
2,12:00


In [0]:
import datetime
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Event log for the "keep the latest row per key" example:
# user 100 has three events (listed out of time order), user 101 has one.
_dt = datetime.datetime
data_users = [
    (100, _dt(2023, 1, 1, 10, 0, 0), "Oldest Event"),
    (100, _dt(2023, 1, 1, 12, 0, 0), "Newest Event"),  # latest for user 100
    (100, _dt(2023, 1, 1, 11, 0, 0), "Middle Event"),
    (101, _dt(2023, 1, 2, 11, 0, 0), "Only Event"),    # latest (only) for user 101
]

users_df = spark.createDataFrame(data_users, ["user_id", "timestamp", "event_info"])
display(users_df)

user_id,timestamp,event_info
100,2023-01-01T10:00:00.000+0000,Oldest Event
100,2023-01-01T12:00:00.000+0000,Newest Event
100,2023-01-01T11:00:00.000+0000,Middle Event
101,2023-01-02T11:00:00.000+0000,Only Event


In [0]:
# Keep exactly one row per user: the most recent event.
# Window: one partition per user_id, newest timestamp first. event_info is a
# secondary sort key so the chosen row is deterministic when timestamps tie.
window = Window.partitionBy("user_id").orderBy(F.col("timestamp").desc(), F.col("event_info"))

# row_number() numbers rows 1, 2, 3, ... even across ties, so filtering on 1 yields
# a single row per user. rank() would give tied rows the same rank 1, and every
# tied row would survive the filter, leaving duplicates per user.
ranked_df = (
    users_df
    .withColumn("row_num", F.row_number().over(window))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
)

# Result should only show "Newest Event" for User 100 and "Only Event" for User 101
display(ranked_df)

user_id,timestamp,event_info
100,2023-01-01T12:00:00.000+0000,Newest Event
101,2023-01-02T11:00:00.000+0000,Only Event
