## 1- Data Ingestion
Source: https://covidtracking.com/data/api

Daily COVID-19 case data for US states.

In [None]:
#test commit
import requests
import csv
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Download the daily per-state CSV and store it unchanged (all columns as
# strings) in the raw layer as parquet.
url = 'https://api.covidtracking.com/v1/states/daily.csv'

# A timeout keeps the notebook from hanging on a stalled connection, and
# raise_for_status() stops an HTTP error page from being parsed and written
# to the raw layer as if it were data.
response = requests.get(url, timeout=60)
response.raise_for_status()

csv_data = csv.reader(response.text.strip().split('\n'))

# The first CSV row carries the column names.
header = next(csv_data)

# Every column is declared as a nullable string; no type inference happens here.
fields = [StructField(field_name, StringType(), True) for field_name in header]
schema = StructType(fields)

# Missing values arrive as empty strings. Convert them to None so they are
# stored as real nulls; otherwise later isNull() checks never match them.
# Blank lines (parsed as empty rows) are skipped.
rows = [
    [value if value != '' else None for value in row]
    for row in csv_data
    if row
]

spark = SparkSession.builder.appName("COVID Tracking Data - Data Ingestion").getOrCreate()

df = spark.createDataFrame(rows, schema=schema)
destination_path = '/FileStore/tables/Covid-USA/raw/daily_cases_raw'
df.write.mode('overwrite').format("parquet").save(destination_path)
df.show(10)


+--------+-----+--------+-------------+--------+-------+----------------------+----------------+---------------------+----------------------+--------------+---------------+---------------------+----------------------+---------+---------------+--------------------+-----------+-----+------------+----------------------+--------------------+---------------+------------------+------------------+------------------+--------------+-------------+------------------------+---------------------+------------------+---------------------+---------------------+------------------------+---------------------------+---------------------------+-----------------------+--------------------------+-----------------+--------------------+----+----------------+----------------+-------+------------------------+-------+----------------+-------------+--------------------+--------------------+---------------+--------------------+-------------+-------------+-----+-----+
|    date|state|positive|probableCases|negative

## 2- Data clean and standardize

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import when, regexp_replace, col, trim, to_date

# Read the raw layer, keep the columns of interest, normalise types and
# missing values, and write the result to the bronze layer.
source_path = '/FileStore/tables/Covid-USA/raw/daily_cases_raw'
spark = SparkSession.builder.appName("COVID Tracking Data - Data cleaning").getOrCreate()
df = spark.read.format("parquet").load(source_path)
df = df.select("date", "state", "positive", "death", "totalTestsViral")

df = df.withColumn("state", trim(df.state))

# The raw layer stores every column as a string, and missing values may be
# empty strings rather than nulls. Turn blanks into nulls first, then cast the
# counts to long so the fills below actually apply and the column types stay
# numeric instead of mixing strings with a float average.
for count_column in ("positive", "death", "totalTestsViral"):
    df = df.withColumn(
        count_column,
        when(trim(col(count_column)) == "", None)
        .otherwise(col(count_column))
        .cast("long"),
    )

# Missing case and death counts are treated as zero.
df = df.fillna(0, subset=["positive", "death"])

# Missing test counts are replaced by the column average (rounded to keep the
# column integral). avg() returns None when no value is present at all, in
# which case there is nothing to impute with.
average_tests = df.selectExpr("avg(totalTestsViral)").collect()[0][0]
if average_tests is not None:
    df = df.fillna(int(round(average_tests)), subset=["totalTestsViral"])

# Dates are encoded as yyyyMMdd strings in the source.
df = df.withColumn('date', to_date(df['date'], 'yyyyMMdd'))

destination_path = '/FileStore/tables/Covid-USA/bronze/daily_cases_bronze'
df.write.mode('overwrite').format("parquet").save(destination_path)
df.show(10)


+----------+-----+--------+-----+---------------+
|      date|state|positive|death|totalTestsViral|
+----------+-----+--------+-----+---------------+
|2021-01-30|   NE|  190570| 1920|        2090962|
|2021-01-30|   NH|   65362| 1042|        1254902|
|2021-01-30|   NJ|  692543|21455|        9326347|
|2021-01-30|   NM|  173539| 3265|        2328334|
|2021-01-30|   NV|  277349| 4264|               |
|2021-01-30|   NY| 1399863|35036|               |
|2021-01-30|   OH|  892781|11121|        9033143|
|2021-01-30|   OK|  386590| 3504|        3166000|
|2021-01-30|   OR|  141729| 1938|        3162750|
|2021-01-30|   PA|  839239|21602|               |
+----------+-----+--------+-----+---------------+
only showing top 10 rows



## 3- Data preparation

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Read the bronze layer, replace state abbreviations with full state names,
# aggregate per state and year, and write the result to the silver layer.
# The functions module is imported under an alias so that the aggregate used
# below cannot be confused with a Python builtin of the same name.
from pyspark.sql import functions as F

source_path = '/FileStore/tables/Covid-USA/bronze/daily_cases_bronze'
spark = SparkSession.builder.appName("COVID Tracking Data - Data prep").getOrCreate()
df = spark.read.format("parquet").load(source_path)

state_dict = {
    'AL': 'Alabama', 'AK': 'Alaska', 'AZ': 'Arizona', 'AR': 'Arkansas', 'CA': 'California', 'CO': 'Colorado',
    'CT': 'Connecticut', 'DE': 'Delaware', 'FL': 'Florida', 'GA': 'Georgia', 'HI': 'Hawaii', 'ID': 'Idaho',
    'IL': 'Illinois', 'IN': 'Indiana', 'IA': 'Iowa', 'KS': 'Kansas', 'KY': 'Kentucky', 'LA': 'Louisiana',
    'ME': 'Maine', 'MD': 'Maryland', 'MA': 'Massachusetts', 'MI': 'Michigan', 'MN': 'Minnesota',
    'MS': 'Mississippi', 'MO': 'Missouri', 'MT': 'Montana', 'NE': 'Nebraska', 'NV': 'Nevada',
    'NH': 'New Hampshire', 'NJ': 'New Jersey', 'NM': 'New Mexico', 'NY': 'New York', 'NC': 'North Carolina',
    'ND': 'North Dakota', 'OH': 'Ohio', 'OK': 'Oklahoma', 'OR': 'Oregon', 'PA': 'Pennsylvania',
    'RI': 'Rhode Island', 'SC': 'South Carolina', 'SD': 'South Dakota', 'TN': 'Tennessee', 'TX': 'Texas',
    'UT': 'Utah', 'VT': 'Vermont', 'VA': 'Virginia', 'WA': 'Washington', 'WV': 'West Virginia',
    'WI': 'Wisconsin', 'WY': 'Wyoming'
}

# Map abbreviations to names with a small lookup table instead of creating one
# helper column per state. The left join leaves `state` null for codes that
# are not in state_dict; those rows are filtered out below.
state_names = spark.createDataFrame(list(state_dict.items()), ["state", "state_name"])
df = (
    df.join(F.broadcast(state_names), on="state", how="left")
    .select(
        "date",
        F.col("state_name").alias("state"),
        "positive",
        "death",
        "totalTestsViral",
    )
)

df = df.withColumn("year", F.year("date"))
df_filtered = df.filter(F.col('state').isNotNull())


def _as_long(column_name):
    """Return the named column as long, treating blank strings as null."""
    text = F.col(column_name).cast("string")
    return F.when(F.trim(text) == "", None).otherwise(text).cast("long")


# `positive` and `death` are cumulative running totals, so adding up the daily
# rows counts the same cases many times over. The yearly maximum is the
# cumulative total reached by the end of each year. The explicit numeric cast
# matters: a maximum over string values would compare them lexicographically.
df_grouped = df_filtered.groupBy("state", "year").agg(
    F.max(_as_long("positive")).alias("total_positive_cases"),
    F.max(_as_long("death")).alias("total_deaths"),
)
df_ordered = df_grouped.orderBy("state", "year")
df_ordered = df_ordered.withColumn("total_positive_cases", F.col("total_positive_cases").cast("integer"))
df_ordered = df_ordered.withColumn("total_deaths", F.col("total_deaths").cast("integer"))

destination_path = '/FileStore/tables/Covid-USA/silver/daily_cases_silver'
df_ordered.write.mode('overwrite').format("parquet").save(destination_path)
df_ordered.show(10)

+----------+----+--------------------+------------+
|     state|year|total_positive_cases|total_deaths|
+----------+----+--------------------+------------+
|   Alabama|2020|            32225868|      526142|
|   Alabama|2021|            29836458|      512122|
|    Alaska|2020|             2695918|       14079|
|    Alaska|2021|             3462229|       17407|
|   Arizona|2020|            47068180|     1047790|
|   Arizona|2021|            48396191|      870644|
|  Arkansas|2020|            18390160|      285482|
|  Arkansas|2021|            19232376|      316079|
|California|2020|           171681857|     3039476|
|California|2021|           207418602|     2693613|
+----------+----+--------------------+------------+
only showing top 10 rows



## 4- Data Consuming and Visualization

In [None]:
import folium
from pyspark.sql import SparkSession
import pandas as pd

# Read the silver layer and render one choropleth layer per metric on a map
# of the US states.
source_path = '/FileStore/tables/Covid-USA/silver/daily_cases_silver'
spark = SparkSession.builder.appName("COVID Tracking Data - Data Visualization").getOrCreate()
df = spark.read.format("parquet").load(source_path)
df_pandas = df.toPandas()

# The silver table holds one row per state and year, but each state is drawn
# with a single colour. Keep only the most recent year per state so the value
# shown does not depend on row order.
latest_per_state = df_pandas.sort_values("year").drop_duplicates(subset="state", keep="last")

us_states_geojson = 'https://raw.githubusercontent.com/python-visualization/folium/master/examples/data/us-states.json'

m = folium.Map(location=[37, -102], zoom_start=4)

# (layer/legend label, value column, colour palette) for each overlay.
layers = [
    ('Total Positive Cases', 'total_positive_cases', 'YlOrRd'),
    ('Total Deaths', 'total_deaths', 'PuBuGn'),
]
for label, value_column, palette in layers:
    folium.Choropleth(
        geo_data=us_states_geojson,
        name=label,
        data=latest_per_state,
        columns=['state', value_column],
        # State names in the data are matched against this GeoJSON property.
        key_on='feature.properties.name',
        fill_color=palette,
        fill_opacity=0.7,
        line_opacity=0.2,
        highlight=True,
        legend_name=label,
    ).add_to(m)

# Add a layer control to the map
folium.LayerControl().add_to(m)

# Display the map
m