# Learning Objectives

In this notebook, you will build ETL jobs that interface with a variety of common data sources, such as:
- REST APIs (HTTP endpoints)
- RDBMS
- Hive tables (managed tables)
- Various file formats (csv, json, parquet, etc.)


# Interview Questions

As you progress through the practice, attempt to answer the following questions:

## Columnar File
- What is a columnar file format and what advantages does it offer?
- Why is Parquet frequently used with Spark and how does it function?
- How do you read/write data from/to a Parquet file using a DataFrame?

## Partitions
- How do you save data to a file system by partitions? (Hint: Provide the code)
- How and why can partitions reduce query execution time? (Hint: Give an example)

## JDBC and RDBMS
- How do you load data from an RDBMS into Spark? (Hint: Discuss the steps and JDBC)

## REST API and HTTP Requests
- How can Spark be used to fetch data from a REST API? (Hint: Discuss making API requests)

## ETL Job One: Parquet file
### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Data transformation requirements: https://pgexercises.com/questions/aggregates/fachoursbymonth.html

### Load
Load data into a parquet file

### What is Parquet? 

Columnar files are an important technique for optimizing Spark queries. Additionally, they are often tested in interviews.
- https://www.youtube.com/watch?v=KLFadWdomyI
- https://www.databricks.com/glossary/what-is-parquet
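
To make the row-versus-column distinction concrete, here is a small pure-Python sketch. It is not Parquet itself and not how Spark stores data internally; the sample records and the names `rows` and `columns` are made up for illustration. It only shows why an aggregate over a single column needs to touch less data when values are grouped by column.

```python
# Hypothetical sample records (illustrative values only)
rows = [
    {"facid": 0, "memid": 3, "slots": 2},
    {"facid": 1, "memid": 5, "slots": 4},
    {"facid": 0, "memid": 7, "slots": 1},
]

# Row-oriented layout: each record is stored together, so summing `slots`
# still has to visit every record in full.
row_total = sum(record["slots"] for record in rows)

# Column-oriented layout: values of the same column are stored together.
columns = {name: [record[name] for record in rows] for name in rows[0]}

# Only the `slots` column is read to compute the same aggregate.
col_total = sum(columns["slots"])

print(row_total, col_total)
```

A real columnar format such as Parquet adds compression, encoding, and per-column statistics on top of this basic idea.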

In [0]:
# Write your solution here
dbutils.fs.rm('/FileStore/tables/bookings.parquet', True)
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/bookings.csv")
df.write.parquet('/FileStore/tables/bookings.parquet')

dbutils.fs.rm('/FileStore/tables/facilities.parquet', True)
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/facilities.csv")
df.write.parquet('/FileStore/tables/facilities.parquet')

dbutils.fs.rm('/FileStore/tables/members.parquet', True)
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/members.csv")
df.write.parquet('/FileStore/tables/members.parquet')

## ETL Job Two: Partitions

### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Transform the data as described at https://pgexercises.com/questions/joins/threejoin.html

### Load
Partition the result data by the facility column and then save it to the `threejoin_delta` managed table.

hint: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html

### What are partitions?

Partitions are an important technique for optimizing Spark queries. Additionally, they are often tested in interviews.
- https://www.youtube.com/watch?v=hvF7tY2-L3U&t=268s
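
As a rough illustration of why partitioning can cut query time, the sketch below imitates the `column=value/` directory layout that partitioned writes produce, using only the Python standard library. It is a simulation, not Spark: the helper names `write_partitioned` and `read_partition`, the file name `part-0.csv`, and the sample rows are all hypothetical. The point is that a filter on the partition column only needs to open one directory.

```python
import csv
import tempfile
from pathlib import Path

# Hypothetical rows (illustrative values only)
records = [
    {"facility_name": "Tennis Court 1", "member_surname": "Smith", "slots": 2},
    {"facility_name": "Squash Court", "member_surname": "Jones", "slots": 1},
    {"facility_name": "Tennis Court 1", "member_surname": "Baker", "slots": 3},
]

def write_partitioned(records, base_dir, partition_col):
    """Append each record to a CSV under `<partition_col>=<value>/`."""
    for record in records:
        part_dir = Path(base_dir) / f"{partition_col}={record[partition_col]}"
        part_dir.mkdir(parents=True, exist_ok=True)
        path = part_dir / "part-0.csv"
        is_new = not path.exists()
        # The partition value lives in the directory name, not in the file
        data = {k: v for k, v in record.items() if k != partition_col}
        with path.open("a", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=list(data))
            if is_new:
                writer.writeheader()
            writer.writerow(data)

def read_partition(base_dir, partition_col, value):
    """Read only the directory for one partition value, skipping the rest."""
    path = Path(base_dir) / f"{partition_col}={value}" / "part-0.csv"
    with path.open(newline="") as handle:
        return list(csv.DictReader(handle))

base_dir = tempfile.mkdtemp()
write_partitioned(records, base_dir, "facility_name")
partition_dirs = sorted(p.name for p in Path(base_dir).iterdir())
tennis_rows = read_partition(base_dir, "facility_name", "Tennis Court 1")
print(partition_dirs)
print(tennis_rows)
```

In Spark, the equivalent effect is usually called partition pruning: a query that filters on the partition column can skip the directories for every other value.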

In [0]:
# Write your solution here
# Read CSV files into DataFrames
bookings_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/bookings.csv")
facilities_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/facilities.csv")
members_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/members.csv")

# Perform joins
result_df = bookings_df \
    .join(members_df, bookings_df["memid"] == members_df["memid"], "inner") \
    .join(facilities_df, bookings_df["facid"] == facilities_df["facid"], "inner")

# Select relevant columns
result_df = result_df.select(
    facilities_df["name"].alias("facility_name"),
    members_df["firstname"].alias("member_firstname"),
    members_df["surname"].alias("member_surname"),
    bookings_df["starttime"],
    bookings_df["slots"]
)

# Save the DataFrame partitioned by 'facility_name' into a managed Delta table
result_df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("facility_name") \
    .saveAsTable("threejoin_delta")

## ETL Job Three: HTTP Requests

### Extract
Extract daily stock price data for the following companies: Google, Apple, Microsoft, and Tesla.

Data Source
- API: https://rapidapi.com/alphavantage/api/alpha-vantage
- Endpoint: GET `TIME_SERIES_DAILY`

Sample HTTP request

```
curl --request GET \
	--url 'https://alpha-vantage.p.rapidapi.com/query?function=TIME_SERIES_DAILY&symbol=TSLA&outputsize=compact&datatype=json' \
	--header 'X-RapidAPI-Host: alpha-vantage.p.rapidapi.com' \
	--header 'X-RapidAPI-Key: [YOUR_KEY]'

```

Sample Python HTTP request

```
import requests

url = "https://alpha-vantage.p.rapidapi.com/query"

querystring = {
    "function":"TIME_SERIES_DAILY",
    "symbol":"IBM",
    "datatype":"json",
    "outputsize":"compact"
}

headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": "[YOUR_KEY]"
}

response = requests.get(url, headers=headers, params=querystring)

data = response.json()

# Now 'data' contains the daily time series data for "IBM"
```

### Transform
Find **weekly** max closing price for each company.

hints: 
  - Use a `for` loop to get stock data for each company
  - Use the Spark `union` operation to concatenate all data into one DF
  - Create a new `week` column from the date column
  - Use `groupBy` to calculate the max closing price
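
The week-and-max logic in the hints can be sketched in plain Python before moving it to Spark. This is only an illustration under stated assumptions: the `sample` payload imitates the shape of the `Time Series (Daily)` part of a response with made-up prices, and `weekly_max_close` is a hypothetical helper. It uses ISO 8601 week numbers, which is also how Spark's `weekofyear` is documented to count weeks, and it keys on the ISO year as well so that the same week number from two different years is not merged.

```python
from datetime import date

# Hypothetical payload shaped like the "Time Series (Daily)" section
# (prices are made up for illustration)
sample = {
    "2024-01-05": {"4. close": "101.0"},  # Friday, ISO week 1
    "2024-01-08": {"4. close": "103.5"},  # Monday, ISO week 2
    "2024-01-09": {"4. close": "102.0"},  # Tuesday, ISO week 2
}

def weekly_max_close(company, daily):
    """Return {(company, iso_year, iso_week): max close} for one company."""
    result = {}
    for day, values in daily.items():
        iso_year, iso_week, _ = date.fromisoformat(day).isocalendar()
        key = (company, iso_year, iso_week)
        close = float(values["4. close"])
        # Keep the larger of the stored value and the current close
        result[key] = max(close, result.get(key, close))
    return result

weekly = weekly_max_close("TSLA", sample)
print(weekly)
```

The same grouping key (company, year, week) carries over directly to a `groupBy` on a DataFrame.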

### Load
- Partition `DF` by company
- Load the DF into a managed table called `max_closing_price_weekly`

In [0]:
# Write your solution here
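import requests
from datetime import datetime
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DateType
from pyspark.sql.functions import weekofyear

# Spark session (in a Databricks notebook this returns the existing `spark` session)
spark = SparkSession.builder.appName("StockPriceETL").getOrCreate()

def fetch_stock_data(symbol):
    """Fetch the daily time series for one symbol as a {date: values} dict."""
    url = "https://alpha-vantage.p.rapidapi.com/query"

    querystring = {
        "function":"TIME_SERIES_DAILY",
        "symbol":symbol,
        "datatype":"json",
        "outputsize":"compact"
    }

    # Never commit a real API key; substitute your own key here
    headers = {
        "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
        "X-RapidAPI-Key": "[YOUR_KEY]"
    }

    response = requests.get(url, headers=headers, params=querystring)
    response.raise_for_status()

    data = response.json()

    # The daily prices are nested under the "Time Series (Daily)" key
    return data.get("Time Series (Daily)", {})

def parse_stock_data(data, company):
    """Flatten the {date: values} dict into one Row per trading day."""
    rows = []
    if data:
        for date, values in data.items():
            rows.append(Row(
                company=company,
                # Convert the "YYYY-MM-DD" string so it matches DateType in the schema
                date=datetime.strptime(date, "%Y-%m-%d").date(),
                close=float(values["4. close"])
            ))
    return rows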

symbols = ["GOOGL", "AAPL", "MSFT", "TSLA"]

stock_data = {}
for symbol in symbols:
    stock_data[symbol] = fetch_stock_data(symbol)

all_rows = []
for symbol, data in stock_data.items():
    all_rows.extend(parse_stock_data(data, symbol))

schema = StructType([
    StructField("company", StringType(), True),
    StructField("date", DateType(), True),
    StructField("close", FloatType(), True)
])

df = spark.createDataFrame(all_rows, schema)

df = df.withColumn("week", weekofyear("date"))

# Group by company and week to calculate the maximum closing price
weekly_max_df = df.groupBy("company", "week").max("close").withColumnRenamed("max(close)", "max_closing_price")

# Save the DataFrame to a Delta table partitioned by company
weekly_max_df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("company") \
    .saveAsTable("max_closing_price_weekly")


## ETL Job Four: RDBMS


### Extract
Extract RNA data from a public PostgreSQL database.

- https://rnacentral.org/help/public-database
- Extract 100 RNA records from the `rna` table (hint: use `limit` in your sql)
- hint: use `spark.read.jdbc` https://docs.databricks.com/external-data/jdbc.html

### Transform
We want to load the data as is, so no transformation is required.


### Load
Load the DF into a managed table called `rna_100_records`

In [0]:
# Write your solution here
query = "(select * from rna limit 100) as rna_alias"

df = (spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs")
  .option("dbtable", query)
  .option("user", "reader")
  .option("password", "NWDMCE5xdipIjRrp")
  .option("fetchSize", "100")
  .load()
)

df.write.format("delta").mode("overwrite").saveAsTable("rna_100_records")
#display(df.select("*"))

id,upi,timestamp,userstamp,crc64,len,seq_short,seq_long,md5
13137664,URS0000C87700,2017-10-19T09:48:29.391+0000,rnacen,CB4C22EE48FCACF8,401,TCTCTAAAGATTTCCTCAGGATAGGAGGAGCTCTTAAGAGATACCCAATTTAATGTAACAAAGCAAACAATTACAGATCTTGAGGCCAAATTGATCTCAAACTATTCTCAAACTTTAAATGGTTGAAAAAAAATCTTAAATTATTGGCAAAGAGCCAGAACTATATAATTAAACTGGCACTTTAAAATGATCCAAATGCCAGGTTACAGCTTTAAATGCTGAAGATTATCAGACCTCAGTAAATGCTGAAGATTATCAGGAACAGAGACTATGAATGCCACAATCTACTAAGGAGTGATAATAAGTTTCCTAACCTATCAACTAGCTTTGAAAATACATGGCATGCTGGGCGTGGTGGCTCATCCCTGTAATCCCAGCACTCAGGAGGCTGAGGCAGGAGA,,f9b73ae9a16e4fc4abd1580d8f5232ba
13137665,URS0000C87701,2017-10-19T09:48:29.391+0000,rnacen,44DFBEB9AE09D9A5,276,CTGACATGTGTGCGAGTCAATGGGTGAGTAAACCCGTAGGGCGCAGGGAAGCTGATTGGCTGGATCCCTCACGGGTGCACACCCGACCGACCTTGATCTTCTGAGAAGGGTTCGAGTGTGAGCATGCCTGTCAGGACCCGAAAGATGGTGAACTATGCCTGAGAGAGGTGAAGCCAGAGGAAACTCTGGTGGAGGCCCGCAGCGATACTAACGTGTAAATCGTTCGTCTGACTTGGGTATAGGGGCGAAAGACTAATTGAACCATCTAGTATCTGG,,f9b74e50196a4de5ddcff5e25b6f7143
13137666,URS0000C87702,2017-10-19T09:48:29.391+0000,rnacen,33D08150969C606F,83,GACCTCGTAGCTCAGTTGGCTAGAGCGTTGGCTGTATAAATGCCTTCCACTTGCTGACGTGGGTTCGAATCCCACCGAAGCTG,,f9b753a0ad4a16c46b1bfd814e58e910
13137667,URS0000C87703,2017-10-19T09:48:29.391+0000,rnacen,7421F9491FDCCF6B,108,AATCCGTGGTGTGGACAGTCTCCCTGGAAGAACTTGAATCTGATCCACGCATCTTTGAAAGACTCTCCAGTCTCCTGCGCGAATGTAGCAATTTTGCTCCTCAAGTCT,,f9b7640244649c22bf58add40673cd2c
13137668,URS0000C87704,2017-10-19T09:48:29.391+0000,rnacen,2ED895697DFBF83F,537,TTGAAGGAAAAACAGTTTTTTTCACTGCCAGTCCAGAGTTGCCCTTGCTCATTCTAAGAACAAGTTCACTCAGAAAAACTGTAAGAAAACAGTTGGCCAGACTGTCCTTTCTGCTGGAATAGCCTCCACTGAAAACACTACTACTACTTAAAAAAACCAAACCAACAAAACCCAATGCCTTAGAGTGCACTCATTGAGAAGTCTGATACATTTTTCTGAAGAAAGCGAAGTGTTCAAAGTTGGCTCGCCAACAGTGCACTCAGTCAGGAGCAACAGAATAGGACTCTGGCTCTTTTTTTTGGTTGTTTTTTGAAAGCAGGGCCATGATTAAGAGCATTGGCTGGGATCACTCATACTGTGCCACTTGAACTGGAATTCTTGGACCAGTGCAAGATGAACTAAAGCAAAAGCATTTGCCAAGACTGTTTTCATTAATCAAAACCAAAAGCTGGAGGGTTGTGGCCAACTCACTATCAGGTGGCATTGCTCCCATCACCCACCAGGAAGATCGCAGTACACCCAAGTTTTTGGGTTCTC,,f9b79b7aa2a6b1f8adb6c05639fd9e2d
13137669,URS0000C87705,2017-10-19T09:48:29.392+0000,rnacen,42DC23FFB4E5D9F2,71,AGGGGCGTCGTTCAATGGTAGGACGGAGGTCTCCAAAACCTCTGACATAGGTTCGATTCCTATCGCCCCTG,,f9b7ae34ced8d8e0c238ff79984b0cdf
13137670,URS0000C87706,2017-10-19T09:48:29.392+0000,rnacen,8C71D6D584C646A9,72,TCCCTGATAGCTCAGTTGGTAAAGAATCTGCCTGCAATACAGGAGACCCTGGTTCAATTCCTGGATGGGGAA,,f9b7c687f973660c5d17050d1e03a499
13137671,URS0000C87707,2017-10-19T09:48:29.392+0000,rnacen,16A88A97D3267D7B,125,CATATACCCCCTCTGTCCGGAAATACTTGTCGGAGGAATAGATGTATCTAGATGTATTTTGGTTCTAGATACATTCATTTTTATCCATTTCTCCGACAAGTATTTTCGGACGGAGGGAGTACTAC,,f9b7d0f0ccd4e954655cc76a4e20cf19
13137672,URS0000C87708,2017-10-19T09:48:29.392+0000,rnacen,7D0C5E535D9EF31B,270,GGTCAGTGATGATCAGTAAAAGGTCTGATTACCCACAATCCTCTCCTGCTGCCCTCTGGCTCCCAGCAGCCCCTGCTCTGTATCCCATCATGCTCTAGTGGTGCAGTGGTGGTCCCAGGAGCTGCTGACAGACAGGATGTGACATCACAGTGAGTCTGTACAGAGATCCTACAGGCTCTGTGTGATGTCATCCAACAGAACCAATCAGCATGGAGCGTCAGGCTGAGAAGGAGGAGGAGGGAGCTGATCTCAAGTTTTCAACCTGAGACC,,f9b7d12c92646d2cbd367c29e294356a
13137673,URS0000C87709,2017-10-19T09:48:29.392+0000,rnacen,B086625FD080BAE2,118,AAGAGGGAACCCGGTGGAAGTCCGGGACTGCCCCGCAGCGGTGAGCGGGAACGACCGCCGTCACACGCACTGGCCCCGGAAGGGGGCTGGGAAGCGACGGCCACTAGGAGTCCGCCCT,,f9b7dc9a710087c40dbf07df63b95665
