# Learning Objectives

In this notebook, you will craft sophisticated ETL jobs that interface with a variety of common data sources, such as:
- REST APIs (HTTP endpoints)
- RDBMS
- Hive tables (managed tables)
- Various file formats (csv, json, parquet, etc.)


# Interview Questions

As you progress through the practice, attempt to answer the following questions:

## Columnar File
- What is a columnar file format and what advantages does it offer?
- Why is Parquet frequently used with Spark and how does it function?
- How do you read/write data from/to a Parquet file using a DataFrame?

## Partitions
- How do you save data to a file system by partitions? (Hint: Provide the code)
- How and why can partitions reduce query execution time? (Hint: Give an example)

## JDBC and RDBMS
- How do you load data from an RDBMS into Spark? (Hint: Discuss the steps and JDBC)

## REST API and HTTP Requests
- How can Spark be used to fetch data from a REST API? (Hint: Discuss making API requests)

## ETL Job One: Parquet file
### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Data transformation requirements: https://pgexercises.com/questions/aggregates/fachoursbymonth.html

### Load
Load data into a parquet file

### What is Parquet? 

Columnar files are an important technique for optimizing Spark queries. Additionally, they are often tested in interviews.
- https://www.youtube.com/watch?v=KLFadWdomyI
- https://www.databricks.com/glossary/what-is-parquet
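
To make the idea of a columnar layout concrete, here is a toy sketch in plain Python. It is not how Parquet is implemented (real Parquet adds encoding, compression, and metadata), and the sample rows are made up for illustration. It only shows why an aggregate over one column can skip the other columns when values are stored column by column.

```python
# Toy illustration only: made-up rows, no Parquet library involved.
rows = [
    {"facid": 0, "memid": 3, "slots": 2},
    {"facid": 1, "memid": 7, "slots": 4},
    {"facid": 0, "memid": 5, "slots": 6},
]

# Row-oriented layout: one full record after another (like CSV or JSON lines).
row_layout = [tuple(r.values()) for r in rows]

# Column-oriented layout: all values of one column are stored together.
column_layout = {name: [r[name] for r in rows] for name in rows[0]}

# Summing "slots" in the row layout has to walk every full record.
total_from_rows = sum(record[2] for record in row_layout)

# The columnar layout touches only the single column it needs.
total_from_columns = sum(column_layout["slots"])

print(total_from_rows, total_from_columns)
```

Both totals are the same; the difference is how much data has to be read to compute them, which is where columnar formats tend to help analytical queries.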

In [0]:
# Write your solution here
# Extract data from managed tables

bookings_df = spark.sql("select * from bookings")
members_df = spark.sql("select * from members")
facilities_df = spark.sql("select * from facilities")

from pyspark.sql.functions import col, sum as sum_  # alias avoids shadowing Python's built-in sum

# Transform: keep September 2012 bookings, then total the slots per facility
transformed_df = bookings_df.filter(
    (col("starttime") >= '2012-09-01') & (col("starttime") < '2012-10-01')
).groupBy("facid").agg(
    sum_("slots").alias("Total Slots")
).orderBy("Total Slots")

# Load data into a Parquet file
output_path = "/path/to/output/directory/total_slots.parquet"

transformed_df.write.mode("overwrite").parquet(output_path)
transformed_df.show()

+-----+-----------+
|facid|Total Slots|
+-----+-----------+
|    5|        122|
|    3|        422|
|    7|        426|
|    8|        471|
|    6|        540|
|    2|        570|
|    1|        588|
|    0|        591|
|    4|        648|
+-----+-----------+



In [0]:
df = spark.read.parquet("/path/to/output/directory/total_slots.parquet")
df.show()

+-----+-----------+
|facid|Total Slots|
+-----+-----------+
|    5|        122|
|    3|        422|
|    7|        426|
|    8|        471|
|    6|        540|
|    2|        570|
|    1|        588|
|    0|        591|
|    4|        648|
+-----+-----------+



## ETL Job Two: Partitions

### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Transform the data: https://pgexercises.com/questions/joins/threejoin.html

### Load
Partition the result by the `facility` column, then save it to the `threejoin_delta` managed table.

Hint: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html

### What are partitions?

Partitions are an important technique for optimizing Spark queries. Additionally, they are often tested in interviews.
- https://www.youtube.com/watch?v=hvF7tY2-L3U&t=268s
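
The following is a minimal plain-Python sketch of the idea behind partitioned storage, not Spark's actual writer. It assumes a `column=value` directory per partition value, which is the general layout `partitionBy` produces, and uses a few made-up sample rows. The helper names `write_partitioned` and `read_partition` are hypothetical. The point is that a query filtering on the partition column only needs to open the matching directory.

```python
import csv
import tempfile
from pathlib import Path

# Made-up sample rows (member, facility) for illustration only.
rows = [
    ("Anne Baker", "Tennis Court 1"),
    ("Anne Baker", "Tennis Court 2"),
    ("Darren Smith", "Tennis Court 2"),
]

def write_partitioned(rows, root):
    """Append each row to root/facility=<value>/part-0.csv."""
    for member, facility in rows:
        part_dir = Path(root) / f"facility={facility}"
        part_dir.mkdir(parents=True, exist_ok=True)
        with open(part_dir / "part-0.csv", "a", newline="") as f:
            # The partition value lives in the directory name, not in the file.
            csv.writer(f).writerow([member])

def read_partition(root, facility):
    """Read one facility's directory; other partitions are never opened."""
    part_file = Path(root) / f"facility={facility}" / "part-0.csv"
    with open(part_file, newline="") as f:
        return [row[0] for row in csv.reader(f)]

root = tempfile.mkdtemp()
write_partitioned(rows, root)

partitions = sorted(p.name for p in Path(root).iterdir())
court_2_members = read_partition(root, "Tennis Court 2")

print(partitions)
print(court_2_members)
```

A filter such as `facility = 'Tennis Court 2'` can skip every other directory entirely, which is the intuition behind partition pruning reducing query time.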

In [0]:
# Write your solution here
# Extract data from managed tables

bookings_df = spark.sql("select * from bookings")
members_df = spark.sql("select * from members")
facilities_df = spark.sql("select * from facilities")

from pyspark.sql.functions import concat_ws

# Join the tables and filter the required facilities
transformed_df = members_df.join(bookings_df, members_df.memid == bookings_df.memid) \
    .join(facilities_df, bookings_df.facid == facilities_df.facid) \
    .filter(facilities_df.name.isin('Tennis Court 2', 'Tennis Court 1')) \
    .select(concat_ws(' ', members_df.firstname, members_df.surname).alias("member"),
            facilities_df.name.alias("facility")) \
    .distinct() \
    .orderBy("member", "facility")

# Save the DataFrame as a managed table, partitioned by the 'facility' column
transformed_df.write \
    .mode("overwrite") \
    .partitionBy("facility") \
    .format("delta") \
    .saveAsTable("threejoin_delta")

transformed_df.show()


+--------------+--------------+
|        member|      facility|
+--------------+--------------+
|    Anne Baker|Tennis Court 1|
|    Anne Baker|Tennis Court 2|
|  Burton Tracy|Tennis Court 1|
|  Burton Tracy|Tennis Court 2|
|  Charles Owen|Tennis Court 1|
|  Charles Owen|Tennis Court 2|
|  Darren Smith|Tennis Court 2|
| David Farrell|Tennis Court 1|
| David Farrell|Tennis Court 2|
|   David Jones|Tennis Court 1|
|   David Jones|Tennis Court 2|
|  David Pinker|Tennis Court 1|
| Douglas Jones|Tennis Court 1|
| Erica Crumpet|Tennis Court 1|
|Florence Bader|Tennis Court 1|
|Florence Bader|Tennis Court 2|
|   GUEST GUEST|Tennis Court 1|
|   GUEST GUEST|Tennis Court 2|
|Gerald Butters|Tennis Court 1|
|Gerald Butters|Tennis Court 2|
+--------------+--------------+
only showing top 20 rows



In [0]:

spark.sql("SHOW PARTITIONS threejoin_delta").show()

+--------------+
|      facility|
+--------------+
|Tennis Court 2|
|Tennis Court 1|
+--------------+



## ETL Job Three: HTTP Requests

### Extract
Extract daily stock price data for the following companies: Google, Apple, Microsoft, and Tesla.

Data Source
- API: https://rapidapi.com/alphavantage/api/alpha-vantage
- Endpoint: GET `TIME_SERIES_DAILY`

Sample HTTP request

```
curl --request GET \
	--url 'https://alpha-vantage.p.rapidapi.com/query?function=TIME_SERIES_DAILY&symbol=TSLA&outputsize=compact&datatype=json' \
	--header 'X-RapidAPI-Host: alpha-vantage.p.rapidapi.com' \
	--header 'X-RapidAPI-Key: [YOUR_KEY]'

```

Sample Python HTTP request

```
import requests

url = "https://alpha-vantage.p.rapidapi.com/query"

querystring = {
    "function":"TIME_SERIES_DAILY",
    "symbol":"IBM",
    "datatype":"json",
    "outputsize":"compact"
}

headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": "[YOUR_KEY]"
}

response = requests.get(url, headers=headers, params=querystring)

data = response.json()

# Now 'data' contains the daily time series data for "IBM"
```

### Transform
Find **weekly** max closing price for each company.

Hints:
  - Use a `for` loop to get stock data for each company
  - Use the Spark `union` operation to concatenate all data into one DF
  - Create a new `week` column from the date column
  - Use `group by` to calculate the max closing price

### Load
- Partition `DF` by company
- Load the DF into a managed table called `max_closing_price_weekly`
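
Before wiring this into Spark, it can help to check the transform logic on a small payload in plain Python. The sketch below is only an illustration: `flatten` and `weekly_max` are hypothetical helper names, and the payload contains just the two keys the notebook reads (`Time Series (Daily)` and `4. close`), with three GOOGL closing prices copied from the sample output in this notebook. It groups by ISO year and ISO week, since a week number alone would merge the same week from different years if the data ever spans a year boundary.

```python
from datetime import date

# Reduced payload: only the keys this notebook reads; prices are taken from
# the GOOGL sample output shown in the notebook.
sample_payload = {
    "Time Series (Daily)": {
        "2024-08-12": {"4. close": "162.29"},
        "2024-08-09": {"4. close": "163.67"},
        "2024-08-08": {"4. close": "162.03"},
    }
}

def flatten(payload, company):
    """Turn the nested date -> fields mapping into (date, company, close) rows."""
    series = payload.get("Time Series (Daily)", {})
    return [
        (day, company, float(values["4. close"]))
        for day, values in series.items()
    ]

def weekly_max(records):
    """Return {(company, iso_year, iso_week): max close} for the given rows."""
    best = {}
    for day, company, close in records:
        iso = date.fromisoformat(day).isocalendar()
        key = (company, iso[0], iso[1])  # ISO year and ISO week number
        best[key] = max(close, best.get(key, close))
    return best

records = flatten(sample_payload, "GOOGL")
result = weekly_max(records)
print(result)
```

The same shape carries over to Spark: each `flatten` result can become a DataFrame that is unioned with the others, and the grouping key corresponds to `groupBy` on company plus the week column.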

In [0]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, weekofyear, max as max_
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Initialize Spark session
spark = SparkSession.builder.appName("StockPricesETL").getOrCreate()

# Define the API details
url = "https://alpha-vantage.p.rapidapi.com/query"
headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": "[YOUR_KEY]"  # use your own key; never commit a real key to a notebook
}

# List of companies to fetch data for
companies = ["GOOGL", "AAPL", "MSFT", "TSLA"]

# Define the schema
schema = StructType([
    StructField("date", StringType(), True),
    StructField("company", StringType(), True),
    StructField("close", FloatType(), True)
])

# Initialize an empty DataFrame with the defined schema
combined_df = spark.createDataFrame([], schema)

# Loop through each company and extract data
for company in companies:
    querystring = {
        "function": "TIME_SERIES_DAILY",
        "symbol": company,
        "datatype": "json",
        "outputsize": "compact"
    }

    response = requests.get(url, headers=headers, params=querystring, timeout=30)
    response.raise_for_status()  # fail fast on HTTP error status codes
    data = response.json()

    # Extract the daily time series data
    time_series = data.get("Time Series (Daily)", {})

    # Transform the data into a list of dictionaries
    records = [
        {"date": date, "company": company, "close": float(values["4. close"])}
        for date, values in time_series.items()
    ]

    # Convert the records into a DataFrame using the predefined schema
    df = spark.createDataFrame(records, schema=schema)

    # Union the data into a single DataFrame
    combined_df = combined_df.union(df)

# Convert the 'date' column to DateType
combined_df = combined_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# Display the combined DataFrame to ensure correctness
combined_df.show()

+----------+-------+------+
|      date|company| close|
+----------+-------+------+
|2024-08-12|  GOOGL|162.29|
|2024-08-09|  GOOGL|163.67|
|2024-08-08|  GOOGL|162.03|
|2024-08-07|  GOOGL|158.94|
|2024-08-06|  GOOGL|158.29|
|2024-08-05|  GOOGL|159.25|
|2024-08-02|  GOOGL|166.66|
|2024-08-01|  GOOGL|170.76|
|2024-07-31|  GOOGL|171.54|
|2024-07-30|  GOOGL|170.29|
|2024-07-29|  GOOGL|169.53|
|2024-07-26|  GOOGL| 167.0|
|2024-07-25|  GOOGL|167.28|
|2024-07-24|  GOOGL|172.63|
|2024-07-23|  GOOGL|181.79|
|2024-07-22|  GOOGL|181.67|
|2024-07-19|  GOOGL|177.66|
|2024-07-18|  GOOGL|177.69|
|2024-07-17|  GOOGL|181.02|
|2024-07-16|  GOOGL|183.92|
+----------+-------+------+
only showing top 20 rows



In [0]:
# Add a 'week' column based on the date
transformed_df = combined_df.withColumn("week", weekofyear(col("date")))

# Group by company and week, then calculate the max closing price
weekly_max_df = transformed_df.groupBy("company", "week") \
    .agg(max_("close").alias("max_closing_price"))

In [0]:
# Save the DataFrame to a managed table, partitioned by company
weekly_max_df.write \
    .mode("overwrite") \
    .partitionBy("company") \
    .format("delta") \
    .saveAsTable("max_closing_price_weekly")

In [0]:
spark.sql("SHOW PARTITIONS max_closing_price_weekly").show()

+-------+
|company|
+-------+
|   AAPL|
|  GOOGL|
|   MSFT|
|   TSLA|
+-------+



## ETL Job Four: RDBMS


### Extract
Extract RNA data from a public PostgreSQL database.

- https://rnacentral.org/help/public-database
- Extract 100 RNA records from the `rna` table (hint: use `limit` in your sql)
- hint: use `spark.read.jdbc` https://docs.databricks.com/external-data/jdbc.html
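
As a rough sketch of the steps involved, a JDBC read needs a connection URL, a driver class, credentials, and a table or subquery to read. The helper below only assembles those options as a dictionary. `build_jdbc_options` and the environment variable names `RNA_DB_USER` and `RNA_DB_PASSWORD` are hypothetical, chosen here so credentials do not have to be written into the notebook. Wrapping the query in parentheses with an alias lets the database apply the `LIMIT` before any rows reach Spark.

```python
import os

def build_jdbc_options(host, port, database, table, limit):
    """Assemble JDBC reader options that push a LIMIT down to the database."""
    limit = int(limit)  # guard against non-numeric input in the SQL string
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        # A parenthesised subquery with an alias can stand in for a table name.
        "dbtable": f"(SELECT * FROM {table} LIMIT {limit}) AS {table}_{limit}",
        "driver": "org.postgresql.Driver",
        # Hypothetical environment variable names; set them outside the notebook.
        "user": os.environ.get("RNA_DB_USER", ""),
        "password": os.environ.get("RNA_DB_PASSWORD", ""),
    }

options = build_jdbc_options("hh-pgsql-public.ebi.ac.uk", 5432, "pfmegrnargs", "rna", 100)
print(options["url"])
print(options["dbtable"])

# With a live Spark session, the options could be passed to the JDBC reader:
# rna_df = spark.read.format("jdbc").options(**options).load()
```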

### Transform
We want to load the data as is, so no transformation is required.


### Load
Load the DF into a managed table called `rna_100_records`

In [0]:
# Write your solution here
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("RNADataETL").getOrCreate()

# PostgreSQL connection properties
jdbc_url = "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs"
connection_properties = {
    "user": "reader",
    "password": "NWDMCE5xdipIjRrp",
    "driver": "org.postgresql.Driver"
}

# SQL query to extract 100 records from the rna table
query = "(SELECT * FROM rna LIMIT 100) AS rna_100"

# Read data from PostgreSQL database
rna_df = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties)
# Load the DataFrame into a managed table
rna_df.write.mode("overwrite").saveAsTable("rna_100_records")


In [0]:
df = spark.sql("SELECT * FROM rna_100_records")
df.show()

+--------+-------------+--------------------+---------+----------------+----+--------------------+--------+--------------------+
|      id|          upi|           timestamp|userstamp|           crc64| len|           seq_short|seq_long|                 md5|
+--------+-------------+--------------------+---------+----------------+----+--------------------+--------+--------------------+
| 5110258|URS00004DF9F2| 2014-05-29 13:51:05|   RNACEN|61AE5251E4F4E67A| 848|GATAAACGCTAGCGGAG...|    null|26fa8b5b4fbc5e3bc...|
| 5110259|URS00004DF9F3| 2014-05-29 13:51:05|   RNACEN|63056E024E222787|  76|GCGGGCGTAGCTCAGTT...|    null|f725b5fff34986f21...|
| 5110262|URS00004DF9F6| 2014-05-29 13:51:05|   RNACEN|06115F73D962D669| 693|TGCAGTCGGACGGGATT...|    null|26fa93769ae137737...|
| 5110264|URS00004DF9F8| 2014-05-29 13:51:05|   RNACEN|168BF325E98D59CD|1514|AGAGTTTGATCATGGCT...|    null|be74650d6063133a0...|
| 5110266|URS00004DF9FA| 2014-05-29 13:51:05|   RNACEN|F7A8B3D48878A1EB|1415|GCGGCATGGATTAGGCA...