# Learning Objectives

In this notebook, you will craft sophisticated ETL jobs that interface with a variety of common data sources, such as:
- REST APIs (HTTP endpoints)
- RDBMS
- Hive tables (managed tables)
- Various file formats (csv, json, parquet, etc.)


# Interview Questions

As you progress through the practice, attempt to answer the following questions:

## Columnar File
- What is a columnar file format and what advantages does it offer?
  - A columnar file format stores data by column instead of by row: all values of a column are stored together on disk, unlike row-based formats such as CSV or JSON, which store entire records together. Queries that read only a few columns can skip the rest, and similar values stored side by side compress well.
- Why is Parquet frequently used with Spark and how does it function?
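  - Parquet is frequently used with Spark because it is a columnar, compressed, and efficient file format that integrates seamlessly with Spark's execution engine. Each file stores its schema and per-column statistics, which lets Spark read only the columns and row groups a query needs.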

- How do you read/write data from/to a Parquet file using a DataFrame?
  - Use `spark.read.parquet` to read a Parquet file into a DataFrame and `df.write.parquet` to write a DataFrame out.

  Read:

  `df = spark.read.parquet("path/to/parquet_file")`

  Write:

  `df.write.parquet("path/to/output_directory")`
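
To make the row-versus-column distinction concrete, here is a toy sketch in plain Python. It is not how Parquet actually encodes data on disk; the sample records and the names `rows` and `to_columns` are made up for illustration.

```python
# Toy illustration only: Parquet's real on-disk encoding is far more involved.
rows = [
    {"facid": 0, "name": "Tennis Court 1", "slots": 3},
    {"facid": 1, "name": "Tennis Court 2", "slots": 2},
    {"facid": 0, "name": "Tennis Court 1", "slots": 4},
]


def to_columns(records):
    """Pivot a list of row dicts into a dict of column lists."""
    columns = {}
    for record in records:
        for key, value in record.items():
            columns.setdefault(key, []).append(value)
    return columns


columns = to_columns(rows)

# Row layout: summing one field still walks every full record.
total_from_rows = sum(record["slots"] for record in rows)

# Column layout: the same sum touches only the "slots" values.
total_from_columns = sum(columns["slots"])

print(total_from_rows, total_from_columns)
```

Because the values of one column sit next to each other, a reader can skip the columns a query does not mention, and repeated values such as the facility name are adjacent, which tends to help compression.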

## Partitions
- How do you save data to a file system by partitions? (Hint: Provide the code)

  - `df.write.partitionBy("column_name").parquet("path/to/output")`
- How and why can partitions reduce query execution time? (Hint: Give an example)

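  - Partitions reduce query execution time because Spark reads only the relevant data and skips entire folders based on the partition columns.
  - Spark uses partition pruning: it scans only the files that match the filter condition on the partitioned columns, which reduces I/O and processing. For example, if the data is partitioned by `company`, a query filtered on `company = 'TSLA'` reads only the `company=TSLA` folder.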
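
The pruning idea can be sketched in plain Python. This is a toy model, not Spark's planner: the file names are hypothetical, and only the `column=value` directory naming reflects what `partitionBy` writes.

```python
# Toy model of partition pruning; Spark's planner does this internally.
# Paths mimic the layout produced by df.write.partitionBy("company").
files = [
    "out/company=AAPL/part-0000.parquet",
    "out/company=GOOGL/part-0000.parquet",
    "out/company=MSFT/part-0000.parquet",
    "out/company=TSLA/part-0000.parquet",
]


def partition_value(path, column):
    """Return the value encoded for `column` in a Hive-style path, or None."""
    prefix = column + "="
    for segment in path.split("/"):
        if segment.startswith(prefix):
            return segment[len(prefix):]
    return None


def prune(paths, column, wanted):
    """Keep only files whose partition directory matches the filter value."""
    return [p for p in paths if partition_value(p, column) == wanted]


to_scan = prune(files, "company", "TSLA")
print(to_scan)
```

With four companies, a filter on a single company leaves one directory out of four to read; the other three are never opened.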

## JDBC and RDBMS
- How do you load data from an RDBMS into Spark? (Hint: Discuss the steps and JDBC)
  - Set the JDBC parameters: URL, driver, table name, username, and password.
  - Use `read.format("jdbc")` with these options, then call `load()`:

  df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:postgresql://hostname:5432/dbname") \
      .option("dbtable", "schema.table_name") \
      .option("user", "username") \
      .option("password", "password") \
      .option("driver", "org.postgresql.Driver") \
      .load()
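
For larger tables, the same read can be split across several connections. The sketch below only builds the option dictionary, so it runs without a database; the helper names and the `id` partition column are assumptions, while `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions` are Spark's JDBC option names.

```python
def jdbc_options(host, port, database, table, user, password,
                 partition_column=None, lower=None, upper=None, partitions=None):
    """Build the option dict for spark.read.format("jdbc")."""
    options = {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }
    if partition_column is not None:
        # These four options must be supplied together for a parallel read.
        if None in (lower, upper, partitions):
            raise ValueError("lower, upper and partitions are required with partition_column")
        options.update({
            "partitionColumn": partition_column,
            "lowerBound": str(lower),
            "upperBound": str(upper),
            "numPartitions": str(partitions),
        })
    return options


def load_table(spark, options):
    """Read the table described by `options` through an existing SparkSession."""
    return spark.read.format("jdbc").options(**options).load()


opts = jdbc_options("hostname", 5432, "dbname", "schema.table_name",
                    "username", "password",
                    partition_column="id", lower=1, upper=1000, partitions=4)
print(opts["url"])
```

In a notebook, `load_table(spark, opts)` would then return the DataFrame. The bounds only decide how the rows are split between partitions; they do not filter rows out.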

## REST API and HTTP Requests
- How can Spark be used to fetch data from a REST API? (Hint: Discuss making API requests)

  - Spark doesn't natively support REST APIs, but you can fetch the data with standard Python or Scala code and then convert it into a DataFrame.
  - Steps:
    1. Make the API request with `requests` (Python) or an HTTP client (Scala)
    2. Parse the JSON/XML response
    3. Convert the parsed data to a DataFrame
    4. Process it with Spark
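
Steps 2 and 3 can be sketched as follows, assuming a response shaped like the Alpha Vantage `TIME_SERIES_DAILY` payload used in ETL Job Three. The function names are hypothetical, and `spark` is passed in so the parsing part runs without a cluster.

```python
def payload_to_rows(symbol, payload):
    """Step 2: flatten one parsed JSON response into (company, date, closing_price) tuples."""
    series = payload["Time Series (Daily)"]
    return [
        (symbol, day, float(values["4. close"]))
        for day, values in sorted(series.items())
    ]


def rows_to_dataframe(spark, rows):
    """Step 3: hand the parsed rows to an existing SparkSession."""
    return spark.createDataFrame(rows, ["company", "date", "closing_price"])


# A two-day excerpt shaped like the API response.
sample = {
    "Time Series (Daily)": {
        "2025-03-28": {"4. close": "154.3300"},
        "2025-03-27": {"4. close": "162.2400"},
    }
}
rows = payload_to_rows("GOOGL", sample)
print(rows)
```

Keeping the parsing in a plain function makes it easy to check against a saved response before any Spark job is started.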

 

## ETL Job One: Parquet file
### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Data transformation requirements https://pgexercises.com/questions/aggregates/fachoursbymonth.html

### Load
Load data into a parquet file

### What is Parquet? 

Columnar files are an important technique for optimizing Spark queries. Additionally, they are often tested in interviews.
- https://www.youtube.com/watch?v=KLFadWdomyI
- https://www.databricks.com/glossary/what-is-parquet

In [0]:
from pyspark.sql.functions import col, sum as sum_col  # alias avoids shadowing Python's built-in sum

# Load data
bookings_df = spark.sql("SELECT * FROM bookings")
facilities_df = spark.sql("SELECT * FROM facilities")

# Filter bookings for September 2012
september_bookings_df = bookings_df.filter(
    (bookings_df.starttime >= "2012-09-01") & (bookings_df.starttime < "2012-10-01")
)

# Cast 'slots' to an integer, then total the slots per facility ID
bookings_count_df = september_bookings_df.withColumn("slots", col("slots").cast("int")).groupBy("facid").agg(
    sum_col("slots").alias("total_slots")
)

# Join with facilities to get the facility name, explicitly reference 'facid' columns
result_df = bookings_count_df.alias("bks").join(
    facilities_df.alias("fac"), 
    col("bks.facid") == col("fac.facid"), 
    "inner"
).select(
    col("bks.facid"), 
    col("fac.name"), 
    col("bks.total_slots")
).orderBy("bks.total_slots", ascending=False)

# Display the transformed data
# display(result_df)

# Save the result into a Parquet file
result_df.write.mode("overwrite").parquet("/FileStore/tables/total_slots_september_2012.parquet")  # overwrite so the cell can be re-run


## ETL Job Two: Partitions

### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Transform the data https://pgexercises.com/questions/joins/threejoin.html

### Load
Partition the result data by the facility column and then save it to the `threejoin_delta` managed table.

hint: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html

### What are partitions?

Partitions are an important technique for optimizing Spark queries. Additionally, they are often tested in interviews.
- https://www.youtube.com/watch?v=hvF7tY2-L3U&t=268s

In [0]:
# Write your solution here

# Extracting the data
bookings_df = spark.sql("SELECT * FROM bookings")
members_df = spark.sql("SELECT * FROM members")
facilities_df = spark.sql("SELECT * FROM facilities")

# Transforming the data
from pyspark.sql.functions import col, concat_ws

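# Filter facilities whose name contains 'Tennis'
# Note: this also matches 'Table Tennis'; use contains('Tennis Court') to keep only the courts
tennis_facility_df = facilities_df.filter(facilities_df.name.contains('Tennis'))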

# Join bookings with members and tennis facilities
result2_df = bookings_df.join(members_df, bookings_df.memid == members_df.memid) \
    .join(tennis_facility_df, bookings_df.facid == tennis_facility_df.facid) \
    .select(
        concat_ws(", ", members_df.firstname, members_df.surname).alias("member_name"),
        tennis_facility_df["name"]
    ) \
    .distinct()

# Sort by member name and facility name
result2_df = result2_df.orderBy("member_name", "name")

# Displaying the result (Optional for testing purposes)
display(result2_df)
  

# Partition the data by facility name (column 'name') and write to a Delta managed table
result2_df.write \
    .partitionBy("name") \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("threejoin_delta")


member_name,name
"Anna, Mackenzie",Table Tennis
"Anne, Baker",Table Tennis
"Anne, Baker",Tennis Court 1
"Anne, Baker",Tennis Court 2
"Burton, Tracy",Table Tennis
"Burton, Tracy",Tennis Court 1
"Burton, Tracy",Tennis Court 2
"Charles, Owen",Table Tennis
"Charles, Owen",Tennis Court 1
"Charles, Owen",Tennis Court 2


## ETL Job Three: HTTP Requests

### Extract
Extract daily stock price data for the following companies: Google, Apple, Microsoft, and Tesla.

Data Source
- API: https://rapidapi.com/alphavantage/api/alpha-vantage
- Endpoint: GET `TIME_SERIES_DAILY`

Sample HTTP request

```
curl --request GET \
	--url 'https://alpha-vantage.p.rapidapi.com/query?function=TIME_SERIES_DAILY&symbol=TSLA&outputsize=compact&datatype=json' \
	--header 'X-RapidAPI-Host: alpha-vantage.p.rapidapi.com' \
	--header 'X-RapidAPI-Key: [YOUR_KEY]'

```

Sample Python HTTP request

```
import requests

url = "https://alpha-vantage.p.rapidapi.com/query"

querystring = {
    "function":"TIME_SERIES_DAILY",
    "symbol":"IBM",
    "datatype":"json",
    "outputsize":"compact"
}

headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": "[YOUR_KEY]"
}

response = requests.get(url, headers=headers, params=querystring)

data = response.json()

# Now 'data' contains the daily time series data for "IBM"
```
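
Two small guards are worth adding around a request like this: keeping the key out of the notebook, and checking the response before indexing into it. The sketch below is one way to do that; the environment variable name `RAPIDAPI_KEY` and both function names are assumptions, not part of the API.

```python
import os


def build_headers(env=os.environ):
    """Read the RapidAPI key from the environment instead of hard-coding it."""
    key = env.get("RAPIDAPI_KEY")  # hypothetical variable name
    if not key:
        raise RuntimeError("RAPIDAPI_KEY is not set")
    return {
        "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
        "X-RapidAPI-Key": key,
    }


def daily_series(payload):
    """Return the daily time series, or fail with the keys that did come back."""
    try:
        return payload["Time Series (Daily)"]
    except KeyError:
        raise ValueError(f"unexpected response keys: {sorted(payload)}") from None


# A plain dict stands in for os.environ here so the example runs anywhere.
headers = build_headers({"RAPIDAPI_KEY": "demo-key"})
print(headers["X-RapidAPI-Key"])
```

If the service answers with an error or rate-limit message instead of data, `daily_series` reports which keys were present rather than failing with a bare `KeyError`.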

### Transform
Find **weekly** max closing price for each company.

hints: 
  - Use a `for` loop to get stock data for each company
  - Use the Spark `union` operation to concatenate all data into one DF
  - Create a new `week` column from the date column
  - Use `groupBy` to calculate the max closing price
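
The grouping logic can be checked in plain Python before it is written in Spark. This is a sketch of the idea rather than the Spark solution: `weekly_max` is a hypothetical helper, and the closing prices are taken from the GOOGL response shown in this notebook.

```python
from datetime import date


def weekly_max(rows):
    """Max closing price per (company, ISO year, ISO week)."""
    result = {}
    for company, day, close in rows:
        iso = date.fromisoformat(day).isocalendar()
        key = (company, iso[0], iso[1])
        result[key] = max(close, result.get(key, close))
    return result


# Closing prices taken from the sample GOOGL response.
rows = [
    ("GOOGL", "2025-03-21", 163.99),
    ("GOOGL", "2025-03-24", 167.68),
    ("GOOGL", "2025-03-25", 170.56),
    ("GOOGL", "2025-03-28", 154.33),
]
print(weekly_max(rows))
```

Spark's `weekofyear` uses the same ISO 8601 week numbering. Keying on the year as well as the week keeps equal week numbers from different years apart, which matters once the data spans a year boundary.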

### Load
- Partition `DF` by company
- Load the DF into a managed table called `max_closing_price_weekly`

In [0]:
# Write your solution here

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, weekofyear, max as max_col, to_date

# Initialize Spark
spark = SparkSession.builder.appName("StockETL").getOrCreate()

# API details
url = "https://alpha-vantage.p.rapidapi.com/query"
headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": "[YOUR_KEY]"  # keep real keys out of the notebook
}

# List of companies
companies = ["GOOGL", "AAPL", "MSFT", "TSLA"]

# Extract and transform
all_data = []
for company in companies:
    querystring = {"function": "TIME_SERIES_DAILY", "symbol": company, "datatype": "json", "outputsize": "compact"}
    response = requests.get(url, headers=headers, params=querystring).json()
    print(response) 
    daily_data = response['Time Series (Daily)']
    df = spark.createDataFrame([
        (company, date, float(values['4. close']))
        for date, values in daily_data.items()
    ], ["company", "date", "closing_price"])
    all_data.append(df)

# Combine data
combined_df = all_data[0]
for df in all_data[1:]:
    combined_df = combined_df.union(df)

# Add week column and calculate max closing price
weekly_max_df = combined_df.withColumn("week", weekofyear(to_date(col("date")))) \
    .groupBy("company", "week").agg(max_col("closing_price").alias("max_closing_price"))

# Load to Delta table
weekly_max_df.write \
    .partitionBy("company") \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("max_closing_price_weekly")


{'Meta Data': {'1. Information': 'Daily Prices (open, high, low, close) and Volumes', '2. Symbol': 'GOOGL', '3. Last Refreshed': '2025-03-28', '4. Output Size': 'Compact', '5. Time Zone': 'US/Eastern'}, 'Time Series (Daily)': {'2025-03-28': {'1. open': '160.4900', '2. high': '161.8200', '3. low': '153.6300', '4. close': '154.3300', '5. volume': '47801295'}, '2025-03-27': {'1. open': '164.6300', '2. high': '165.4200', '3. low': '162.0000', '4. close': '162.2400', '5. volume': '24508273'}, '2025-03-26': {'1. open': '169.0000', '2. high': '169.6100', '3. low': '164.8400', '4. close': '165.0600', '5. volume': '28939326'}, '2025-03-25': {'1. open': '168.9800', '2. high': '170.6300', '3. low': '168.3150', '4. close': '170.5600', '5. volume': '24174373'}, '2025-03-24': {'1. open': '167.0650', '2. high': '168.3200', '3. low': '165.1400', '4. close': '167.6800', '5. volume': '30879129'}, '2025-03-21': {'1. open': '161.2050', '2. high': '164.2400', '3. low': '160.8901', '4. close': '163.9900', '

## ETL Job Four: RDBMS


### Extract
Extract RNA data from a public PostgreSQL database.

- https://rnacentral.org/help/public-database
- Extract 100 RNA records from the `rna` table (hint: use `limit` in your sql)
- hint: use `spark.read.jdbc` https://docs.databricks.com/external-data/jdbc.html
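
Spark places the `table` (or `dbtable`) value in the `FROM` clause of the query it sends, so a parenthesized subquery with an alias is accepted there and the `LIMIT` runs inside the database. A minimal sketch of building such a value, where the helper name and the identifier check are assumptions added for illustration:

```python
import re

# Accept only plain identifiers so the helper cannot be used to inject SQL.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def limited_table(table, limit, alias):
    """Build a `dbtable` value that makes the database apply the LIMIT."""
    for name in (table, alias):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"not a plain SQL identifier: {name!r}")
    return f"(SELECT * FROM {table} LIMIT {int(limit)}) AS {alias}"


print(limited_table("rna", 100, "rna_data"))
```

The returned string can be passed as the `table` argument of `spark.read.jdbc`, so only the limited rows travel over the network.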

### Transform
We want to load the data as is, so no transformation is required.


### Load
Load the DF into a managed table called `rna_100_records`

In [0]:
# Write your solution here
# Extract
jdbc_url = "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs"
properties = {
    "user": "reader",
    "password": "NWDMCE5xdipIjRrp",
    "driver": "org.postgresql.Driver"
}

query = "(SELECT * FROM rna LIMIT 100) AS rna_data"
rna_df = spark.read.jdbc(url=jdbc_url, table=query, properties=properties)

# Load
rna_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("rna_100_records")

