# Learning Objectives

In this notebook, you will craft sophisticated ETL jobs that interface with a variety of common data sources, such as 
- REST APIs (HTTP endpoints)
- RDBMS
- Hive tables (managed tables)
- Various file formats (csv, json, parquet, etc.)


# Interview Questions

As you progress through the practice, attempt to answer the following questions:

## Columnar File
- What is a columnar file format and what advantages does it offer?
- Why is Parquet frequently used with Spark and how does it function?
- How do you read/write data from/to a Parquet file using a DataFrame?

## Partitions
- How do you save data to a file system by partitions? (Hint: Provide the code)
- How and why can partitions reduce query execution time? (Hint: Give an example)

## JDBC and RDBMS
- How do you load data from an RDBMS into Spark? (Hint: Discuss the steps and JDBC)

## REST API and HTTP Requests
- How can Spark be used to fetch data from a REST API? (Hint: Discuss making API requests)

## ETL Job One: Parquet file
### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Transformation requirements: https://pgexercises.com/questions/aggregates/fachoursbymonth.html

### Load
Load data into a parquet file

### What is Parquet? 

Columnar files are an important technique for optimizing Spark queries. Additionally, they are often tested in interviews.
- https://www.youtube.com/watch?v=KLFadWdomyI
- https://www.databricks.com/glossary/what-is-parquet
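To make the row-versus-column distinction concrete, here is a minimal pure-Python sketch. It is not Parquet itself and needs no Spark; the rows are a subset of the columns from the bookings data used in this notebook, and the two layouts and helper names are illustrative assumptions.

```python
# Illustrative only: the same three booking rows stored two ways.
rows = [
    {"bookid": 0, "facid": 3, "memid": 1, "slots": 2},
    {"bookid": 1, "facid": 4, "memid": 1, "slots": 2},
    {"bookid": 2, "facid": 6, "memid": 0, "slots": 2},
]

# Row-oriented layout (CSV-like): the values of one record sit together.
row_layout = [tuple(r.values()) for r in rows]

# Column-oriented layout (Parquet-like): the values of one column sit together.
column_layout = {name: [r[name] for r in rows] for name in rows[0]}


def total_slots_row_layout(records):
    # Every full record is visited even though only one field is needed.
    return sum(record[3] for record in records)


def total_slots_column_layout(columns):
    # Only the "slots" column is touched; the other columns are never read.
    return sum(columns["slots"])


print(total_slots_row_layout(row_layout))        # 6
print(total_slots_column_layout(column_layout))  # 6
```

Both functions return the same total, but the columnar version reads a single list. An aggregate over one column of a wide table benefits from a columnar file in the same way.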

In [0]:
#Extract data from the managed tables.

bk = spark.sql("SELECT * FROM booking")
bk.show(5)

mem = spark.sql("SELECT * FROM members")
mem.show(5)

fac = spark.sql("SELECT * FROM facilities")
fac.show(5)

+------+-----+-----+-------------------+-----+
|bookid|facid|memid|          starttime|slots|
+------+-----+-----+-------------------+-----+
|     0|    3|    1|2012-07-03 11:00:00|    2|
|     1|    4|    1|2012-07-03 08:00:00|    2|
|     2|    6|    0|2012-07-03 18:00:00|    2|
|     3|    7|    1|2012-07-03 19:00:00|    2|
|     4|    8|    1|2012-07-03 10:00:00|    1|
+------+-----+-----+-------------------+-----+
only showing top 5 rows

+-----+--------+---------+--------------------+-------+--------------+-------------+-------------------+
|memid| surname|firstname|             address|zipcode|     telephone|recommendedby|           joindate|
+-----+--------+---------+--------------------+-------+--------------+-------------+-------------------+
|    0|   GUEST|    GUEST|               GUEST|      0|(000) 000-0000|         null|2012-07-01 00:00:00|
|    1|   Smith|   Darren|8 Bloomsbury Clos...|   4321|  555-555-5555|         null|2012-07-02 12:02:00|
|    2|   Smith|    Tracy|8

In [0]:
#Transform data as requested into dataframe.

#Produce a list of the total number of slots booked per facility in the month of September 2012. Produce an output table consisting of facility id and slots, sorted by the number of slots.

from pyspark.sql.functions import col, sum as sum_

# Keep only bookings that start in September 2012 (half-open date range).
Slots = bk.filter((col("starttime") >= "2012-09-01") & (col("starttime") < "2012-10-01"))
#Slots.show()

# Keep the DataFrame in `totalslots`; show() returns None, so call it separately.
totalslots = Slots.groupBy("facid").agg(sum_("slots").alias("TotalSlots")).orderBy("TotalSlots")
totalslots.show()

+-----+----------+
|facid|TotalSlots|
+-----+----------+
|    5|       122|
|    3|       422|
|    7|       426|
|    8|       471|
|    6|       540|
|    2|       570|
|    1|       588|
|    0|       591|
|    4|       648|
+-----+----------+



In [0]:
#Load operation
#Writing dataframe into Parquet file.

# In PySpark the save mode is passed as a string; SaveMode is a Scala/Java enum.
totalslots.write.mode("overwrite").parquet("/FileStore/tables/slots.parquet")

In [0]:
#Reading dataframe from Parquet file.
spark.read.parquet("/FileStore/tables/slots.parquet").show()

+-----+----------+
|facid|TotalSlots|
+-----+----------+
|    5|       122|
|    3|       422|
|    7|       426|
|    8|       471|
|    6|       540|
|    2|       570|
|    1|       588|
|    0|       591|
|    4|       648|
+-----+----------+



## ETL Job Two: Partitions

### Extract
Extract data from the managed tables (e.g. `bookings_csv`, `members_csv`, and `facilities_csv`)

### Transform
Transform the data as described at https://pgexercises.com/questions/joins/threejoin.html

### Load
Partition the result data by the facility column and then save it to the `threejoin_delta` managed table.

hint: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html

### What are partitions?

Partitions are an important technique for optimizing Spark queries. Additionally, they are often tested in interviews.
- https://www.youtube.com/watch?v=hvF7tY2-L3U&t=268s
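The directory layout behind a partitioned write can be imitated in a few lines of plain Python. The sketch below is an illustration under stated assumptions, not Spark's implementation: it writes small CSV files instead of Parquet, the helper names are hypothetical, and the three sample rows are taken from the tennis-court result in this notebook.

```python
import csv
import tempfile
from pathlib import Path

# Sample (name, facility) rows from the tennis-court result in this notebook.
records = [
    ("Anne Baker", "Tennis Court 1"),
    ("Anne Baker", "Tennis Court 2"),
    ("Darren Smith", "Tennis Court 2"),
]


def write_partitioned(base_dir, rows):
    """Write one CSV per facility under <base_dir>/facility=<value>/."""
    for name, facility in rows:
        part_dir = Path(base_dir) / f"facility={facility}"
        part_dir.mkdir(parents=True, exist_ok=True)
        with open(part_dir / "part-0.csv", "a", newline="") as handle:
            # The partition value lives in the directory name, not in the file.
            csv.writer(handle).writerow([name])


def read_partition(base_dir, facility):
    """Open only the directory for one facility; the others are never touched."""
    part_file = Path(base_dir) / f"facility={facility}" / "part-0.csv"
    with open(part_file, newline="") as handle:
        return [row[0] for row in csv.reader(handle)]


base = tempfile.mkdtemp()
write_partitioned(base, records)
print(sorted(p.name for p in Path(base).iterdir()))
print(read_partition(base, "Tennis Court 2"))
```

A query that filters on the partition column only has to open the matching directory, which is why partitioning can cut query time when filters line up with the partition key.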

In [0]:
### Transform data as per request

#How can you produce a list of all members who have used a tennis court? Include in your output the name of the court, and the name of the member formatted as a single column. Ensure no duplicate data, and order by the member name followed by the facility name.

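from pyspark.sql.functions import concat_ws

li = ['Tennis Court 1', 'Tennis Court 2']

# Join members to their bookings, then bookings to facilities.
data = mem.join(bk, mem.memid == bk.memid, 'inner')\
.join(fac, bk.facid == fac.facid, 'inner')

# Build the full name as a single column and keep only tennis-court bookings.
data1 = data.select(concat_ws(' ', mem.firstname, mem.surname).alias('Name'), fac.name.alias('facility')).filter(fac.name.isin(li))

# Remove duplicates, then sort by member name followed by facility name.
tennis = data1.distinct().orderBy('Name', 'facility')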
tennis.show()
#tennis.count()--46



+--------------+--------------+
|          Name|      facility|
+--------------+--------------+
|    Anne Baker|Tennis Court 1|
|    Anne Baker|Tennis Court 2|
|  Burton Tracy|Tennis Court 1|
|  Burton Tracy|Tennis Court 2|
|  Charles Owen|Tennis Court 1|
|  Charles Owen|Tennis Court 2|
|  Darren Smith|Tennis Court 2|
| David Farrell|Tennis Court 1|
| David Farrell|Tennis Court 2|
|   David Jones|Tennis Court 1|
|   David Jones|Tennis Court 2|
|  David Pinker|Tennis Court 1|
| Douglas Jones|Tennis Court 1|
| Erica Crumpet|Tennis Court 1|
|Florence Bader|Tennis Court 1|
|Florence Bader|Tennis Court 2|
|   GUEST GUEST|Tennis Court 1|
|   GUEST GUEST|Tennis Court 2|
|Gerald Butters|Tennis Court 1|
|Gerald Butters|Tennis Court 2|
+--------------+--------------+
only showing top 20 rows



In [0]:
#Loading data 
# Partition the result data by facility column and then save to threejoin_delta managed table.

tennis.write.partitionBy("facility").mode("overwrite").format("parquet").save("/FileStore/tables/threejoin_delta.parquet")



In [0]:
#Reading specific parquet partition.
#Reading data for facility = tennis Court 1.
#spark.read.parquet("/FileStore/tables/threejoin_delta.parquet/facility=Tennis Court 1").count()-- 24
spark.read.parquet("/FileStore/tables/threejoin_delta.parquet/facility=Tennis Court 1").show()

+----------------+
|            Name|
+----------------+
|      Anne Baker|
|    Burton Tracy|
|    Charles Owen|
|   David Farrell|
|     David Jones|
|    David Pinker|
|   Douglas Jones|
|   Erica Crumpet|
|  Florence Bader|
|     GUEST GUEST|
|  Gerald Butters|
|      Jack Smith|
| Janice Joplette|
|  Jemima Farrell|
|     Joan Coplin|
|       John Hunt|
| Matthew Genting|
|      Nancy Dare|
| Ponder Stibbons|
|Ramnaresh Sarwin|
+----------------+
only showing top 20 rows



In [0]:
#Reading specific parquet partition.
#Reading data for facility = tennis Court 2.
#spark.read.parquet("/FileStore/tables/threejoin_delta.parquet/facility=Tennis Court 2").orderBy('name' ,ascending=False).count()-- 22
spark.read.parquet("/FileStore/tables/threejoin_delta.parquet/facility=Tennis Court 2").orderBy('name' ,ascending=False).show()

+-----------------+
|             Name|
+-----------------+
|      Tracy Smith|
|    Timothy Baker|
|       Tim Rownam|
|       Tim Boothe|
| Ramnaresh Sarwin|
|  Ponder Stibbons|
|       Nancy Dare|
|Millicent Purview|
|        John Hunt|
|   Jemima Farrell|
|  Janice Joplette|
|       Jack Smith|
| Henrietta Rumney|
|   Gerald Butters|
|      GUEST GUEST|
|   Florence Bader|
|      David Jones|
|    David Farrell|
|     Darren Smith|
|     Charles Owen|
+-----------------+
only showing top 20 rows
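The outputs above contain only the `Name` column: when a `facility=...` sub-directory is read directly, Spark treats it as the root of the dataset, so the partition column is not reconstructed from the path. A possible alternative, sketched below, reads the base directory and filters on the partition column. The helper name `read_facility` is hypothetical, and `spark` is assumed to be an existing SparkSession.

```python
def read_facility(spark, base_path, facility):
    """Read one facility's rows while keeping the `facility` column.

    `spark` is an existing SparkSession and `base_path` is the directory
    that contains the `facility=...` sub-directories.
    """
    # Reading the base path lets Spark discover `facility` from the
    # directory names.
    df = spark.read.parquet(base_path)
    # Filtering on the partition column allows Spark to skip the
    # directories that do not match.
    return df.where(df["facility"] == facility)
```

For example, `read_facility(spark, "/FileStore/tables/threejoin_delta.parquet", "Tennis Court 1")` would return a DataFrame with both `Name` and `facility`.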



## ETL Job Three: HTTP Requests

### Extract
Extract daily stock price data for the following companies: Google, Apple, Microsoft, and Tesla.

Data Source
- API: https://rapidapi.com/alphavantage/api/alpha-vantage
- Endpoint: GET `TIME_SERIES_DAILY`

Sample HTTP request

```
curl --request GET \
	--url 'https://alpha-vantage.p.rapidapi.com/query?function=TIME_SERIES_DAILY&symbol=TSLA&outputsize=compact&datatype=json' \
	--header 'X-RapidAPI-Host: alpha-vantage.p.rapidapi.com' \
	--header 'X-RapidAPI-Key: [YOUR_KEY]'

```

Sample Python HTTP request

```
import requests

url = "https://alpha-vantage.p.rapidapi.com/query"

querystring = {
    "function":"TIME_SERIES_DAILY",
    "symbol":"IBM",
    "datatype":"json",
    "outputsize":"compact"
}

headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": "[YOUR_KEY]"
}

response = requests.get(url, headers=headers, params=querystring)

data = response.json()

# Now 'data' contains the daily time series data for "IBM"
```
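Before the response can become a DataFrame, the nested JSON has to be flattened into rows. The sketch below shows one way to do that in plain Python. The payload is hypothetical (the numbers are made up) and only mirrors the key names this notebook's solution code relies on; `to_rows` is an illustrative helper, and the assumption that an error or rate-limit response lacks the series key should be checked against the API's documentation.

```python
# Hypothetical payload in the shape this notebook's code expects.
sample_payload = {
    "Time Series (Daily)": {
        "2023-08-25": {"1. open": "100.0", "2. high": "102.5", "3. low": "99.5",
                       "4. close": "101.0", "5. volume": "1200"},
        "2023-08-24": {"1. open": "98.0", "2. high": "100.5", "3. low": "97.0",
                       "4. close": "100.0", "5. volume": "900"},
    }
}


def to_rows(payload, symbol):
    """Flatten one response into (date, open, high, low, close, volume, company) tuples."""
    series = payload.get("Time Series (Daily)")
    if series is None:
        # Assumption: error or rate-limit responses do not carry the series key.
        raise ValueError(f"no daily series for {symbol}; keys were {list(payload)}")
    return [
        (day, float(v["1. open"]), float(v["2. high"]), float(v["3. low"]),
         float(v["4. close"]), int(v["5. volume"]), symbol)
        for day, v in sorted(series.items())  # oldest day first
    ]


rows = to_rows(sample_payload, "IBM")
print(rows[0])
```

The resulting list of tuples can be passed to `spark.createDataFrame` together with a list of column names.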

### Transform
Find **weekly** max closing price for each company.

hints: 
  - Use a `for-loop` to get stock data for each company
  - Use the Spark `union` operation to concatenate all data into one DF
  - Create a new `week` column from the date column
  - Use `group by` to calculate the max closing price
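The grouping logic in these hints can be checked without Spark. The sketch below is a pure-Python cross-check, not the notebook's solution: the rows, dates, and prices are made up, and `weekly_max_close` is a hypothetical helper. It groups by company and ISO week only, so each group can contain several closing prices.

```python
from datetime import date

# Hypothetical (company, date, close) rows; dates and prices are made up.
rows = [
    ("AAPL", date(2023, 8, 21), 175.0),
    ("AAPL", date(2023, 8, 23), 181.0),
    ("AAPL", date(2023, 8, 28), 180.0),
    ("TSLA", date(2023, 8, 22), 233.0),
    ("TSLA", date(2023, 8, 25), 238.5),
]


def weekly_max_close(records):
    """Return {(company, iso_year, iso_week): max close} for the given rows."""
    best = {}
    for company, day, close in records:
        iso_year, iso_week, _ = day.isocalendar()
        key = (company, iso_year, iso_week)  # the price is NOT part of the key
        best[key] = max(best.get(key, close), close)
    return best


for key, value in sorted(weekly_max_close(rows).items()):
    print(key, value)
```

Spark's `weekofyear` uses the same ISO 8601 week numbering. One caveat when pairing it with the calendar `year`: 1 January 2023 falls in ISO week 52 of 2022, so days near a year boundary can land in an unexpected group.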

### Load
- Partition `DF` by company
- Load the DF into a managed table called `max_closing_price_weekly`
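One way to combine both load requirements is to chain `partitionBy` with `saveAsTable` on the DataFrame writer. The helper below is a minimal sketch; the function name is hypothetical, and `df` is assumed to be a Spark DataFrame that contains the partition column.

```python
def save_partitioned_table(df, table_name, partition_column):
    """Write `df` as a managed table with one directory per partition value."""
    (df.write
       .partitionBy(partition_column)  # e.g. one sub-directory per company
       .mode("overwrite")              # replace the table if it already exists
       .saveAsTable(table_name))       # register the result in the metastore
```

For this job the call would be `save_partitioned_table(weekly_max_closing, "max_closing_price_weekly", "company")`, after which the table can be queried by name with `spark.sql`.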

In [0]:
# Write your solution here
import requests
from pyspark.sql import SparkSession

# Set up your RapidAPI key and base URL (do not commit a real key to a notebook)
api_key = "[YOUR_KEY]"
base_url = "https://alpha-vantage.p.rapidapi.com/query"

# Initialize a Spark session (returns the existing session if one is running)
spark = SparkSession.builder.appName("StockPriceAnalysis").getOrCreate()

headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": api_key
}

def fetch_daily(symbol):
    """Fetch the compact daily time series for one ticker symbol as parsed JSON."""
    params = {
        "function": "TIME_SERIES_DAILY",
        "symbol": symbol,
        "datatype": "json",
        "outputsize": "compact"
    }
    response = requests.get(base_url, params=params, headers=headers)
    response.raise_for_status()  # fail early on HTTP errors
    return response.json()

# Fetch data for each company
data1 = fetch_daily("GOOGL")
data2 = fetch_daily("AAPL")
data3 = fetch_daily("MSFT")
data4 = fetch_daily("TSLA")

In [0]:
# Convert JSON data to a PySpark DataFrame

from pyspark.sql.functions import col, to_date, year, weekofyear, max, lit

columns = ["date", "open", "high", "low", "close", "volume"]
responses = {"GOOGL": data1, "AAPL": data2, "MSFT": data3, "TSLA": data4}

# Create a list to store DataFrames for each company
dataframes = []
for symbol, payload in responses.items():
    stock_data = [(date, float(values["1. open"]), float(values["2. high"]), float(values["3. low"]), float(values["4. close"]),
                   int(values["5. volume"])) for date, values in payload["Time Series (Daily)"].items()]
    stock_df = spark.createDataFrame(stock_data, columns)

    # Convert date column to a proper date type and tag each row with its ticker
    stock_df = stock_df.withColumn("date", to_date(col("date")))
    stock_df = stock_df.withColumn("company", lit(symbol))
    dataframes.append(stock_df)

# Concatenate the per-company DataFrames into one
all_data_df = dataframes[0]
for df in dataframes[1:]:
    all_data_df = all_data_df.union(df)

all_data_df = all_data_df.withColumn("year", year(col("date")))
all_data_df = all_data_df.withColumn("week", weekofyear(col("date")))


# Group by year and week and find the maximum closing price.
# NOTE: "close" is part of the grouping key here, so every group holds a single
# price and max_close always equals close, as the output shows. Remove "close"
# from groupBy to get one row per company per week.
weekly_max_closing = all_data_df.groupBy("year", "week", "close","company").agg(max("close").alias("max_close")).orderBy("week", ascending= False)

# Show the result
weekly_max_closing.show()


+----+----+------+-------+---------+
|year|week| close|company|max_close|
+----+----+------+-------+---------+
|2023|  34|129.88|  GOOGL|   129.88|
|2023|  34|132.37|  GOOGL|   132.37|
|2023|  34|129.08|  GOOGL|   129.08|
|2023|  34|129.78|  GOOGL|   129.78|
|2023|  34|128.37|  GOOGL|   128.37|
|2023|  34|177.23|   AAPL|   177.23|
|2023|  34|181.12|   AAPL|   181.12|
|2023|  34|176.38|   AAPL|   176.38|
|2023|  34|175.84|   AAPL|   175.84|
|2023|  34|178.61|   AAPL|   178.61|
|2023|  34|322.98|   MSFT|   322.98|
|2023|  34|321.88|   MSFT|   321.88|
|2023|  34| 327.0|   MSFT|    327.0|
|2023|  34|322.46|   MSFT|   322.46|
|2023|  34|319.97|   MSFT|   319.97|
|2023|  34|233.19|   TSLA|   233.19|
|2023|  34|231.28|   TSLA|   231.28|
|2023|  34|230.04|   TSLA|   230.04|
|2023|  34|238.59|   TSLA|   238.59|
|2023|  34|236.86|   TSLA|   236.86|
+----+----+------+-------+---------+
only showing top 20 rows



In [0]:
weekly_max_closing.write.partitionBy("company").mode('overwrite').parquet("/FileStore/tables/weekly_max_closing.parquet")


In [0]:
spark.read.parquet("/FileStore/tables/weekly_max_closing.parquet/company=AAPL").show()

+----+----+-------+---------+
|year|week|  close|max_close|
+----+----+-------+---------+
|2023|  34| 177.23|   177.23|
|2023|  34| 181.12|   181.12|
|2023|  34| 176.38|   176.38|
|2023|  34| 175.84|   175.84|
|2023|  34| 178.61|   178.61|
|2023|  33| 177.45|   177.45|
|2023|  33|  174.0|    174.0|
|2023|  33| 176.57|   176.57|
|2023|  33| 179.46|   179.46|
|2023|  33| 174.49|   174.49|
|2023|  32| 177.97|   177.97|
|2023|  32| 177.79|   177.79|
|2023|  32| 178.19|   178.19|
|2023|  32|  179.8|    179.8|
|2023|  32| 178.85|   178.85|
|2023|  31| 181.99|   181.99|
|2023|  31| 191.17|   191.17|
|2023|  31| 196.45|   196.45|
|2023|  31| 192.58|   192.58|
|2023|  31|195.605|  195.605|
+----+----+-------+---------+
only showing top 20 rows



In [0]:
spark.read.parquet("/FileStore/tables/weekly_max_closing.parquet/company=GOOGL").show()

+----+----+------+---------+
|year|week| close|max_close|
+----+----+------+---------+
|2023|  34|129.88|   129.88|
|2023|  34|132.37|   132.37|
|2023|  34|129.08|   129.08|
|2023|  34|129.78|   129.78|
|2023|  34|128.37|   128.37|
|2023|  33| 128.7|    128.7|
|2023|  33|127.46|   127.46|
|2023|  33|129.92|   129.92|
|2023|  33|129.78|   129.78|
|2023|  33|131.33|   131.33|
|2023|  32|129.69|   129.69|
|2023|  32|129.56|   129.56|
|2023|  32|131.53|   131.53|
|2023|  32|129.66|   129.66|
|2023|  32| 131.4|    131.4|
|2023|  31|132.72|   132.72|
|2023|  31|128.11|   128.11|
|2023|  31|128.45|   128.45|
|2023|  31|131.55|   131.55|
|2023|  31|128.38|   128.38|
+----+----+------+---------+
only showing top 20 rows



In [0]:
spark.read.parquet("/FileStore/tables/weekly_max_closing.parquet/company=TSLA").show()

+----+----+------+---------+
|year|week| close|max_close|
+----+----+------+---------+
|2023|  34|233.19|   233.19|
|2023|  34|231.28|   231.28|
|2023|  34|230.04|   230.04|
|2023|  34|238.59|   238.59|
|2023|  34|236.86|   236.86|
|2023|  33|219.22|   219.22|
|2023|  33|232.96|   232.96|
|2023|  33| 225.6|    225.6|
|2023|  33|215.49|   215.49|
|2023|  33|239.76|   239.76|
|2023|  32|242.65|   242.65|
|2023|  32|245.34|   245.34|
|2023|  32| 249.7|    249.7|
|2023|  32|251.45|   251.45|
|2023|  32|242.19|   242.19|
|2023|  31|261.07|   261.07|
|2023|  31|259.32|   259.32|
|2023|  31|267.43|   267.43|
|2023|  31|254.11|   254.11|
|2023|  31|253.86|   253.86|
+----+----+------+---------+
only showing top 20 rows



## ETL Job Four: RDBMS


### Extract
Extract RNA data from a public PostgreSQL database.

- https://rnacentral.org/help/public-database
- Extract 100 RNA records from the `rna` table (hint: use `limit` in your sql)
- hint: use `spark.read.jdbc` https://docs.databricks.com/external-data/jdbc.html
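The `limit` hint matters because it decides where the row limit is applied. A possible approach, sketched below, uses the JDBC source's `query` option so that the database runs the `LIMIT` and only those rows cross the network. The helper names are hypothetical, `spark` is assumed to be an existing SparkSession, and the PostgreSQL JDBC driver is assumed to be on the cluster's classpath.

```python
def jdbc_options(url, user, password, table, limit):
    """Build JDBC reader options that ask the database to apply LIMIT."""
    return {
        "url": url,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
        # Spark runs this statement inside the database as a subquery.
        # `query` and `dbtable` cannot be set at the same time.
        "query": f"SELECT * FROM {table} LIMIT {int(limit)}",
    }


def read_limited(spark, options):
    """Load the query result as a DataFrame."""
    return spark.read.format("jdbc").options(**options).load()
```

A call such as `read_limited(spark, jdbc_options(url, "reader", password, "rna", 100))` would then return a DataFrame of at most 100 rows.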

### Transform
We want to load the data as is, so no transformation is required.


### Load
Load the DF into a managed table called `rna_100_records`

In [0]:
#Extract
#Extract RNA data from a public PostgreSQL database. only 100 rows 

# Let PostgreSQL apply the LIMIT so that only 100 rows are transferred.
rna_100_records = (spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs")
  .option("query", "SELECT * FROM rna LIMIT 100")
  .option("user", "reader")
  .option("password", "NWDMCE5xdipIjRrp")
  .load()
)

rna_100_records.count()
rna_100_records.printSchema()




In [0]:
#fetching all rows 
rna_records = (spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs")
  .option("dbtable", "rna")
  .option("user", "reader")
  .option("password", "NWDMCE5xdipIjRrp")
  .load()
)

rna_records.count()



In [0]:
#Load the DF into a managed table called rna_100_records

# mode("overwrite") replaces the table if it already exists, so no explicit DROP TABLE is needed.
rna_100_records.write.mode("overwrite").saveAsTable("rna_100_records")




In [0]:
%sql

SELECT * FROM rna_100_records LIMIT 4;


