# Learning Objectives

In this notebook, you will craft sophisticated ETL jobs that interface with a variety of common data sources, such as 
- REST APIs (HTTP endpoints)
- RDBMS
- Hive tables (managed tables)
- Various file formats (csv, json, parquet, etc.)

# Interview Questions

As you progress through the practice, attempt to answer the following questions:

## Columnar File
- What is a columnar file format and what advantages does it offer?
- Why is Parquet frequently used with Spark and how does it function?
- How do you read/write data from/to a Parquet file using a DataFrame?

## Partitions
- How do you save data to a file system by partitions? (Hint: Provide the code)
- How and why can partitions reduce query execution time? (Hint: Give an example)

## JDBC and RDBMS
- How do you load data from an RDBMS into Spark? (Hint: Discuss the steps and JDBC)

## REST API and HTTP Requests
- How can Spark be used to fetch data from a REST API? (Hint: Discuss making API requests)

## ETL Job One: Parquet file
### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Data transformation requirements https://pgexercises.com/questions/aggregates/fachoursbymonth.html

### Load
Load data into a parquet file

### What is Parquet? 

Columnar files are an important technique for optimizing Spark queries. Additionally, they are often tested in interviews.
- https://www.youtube.com/watch?v=KLFadWdomyI
- https://www.databricks.com/glossary/what-is-parquet

In [0]:
# Write your solution here
df_mem = spark.table('members')
df_book = spark.table('bookings')
df_fac = spark.table('facilities')


In [0]:
from pyspark.sql.functions import col, sum

df_etl_1 = df_book.filter(
    (col('starttime') >= "2012-09-01") & (col('starttime') < "2012-10-01")
).groupBy(
    'facid'
    ).agg(
        sum('slots').alias("Total Slots")
        ).orderBy('Total Slots')

df_etl_1.show()

In [0]:
path_out = "/FileStore/data/ETLjob1"

df_etl_1.write.parquet(path_out)

## ETL Job Two: Partitions

### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Transform the data https://pgexercises.com/questions/joins/threejoin.html

### Load
Partition the result data by facility column and then save to `threejoin_delta` managed table. Additionally, they are often tested in interviews.

hint: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html

What are paritions? 

Partitions are an important technique to optimize Spark queries
- https://www.youtube.com/watch?v=hvF7tY2-L3U&t=268s

In [0]:
# Write your solution here
df_mem = spark.table('members')
df_book = spark.table('bookings')
df_fac = spark.table('facilities')

In [0]:
from pyspark.sql.functions import concat, lit

df_etl_2 = df_mem.join(
    df_book, df_mem.memid == df_book.memid, 'inner'
).join(
    df_fac, df_book.facid == df_fac.facid, 'inner'
).select(
    concat(
        df_mem.firstname, lit(' '), df_mem.surname
    ).alias('member'),
    df_fac.name.alias('facility')
).filter(
    col('facility').contains('Tennis Court')
).distinct().orderBy('member', 'facility')

df_etl_2.show()

In [0]:
df_etl_2.write.partitionBy('facility').saveAsTable('threejoin_delta')

In [0]:
spark.sql(
    "SELECT * FROM threejoin_delta LIMIT 10"
).show()

## ETL Job Three: HTTP Requests

### Extract
Extract daily stock price data price from the following companies, Google, Apple, Microsoft, and Tesla. 

Data Source
- API: https://rapidapi.com/alphavantage/api/alpha-vantage
- Endpoint: GET `TIME_SERIES_DAILY`

Sample HTTP request

```
curl --request GET \
	--url 'https://alpha-vantage.p.rapidapi.com/query?function=TIME_SERIES_DAILY&symbol=TSLA&outputsize=compact&datatype=json' \
	--header 'X-RapidAPI-Host: alpha-vantage.p.rapidapi.com' \
	--header 'X-RapidAPI-Key: [YOUR_KEY]'

```

Sample Python HTTP request

```
import requests

url = "https://alpha-vantage.p.rapidapi.com/query"

querystring = {
    "function":"TIME_SERIES_DAILY",
    "symbol":"IBM",
    "datatype":"json",
    "outputsize":"compact"
}

headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": "[YOUR_KEY]"
}

response = requests.get(url, headers=headers, params=querystring)

data = response.json()

# Now 'data' contains the daily time series data for "IBM"
```

### Transform
Find **weekly** max closing price for each company.

hints: 
  - Use a `for-loop` to get stock data for each company
  - Use the spark `union` operation to concat all data into one DF
  - create a new `week` column from the data column
  - use `group by` to calculate max closing price

### Load
- Partition `DF` by company
- Load the DF in to a managed table called, `max_closing_price_weekly`

In [0]:
# Write your solution here
import requests
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col

spark = SparkSession.builder.getOrCreate()

def get_time_series_daily(symbol):

  url = "https://alpha-vantage.p.rapidapi.com/query"

  querystring = {
    "function":"TIME_SERIES_DAILY",
    "symbol":"MSFT",
    "datatype":"json"
  }

  headers = {
	  "x-rapidapi-key": "760895d090msh1fbe4a93d8b1b98p11ae36jsnc52dd60380a7",
	  "x-rapidapi-host": "alpha-vantage.p.rapidapi.com"
  }

  response = requests.get(url, headers=headers, params=querystring)
  response = response.json()

  return response['Time Series (Daily)']




In [0]:
company_symbols = {
    "GOOG",
    "AAPL",
    "MSFT",
    "TSLA"
}

def check_df_exists():
    try:
        df_all_companies.count()
        return True
    except NameError:
        return False

for symbol in company_symbols:
    response = get_time_series_daily(symbol)
    data = [
        {"date": date, **prices} for date, prices in response.items()
    ]
    if check_df_exists(): 
        df_new_data = spark.createDataFrame(data)
        df_new_data = df_new_data.withColumn("company", lit(symbol))
        df_all_companies = df_all_companies.union(df_new_data)
    else: 
        df_all_companies = spark.createDataFrame(data)
        df_all_companies = df_all_companies.withColumn("company", lit(symbol))
    

df_all_companies.filter(
    col("date") == "2025-03-07"
).show(5)




+--------+--------+--------+--------+---------+----------+-------+
| 1. open| 2. high|  3. low|4. close|5. volume|      date|company|
+--------+--------+--------+--------+---------+----------+-------+
|392.3200|394.8000|385.5400|393.3100| 22034087|2025-03-07|   GOOG|
|392.3200|394.8000|385.5400|393.3100| 22034087|2025-03-07|   AAPL|
|392.3200|394.8000|385.5400|393.3100| 22034087|2025-03-07|   TSLA|
|392.3200|394.8000|385.5400|393.3100| 22034087|2025-03-07|   MSFT|
|392.3200|394.8000|385.5400|393.3100| 22034087|2025-03-07|   GOOG|
+--------+--------+--------+--------+---------+----------+-------+
only showing top 5 rows



In [0]:
#Rename the cols
new_cols = ['open', 'high', 'low', 'close', 'volume']

for old_col, new_col in zip(df_all_companies.columns[:5], new_cols):
    df_all_companies = df_all_companies.withColumnRenamed(old_col, new_col)

df_all_companies.show(1)

+--------+--------+--------+--------+--------+----------+-------+
|    open|    high|     low|   close|  volume|      date|company|
+--------+--------+--------+--------+--------+----------+-------+
|392.3200|394.8000|385.5400|393.3100|22034087|2025-03-07|   GOOG|
+--------+--------+--------+--------+--------+----------+-------+
only showing top 1 row



In [0]:
#Add week column
from pyspark.sql.functions import weekofyear, max

df_all_companies_week = df_all_companies.withColumn(
    "week",
    weekofyear(
        col('date')
    )
)

df_all_companies_week.show(3)

+--------+--------+--------+--------+--------+----------+-------+----+
|    open|    high|     low|   close|  volume|      date|company|week|
+--------+--------+--------+--------+--------+----------+-------+----+
|392.3200|394.8000|385.5400|393.3100|22034087|2025-03-07|   GOOG|  10|
|394.2800|402.1500|392.6777|396.8900|23304625|2025-03-06|   GOOG|  10|
|389.3400|401.6700|388.8100|401.0200|23433132|2025-03-05|   GOOG|  10|
+--------+--------+--------+--------+--------+----------+-------+----+
only showing top 3 rows



In [0]:
#Perform transformation
df_max_close = df_all_companies_week.groupBy('company', 'week').agg(
    max(
        col('close')
    ).alias("max_close_of_week")
)

df_max_close.show(10)

+-------+----+-----------------+
|company|week|max_close_of_week|
+-------+----+-----------------+
|   AAPL|   1|         424.8300|
|   AAPL|   2|         427.8500|
|   AAPL|   3|         429.0300|
|   AAPL|   4|         446.7100|
|   AAPL|   5|         447.2000|
|   AAPL|   6|         415.8200|
|   AAPL|   7|         412.2200|
|   AAPL|   8|         416.1300|
|   AAPL|   9|         404.0000|
|   AAPL|  10|         401.0200|
+-------+----+-----------------+
only showing top 10 rows



In [0]:
#Partition by company and load into 'max_closing_price_weekly'
df_max_close.write.partitionBy('company').saveAsTable('max_closing_price_weekly')


In [0]:
spark.sql(
    "SELECT * FROM max_closing_price_weekly LIMIT 10"
).show()

+-------+----+-----------------+
|company|week|max_close_of_week|
+-------+----+-----------------+
|   AAPL|   1|         424.8300|
|   AAPL|   2|         427.8500|
|   AAPL|   3|         429.0300|
|   AAPL|   4|         446.7100|
|   AAPL|   5|         447.2000|
|   AAPL|   6|         415.8200|
|   AAPL|   7|         412.2200|
|   AAPL|   8|         416.1300|
|   AAPL|   9|         404.0000|
|   AAPL|  10|         401.0200|
+-------+----+-----------------+



## ETL Job Four: RDBMS


### Extract
Extract RNA data from a public PostgreSQL database.

- https://rnacentral.org/help/public-database
- Extract 100 RNA records from the `rna` table (hint: use `limit` in your sql)
- hint: use `spark.read.jdbc` https://docs.databricks.com/external-data/jdbc.html

### Transform
We want to load the data as it so there is no transformation required.


### Load
Load the DF in to a managed table called, `rna_100_records`

In [0]:
# Write your solution here
jdbc_host = "hh-pgsql-public.ebi.ac.uk"
jdbc_port = "5432"
jdbc_database = "pfmegrnargs"
jdbc_table = "(SELECT * FROM rna LIMIT 100) as temp_table"
jdbc_user = "reader"
jdbc_password = "NWDMCE5xdipIjRrp"

df_rna_table = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", f"jdbc:postgresql://{jdbc_host}:{jdbc_port}/{jdbc_database}")
    .option("dbtable", jdbc_table)  
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .load()
)

In [0]:
df_rna_table.count()

In [0]:
df_rna_table.write.saveAsTable("rna_100_records")

In [0]:
spark.sql("SELECT * FROM rna_100_records LIMIT 10").show()