# Apache Spark Project on Databricks

## Description

There are datasets corresponding to a **list of health inspections in establishments** (restaurants, supermarkets, etc.), along with their respective health risks. Additionally, there is another dataset that shows a **description of these risks**.

**The objective is to load these datasets under specific requirements and manipulate them according to the instructions of each exercise**.

All necessary operations are described in the exercises, although extra tasks initiated by the student will be appreciated. The use of the DataFrame API will also be valued.

## Download Datasets

Downloading two datasets directly from GitHub into the local Databricks driver node. The `--output-dir /databricks/driver` option specifies that the files will be downloaded into the `/databricks/driver` directory, which is accessible from Databricks notebooks.

In [0]:
%sh 
curl -O 'https://raw.githubusercontent.com/masfworld/datahack_docker/master/zeppelin/data/food_inspections_lite.csv' --output-dir /databricks/driver
curl -O 'https://raw.githubusercontent.com/masfworld/datahack_docker/master/zeppelin/data/risk_description.csv'  --output-dir /databricks/driver

Copying the datasets from the local filesystem (`file:/databricks/driver/...`) into the Databricks File System (`dbfs:`), making them accessible across the Spark cluster.
> **Note:** DBFS is a distributed file system integrated with Databricks that makes the datasets available to all nodes in the cluster.

In [0]:
dbutils.fs.cp('file:/databricks/driver/food_inspections_lite.csv','dbfs:/dataset/food_inspections_lite.csv')
dbutils.fs.cp('file:/databricks/driver/risk_description.csv','dbfs:/dataset/risk_description.csv')

Listing all files and directories under `/dataset/` on DBFS. It verifies that the previous copy operations succeeded, ensuring the files are correctly stored and accessible in DBFS.

In [0]:
dbutils.fs.ls('/dataset/')

Setting the Kafka bootstrap server address, which is used to connect to a Kafka cluster. This line defines a variable used later.

In [0]:
KAFKA_BOOTSTRAP_SERVER="35.227.18.205:9094"

Checkpointing allows Spark Structured Streaming to save its state (such as offsets, processed records, and metadata) periodically, enabling fault-tolerance and recovery from failures.

In [0]:
checkpoint_path = "/tmp/project_spark/_checkpoint"

- `spark.conf.set(...)` defines the location of the checkpoint files.
- `spark.conf.get(...)` retrieves the checkpoint location to confirm that it has been set correctly.

In [0]:
spark.conf.set("spark.sql.streaming.checkpointLocation", checkpoint_path)
spark.conf.get("spark.sql.streaming.checkpointLocation")

## Exercise 1
---
1. **Create two dataframes, one from the file `food_inspections_lite.csv` and another from `risk_description.csv`**.
2. **Convert these two dataframes into Delta tables**.

In [0]:
# Setting catalog and database
spark.sql("USE CATALOG hive_metastore")
spark.sql("USE DATABASE default")
spark.sql("SELECT current_catalog(), current_database()").display()

### Create DataFrames from CSV files.

In [0]:
# Defining file paths
food_inspections_path = "/dataset/food_inspections_lite.csv"
risk_description_path = "/dataset/risk_description.csv"

# Loading CSV into DataFrames (first row as header, column types inferred)
df_food_inspections = (spark.read
                       .option("header", True)
                       .option("inferSchema", True)
                       .csv(food_inspections_path)
                       )

df_risk_description = (spark.read
                       .option("header", True)
                       .option("inferSchema", True)
                       .csv(risk_description_path)
                       )

# Displaying schemas to verify
df_food_inspections.printSchema()
df_risk_description.printSchema()

Dropping tables and paths before recreating them:

In [0]:
spark.sql("DROP TABLE IF EXISTS hive_metastore.default.food_inspections")
spark.sql("DROP TABLE IF EXISTS hive_metastore.default.risk_description")

In [0]:
dbutils.fs.rm("dbfs:/user/hive/warehouse/food_inspections", recurse=True)
dbutils.fs.rm("dbfs:/user/hive/warehouse/risk_description", recurse=True)

### Convert DataFrames into Delta tables.

Delta tables don't allow column names containing spaces or certain special characters unless column mapping is explicitly enabled. There are two options:
* Rename columns to remove invalid characters
* Enable Column Mapping (Delta table feature) that allows special characters and spaces in column names

Applying option 1 as it is simpler and cleaner:

In [0]:
from pyspark.sql.functions import *

# Rename columns in df_food_inspections to remove spaces and special characters
df_food_inspections_clean = df_food_inspections.select(
    col("Inspection ID").alias("inspection_id"),
    col("DBA Name").alias("dba_name"),
    col("AKA Name").alias("aka_name"),
    col("License #").alias("license"),
    col("Facility Type").alias("facility_type"),
    col("Risk").alias("risk"),
    col("Address").alias("address"),
    col("City").alias("city"),
    col("State").alias("state"),
    col("Zip").alias("zip"),
    col("Inspection Date").alias("inspection_date"),
    col("Inspection Type").alias("inspection_type"),
    col("Results").alias("results"),
    col("Violations").alias("violations"),
    col("Latitude").alias("latitude"),
    col("Longitude").alias("longitude"),
    col("Location").alias("location")
)
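Listing every alias by hand works for a fixed schema. As an alternative, the renaming rule can be written once as a function. The sketch below is plain Python so it can be tested without a cluster; the name `to_snake_case` is hypothetical and not part of the original notebook. It lowercases each name and collapses any run of non-alphanumeric characters into a single underscore, which yields the same names as the aliases above.

```python
import re


def to_snake_case(name: str) -> str:
    """Lowercase a column name and replace runs of invalid characters with '_'."""
    # Collapse spaces, '#', and any other non-alphanumeric run into one underscore
    cleaned = re.sub(r"[^0-9A-Za-z]+", "_", name)
    # Drop leading/trailing underscores left over from e.g. a trailing " #"
    return cleaned.strip("_").lower()


original_columns = ["Inspection ID", "DBA Name", "License #", "Facility Type", "Zip"]
print([to_snake_case(c) for c in original_columns])
```

In the notebook, such a function could be applied to every column at once with `df_food_inspections.toDF(*[to_snake_case(c) for c in df_food_inspections.columns])`.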

In [0]:
# Saving DataFrames as Delta tables
df_food_inspections_clean.write \
                    .mode("overwrite") \
                    .format("delta") \
                    .saveAsTable("food_inspections")

df_risk_description.write \
                    .mode("overwrite") \
                    .format("delta") \
                    .saveAsTable("risk_description")

In [0]:
# Verifying last changes
spark.sql("DESCRIBE EXTENDED food_inspections").display()

In [0]:
# Showing tables to verify
spark.sql("SELECT * FROM food_inspections").limit(5).display()
spark.sql("SELECT * FROM risk_description").display()

## Exercise 2
---
**Obtain the number of distinct inspections with High Risk `Risk 1 (High)`.**

In [0]:
high_risk_count = (
    spark.table("food_inspections")
         .filter(col("risk") == "Risk 1 (High)")
         .select("inspection_id")
         .distinct()
         .count()
)

print(f"Distinct inspections with High Risk (Risk 1): {high_risk_count}")

## Exercise 3
---
**From the previously loaded dataframes, obtain a table with the following columns:**
1. `DBA Name`
2. `Facility Type`
3. `Risk`
4. `Risk Description`

### Load the tables

In [0]:
df_food = spark.table("food_inspections")
df_risk = spark.table("risk_description")

### Prepare for joining
Extracting the numeric `risk_id` from the `Risk` column.

In [0]:
from pyspark.sql.functions import regexp_extract, col

df_food_prepared = df_food.withColumn(
    "risk_id",
    regexp_extract(col("risk"), r"Risk (\d+)", 1).cast("int")
)
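The extraction step can be modelled in plain Python to see what happens with values that do not follow the `Risk <n> (...)` pattern. This is a sketch, not Spark code: Spark's `regexp_extract` returns an empty string when the pattern does not match, and casting that empty string to `int` gives `NULL` when ANSI mode is disabled (with ANSI mode on, the cast raises an error instead). The value `All` below is only an illustrative non-matching input.

```python
import re
from typing import Optional

RISK_PATTERN = re.compile(r"Risk (\d+)")


def extract_risk_id(risk: Optional[str]) -> Optional[int]:
    """Plain-Python model of regexp_extract(risk, 'Risk (\\d+)', 1).cast('int')."""
    if risk is None:
        return None  # NULL input stays NULL
    match = RISK_PATTERN.search(risk)
    # No match -> Spark returns '' and the int cast yields NULL (non-ANSI mode)
    return int(match.group(1)) if match else None


for value in ["Risk 1 (High)", "Risk 3 (Low)", "All", None]:
    print(repr(value), "->", extract_risk_id(value))
```

Rows whose `risk_id` ends up `NULL` still survive the left join in the next cell, but with an empty `Risk Description`.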

### Join DataFrames

In [0]:
df_result = (
    df_food_prepared
    .join(df_risk, on="risk_id", how="left")
    .select(
        col("dba_name").alias("DBA Name"),
        col("facility_type").alias("Facility Type"),
        col("risk").alias("Risk"),
        col("description").alias("Risk Description")
    )
)

In [0]:
df_result.limit(20).display()

## Exercise 4
---
**Access the Spark UI to view the execution plan and describe each of the pieces/boxes that make up the execution plan. Add a screenshot of the analyzed execution plan.**

> **Note:** A brief one-line description per box is sufficient.

The image below shows the execution plan, taken from the Spark UI, for the query joining `food_inspections_lite` and `risk_description`.

![Execution Plan](images/Spark_physical_execution_plan.png)

The screenshot shows the physical execution plan (DAG) of Query 36 in the Spark UI. The query scans both tables, filters them, joins `food_inspections_lite` with `risk_description` using a broadcast hash join, and projects the output columns.

**WholeStageCodegen (Stage 39)**:
Spark fuses a chain of operators into a single generated Java function that is compiled to JVM bytecode at runtime, avoiding per-row overhead between operators (used here for reading and filtering `risk_description`).

**Filter (0)**:
Filters records from the `risk_description` Delta table (possibly removing nulls or invalid entries).

**ColumnarToRow (0)**:
Converts the columnar batches produced by the Parquet reader into rows for the row-based operators that follow.

**Scan parquet [hive_metastore.default.risk_description]**:
Reads the Parquet data files that back the `risk_description` Delta table.

**BroadcastExchange (11)**:
Broadcasts the small `risk_description` table to all worker nodes to speed up the join (broadcast join optimization).

**WholeStageCodegen (Stage 38)**:
Optimized compiled code for processing the larger table (`food_inspections_lite`) and performing the join.

**Filter (0)**:
Applies filters to `food_inspections_lite` (risk IS NOT NULL).

**ColumnarToRow (0)**:
Converts columnar data from Delta into row format for processing.

**Scan parquet [hive_metastore.default.food_inspections_lite]**:
Reads data from the `food_inspections_lite` Delta table.

**Project (multiple layers)**:
Selects only the necessary columns for output (e.g., DBA Name, Facility Type, Risk, Description).

**LocalLimit and Limit**:
Limit the number of output rows, as requested by the action that triggered the query (for example `.limit(...)` or `.show()`).

**BroadcastHashJoin (12)**:
Performs the join between the two tables using a hash-based algorithm, optimized with broadcast. Broadcast joins avoid shuffling because:
- The smaller table (`risk_description`) is broadcast to all executors.
- The larger table (`food_inspections_lite`) is scanned and joined locally on each node.

**ResultQueryStage / AdaptiveSparkPlan**:
Adaptive Query Execution (AQE) re-optimizes the plan at runtime using statistics collected from completed query stages (for example, switching join strategies or coalescing shuffle partitions).
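The BroadcastHashJoin step can be illustrated with a small plain-Python model. This is a sketch under simplifying assumptions, not Spark's implementation: the small table is hashed once (the "build" side that Spark broadcasts), and each partition of the large table probes that hash map locally, so the large side never has to be shuffled. It uses left outer semantics to match the `how="left"` join of Exercise 3; the risk descriptions and establishment names are illustrative placeholders.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, Optional[int]]


def broadcast_hash_join(small: List[Tuple[int, str]],
                        partitions: List[List[Row]]) -> List[Tuple[str, Optional[int], Optional[str]]]:
    # Build side: hash the small table once; Spark ships this map to every executor
    hash_table: Dict[int, str] = dict(small)
    joined = []
    # Probe side: each partition of the large table is joined where it lives (no shuffle)
    for partition in partitions:
        for dba_name, risk_id in partition:
            # Left outer join: rows without a match keep None as their description
            joined.append((dba_name, risk_id, hash_table.get(risk_id)))
    return joined


# Illustrative data only; the real risk_description contents are not shown here
risk_description = [(1, "High"), (2, "Medium"), (3, "Low")]
food_partitions = [
    [("PLACE_A", 1), ("PLACE_B", 3)],
    [("PLACE_C", 2), ("PLACE_D", None)],
]
result = broadcast_hash_join(risk_description, food_partitions)
for row in result:
    print(row)
```

Spark chooses this strategy automatically when one side is estimated to be smaller than `spark.sql.autoBroadcastJoinThreshold` (10 MB by default).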

## Exercise 5
---
1. **Obtain the number of inspections for each establishment (`DBA Name` column) and their result (`Results` column).**
2. **Get the two establishments (`DBA Name`) with the most inspections for each of the results.**
3. **Save the results in a new Delta table named `inspections_results`.**

### Count inspections per establishment and result

In [0]:
df_food = spark.table("food_inspections")

# Count inspections by DBA Name and Results
inspections_count_df = (
    df_food.groupBy("dba_name", "results")
           .agg(count("*")
                .alias("inspection_count"))
)


### Get the two establishments with the most inspections for each result

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, desc, row_number

# Defining window partitioned by Results, ordered by inspection_count descending
window_spec = Window \
                .partitionBy("results") \
                .orderBy(desc("inspection_count"))

# Numbering rows within each result and keeping the top two
# (row_number breaks ties arbitrarily, so exactly two rows are kept per result)
top_two_df = (
    inspections_count_df
        .withColumn("rank", row_number().over(window_spec))
        .filter(col("rank") <= 2)
        .select("dba_name", "results", "inspection_count")
)
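The window logic above (partition by result, order by count descending, keep the first two row numbers) can be reproduced in plain Python to check the expected output on a tiny input. This is an illustrative model with made-up establishment names and counts, not the notebook's data.

```python
from collections import defaultdict
from typing import List, Tuple

CountRow = Tuple[str, str, int]  # (dba_name, results, inspection_count)


def top_n_per_group(rows: List[CountRow], n: int = 2) -> List[CountRow]:
    # partitionBy("results"): group rows by their result value
    groups = defaultdict(list)
    for dba_name, result, n_inspections in rows:
        groups[result].append((dba_name, result, n_inspections))
    top: List[CountRow] = []
    for group in groups.values():
        # orderBy(desc("inspection_count")); Python's sort is stable, so ties keep input order
        ranked = sorted(group, key=lambda r: r[2], reverse=True)
        top.extend(ranked[:n])  # row_number() <= n
    return top


counts = [
    ("PLACE_A", "Pass", 40), ("PLACE_B", "Pass", 35), ("PLACE_C", "Pass", 20),
    ("PLACE_A", "Fail", 12), ("PLACE_D", "Fail", 15),
]
print(top_n_per_group(counts))
```

If tied establishments should all be kept, `rank()` or `dense_rank()` can replace `row_number()` in the Spark version.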

### Save the results to a new Delta table

In [0]:
spark.sql("DROP TABLE IF EXISTS hive_metastore.default.inspections_results")

In [0]:
dbutils.fs.rm("/user/hive/warehouse/inspections_results", recurse=True)

In [0]:
top_two_df.write \
  .mode("overwrite") \
  .format("delta") \
  .option("mergeSchema", "true") \
  .saveAsTable("inspections_results")

In [0]:
# Verifying the results
spark.table("inspections_results").display()

## Exercise 6
---
1. **Update the Delta table created in the previous exercise with the value `DBA_Name = "error"`.**
2. **Restore the table to its original state.**

### Update Delta table (`inspections_results`) setting `dba_name` = "error"

In [0]:
# Performing UPDATE
spark.sql("""
    UPDATE inspections_results
    SET dba_name = 'error'
""")

# Verifying update
spark.table("inspections_results").display()

### Restore the table to its original state using Delta Lake's Time Travel

In [0]:
# Checking Delta table history
spark.sql("DESCRIBE HISTORY inspections_results").display()

In [0]:
# Restoring using the desired previous version (0)
spark.sql("""
    RESTORE TABLE inspections_results TO VERSION AS OF 0
""")

In [0]:
# Verifying restoration
spark.table("inspections_results").display()
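Delta records every write as a new table version, and `RESTORE` is itself recorded as a new commit in `DESCRIBE HISTORY` rather than erasing the later versions. The toy class below is a hypothetical, heavily simplified model of that behaviour: it copies whole snapshots, whereas real Delta tracks added and removed data files in its transaction log.

```python
from typing import Dict, List


class ToyVersionedTable:
    """Hypothetical toy model of a versioned table: every write appends a snapshot."""

    def __init__(self, rows: List[Dict[str, str]]):
        self.versions: List[List[Dict[str, str]]] = [rows]  # version 0 = initial write

    @property
    def current(self) -> List[Dict[str, str]]:
        return self.versions[-1]

    def update_all(self, column: str, value: str) -> None:
        # Like UPDATE without WHERE: a new version where every row has the new value
        self.versions.append([{**row, column: value} for row in self.current])

    def restore(self, version: int) -> None:
        # RESTORE keeps history: the old snapshot is committed again as a new version
        self.versions.append([dict(row) for row in self.versions[version]])


table = ToyVersionedTable([{"dba_name": "PLACE_A"}, {"dba_name": "PLACE_B"}])
table.update_all("dba_name", "error")  # version 1
table.restore(0)                       # version 2, same content as version 0
print(len(table.versions) - 1, table.current)
```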

## Exercise 7
---
**Create an application with Structured Streaming that reads data from the Kafka topic `inspections`.**
> **Note:** The Kafka server URL is defined at the beginning of this notebook.

**The data from this topic is exactly the same as the data being analyzed throughout this notebook, `Food Inspections`, so the schema is the same.**

In [0]:
# Set checkpoint path ensuring fault-tolerance and recovery
checkpoint_path = "/tmp/project_spark/_checkpoint"

dbutils.fs.rm(checkpoint_path, True)

spark.conf.set("spark.sql.streaming.checkpointLocation", checkpoint_path)
spark.conf.get("spark.sql.streaming.checkpointLocation")

### Define the schema

The Kafka source exposes each message's `key` and `value` as binary columns, so the payload has to be cast to a string and parsed as JSON explicitly. Every field below is declared as `StringType` and cast to its proper type after parsing.

In [0]:
from pyspark.sql.types import *

food_inspections_schema = StructType([
    StructField("Inspection ID", StringType()),
    StructField("DBA Name", StringType()),
    StructField("AKA Name", StringType()),
    StructField("License #", StringType()),
    StructField("Facility Type", StringType()),
    StructField("Risk", StringType()),
    StructField("Address", StringType()),
    StructField("City", StringType()),
    StructField("State", StringType()),
    StructField("Zip", StringType()),
    StructField("Inspection Date", StringType()),  # will cast to DateType later
    StructField("Inspection Type", StringType()),
    StructField("Results", StringType()),
    StructField("Violations", StringType()),
    StructField("Latitude", StringType()),
    StructField("Longitude", StringType()),
    StructField("Location", StringType())
])

### Read from Kafka

In [0]:
from pyspark.sql.functions import from_json, col

kafka_stream = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
         .option("subscribe", "inspections")
         .option("startingOffsets", "earliest")
         .load()
)

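### Parse JSON messages and cast fields to their correct types

The Kafka `value` is cast to a string, parsed with the schema above, and the key fields are cast to their proper types so they can be queried or joined: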

In [0]:
from pyspark.sql.functions import to_date

cleaned_stream = kafka_stream \
        .selectExpr("CAST(value AS STRING)", "timestamp") \
        .withColumn("value", from_json("value", food_inspections_schema)) \
        .select(col('value.*'), col("timestamp")) \
        .withColumn("Inspection ID", col("Inspection ID").cast("int")) \
        .withColumn("License #", col("License #").cast("int")) \
        .withColumn("Zip", col("Zip").cast("int")) \
        .withColumn("Latitude", col("Latitude").cast("double")) \
        .withColumn("Longitude", col("Longitude").cast("double")) \
        .withColumn("Inspection Date", to_date(col("Inspection Date"), "MM/dd/yyyy"))

In [0]:
cleaned_stream.display()
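The parse-and-cast pipeline above can be modelled for a single message in plain Python. This is a simplified sketch: the message content is hypothetical, the helper names (`parse_int`, `parse_double`, `parse_date`) are illustrative, and Spark's actual casting rules differ in some edge cases. Fields missing from the JSON become `None`, just as `from_json` produces `NULL` for absent fields.

```python
import json
from datetime import date, datetime
from typing import Optional


def parse_int(value: Optional[str]) -> Optional[int]:
    """Simplified stand-in for CAST(value AS INT): bad input becomes None."""
    try:
        return int(value) if value is not None else None
    except ValueError:
        return None


def parse_double(value: Optional[str]) -> Optional[float]:
    """Simplified stand-in for CAST(value AS DOUBLE)."""
    try:
        return float(value) if value is not None else None
    except ValueError:
        return None


def parse_date(value: Optional[str]) -> Optional[date]:
    """Stand-in for to_date(value, 'MM/dd/yyyy')."""
    try:
        return datetime.strptime(value, "%m/%d/%Y").date() if value is not None else None
    except ValueError:
        return None


# Hypothetical Kafka message value (bytes) with a subset of the schema's fields
raw_value = (b'{"Inspection ID": "2345678", "Zip": "60614", "Latitude": "41.92", '
             b'"Inspection Date": "03/15/2021", "Results": "Pass"}')

record = json.loads(raw_value.decode("utf-8"))  # CAST(value AS STRING) + from_json
typed = {
    "Inspection ID": parse_int(record.get("Inspection ID")),
    "Zip": parse_int(record.get("Zip")),
    "Latitude": parse_double(record.get("Latitude")),
    "Longitude": parse_double(record.get("Longitude")),  # absent field -> None
    "Inspection Date": parse_date(record.get("Inspection Date")),
    "Results": record.get("Results"),
}
print(typed)
```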

## Exercise 8
---
**Based on the data source from the previous exercise, obtain the number of inspections per `Facility Type` every 5 seconds.**

In [0]:
# Set checkpoint path ensuring fault-tolerance and recovery
checkpoint_path = "/tmp/project_spark/_checkpoint"

dbutils.fs.rm(checkpoint_path, True)

spark.conf.set("spark.sql.streaming.checkpointLocation", checkpoint_path)
spark.conf.get("spark.sql.streaming.checkpointLocation")

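Spark processes the Kafka records in micro-batches: the JSON payload is parsed into columns and the Kafka `timestamp` column (the time the message was written to Kafka, not the inspection date) is kept as the event time. With `withWatermark("timestamp", "1 minute")`, Spark tracks the maximum event time seen so far and treats records older than `max_event_time - 1 minute` as too late: they may be dropped, and the state of windows that end before this watermark can be discarded.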

In [0]:
from pyspark.sql.functions import window

facility_count = (
    cleaned_stream
        .withWatermark("timestamp", "1 minute")
        .groupBy(
            window(col("timestamp"), "5 seconds"),
            col("Facility Type")
        )
        .count()
)

In [0]:
facility_count.display()
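The combination of 5-second tumbling windows and a 1-minute watermark can be simulated in plain Python. This is a simplified model, not Spark's engine: event times are integers in seconds, the example data is illustrative, and the watermark used for each micro-batch is derived from the maximum event time of the earlier batches.

```python
from collections import Counter
from typing import List, Optional, Tuple

WINDOW_SECONDS = 5             # window(col("timestamp"), "5 seconds")
WATERMARK_DELAY_SECONDS = 60   # withWatermark("timestamp", "1 minute")


def window_start(ts: int, size: int = WINDOW_SECONDS) -> int:
    # Tumbling windows are aligned to multiples of their size: [start, start + size)
    return ts - ts % size


def run_micro_batches(batches: List[List[Tuple[int, str]]]):
    counts: Counter = Counter()
    dropped: List[Tuple[int, str]] = []
    max_event_time: Optional[int] = None
    for batch in batches:
        # Watermark for this batch = max event time of earlier batches minus the delay
        watermark = None if max_event_time is None else max_event_time - WATERMARK_DELAY_SECONDS
        for ts, facility in batch:
            if watermark is not None and ts < watermark:
                dropped.append((ts, facility))  # too late: not counted
                continue
            counts[(window_start(ts), facility)] += 1
        if batch:
            batch_max = max(ts for ts, _ in batch)
            max_event_time = batch_max if max_event_time is None else max(max_event_time, batch_max)
    return counts, dropped


batches = [
    [(100, "Restaurant"), (102, "Restaurant"), (107, "Grocery Store")],
    [(200, "Restaurant")],
    [(130, "Restaurant"), (141, "Grocery Store")],  # watermark is now 200 - 60 = 140
]
counts, dropped = run_micro_batches(batches)
print(dict(counts))
print("dropped:", dropped)
```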

## Exercise 9
---
**Based on the data source from exercise 7, obtain the number of inspections by `Results` for the last 30 seconds every 5 seconds.**

In [0]:
# Set checkpoint path ensuring fault-tolerance and recovery
checkpoint_path = "/tmp/project_spark/_checkpoint"

spark.conf.set("spark.sql.streaming.checkpointLocation", checkpoint_path)
spark.conf.get("spark.sql.streaming.checkpointLocation")

### Apply windowed aggregation

In [0]:
results_count = (
    cleaned_stream
        .withWatermark("timestamp", "1 minute")
        .groupBy(
            window(col("timestamp"), "30 seconds", "5 seconds"),
            col("Results")
        )
        .count()
)

In [0]:
results_count.display()
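With a 30-second window sliding every 5 seconds, each event falls into 30 / 5 = 6 overlapping windows, so summing counts across windows gives six times the number of messages. The plain-Python sketch below lists the windows an event time belongs to, assuming windows aligned to multiples of the slide (Spark's default, i.e. no `startTime` offset).

```python
from typing import List, Tuple


def sliding_windows(ts: int, size: int = 30, slide: int = 5) -> List[Tuple[int, int]]:
    """Return every [start, end) window of length `size`, starting on a multiple
    of `slide`, that contains the event time `ts` (seconds)."""
    start = ts - ts % slide  # latest window start at or before ts
    windows = []
    while start > ts - size:  # window [start, start + size) still contains ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


print(sliding_windows(42))
```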

## Exercise 10
---
1. **Update the Results column in the Delta table for food inspections created in Exercise 1 with the value `No result`.**
2. **Now that the Delta table is corrupted with the value `No result`, the problem must be resolved with the data coming from Kafka, which will be assumed as the absolute truth. Therefore, it will need to be updated in real time as items arrive from Kafka**.
> **Note**: It is recommended to stop all previous streams, as the one in this exercise tends to be resource-intensive.

In [0]:
# Set checkpoint path ensuring fault-tolerance and recovery
checkpoint_path = "/tmp/project_spark/_checkpoint"

dbutils.fs.rm(checkpoint_path, True)

spark.conf.set("spark.sql.streaming.checkpointLocation", checkpoint_path)
spark.conf.get("spark.sql.streaming.checkpointLocation")

### Simulate corruption on Delta Table

In [0]:
%sql
UPDATE food_inspections
SET Results = 'No result'

In [0]:
%sql
SELECT Inspection_ID, Results FROM food_inspections LIMIT 10

### Define the cleaned Kafka stream

In [0]:
from pyspark.sql.functions import from_json, col, to_date
from pyspark.sql.types import *

# Schema
food_inspections_schema = StructType([
    StructField("Inspection ID", StringType()),
    StructField("DBA Name", StringType()),
    StructField("AKA Name", StringType()),
    StructField("License #", StringType()),
    StructField("Facility Type", StringType()),
    StructField("Risk", StringType()),
    StructField("Address", StringType()),
    StructField("City", StringType()),
    StructField("State", StringType()),
    StructField("Zip", StringType()),
    StructField("Inspection Date", StringType()),
    StructField("Inspection Type", StringType()),
    StructField("Results", StringType()),
    StructField("Violations", StringType()),
    StructField("Latitude", StringType()),
    StructField("Longitude", StringType()),
    StructField("Location", StringType())
])

kafka_stream = (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
         .option("subscribe", "inspections")
         .option("startingOffsets", "latest")
         .load()
)

cleaned_stream = (
    kafka_stream.selectExpr("CAST(value AS STRING)")
                .select(from_json(col("value"), food_inspections_schema).alias("data"))
                .select("data.*")
                .withColumn("Inspection ID", col("Inspection ID").cast("int"))
                .withColumn("License #", col("License #").cast("int"))
                .withColumn("Zip", col("Zip").cast("int"))
                .withColumn("Latitude", col("Latitude").cast("double"))
                .withColumn("Longitude", col("Longitude").cast("double"))
                .withColumn("Inspection Date", to_date(col("Inspection Date"), "MM/dd/yyyy"))
)

### Define foreachBatch upsert function

In [0]:
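from delta.tables import DeltaTable
from pyspark.sql.functions import col

def upsert_inspections_from_kafka(batch_df, batch_id):
    # Skip empty micro-batches
    if batch_df.isEmpty():
        return

    # Align the Kafka column names with the snake_case names of the Delta table and
    # keep one row per inspection, since MERGE fails if several source rows match
    # the same target row
    updates_df = (
        batch_df.select(
            col("Inspection ID").alias("inspection_id"),
            col("Results").alias("results")
        )
        .filter(col("inspection_id").isNotNull())
        .dropDuplicates(["inspection_id"])
    )

    # food_inspections was created with saveAsTable(), so it is resolved by name, not by path
    delta_table = DeltaTable.forName(spark, "food_inspections")
    (
        delta_table.alias("t")
            .merge(updates_df.alias("s"), "t.inspection_id = s.inspection_id")
            # Kafka is the source of truth: overwrite the corrupted Results value
            .whenMatchedUpdate(set={"results": "s.results"})
            .execute()
    )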
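The effect of the MERGE on the corrupted rows can be modelled in plain Python. In this sketch (a toy model with illustrative IDs and values), every inspection that arrives from Kafka overwrites the stored `results`; inspections not yet in the table are inserted only when the hypothetical `insert_new` flag is set, mirroring a `whenNotMatchedInsert` clause. The micro-batch is deduplicated first because Delta's MERGE fails when several source rows match the same target row; here the last message per key wins.

```python
from typing import Dict, List, Tuple


def merge_results(table: Dict[int, str], batch: List[Tuple[int, str]],
                  insert_new: bool = False) -> Dict[int, str]:
    # One source row per inspection_id (last message in the batch wins in this model)
    latest: Dict[int, str] = {}
    for inspection_id, results in batch:
        latest[inspection_id] = results
    merged = dict(table)
    for inspection_id, results in latest.items():
        if inspection_id in merged or insert_new:
            merged[inspection_id] = results  # matched -> update; unmatched -> insert
    return merged


corrupted = {1: "No result", 2: "No result", 3: "No result"}
kafka_batch = [(1, "Pass"), (3, "Fail"), (3, "Pass w/ Conditions"), (9, "Pass")]
print(merge_results(corrupted, kafka_batch))
```

Rows that never appear in the Kafka topic (ID `2` above) keep `No result`, which is why the verification query below filters on `Results != 'No result'`.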

### Start the streaming query

In [0]:
query = (
    cleaned_stream.writeStream
        .foreachBatch(upsert_inspections_from_kafka)
        .option("checkpointLocation", "/tmp/food_inspections_checkpoint")
        .start()
)

### Verify the fix

In [0]:
%sql
SELECT `inspection_id`, `Results`
FROM food_inspections
WHERE `Results` != 'No result'
LIMIT 10

## Exercise 11
---
**Design a real-time analysis solution using Apache Spark in Databricks to consume flight data transmitted by Kafka. These data should be stored in a Delta table, and the current position of the flights should be visualized on a map.**
* **Flight data is in a topic called `flights`.**
* **Save all the flights in a Delta table, but only one entry per flight code, so if updates on the flight position are received, the corresponding record will be updated. This must happen in real-time.**

> **Note**: For more information on the input data, refer to [OpenSky Network](https://openskynetwork.github.io/opensky-api/rest.html#all-state-vectors). A screenshot of the visualization to be achieved is shown below. Keep in mind that this map visualization is available in Databricks, so there will be no need to import any external libraries.

![Flight Map](https://raw.githubusercontent.com/masfworld/datahack_docker/ab487794745499248388b67cf574085c5d86746e/zeppelin/data/image.png)

In [0]:
checkpoint_path = "/tmp/project_spark/_checkpoint"

dbutils.fs.rm(checkpoint_path, True)

spark.conf.set("spark.sql.streaming.checkpointLocation", checkpoint_path)
spark.conf.get("spark.sql.streaming.checkpointLocation")

### Configuration and Schema

In [0]:
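from pyspark.sql.types import (StructType, StructField, StringType, LongType,
                               DoubleType, BooleanType, IntegerType)

KAFKA_BOOTSTRAP_SERVER = "35.227.18.205:9094"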
KAFKA_TOPIC = "flights"
DELTA_FLIGHTS_PATH = "/delta/flights"
CHECKPOINT_PATH = "/tmp/flights_checkpoint"

flights_schema = StructType([
    StructField("icao24", StringType(), True),
    StructField("callsign", StringType(), True),
    StructField("origin_country", StringType(), True),
    StructField("time_position", LongType(), True),
    StructField("last_contact", LongType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("on_ground", BooleanType(), True),
    StructField("velocity", DoubleType(), True),
    StructField("heading", DoubleType(), True),
    StructField("squawk", StringType(), True),
    StructField("spi", BooleanType(), True),
    StructField("position_source", IntegerType(), True)
])


### Consume and parse streaming data from Kafka

In [0]:
from pyspark.sql.functions import from_json, col, from_unixtime

raw_flights_stream = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")
        .load()
)

parsed_flights = (
    raw_flights_stream.selectExpr("CAST(value AS STRING) as json")
                      .select(from_json(col("json"), flights_schema).alias("data"))
                      .select("data.*")
                      .withColumn("event_time", from_unixtime(col("last_contact")).cast("timestamp"))
)

In [0]:
%sql
DROP TABLE IF EXISTS flights;

In [0]:
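FLIGHTS_TABLE_PATH = "/user/hive/warehouse/flights"
dbutils.fs.rm(FLIGHTS_TABLE_PATH, recurse=True)
# The flights table is created as an external table at DELTA_FLIGHTS_PATH,
# so its data files live there and must be removed too for a clean restart
dbutils.fs.rm(DELTA_FLIGHTS_PATH, recurse=True)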

### Real-time upsert into a Delta table

In [0]:
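from delta.tables import DeltaTable
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

def upsert_flights(batch_df, batch_id):
    if batch_df.isEmpty():
        return

    delta_path = DELTA_FLIGHTS_PATH

    # Keep only the most recent state vector per aircraft in this micro-batch:
    # MERGE fails if several source rows match the same target row, and the
    # first write must already contain one row per icao24
    latest_first = Window.partitionBy("icao24").orderBy(col("last_contact").desc())
    latest_df = (
        batch_df.filter(col("icao24").isNotNull())
                .withColumn("rn", row_number().over(latest_first))
                .filter(col("rn") == 1)
                .drop("rn")
    )

    if DeltaTable.isDeltaTable(spark, delta_path):
        delta_table = DeltaTable.forPath(spark, delta_path)
        (
            delta_table.alias("t")
                .merge(
                    latest_df.alias("s"),
                    "t.icao24 = s.icao24"
                )
                # Only overwrite a flight with a newer (or equally recent) position
                .whenMatchedUpdateAll(condition="s.last_contact >= t.last_contact")
                .whenNotMatchedInsertAll()
                .execute()
        )
    else:
        latest_df.write.format("delta").mode("overwrite").save(delta_path)
        spark.sql(f"CREATE TABLE IF NOT EXISTS flights USING DELTA LOCATION '{delta_path}'")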
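The requirement of one entry per flight, updated in real time, can be modelled in plain Python. This toy sketch keys flights by `icao24` as the notebook does, and it also ignores state vectors older than the stored one, so a late message cannot move a flight back to a previous position. The `icao24` values and coordinates are made up for illustration.

```python
from typing import Dict, List


def upsert_latest(table: Dict[str, dict], batch: List[dict]) -> Dict[str, dict]:
    """Toy model: keep one row per icao24, always the one with the newest last_contact."""
    merged = dict(table)
    for state in batch:
        key = state["icao24"]
        current = merged.get(key)
        # New aircraft are inserted; known ones are only replaced by newer positions
        if current is None or state["last_contact"] >= current["last_contact"]:
            merged[key] = state
    return merged


batch_1 = [
    {"icao24": "a1b2c3", "last_contact": 100, "latitude": 40.40, "longitude": -3.70},
    {"icao24": "a1b2c3", "last_contact": 110, "latitude": 40.45, "longitude": -3.65},
    {"icao24": "d4e5f6", "last_contact": 105, "latitude": 51.47, "longitude": -0.45},
]
batch_2 = [
    # Arrives late with an older position, so it must not overwrite the stored one
    {"icao24": "a1b2c3", "last_contact": 108, "latitude": 40.44, "longitude": -3.66},
]

flights = upsert_latest({}, batch_1)
flights = upsert_latest(flights, batch_2)
for icao24, state in flights.items():
    print(icao24, state["last_contact"], state["latitude"], state["longitude"])
```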

Streaming query:

In [0]:
query = (
    parsed_flights.writeStream
        .foreachBatch(upsert_flights)
        .option("checkpointLocation", checkpoint_path)
        .start()
)

### Visualize flights

In [0]:
%sql
SELECT latitude, longitude
FROM flights
WHERE latitude IS NOT NULL AND longitude IS NOT NULL