In [0]:
# ETL Job One: Parquet file
from pyspark.sql.functions import col, sum

bookings_df = spark.table("bookings")
members_df = spark.table("members")
facilities_df = spark.table("facilities")

# Filter 
filtered_bookings_df = bookings_df.filter(
    (col("starttime") >= "2012-09-01") & (col("starttime") < "2012-10-01")
)

# Group by 
transformed_df = filtered_bookings_df.groupBy("facid").agg(sum("slots").alias("Total Slots"))

# Order by 
result_df = transformed_df.orderBy("Total Slots", ascending=False)

output_path = "/FileStore/tables/total_slots_per_facility_september_2012.parquet"

# Save
result_df.write.mode("overwrite").parquet(output_path)

# Verify
loaded_df = spark.read.parquet(output_path)
loaded_df.show(truncate=False)


+-----+-----------+
|facid|Total Slots|
+-----+-----------+
|4    |648        |
|0    |591        |
|1    |588        |
|2    |570        |
|6    |540        |
|8    |471        |
|7    |426        |
|3    |422        |
|5    |122        |
+-----+-----------+



In [0]:
# ETL Job Two: Partitions
from pyspark.sql.functions import concat_ws, col

bookings_df = spark.table("bookings")
members_df = spark.table("members")
facilities_df = spark.table("facilities")

joined_df = members_df.join(bookings_df, members_df.memid == bookings_df.memid) \
    .join(facilities_df, bookings_df.facid == facilities_df.facid)

# Filter 
filtered_df = joined_df.filter(col("name").isin("Tennis Court 1", "Tennis Court 2"))

# Select 
result_df = filtered_df.select(
    concat_ws(" ", col("firstname"), col("surname")).alias("member"),
    col("name").alias("facility")
).distinct().orderBy("member", "facility")

# Display 
result_df.show(truncate=False)

output_path = "/FileStore/tables/threejoin_delta"

# Save and Partition
result_df.write.mode("overwrite").partitionBy("facility").format("delta").saveAsTable("threejoin_delta")

# Verify 
loaded_df = spark.read.table("threejoin_delta")
loaded_df.show(truncate=False)


+--------------+--------------+
|member        |facility      |
+--------------+--------------+
|Anne Baker    |Tennis Court 1|
|Anne Baker    |Tennis Court 2|
|Burton Tracy  |Tennis Court 1|
|Burton Tracy  |Tennis Court 2|
|Charles Owen  |Tennis Court 1|
|Charles Owen  |Tennis Court 2|
|Darren Smith  |Tennis Court 2|
|David Farrell |Tennis Court 1|
|David Farrell |Tennis Court 2|
|David Jones   |Tennis Court 1|
|David Jones   |Tennis Court 2|
|David Pinker  |Tennis Court 1|
|Douglas Jones |Tennis Court 1|
|Erica Crumpet |Tennis Court 1|
|Florence Bader|Tennis Court 1|
|Florence Bader|Tennis Court 2|
|GUEST GUEST   |Tennis Court 1|
|GUEST GUEST   |Tennis Court 2|
|Gerald Butters|Tennis Court 1|
|Gerald Butters|Tennis Court 2|
+--------------+--------------+
only showing top 20 rows

+----------------+--------------+
|member          |facility      |
+----------------+--------------+
|Anne Baker      |Tennis Court 1|
|Burton Tracy    |Tennis Court 1|
|Charles Owen    |Tennis Court 1|
|D

In [0]:
# ETL Job Three: HTTP Requests
# Import necessary libraries
import requests
from pyspark.sql.functions import col, weekofyear, year, max as spark_max, to_date
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# API details
api_key = "73b430a819mshe696c42860583d4p1f52a5jsn50b55e330925"
url = "https://alpha-vantage.p.rapidapi.com/query"
headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    "X-RapidAPI-Key": api_key
}

# List of companies
companies = ["GOOGL", "AAPL", "MSFT", "TSLA"]

# Function to fetch data from API
def fetch_stock_data(symbol):
    querystring = {
        "function": "TIME_SERIES_DAILY",
        "symbol": symbol,
        "datatype": "json",
        "outputsize": "compact"
    }
    response = requests.get(url, headers=headers, params=querystring)
    data = response.json()
    return data

# Initialize an empty DataFrame with the correct schema
schema = StructType([
    StructField("date", StringType(), True),
    StructField("company", StringType(), True),
    StructField("closing_price", FloatType(), True)
])

all_data_df = spark.createDataFrame([], schema)

# Loop over each company and fetch the data
for company in companies:
    data = fetch_stock_data(company)
    daily_data = data['Time Series (Daily)']
    
    # Create a list of rows for the current company
    company_data = [(date, company, float(metrics['4. close'])) for date, metrics in daily_data.items()]
    
    # Create a Spark DataFrame for the current company
    company_df = spark.createDataFrame(company_data, schema=schema)
    
    # Union the DataFrame with the main DataFrame
    all_data_df = all_data_df.union(company_df)

# Convert date string to date type
all_data_df = all_data_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# Transform data: find weekly max closing price
all_data_df = all_data_df.withColumn("year", year(col("date")))
all_data_df = all_data_df.withColumn("week", weekofyear(col("date")))

weekly_max_df = all_data_df.groupBy("company", "year", "week").agg(spark_max("closing_price").alias("max_closing_price"))

# Load data into a managed table, partitioned by company
output_table = "max_closing_price_weekly"
weekly_max_df.write.mode("overwrite").partitionBy("company").format("delta").saveAsTable(output_table)

# Verify the result
result_df = spark.read.table(output_table)
result_df.show(truncate=False)


+-------+----+----+-----------------+
|company|year|week|max_closing_price|
+-------+----+----+-----------------+
|AAPL   |2024|25  |216.67           |
|AAPL   |2024|24  |214.24           |
|AAPL   |2024|26  |214.1            |
|AAPL   |2024|27  |216.75           |
|AAPL   |2024|22  |192.25           |
|AAPL   |2024|23  |196.89           |
|AAPL   |2024|21  |192.35           |
|AAPL   |2024|20  |189.87           |
|AAPL   |2024|19  |184.57           |
|AAPL   |2024|17  |169.89           |
|AAPL   |2024|18  |183.38           |
|AAPL   |2024|14  |170.03           |
|AAPL   |2024|16  |172.69           |
|AAPL   |2024|15  |176.55           |
|AAPL   |2024|13  |173.31           |
|AAPL   |2024|12  |178.67           |
|AAPL   |2024|11  |173.23           |
|AAPL   |2024|9   |182.63           |
|AAPL   |2024|10  |175.1            |
|AAPL   |2024|6   |189.41           |
+-------+----+----+-----------------+
only showing top 20 rows



In [0]:
# ETL Job Four: RDBMS
# Define PostgreSQL connection details
jdbc_url = "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs"
table_name = "(SELECT * FROM rna LIMIT 100) AS rna_limit_100"
connection_properties = {
    "user": "reader",
    "password": "NWDMCE5xdipIjRrp",
    "driver": "org.postgresql.Driver"
}

# Extract data from the PostgreSQL database
rna_df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)

# Display the extracted data
rna_df.show(truncate=False)

+--------+-------------+--------------------------+---------+----------------+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------