# Learning Objectives

In this notebook, you will build ETL jobs that read from and write to a variety of common data sources, such as:
- REST APIs (HTTP endpoints)
- Relational databases (RDBMS)
- Hive tables (managed tables)
- Various file formats (CSV, JSON, Parquet, etc.)


# Interview Questions

As you work through the exercises, try to answer the following questions:

## Columnar Files
- What is a columnar file format and what advantages does it offer?
- Why is Parquet frequently used with Spark and how does it function?
- How do you read/write data from/to a Parquet file using a DataFrame?

## Partitions
- How do you save data to a file system by partitions? (Hint: Provide the code)
- How and why can partitions reduce query execution time? (Hint: Give an example)

## JDBC and RDBMS
- How do you load data from an RDBMS into Spark? (Hint: Discuss the steps and JDBC)

## REST API and HTTP Requests
- How can Spark be used to fetch data from a REST API? (Hint: Discuss making API requests)

## ETL Job One: Parquet file
### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Data transformation requirements: https://pgexercises.com/questions/aggregates/fachoursbymonth.html

### Load
Load the data into a Parquet file.

### What is Parquet? 

Columnar file formats such as Parquet are an important tool for optimizing Spark queries: because values are stored column by column, Spark can read only the columns a query needs. They also come up often in interviews.
- https://www.youtube.com/watch?v=KLFadWdomyI
- https://www.databricks.com/glossary/what-is-parquet
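To make the idea concrete, here is a toy, pure-Python sketch (no Parquet library involved) of row-oriented versus column-oriented layouts. The sample records are hypothetical. Summing `slots` in the row layout walks every full record, while the columnar layout reads one list and never touches the other fields; Parquet applies the same principle on disk by storing each column's values together.

```python
# Toy illustration of row-oriented vs column-oriented layouts.
# The sample records below are hypothetical, not taken from the bookings table.
rows = [
    {"facid": 0, "memid": 1, "slots": 3},
    {"facid": 1, "memid": 2, "slots": 2},
    {"facid": 0, "memid": 3, "slots": 4},
]

def sum_slots_row_layout(records):
    # Row layout: every record (with all of its fields) is visited to read one field.
    return sum(r["slots"] for r in records)

def to_columns(records):
    # Columnar layout: each field's values are stored together in one list,
    # loosely like a Parquet column chunk.
    columns = {}
    for r in records:
        for name, value in r.items():
            columns.setdefault(name, []).append(value)
    return columns

columns = to_columns(rows)

def sum_slots_columnar(cols):
    # Only the "slots" column is touched; "facid" and "memid" are never read.
    return sum(cols["slots"])

print(sum_slots_row_layout(rows))   # 9
print(sum_slots_columnar(columns))  # 9
print(columns["facid"])             # [0, 1, 0]
```

In Spark, the same saving should be visible when you read a Parquet dataset back and select a subset of columns, e.g. `spark.read.parquet("/Data/Parquet/Job1").select("facid").explain()`: the `FileScan parquet` node's `ReadSchema` lists only the selected column.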

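```python
from pyspark.sql import functions as F

df_bookings = spark.table("bookings")

# Total slots booked per facility in September 2012, sorted by total slots.
# The date range is half-open, [2012-09-01, 2012-10-01), so bookings on 1 October are excluded.
result = (df_bookings
          .filter((F.to_date(F.col("starttime")) >= F.to_date(F.lit("2012-09-01"), "yyyy-MM-dd")) &
                  (F.to_date(F.col("starttime")) < F.to_date(F.lit("2012-10-01"), "yyyy-MM-dd")))
          .groupBy("facid")                          # one group per facility
          .agg(F.sum("slots").alias("Total Slots"))  # sum of slots within each group
          .orderBy("Total Slots"))
result.show()

# Write the result as Parquet, replacing the output of any previous run.
# Some older Spark versions reject spaces in Parquet column names;
# if the write fails, alias the column as e.g. "total_slots" instead.
result.write.mode("overwrite").parquet("/Data/Parquet/Job1")

# Optional cleanup on Databricks:
# dbutils.fs.rm("/Data/Parquet/Job1", recurse=True)
```
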
```
+-----+-----------+
|facid|Total Slots|
+-----+-----------+
|    5|        122|
|    3|        422|
|    7|        426|
|    8|        471|
|    6|        540|
|    2|        570|
|    1|        588|
|    0|        591|
|    4|        648|
+-----+-----------+
```
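The exercise counts only bookings made in September 2012, which is a half-open range: `starttime >= 2012-09-01` and `starttime < 2012-10-01`. The plain-Python sketch below, using a few hypothetical bookings, performs the same filter, per-facility sum, and sort, and shows why the upper bound must be exclusive: a booking at 08:00 on 1 October must not count toward September, and a `<=` comparison against 1 October would include it.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical sample bookings: (facid, starttime, slots).
bookings = [
    (0, "2012-09-01 08:00:00", 2),
    (0, "2012-09-30 17:30:00", 3),
    (1, "2012-09-15 10:00:00", 4),
    (1, "2012-10-01 08:00:00", 6),   # October: must be excluded
    (2, "2012-08-31 19:00:00", 1),   # August: must be excluded
]

start = datetime(2012, 9, 1)
end = datetime(2012, 10, 1)          # exclusive upper bound

totals = defaultdict(int)
for facid, starttime, slots in bookings:
    ts = datetime.strptime(starttime, "%Y-%m-%d %H:%M:%S")
    if start <= ts < end:            # half-open interval [start, end)
        totals[facid] += slots

# Plain-Python analogue of groupBy("facid").agg(sum("slots")).orderBy("Total Slots").
result = sorted(totals.items(), key=lambda kv: kv[1])
print(result)  # [(1, 4), (0, 5)]
```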



## ETL Job Two: Partitions

### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Transform the data as specified in https://pgexercises.com/questions/joins/threejoin.html
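The transform joins each booking to its member on `memid` and to its facility on `facid`, keeps tennis courts, builds a single `Member` name column, removes duplicates, and sorts by member and then facility. Here is a plain-Python sketch of those steps, using tiny hypothetical stand-ins for the three tables (the names and IDs are made up):

```python
# Hypothetical miniature versions of the three tables.
members = {1: ("Ann", "Lee"), 2: ("Bob", "Kay")}                    # memid -> (firstname, surname)
facilities = {0: "Tennis Court 1", 1: "Tennis Court 2", 2: "Badminton Court"}  # facid -> name
bookings = [(1, 0), (1, 0), (1, 2), (2, 1), (3, 0)]                 # (memid, facid)

rows = set()  # a set plays the role of DISTINCT
for memid, facid in bookings:
    # Inner joins: bookings without a matching member or facility are dropped.
    if memid in members and facid in facilities:
        name = facilities[facid]
        if "Tennis Court" in name:
            first, last = members[memid]
            rows.add((f"{first} {last}", name))

result = sorted(rows)  # ORDER BY member, facility
print(result)  # [('Ann Lee', 'Tennis Court 1'), ('Bob Kay', 'Tennis Court 2')]
```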

### Load
Partition the result data by the `Facility` column and save it to the `threejoin_delta` managed table.

Hint: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html

### What are partitions?

Partitions are an important technique for optimizing Spark queries, and they are often tested in interviews.
- https://www.youtube.com/watch?v=hvF7tY2-L3U&t=268s
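`partitionBy("Facility")` writes one subdirectory per distinct value of the partition column, named `Facility=<value>` (Hive-style), and the partition value lives in the path rather than in the data files. When a query filters on that column, e.g. `WHERE Facility = 'Tennis Court 1'`, Spark can skip the non-matching directories entirely (partition pruning). The sketch below mimics that layout with plain text files in a temporary directory; the rows are hypothetical and the file format is a simplification, not Parquet or Delta.

```python
import os
import tempfile

# Hypothetical rows of the threejoin result: (Member, Facility).
rows = [
    ("Ann Lee", "Tennis Court 1"),
    ("Bob Kay", "Tennis Court 2"),
    ("Cal Ng", "Tennis Court 1"),
]

def write_partitioned(base, records):
    """Mimic Hive-style partitionBy("Facility"): one directory per distinct value."""
    for member, facility in records:
        part_dir = os.path.join(base, f"Facility={facility}")
        os.makedirs(part_dir, exist_ok=True)
        # The partition value is encoded in the path, so only Member goes into the file.
        with open(os.path.join(part_dir, "part-0000.txt"), "a") as f:
            f.write(member + "\n")

def read_with_filter(base, facility):
    """Partition pruning: open only the directory whose value matches the filter."""
    opened, found = [], []
    for entry in sorted(os.listdir(base)):
        key, _, value = entry.partition("=")
        if key == "Facility" and value == facility:
            opened.append(entry)
            with open(os.path.join(base, entry, "part-0000.txt")) as f:
                found.extend(line.strip() for line in f)
    return found, opened

with tempfile.TemporaryDirectory() as base:
    write_partitioned(base, rows)
    layout = sorted(os.listdir(base))
    found, opened = read_with_filter(base, "Tennis Court 1")

print(layout)  # ['Facility=Tennis Court 1', 'Facility=Tennis Court 2']
print(found)   # ['Ann Lee', 'Cal Ng']
print(opened)  # ['Facility=Tennis Court 1']
```

Only one of the two directories is opened, which is where the query-time saving comes from; with many partitions and a selective filter, most of the data is never read.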

```python
from pyspark.sql.functions import concat_ws

df_bookings = spark.table("bookings")
df_members = spark.table("members")
df_facilities = spark.table("facilities")

# Members who have used a tennis court, with the court name; no duplicates,
# ordered by member name and then facility name.
result = (df_bookings
          .join(df_members, df_bookings.memid == df_members.memid, "inner")
          .join(df_facilities, df_bookings.facid == df_facilities.facid, "inner")
          .filter(df_facilities["name"].contains("Tennis Court"))
          .select(concat_ws(" ", df_members["firstname"], df_members["surname"]).alias("Member"),
                  df_facilities["name"].alias("Facility"))
          .distinct()
          .orderBy("Member", "Facility"))
result.show()

# Writes one subdirectory per distinct Facility value under the table's storage location.
result.write.mode("overwrite").partitionBy("Facility").saveAsTable("threejoin_delta")

spark.sql("SELECT * FROM threejoin_delta").show()
```




## ETL Job Three: HTTP Requests

### Extract
Extract daily stock price data for the following companies: Google, Apple, Microsoft, and Tesla.

Data Source
- API: https://rapidapi.com/alphavantage/api/alpha-vantage
- Endpoint: GET `TIME_SERIES_DAILY`

Sample HTTP request

```
curl --request GET \
	--url 'https://alpha-vantage.p.rapidapi.com/query?function=TIME_SERIES_DAILY&symbol=TSLA&outputsize=compact&datatype=json' \
	--header 'X-RapidAPI-Host: alpha-vantage.p.rapidapi.com' \
	--header 'X-RapidAPI-Key: [YOUR_KEY]'

```

Sample Python HTTP request

```
import requests

url = "https://alpha-vantage.p.rapidapi.com/query"

querystring = {
    "function":"TIME_SERIES_DAILY",
    "symbol":"IBM",
    "datatype":"json",
    "outputsize":"compact"
}

headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": "[YOUR_KEY]"
}

response = requests.get(url, headers=headers, params=querystring)

data = response.json()

# Now 'data' contains the daily time series data for "IBM"
```
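Turning a response into rows is just a flattening of the nested `"Time Series (Daily)"` object, whose values arrive as strings. The sketch below wraps that step in a function so it can be tested without an API key; the payload is a made-up, truncated example in the same shape (only the field names used in this notebook are assumed), and `to_rows` is a hypothetical helper name.

```python
# Hypothetical, truncated payload in the shape returned by TIME_SERIES_DAILY;
# the numbers are made up.
data = {
    "Time Series (Daily)": {
        "2024-03-01": {"1. open": "10.0", "2. high": "11.5", "3. low": "9.8",
                       "4. close": "11.0", "5. volume": "1000"},
        "2024-02-29": {"1. open": "9.5", "2. high": "10.2", "3. low": "9.1",
                       "4. close": "10.0", "5. volume": "800"},
    }
}

def to_rows(symbol, payload):
    """Flatten one response into [symbol, date, open, high, low, close, volume] rows."""
    series = payload.get("Time Series (Daily)")
    if series is None:
        # Error or rate-limit responses do not contain the daily series.
        raise ValueError(f"no daily series for {symbol}: {payload}")
    return [
        [symbol, day,
         float(v["1. open"]), float(v["2. high"]), float(v["3. low"]),
         float(v["4. close"]), float(v["5. volume"])]
        for day, v in series.items()
    ]

rows = to_rows("IBM", data)
print(rows[0])  # ['IBM', '2024-03-01', 10.0, 11.5, 9.8, 11.0, 1000.0]
```

Failing loudly on a missing key is a deliberate choice: a silently empty result for one symbol would otherwise only surface later as a missing company in the weekly table.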

### Transform
Find the **weekly** maximum closing price for each company.

Hints:
  - Use a `for` loop to get the stock data for each company
  - Use the Spark `union` operation to combine all the data into one DF
  - Create a new `week` column from the date column
  - Use `group by` to calculate the max closing price
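The `week` column hides a subtlety at year boundaries. Spark's `weekofyear` returns the ISO-8601 week number (weeks start on Monday) without the year, so 2024-12-30 is week 1 even though its calendar year is 2024. Keying each week by its Monday, which is what Spark's `date_trunc("week", ...)` returns, keeps such weeks intact. A plain-Python sketch with hypothetical closing prices:

```python
from collections import defaultdict
from datetime import date, timedelta

# Hypothetical (symbol, date, close) rows that straddle a year boundary.
rows = [
    ("GOOG", date(2024, 12, 30), 190.0),  # Monday
    ("GOOG", date(2024, 12, 31), 192.5),  # Tuesday
    ("GOOG", date(2025, 1, 2), 191.0),    # Thursday of the same Monday-Sunday week
    ("GOOG", date(2025, 1, 6), 195.0),    # following Monday
    ("AAPL", date(2025, 1, 6), 240.0),
]

def week_start(d):
    # Monday of d's week, analogous to Spark's date_trunc("week", ...).
    return d - timedelta(days=d.weekday())

# Weekly max close per (symbol, week-start date).
max_close = defaultdict(lambda: float("-inf"))
for symbol, d, close in rows:
    key = (symbol, week_start(d))
    max_close[key] = max(max_close[key], close)

for (symbol, wk), value in sorted(max_close.items()):
    print(symbol, wk, value)
# AAPL 2025-01-06 240.0
# GOOG 2024-12-30 192.5
# GOOG 2025-01-06 195.0

print(date(2024, 12, 30).isocalendar()[1])  # 1: ISO week 1 of 2025 begins on 2024-12-30
```

Grouping by `(year, weekofyear)` instead would split the first GOOG week into two groups, and its 2024 half would collide with the real first week of 2024.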

### Load
- Partition the DF by company
- Load the DF into a managed table called `max_closing_price_weekly`

```python
import requests
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, FloatType

url = "https://alpha-vantage.p.rapidapi.com/query"
headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    # Do not hard-code real keys in a notebook; on Databricks, read them from a
    # secret scope with dbutils.secrets.get(scope=..., key=...).
    "X-RapidAPI-Key": "[YOUR_KEY]"
}

company_list = ["GOOG", "AAPL", "MSFT", "TSLA"]
stock_data = []

# Extract: one request per company.
for symbol in company_list:
    querystring = {
        "function": "TIME_SERIES_DAILY",
        "symbol": symbol,
        "datatype": "json",
        "outputsize": "compact"  # latest 100 data points
    }

    response = requests.get(url, headers=headers, params=querystring)
    response.raise_for_status()
    data = response.json()

    # Error and rate-limit responses do not include the daily series.
    if "Time Series (Daily)" not in data:
        raise ValueError(f"No daily time series returned for {symbol}: {data}")
    time_series = data["Time Series (Daily)"]

    # One row per (symbol, trading day); prices arrive as strings.
    for date, details in time_series.items():
        stock_data.append([
            symbol,
            date,
            float(details["1. open"]),
            float(details["2. high"]),
            float(details["3. low"]),
            float(details["4. close"]),
            float(details["5. volume"]),
        ])

schema = StructType([
    StructField("Symbol", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("open", FloatType(), True),
    StructField("high", FloatType(), True),
    StructField("low", FloatType(), True),
    StructField("Close", FloatType(), True),
    StructField("volume", FloatType(), True)
])
df = spark.createDataFrame(stock_data, schema=schema)

# Transform: weekly maximum closing price per company.
df = df.withColumn("Date", F.to_date(F.col("Date"), "yyyy-MM-dd"))
# ISO-8601 week number (1-53); it carries no year, which is fine for ~100 trading days.
df = df.withColumn("Week", F.weekofyear(F.col("Date")))
df = df.groupBy("Symbol", "Week").agg(F.max("Close").alias("Max_Close"))

# Load: partition by company and save to the managed table named in the requirements.
df.write.partitionBy("Symbol").mode("overwrite").saveAsTable("max_closing_price_weekly")
df.show()
```

```
+------+----+---------+
|Symbol|Week|Max_Close|
+------+----+---------+
|  GOOG|  11|    169.0|
|  GOOG|   3|   197.55|
|  GOOG|   1|   193.13|
|  GOOG|   9|   181.19|
|  GOOG|  12|   166.57|
|  GOOG|   4|    201.9|
|  GOOG|   6|   207.71|
|  GOOG|   5|    205.6|
|  GOOG|   2|   197.96|
|  GOOG|  10|   175.75|
|  GOOG|   7|    188.2|
|  GOOG|   8|   187.13|
|  GOOG|  45|   182.28|
|  GOOG|  44|   176.14|
|  GOOG|  46|   183.32|
|  GOOG|  48|   170.82|
|  GOOG|  43|   166.99|
|  GOOG|  49|   176.49|
|  GOOG|  52|   197.57|
|  GOOG|  47|   179.58|
+------+----+---------+
only showing top 20 rows
```



## ETL Job Four: RDBMS


### Extract
Extract RNA data from a public PostgreSQL database.

- https://rnacentral.org/help/public-database
- Extract 100 RNA records from the `rna` table (hint: use `LIMIT` in your SQL)
- hint: use `spark.read.jdbc` https://docs.databricks.com/external-data/jdbc.html
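`spark.read.jdbc` places its `table` argument in the `FROM` clause of the SQL it sends, so passing a parenthesized, aliased subquery such as `(SELECT * FROM rna LIMIT 100) AS rna_sample` lets the database apply the `LIMIT` before any rows reach Spark. The sketch below uses Python's built-in `sqlite3` as a local stand-in for PostgreSQL (an assumption made only so it runs anywhere) to show that this wrapped query shape is valid SQL and returns just 100 rows. The `rna` table here is a hypothetical 1,000-row toy, not the RNAcentral schema.

```python
import sqlite3

# Stand-in database: a hypothetical 1,000-row "rna" table (not the real schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rna (id INTEGER PRIMARY KEY, upi TEXT)")
conn.executemany(
    "INSERT INTO rna (id, upi) VALUES (?, ?)",
    [(i, f"URS{i:010d}") for i in range(1, 1001)],
)

# What you would pass as the `table` argument of spark.read.jdbc.
table_expr = "(SELECT * FROM rna LIMIT 100) AS rna_sample"

# Roughly the statements Spark issues: a schema probe with WHERE 1=0,
# then SELECT <columns> FROM <table_expr> to fetch the data.
schema_probe = conn.execute(f"SELECT * FROM {table_expr} WHERE 1=0")
column_names = [d[0] for d in schema_probe.description]
rows = conn.execute(f"SELECT id, upi FROM {table_expr}").fetchall()

print(column_names)  # ['id', 'upi']
print(len(rows))     # 100
conn.close()
```

By contrast, calling `.limit(100)` on a DataFrame read from the bare `rna` table relies on Spark's optimizer to push the limit down, which not every Spark version does for JDBC sources.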

### Transform
We want to load the data as is, so no transformation is required.


### Load
Load the DF into a managed table called `rna_100_records`.

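```python
from pyspark.sql import SparkSession

# On Databricks, `spark` already exists and getOrCreate() returns it, so the
# spark.jars setting has no effect there (the PostgreSQL JDBC driver ships with
# the Databricks Runtime). Outside Databricks, point spark.jars at a local copy
# of the PostgreSQL JDBC driver.
spark = (SparkSession.builder
         .appName("PostgresConnection")
         .config("spark.jars", "/path/to/postgresql-connector.jar")
         .getOrCreate())

# Public read-only credentials from https://rnacentral.org/help/public-database
db_url = "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs"
db_properties = {
    "user": "reader",
    "password": "NWDMCE5xdipIjRrp",
    "driver": "org.postgresql.Driver"
}

# Pass a subquery (with an alias) as the "table" so PostgreSQL applies the LIMIT
# and only 100 rows cross the network, rather than the whole `rna` table.
df = spark.read.jdbc(
    url=db_url,
    table="(SELECT * FROM rna LIMIT 100) AS rna_sample",
    properties=db_properties,
)
df.show()

# No transformation: write the rows as-is to a managed table.
df.write.mode("overwrite").saveAsTable("rna_100_records")
```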

+--------+-------------+--------------------+---------+----------------+----+--------------------+--------+--------------------+
|      id|          upi|           timestamp|userstamp|           crc64| len|           seq_short|seq_long|                 md5|
+--------+-------------+--------------------+---------+----------------+----+--------------------+--------+--------------------+
| 8992498|URS00008936F2| 2015-10-20 18:04:07|   RNACEN|2730EB2FA9B28C9A|1340|GACGAACGCTGGCGGCG...|    null|8618c00bc0151d140...|
| 8992504|URS00008936F8| 2015-10-20 18:04:07|   RNACEN|82E068BFA7627F08|1377|AGGTCTTCGGACGCTGA...|    null|6220ffbaa3abdf199...|
| 8992505|URS00008936F9| 2015-10-20 18:04:07|   RNACEN|C67A056BB38B781A|1343|GAGTGGCGAACTGGTGA...|    null|bb3ee68aed9a7f9b5...|
| 8992510|URS00008936FE| 2015-10-20 18:04:07|   RNACEN|8D38DF2C3ADE58AA|1343|GACGAACGCTGGCGGCG...|    null|861cdda6f680ace2f...|
| 8992511|URS00008936FF| 2015-10-20 18:04:07|   RNACEN|8F21E0C7591D2A51|1489|ATTGAACGCTGGCGGCA...