# Learning Objectives

In this notebook, you will:
- learn the concept of ETL
- write ETL jobs for the CSV files from [`pgexercises`](https://pgexercises.com/gettingstarted.html)

# What's ETL or ELT?

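ETL stands for Extract, Transform, Load. In the context of Spark, ETL refers to the process of extracting data from various sources, transforming it into a desired format or structure, and loading it into a target system, such as a data warehouse or a data lake. ELT (Extract, Load, Transform) swaps the last two steps: the raw data is loaded into the target system first and transformed there.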

Here's a breakdown of each step in the ETL process:

## Extract
This step involves extracting data from multiple sources, such as databases, files (CSV, JSON, Parquet), APIs, or streaming data sources. Spark provides connectors and APIs to read data from a wide range of sources, allowing you to extract data in parallel and efficiently handle large datasets.

## Transform
In the transform step, the extracted data is processed and transformed according to specific business logic or requirements. This may involve cleaning the data, applying calculations or aggregations, performing data enrichment, filtering, joining datasets, or any other data manipulation operations. Spark provides a powerful set of transformation functions and SQL capabilities to perform these operations efficiently in a distributed and scalable manner.

## Load
Once the data has been transformed, it is loaded into a target system, such as a data warehouse, a data lake, or another storage system. Spark can write the transformed data to many targets, including databases, distributed file systems and object stores (such as Hadoop Distributed File System or Amazon S3), and columnar formats such as Apache Parquet or Delta Lake, which stores its data as Parquet files. The data can be partitioned, sorted, or structured to optimize querying and analysis.

Spark's distributed computing capabilities, scalability, and rich ecosystem of libraries make it a popular choice for ETL workflows. It can handle large-scale data processing, perform complex transformations, and efficiently load data into different target systems.

By leveraging Spark for ETL, organizations can extract data from diverse sources, apply transformations to ensure data quality and consistency, and load the transformed data into a central repository for further analysis, reporting, or machine learning tasks.
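
The three steps can be sketched without a cluster. The example below is a minimal illustration that uses only the Python standard library as a stand-in for Spark: `csv` plays the role of the reader and an in-memory SQLite database plays the role of the target system. The sample rows, the function names `extract`, `transform`, and `load`, and the choice of SQLite are assumptions made for this sketch, not part of the notebook's Spark code.

```python
import csv
import io
import sqlite3

# Illustrative rows in the shape of the pgexercises bookings file (assumed sample)
RAW_CSV = """bookid,facid,memid,starttime,slots
0,3,1,2012-07-03 11:00:00,2
1,4,1,2012-07-03 08:00:00,2
2,6,0,2012-07-03 18:00:00,2
"""


def extract(text):
    """Extract: parse CSV text into a list of dicts (every value is still a string)."""
    return list(csv.DictReader(io.StringIO(text)))


def transform(rows):
    """Transform: cast the numeric columns to int and keep the timestamp as text."""
    return [
        (int(r["bookid"]), int(r["facid"]), int(r["memid"]), r["starttime"], int(r["slots"]))
        for r in rows
    ]


def load(records, conn):
    """Load: write the typed records into the target table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS bookings "
        "(bookid INTEGER, facid INTEGER, memid INTEGER, starttime TEXT, slots INTEGER)"
    )
    conn.executemany("INSERT INTO bookings VALUES (?, ?, ?, ?, ?)", records)
    conn.commit()


conn = sqlite3.connect(":memory:")
load(transform(extract(RAW_CSV)), conn)

# Query the target system to confirm that the load worked
total_slots = conn.execute("SELECT SUM(slots) FROM bookings").fetchone()[0]
print(total_slots)
```

Keeping each step in its own function mirrors how the Spark jobs later in this notebook are organized: read into a DataFrame, optionally transform it, then save it as a table.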

# Enable DBFS UI

- Go to Settings -> Admin Console, search for `dbfs`, and enable the option shown in the screenshot

<img src="https://raw.githubusercontent.com/jarviscanada/jarvis_data_eng_demo/feature/data/spark/notebook/spark_fundamentals/img/entable_dbfs.jpg" width="700">

- Refresh the page and view the DBFS files from the UI

<img src="https://raw.githubusercontent.com/jarviscanada/jarvis_data_eng_demo/feature/data/spark/notebook/spark_fundamentals/img/dbfs%20ui.png" width="700">

## Import `pgexercises` CSV files

- The pgexercises CSV data files can be found [here](https://github.com/jarviscanada/jarvis_data_eng_demo/tree/feature/data/spark/data/pgexercises).
- The pgexercises schema can be found [here](https://pgexercises.com/gettingstarted.html) (for reference purposes).
- Upload the `bookings.csv`, `facilities.csv`, and `members.csv` files using the Databricks UI (see the screenshot below).
- You can view the imported files from the DBFS UI.

![Upload Files](https://raw.githubusercontent.com/jarviscanada/jarvis_data_eng_demo/feature/data/spark/notebook/spark_fundamentals/img/upload%20file.png)

# Interview Questions

While completing the rest of the practice, try to answer the following questions:

## Concepts
- What is ETL? (Hint: Explain each step)
ETL stands for Extract, Transform, Load. It refers to the process of extracting data from various sources, transforming it into a format that is suitable for analysis or other downstream purposes, and then loading it into a target data storage system, typically a data warehouse or a database.

## Databricks
- What is Databricks?
Databricks is a unified analytics platform that brings data engineering, data science, and business analytics together in one environment. The company behind it was founded by the original creators of Apache Spark, an open-source distributed computing system for big data processing.

- What is a Notebook?
A notebook is an interactive computing environment that allows users to create and share documents containing live code, equations, visualizations, and narrative text. Notebooks are commonly used in data science, machine learning, and scientific computing for exploratory analysis, prototyping, and sharing research findings.

- What is DBFS?
DBFS stands for Databricks File System. It is a distributed file system that is mounted into a Databricks workspace and available on its clusters. It is backed by cloud object storage, so files stored in DBFS persist independently of any cluster.

- What is a cluster? 
A cluster refers to a group of interconnected computers (nodes) that work together to process and analyze large volumes of data. Each node in the cluster typically runs a distributed computing framework, such as Apache Spark, which allows for parallel processing of data across multiple machines.

- Is Databricks a data lake or a data warehouse?

Databricks is not strictly a data lake or a data warehouse; rather, it is a unified analytics platform that supports both data lake and data warehouse functionalities, among other features.

## Managed Table
- What is a managed table in Databricks?
A managed table refers to a type of table where Databricks manages both the metadata and the data files associated with the table. When you create a managed table, Databricks takes care of storing the table's schema and location information in its internal metastore, and it also manages the actual data files stored in a designated directory within a file system, such as DBFS (Databricks File System) or a cloud storage service like Amazon S3 or Azure Data Lake Storage (ADLS).

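- Can you explain how to create a managed table in Databricks?
Run a `CREATE TABLE` statement without a `LOCATION` clause, so that Databricks decides where the data is stored. In the SQL below, the column names, the data types, and the `...` are placeholders:
-- Create a database (optional)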
CREATE DATABASE IF NOT EXISTS my_database;

-- Use the database
USE my_database;

-- Create a managed table
CREATE TABLE IF NOT EXISTS my_table (
    column1_name datatype1,
    column2_name datatype2,
    ...
) USING delta;

- Can you compare a managed table with an RDBMS table? (Hint: Schema on read vs schema on write)
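An RDBMS, such as MySQL or PostgreSQL, uses schema on write: the table's schema (column names, data types, constraints) must be defined before any data is inserted, and the database engine enforces it on every insert. Data that does not conform is rejected with an error at write time.
Tables defined over files, as in Hive and Spark, have traditionally followed schema on read: the files are stored as they are, and the schema recorded in the metastore is applied when the data is queried, so mismatches surface at read time. Note that Delta tables, the format used in the `CREATE TABLE ... USING delta` example in this section, also validate the schema of data being written.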

- What is the Hive metastore and how does it relate to managed tables in Databricks?
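The Hive Metastore is a central repository that stores metadata for Apache Hive tables, including their schema (column names and data types) and their location in Hadoop Distributed File System (HDFS) or other storage systems. It serves as a catalog for tables, providing information about the structure and location of the data. In Databricks, a managed table's metadata is registered in a metastore of this kind (the workspace's built-in Hive metastore, or Unity Catalog in newer workspaces), which is why managed tables are sometimes called Hive tables.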

- How does a managed table differ from an unmanaged (external) table in Databricks? (Hint: Consider what happens to the data when the table is deleted)
Managed tables are those in which both the data and the metadata are managed by Databricks: it controls the table's schema and the physical location of the data files.
Unmanaged (external) tables are those in which only the metadata is managed by Databricks, while the data files stay at a location you provide when creating the table (e.g., a directory in a storage system like HDFS, AWS S3, or Azure Blob Storage).
The difference shows when the table is dropped: dropping a managed table deletes both the metadata and the data files, whereas dropping an unmanaged table removes only the metadata and leaves the data files in place.

- How can you define a schema for a managed table?

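In Databricks, you can define the schema of a managed table explicitly in a `CREATE TABLE` statement, or implicitly through the DataFrame you save with `saveAsTable`: the DataFrame's schema, whether inferred or declared with `StructType`, becomes the table's schema.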
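
The schema-on-write versus schema-on-read contrast from the RDBMS question above can be sketched in plain Python. This is an illustration only, using the standard library rather than Spark or a real warehouse: SQLite stands in for the RDBMS, and the `CHECK` constraint is an assumption added here because SQLite is otherwise lenient about column types. The helper `read_facid` and the sample rows are hypothetical.

```python
import csv
import io
import sqlite3

# Schema on write: the database checks each row against the table definition
# at insert time and rejects rows that do not fit.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE facilities ("
    " facid INTEGER NOT NULL CHECK (typeof(facid) = 'integer'),"
    " name TEXT NOT NULL)"
)
conn.execute("INSERT INTO facilities VALUES (?, ?)", (0, "Tennis Court 1"))

try:
    conn.execute("INSERT INTO facilities VALUES (?, ?)", ("zero", "Tennis Court 2"))
    write_rejected = False
except sqlite3.IntegrityError:
    # The bad value never reaches the table
    write_rejected = True

# Schema on read: the file accepts any text. The schema is applied only when
# the data is read, so the bad value surfaces at query time instead.
raw_file = io.StringIO("facid,name\n0,Tennis Court 1\nzero,Tennis Court 2\n")
rows = list(csv.DictReader(raw_file))  # both rows were stored without complaint


def read_facid(row):
    """Apply the integer schema at read time; None marks a value that does not fit."""
    try:
        return int(row["facid"])
    except ValueError:
        return None


facids = [read_facid(r) for r in rows]
print(write_rejected, facids)
```

With schema on write the error appears when the row is inserted; with schema on read the row is stored and the mismatch only shows up as a missing value when the column is interpreted.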

## Spark
`df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_location)`
- What does `option("inferSchema", "true")` do?
`option("inferSchema", "true")` tells Spark to make an extra pass over the file and infer each column's data type from its values. Without it, every CSV column is read as a string.

- What does `option("header", "true")` do?
`option("header", "true")` tells Spark that the first line of the file holds the column names rather than data. Without it, the first line is treated as a data row and the columns get default names such as `_c0`, `_c1`.

- How can you write data to a managed table?
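To write data to a managed table in Databricks, use the DataFrameWriter API exposed as `df.write`, for example `df.write.mode("overwrite").saveAsTable("bookings")`. Because no path is given, Spark stores the data in the metastore's default location and the table is managed.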

- How can you read data from a managed table into a DataFrame?
To read a managed table into a DataFrame, refer to it by name: `spark.table("bookings")` or `spark.read.table("bookings")` (the DataFrameReader API), or run a query with `spark.sql("SELECT * FROM bookings")`.
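
To make the `header` and `inferSchema` questions above concrete, here is a simplified plain-Python imitation of what the two options mean. It is not Spark's implementation: the helpers `read_csv` and `infer_type` are hypothetical, the sample text is made up, and the toy inference only distinguishes integers from strings (Spark recognizes more types, such as timestamps and doubles).

```python
import csv
import io

SAMPLE = "bookid,starttime,slots\n0,2012-07-03 11:00:00,2\n1,2012-07-03 08:00:00,4\n"


def read_csv(text, header):
    """Return (column_names, data_rows). Without a header, names are generated."""
    rows = list(csv.reader(io.StringIO(text)))
    if header:
        # First line supplies the column names and is not treated as data
        return rows[0], rows[1:]
    # First line is ordinary data; columns get positional default names
    return [f"_c{i}" for i in range(len(rows[0]))], rows


def infer_type(values):
    """Guess a column type by trying to parse every value (simplified)."""
    try:
        for v in values:
            int(v)
        return "integer"
    except ValueError:
        return "string"


# header=True plus inference: names come from the file, types from the values
names, data = read_csv(SAMPLE, header=True)
schema = {name: infer_type(col) for name, col in zip(names, zip(*data))}
print(schema)

# header=False: the header line is counted as a data row
no_header_names, no_header_data = read_csv(SAMPLE, header=False)
print(no_header_names, len(no_header_data))
```

Inference has to look at every value in a column before deciding on a type, which is why it costs an extra pass over the data and why a manually defined schema is often preferred for large files.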

# ETL `bookings.csv` file

- **Extract**: Load data from CSV file into a DF
- **Transform**: no transformation is needed, because we want to load the data as is
- **Load**: Save the DF into a managed table (also called a Hive table)

# Managed Table
This is an important interview topic. Some people may refer to managed tables as Hive tables.

https://docs.databricks.com/data-governance/unity-catalog/create-tables.html

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType

file_location = "/FileStore/tables/bookings.csv"

# What do `option("header", "true")` and `option("inferSchema", "true")` do?
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_location)

# Why doesn't the df schema match the DDL data types? https://pgexercises.com/gettingstarted.html (hint: `option("inferSchema", "true")`)
df.printSchema()

# One solution is to define the schema manually
# Schema for the bookings table
schema = StructType([
    StructField("bookid", IntegerType(), True),
    StructField("facid", IntegerType(), True),
    StructField("memid", IntegerType(), True),
    StructField("starttime", TimestampType(), True),
    StructField("slots", IntegerType(), True)
])

# Read data from CSV file into DataFrame with predefined schema
df = spark.read.format("csv").option("header", "true").schema(schema).load(file_location)

# Drop the table if it already exists
spark.sql("DROP TABLE IF EXISTS bookings")

# Write data from DataFrame into managed table
df.write.saveAsTable("bookings")


root
 |-- bookid: integer (nullable = true)
 |-- facid: integer (nullable = true)
 |-- memid: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- slots: integer (nullable = true)

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-2434236142515292>:30
     27 spark.sql("DROP TABLE IF EXISTS bookings")
     29 # Write data from DataFrame into managed table
---> 30 df.write.saveAsTable("bookings")

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, *

# Complete ETL Jobs

- Complete ETL for `facilities.csv` and `members.csv`
- Tips
  - The Databricks Community Edition terminates the cluster after a few hours of inactivity. As a result, all managed tables will be deleted. You will need to rerun this notebook to perform the ETL on all files for the other exercises.
  - DBFS data will not be deleted when a cluster becomes inactive or is deleted

In [0]:
# Write an ETL job for `facilities.csv`
# Define the file location
file_location = "/FileStore/tables/facilities.csv"

# Read data from CSV into DataFrame
facilities_df = spark.read.format("csv") \
                         .option("header", "true") \
                         .option("inferSchema", "true") \
                         .load(file_location)

# Optionally, define schema manually if needed
# schema = StructType([
#     StructField("facid", IntegerType(), True),
#     StructField("name", StringType(), True),
#     StructField("membercost", DecimalType(10, 2), True),
#     StructField("guestcost", DecimalType(10, 2), True),
#     StructField("initialoutlay", DecimalType(10, 2), True),
#     StructField("monthlymaintenance", DecimalType(10, 2), True)
# ])

# Load data into a managed table
facilities_df.write.mode("overwrite").saveAsTable("facilities")

In [0]:
# Write an ETL job for `members.csv`
# Define the file location
file_location = "/FileStore/tables/members.csv"

# Read data from CSV into DataFrame
members_df = spark.read.format("csv") \
                       .option("header", "true") \
                       .option("inferSchema", "true") \
                       .load(file_location)

# Optionally, define schema manually if needed
# schema = StructType([
#     StructField("memid", IntegerType(), True),
#     StructField("surname", StringType(), True),
#     StructField("firstname", StringType(), True),
#     StructField("address", StringType(), True),
#     StructField("zipcode", IntegerType(), True),
#     StructField("telephone", StringType(), True),
#     StructField("recommendedby", IntegerType(), True),
#     StructField("joindate", TimestampType(), True)
# ])

# Load data into a managed table
members_df.write.mode("overwrite").saveAsTable("members")

# Save your work to Git

- Export the notebook in IPython format: `notebook top menu bar -> File -> Export -> IPython Notebook`
- Upload to your Git repository, `your_repo/spark/notebooks/`
- GitHub can render IPython notebooks, for example: https://github.com/josephcslater/JupyterExamples/blob/master/Calc_Review.ipynb