In [1]:
import pyspark
# import matplotlib.pyplot as plt
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType, DateType
from pyspark.sql.functions import isnan, isnull, when, count, col, to_date, udf

from airflow import DAG
from airflow.operators.python import PythonOperator
# from airflow.hooks.postgres_hook import PostgresHook
# from airflow.hooks.S3_hook import S3Hook
# import pandas as pd
from io import StringIO
from datetime import datetime
import re
import pandas as pd
from constants import COVID_EPIDEMIOLOGY_FILE, COVID_HOSPITALIZATIONS_FILE, SP500_FILE, NASDAQ_FILE, COVID_ECONOMY_FILE, COVID_WEATHER_FILE, \
    RDS_ENDPOINT, RDS_PORT, RDS_USERNAME, RDS_PASSWORD, RDS_DB_NAME, JDBC_DRIVER
from aws_helpers import read_s3_to_spark, spark_write_to_rds, spark_read_from_rds
from spark_helpers import get_spark_session_and_context

import psycopg2

# airflow standalone
# give ec2 access to s3 using iam roles

%load_ext autoreload
%autoreload 2

[[34m2024-07-14T16:57:52.452-0600[0m] {[34mutils.py:[0m148} INFO[0m - Note: NumExpr detected 12 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.[0m
[[34m2024-07-14T16:57:52.461-0600[0m] {[34mutils.py:[0m160} INFO[0m - NumExpr defaulting to 8 threads.[0m


In [2]:
# Call the project helper imported from spark_helpers and unpack its two
# return values into `spark` and `sc`, which later cells use for reads.
spark, sc = get_spark_session_and_context()

your 131072x1 screen size is bogus. expect trouble
24/07/14 16:57:54 WARN Utils: Your hostname, PeterNguyen resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/07/14 16:57:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/07/14 16:57:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [9]:
# Sanity check: connect with psycopg2 to the database described by the RDS_*
# constants and print the `date` value of two rows from `sp_five_hundred`.
conn = psycopg2.connect(  # engine
    database=RDS_DB_NAME,
    user=RDS_USERNAME,
    password=RDS_PASSWORD,
    host=RDS_ENDPOINT,
    port=RDS_PORT
)

try:
    # The cursor context manager closes the cursor when the block exits.
    with conn.cursor() as curr:
        curr.execute("SELECT date FROM sp_five_hundred LIMIT(2);")
        rows = curr.fetchall()
finally:
    # `with conn:` would only end the transaction, not release the connection,
    # so close it explicitly to avoid leaking a database session per re-run.
    conn.close()

# Rows were fully fetched above, so they can be printed after the close.
for row in rows:
    print(row[0])

2021-03-22
2021-03-19


### Stock Data

In [3]:
# Load the S&P 500 and NASDAQ files into Spark DataFrames through the
# project helper imported from aws_helpers.
sp500_df = read_s3_to_spark(SP500_FILE, spark)  # seemed simpler to read s3 into pandas then spark
nasdaq_df = read_s3_to_spark(NASDAQ_FILE, spark)

# Inspect the S&P 500 schema and its first five rows.
sp500_df.printSchema()
sp500_df.show(5)

[[34m2024-07-14T16:58:13.737-0600[0m] {[34mcredentials.py:[0m621} INFO[0m - Found credentials in shared credentials file: ~/.aws/credentials[0m
root
 |-- Date: string (nullable = true)
 |-- Close/Last: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+----------+----------+-------+-------+-------+
|      Date|Close/Last|   Open|   High|    Low|
+----------+----------+-------+-------+-------+
|07/12/2024|   5615.35|5590.76|5655.56|5590.44|
|07/11/2024|   5584.54|5635.21|5642.45|5576.53|
|07/10/2024|   5633.91|5591.26|5635.39|5586.44|
|07/09/2024|   5576.98|5584.24|5590.75|5574.57|
|07/08/2024|   5572.85|5572.75|5583.11|5562.51|
+----------+----------+-------+-------+-------+
only showing top 5 rows



                                                                                

In [4]:
def format_stock_df(df):
    """Normalize a raw stock-price DataFrame.

    Parses the 'Date' column into a date using the 'MM/dd/yyyy' pattern, then
    renames the columns: 'Date' -> 'date', 'Close/Last' -> 'close',
    'Open' -> 'open', 'High' -> 'high', 'Low' -> 'low'.

    Args:
        df: Spark DataFrame with the raw stock columns listed above.

    Returns:
        A new DataFrame with the parsed date and the renamed columns.
    """
    # Parse the date first, while the column still carries its original name.
    formatted = df.withColumn('Date', to_date(col('Date'), 'MM/dd/yyyy'))

    # (old name, new name) pairs, applied in this order.
    renames = (
        ('Date', 'date'),
        ('Close/Last', 'close'),
        ('Open', 'open'),
        ('High', 'high'),
        ('Low', 'low'),
    )
    for old_name, new_name in renames:
        formatted = formatted.withColumnRenamed(old_name, new_name)
    return formatted

In [7]:
# Normalize both stock DataFrames with format_stock_df (date parsing and
# column renames); the raw frames are replaced under the same names.
sp500_df = format_stock_df(sp500_df)
nasdaq_df = format_stock_df(nasdaq_df)

# Hand each DataFrame to the project write helper with its target table name.
spark_write_to_rds(sp500_df, table_name="sp_five_hundred")
spark_write_to_rds(nasdaq_df, table_name="nasdaq")

                                                                                

In [4]:
# Relative path to a local JSON copy of the epidemiology data.
data_file = "data/covid/epidemiology.json"

import json
# Parse the whole file into an in-memory Python object.
with open(data_file, "r") as f:
    data = json.load(f)


In [None]:
# Read the same JSON file into a Spark DataFrame with Spark's JSON reader.
temp_df = spark.read.json(data_file)

In [None]:
%%configure
{"conf": {"spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2"}}

In [12]:
# Inspect the top-level structure of the parsed JSON object.
print(data.keys())
# Keep the value stored under the 'columns' key.
columns = data['columns']

# schema = StructType()
# Show the container type and the number of entries under the 'data' key.
print(type(data['data']))
print(len(data['data']))

dict_keys(['columns', 'data'])
<class 'list'>
12525378


In [None]:
# Null, negative-value and duplicate checks for the S&P 500 DataFrame.
df = sp500_df

print("null count")
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

# print("nan count")  # none
# df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

# Drop every row that has a null in any column.
cleaned_df = df.na.drop()

# The clamp list names epidemiology columns. The stock DataFrame does not
# contain them, and filtering on a missing column raises an AnalysisException,
# so only columns that are actually present are clamped.
for col_name in ['new_confirmed', 'new_deceased', 'new_recovered', 'new_tested']:
    if col_name not in cleaned_df.columns:
        continue

    print(f"Changing {cleaned_df.where(col(col_name) < 0).select(col(col_name)).count()}, negative values in {col_name} to 0")

    # Replace negative values with 0 and keep every other value unchanged.
    cleaned_df = cleaned_df.withColumn(col_name, when(col(col_name) < 0, 0).otherwise(col(col_name)))

# print("Duplicates")  # none
duplicate_rows = cleaned_df.count() - cleaned_df.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_rows}")
cleaned_df = cleaned_df.dropDuplicates()
cleaned_df.limit(10).show()
print(f"Reduced {df.count()} rows to {cleaned_df.count()} rows")

# Store the result under a stock-specific name so this cell cannot overwrite
# the cleaned epidemiology DataFrame with stock data.
cleaned_sp500_df = cleaned_df

In [79]:
# Column reference for the epidemiology table:
# https://github.com/GoogleCloudPlatform/covid-19-open-data/blob/main/docs/table-epidemiology.md

# Two key columns followed by eight nullable integer count columns,
# declared in the same order as before.
schema = StructType(
    [
        StructField('date', DateType(), True),
        StructField('location_key', StringType(), True),
    ]
    + [
        StructField(count_column, IntegerType(), True)
        for count_column in (
            'new_confirmed',
            'new_deceased',
            'new_recovered',
            'new_tested',
            'cumulative_confirmed',
            'cumulative_deceased',
            'cumulative_recovered',
            'cumulative_tested',
        )
    ]
)

# Read the CSV with the explicit schema; the first line is a header row.
epidemiology_df = spark.read.csv(COVID_EPIDEMIOLOGY_FILE, schema=schema, header=True)

# Preview the first five rows.
epidemiology_df.limit(5).show()

+----------+------------+-------------+------------+-------------+----------+--------------------+-------------------+--------------------+-----------------+
|      date|location_key|new_confirmed|new_deceased|new_recovered|new_tested|cumulative_confirmed|cumulative_deceased|cumulative_recovered|cumulative_tested|
+----------+------------+-------------+------------+-------------+----------+--------------------+-------------------+--------------------+-----------------+
|2020-01-01|          AD|            0|           0|         null|      null|                   0|                  0|                null|             null|
|2020-01-02|          AD|            0|           0|         null|      null|                   0|                  0|                null|             null|
|2020-01-03|          AD|            0|           0|         null|      null|                   0|                  0|                null|             null|
|2020-01-04|          AD|            0|           0|

In [80]:
# Clean the epidemiology DataFrame: report nulls, drop rows with nulls,
# clamp negative daily counts to 0, and remove duplicate rows.
df = epidemiology_df

print("null count")
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

# print("nan count")  # none
# df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

# Drop every row that has a null in any column.
cleaned_df = df.na.drop()

# Columns whose negative values are replaced with 0.
daily_columns = ['new_confirmed', 'new_deceased', 'new_recovered', 'new_tested']

# Count the negatives of all four columns in one aggregation (one Spark job
# instead of four). Clamping one column does not affect the others' counts,
# so the reported numbers are the same as counting inside the loop.
negative_counts = cleaned_df.select(
    [count(when(col(c) < 0, c)).alias(c) for c in daily_columns]
).first()

for col_name in daily_columns:
    print(f"Changing {negative_counts[col_name]}, negative values in {col_name} to 0")

    # Replace negative values with 0 and keep every other value unchanged.
    cleaned_df = cleaned_df.withColumn(col_name, when(col(col_name) < 0, 0).otherwise(col(col_name)))

# print("Duplicates")  # none
# Count before and after de-duplication once each, and reuse the second count
# in the final summary rather than running another job.
rows_before_dedup = cleaned_df.count()
cleaned_df = cleaned_df.dropDuplicates()
rows_after_dedup = cleaned_df.count()
duplicate_rows = rows_before_dedup - rows_after_dedup
print(f"Number of duplicate rows: {duplicate_rows}")
cleaned_df.limit(10).show()
print(f"Reduced {df.count()} rows to {rows_after_dedup} rows")

cleaned_epidemiology_df = cleaned_df
# The misspelled name is kept as an alias so existing references still work.
claned_epidemiology_df = cleaned_epidemiology_df

null count


                                                                                

+----+------------+-------------+------------+-------------+----------+--------------------+-------------------+--------------------+-----------------+
|date|location_key|new_confirmed|new_deceased|new_recovered|new_tested|cumulative_confirmed|cumulative_deceased|cumulative_recovered|cumulative_tested|
+----+------------+-------------+------------+-------------+----------+--------------------+-------------------+--------------------+-----------------+
|   0|           0|        50025|      858687|      8545363|   9331336|              198780|            1051000|             8534668|          9512906|
+----+------------+-------------+------------+-------------+----------+--------------------+-------------------+--------------------+-----------------+



                                                                                

Changing 26453, negative values in new_confirmed to 0


                                                                                

Changing 6986, negative values in new_deceased to 0


                                                                                

Changing 1888, negative values in new_recovered to 0


                                                                                

Changing 2769, negative values in new_tested to 0


                                                                                

Number of duplicate rows: 0


                                                                                

+----------+------------+-------------+------------+-------------+----------+--------------------+-------------------+--------------------+-----------------+
|      date|location_key|new_confirmed|new_deceased|new_recovered|new_tested|cumulative_confirmed|cumulative_deceased|cumulative_recovered|cumulative_tested|
+----------+------------+-------------+------------+-------------+----------+--------------------+-------------------+--------------------+-----------------+
|2020-12-23|          AT|         2843|          99|         3115|     38440|              344938|               6994|              307537|          3682136|
|2021-02-07|          AT|          993|          40|         1052|    212647|              420176|               9780|              391279|         10313170|
|2021-06-08|          AT|          356|           4|          607|    412011|              641360|              12964|              620881|         45983005|
|2021-08-15|          AT|          797|           1|



Reduced 12525825 rows to 2450433 rows


                                                                                

In [35]:
# Column reference for the economy table:
# https://github.com/GoogleCloudPlatform/covid-19-open-data/blob/main/docs/table-economy.md

# Build the schema field by field; every column is nullable.
schema = (
    StructType()
    .add('location_key', StringType(), True)
    .add('gdp_usd', LongType(), True)
    .add('gdp_per_capita_usd', IntegerType(), True)
    .add('human_capital_index', FloatType(), True)
)

# Read the CSV with the explicit schema; the first line is a header row.
economy_df = spark.read.csv(COVID_ECONOMY_FILE, schema=schema, header=True)

# Preview the first five rows.
economy_df.limit(5).show()

+------------+------------+------------------+-------------------+
|location_key|     gdp_usd|gdp_per_capita_usd|human_capital_index|
+------------+------------+------------------+-------------------+
|          AD|  3154057987|             40886|               null|
|          AE|421142267937|             43103|              0.659|
|          AF| 19101353832|               502|              0.389|
|          AG|  1727759259|             17790|               null|
|          AL| 15278077446|              5352|              0.621|
+------------+------------+------------------+-------------------+



In [34]:
# Clean the economy DataFrame: report nulls per column, drop rows missing
# either GDP figure, then report and remove duplicate rows.
print("null count")
economy_df.select([count(when(isnull(c), c)).alias(c) for c in economy_df.columns]).show()
# print("nan count")  # none
# economy_df.select([count(when(isnan(c), c)).alias(c) for c in economy_df.columns]).show()

# Drop Nulls
# Only the two GDP columns are checked; nulls in human_capital_index are kept.
cleaned_economy_df = economy_df.na.drop(subset=['gdp_usd', 'gdp_per_capita_usd'])
# df.na.fill({"age": 0, "name": "unknown"}).show()  # Fill missing values

print("TODO: still need to handle human_capital_index")

# Row counts before and after the null drop.
print(f"Reduced {economy_df.count()} rows to {cleaned_economy_df.count()} rows")

print("Duplicates")  # none
# Duplicate rows = total rows minus distinct rows.
duplicate_rows = cleaned_economy_df.count() - cleaned_economy_df.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_rows}")
cleaned_economy_df = cleaned_economy_df.dropDuplicates()
# Preview up to ten cleaned rows.
cleaned_economy_df.limit(10).show()

null count
+------------+-------+------------------+-------------------+
|location_key|gdp_usd|gdp_per_capita_usd|human_capital_index|
+------------+-------+------------------+-------------------+
|           0|     31|                39|                248|
+------------+-------+------------------+-------------------+

TODO: still need to handle human_capital_index
Reduced 404 rows to 334 rows
Duplicates
Number of duplicate rows: 0
+------------+------------+------------------+-------------------+
|location_key|     gdp_usd|gdp_per_capita_usd|human_capital_index|
+------------+------------+------------------+-------------------+
|       DE_ST| 73969539000|             33394|               null|
|       IT_25|457916381400|             45548|               null|
|        AT_5| 34272274000|             61832|               null|
|       IT_88| 41212125400|             25016|               null|
|          GN| 13590281808|              1064|              0.374|
|      BE_BRU| 99104176200|

[[34m2024-07-14T14:08:44.722-0600[0m] {[34mcredentials.py:[0m621} INFO[0m - Found credentials in shared credentials file: ~/.aws/credentials[0m
root
 |-- Date: string (nullable = true)
 |-- Close/Last: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+----------+----------+-------+-------+-------+
|      Date|Close/Last|   Open|   High|    Low|
+----------+----------+-------+-------+-------+
|07/12/2024|   5615.35|5590.76|5655.56|5590.44|
|07/11/2024|   5584.54|5635.21|5642.45|5576.53|
+----------+----------+-------+-------+-------+
only showing top 2 rows



                                                                                

In [81]:
# df.cache()

The cells below are not currently in use.

In [78]:
# Column reference for the weather table:
# https://github.com/GoogleCloudPlatform/covid-19-open-data/blob/main/docs/table-weather.md

# Two key columns followed by seven nullable float measurement columns,
# declared in the same order as before.
schema = StructType(
    [
        StructField('date', DateType(), True),
        StructField('location_key', StringType(), True),
    ]
    + [
        StructField(measurement, FloatType(), True)
        for measurement in (
            'average_temperature_celsius',
            'minimum_temperature_celsius',
            'maximum_temperature_celsius',
            'rainfall_mm',
            'snowfall_mm',
            'dew_point',
            'relative_humidity',
        )
    ]
)

# Read the CSV with the explicit schema; the first line is a header row.
weather_df = spark.read.csv(COVID_WEATHER_FILE, schema=schema, header=True)

# Preview the first five rows.
weather_df.limit(5).show()

+----------+------------+---------------------------+---------------------------+---------------------------+-----------+-----------+---------+-----------------+
|      date|location_key|average_temperature_celsius|minimum_temperature_celsius|maximum_temperature_celsius|rainfall_mm|snowfall_mm|dew_point|relative_humidity|
+----------+------------+---------------------------+---------------------------+---------------------------+-----------+-----------+---------+-----------------+
|2020-01-01|          AD|                   4.236111|                   0.138889|                   8.208333|      3.302|       null|-0.972222|         72.77305|
|2020-01-02|          AD|                      3.875|                  -0.722222|                  10.055556|   6.688667|       null|   -1.625|         70.84132|
|2020-01-03|          AD|                   4.763889|                   0.597222|                   8.402778|     5.0165|       null|-0.611111|         71.11725|
|2020-01-04|          AD|   