![](https://github.com/microsoft/fabric-analytics-roadshow-lab/blob/initial-version-prep/assets/images/spark/analytics.png?raw=true)
# Welcome to the Fabric Analytics Roadshow Lab

## Overview
Welcome to the **McMillan Industrial Group** analytics transformation journey! In this lab, you'll build a modern, streaming-enabled data lakehouse using Microsoft Fabric.

### The Business Scenario
McMillan Industrial Group is a leading manufacturer and distributor of industrial equipment and parts. Their systems generate real-time data from:
- 👥 **Customers** - Customer master data and profiles
- 📝 **Orders** - Sales orders placed online and manually
- ⚙️ **Items** - Item master data
- 📦 **Shipments** - Outbound shipments and delivery tracking
- 📱 **Shipment Scan Events** - Real-time package scanning from field technicians and warehouse systems
- 🚚 **Logistics Dimensions** - Facilities, routes, shipping methods, service levels, and exception types

This data streams continuously into OneLake in various formats (JSON, Parquet), and your mission is to transform raw data into actionable business intelligence.

### Architecture: Medallion Pattern
We'll implement a **medallion architecture** - a common practice for organizing data based on the level of data refinement and readiness for end-user consumption:

> ℹ️ _Note: Similar streaming scenarios ideally use Azure Event Hubs or Fabric Eventstreams to store events in a message store that manages ordering and provides a simple consumption endpoint. The same JSON payloads could come from either of these message stores; however, to keep the use case easy to reproduce, we will read events as files stored in OneLake._

```
📥 Landing Zone (Raw Data: JSON/Parquet)
    ↓ Spark - Structured Streaming
🥉 BRONZE Zone - Raw ingestion with audit columns and column name cleaning
    ↓ Spark - Structured Streaming
🥈 SILVER Zone - Cleaned, validated, and conformed data
    ↓ Fabric Warehouse - Dimensional Modeling
🥇 GOLD Zone - Business-level aggregates (Warehouse)
    ↓
🤖 Analytics & AI - Data Agent and Semantic Models
```

---

## 🎯 Lab Setup: Start Your Data Pipeline!

Before we explore Spark fundamentals, you need to **start the production-grade streaming pipeline** that will process data throughout this lab.

### Step 1: Trigger the Spark Job Definition
> **Note:** Please read the full instructions for this step before opening the Spark Job Definition.

1. **Open Spark Job Definition** - Click here to open: [StreamBronzeAndSilver](https://msit.powerbi.com/groups/60c4c0e4-1e55-44cc-b6c3-860d3bb431ba/sparkjobdefinitions/fd65b0b2-98a3-48cd-8b9d-9c35674fe1e5?experience=power-bi)
1. **Click** the **"Run"** button at the top of the screen
1. **Confirm** the job starts successfully (you'll see a status of "Running")
1. **Return** to this Notebook (1_ExploreData)

### What Happens Next

The Spark Job Definition you just triggered will:
- 🎲 **Generate synthetic data** simulating McMillan's business operations
- 📝 **Write JSON and Parquet files** to the Landing zone (folder) of your Lakehouse
- ⚡ **Stream data** from Landing → Bronze → Silver zones
- 🔄 **Run continuously** for the duration of this lab

> ℹ️ **Important:** The job runs in the background and does not finish on its own, so you don't need to wait for it; you can start working through this notebook immediately. It takes approximately 1.5 minutes to start writing data to `Files/landing/` and another 2-3 minutes for all Bronze and Silver tables to be created and hydrated with initial data.

### What You'll Learn in This Notebook

1. **Spark Fundamentals** - DataFrames, transformations, and actions
2. **Structured Streaming** - Processing real-time and batch data with Spark
3. **Data Exploration** - Discover what's already been processed in Bronze & Silver zones

### The Target Schema
By the end of this lab, you'll understand the basic concepts and see the outcome of a mature data engineering pipeline:

![McMillan Industrial Group Silver Schema](https://github.com/microsoft/fabric-analytics-roadshow-lab/blob/initial-version-prep/assets/images/spark/silver-erd.png?raw=true)

Let's get started!

## 📚 Part 1: Spark Fundamentals

Before diving into streaming data, let's understand the power of Apache Spark. Spark is a distributed computing engine that allows you to process massive datasets across one or many machines.

### Key Concepts
- **DataFrame**: A distributed collection of data organized into named columns (like a table)
- **Lazy Evaluation**: Transformations aren't executed until an action is called
- **Partitioning**: Data is split across multiple nodes for parallel processing
- **In-Memory Processing**: Spark keeps intermediate data in memory where possible and can cache DataFrames you reuse (for example, with `df.cache()`) for fast repeated analysis
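
To make lazy evaluation concrete, here is a toy, pure-Python model. It is not how Spark is implemented, and the `LazyFrame` class and the `item_id`/`price` fields are hypothetical. Transformations (`where`, `with_column`) only record steps in a plan, and nothing executes until an action (`collect`, `count`) runs that plan, which mirrors Spark's split between transformations and actions.

```python
from typing import Callable, Iterable, Iterator, List, Optional

Step = Callable[[Iterator[dict]], Iterator[dict]]


class LazyFrame:
    """Toy model of lazy evaluation: transformations build a plan, actions run it."""

    def __init__(self, source: Iterable[dict], plan: Optional[List[Step]] = None):
        self._source = list(source)
        self._plan: List[Step] = plan or []
        self.executions = 0  # how many times an action has run this frame's plan

    def _with_step(self, step: Step) -> "LazyFrame":
        # Return a new frame with one more recorded step; no rows are touched here
        return LazyFrame(self._source, self._plan + [step])

    def where(self, predicate: Callable[[dict], bool]) -> "LazyFrame":
        return self._with_step(lambda rows: (r for r in rows if predicate(r)))

    def with_column(self, name: str, fn: Callable[[dict], object]) -> "LazyFrame":
        return self._with_step(lambda rows: ({**r, name: fn(r)} for r in rows))

    def collect(self) -> List[dict]:
        # Action: only now is the recorded plan executed, step by step
        self.executions += 1
        rows: Iterator[dict] = iter(self._source)
        for step in self._plan:
            rows = step(rows)
        return list(rows)

    def count(self) -> int:
        return len(self.collect())


items = [{"item_id": 1, "price": 10.0}, {"item_id": 2, "price": 250.0}]
expensive = (LazyFrame(items)
             .where(lambda r: r["price"] > 100)
             .with_column("price_x2", lambda r: r["price"] * 2))
print(expensive.executions)  # 0: building the plan ran nothing
print(expensive.collect())   # [{'item_id': 2, 'price': 250.0, 'price_x2': 500.0}]
```

Spark goes further by optimizing the whole recorded plan (with its Catalyst optimizer) before an action executes it, which is one reason deferring work pays off.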

Fabric Spark Notebooks have a Spark session already started, so let's get right into exploring some data.

Execute the cell below to preview parquet data landing in the `Files/landing/item` folder. 

```python
# Read parquet via Spark
df = spark.read.parquet('Files/landing/item')
display(df)
```

Run the cell below to preview JSON data from the `Files/landing/shipment` folder. Notice the `data` column: it is an `Array` of `Struct`s that contains the entire shipment structure with various nested elements. This data will be flattened when it is written to the Silver zone.

> ℹ️ **Tip:** Complex data type columns (Struct, Map, Array, etc.) can be drilled into by clicking a cell value and then clicking the caret symbol.

![Explore Struct](https://github.com/microsoft/fabric-analytics-roadshow-lab/blob/initial-version-prep/assets/images/spark/explore-struct.gif?raw=true)

```python
# Read JSON via Spark
df = spark.read.json('Files/landing/shipment', multiLine=True)
display(df)
```

### Switching Between DataFrame API and Spark SQL

While the PySpark **DataFrame API** was just used to preview files, we can also use **Spark SQL** to query the same data using familiar SQL syntax. Both approaches are equally powerful and often interchangeable.

#### 📋 What We'll Demonstrate

The next cells show two key SQL patterns:

**1. Creating a Temporary View**
- Register JSON files as a SQL table (exists only for this session)
- Query file-based data as if it were a database table
- Pass additional options, such as the `multiLine` JSON setting

**2. Exploding Nested Arrays** _(you'll write this query!)_
- The shipment JSON contains an **array** of shipment records
- Use `EXPLODE()` to transform arrays into individual rows
- Use `*` to expand all columns from nested structs into flat columns

> 🎯 **Why This Matters:** While many data engineers prefer the PySpark DataFrame API, Spark also supports SQL (Spark SQL). Complex business logic is often easier to express in SQL, so there's no need to compromise: work in the language you are most comfortable with!

> 💡 **Pro Tip**: Use the `%%sql` magic command or `spark.sql()` to write pure Spark SQL instead of PySpark code!

First, let's create the temporary view:

```sql
%%sql
CREATE OR REPLACE TEMPORARY VIEW shipment_data
USING JSON
OPTIONS (
  path "Files/landing/shipment",
  multiLine "true"
);
```

### 🎯 Challenge: Query Nested JSON Data

Now it's your turn! Write a `SELECT` statement to query the `shipment_data` temporary view and flatten the nested structure.

**💡 Hints:**
- Use `explode(<column_name>)` to expand an array into individual rows
- Use `<column_name>.*` to select all top-level fields of a struct
- You'll need a subquery to explode first, then expand the struct

Try it in the cell below!

```sql
%%sql
-- Write your SELECT statement here
```

---

<details>
  <summary><strong>üîë Solution:</strong> Click to reveal the answer</summary>

<br/>

**Approach:**
1. Inner query: `EXPLODE(data)` converts the array into rows with alias `shipment`
2. Outer query: `shipment.*` expands all struct fields into columns

```sql
SELECT shipment.*
FROM (
    SELECT explode(data) as shipment 
    FROM shipment_data
);
```

**Key Takeaway:** This two-step pattern (explode → expand) is fundamental for flattening nested JSON in data engineering pipelines.
  
</details>
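
For intuition, the explode → expand pattern can be mimicked in plain Python. This is a minimal sketch, assuming a payload shaped like the shipment files (a top-level `data` array of objects); the `shipment_id` and `status` fields are hypothetical stand-ins for the real shipment attributes.

```python
import json
from typing import Dict, List


def explode_and_expand(records: List[dict], array_field: str = "data") -> List[Dict]:
    """Mimic SELECT shipment.* FROM (SELECT explode(data) AS shipment FROM ...)."""
    rows = []
    for record in records:
        # explode: one output row per array element; null or empty arrays yield no rows
        for shipment in record.get(array_field) or []:
            # expand (shipment.*): each top-level field becomes a column;
            # nested objects stay nested, just as they do with shipment.*
            rows.append(dict(shipment))
    return rows


payload = json.loads("""
[
  {"data": [{"shipment_id": "S1", "status": "Shipped"},
            {"shipment_id": "S2", "status": "Delivered"}]},
  {"data": [{"shipment_id": "S3", "status": "Created"}]}
]
""")
flat = explode_and_expand(payload)
print([r["shipment_id"] for r in flat])  # ['S1', 'S2', 'S3']
```

As in this sketch, Spark's `explode` drops input rows whose array is null or empty; `explode_outer` keeps them with a null value instead.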

---
## 🌊 Part 2: Why Structured Streaming?

**Structured Streaming** is Spark's powerful engine for processing data streams, but it's useful far beyond just real-time, low-latency scenarios. Here's why it's commonly used in modern data engineering:

### 🎯 Key Benefits

1. **Built-in Incremental Processing**
   - Automatically tracks which data has been processed
   - Only processes files that are new since the last run
   - No need to manually track high-water marks or lists of already-loaded files

1. **Exactly-Once Semantics**
   - Guarantees each record is written exactly once, given a replayable source and an idempotent sink such as Delta Lake
   - Prevents duplicate data in your Delta tables
   - Handles failures gracefully with automatic recovery

1. **Fault Tolerance**
   - Checkpointing saves progress automatically
   - If a job fails, it resumes from the last checkpoint
   - No data loss or reprocessing of already-handled records

1. **Unified API**
   - Same DataFrame API for batch and streaming
   - Write once, run in batch or streaming mode
   - Easy to prototype in batch, deploy as streaming

1. **Optimized for Delta Lake**
   - Native integration with Delta tables
   - Supports schema evolution (for example, via the `mergeSchema` write option)
   - Enables time travel and data versioning
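
The incremental-processing and fault-tolerance benefits both come from the checkpoint. The toy below sketches that idea in plain Python; it is not Spark's checkpoint format or algorithm, and `process_new_files` and its JSON checkpoint file are hypothetical. Progress is committed only after a batch is handled, so a crash mid-batch means those files are retried rather than lost. On its own that is at-least-once behavior; Spark reaches exactly-once by pairing its checkpoint with a sink that commits idempotently, such as Delta Lake.

```python
import json
import os
import tempfile
from typing import Callable, List


def process_new_files(input_dir: str, checkpoint_path: str,
                      handle: Callable[[str], None]) -> List[str]:
    """Toy incremental processing: handle only files not recorded in the checkpoint."""
    # Load the names of files committed by previous runs (empty on the first run)
    done = set()
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            done = set(json.load(f))

    # Only files that have not been seen before are processed, in name order
    new_files = sorted(n for n in os.listdir(input_dir) if n not in done)
    for name in new_files:
        handle(os.path.join(input_dir, name))

    # Commit progress after the batch succeeds; a crash before this point means
    # the same files are retried on restart, so `handle` should be idempotent
    with open(checkpoint_path, "w") as f:
        json.dump(sorted(done | set(new_files)), f)
    return new_files


with tempfile.TemporaryDirectory() as tmp:
    landing = os.path.join(tmp, "landing")
    os.makedirs(landing)
    checkpoint = os.path.join(tmp, "checkpoint.json")
    for name in ("a.json", "b.json"):
        open(os.path.join(landing, name), "w").close()

    print(process_new_files(landing, checkpoint, print))  # handles a.json and b.json
    print(process_new_files(landing, checkpoint, print))  # [] because nothing is new
```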

### 💼 Common Use Cases

- **ETL Pipelines**: Continuously ingest and transform data as it arrives
- **Data Lakehouse**: Build incremental Bronze ‚Üí Silver ‚Üí Gold pipelines
- **Real-time Analytics**: Power dashboards with up-to-the-minute data
- **Change Data Capture (CDC)**: Process CDC data from source systems
- **Event Processing**: Handle IoT sensors, clickstreams, logs, etc.

### 🏗️ Architecture in This Lab

In our medallion architecture, Structured Streaming powers:
- **Landing → Bronze**: Ingesting raw JSON/Parquet files with audit metadata and column renaming (to snake_case)
- **Bronze → Silver**: Flattening nested structures, applying business rules, data quality checks

Even though the data arrives as files in OneLake (not a traditional message store), Structured Streaming gives us:
- Incremental processing (only new files)
- Exactly-once guarantees (no duplicates)
- Automatic restart capability (fault tolerance)
- Scalability (handles growing data volumes)
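
The Landing → Bronze step renames columns to snake_case and adds audit metadata. The exact rules used by the lab's Spark Job Definition aren't shown in this notebook, so this is a minimal sketch of one common convention. `to_snake_case` and `to_bronze_record` are hypothetical helpers, and `_source_file` is an assumed audit column (only `_processing_timestamp` appears in the lab's tables).

```python
import re
from datetime import datetime, timezone
from typing import Optional


def to_snake_case(name: str) -> str:
    """Convert names like 'shipmentId', 'Ship Date', or 'HTTPStatus' to snake_case."""
    # Spaces, hyphens, and dots become underscores
    name = re.sub(r"[\s\-.]+", "_", name.strip())
    # Split an acronym from the word after it: 'HTTPStatus' -> 'HTTP_Status'
    name = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
    # Split lowercase/digit-to-uppercase boundaries: 'shipmentId' -> 'shipment_Id'
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    # Collapse repeated underscores and lowercase the result
    return re.sub(r"_+", "_", name).lower()


def to_bronze_record(raw: dict, source_file: str,
                     processed_at: Optional[datetime] = None) -> dict:
    """Rename top-level keys to snake_case and append audit columns."""
    record = {to_snake_case(k): v for k, v in raw.items()}
    record["_processing_timestamp"] = processed_at or datetime.now(timezone.utc)
    record["_source_file"] = source_file  # hypothetical audit column
    return record


print(to_bronze_record({"shipmentId": "S1", "Ship Date": "2024-05-01"},
                       "landing/shipment/part-0.json"))
```

In PySpark, a renaming function like this could be applied to a whole DataFrame with `df.toDF(*[to_snake_case(c) for c in df.columns])`.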

---

## 🌊 Part 3: Structured Streaming Fundamentals

Structured Streaming is Spark's **scalable and fault-tolerant** stream processing engine. It treats streaming data as an **unbounded table** that grows continuously.

### 🧩 Key Streaming Concepts

| Component | Description |
|-----------|-------------|
| **Input Source** | Where data comes from (files, Kafka, Event Hubs, etc.) |
| **Transformations** | How you process each micro-batch (same API as batch!) |
| **Output Sink** | Where results are written (Delta tables, console, memory, etc.) |
| **Checkpointing** | Tracks progress for exactly-once processing and fault tolerance |
| **Trigger Intervals** | How often to process new data (continuous, fixed interval, available now) |

### 🔧 The Streaming Pattern

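```python
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

# Streaming file sources need an explicit schema (placeholder columns shown)
input_schema = StructType([
    StructField("id", StringType()),
    StructField("event_time", TimestampType()),
])

# 1. Read stream from source
df = spark.readStream.format("json").schema(input_schema).load("path/to/input")

# 2. Apply transformations (same as batch!); placeholder logic shown
transformed = (df
    .where(F.col("id").isNotNull())
    .withColumn("_processing_timestamp", F.current_timestamp()))

# 3. Write to Delta Lake
query = (transformed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "path/to/checkpoint")
    .start("path/to/delta/table"))
```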

### 💡 Batch vs Streaming: Same Code!

The beauty of Structured Streaming is that **the same transformation code** works for both batch and streaming. The only difference is:
- Batch: `spark.read...` → `df.write...`
- Streaming: `spark.readStream...` → `df.writeStream...`
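
Here is a plain-Python analogy for why this works; it is not Spark code, and `clean_items`, `micro_batches`, and the `item_id`/`weight_kg` fields are hypothetical. A stateless, row-by-row transformation gives the same result whether it runs once over all the data (batch) or over each new chunk as it arrives with the outputs appended (streaming in `append` mode). Stateful operations such as aggregations are different: Spark must keep state across micro-batches for them.

```python
from typing import Iterable, Iterator, List


def clean_items(rows: Iterable[dict]) -> List[dict]:
    """One transformation, written once: drop rows without an id and add a flag."""
    return [{**r, "is_heavy": r["weight_kg"] > 50} for r in rows if r.get("item_id")]


def micro_batches(rows: List[dict], size: int) -> Iterator[List[dict]]:
    """Split the input into consecutive chunks, like files arriving over time."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


source = [
    {"item_id": "A", "weight_kg": 12},
    {"item_id": None, "weight_kg": 80},
    {"item_id": "B", "weight_kg": 75},
    {"item_id": "C", "weight_kg": 51},
]

batch_result = clean_items(source)                # like spark.read ... df.write
streaming_result = []                             # like spark.readStream ... writeStream
for chunk in micro_batches(source, size=2):
    streaming_result.extend(clean_items(chunk))   # append each micro-batch's output

print(batch_result == streaming_result)  # True
```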

Let's see this in action! First, let's query the `item` parquet files using **batch** processing:

```python
item_df = spark.read.parquet('Files/landing/item')
display(item_df)
```

### 🔄 Convert Batch to Streaming

After validating the output, we can **simply switch** to the `readStream` API! The transformation logic stays identical; the only additional requirement is an explicit schema for the input DataFrame.

**Key Changes:**
1. `spark.read` → `spark.readStream`
1. Batch reads can infer the schema → streaming file sources require `.schema()`
1. `df.write` → `df.writeStream` (add checkpoint location and trigger)

Execute the cell below to create your first streaming pipeline:

```python
# Create a streaming DataFrame to incrementally read only new parquet files as they arrive
item_stream_df = spark.readStream.schema(item_df.schema).parquet('Files/landing/item')

# Process all files available now (possibly in several micro-batches), then stop
item_stream = (item_stream_df.writeStream
    .format('delta')
    .outputMode('append')
    .option('checkpointLocation', 'Files/test/checkpoints/item')
    .trigger(availableNow=True)
    .toTable('dbo.item')
)
```

### 📊 Monitoring Streaming Jobs

Calls such as `start()` and `toTable()` return immediately, so streaming jobs run **asynchronously** by default; call `awaitTermination()` when your design needs **synchronous** execution.

**🔍 Check Async Job Status:**

| Property / Method | What It Shows |
|--------|---------------|
| `<stream>.status` | Current status (whether data is available, whether a trigger is active, or that the query has stopped) |
| `<stream>.lastProgress` | Detailed metrics about the last completed micro-batch |
| `<stream>.awaitTermination()` | Blocks until the query stops (synchronous execution) |
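
Because `lastProgress` is a dictionary in PySpark 3.x (and `None` before the first micro-batch completes), it is easy to pull out the fields you care about. The helper below is a hypothetical sketch: `batchId`, `numInputRows`, `processedRowsPerSecond`, and `durationMs.triggerExecution` are fields of Spark's streaming progress report, but the sample values are made up for illustration.

```python
from typing import Any, Dict, Optional


def summarize_progress(progress: Optional[Dict[str, Any]]) -> str:
    """Turn a StreamingQuery.lastProgress dict into a one-line summary."""
    if progress is None:
        # lastProgress is None until the first micro-batch completes
        return "no batch has completed yet"
    rows = progress.get("numInputRows", 0)
    batch = progress.get("batchId")
    # durationMs breaks a trigger down by phase; triggerExecution is the total
    total_ms = progress.get("durationMs", {}).get("triggerExecution")
    rate = progress.get("processedRowsPerSecond")
    parts = [f"batch {batch}: {rows} rows"]
    if total_ms is not None:
        parts.append(f"{total_ms} ms")
    if rate is not None:
        parts.append(f"{rate:.1f} rows/s")
    return ", ".join(parts)


# Illustrative values only, shaped like a subset of a real progress dict
sample = {"batchId": 0, "numInputRows": 1200,
          "processedRowsPerSecond": 850.4,
          "durationMs": {"addBatch": 1100, "triggerExecution": 1411}}
print(summarize_progress(sample))  # batch 0: 1200 rows, 1411 ms, 850.4 rows/s
```

In the notebook, you could call `print(summarize_progress(item_stream.lastProgress))` once the query has run.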

Let's check the status of our streaming job:

```python
item_stream.status
```

```python
item_stream.lastProgress
```

### ✅ Verify Your First Streaming Pipeline

Once `status` reports a message of **"Stopped"**, the `availableNow` job has processed all available files and finished; `lastProgress` then shows the metrics for its most recent micro-batch.

**📂 Verify the Table Was Created:**
1. Look at the **Lakehouse explorer** on the left sidebar
2. Expand the **Tables** section
3. Find the `item` table under the `dbo` schema
4. Right-click the table and select **Load data → Spark**, drag and drop it onto your notebook, or query it with Spark SQL!

> 🎉 **Congratulations!** You've just created your first Spark Structured Streaming pipeline in Microsoft Fabric!

---

## 🎬 Exploring Your Production Pipeline Data

Now let's explore the data produced by the **Spark Job Definition** you triggered earlier! It's been streaming data into Bronze and Silver zones while you've been learning Spark fundamentals.

### 📊 Silver Zone: Shipment Scan Events

Run the cell below to count `shipment_scan_event` records processed to the **Silver zone**:

> 🔄 **Try This**: Run this cell multiple times over the next few minutes - watch the count grow as the streaming job processes more data!

```sql
%%sql
SELECT COUNT(1) FROM silver.shipment_scan_event
```

### 📦 Silver Zone: Flattened Shipment Data

Remember the nested JSON structure you saw earlier in `Files/landing/shipment`? Let's see how it's been **flattened** in the Silver zone!

Query the `silver.shipment` table below to compare:

**What to Notice:**
- All nested fields are now **top-level columns** (easier to query!)
- Clean, standardized **snake_case** column names
- Data types properly enforced (timestamps, numbers, strings)
- Ready for joining with other tables and analytics

```sql
%%sql
SELECT * from silver.shipment LIMIT 100
```

### Measure End-to-End Latency: ü•â Bronze Zone

Let's measure the **latency** from when scan events are generated at IoT devices to when they land in the **Bronze** Delta table.

**Latency Calculation:**
```
Latency = Processing Timestamp - Device Generated Timestamp
```
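
To sanity-check the formula outside Spark, here is a minimal Python equivalent of the SQL expression `(unix_millis(processed) - unix_millis(generated)) / 1000.0` used in the queries below. The `unix_millis` and `latency_seconds` helpers are hypothetical, and treating timestamps without an offset as UTC is an assumption (Spark interprets such values in the session time zone).

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def unix_millis(ts: str) -> int:
    """Whole milliseconds since the Unix epoch (sub-millisecond digits are dropped)."""
    dt = datetime.fromisoformat(ts)
    if dt.tzinfo is None:
        # Assumption: timestamps without an offset are UTC
        dt = dt.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids floating-point rounding
    return (dt - EPOCH) // timedelta(milliseconds=1)


def latency_seconds(generated_at: str, processed_at: str) -> float:
    """Processing timestamp minus device-generated timestamp, in seconds."""
    return (unix_millis(processed_at) - unix_millis(generated_at)) / 1000.0


print(latency_seconds("2024-05-01T12:00:00.250+00:00",
                      "2024-05-01T12:00:03.000+00:00"))  # 2.75
```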

> 💡 **Visualization Tip**: After running the query, click **"New chart"** in the results to visualize latency trends over time!

```sql
%%sql
SELECT data.generated_at, _processing_timestamp, (unix_millis(_processing_timestamp) - unix_millis(cast(data.generated_at as timestamp))) / 1000.0 AS seconds_latency_from_source
FROM bronze.shipment_scan_event
group by all
order by cast(data.generated_at as timestamp) desc LIMIT 100
```

### Measure End-to-End Latency: ü•à Silver Zone

Now let's measure latency to the **Silver zone** - this shows the complete journey through your medallion architecture.

**Data Flow:**
```
Device → Landing → Bronze → Silver
```

This latency includes:
- File landing in OneLake
- Bronze zone processing (ingestion + metadata)
- Silver zone processing (flattening + transformations + validation)

```sql
%%sql
SELECT generated_at, _processing_timestamp, (unix_millis(_processing_timestamp) - unix_millis(generated_at)) / 1000.0 AS seconds_latency_from_source
FROM silver.shipment_scan_event
group by all
order by generated_at desc LIMIT 100
```

### ✅ Silver Zone Complete

**What You've Accomplished:**

Your streaming data is now:
- **Parsed** from complex JSON structures
- **Cleaned** with standardized naming and types
- **Ready** for dimensional modeling in the Gold layer

The Silver zone is where the magic happens - raw, messy data transformed into analytics-ready tables that business users can trust.


---

## 🎓 Part 4: Key Takeaways & Next Steps

### 🏆 What You've Accomplished

Congratulations! You've explored a production-grade streaming data pipeline in Microsoft Fabric. Here's what you've learned:

#### 1. Spark Fundamentals
- **DataFrame API**: Reading Parquet and JSON files
- **Spark SQL**: Using `%%sql` magic commands and temporary views
- **Data Exploration**: Previewing schemas and nested structures
- **Batch vs Streaming**: Understanding the unified API

#### 2. Structured Streaming in Action
- **Incremental Processing**: Only new files are processed automatically
- **Checkpointing**: Fault-tolerant, exactly-once semantics
- **Real-time Monitoring**: Using `.status` and `.lastProgress`
- **Trigger Modes**: `availableNow=True` to process all currently available data and then stop

#### 3. Medallion Architecture (Bronze → Silver → Gold)
- **Bronze Zone**: Raw data preservation with audit metadata
- **Silver Zone**: Cleaned, flattened, analytics-ready data
- **Data Quality**: Understanding transformation at each layer
- **Latency Monitoring**: Measuring end-to-end processing time

#### 4. Production Pipeline Patterns
- **Spark Job Definitions**: Long-running, orchestrated pipelines
- **JSON Flattening**: Handling nested structures

---

### 🎬 Your Streaming Job Status

**Remember:** Your Spark Job Definition (`StreamBronzeAndSilver`) is still running in the background!

**Current State:**
- Generating synthetic data every few seconds
- Processing Landing ‚Üí Bronze ‚Üí Silver continuously
- Data accumulating in Delta tables

**What You Can Do:**
- **Re-run queries** in this notebook to see growing data volumes
- **Check the Lakehouse** to explore tables and schemas
- **Stop the job** if needed (navigate to workspace → Spark Job Definition → Cancel)

---

### 🚀 What's Next?

Continue your journey through the McMillan Industrial Group data pipeline:

| Experience | What You'll Learn |
|----------|-------------------|
| **🥇 2_GoldLayer** Notebook | Build dimensional models in Fabric Warehouse |
| **🤖 3_SalesAndLogisticsAgent** | Chat with your data via a Data Agent |

### 📚 Additional Resources

Expand your knowledge with these official docs:

- [Microsoft Fabric Documentation](https://learn.microsoft.com/fabric/)
- [Spark Structured Streaming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Delta Lake Best Practices](https://docs.delta.io/latest/best-practices.html)

---

### 🎯 Pro Tips for Your Own Projects

**When building streaming pipelines:**
1. **Start with batch** - Prototype transformations in batch mode first
2. **Monitor latency** - Track end-to-end processing times from the start
3. **Test with sample data** - Validate logic before processing production volumes
4. **Document schemas** - Maintain schema definitions for all layers
5. **Plan for reprocessing** - Design Bronze so you can reprocess Silver at any time

---

**🎉 Great work completing this notebook!** Move on to the next notebook when you're ready to build the Gold layer! 🚀