In [0]:
This notebook explains, in simple terms, what a data lakehouse is and what tasks it can perform.
Datasets:
    1. https://www.kaggle.com/datasets/divyeshardeshana/warehouse-and-retail-sales (structured data: a CSV file)
    2. https://github.com/robconery/json-sales-data/blob/master/data/customers.json (semi-structured data: a JSON file)


**Objective:**  Demonstrate how a Data Lakehouse facilitates the integration of structured, semi-structured and unstructured data.

In [0]:
from pyspark.sql.functions import col, when
sales_df = spark.read.option("header", True).csv("dbfs:/FileStore/Warehouse_and_Retail_Sales.csv") # reading structured data (CSV)
sales_df.show()

+----+-----+--------------------+---------+--------------------+---------+------------+----------------+---------------+
|YEAR|MONTH|            SUPPLIER|ITEM CODE|    ITEM DESCRIPTION|ITEM TYPE|RETAIL SALES|RETAIL TRANSFERS|WAREHOUSE SALES|
+----+-----+--------------------+---------+--------------------+---------+------------+----------------+---------------+
|2020|    1|REPUBLIC NATIONAL...|   100009| BOOTLEG RED - 750ML|     WINE|        0.00|            0.00|           2.00|
|2020|    1|           PWSWN INC|   100024|MOMENT DE PLAISIR...|     WINE|        0.00|            1.00|           4.00|
|2020|    1|RELIABLE CHURCHIL...|     1001|S SMITH ORGANIC P...|     BEER|        0.00|            0.00|           1.00|
|2020|    1|LANTERNA DISTRIBU...|   100145|SCHLINK HAUS KABI...|     WINE|        0.00|            0.00|           1.00|
|2020|    1|DIONYSOS IMPORTS INC|   100293|SANTORINI GAVALA ...|     WINE|        0.82|            0.00|           0.00|
|2020|    1|KYSELA PERE ET FI...

In [0]:
customers_df = spark.read.json("dbfs:/FileStore/customers.json") # reading semi-structured data (JSON)
customers_df.show()

+--------------------+--------------------+--------------------+--------------------+-------+---+---------+
|             company|             country|          created_at|               email|  first| id|     last|
+--------------------+--------------------+--------------------+--------------------+-------+---+---------+
|Hilll, Mayert and...|         Switzerland|2014-12-25T04:06:...|isidro_von@hotmai...| Torrey|  1|     Veum|
|      Stokes-Reichel|Democratic People...|2014-07-03T16:08:...|frederique19@gmai...|  Micah|  2|  Sanford|
|Rodriguez, Cartwr...|             Tunisia|2014-08-18T06:15:...|   fredy54@gmail.com| Hollis|  3|    Swift|
|Sipes, Feeney and...|                Chad|2014-07-10T11:31:...|braxton29@hotmail...|  Perry|  4|  Leffler|
| Lesch and Daughters|           Swaziland|2014-04-21T15:05:...|  turner59@gmail.com|Janelle|  5|  Hagenes|
|    Gorczany-Monahan|             Lebanon|2014-09-21T21:59:...|   halie47@yahoo.com|Charity|  6|  Bradtke|
|   Williamson-Hickle|      

Managing and transforming data

In [0]:
# Deriving a customer type from the email domain and adding it as a new column
customers_df = customers_df.withColumn(
    "customer_type",
    when(col("email").contains("@business.com"), "Business").otherwise("Personal"),
)
customers_df.show()

+--------------------+--------------------+--------------------+--------------------+---------+---+----------+-------------+
|             company|             country|          created_at|               email|    first| id|      last|customer_type|
+--------------------+--------------------+--------------------+--------------------+---------+---+----------+-------------+
|Hilll, Mayert and...|         Switzerland|2014-12-25T04:06:...|isidro_von@hotmai...|   Torrey|  1|      Veum|     Personal|
|      Stokes-Reichel|Democratic People...|2014-07-03T16:08:...|frederique19@gmai...|    Micah|  2|   Sanford|     Personal|
|Rodriguez, Cartwr...|             Tunisia|2014-08-18T06:15:...|   fredy54@gmail.com|   Hollis|  3|     Swift|     Personal|
|Sipes, Feeney and...|                Chad|2014-07-10T11:31:...|braxton29@hotmail...|    Perry|  4|   Leffler|     Personal|
| Lesch and Daughters|           Swaziland|2014-04-21T15:05:...|  turner59@gmail.com|  Janelle|  5|   Hagenes|     Personal|
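The `when(...).otherwise(...)` expression is a per-row conditional. Here is a plain-Python sketch of the same rule. The `@business.com` domain comes from the cell above, while the helper name `classify_customer` and the `ops@business.com` address are hypothetical. The sketch shows that any address without that domain is labelled "Personal", and so is a missing address. All the sample rows shown use webmail domains such as gmail.com and yahoo.com, so all of them come out as Personal.

```python
def classify_customer(email):
    """Plain-Python mirror of:
    when(col("email").contains("@business.com"), "Business").otherwise("Personal")
    """
    # In Spark, contains() on a null email yields null, which when() treats
    # as "not true", so the row falls through to otherwise().
    if email is not None and "@business.com" in email:
        return "Business"
    return "Personal"


emails = ["fredy54@gmail.com", "halie47@yahoo.com", "ops@business.com", None]
for e in emails:
    print(e, "->", classify_customer(e))
```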


In [0]:
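# The two datasets share no key, so this pairs every sales row with every customer
# (a Cartesian product). crossJoin makes that explicit; it only illustrates combining sources.
integrated_df = sales_df.crossJoin(customers_df)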
integrated_df.show(truncate=False)

+----+-----+---------------------------------+---------+-------------------+---------+------------+----------------+---------------+------------------------------+-------------------------------------+------------------------+-------------------------------+-------+---+---------+-------------+
|YEAR|MONTH|SUPPLIER                         |ITEM CODE|ITEM DESCRIPTION   |ITEM TYPE|RETAIL SALES|RETAIL TRANSFERS|WAREHOUSE SALES|company                       |country                              |created_at              |email                          |first  |id |last     |customer_type|
+----+-----+---------------------------------+---------+-------------------+---------+------------+----------------+---------------+------------------------------+-------------------------------------+------------------------+-------------------------------+-------+---+---------+-------------+
|2020|1    |REPUBLIC NATIONAL DISTRIBUTING CO|100009   |BOOTLEG RED - 750ML|WINE     |0.00        |0.00            
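The two DataFrames share no key, so combining them without a join condition pairs every sales row with every customer. The `describe()` output further below reports 307,645 sales rows and 9,999 customers. That gives 307,645 × 9,999 = 3,076,142,355 rows.

The plain-Python sketch below uses tiny made-up rows to contrast that with a keyed join. The `customer_id` column is hypothetical, since neither dataset actually contains such a key.

```python
from itertools import product

# Tiny made-up stand-ins for sales_df and customers_df.
sales = [
    {"ITEM CODE": "100009", "customer_id": 1},
    {"ITEM CODE": "100024", "customer_id": 2},
    {"ITEM CODE": "1001", "customer_id": 1},
]
customers = [
    {"id": 1, "last": "Veum"},
    {"id": 2, "last": "Sanford"},
]

# No join condition: every sales row is paired with every customer.
cross = [{**s, **c} for s, c in product(sales, customers)]

# Keyed (inner) join on a hypothetical customer_id -> id relationship.
keyed = [{**s, **c} for s, c in product(sales, customers) if s["customer_id"] == c["id"]]

print(len(cross), len(keyed))  # 6 3
print(307_645 * 9_999)         # 3076142355 rows in the real cross join
```

In PySpark, a keyed join would pass a condition, as in `sales_df.join(customers_df, <condition>)`. That requires a shared column, which these datasets lack.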

**Objective:** Show how Data Lakehouse architecture leverages Spark’s distributed processing to handle large volumes of data efficiently.

In [0]:
large_sales_df = spark.read.option("header", True).csv("dbfs:/FileStore/Warehouse_and_Retail_Sales.csv") # Loading a large dataset 
large_sales_df.show()

+----+-----+--------------------+---------+--------------------+---------+------------+----------------+---------------+
|YEAR|MONTH|            SUPPLIER|ITEM CODE|    ITEM DESCRIPTION|ITEM TYPE|RETAIL SALES|RETAIL TRANSFERS|WAREHOUSE SALES|
+----+-----+--------------------+---------+--------------------+---------+------------+----------------+---------------+
|2020|    1|REPUBLIC NATIONAL...|   100009| BOOTLEG RED - 750ML|     WINE|        0.00|            0.00|           2.00|
|2020|    1|           PWSWN INC|   100024|MOMENT DE PLAISIR...|     WINE|        0.00|            1.00|           4.00|
|2020|    1|RELIABLE CHURCHIL...|     1001|S SMITH ORGANIC P...|     BEER|        0.00|            0.00|           1.00|
|2020|    1|LANTERNA DISTRIBU...|   100145|SCHLINK HAUS KABI...|     WINE|        0.00|            0.00|           1.00|
|2020|    1|DIONYSOS IMPORTS INC|   100293|SANTORINI GAVALA ...|     WINE|        0.82|            0.00|           0.00|
|2020|    1|KYSELA PERE ET FI...

Reviewing the schemas to identify potential ways to link the data.

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataLakehouseExample").getOrCreate()
customers_df = spark.read.json("dbfs:/FileStore/customers.json")
sales_df = spark.read.option("header", True).csv("dbfs:/FileStore/Warehouse_and_Retail_Sales.csv")

# displaying the schema of each DataFrame
print("Customers Schema:")
customers_df.printSchema()

print("Sales Schema:")
sales_df.printSchema()


Customers Schema:
root
 |-- company: string (nullable = true)
 |-- country: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- email: string (nullable = true)
 |-- first: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last: string (nullable = true)

Sales Schema:
root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- SUPPLIER: string (nullable = true)
 |-- ITEM CODE: string (nullable = true)
 |-- ITEM DESCRIPTION: string (nullable = true)
 |-- ITEM TYPE: string (nullable = true)
 |-- RETAIL SALES: string (nullable = true)
 |-- RETAIL TRANSFERS: string (nullable = true)
 |-- WAREHOUSE SALES: string (nullable = true)



Performing a distributed processing operation

In [0]:
from pyspark.sql.functions import col
# printSchema() showed that WAREHOUSE SALES is stored as a string, so cast it to float before aggregating
large_sales_df = large_sales_df.withColumn("WAREHOUSE SALES", col("WAREHOUSE SALES").cast("float"))

In [0]:
aggregated_sales = large_sales_df.groupBy("ITEM TYPE").sum("WAREHOUSE SALES") # Then, performing the aggregation
aggregated_sales.show()

+--------------------+--------------------+
|           ITEM TYPE|sum(WAREHOUSE SALES)|
+--------------------+--------------------+
|        STR_SUPPLIES|                 0.0|
|                WINE|  1156981.9099973887|
|              LIQUOR|   94906.26995941997|
|                BEER|     6527236.5027849|
|             DUNNAGE|           -121454.0|
| DOMAINE PIRON - ...|                 0.0|
|         NON-ALCOHOL|  26149.590002059937|
|                 REF|            -20499.0|
|                KEGS|            118431.0|
|                null|                 1.0|
+--------------------+--------------------+
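The long decimal tails in these sums (for example `1156981.9099973887`) come from the cast to `float` before summing. `float` is Spark's 32-bit single-precision type, and casting to `double` would make the rounding error far smaller. The stdlib sketch below uses `struct` to round-trip a value through 32-bit precision, showing that `0.82` is not stored exactly.

```python
import struct


def to_float32(x):
    """Round-trip a Python float (64-bit) through 32-bit single precision."""
    return struct.unpack("f", struct.pack("f", x))[0]


value = 0.82
as_float32 = to_float32(value)
print(as_float32)           # slightly off from 0.82
print(as_float32 == value)  # False

# Summing many float32-rounded values accumulates these small errors.
values = [0.82, 2.76, 0.08, 6.41, 0.33]
print(sum(to_float32(v) for v in values), sum(values))
```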



In [0]:
aggregated_sales.write.partitionBy("ITEM TYPE").mode("overwrite").parquet("/mnt/data/output/aggregated_warehouse_sales") # saving the large processed data back to the data lakehouse in a scalable format

To verify that the data was saved correctly in Parquet format, read the Parquet files back into a DataFrame and display their contents.

In [0]:
aggregated_sales_parquet_df = spark.read.parquet("/mnt/data/output/aggregated_warehouse_sales")
aggregated_sales_parquet_df.show()

+--------------------+--------------------+
|sum(WAREHOUSE SALES)|           ITEM TYPE|
+--------------------+--------------------+
|                 1.0|                null|
|  1156981.9099973887|                WINE|
|            -20499.0|                 REF|
|   94906.26995941997|              LIQUOR|
|            118431.0|                KEGS|
|  26149.590002059937|         NON-ALCOHOL|
|     6527236.5027849|                BEER|
|           -121454.0|             DUNNAGE|
|                 0.0| DOMAINE PIRON - ...|
|                 0.0|        STR_SUPPLIES|
+--------------------+--------------------+
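The column order changed because `partitionBy` stores `ITEM TYPE` in the directory names rather than inside the Parquet files. The directories use Hive-style `column=value` names. When reading the table back, Spark appends partition columns after the data columns. Null partition values are written to a `__HIVE_DEFAULT_PARTITION__` folder and read back as null.

The plain-Python sketch below mimics that reconstruction. The file paths are hypothetical; Spark's real part-file names are longer.

```python
def read_partitioned(files):
    """Rebuild rows from {path: data_row} the way a Hive-style partitioned read does."""
    rows = []
    for path, data_row in files.items():
        row = dict(data_row)  # data columns stored in the file come first
        for segment in path.split("/"):
            if "=" in segment:
                name, value = segment.split("=", 1)
                # Null partition values live in this special folder name.
                row[name] = None if value == "__HIVE_DEFAULT_PARTITION__" else value
        rows.append(row)
    return rows


# Hypothetical file layout after partitionBy("ITEM TYPE").
files = {
    "aggregated_warehouse_sales/ITEM TYPE=WINE/part-00000.parquet":
        {"sum(WAREHOUSE SALES)": 1156981.9099973887},
    "aggregated_warehouse_sales/ITEM TYPE=__HIVE_DEFAULT_PARTITION__/part-00001.parquet":
        {"sum(WAREHOUSE SALES)": 1.0},
}
rows = read_partitioned(files)
print(list(rows[0]))  # ['sum(WAREHOUSE SALES)', 'ITEM TYPE']
print(rows[1])        # {'sum(WAREHOUSE SALES)': 1.0, 'ITEM TYPE': None}
```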



In [0]:
flipped_aggregated_sales = aggregated_sales_parquet_df.select("ITEM TYPE", "sum(WAREHOUSE SALES)") # reordering columns: the partition column ITEM TYPE comes last when read back
flipped_aggregated_sales.show()

+--------------------+--------------------+
|           ITEM TYPE|sum(WAREHOUSE SALES)|
+--------------------+--------------------+
|                null|                 1.0|
|                WINE|  1156981.9099973887|
|                 REF|            -20499.0|
|              LIQUOR|   94906.26995941997|
|                KEGS|            118431.0|
|         NON-ALCOHOL|  26149.590002059937|
|                BEER|     6527236.5027849|
|             DUNNAGE|           -121454.0|
| DOMAINE PIRON - ...|                 0.0|
|        STR_SUPPLIES|                 0.0|
+--------------------+--------------------+



**Objective:** Illustrate the capability to perform advanced analytics and machine learning directly on data stored in the data lakehouse.

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

Preparing data for machine learning

In [0]:
sales_ml_df = sales_df.select(
    col("WAREHOUSE SALES").cast("double"),
    col("RETAIL SALES").cast("double"),
    col("YEAR").cast("integer")
)

Assembling features

In [0]:
assembler = VectorAssembler(inputCols=["YEAR", "RETAIL SALES"], outputCol="features")
sales_features = assembler.transform(sales_ml_df)
sales_features.show()

+---------------+------------+----+-------------+
|WAREHOUSE SALES|RETAIL SALES|YEAR|     features|
+---------------+------------+----+-------------+
|            2.0|         0.0|2020| [2020.0,0.0]|
|            4.0|         0.0|2020| [2020.0,0.0]|
|            1.0|         0.0|2020| [2020.0,0.0]|
|            1.0|         0.0|2020| [2020.0,0.0]|
|            0.0|        0.82|2020|[2020.0,0.82]|
|            6.0|        2.76|2020|[2020.0,2.76]|
|            1.0|        0.08|2020|[2020.0,0.08]|
|            2.0|         0.0|2020| [2020.0,0.0]|
|            0.0|        6.41|2020|[2020.0,6.41]|
|            2.0|        0.33|2020|[2020.0,0.33]|
|            0.0|         1.7|2020| [2020.0,1.7]|
|            0.0|        1.02|2020|[2020.0,1.02]|
|            0.0|        0.68|2020|[2020.0,0.68]|
|            0.0|        0.34|2020|[2020.0,0.34]|
|            2.0|         0.0|2020| [2020.0,0.0]|
|            1.0|         0.0|2020| [2020.0,0.0]|
|            1.0|        0.34|2020|[2020.0,0.34]|


Training a linear regression model directly on data stored in the data lakehouse

Handling NULL values

In [0]:
from pyspark.sql.functions import col
sales_ml_df = sales_ml_df.fillna({'YEAR': 0, 'RETAIL SALES': 0.0, 'WAREHOUSE SALES': 0.0}) # filling null values with a default value, for example, 0
assembler = VectorAssembler(inputCols=["YEAR", "RETAIL SALES"], outputCol="features") # re-running the VectorAssembler
sales_features = assembler.transform(sales_ml_df)
sales_features.show()

+---------------+------------+----+-------------+
|WAREHOUSE SALES|RETAIL SALES|YEAR|     features|
+---------------+------------+----+-------------+
|            2.0|         0.0|2020| [2020.0,0.0]|
|            4.0|         0.0|2020| [2020.0,0.0]|
|            1.0|         0.0|2020| [2020.0,0.0]|
|            1.0|         0.0|2020| [2020.0,0.0]|
|            0.0|        0.82|2020|[2020.0,0.82]|
|            6.0|        2.76|2020|[2020.0,2.76]|
|            1.0|        0.08|2020|[2020.0,0.08]|
|            2.0|         0.0|2020| [2020.0,0.0]|
|            0.0|        6.41|2020|[2020.0,6.41]|
|            2.0|        0.33|2020|[2020.0,0.33]|
|            0.0|         1.7|2020| [2020.0,1.7]|
|            0.0|        1.02|2020|[2020.0,1.02]|
|            0.0|        0.68|2020|[2020.0,0.68]|
|            0.0|        0.34|2020|[2020.0,0.34]|
|            2.0|         0.0|2020| [2020.0,0.0]|
|            1.0|         0.0|2020| [2020.0,0.0]|
|            1.0|        0.34|2020|[2020.0,0.34]|
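`fillna` with a dict replaces nulls column by column. `VectorAssembler` also has a `handleInvalid` parameter (`"error"` by default, or `"skip"` or `"keep"`) that controls what happens to rows with nulls. Under the default, the error only surfaces when such a row is actually processed. That is likely why the earlier `show()`, which evaluates only the first rows, still ran.

The plain-Python sketch below contrasts imputing with skipping on made-up rows. The helper names are hypothetical.

```python
rows = [
    {"YEAR": 2020, "RETAIL SALES": 0.82, "WAREHOUSE SALES": 0.0},
    {"YEAR": 2020, "RETAIL SALES": None, "WAREHOUSE SALES": 2.0},  # made-up null
]
defaults = {"YEAR": 0, "RETAIL SALES": 0.0, "WAREHOUSE SALES": 0.0}


def fillna(rows, defaults):
    """Like DataFrame.fillna(dict): replace None in each column with that column's default."""
    return [{k: (defaults.get(k, v) if v is None else v) for k, v in r.items()} for r in rows]


def skip_invalid(rows, cols):
    """Like handleInvalid="skip": drop rows with a null in any feature column."""
    return [r for r in rows if all(r[c] is not None for c in cols)]


filled = fillna(rows, defaults)
kept = skip_invalid(rows, ["YEAR", "RETAIL SALES"])
print(filled[1]["RETAIL SALES"], len(kept))  # 0.0 1
```

Imputing 0.0 treats a missing sale as a zero sale, which can bias the model. Skipping would drop only the 6 rows with a null RETAIL SALES: the `describe()` counts further below show 307,639 non-null values out of 307,645.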


Ensuring correct data type

In [0]:
sales_ml_df = sales_ml_df.withColumn("YEAR", col("YEAR").cast("double")) # casting the YEAR column to double
sales_ml_df = sales_ml_df.withColumn("RETAIL SALES", col("RETAIL SALES").cast("double")) # casting RETAIL SALES column to double
sales_ml_df = sales_ml_df.withColumn("WAREHOUSE SALES", col("WAREHOUSE SALES").cast("double")) # casting WAREHOUSE SALES column to double

Running the assembler and linear regression after ensuring the correct data types

In [0]:
assembler = VectorAssembler(inputCols=["YEAR", "RETAIL SALES"], outputCol="features")
sales_features = assembler.transform(sales_ml_df)
sales_features.show()

+---------------+------------+------+-------------+
|WAREHOUSE SALES|RETAIL SALES|  YEAR|     features|
+---------------+------------+------+-------------+
|            2.0|         0.0|2020.0| [2020.0,0.0]|
|            4.0|         0.0|2020.0| [2020.0,0.0]|
|            1.0|         0.0|2020.0| [2020.0,0.0]|
|            1.0|         0.0|2020.0| [2020.0,0.0]|
|            0.0|        0.82|2020.0|[2020.0,0.82]|
|            6.0|        2.76|2020.0|[2020.0,2.76]|
|            1.0|        0.08|2020.0|[2020.0,0.08]|
|            2.0|         0.0|2020.0| [2020.0,0.0]|
|            0.0|        6.41|2020.0|[2020.0,6.41]|
|            2.0|        0.33|2020.0|[2020.0,0.33]|
|            0.0|         1.7|2020.0| [2020.0,1.7]|
|            0.0|        1.02|2020.0|[2020.0,1.02]|
|            0.0|        0.68|2020.0|[2020.0,0.68]|
|            0.0|        0.34|2020.0|[2020.0,0.34]|
|            2.0|         0.0|2020.0| [2020.0,0.0]|
|            1.0|         0.0|2020.0| [2020.0,0.0]|
|           

In [0]:
lr = LinearRegression(featuresCol="features", labelCol="WAREHOUSE SALES")
lr_model = lr.fit(sales_features)

Inspecting the trained model's coefficients and intercept, followed by a summary of its performance.

In [0]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [1.0604673343325666,3.9557116427964916]
Intercept: -2142.9784648593463
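With two features, the model's prediction is `intercept + c_YEAR * YEAR + c_RETAIL * RETAIL SALES`. Plugging in the coefficients printed above reproduces the values in the predictions table further below. For example, a row with features `[2020.0, 0.0]` gets about `-0.834`. The helper name `predict` is just for illustration.

```python
coefficients = [1.0604673343325666, 3.9557116427964916]  # [YEAR, RETAIL SALES]
intercept = -2142.9784648593463


def predict(year, retail_sales):
    """Linear model: intercept + sum(coefficient_i * feature_i)."""
    return intercept + coefficients[0] * year + coefficients[1] * retail_sales


print(predict(2020.0, 0.0))   # approx. -0.8344
print(predict(2020.0, 0.82))  # approx. 2.4092
```

Most of the large negative intercept cancels `1.06 * YEAR`, because YEAR only takes values around 2017 to 2020. In practice, RETAIL SALES drives the differences between predictions.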


In [0]:
trainingSummary = lr_model.summary # summarizing the model over the training set and printing out some metrics
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 217.788323
r2: 0.240584
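`rootMeanSquaredError` and `r2` are computed on the same rows the model was trained on, so they do not show how well it generalizes. A fairer test would hold out data, for example with `sales_features.randomSplit([0.8, 0.2], seed=42)`, and evaluate on the held-out part. An r2 of about 0.24 means the two features explain roughly a quarter of the variance in WAREHOUSE SALES.

The formulas behind both metrics, in plain Python on made-up numbers:

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error: sqrt(mean((y - y_hat)^2))."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(y_true, y_pred)) / len(y_true))


def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((y - p) ** 2 for y, p in zip(y_true, y_pred))
    ss_tot = sum((y - mean_y) ** 2 for y in y_true)
    return 1 - ss_res / ss_tot


y_true = [1.0, 2.0, 3.0, 4.0]
y_pred = [1.0, 2.0, 3.0, 5.0]
print(rmse(y_true, y_pred), r2(y_true, y_pred))  # 0.5 0.8
```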


Showing predictions

In [0]:
predictions = lr_model.transform(sales_features)
predictions.select("features", "WAREHOUSE SALES", "prediction").show()

+-------------+---------------+-------------------+
|     features|WAREHOUSE SALES|         prediction|
+-------------+---------------+-------------------+
| [2020.0,0.0]|            2.0|-0.8344495075616578|
| [2020.0,0.0]|            4.0|-0.8344495075616578|
| [2020.0,0.0]|            1.0|-0.8344495075616578|
| [2020.0,0.0]|            1.0|-0.8344495075616578|
|[2020.0,0.82]|            0.0|  2.409234039531384|
|[2020.0,2.76]|            6.0| 10.083314626556785|
|[2020.0,0.08]|            1.0|-0.5179925761381128|
| [2020.0,0.0]|            2.0|-0.8344495075616578|
|[2020.0,6.41]|            0.0|   24.5216621227637|
|[2020.0,0.33]|            2.0| 0.4709353345610907|
| [2020.0,1.7]|            0.0|  5.890260285192198|
|[2020.0,1.02]|            0.0| 3.2003763680909287|
|[2020.0,0.68]|            0.0| 1.8554344095400666|
|[2020.0,0.34]|            0.0| 0.5104924509892044|
| [2020.0,0.0]|            2.0|-0.8344495075616578|
| [2020.0,0.0]|            1.0|-0.8344495075616578|
|[2020.0,0.3

**Objective:** Showcase both real-time and batch processing capabilities to address different business needs.

Batch processing: Process a large batch of sales data to produce a monthly report

RETAIL SALES is stored as a string, so cast it to a numeric type first.

In [0]:
from pyspark.sql.functions import col
sales_df = sales_df.withColumn("RETAIL SALES", col("RETAIL SALES").cast("float"))
sales_df.show()

+----+-----+--------------------+---------+--------------------+---------+------------+----------------+---------------+
|YEAR|MONTH|            SUPPLIER|ITEM CODE|    ITEM DESCRIPTION|ITEM TYPE|RETAIL SALES|RETAIL TRANSFERS|WAREHOUSE SALES|
+----+-----+--------------------+---------+--------------------+---------+------------+----------------+---------------+
|2020|    1|REPUBLIC NATIONAL...|   100009| BOOTLEG RED - 750ML|     WINE|         0.0|            0.00|           2.00|
|2020|    1|           PWSWN INC|   100024|MOMENT DE PLAISIR...|     WINE|         0.0|            1.00|           4.00|
|2020|    1|RELIABLE CHURCHIL...|     1001|S SMITH ORGANIC P...|     BEER|         0.0|            0.00|           1.00|
|2020|    1|LANTERNA DISTRIBU...|   100145|SCHLINK HAUS KABI...|     WINE|         0.0|            0.00|           1.00|
|2020|    1|DIONYSOS IMPORTS INC|   100293|SANTORINI GAVALA ...|     WINE|        0.82|            0.00|           0.00|
|2020|    1|KYSELA PERE ET FI...

In [0]:
# summing RETAIL SALES per MONTH and ITEM TYPE (the same month from different years is combined)
monthly_sales_report = sales_df.groupBy("MONTH", "ITEM TYPE").sum("RETAIL SALES")
monthly_sales_report.show()

+-----+--------------------+------------------+
|MONTH|           ITEM TYPE| sum(RETAIL SALES)|
+-----+--------------------+------------------+
|    9|                KEGS|               0.0|
|    6|        STR_SUPPLIES| 225.7499957755208|
|    7|        STR_SUPPLIES| 271.5499989837408|
|    9|        STR_SUPPLIES|298.41999676823616|
|    7| DOMAINE PIRON - ...|              null|
|    3|                 REF| 42.02999994158745|
|    6|                KEGS|               0.0|
|    1|        STR_SUPPLIES|300.32000321149826|
|    6|                 REF| 55.22999906539917|
|    3|              LIQUOR| 72798.43002120964|
|    3|                KEGS|               0.0|
|    6|             DUNNAGE|               0.0|
|    7|             DUNNAGE|               0.0|
|    1|                WINE| 83628.99995342083|
|    3|                BEER| 51115.74006219581|
|    3|                WINE| 66526.01994728297|
|    6|                WINE|61210.149976285174|
|    9|              LIQUOR|  95013.8101
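Two details of this report are worth noting.

- Grouping by `MONTH` alone adds up the same calendar month across every year in the file. The `YEAR` statistics further below show the data spans several years, so adding `YEAR` to `groupBy` would give a true month-by-month report.
- SQL-style `sum` ignores nulls but returns null when every value in a group is null. That is consistent with the `null` total for the `DOMAINE PIRON - ...` group.

Both behaviors are sketched below in plain Python on made-up rows. The helper names are hypothetical.

```python
from collections import defaultdict


def spark_like_sum(values):
    """SQL-style SUM: skip nulls, but return None if every value is null."""
    present = [v for v in values if v is not None]
    return sum(present) if present else None


def group_sum(rows, keys, value_col):
    """Group rows by the given key columns and sum value_col per group."""
    groups = defaultdict(list)
    for r in rows:
        groups[tuple(r[k] for k in keys)].append(r[value_col])
    return {k: spark_like_sum(v) for k, v in groups.items()}


rows = [  # made-up rows
    {"YEAR": 2019, "MONTH": 1, "ITEM TYPE": "WINE", "RETAIL SALES": 10.0},
    {"YEAR": 2020, "MONTH": 1, "ITEM TYPE": "WINE", "RETAIL SALES": 5.0},
    {"YEAR": 2020, "MONTH": 1, "ITEM TYPE": "BEER", "RETAIL SALES": None},
]
by_month = group_sum(rows, ["MONTH", "ITEM TYPE"], "RETAIL SALES")
by_year_month = group_sum(rows, ["YEAR", "MONTH", "ITEM TYPE"], "RETAIL SALES")
print(by_month)       # {(1, 'WINE'): 15.0, (1, 'BEER'): None}
print(by_year_month)
```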

Real-time processing simulation: Update a dashboard as new sales data arrives.
Unfortunately, we were not able to find a real-time data source, so this part is left blank.
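Even without a live source, the idea can be sketched. In Spark, Structured Streaming (`spark.readStream`) can watch a folder for new files and maintain a running aggregation, for example with `outputMode("complete")`, which a dashboard could read.

The plain-Python simulation below is a stand-in for that idea, not Spark code. Each "micro-batch" of made-up sales updates the running totals per item type. Those totals are what such a dashboard would display after each batch.

```python
from collections import Counter


def process_micro_batch(running_totals, batch):
    """Fold one micro-batch of (item_type, retail_sales) records into running totals."""
    for item_type, amount in batch:
        running_totals[item_type] += amount
    return running_totals


# Made-up batches standing in for files arriving over time.
batches = [
    [("WINE", 0.82), ("BEER", 2.0)],
    [("WINE", 2.76)],
    [("LIQUOR", 6.41), ("BEER", 1.0)],
]

totals = Counter()
for i, batch in enumerate(batches, start=1):
    process_micro_batch(totals, batch)
    # A dashboard would refresh here with the updated totals.
    print(f"after batch {i}: {dict(totals)}")
```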

**Objective:** Explain how Data Lakehouse manages data governance and metadata to ensure data quality and accessibility.

Metadata management example: Describing and cataloging data

In [0]:
sales_df.describe().show() # profiling the structured data: count, mean, stddev, min and max per column

+-------+------------------+------------------+----------+------------------+--------------------+--------------------+------------------+------------------+------------------+
|summary|              YEAR|             MONTH|  SUPPLIER|         ITEM CODE|    ITEM DESCRIPTION|           ITEM TYPE|      RETAIL SALES|  RETAIL TRANSFERS|   WAREHOUSE SALES|
+-------+------------------+------------------+----------+------------------+--------------------+--------------------+------------------+------------------+------------------+
|  count|            307645|            307645|    307478|            307645|              307645|              307644|            307639|            307645|            307645|
|   mean|2018.4385249232068| 6.423861918770011|      null|164114.39978087787|                null|                null| 7.024139885735243| 6.936464528921333|25.294587202782424|
| stddev|1.0830605350766584|3.4618121637228216|      null| 136114.8104939607|                null|                n

In [0]:
customers_df.describe().show() # profiling the semi-structured data

+-------+-------------+-----------+--------------------+--------------------+-------+-----------------+------+
|summary|      company|    country|          created_at|               email|  first|               id|  last|
+-------+-------------+-----------+--------------------+--------------------+-------+-----------------+------+
|  count|         9999|       9999|                9999|                9999|   9999|             9999|  9999|
|   mean|         null|       null|                null|                null|   null|           5000.0|  null|
| stddev|         null|       null|                null|                null|   null|2886.607004772212|  null|
|    min| Abbott Group|Afghanistan|2014-04-01T15:09:...|aaliyah.batz@yaho...|Abagail|                1|Abbott|
|    max|Zulauf-Wisozk|   Zimbabwe|2015-04-01T13:44:...|zula_sauer44@hotm...|   Zula|             9999|Zulauf|
+-------+-------------+-----------+--------------------+--------------------+-------+-----------------+------+
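`describe()` reports the number of non-null values in each column. Comparing each count with the total row count therefore exposes missing data, which is a basic data-quality check.

The sketch below copies the counts from the sales summary above. It assumes the 307,645 non-null `YEAR` values equal the total row count; `sales_df.count()` would confirm this. In PySpark, the same check can be done directly with `count(when(col(c).isNull(), c))` for each column.

```python
row_count = 307645  # assumed total, taken from the fully populated YEAR column
non_null_counts = {  # copied from the sales_df.describe() output above
    "YEAR": 307645, "MONTH": 307645, "SUPPLIER": 307478, "ITEM CODE": 307645,
    "ITEM DESCRIPTION": 307645, "ITEM TYPE": 307644, "RETAIL SALES": 307639,
    "RETAIL TRANSFERS": 307645, "WAREHOUSE SALES": 307645,
}
missing = {name: row_count - n for name, n in non_null_counts.items() if n < row_count}
print(missing)  # {'SUPPLIER': 167, 'ITEM TYPE': 1, 'RETAIL SALES': 6}
```

The single missing ITEM TYPE matches the `null` group in the warehouse-sales aggregation earlier in the notebook.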



Demonstrating data governance by enforcing data quality rules

In [0]:
clean_sales_df = sales_df.filter(col("RETAIL SALES") > 0.0) # example quality rule: keep only rows with positive RETAIL SALES (null and zero rows are dropped too)
clean_sales_df.show()

+----+-----+--------------------+---------+--------------------+------------+------------+----------------+---------------+
|YEAR|MONTH|            SUPPLIER|ITEM CODE|    ITEM DESCRIPTION|   ITEM TYPE|RETAIL SALES|RETAIL TRANSFERS|WAREHOUSE SALES|
+----+-----+--------------------+---------+--------------------+------------+------------+----------------+---------------+
|2020|    1|DIONYSOS IMPORTS INC|   100293|SANTORINI GAVALA ...|        WINE|        0.82|            0.00|           0.00|
|2020|    1|KYSELA PERE ET FI...|   100641|CORTENOVA VENETO ...|        WINE|        2.76|            0.00|           6.00|
|2020|    1|SANTA MARGHERITA ...|   100749|SANTA MARGHERITA ...|        WINE|        0.08|            1.00|           1.00|
|2020|    1|  JIM BEAM BRANDS CO|    10103|KNOB CREEK BOURBO...|      LIQUOR|        6.41|            4.00|           0.00|
|2020|    1|INTERNATIONAL CEL...|   101117|   KSARA CAB - 750ML|        WINE|        0.33|            1.00|           2.00|
|2020|  
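`filter(col("RETAIL SALES") > 0.0)` silently discards rows. It drops rows where RETAIL SALES is null, because the comparison is null and `filter` treats that as false. It also drops zero-sales rows, which may be legitimate.

A governance-minded alternative keeps rejected rows in a separate "quarantine" set, together with the reason they failed, so they can be reviewed rather than lost. The plain-Python sketch below shows that pattern. The rules, the `check_row` helper and the `quarantine` structure are hypothetical. Unlike the filter above, this rule set accepts zero sales.

```python
def check_row(row):
    """Return the list of rule violations for one sales row (hypothetical rules)."""
    problems = []
    sales = row.get("RETAIL SALES")
    if sales is None:
        problems.append("RETAIL SALES is null")
    elif sales < 0:
        problems.append("RETAIL SALES is negative")
    if not row.get("ITEM TYPE"):
        problems.append("ITEM TYPE is missing")
    return problems


rows = [  # made-up rows
    {"ITEM TYPE": "WINE", "RETAIL SALES": 0.82},
    {"ITEM TYPE": "WINE", "RETAIL SALES": 0.0},
    {"ITEM TYPE": None, "RETAIL SALES": 1.0},
    {"ITEM TYPE": "BEER", "RETAIL SALES": None},
]

valid, quarantine = [], []
for row in rows:
    problems = check_row(row)
    if problems:
        quarantine.append({**row, "problems": problems})
    else:
        valid.append(row)

print(len(valid), len(quarantine))  # 2 2
```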