
## Imports

In [0]:
import tempfile
import requests
import os
from pyspark.sql import functions as F
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import col, max, weekofyear, year
from datetime import datetime, timedelta


## Data Preparation 

In [0]:
# File location and type
f1_location = "/FileStore/tables/members.csv"
f2_location = "/FileStore/tables/bookings.csv"
f3_location = "/FileStore/tables/facilities.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
members_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(f1_location)

bookings_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(f2_location)

facilities_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(f3_location)

In [0]:
# Create a view or table
temp_bookings = "bookings"
temp_members = "members"
temp_facilities = "facilities"

members_df.createOrReplaceTempView(temp_members)
bookings_df.createOrReplaceTempView(temp_bookings)
facilities_df.createOrReplaceTempView(temp_facilities)


## ETL Job One: Parquet file

Extract
Extract data from the managed tables (e.g. bookings_csv, members_csv, and facilities_csv)

Transform
Data transformation requirements https://pgexercises.com/questions/aggregates/fachoursbymonth.html

Load
Load data into a parquet file

What is Parquet?
Columnar files are an important technique for optimizing Spark queries. Additionally, they are often tested in interviews.

In [0]:
b = bookings_df.alias("b")

etljob_1 = b.filter(F.col("b.facid").isNotNull() & 
  F.col("b.starttime").between('2012-09-01', '2012-10-01')) \
  .groupBy("b.facid") \
  .agg(F.sum("b.slots").alias("Total_Slots")) \
  .orderBy("Total_Slots")

etljob_1.write.mode("overwrite").parquet("/temp/etl_job1_parquet/")

In [0]:
data = spark.read.parquet("/temp/etl_job1_parquet/")
display(data)

facid,Total_Slots
5,122
3,422
7,426
8,471
6,540
2,570
1,588
0,591
4,648



## ETL Job Two: Partitions

Extract
Extract data from the managed tables (e.g. bookings_csv, members_csv, and facilities_csv)

Transform
Transform the data https://pgexercises.com/questions/joins/threejoin.html

Load
Partition the result data by facility column and then save to threejoin_delta managed table. Additionally, they are often tested in interviews.

In [0]:
# Transform the original sql query into a spark dataframe 
with tempfile.TemporaryDirectory() as d:
    etljob_2 = members_df.join(bookings_df, members_df.memid == bookings_df.memid) \
        .join(facilities_df, bookings_df.facid == facilities_df.facid) \
        .filter(facilities_df.name.like("Tennis Court%")) \
        .select(
            concat_ws(" ", members_df.firstname, members_df.surname).alias("member"),
            facilities_df.name.alias("facility")
        ) \
        .distinct() \
        .orderBy("member", "facility") 

    # Write a etl job 2 into a Parquet file in a partitioned manner by facility.
    etljob_2.write.partitionBy("facility").mode("overwrite").format("parquet").save(d)

    # Read the Parquet file as a DataFrame
    spark.read.parquet(d).show(n=100, truncate=False)

+-----------------+--------------+
|member           |facility      |
+-----------------+--------------+
|Anne Baker       |Tennis Court 1|
|Burton Tracy     |Tennis Court 1|
|Charles Owen     |Tennis Court 1|
|David Farrell    |Tennis Court 1|
|David Jones      |Tennis Court 1|
|David Pinker     |Tennis Court 1|
|Douglas Jones    |Tennis Court 1|
|Erica Crumpet    |Tennis Court 1|
|Florence Bader   |Tennis Court 1|
|GUEST GUEST      |Tennis Court 1|
|Gerald Butters   |Tennis Court 1|
|Jack Smith       |Tennis Court 1|
|Janice Joplette  |Tennis Court 1|
|Jemima Farrell   |Tennis Court 1|
|Joan Coplin      |Tennis Court 1|
|John Hunt        |Tennis Court 1|
|Matthew Genting  |Tennis Court 1|
|Nancy Dare       |Tennis Court 1|
|Ponder Stibbons  |Tennis Court 1|
|Ramnaresh Sarwin |Tennis Court 1|
|Tim Boothe       |Tennis Court 1|
|Tim Rownam       |Tennis Court 1|
|Timothy Baker    |Tennis Court 1|
|Tracy Smith      |Tennis Court 1|
|Anne Baker       |Tennis Court 2|
|Burton Tracy     |T


## ETL Job Three: HTTP Requests

Extract

Extract daily stock price data price from the following companies, Google, Apple, Microsoft, and Tesla.

Data Source

API: https://rapidapi.com/alphavantage/api/alpha-vantage
Endpoint: GET TIME_SERIES_DAILY

In [0]:

def get_stock_data(symbol):
    url = "https://alpha-vantage.p.rapidapi.com/query"
    querystring = {
        "function": "TIME_SERIES_DAILY",
        "symbol": symbol,
        "datatype": "json",
        "outputsize": "compact"
    }
    headers = {
        "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
        "X-RapidAPI-Key": "1751967b0dmshc7a6c9c16b777b4p189a86jsn0acab2cd3173"
    }
    
    response = requests.get(url, headers=headers, params=querystring)
    return response.json()

In [0]:
symbols = ["IBM", "AAPL", "MSFT", "TSLA"]  # stocks
# range
end_date = datetime.today()
start_date = end_date - timedelta(days=8)

# Initialize an empty DataFrame
final_df = None

for symbol in symbols:
    data = get_stock_data(symbol)
    time_series = data.get("Time Series (Daily)", {})

    # Filter data within date range
    filtered_data = [
        {"symbol": symbol, "date": date, **values}
        for date, values in time_series.items()
        if start_date.strftime("%Y-%m-%d") <= date <= end_date.strftime("%Y-%m-%d")
    ]

    # Convert to Spark DataFrame
    df = spark.createDataFrame(filtered_data)
    df = df.withColumn("date", col("date").cast("date"))
    final_df = df if final_df is None else final_df.union(df)

final_df = final_df.withColumn("year", year(col("date")))
final_df = final_df.withColumn("week", weekofyear(col("date")))


In [0]:
max_closing_price_weekly = final_df.groupBy("symbol", "year", "week") \
    .agg(max(col("`4. close`")).alias("max_close_price"))
max_closing_price_weekly.show(n=20, truncate=False)

+------+----+----+---------------+
|symbol|year|week|max_close_price|
+------+----+----+---------------+
|AAPL  |2025|10  |239.0700       |
|AAPL  |2025|11  |227.4800       |
|IBM   |2025|10  |261.5400       |
|IBM   |2025|11  |256.9000       |
|MSFT  |2025|10  |401.0200       |
|MSFT  |2025|11  |383.2700       |
|TSLA  |2025|10  |279.1000       |
|TSLA  |2025|11  |248.0900       |
+------+----+----+---------------+




## ETL Job Four: RDBMS

Extract
Extract RNA data from a public PostgreSQL database.

- https://rnacentral.org/help/public-database
- Extract 100 RNA records from the rna table (hint: use limit in your sql)

In [0]:
# db properties
jdbc_url = "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs"  
username = "reader"
password = "NWDMCE5xdipIjRrp"  # Password taken from https://gist.github.com/AntonPetrov/ec248312feff6acc07a82b4bfb595440#file-example-rnacentral-postgres-script-py

# Read data with JDBC
rna_df = (
    spark.read
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "(SELECT * FROM rna LIMIT 100) AS rna_subset")  # Extract 100 RNA records
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .load()
)

# Show the extracted data
rna_df.show(n=100, truncate=True)

+--------+-------------+--------------------+---------+----------------+----+--------------------+--------+--------------------+
|      id|          upi|           timestamp|userstamp|           crc64| len|           seq_short|seq_long|                 md5|
+--------+-------------+--------------------+---------+----------------+----+--------------------+--------+--------------------+
| 8988357|URS00008926C5| 2015-10-20 18:04:07|   RNACEN|F9626977AB4E17FB|1336|TCAGCGGCGAACGGGTG...|    null|fe4792a9218a34fde...|
| 8988360|URS00008926C8| 2015-10-20 18:04:07|   RNACEN|DEA611A8ABDE9078|1307|ACTGCTATCGGATTGAT...|    null|5eb946fc85a2e16f4...|
| 8988361|URS00008926C9| 2015-10-20 18:04:07|   RNACEN|AE161A21AF6713C0|1367|AGCCCAGCTTGCTGGGT...|    null|fe4849b1977b5be3c...|
| 8988362|URS00008926CA| 2015-10-20 18:04:07|   RNACEN|03DF15DE82E78D7F|1398|GAGTTTGATCATGGCTC...|    null|c4bb7b410de36a58c...|
| 8988364|URS00008926CC| 2015-10-20 18:04:07|   RNACEN|AE0439B061E1640E|1409|GTCGAACGGTAACAGGA...