# Learning Objectives

In this notebook, you will build ETL jobs that work with several common data sources, such as:
- REST APIs (HTTP endpoints)
- RDBMS
- Hive tables (managed tables)
- File formats (CSV, JSON, Parquet, etc.)


# Interview Questions

As you progress through the practice, attempt to answer the following questions:

## Columnar File
- What is a columnar file format and what advantages does it offer?
- Why is Parquet frequently used with Spark and how does it function?
- How do you read/write data from/to a Parquet file using a DataFrame?

## Partitions
- How do you save data to a file system by partitions? (Hint: Provide the code)
- How and why can partitions reduce query execution time? (Hint: Give an example)

## JDBC and RDBMS
- How do you load data from an RDBMS into Spark? (Hint: Discuss the steps and JDBC)

## REST API and HTTP Requests
- How can Spark be used to fetch data from a REST API? (Hint: Discuss making API requests)

## ETL Job One: Parquet file
### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Implement the transformation described in https://pgexercises.com/questions/aggregates/fachoursbymonth.html (total slots booked per facility in September 2012).
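Before writing the Spark version, it can help to pin down the expected logic on a few rows. The plain-Python sketch below uses hypothetical booking rows: it keeps bookings whose `starttime` falls in the half-open range [2012-09-01, 2012-10-01), sums `slots` per `facid`, and sorts by the total in ascending order. The half-open upper bound matters: a filter such as `starttime <= '2012-09-30'` would compare against midnight and drop later bookings on September 30.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical bookings: (facid, starttime, slots)
bookings = [
    (0, datetime(2012, 9, 1, 8, 0), 2),
    (0, datetime(2012, 9, 30, 18, 30), 3),
    (1, datetime(2012, 9, 15, 10, 0), 1),
    (1, datetime(2012, 10, 1, 8, 0), 6),    # October: excluded
    (2, datetime(2012, 8, 31, 23, 30), 4),  # August: excluded
]

start, end = datetime(2012, 9, 1), datetime(2012, 10, 1)

# Half-open interval [start, end) captures every timestamp in September
totals = defaultdict(int)
for facid, starttime, slots in bookings:
    if start <= starttime < end:
        totals[facid] += slots

# Sort facilities by their total slots, ascending
result = sorted(totals.items(), key=lambda item: item[1])
print(result)  # [(1, 1), (0, 5)]
```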

### Load
Load the result into a Parquet file.

### What is Parquet? 

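Columnar file formats are an important technique for optimizing Spark queries: because values are stored column by column, a query reads only the columns it needs, and similar values stored together compress well. They are also a common interview topic.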
- https://www.youtube.com/watch?v=KLFadWdomyI
- https://www.databricks.com/glossary/what-is-parquet
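To make the row-versus-column distinction concrete, here is a toy sketch in plain Python. It is not Parquet's actual on-disk encoding. The same hypothetical booking rows are pivoted into one list per column, so an aggregate over `slots` touches only that list. It also shows that a run of repeated values in one column collapses well under a simple run-length encoding, an idea columnar formats rely on.

```python
from itertools import groupby

# Hypothetical booking rows in row-oriented layout: one tuple per record
rows = [
    (0, 1, 3),  # (facid, memid, slots)
    (0, 2, 2),
    (0, 1, 2),
    (1, 3, 4),
    (1, 3, 1),
]
column_names = ("facid", "memid", "slots")

# Columnar layout: one list per column, values stored contiguously
columns = {name: [row[i] for row in rows] for i, name in enumerate(column_names)}

# An aggregate over one column reads only that column's values
total_slots = sum(columns["slots"])

def run_length_encode(values):
    """Collapse consecutive repeats into (value, count) pairs."""
    return [(value, len(list(group))) for value, group in groupby(values)]

facid_rle = run_length_encode(columns["facid"])
print(total_slots)  # 12
print(facid_rle)    # [(0, 3), (1, 2)]
```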

```python
# Write your solution here
from pyspark.sql import functions as F

# Step 1: Extract data from the managed tables
bookings_df = spark.table("bookings")
members_df = spark.table("members")
facilities_df = spark.table("facilities")

# Step 2: Transform data
# Total number of slots booked per facility in September 2012, sorted by that total.
# The half-open range [2012-09-01, 2012-10-01) includes every booking on September 30.
result_df = (
    bookings_df
    .filter((F.col("starttime") >= "2012-09-01") & (F.col("starttime") < "2012-10-01"))
    .groupBy("facid")
    # No spaces in the name: older Spark versions reject them in Parquet column names
    .agg(F.sum("slots").alias("total_slots"))
    .orderBy("total_slots")
)

# Step 3: Load data into a Parquet file
output_path = "dbfs:/FileStore/parquet/parquet_file"
result_df.write.mode("overwrite").parquet(output_path)
```



## ETL Job Two: Partitions

### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Transform the data as described in https://pgexercises.com/questions/joins/threejoin.html (members who have used a tennis court).
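A plain-Python sketch of the three-way join on hypothetical rows can help check the expected shape of the output before writing it in Spark. Dictionary lookups stand in for the equi-joins on `facid` and `memid`, a set plays the role of `DISTINCT`, and sorting the tuples orders by member name and then court name. All names and IDs below are made up.

```python
# Hypothetical rows mirroring the three managed tables
members = {1: ("Alice", "Smith"), 2: ("Bob", "Jones"), 3: ("Carol", "White")}
facilities = {0: "Tennis Court 1", 1: "Tennis Court 2", 2: "Badminton Court"}
bookings = [(0, 1), (0, 1), (1, 1), (2, 2), (1, 3)]  # (facid, memid)

# Join bookings -> facilities -> members, keep tennis courts only,
# and format the member's name as a single column
pairs = {
    (f"{members[memid][0]} {members[memid][1]}", facilities[facid])
    for facid, memid in bookings
    if "Tennis" in facilities[facid]
}

# The set already removed duplicates; sort by member, then court
result = sorted(pairs)
for member, court in result:
    print(member, "|", court)
```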

### Load
Partition the result data by the facility column and save it to the `threejoin_delta` managed table.

Hint: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html

### What are partitions?

Partitions are an important technique for optimizing Spark queries: when data is partitioned on disk by a column, a query that filters on that column can skip the files for every other value. They are also often tested in interviews.
- https://www.youtube.com/watch?v=hvF7tY2-L3U&t=268s
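For file-based tables, `partitionBy` writes one subdirectory per distinct value, named `column=value` (a Hive-style layout), and a filter on the partition column lets Spark skip the other directories entirely (partition pruning). The toy sketch below models that layout in plain Python with hypothetical rows and paths; it ignores details such as multiple data files per directory and metastore bookkeeping. For Job Two, a query with `WHERE CourtName = ...` on the partitioned table would similarly need only one court's directory.

```python
from collections import defaultdict

# Hypothetical rows: (member, court)
rows = [
    ("Alice Smith", "Court1"),
    ("Bob Jones", "Court1"),
    ("Alice Smith", "Court2"),
    ("Carol White", "Court3"),
]

def write_partitioned(rows, column_index, column_name, root="threejoin"):
    """Group rows into Hive-style directories named <root>/<column>=<value>."""
    layout = defaultdict(list)
    for row in rows:
        layout[f"{root}/{column_name}={row[column_index]}"].append(row)
    return dict(layout)

def read_with_filter(layout, column_name, wanted):
    """Partition pruning: open only the directories whose value matches the filter."""
    target = f"{column_name}={wanted}"
    scanned = [path for path in layout if path.rsplit("/", 1)[-1] == target]
    rows_read = [row for path in scanned for row in layout[path]]
    return scanned, rows_read

layout = write_partitioned(rows, 1, "CourtName")
scanned, rows_read = read_with_filter(layout, "CourtName", "Court1")
print(scanned)                                  # ['threejoin/CourtName=Court1']
print(len(rows_read), "of", len(rows), "rows")  # 2 of 4 rows
```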

```python
# Write your solution here
from pyspark.sql import functions as F

# Extract data from the managed tables
bookings_df = spark.table("bookings")
members_df = spark.table("members")
facilities_df = spark.table("facilities")

# Transform: list every member who has used a tennis court, with the court name
tennis_bookings_df = (
    bookings_df
    .join(facilities_df, bookings_df.facid == facilities_df.facid)
    .join(members_df, bookings_df.memid == members_df.memid)
    .filter(facilities_df.name.rlike("Tennis"))
    .select(
        # Member's full name as a single column: "firstname surname"
        F.concat_ws(" ", members_df.firstname, members_df.surname).alias("MemberName"),
        facilities_df.name.alias("CourtName"),
    )
    .distinct()
    .orderBy("MemberName", "CourtName")
)

# Load into the "threejoin_delta" managed table, partitioned by the facility (court) column
tennis_bookings_df.write.mode("overwrite").partitionBy("CourtName").saveAsTable("threejoin_delta")
```



## ETL Job Three: HTTP Requests

### Extract
Extract daily stock price data for the following companies: Google, Apple, Microsoft, and Tesla.

Data Source
- API: https://rapidapi.com/alphavantage/api/alpha-vantage
- Endpoint: GET `TIME_SERIES_DAILY`

Sample HTTP request

```
curl --request GET \
	--url 'https://alpha-vantage.p.rapidapi.com/query?function=TIME_SERIES_DAILY&symbol=TSLA&outputsize=compact&datatype=json' \
	--header 'X-RapidAPI-Host: alpha-vantage.p.rapidapi.com' \
	--header 'X-RapidAPI-Key: [YOUR_KEY]'

```

Sample Python HTTP request

```
import requests

url = "https://alpha-vantage.p.rapidapi.com/query"

querystring = {
    "function":"TIME_SERIES_DAILY",
    "symbol":"IBM",
    "datatype":"json",
    "outputsize":"compact"
}

headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": "[YOUR_KEY]"
}

response = requests.get(url, headers=headers, params=querystring)

data = response.json()

# Now 'data' contains the daily time series data for "IBM"
```
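The response nests one object per trading day under the `"Time Series (Daily)"` key, with the closing price under `"4. close"` (the same keys the solution below reads). The sketch below flattens that into `(symbol, date, close)` rows. It runs against a hypothetical, abbreviated payload with made-up numbers rather than a live API call, and it raises an error when the series key is missing, as can happen when a request fails or is rate-limited.

```python
import json

# Hypothetical, abbreviated payload shaped like a TIME_SERIES_DAILY response
payload = json.loads("""
{
  "Time Series (Daily)": {
    "2024-03-08": {"4. close": "195.95"},
    "2024-03-07": {"4. close": "198.00"}
  }
}
""")

def to_close_rows(data, symbol):
    """Flatten the daily series into (symbol, date, close) tuples, oldest first."""
    series = data.get("Time Series (Daily)")
    if series is None:
        # Error or rate-limit responses omit the series; fail loudly
        raise ValueError(f"no daily series for {symbol}: {data}")
    return sorted((symbol, day, float(fields["4. close"])) for day, fields in series.items())

rows = to_close_rows(payload, "IBM")
print(rows)  # [('IBM', '2024-03-07', 198.0), ('IBM', '2024-03-08', 195.95)]
```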

### Transform
Find the **weekly** maximum closing price for each company.

Hints:
  - Use a `for` loop to get the stock data for each company
  - Use the Spark `union` operation to concatenate all the data into one DF
  - Create a new `week` column from the date column
  - Use `group by` to calculate the maximum closing price
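The choice of week key matters. ISO week numbers, which Spark's `weekofyear` returns, restart every year, so a few months of daily data that cross New Year can put late-December and early-January closes into the same "week 1" bucket. Keying on the Monday that starts each week, which is what Spark's `date_trunc('week', ...)` produces, avoids that. Below is a plain-Python sketch with made-up prices.

```python
from datetime import date, timedelta

def week_start(day):
    """Return the Monday of the week containing `day`."""
    return day - timedelta(days=day.weekday())

# ISO week numbers restart each year: both dates fall in "week 1"
print(date(2024, 1, 1).isocalendar()[1], date(2024, 12, 31).isocalendar()[1])  # 1 1

# Hypothetical daily closes with made-up prices: (company, date, close)
closes = [
    ("Tesla", date(2024, 1, 2), 250.0),
    ("Tesla", date(2024, 1, 5), 240.0),
    ("Tesla", date(2024, 12, 31), 400.0),
    ("Apple", date(2024, 1, 3), 185.0),
]

# Group by (company, week start) and keep the maximum close per group
weekly_max = {}
for company, day, close in closes:
    key = (company, week_start(day))
    weekly_max[key] = max(close, weekly_max.get(key, close))

for (company, week), max_close in sorted(weekly_max.items()):
    print(company, week, max_close)
```

If the ISO week number were used as the key instead, Tesla's 2024-12-31 close would be merged into the same group as its early-January closes.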

### Load
- Partition the `DF` by company
- Load the DF into a managed table called `max_closing_price_weekly`

```python
# Write your solution here
import os
from functools import reduce

import requests
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F

# Reuse the notebook's Spark session (getOrCreate returns the existing one)
spark = SparkSession.builder.appName("StockPriceETL").getOrCreate()

# Companies and their ticker symbols
companies = [("Google", "GOOGL"), ("Apple", "AAPL"), ("Microsoft", "MSFT"), ("Tesla", "TSLA")]

# Never hard-code API keys in a notebook. Read the key from the environment
# (RAPIDAPI_KEY is an assumed variable name) or from a secret store such as
# dbutils.secrets.get on Databricks.
api_key = os.environ["RAPIDAPI_KEY"]
url = "https://alpha-vantage.p.rapidapi.com/query"
headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": api_key,
}

# Extract: one DataFrame of (company, date, close) per company
dfs = []
for company, symbol in companies:
    response = requests.get(
        url,
        headers=headers,
        params={"function": "TIME_SERIES_DAILY", "symbol": symbol,
                "datatype": "json", "outputsize": "compact"},
        timeout=30,
    )
    response.raise_for_status()
    data = response.json()
    # Failed or rate-limited calls return a JSON message without the time series
    if "Time Series (Daily)" not in data:
        raise ValueError(f"No daily time series for {symbol}: {data}")
    # Each entry maps "YYYY-MM-DD" to fields such as "4. close"
    rows = [(company, day, float(values["4. close"]))
            for day, values in data["Time Series (Daily)"].items()]
    dfs.append(spark.createDataFrame(rows, "company string, date string, close double"))

# Transform: concatenate all companies into one DataFrame
prices_df = reduce(DataFrame.union, dfs).withColumn("date", F.to_date("date"))

# Week column = Monday that starts the week; unlike weekofyear(),
# it stays unique across year boundaries
weekly_max_df = (
    prices_df
    .withColumn("week", F.date_trunc("week", "date").cast("date"))
    .groupBy("company", "week")
    .agg(F.max("close").alias("max_closing_price"))
)

# Load: partition by company and save as a managed table
weekly_max_df.write.partitionBy("company").mode("overwrite").saveAsTable("max_closing_price_weekly")
```


## ETL Job Four: RDBMS


### Extract
Extract RNA data from a public PostgreSQL database.

- https://rnacentral.org/help/public-database
- Extract 100 RNA records from the `rna` table (hint: use `limit` in your sql)
- hint: use `spark.read.jdbc` https://docs.databricks.com/external-data/jdbc.html
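Reading 100 rows needs only one query, but for large tables Spark's JDBC source can split the read into parallel queries using the `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions` options. The plain-Python sketch below is a simplified model of how such a split becomes one `WHERE` clause per partition; it is not Spark's exact code. Per the Spark documentation, the bounds only decide the stride and do not filter rows, so values outside them land in the first or last partition.

```python
def jdbc_partition_predicates(column, lower, upper, num_partitions):
    """Build one WHERE clause per partition with an equal stride over [lower, upper).

    The first clause also catches values below `lower` and NULLs; the last
    clause catches everything at or above its start, so no rows are lost.
    """
    if num_partitions < 2:
        return [None]  # one partition: a single unfiltered query
    stride = (upper - lower) // num_partitions
    predicates = []
    current = lower
    for i in range(num_partitions):
        lower_clause = f"{column} >= {current}" if i > 0 else None
        current += stride
        upper_clause = f"{column} < {current}" if i < num_partitions - 1 else None
        if lower_clause is None:
            predicates.append(f"{upper_clause} OR {column} IS NULL")
        elif upper_clause is None:
            predicates.append(lower_clause)
        else:
            predicates.append(f"{lower_clause} AND {upper_clause}")
    return predicates

for clause in jdbc_partition_predicates("id", 0, 100, 4):
    print(clause)
```

In PySpark, `spark.read.jdbc` accepts these settings through its `column`, `lowerBound`, `upperBound`, and `numPartitions` arguments, or explicit clauses like these through its `predicates` argument.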

### Transform
The data is loaded as is, so no transformation is required.


### Load
Load the DF into a managed table called `rna_100_records`.

```python
# Write your solution here
# JDBC connection properties for the public RNAcentral PostgreSQL database
jdbc_url = "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs"
jdbc_properties = {
    "user": "reader",
    "password": "NWDMCE5xdipIjRrp",
    "driver": "org.postgresql.Driver",
}

# Wrap the query in parentheses and alias it so Spark can use it in place of a table name
sql_query = "(SELECT * FROM rna LIMIT 100) AS rna_100_records"

# Extract 100 RNA records from the PostgreSQL database
rna_df = spark.read.jdbc(url=jdbc_url, table=sql_query, properties=jdbc_properties)

# Load the DataFrame into a managed table called rna_100_records
rna_df.write.mode("overwrite").saveAsTable("rna_100_records")

display(spark.table("rna_100_records"))
```

```text
id,upi,timestamp,userstamp,crc64,len,seq_short,seq_long,md5
6467147,URS000062AE4B,2014-05-29T15:05:26.000+0000,RNACEN,A30505128C56B09F,108,GTGCTTGCCTCAGCAGAATATATACTAAAATTGGAACGACACAGAGAAGTTTAGCATGGCCCCTTGCGCAAGGATGACACACAAATTCATGAAGCCTTCCATATTTTT,,f92e69cce37c750bccd260ee78208759
6467150,URS000062AE4E,2014-05-29T15:05:26.000+0000,RNACEN,3EECEBCCB3FC43D1,71,GCGTCCATTGTCTAATGGGAAGGACAGAGGTTGTGTAACCTTTGGTATAGGTTCAAATCCTATTGGACGTG,,f92ea95474d3801ce4f4c89c88c99a4a
6467153,URS000062AE51,2014-05-29T15:05:26.000+0000,RNACEN,41DBE95A8F0F64ED,1384,CANTTTTNGAATTTTGTTATGGTCTCAGCAAGCGCGCTCGCGGCGCGCATAACACACGCAAGTAGAGCGGTCTTACTGTATTTTTTGCACTCGTGAGGGTTAGCGGCGGACGGGTGAGTAACACGTAGGCAACCTGCCTGTAAGACTGGGATAACTACCGGAAACGGTAGCTAATACCGGATAGGCTTTCTTCTCACCTGAGAGGAAAGAGAAAGACGGAGCAATCTGTCACTTACAGATGGGCCTGCGGCGCATTAGCTAGTTGGTGAGGTAACGGCTCACCAAGGCGACGATGCGTAGCCGACCTGAGAGGGTGATCGGCCACACTGGGACTGAGACACGGCCCAGACTCCTACGGGAGGCAGCAGTAGGGAATCTTCCGCAATGGACGAAAGTCTGACGGAGCAACGCCGCGTGAGTGATGAAGGCCTTCGGGTCGTAAAGCTCTGTTGCCAAGGAAGAACAGCTGAGAGAGTAACTGCTCTCGGAATGACGGTACTTGAGAAGAAAGCCCCGGCTAACTACGTGCCAGCAGCCGCGGTAATACGTAGGGGGCAAGCGTTGTCCGGAATTATTGGGCGTAAAGCGCGCGCAGGCGGTAATGTAAGTTGGGTGTTTAAGGCGAGGGCTCAACCCTCGTTCGCACCCAAAACTGCATCACTTGAGTGCAGCAGAGGAAAGTGGAATTCCACGTGTAGCGGTGAAATGCGTAGAGATGTGGAGGAACACCAGTGGCGAAGGCGACTTTCTGGGCTGTAACTGACGCTGAGGCGCGAAAGCGTGGGGAGCAAACAGGATTAGATACCCTGGTAGTCCACGCCGTAAACGATGAATGCTAGGTGTTAGGGGTTTCGATACCCTTGGTGCCGAAGTTAACACATTAAGCATTCCGCCTGGGGAGTACGGTCGCAAGACTGAAACTCAAAGGAATTGACGGGGACCCGCACAAGCAGTGGAGTATGTGGTTTAATTCGAAGCAACGCGAAGAACCTTACCAGGTCTTGACATCTGAATGACCGGTGCAGAGATGTGCCTTTCCTTCGGGACATTCAAGACAGGTGGTGCATGGTTGTCGTCAGCTCGTGTCGTGAGATGTTGGGTTAAGTCCCGCAACGAGCGCAACCCCTAATTTTAGTTGCCAGCATTCAGTTGGGCACTCTAAAGTGACTGCCGGTGACAAACCGGAGGAAGGTGGGGATGACGTCAAATCATCATGCCCCTTATGACCTGGGCTACACACGTACTACAATGGTCGGTACAACGGGAAGCGAAGCCGCGAGGTGGAGCCAATCCTAAAAAGCCGATCTCAGTTCGGATTGTAGGCTGCAACTCGCCTACATGAAGTCGGAATTGCTAGTAATCGCGGATCAGCATGCCGCGGTGA,,f92eb5e4ca8baf49faac53c9de6fd3a6
6467154,URS000062AE52,2014-05-29T15:05:26.000+0000,RNACEN,045EF834A0592D93,85,GCCTGGGTGGCTCAGTCGGTTGAGCTTCCGACTTCAGATCAGGTCATGATCTCACAGTTCGTGGGTTCAAGCCCCGCATCGGGCC,,301d2533787f77f10f6d3e79d65abbb8
6467155,URS000062AE53,2014-05-29T15:05:26.000+0000,RNACEN,E0CDC2D926642A48,73,GCCTGGCTGGCTTGGTTGGTAGAACATGCGGCTCTTAATCTCAGGGTTGTGGGTTCAAGCCCCATGTTGGGCA,,3e5c5e19ad2168b404fd9a99f9d3946f
6467158,URS000062AE56,2014-05-29T15:05:26.000+0000,RNACEN,5100105C0CA4EEE0,103,TTGCTTCCGCAGCACATACACTAACATTGGAAAGCTAAAGAGAAGATCAGCATGACCCCTACAGAAGGATGACATGCACACTCGTGACGCGTTCCATATTTTT,,301d57150c047db06f1fbbe2141ee51d
6467159,URS000062AE57,2014-05-29T15:05:26.000+0000,RNACEN,60469075D67963B3,64,GGGGATTTAGCTCAGCGGCATAAGCACCTGCCTTGCAAGCAGGCAGTCGTTAGTTCGATCCCCG,,3e5ce3c8e46358055351a9b0f82b944a
6467163,URS000062AE5B,2014-05-29T15:05:26.000+0000,RNACEN,D82C72ED7C1ED18F,92,TTATAGGATGAACCCTGATAGGGACAACTTATGCTACGGCATTTGATGAGAACCAGGGACTGTATTTTAGTAGATGCTTGTCTTTTCTGATA,,3e5ce6ed34c972eb59e4fd097302eb34
6467164,URS000062AE5C,2014-05-29T15:05:26.000+0000,RNACEN,29351B9237BAC973,74,GGGCGGCTAGCTCAGCTGGTCAGAGCGCTCGCCTTACAAGCGAGAGGCCAGAGGTTCAAGTCCTCTGCCGCCCA,,d9c6c16392e064dfa050b577045cd759
6467165,URS000062AE5D,2014-05-29T15:05:26.000+0000,RNACEN,75BD272B21090812,155,AACTTTCAGCGATCGATGTCTCGGCTCGAACAACGATGAAGGGCGCAGCGAAGTGTGATAAGCATTGTGAATTGCAGGATTCCGTGAACCAATAGGGACTTGAACGTACACTGCGCTTTCGGGAWATCCCTGAAAGCATGCCTACTTCAGTGTCC,,f92fc59077d64c04257a7f047e91d085
```
