# DP-700 Spark & Lakehouse summary

>  **Note**: this tutorial is provided for educational purposes, for members of the [Fabric Dojo community](https://skool.com/fabricdojo/about). All content contained within is protected by Copyright © law. Do not copy or re-distribute.

In this notebook, we will review a broad spectrum of code that is useful for data engineers, related to the Spark engine, the Lakehouse and Notebooks in Fabric.
 
The aim is not to teach you Python/Spark from scratch, but to cover some of the most important topics for the DP-700:

1. Notebook fundamentals (languages, parameters, notebookutils)
2. Table creation (Spark SQL, PySpark (create + overwrite))
3. Data cleaning & transformation (a summary of what's expected)
4. Loading methods 

Not covered in this notebook (important for the exam, but we'll look at these separately):
- Version Control & CI/CD of Notebooks 
- Monitoring Spark 
- Error-Handling
- Optimizations 

## Prerequisites
- Download this notebook at the bottom of this page, and then upload it into your Fabric workspace. 
- Connect the notebook to a Lakehouse (this can be an empty Lakehouse when you begin the exercises)
- Download the files from Skool for this exercise, unzip the folder and upload the files into the 'Files/' folder of your Lakehouse. 


## 1. Notebook fundamentals
- Multi-language notebooks
- Notebook parameters (parameterizing a notebook run from a Data Pipeline)
- notebookutils 
    - file management 
    - grabbing Azure Key Vault secrets
    - notebook orchestration

#### 1.1 Multi-language support
In Fabric, notebooks can now run on three different 'engines' (each notebook uses one of them):
- Python Notebooks 
- T-SQL Notebooks 
- Spark Notebooks (including Spark SQL, SparkR, PySpark, Spark Scala)


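#### 1.2 Notebook parameters

Mark a cell as the parameter cell (via **Toggle parameter cell** in the cell's menu) so that a Data Pipeline's Notebook activity can override its default values at run time: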
In [None]:
param1 = 1 
param2 = 2
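When a pipeline's Notebook activity passes base parameters, Fabric injects a new cell directly after the parameter cell, so the pipeline values override the defaults above. The sketch below is a pure-Python model of that ordering, not a Fabric API: the `build_injected_cell` and `run_cells` helpers and the override value are hypothetical. Later assignments win, and any parameter the pipeline does not pass keeps its default.

```python
# Pure-Python model of parameter-cell overrides (hypothetical helpers, not a Fabric API)

# Source of the notebook's parameter cell, holding the default values
PARAMETER_CELL = "param1 = 1\nparam2 = 2"


def build_injected_cell(overrides: dict) -> str:
    """Render pipeline overrides as assignments, like the cell injected after the parameter cell."""
    return "\n".join(f"{name} = {value!r}" for name, value in overrides.items())


def run_cells(*cells: str) -> dict:
    """Execute cell sources in order in one shared namespace and return the resulting variables."""
    namespace: dict = {}
    for source in cells:
        exec(source, namespace)
    namespace.pop("__builtins__", None)  # exec() adds this key; drop it for a clean result
    return namespace


# Interactive run: only the defaults apply
interactive_vars = run_cells(PARAMETER_CELL)

# Pipeline run: the injected cell executes right after the parameter cell
pipeline_vars = run_cells(PARAMETER_CELL, build_injected_cell({"param1": 10}))

print(interactive_vars)  # {'param1': 1, 'param2': 2}
print(pipeline_vars)     # {'param1': 10, 'param2': 2}
```

In a real notebook you don't write any of this yourself: you toggle the parameter cell and set the base parameters on the pipeline's Notebook activity.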

#### 1.3 notebookutils

`notebookutils` is a Python package (see [documentation](https://learn.microsoft.com/en-us/fabric/data-engineering/notebook-utilities)), created and maintained by Microsoft, with a selection of utility functions to help with common tasks in Fabric notebooks, including:
- file management 
- getting secrets from Azure Key Vault 
- running notebooks programmatically
- much more

In [1]:
import notebookutils



In [None]:
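import notebookutils

# some useful notebookutils functions 👇 (arguments omitted; see the documentation for full signatures)
notebookutils.fs.mv(...)                  # move a file or folder (source path, destination path)
notebookutils.notebook.runMultiple(...)   # run several notebooks in parallel (a list of names or a DAG)
notebookutils.credentials.getSecret(...)  # read a secret (Key Vault URL, secret name)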
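`runMultiple` accepts either a list of notebook names or a DAG dictionary describing activities and their dependencies. The sketch below builds a DAG based on the format in the notebookutils documentation (the notebook names, `args` values and timeouts are hypothetical) and uses Python's standard `graphlib` to check the run order the dependencies imply. The actual `runMultiple` call is commented out because it only works inside a Fabric notebook.

```python
from graphlib import TopologicalSorter

# Hypothetical notebooks: two loads that must finish before a silver build
dag = {
    "activities": [
        {"name": "load_contacts", "path": "nb_load_contacts", "timeoutPerCellInSeconds": 90},
        {"name": "load_deals", "path": "nb_load_deals", "timeoutPerCellInSeconds": 90},
        {
            "name": "build_silver",
            "path": "nb_build_silver",
            "timeoutPerCellInSeconds": 120,
            "args": {"param1": 10},  # values passed to that notebook's parameter cell
            "dependencies": ["load_contacts", "load_deals"],  # activity names to wait for
        },
    ],
    "timeoutInSeconds": 3600,  # timeout for the whole DAG
    "concurrency": 2,  # max number of notebooks running at the same time
}

# Map each activity to the activities it depends on, then derive a valid run order
graph = {a["name"]: set(a.get("dependencies", [])) for a in dag["activities"]}
run_order = list(TopologicalSorter(graph).static_order())
print(run_order)  # both loads (in either order), then build_silver

# Inside Fabric only:
# notebookutils.notebook.runMultiple(dag)
```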


## 2. Table creation methods

An easy method for table creation using Spark SQL:

In [2]:
%%sql
-- Spark SQL table creation (IF NOT EXISTS lets the cell be rerun without an error)
CREATE TABLE IF NOT EXISTS MyTable_SparkSQL (
    `PatientId` INT,
    `Gender` STRING,
    `Scholarship` BOOLEAN
)


We can also create tables using the `delta.tables` package

In [3]:
from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

schema = StructType([
    StructField("PatientId", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Scholarship", BooleanType(), True),
    StructField("Age", IntegerType(), True)
])

# Create the Delta table, or replace it (schema and data) if it already exists
(
    DeltaTable.createOrReplace(spark)
        .tableName("mytable_pysparkapi")
        .addColumns(schema)
        .execute()
)


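`DeltaTable` offers three builder entry points that differ only in what happens when the table already exists: `create` raises an error, `createIfNotExists` leaves the existing table alone, and `createOrReplace` replaces it (the current version starts with the new schema and no rows; older versions stay in the Delta log). The pure-Python model below mimics that behaviour with a dictionary standing in for the Lakehouse catalog. The `Catalog` class is hypothetical, not part of delta-spark; it is only meant to help reason about rerunning a notebook.

```python
class TableAlreadyExistsError(Exception):
    pass


class Catalog:
    """Hypothetical stand-in for the Lakehouse catalog: table name -> (schema, rows)."""

    def __init__(self):
        self.tables = {}

    def create(self, name, schema):
        # Like DeltaTable.create(spark): error if the table exists
        if name in self.tables:
            raise TableAlreadyExistsError(name)
        self.tables[name] = (schema, [])

    def create_if_not_exists(self, name, schema):
        # Like DeltaTable.createIfNotExists(spark): keep an existing table untouched
        if name not in self.tables:
            self.tables[name] = (schema, [])

    def create_or_replace(self, name, schema):
        # Like DeltaTable.createOrReplace(spark): new definition, current version has no rows
        self.tables[name] = (schema, [])


catalog = Catalog()
catalog.create("mytable", ["PatientId", "Gender"])
catalog.tables["mytable"][1].append((1, "F"))  # pretend a row was loaded

catalog.create_if_not_exists("mytable", ["PatientId", "Gender", "Age"])
print(catalog.tables["mytable"])  # (['PatientId', 'Gender'], [(1, 'F')])

catalog.create_or_replace("mytable", ["PatientId", "Gender", "Age"])
print(catalog.tables["mytable"])  # (['PatientId', 'Gender', 'Age'], [])
```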

## 3. Cleaning and transformation

Make sure you have a broad knowledge of the fundamentals of data cleaning with PySpark. A good starting point:
- DataFrame manipulation 
- withColumns() 
- Filtering
- Grouping & aggregations 
- Joins 

For this, I recommend you go through the Spark Drills section of Fabric Dojo: 
- [LH007 🟢 PySpark Drills 1: DataFrames](https://www.skool.com/fabricdojo/classroom/951923c3?md=8cd0fdc9b09c41dfbaf40ac234f5e413)
- [LH008 🟢 PySpark Drills 2: Filtering](https://www.skool.com/fabricdojo/classroom/951923c3?md=b40d9744e04f4efa9cfa75b1dfa5fd6f)
- [LH009 🟢 PySpark Drills 3: GroupBy](https://www.skool.com/fabricdojo/classroom/951923c3?md=2f3655233e8943b28ac28f6bc580e1f5)
- [LH010 🔶 PySpark Drills 4: Data cleaning](https://www.skool.com/fabricdojo/classroom/951923c3?md=0a5a4d5ba5cb48ceb64ef688a6d3e640)
- [LH011 🔶 PySpark Drills 5: Reshaping data](https://www.skool.com/fabricdojo/classroom/951923c3?md=669f686ca31645d0a3a4b312a8f6a810)
- [LH012 ⬛ Audible Data Cleaning (Spark)](https://www.skool.com/fabricdojo/classroom/951923c3?md=2a7bac055bf44e3e982b94ebce111c65)


I also cover some of this in my [Spark video on YouTube](https://youtu.be/02lSlhwLU4c). 






## 4. Table loading techniques

#### 4.1 Full loading 
Creating a table from an existing Spark DataFrame:
- If the table does not exist yet, it is created.
- On subsequent executions, its data is overwritten with the contents of the DataFrame.
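This create-then-overwrite behaviour comes from the writer's save mode. Spark's `DataFrameWriter.mode()` accepts `"append"`, `"overwrite"`, `"ignore"` and `"error"`/`"errorifexists"` (the default). The pure-Python model below (a hypothetical `write` function over a dict of tables, not Spark itself) shows what each mode does once the target table already exists.

```python
VALID_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def write(tables: dict, name: str, rows: list, mode: str = "errorifexists") -> None:
    """Hypothetical model of DataFrameWriter save modes, using a dict of tables."""
    if mode not in VALID_MODES:
        raise ValueError(f"Unknown save mode {mode!r}")
    if name not in tables:
        tables[name] = list(rows)  # every mode creates a missing table
    elif mode == "overwrite":
        tables[name] = list(rows)  # replace the existing data
    elif mode == "append":
        tables[name].extend(rows)  # keep existing data, add the new rows
    elif mode == "ignore":
        return  # table exists: silently skip the write
    else:
        raise ValueError(f"Table {name!r} already exists")  # error / errorifexists


tables = {}
write(tables, "hubspot_contacts", [1, 2], mode="overwrite")  # first run: creates the table
write(tables, "hubspot_contacts", [3], mode="overwrite")     # rerun: replaces the data
print(tables)  # {'hubspot_contacts': [3]}

write(tables, "hubspot_contacts", [4], mode="append")
print(tables)  # {'hubspot_contacts': [3, 4]}
```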

In [4]:
# Loading a CSV File into a Spark Dataframe
df = (
    spark.read
        .option('header', True)
        .option('inferSchema', True)
        .csv('Files/hubspot_contacts_test.csv')
)


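With `inferSchema` set to `True`, Spark makes an extra pass over the data to guess each column's type; without it, every CSV column is read as a string. The sketch below is a deliberately simplified pure-Python model of that idea (Spark's real inference also handles timestamps, decimals, nulls and more), and the CSV sample is hypothetical, not the contents of `hubspot_contacts_test.csv`. For large or stable files, passing an explicit schema with `.schema(...)` avoids the extra pass.

```python
import csv
import io

# Hypothetical CSV sample (not the real hubspot_contacts_test.csv)
SAMPLE = """customer_id,name,is_active,score
1,Ana,true,4.5
2,Ben,false,3
"""


def infer_type(values):
    """Return the narrowest type that fits every value: int, then double, then boolean, else string."""

    def all_parse(parse):
        try:
            for v in values:
                parse(v)
            return True
        except ValueError:
            return False

    def parse_bool(v):
        if v.lower() not in ("true", "false"):
            raise ValueError(v)

    if all_parse(int):
        return "int"
    if all_parse(float):
        return "double"
    if all_parse(parse_bool):
        return "boolean"
    return "string"


rows = list(csv.DictReader(io.StringIO(SAMPLE)))  # like header=True: first line holds column names
schema = {col: infer_type([row[col] for row in rows]) for col in rows[0]}
print(schema)  # {'customer_id': 'int', 'name': 'string', 'is_active': 'boolean', 'score': 'double'}
```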

#### Exploring different syntax for loading data from a Spark dataframe into a Lakehouse

In [5]:
# Creating a Lakehouse table from a Spark DataFrame with saveAsTable()
( 
    df.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", True)
        .saveAsTable("hubspot_contacts")
)


In [6]:
# writing Delta files to the Tables/ folder with .save(); Fabric registers them as a managed table
( 
    df.write
        .format("delta")
        .mode("overwrite")
        .save("Tables/managed_table_using_save")
)


In [7]:
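# writing Delta files to the Files/ folder: stored as Delta, but not registered as a Lakehouse table
( 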
    df.write
        .format("delta")
        .mode("overwrite")
        .save("Files/unmanaged_table_using_save")
)

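Relative paths such as `Tables/...` and `Files/...` resolve against the notebook's default Lakehouse. To target another Lakehouse, `.save()` also accepts a full OneLake ABFS path of the form `abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/<Tables|Files>/<name>`. The helper below only assembles that string; the `onelake_path` function and the workspace and lakehouse names are hypothetical.

```python
def onelake_path(workspace: str, lakehouse: str, area: str, name: str) -> str:
    """Build an ABFS path to a folder in a Lakehouse's Tables/ or Files/ area (hypothetical helper)."""
    if area not in ("Tables", "Files"):
        raise ValueError("area must be 'Tables' or 'Files'")
    return (
        f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/"
        f"{lakehouse}.Lakehouse/{area}/{name}"
    )


# Hypothetical workspace and lakehouse names
target = onelake_path("DojoWorkspace", "SilverLakehouse", "Tables", "hubspot_contacts")
print(target)
# abfss://DojoWorkspace@onelake.dfs.fabric.microsoft.com/SilverLakehouse.Lakehouse/Tables/hubspot_contacts

# Inside Fabric: df.write.format("delta").mode("overwrite").save(target)
```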

#### 4.2 Incremental Loading 
Using Spark SQL

In [8]:
updates_df = (
    spark.read
        .option('header', True)
        .option('inferSchema', True)
        .csv('Files/hubspot_contacts_updates.csv')
)

updates_df.createOrReplaceTempView("contacts_updates")



In [10]:
%%sql
MERGE INTO hubspot_contacts AS target
USING contacts_updates AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

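Both `MERGE INTO ... UPDATE SET * / INSERT *` and `whenMatchedUpdateAll().whenNotMatchedInsertAll()` implement an upsert on `customer_id`: rows whose key already exists are overwritten with the source values, and new keys are inserted. The pure-Python model below (hypothetical rows, with a dict keyed on `customer_id` standing in for the table) shows the outcome, and also why rerunning the same merge is harmless: a second pass leaves the table unchanged. One caveat the model glosses over: Delta raises an error if several source rows match the same target row, so deduplicate the updates on the key first.

```python
def merge_upsert(target: list, source: list, key: str) -> list:
    """Upsert source rows into target rows by key (models UPDATE SET * / INSERT *)."""
    by_key = {row[key]: dict(row) for row in target}
    for row in source:
        # matched -> replace all columns; not matched -> insert
        by_key[row[key]] = dict(row)
    return list(by_key.values())


# Hypothetical contacts and updates
contacts = [
    {"customer_id": 1, "email": "ana@old.example"},
    {"customer_id": 2, "email": "ben@example.com"},
]
updates = [
    {"customer_id": 1, "email": "ana@new.example"},  # existing key -> updated
    {"customer_id": 3, "email": "cy@example.com"},   # new key -> inserted
]

merged = merge_upsert(contacts, updates, "customer_id")
print([row["customer_id"] for row in merged])  # [1, 2, 3]

# Re-running the same merge changes nothing (idempotent upsert)
assert merge_upsert(merged, updates, "customer_id") == merged
```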

Incremental loading (MERGE/UPSERT) using the `delta.tables` library:

In [9]:
# Incremental loading using the DeltaTable API

# get a DeltaTable handle for the hubspot_contacts table
hubspot_contacts = DeltaTable.forName(spark, "hubspot_contacts")

# perform the merge 
( 
    hubspot_contacts
        .alias("target")
        .merge(updates_df.alias("source"),"target.customer_id = source.customer_id")
        .whenMatchedUpdateAll() 
        .whenNotMatchedInsertAll() 
        .execute()
)


In [11]:
df = spark.table("hubspot_contacts")
display(df)
