## ETL Job One: Parquet file

In [0]:
from pyspark.sql.types import DateType
from pyspark.sql.functions import *

# Load the two source tables used by this job.
bookings = spark.sql("SELECT * FROM bookings")
facilities = spark.sql("SELECT * FROM facilities")

# Bookings with an extra `startdate` column: `starttime` cast to a date.
bookings_by_day = bookings.withColumn("startdate", bookings.starttime.cast(DateType()))

# Sum of booked slots per facility for start dates in [2012-09-01, 2012-10-01),
# ordered by the total (ascending).
res = (
    facilities
    .join(bookings_by_day, facilities.facid == bookings.facid)
    .filter(col("startdate") >= "2012-09-01")
    .filter(col("startdate") < "2012-10-01")
    .groupBy(facilities.facid)
    .sum("slots")
    .withColumnRenamed("sum(slots)", "Total Slots")
    .orderBy("Total Slots")
)

# Write the aggregate as Parquet, replacing the output of any earlier run.
# NOTE(review): the column name "Total Slots" contains a space; older Spark
# releases reject such names when writing Parquet -- confirm on the target runtime.
res.write.mode("overwrite").parquet("output/jobone")

## ETL Job Two: Partitions

In [0]:
from pyspark.sql.functions import *

# Load the three source tables used by this job.
bookings = spark.sql("SELECT * FROM bookings")
members = spark.sql("SELECT * FROM members")
facilities = spark.sql("SELECT * FROM facilities")

# members -> bookings (on memid) -> facilities (on facid).
member_bookings = members.join(bookings, members.memid == bookings.memid)
member_facilities = member_bookings.join(facilities, bookings.facid == facilities.facid)

# Distinct (member, facility) pairs for facilities whose name matches
# "Tennis Court", with the member shown as "<firstname> <surname>".
res = (
    member_facilities
    .filter(col("name").rlike("Tennis Court"))
    .withColumn("member", concat("firstname", lit(" "), "surname"))
    .withColumnRenamed("name", "facility")
    .select("member", "facility")
    .distinct()
    .orderBy("member", "facility")
)

# Save as a managed table partitioned by facility, replacing any previous contents.
res.write.mode("overwrite").partitionBy("facility").saveAsTable("threejoin_delta")



## ETL Job Three: HTTP Requests

In [0]:
import requests
from pyspark.sql.types import StructType, StructField, DecimalType, DateType, LongType
from time import sleep
from pyspark.sql.functions import *

# Ticker symbols whose daily price series are fetched, one HTTP request each.
symbols = ["GOOG", "AAPL", "MSFT", "TSLA"]

url = "https://alpha-vantage.p.rapidapi.com/query"

# Query parameters shared by every request; "symbol" is set per iteration below.
querystring = {
    "function": "TIME_SERIES_DAILY",
    "datatype": "json",
    "outputsize": "compact"
}

headers = {
    "X-RapidAPI-Host": "alpha-vantage.p.rapidapi.com",
    # Placeholder value: supply a real API key before running this cell.
    "X-RapidAPI-Key": "changeme"
}

# Running union of the per-symbol DataFrames; stays None until the first symbol is loaded.
all_company_df = None

for symbol in symbols:
    querystring["symbol"] = symbol
    # Without a timeout a stalled connection would block the notebook indefinitely.
    response = requests.get(url, headers=headers, params=querystring, timeout=30)
    # Surface HTTP-level failures here rather than while parsing an error body.
    response.raise_for_status()
    data = response.json()

    # Guard against a body that parses as JSON but carries no price series,
    # so the failure names the symbol and shows what the API actually returned.
    if "Time Series (Daily)" not in data:
        raise RuntimeError(f"No 'Time Series (Daily)' in response for {symbol}: {data}")

    # One row holding the whole {date: {field: value}} map, exploded into one
    # (key, value) row per date.
    df = spark.createDataFrame([data["Time Series (Daily)"]], "map<string, map<string, string>>")
    df = df.select(explode(col("value")))
    df = df.select(
        col("key").cast(DateType()).alias("date"),
        col("value").getItem("1. open").cast(DecimalType(10, 4)).alias("open"),
        col("value").getItem("2. high").cast(DecimalType(10, 4)).alias("high"),
        col("value").getItem("3. low").cast(DecimalType(10, 4)).alias("low"),
        col("value").getItem("4. close").cast(DecimalType(10, 4)).alias("close"),
        col("value").getItem("5. volume").cast(LongType()).alias("volume"),
    ).withColumn("company", lit(symbol))

    # Compare against None explicitly instead of relying on DataFrame truthiness.
    if all_company_df is None:
        all_company_df = df
    else:
        all_company_df = all_company_df.union(df)

    # avoid hitting API request rate limit
    sleep(1)

# Highest closing price per company and week-of-year.
# `max` here is the Spark aggregate brought in by the star import from
# pyspark.sql.functions, not the Python builtin.
all_company_df = (
    all_company_df
    .withColumn("week", weekofyear(col("date")))
    .groupBy("company", "week")
    .agg(max("close").alias("weekly_max_close"))
)
display(all_company_df)

# Save as a managed table partitioned by company, replacing any previous contents.
all_company_df.write.mode("overwrite").partitionBy("company").saveAsTable("max_closing_price_weekly")


company,week,weekly_max_close
GOOG,11,169.0
GOOG,45,182.28
GOOG,44,176.14
GOOG,3,197.55
GOOG,46,183.32
GOOG,1,193.13
GOOG,9,181.19
GOOG,48,170.82
GOOG,12,166.57
GOOG,43,166.99


### Part 1 - Make requests and store raw results

### Part 2 - Transform and load

In [0]:

from pyspark.sql.types import StructType, StructField, DecimalType, DateType, LongType

# Schema for one daily price record; every field is nullable.
schema = StructType([
    StructField("open", DecimalType(10, 4), True),
    StructField("high", DecimalType(10, 4), True),
    StructField("low", DecimalType(10, 4), True),
    StructField("close", DecimalType(10, 4), True),
    # Volume is cast to LongType in ETL Job Three; DecimalType(10, 4) leaves only
    # six integer digits, so larger values would not fit.
    StructField("volume", LongType(), True),
    StructField("date", DateType(), True)
])

## ETL Job Four: RDBMS

In [0]:
# JDBC connection settings for the remote PostgreSQL database.
driver = "org.postgresql.Driver"

database_host = "hh-pgsql-public.ebi.ac.uk"
database_port = "5432"
database_name = "pfmegrnargs"
table = "rna"
# NOTE(review): credentials are hardcoded in the notebook; if they are not
# intentionally public, load them from a secret store or environment instead.
user = "reader"
password = "NWDMCE5xdipIjRrp"

url = f"jdbc:postgresql://{database_host}:{database_port}/{database_name}"

# Only 100 rows are wanted, so put the LIMIT in the query sent to PostgreSQL.
# With a bare table name in `dbtable`, the limit is applied on the Spark side
# only and the database may be asked for far more rows than are kept.
sample_query = f"(SELECT * FROM {table} LIMIT 100) AS {table}_sample"

remote_table = (spark.read
  .format("jdbc")
  .option("driver", driver)
  .option("url", url)
  .option("dbtable", sample_query)
  .option("user", user)
  .option("password", password)
  .load()
  # Kept so the 100-row cap holds regardless of how the source query is changed.
  .limit(100)
)

# Save the sample as a managed table, replacing any previous contents.
remote_table.write.mode("overwrite").saveAsTable("rna_100_records")