# Learning Objectives

In this notebook, you will craft sophisticated ETL jobs that interface with a variety of common data sources, such as:
- REST APIs (HTTP endpoints)
- RDBMS
- Hive tables (managed tables)
- Various file formats (csv, json, parquet, etc.)


# Interview Questions

As you progress through the practice, attempt to answer the following questions:

## Columnar File
- What is a columnar file format and what advantages does it offer?
- Why is Parquet frequently used with Spark and how does it function?
- How do you read/write data from/to a Parquet file using a DataFrame?

## Partitions
- How do you save data to a file system by partitions? (Hint: Provide the code)
- How and why can partitions reduce query execution time? (Hint: Give an example)

## JDBC and RDBMS
- How do you load data from an RDBMS into Spark? (Hint: Discuss the steps and JDBC)

## REST API and HTTP Requests
- How can Spark be used to fetch data from a REST API? (Hint: Discuss making API requests)

## ETL Job One: Parquet file
### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Data transformation requirements: https://pgexercises.com/questions/aggregates/fachoursbymonth.html

### Load
Load data into a parquet file

### What is Parquet? 

Columnar files are an important technique for optimizing Spark queries. Additionally, they are often tested in interviews.
- https://www.youtube.com/watch?v=KLFadWdomyI
- https://www.databricks.com/glossary/what-is-parquet
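To make the columnar idea concrete, here is a minimal plain-Python sketch (no Spark or Parquet library involved). It stores the same three made-up booking rows in a row-oriented layout and in a column-oriented layout, then counts how many values each layout has to read to sum one column. The data and the function names are assumptions for illustration only; real Parquet adds compression, encoding, and row groups on top of this idea.

```python
# Toy data only: three made-up booking rows, not taken from the exercise tables.
rows = [
    {"facid": 0, "memid": 5, "slots": 2},
    {"facid": 1, "memid": 7, "slots": 3},
    {"facid": 0, "memid": 9, "slots": 4},
]

# Row-oriented layout: one complete record after another (like CSV or JSON lines).
row_store = [tuple(r.values()) for r in rows]

# Column-oriented layout: all values of one column stored together (like Parquet).
column_store = {name: [r[name] for r in rows] for name in rows[0]}


def total_slots_row_store(store):
    """Sum `slots` from a row store; every field of every record is read."""
    values_read = 0
    total = 0
    for record in store:
        values_read += len(record)
        total += record[2]  # `slots` is the third field
    return total, values_read


def total_slots_column_store(store):
    """Sum `slots` from a column store; only the `slots` column is read."""
    column = store["slots"]
    return sum(column), len(column)


row_total, row_values_read = total_slots_row_store(row_store)
col_total, col_values_read = total_slots_column_store(column_store)
print("row store:   ", row_total, "from", row_values_read, "values read")
print("column store:", col_total, "from", col_values_read, "values read")
```

Both layouts give the same total, but the column store reads a third of the values. This is the same reason an aggregate such as `SUM(slots)` tends to be cheaper on a columnar file than on a CSV file.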

In [0]:
# Write your solution here
df = spark.sql("SELECT facid, SUM(slots) AS Total_Slots FROM bookings6 WHERE starttime >= '2012-09-01' AND starttime < '2012-10-01' GROUP BY facid ORDER BY SUM(slots);")
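# Spark and dbutils address DBFS as `dbfs:/...`; the `/dbfs/...` prefix is meant for local file APIs
df.write.mode("overwrite").parquet("dbfs:/FileStore/tables/bookings.parquet")
display(df)
display(dbutils.fs.ls("dbfs:/FileStore/tables/bookings.parquet"))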



## ETL Job Two: Partitions

### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Transform the data: https://pgexercises.com/questions/joins/threejoin.html

### Load
Partition the result data by the `facility` column and then save it to the `threejoin_delta` managed table.

hint: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html

### What are partitions?

Partitions are an important technique for optimizing Spark queries.
- https://www.youtube.com/watch?v=hvF7tY2-L3U&t=268s
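As a rough illustration of why partitioning can cut query time, the sketch below imitates a `column=value` directory layout in plain Python. It is not Spark code: the member names, the `part-0.csv` file name, and both helper functions are hypothetical, and Spark's own writer adds details such as multiple part files and metadata. The point is only that a filter on the partition column can be answered by opening one directory and skipping the rest.

```python
import csv
import os
import tempfile

# Made-up rows for illustration: (member, facility).
rows = [
    ("Member A", "Tennis Court 1"),
    ("Member B", "Tennis Court 2"),
    ("Member C", "Tennis Court 1"),
]


def write_partitioned(rows, base_dir):
    """Write each row under base_dir/facility=<value>/part-0.csv."""
    for member, facility in rows:
        part_dir = os.path.join(base_dir, f"facility={facility}")
        os.makedirs(part_dir, exist_ok=True)
        with open(os.path.join(part_dir, "part-0.csv"), "a", newline="") as f:
            # The partition value lives in the directory name, not in the file.
            csv.writer(f).writerow([member])


def read_partition(base_dir, facility):
    """Read one facility's directory; the other partitions are never opened."""
    path = os.path.join(base_dir, f"facility={facility}", "part-0.csv")
    with open(path, newline="") as f:
        return [row[0] for row in csv.reader(f)]


base_dir = tempfile.mkdtemp()
write_partitioned(rows, base_dir)
partitions = sorted(os.listdir(base_dir))
court_1_members = read_partition(base_dir, "Tennis Court 1")
print(partitions)
print(court_1_members)
```

A query such as `WHERE facility = 'Tennis Court 1'` benefits in the same way: only the matching directory needs to be scanned, which is usually called partition pruning.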

In [0]:
# Write your solution here
df = spark.sql("SELECT DISTINCT mems.firstname || ' ' || mems.surname AS member, facs.name AS facility FROM members7 mems INNER JOIN bookings7 bks ON mems.memid = bks.memid INNER JOIN facilities7 facs ON bks.facid = facs.facid WHERE facs.name in ('Tennis Court 2','Tennis Court 1') ORDER BY member, facility;")
# Partition by facility and save as the `threejoin_delta` managed table, as the Load step requires
df.write.mode("overwrite").partitionBy("facility").saveAsTable("threejoin_delta")
display(df)


## ETL Job Three: HTTP Requests

### Extract
Extract daily stock price data for the following companies: Google, Apple, Microsoft, and Tesla.

Data Source
- API: https://rapidapi.com/alphavantage/api/alpha-vantage
- Endpoint: GET `TIME_SERIES_DAILY`

Sample HTTP request

```
curl --request GET \
	--url 'https://alpha-vantage.p.rapidapi.com/query?function=TIME_SERIES_DAILY&symbol=TSLA&outputsize=compact&datatype=json' \
	--header 'X-RapidAPI-Host: alpha-vantage.p.rapidapi.com' \
	--header 'X-RapidAPI-Key: [YOUR_KEY]'

```

Sample Python HTTP request

```
import requests

url = "https://alpha-vantage.p.rapidapi.com/query"

querystring = {
    "function":"TIME_SERIES_DAILY",
    "symbol":"IBM",
    "datatype":"json",
    "outputsize":"compact"
}

headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": "[YOUR_KEY]"
}

response = requests.get(url, headers=headers, params=querystring)

data = response.json()

# Now 'data' contains the daily time series data for "IBM"
```

### Transform
Find **weekly** max closing price for each company.

Hints:
  - Use a `for` loop to get stock data for each company
  - Use the Spark `union` operation to concatenate all data into one DF
  - Create a new `week` column from the date column
  - Use `group by` to calculate the max closing price
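Before moving to Spark, the weekly grouping logic can be sketched in plain Python. The payload below is hypothetical: it copies the `Time Series (Daily)` and `4. close` keys used by the API response, but the dates and prices are made up. Week numbers come from `date.isocalendar()`, which follows the ISO 8601 convention. The helper name `weekly_max_close` is an assumption, not part of any library.

```python
from datetime import date

# Hypothetical payload: same key names as the TIME_SERIES_DAILY response,
# but the dates and prices are made up. Note that prices arrive as strings.
sample_payload = {
    "Time Series (Daily)": {
        "2024-01-02": {"4. close": "99.50"},
        "2024-01-03": {"4. close": "101.25"},
        "2024-01-08": {"4. close": "100.00"},
        "2024-01-09": {"4. close": "98.75"},
    }
}


def weekly_max_close(payload):
    """Return {iso_week_number: max closing price} for one company's payload."""
    weekly = {}
    for day, prices in payload["Time Series (Daily)"].items():
        week = date.fromisoformat(day).isocalendar()[1]
        # Cast before comparing: as strings, "99.50" sorts above "101.25".
        close = float(prices["4. close"])
        weekly[week] = max(close, weekly.get(week, close))
    return weekly


result = weekly_max_close(sample_payload)
string_max = max("99.50", "101.25")  # lexicographic comparison picks the wrong value
print(result)
print(string_max)
```

The same pitfall applies in Spark: if the closing price column is still a string, `MAX` compares text rather than numbers, so casting it to a numeric type before aggregating is the safer choice.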

### Load
- Partition the DF by company
- Load the DF into a managed table called `max_closing_price_weekly`

In [0]:
# Write your solution here

import requests
import pandas as pd
from functools import reduce
from pyspark.sql.functions import lit, weekofyear

compSymbols = ["GOOG", "AAPL", "MSFT", "TSLA"]
url = "https://alpha-vantage.p.rapidapi.com/query"
headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": "APIKEY"
}

weeklyFrames = []
for symbol in compSymbols:
    querystring = {
        "function":"TIME_SERIES_DAILY",
        "symbol":symbol,
        "datatype":"json",
        "outputsize":"compact"
    }
    response = requests.get(url, headers=headers, params=querystring)
    # Fail fast on HTTP errors instead of parsing an error body
    response.raise_for_status()
    data = response.json()
    # Extract only the data side; after the transpose each row is a single date
    df_t = pd.DataFrame(data['Time Series (Daily)']).transpose()
    # Move the date index into a regular column so it survives the conversion
    df_t = df_t.reset_index().rename(columns={"index": "Date"})
    # Convert pandas dataframe to spark dataframe
    ddf = spark.createDataFrame(df_t)
    # Add the week number (from the date column) and the company symbol
    ddf = ddf.withColumn('week num', weekofyear('Date')).withColumn('Company', lit(symbol))
    # Prices arrive as strings, so cast before MAX to compare numerically
    weeklyFrames.append(spark.sql("SELECT Company, MAX(CAST(`4. close` AS DOUBLE)) AS `Weekly_Max`, `week num` AS Week_Num FROM {table} GROUP BY Company, Week_Num", table = ddf))

# Concatenate the per-company results into one DF
dddf = reduce(lambda left, right: left.union(right), weeklyFrames)

dddf.printSchema()
dddf.write.partitionBy("Company").saveAsTable("max_closing_price_weekly")
display(dddf)



root
 |-- Company: string (nullable = false)
 |-- Weekly_Max: double (nullable = true)
 |-- Week_Num: integer (nullable = true)



Company,Weekly_Max,Week_Num
GOOG,153.79,4
GOOG,154.84,5
GOOG,150.22,6
GOOG,148.73,7
GOOG,145.32,8
GOOG,140.1,9
GOOG,136.29,10
GOOG,144.34,11
GOOG,151.77,12
GOOG,152.26,13


## ETL Job Four: RDBMS


### Extract
Extract RNA data from a public PostgreSQL database.

- https://rnacentral.org/help/public-database
- Extract 100 RNA records from the `rna` table (hint: use `limit` in your sql)
- hint: use `spark.read.jdbc` https://docs.databricks.com/external-data/jdbc.html

### Transform
We want to load the data as it is, so no transformation is required.


### Load
Load the DF into a managed table called `rna_100_records`

In [0]:
# Write your solution here
remote_table = (spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs")
  .option("user", "reader")
  .option("password", "NWDMCE5xdipIjRrp")
  # Put the LIMIT in the SQL so PostgreSQL returns only 100 rows
  .option("query", "SELECT * FROM rna LIMIT 100")
  .option("fetchSize", "100")
  .load()
)
# display(remote_table)
df = remote_table
df.write.mode("overwrite").saveAsTable("rna_100_records")
display(df)
