# Data Engineering and Modeling with PySpark
This notebook serves as Part 1 of Lab 5: Data Engineering in Fabric Notebooks. The goal is to demonstrate the foundational steps of data engineering using PySpark, leading to the creation of a Delta table. We will explore different methods of reading data into a DataFrame and how to transform this data effectively.

## Reading Data into DataFrames
This section shows two ways to read a table into a Spark DataFrame, the core structure for data manipulation in Spark. Both produce the same DataFrame; which one you use depends on whether you prefer to express the read as a SQL query or as Python method calls.

### Using Spark SQL
Here, we use Spark SQL to load data into a DataFrame. This method is particularly useful if you are comfortable with SQL syntax. It allows you to leverage the power of SQL queries within the Spark environment.

Take note of the two-part name used in the SELECT statement: `Lakehouse_WC` refers to a specific Lakehouse in our Fabric environment, and `packagetypes` is a table in that Lakehouse.

```python
# Load data into a DataFrame using Spark SQL
df = spark.sql("SELECT * FROM Lakehouse_WC.packagetypes")

# Display the DataFrame
df.show()
```

In [2]:

```python
# Add your code in this cell
# You can use the markdown cell above for reference
# Remember to change the name of the Lakehouse (Lakehouse_{your initials})
```

```
+-------------+---------------+------------+-------------------+--------------------+
|PackageTypeID|PackageTypeName|LastEditedBy|          ValidFrom|             ValidTo|
+-------------+---------------+------------+-------------------+--------------------+
|            1|            Bag|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            2|          Block|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            3|         Bottle|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            4|            Box|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            5|            Can|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            6|         Carton|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            7|           Each|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            8|             Kg|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            9|         Packet|           1|2013-01-01
```
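
Because the Lakehouse name has to be changed to match your initials in every cell, one option is to build the two-part name in a single place. The sketch below is a hypothetical convenience and not part of the lab: `lakehouse_table` and its `prefix` parameter are illustrative names, and the `Lakehouse_{initials}` pattern is taken from the exercise comment.

```python
import re

# Hypothetical helper (not part of the lab): builds names such as
# "Lakehouse_WC.packagetypes" from the initials used in your Lakehouse name.
_IDENTIFIER = re.compile(r"[A-Za-z0-9_]+")


def lakehouse_table(initials: str, table: str, prefix: str = "Lakehouse") -> str:
    """Return the two-part name '<prefix>_<initials>.<table>'."""
    lakehouse = f"{prefix}_{initials}"
    for part in (lakehouse, table):
        # Reject anything other than letters, digits, and underscores so the
        # result is safe to interpolate into a SQL string.
        if not _IDENTIFIER.fullmatch(part):
            raise ValueError(f"Unexpected characters in identifier: {part!r}")
    return f"{lakehouse}.{table}"


print(lakehouse_table("WC", "packagetypes"))  # Lakehouse_WC.packagetypes
```

In a notebook cell, the result could then be passed to either read method, for example `spark.table(lakehouse_table("WC", "packagetypes"))`.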

### Using PySpark DataFrame API
Alternatively, we can use the PySpark DataFrame API to achieve the same result. `spark.table` returns the named table as a DataFrame directly, with no SQL string to write, and further DataFrame methods can be chained onto it.

```python
# Load the data into a DataFrame using PySpark DataFrame API
df = spark.table("Lakehouse_WC.packagetypes")

# Show the DataFrame
df.show()
```

In [3]:

```python
# Add your code in this cell
# You can use the markdown cell above for reference
# Remember to change the name of the Lakehouse (Lakehouse_{your initials})
```

```
+-------------+---------------+------------+-------------------+--------------------+
|PackageTypeID|PackageTypeName|LastEditedBy|          ValidFrom|             ValidTo|
+-------------+---------------+------------+-------------------+--------------------+
|            1|            Bag|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            2|          Block|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            3|         Bottle|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            4|            Box|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            5|            Can|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            6|         Carton|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            7|           Each|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            8|             Kg|           1|2013-01-01 00:00:00|9999-12-31 23:59:...|
|            9|         Packet|           1|2013-01-01
```
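
To confirm that the two read methods return the same data, the rows can be compared in both directions. This is a minimal sketch: `load_both_ways` and `same_rows` are hypothetical helper names, and `spark` is assumed to be the session that the Fabric notebook provides.

```python
def load_both_ways(spark, table_name):
    """Read one table with Spark SQL and with the DataFrame API."""
    sql_df = spark.sql(f"SELECT * FROM {table_name}")
    api_df = spark.table(table_name)
    return sql_df, api_df


def same_rows(df_a, df_b):
    """Return True when both DataFrames hold the same rows, duplicates included."""
    # exceptAll keeps duplicate rows, so each difference is empty only when
    # the other DataFrame contains every row at least as many times.
    only_in_a = df_a.exceptAll(df_b)
    only_in_b = df_b.exceptAll(df_a)
    return only_in_a.count() == 0 and only_in_b.count() == 0
```

In a notebook, `sql_df, api_df = load_both_ways(spark, "Lakehouse_WC.packagetypes")` followed by `same_rows(sql_df, api_df)` should return `True`, since both calls read the same table.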

## Building the Package Types Silver Table Using PySpark
Next, we build a silver table for package types. This involves cleansing and preparing the data to meet the requirements of a dimensional model, a structure commonly used in data warehousing and analytics.

### Utilizing DataFrame API with Method Chaining
This example uses method chaining in PySpark. Each method returns a new DataFrame, so the transformations read from top to bottom in the order they are applied.

```python
# Create a DataFrame using the DataFrame API with method chaining
distinct_df = (
    spark.table("Lakehouse_WC.PackageTypes")  # Reads the "PackageTypes" table
    .select("PackageTypeId", "PackageTypeName")  # Selects the specified columns
    .distinct()  # Retrieves distinct rows
)

# Show the result
distinct_df.show()
```

In [5]:

```python
# Add your code in this cell
# You can use the markdown cell above for reference
```

```
+-------------+---------------+
|PackageTypeId|PackageTypeName|
+-------------+---------------+
|            8|             Kg|
|            4|            Box|
|            7|           Each|
|           14|           Tube|
|            1|            Bag|
|            3|         Bottle|
|            5|            Can|
|           11|         Pallet|
|            6|         Carton|
|           12|           Tray|
|           13|           Tub |
|           10|           Pair|
|            2|          Block|
|            9|         Packet|
+-------------+---------------+
```
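
The output above shows `Tub ` with a trailing space, the kind of issue that cleansing for a silver table would typically address. The sketch below extends the same chain with a trim and a stable sort. `clean_package_types` is a hypothetical name, and whether trimming is wanted here is an assumption rather than a stated lab requirement.

```python
def clean_package_types(df):
    """Select the two dimension columns, trim the name, and drop duplicates."""
    return (
        df.selectExpr(
            "PackageTypeId",
            "trim(PackageTypeName) AS PackageTypeName",  # strip leading/trailing spaces
        )
        .distinct()  # deduplicate after trimming so rows that differ only by padding collapse
        .orderBy("PackageTypeId")  # deterministic row order for display
    )
```

It could be called as `clean_package_types(spark.table("Lakehouse_WC.PackageTypes")).show()`.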



### Creating a Delta Table
Finally, we write our transformed DataFrame to a Delta table. Delta tables provide advanced capabilities such as ACID transactions, scalable metadata handling, and unified streaming and batch data processing.

```python
# Write the DataFrame to the Delta table
distinct_df.write.format("delta").saveAsTable("Silver_Lakehouse_WC.package_type")
```

In [6]:

```python
# Add your code in this cell
# You can use the markdown cell above for reference
```
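
By default, `saveAsTable` raises an error if the target table already exists, so running the write a second time fails. One way to make the step repeatable is to set the write mode explicitly. This is a sketch, not the lab's prescribed approach: `save_as_delta` is a hypothetical helper, and choosing `overwrite` assumes the table can be fully rebuilt from its source on each run.

```python
def save_as_delta(df, table_name, mode="overwrite"):
    """Write df to a Delta table, replacing existing contents by default."""
    allowed = {"overwrite", "append", "ignore", "error", "errorifexists"}
    if mode not in allowed:
        raise ValueError(f"Unsupported write mode: {mode!r}")
    # mode("overwrite") replaces the table's data instead of failing when
    # the table already exists.
    df.write.format("delta").mode(mode).saveAsTable(table_name)
```

For example, `save_as_delta(distinct_df, "Silver_Lakehouse_WC.package_type")` can be re-run without first dropping the table.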

## Data Transformation with Spark SQL
Spark SQL integrates seamlessly with PySpark, allowing the execution of SQL queries. Multi-line SQL queries can be neatly encapsulated within triple quotes (`"""`) in Python. This practice improves the readability of complex SQL statements.

```python
# Use Spark SQL for complex data transformation
# Triple quotes are used for multi-line SQL queries
sales_orders_df = spark.sql("""
SELECT
    ol.OrderLineId,
    ol.OrderId,
    ol.StockItemId,
    ol.PackageTypeId,
    o.CustomerId,
    o.SalespersonPersonId,
    c.DeliveryCityId,
    c.BillToCustomerId,
    o.OrderDate,
    o.ExpectedDeliveryDate,
    ol.Quantity,
    ol.UnitPrice,
    ol.TaxRate,
    ol.PickedQuantity
FROM Lakehouse_WC.Sales_OrderLines AS ol      -- one row per order line
JOIN Lakehouse_WC.Sales_Orders AS o           -- order header for each line
    ON ol.OrderId = o.OrderId
JOIN Lakehouse_WC.Sales_Customers AS c        -- customer who placed the order
    ON o.CustomerId = c.CustomerId
""")

# Display a sample of the DataFrame
sales_orders_df.show(5)
```

In [8]:

```python
# Add your code in this cell
# You can use the markdown cell above for reference
```

```
+-----------+-------+-----------+-------------+----------+-------------------+--------------+----------------+----------+--------------------+--------+---------+-------+--------------+
|OrderLineId|OrderId|StockItemId|PackageTypeId|CustomerId|SalespersonPersonId|DeliveryCityId|BillToCustomerId| OrderDate|ExpectedDeliveryDate|Quantity|UnitPrice|TaxRate|PickedQuantity|
+-----------+-------+-----------+-------------+----------+-------------------+--------------+----------------+----------+--------------------+--------+---------+-------+--------------+
|      25168|   7903|        141|           10|       940|                 14|         27543|             940|2020-05-30|          2020-05-31|      24|     5.00| 15.000|            24|
|      65871|  20750|        141|           10|       988|                  6|         19979|             988|2021-01-24|          2021-01-27|      24|     5.00| 15.000|            24|
|      97458|  30762|        141|           10|        75|                 
```
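
For comparison, the same join can be written with the DataFrame API. The sketch below is an assumed equivalent rather than the lab's solution: `build_sales_orders` is a hypothetical name, and it selects the needed columns from each table before joining so that the join keys are the only column names the inputs share.

```python
def build_sales_orders(spark, lakehouse="Lakehouse_WC"):
    """Join order lines to their orders and customers with the DataFrame API."""
    lines = spark.table(f"{lakehouse}.Sales_OrderLines").select(
        "OrderLineId", "OrderId", "StockItemId", "PackageTypeId",
        "Quantity", "UnitPrice", "TaxRate", "PickedQuantity",
    )
    orders = spark.table(f"{lakehouse}.Sales_Orders").select(
        "OrderId", "CustomerId", "SalespersonPersonId",
        "OrderDate", "ExpectedDeliveryDate",
    )
    customers = spark.table(f"{lakehouse}.Sales_Customers").select(
        "CustomerId", "DeliveryCityId", "BillToCustomerId",
    )
    return (
        lines.join(orders, "OrderId")    # inner join; keeps a single OrderId column
        .join(customers, "CustomerId")   # inner join; keeps a single CustomerId column
        .select(                         # restore the column order of the SQL version
            "OrderLineId", "OrderId", "StockItemId", "PackageTypeId",
            "CustomerId", "SalespersonPersonId", "DeliveryCityId",
            "BillToCustomerId", "OrderDate", "ExpectedDeliveryDate",
            "Quantity", "UnitPrice", "TaxRate", "PickedQuantity",
        )
    )
```

Calling `build_sales_orders(spark).show(5)` should display the same columns as the SQL version. Which form to use is largely a matter of preference, since both describe the same inner joins.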

### Writing Data to a Delta Table
The next step is to persist the joined DataFrame as a Delta table, so that downstream queries read the stored result instead of re-running the join. Persisting intermediate results in this way is a common practice in data engineering for building reliable data pipelines.

```python
# Persist the DataFrame as a Delta table
sales_orders_df.write.format("delta").saveAsTable("Silver_Lakehouse_WC.sales_orders")
```

In [9]:

```python
# Add your code in this cell
# You can use the markdown cell above for reference
```
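
After the write, a quick check is to read the table back and compare its row count with the source DataFrame. The helper below is a hypothetical sketch (`row_counts_match` is not part of the lab). Matching counts only indicate that no rows were lost or added, not that every value is identical.

```python
def row_counts_match(spark, source_df, table_name):
    """Return True when the saved table has as many rows as source_df."""
    written = spark.table(table_name).count()  # rows now stored in the table
    expected = source_df.count()               # rows in the DataFrame that was written
    return written == expected
```

For example, `row_counts_match(spark, sales_orders_df, "Silver_Lakehouse_WC.sales_orders")` should return `True` right after the write completes.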