In [0]:

# Check out the pre-loaded dataset
#display(dbutils.fs.ls('dbfs:/databricks-datasets/COVID/covid-19-data/'))
display(dbutils.fs.ls('dbfs:/databricks-datasets/COVID/covid-19-data/'))

path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/covid-19-data/.git/,.git/,0,0
dbfs:/databricks-datasets/COVID/covid-19-data/.github/,.github/,0,0
dbfs:/databricks-datasets/COVID/covid-19-data/.gitignore,.gitignore,10,1615898767000
dbfs:/databricks-datasets/COVID/covid-19-data/LICENSE,LICENSE,1289,1615898767000
dbfs:/databricks-datasets/COVID/covid-19-data/NEW-YORK-DEATHS-METHODOLOGY.md,NEW-YORK-DEATHS-METHODOLOGY.md,2771,1615898767000
dbfs:/databricks-datasets/COVID/covid-19-data/NYT-readme.md,NYT-readme.md,1748,1586273566000
dbfs:/databricks-datasets/COVID/covid-19-data/PROBABLE-CASES-NOTE.md,PROBABLE-CASES-NOTE.md,3162,1615898767000
dbfs:/databricks-datasets/COVID/covid-19-data/README.md,README.md,22959,1615898767000
dbfs:/databricks-datasets/COVID/covid-19-data/colleges/,colleges/,0,0
dbfs:/databricks-datasets/COVID/covid-19-data/excess-deaths/,excess-deaths/,0,0


In [0]:
# Display and read README file
spark.read.text('dbfs:/databricks-datasets/COVID/covid-19-data/README.md').display()

value
# Coronavirus (Covid-19) Data in the United States
"**NEW:** As the [us-counties.csv](us-counties.csv) file has grown too large to open in Excel, we're providing a new [us-counties-recent.csv](us-counties-recent.csv) file that contains only the most recent 30 days of data for each county. It is otherwise identical. Both files will continue to be updated."
"**Change:** As of Feb. 10, 2021, we are changing how we report data for a few low-population Alaska geographies to better align with how the state reports data. Data for Bristol Bay Borough and Lake and Peninsula Borough are combined in a new area called ""Bristol Bay plus Lake and Peninsula"", and data for Yakutat City and Borough and Hoonah-Angoon Census Area are combined as ""Yakutat plus Hoonah-Angoon"". Many cases now assigned to those new geographies were previously reported as Unknown. The entire timeseries will be revised to use these new geographies."
**NEW:** We are publishing the data behind our [survey of mask usage](https://www.nytimes.com/interactive/2020/07/17/upshot/coronavirus-face-mask-map.html) in the United States in order to provide researchers a way to understand the role of mask wearing in the course of the pandemic. See the data and documentation in the [mask-use/](mask-use/) directory.
**NEW:** We are publishing the data behind our [excess deaths tracker](https://www.nytimes.com/interactive/2020/04/21/world/coronavirus-missing-deaths.html) in order to provide researchers and the public with a better record of the true toll of the pandemic. This data is compiled from official national and municipal data for 24 countries. See the data and documentation in the [excess-deaths/](excess-deaths/) directory.
---
[ [U.S. Data](us.csv) ([Raw CSV](https://raw.githubusercontent.com/nytimes/covid-19-data/master/us.csv)) | [U.S. State-Level Data](us-states.csv) ([Raw CSV](https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-states.csv)) | [U.S. County-Level Data](us-counties.csv) ([Raw CSV](https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv)) ]
"The New York Times is releasing a series of data files with cumulative counts of coronavirus cases in the United States, at the state and county level, over time. We are compiling this time series data from state and local governments and health departments in an attempt to provide a complete record of the ongoing outbreak."
"Since late January, The Times has tracked cases of coronavirus in real time as they were identified after testing. Because of the widespread shortage of testing, however, the data is necessarily limited in the picture it presents of the outbreak."
"We have used this data to power our [maps](https://www.nytimes.com/interactive/2020/us/coronavirus-us-cases.html) and [reporting](https://www.nytimes.com/coronavirus) tracking the outbreak, and it is now being made available to the public in response to requests from researchers, scientists and government officials who would like access to the data to better understand the outbreak."


In [0]:
df = spark.read.csv("dbfs:/databricks-datasets/COVID/covid-19-data/us-states.csv", header=True, inferSchema=True)

# Show the first few rows to understand the structure
df.show(5)

+----------+----------+----+-----+------+
|      date|     state|fips|cases|deaths|
+----------+----------+----+-----+------+
|2020-01-21|Washington|  53|    1|     0|
|2020-01-22|Washington|  53|    1|     0|
|2020-01-23|Washington|  53|    1|     0|
|2020-01-24|  Illinois|  17|    1|     0|
|2020-01-24|Washington|  53|    1|     0|
+----------+----------+----+-----+------+
only showing top 5 rows



In [0]:
states = (spark.read.format('csv')
            .option("header", "true")
            .option("InferSchema", "true")
            .load('dbfs:/databricks-datasets/COVID/covid-19-data/us-states.csv'))

# Display the dataframe
states.display()

date,state,fips,cases,deaths
2020-01-21,Washington,53,1,0
2020-01-22,Washington,53,1,0
2020-01-23,Washington,53,1,0
2020-01-24,Illinois,17,1,0
2020-01-24,Washington,53,1,0
2020-01-25,California,6,1,0
2020-01-25,Illinois,17,1,0
2020-01-25,Washington,53,1,0
2020-01-26,Arizona,4,1,0
2020-01-26,California,6,2,0


In [0]:

# Print schema (in this case it was inferred on the read)
states.printSchema()
     

root
 |-- date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: integer (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)



In [0]:

# Read the us-counties.csv file and infer schema
counties = (spark.read.format('csv')
            .option("header", "true")
            .option("InferSchema", "true")
            .load('dbfs:/databricks-datasets/COVID/covid-19-data/live/us-counties.csv'))
counties.display()
     

date,county,state,fips,cases,deaths,confirmed_cases,confirmed_deaths,probable_cases,probable_deaths
2021-03-12,Autauga,Alabama,1001.0,6409,95.0,5523.0,85.0,886.0,10.0
2021-03-12,Baldwin,Alabama,1003.0,20072,294.0,14228.0,220.0,5844.0,74.0
2021-03-12,Barbour,Alabama,1005.0,2175,52.0,1217.0,35.0,958.0,17.0
2021-03-12,Bibb,Alabama,1007.0,2475,58.0,2009.0,34.0,466.0,24.0
2021-03-12,Blount,Alabama,1009.0,6282,129.0,4835.0,109.0,1447.0,20.0
2021-03-12,Bullock,Alabama,1011.0,1183,39.0,1057.0,29.0,126.0,10.0
2021-03-12,Butler,Alabama,1013.0,2037,66.0,1858.0,60.0,179.0,6.0
2021-03-12,Calhoun,Alabama,1015.0,14034,299.0,10551.0,240.0,3483.0,59.0
2021-03-12,Chambers,Alabama,1017.0,3439,112.0,1711.0,72.0,1728.0,40.0
2021-03-12,Cherokee,Alabama,1019.0,1787,42.0,1152.0,32.0,635.0,10.0


In [0]:
counties.printSchema()

root
 |-- date: date (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: integer (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- confirmed_cases: integer (nullable = true)
 |-- confirmed_deaths: integer (nullable = true)
 |-- probable_cases: integer (nullable = true)
 |-- probable_deaths: integer (nullable = true)



# FIRST day in the dataset 
     

In [0]:
from pyspark.sql.functions import to_date

counties = counties.withColumn("date", to_date(counties["date"], "yyyy-MM-dd"))

# Find the first and last date
first_date = counties.agg({"date": "min"}).collect()[0][0]

first_date

Out[10]: datetime.date(2021, 3, 12)

# Last day in the dataset

In [0]:
last_date = counties.agg({"date": "max"}).collect()[0][0]
last_date

Out[11]: datetime.date(2021, 3, 12)

EXTRACTING ONLY CASES AND DEATHS FOR MY REFERENCE

In [0]:
aggregated_counties = counties.groupBy("state").agg(
    {"confirmed_cases": "sum", "confirmed_deaths": "sum"}
).withColumnRenamed("sum(confirmed_cases)", "confirmed_cases") \
 .withColumnRenamed("sum(confirmed_deaths)", "confirmed_deaths")
aggregated_counties.show(15)

+--------------------+---------------+----------------+
|               state|confirmed_cases|confirmed_deaths|
+--------------------+---------------+----------------+
|                Utah|          85210|             655|
|              Hawaii|           null|            null|
|           Minnesota|         469579|            null|
|                Ohio|           null|            null|
|Northern Mariana ...|            146|               2|
|            Arkansas|         256864|            4352|
|              Oregon|          10738|             138|
|               Texas|        2370468|           29319|
|        North Dakota|          95186|            null|
|        Pennsylvania|         834207|            8012|
|         Connecticut|         270354|            6384|
|            Nebraska|          68336|             595|
|             Vermont|           null|            null|
|              Nevada|         298705|            5099|
|         Puerto Rico|          94336|          

#COUNTING NUMBER OF COINFIRMED CASES IN THE STATE CALIFORNIA

In [0]:
aggregated_counties = counties.groupBy("state").agg(
    {"confirmed_cases": "sum"}
).withColumnRenamed("sum(confirmed_cases)", "confirmed_cases")
max_confirmed_state = aggregated_counties.orderBy("confirmed_cases", ascending=False).first()
max_confirmed_state

Out[13]: Row(state='California', confirmed_cases=3595199)

#COUNTING NUMBER OF CONFIRMED DEATHS IN THE STATE CALIFORNIA

In [0]:
aggregated_counties = counties.groupBy("state").agg(
    {"confirmed_deaths": "sum"}
).withColumnRenamed("sum(confirmed_deaths)", "confirmed_deaths")

max_confirmed_state = aggregated_counties.orderBy("confirmed_deaths", ascending=False).first()

max_confirmed_state

Out[14]: Row(state='California', confirmed_deaths=55314)

# Do we have the data for all the states?

In [0]:
unique_states = counties.select("state").distinct().rdd.flatMap(lambda x: x).collect()

In [0]:
# List of all US states (a simplified list for the check)
all_states = [
    'Missouri', 'Delaware', 'Virginia', 'Minnesota', 'Kansas', 'Oregon', 'Hawaii', 'Florida', 'Colorado', 'Ohio', 'Mississippi', 'Michigan', 'Connecticut', 'Rhode Island', 'Pennsylvania', 'Wisconsin', 'Massachusetts', 'Utah', 'South Carolina', 'Alaska', 'New Jersey', 'West Virginia', 'Nevada', 'Texas', 'New York', 'North Dakota', 'Oklahoma', 'Maine', 'Indiana', 'South Dakota', 'Wyoming', 'Iowa', 'Montana', 'Alabama', 'New Mexico', 'Arizona', 'Maryland', 'Arkansas', 'Washington', 'Georgia', 'Illinois', 'Idaho', 'Vermont', 'North Carolina', 'California', 'Nebraska', 'New Hampshire', 'Tennessee', 'Kentucky', 'Louisiana'
]

# Step 3: Check if the dataset contains all states
missing_states = set(all_states) - set(unique_states)

# Step 4: Display the result
if not missing_states:
    print("The dataset contains data for all states.")
else:
    print(f"The dataset is missing data for the following states: {missing_states}")

The dataset contains data for all states.


#How many counties is in each state?

In [0]:
from pyspark.sql.functions import countDistinct

counties_per_state = counties.groupBy("state").agg(
    countDistinct("county").alias("num_counties")
)

# Step 3: Show the results
counties_per_state.show(50, truncate=False)  

+------------------------+------------+
|state                   |num_counties|
+------------------------+------------+
|Utah                    |30          |
|Hawaii                  |5           |
|Minnesota               |88          |
|Ohio                    |89          |
|Northern Mariana Islands|2           |
|Oregon                  |36          |
|Arkansas                |76          |
|Texas                   |254         |
|North Dakota            |54          |
|Pennsylvania            |67          |
|Connecticut             |9           |
|Nebraska                |93          |
|Vermont                 |15          |
|Nevada                  |17          |
|Puerto Rico             |79          |
|Washington              |39          |
|Illinois                |103         |
|Oklahoma                |78          |
|Virgin Islands          |3           |
|District of Columbia    |1           |
|Delaware                |4           |
|Alaska                  |28          |


In [0]:
# Read the README file
spark.read.text('dbfs:/databricks-datasets/COVID/covid-19-data/mask-use/README.md').display()

value
# Mask-Wearing Survey Data
The New York Times is releasing estimates of [mask usage](https://www.nytimes.com/interactive/2020/07/17/upshot/coronavirus-face-mask-map.html) by county in the United States.
"This data comes from a large number of interviews conducted online by the global data and survey firm Dynata at the request of The New York Times. The firm asked a question about mask use to obtain 250,000 survey responses between July 2 and July 14, enough data to provide estimates more detailed than the state level. (Several states have imposed new mask requirements since the completion of these interviews.)"
"Specifically, each participant was asked: _How often do you wear a mask in public when you expect to be within six feet of another person?_"
"This survey was conducted a single time, and at this point we have no plans to update the data or conduct the survey again."
## Data
Data on the estimated prevalence of mask-wearing in counties in the United States can be found in the **[mask-use-by-county.csv](mask-use-by-county.csv)** file. ([Raw CSV](https://raw.githubusercontent.com/nytimes/covid-19-data/master/mask-use/mask-use-by-county.csv))
```
"COUNTYFP,NEVER,RARELY,SOMETIMES,FREQUENTLY,ALWAYS"
"01001,0.053,0.074,0.134,0.295,0.444"


#Created Data frame for mask-use

In [0]:
dataframe = (spark.read.format('csv')
            .option("header", "true")
            .option("InferSchema", "true")
            .load('dbfs:/databricks-datasets/COVID/covid-19-data/mask-use/mask-use-by-county.csv'))
dataframe.display()

COUNTYFP,NEVER,RARELY,SOMETIMES,FREQUENTLY,ALWAYS
1001,0.053,0.074,0.134,0.295,0.444
1003,0.083,0.059,0.098,0.323,0.436
1005,0.067,0.121,0.12,0.201,0.491
1007,0.02,0.034,0.096,0.278,0.572
1009,0.053,0.114,0.18,0.194,0.459
1011,0.031,0.04,0.144,0.286,0.5
1013,0.102,0.053,0.257,0.137,0.451
1015,0.152,0.108,0.13,0.167,0.442
1017,0.117,0.037,0.15,0.136,0.56
1019,0.135,0.027,0.161,0.158,0.52


Mask wearing values in mask group colums (almost_never (NEVER+RARELY) and almost_always (FREQUENTLY+ALWAYS)) and other colum is for remaining values

In [0]:

from pyspark.sql.functions import when, col
dataframe_long = dataframe.selectExpr(
    "COUNTYFP",
    "stack(5, 'NEVER', NEVER, 'RARELY', RARELY, 'SOMETIMES', SOMETIMES, 'FREQUENTLY', FREQUENTLY, 'ALWAYS', ALWAYS) as (mask_wearing, count)"
)

dataframe_long = dataframe_long.withColumn(
    "mask_groups",
    when(col("mask_wearing").isin(["NEVER", "RARELY"]), "almost_never")
    .when(col("mask_wearing").isin(["FREQUENTLY", "ALWAYS"]), "almost_always")
    .otherwise("other")
)
mask_groups_dataframe = dataframe_long.groupBy("COUNTYFP", "mask_groups").sum("count")
#mask_groups_dataframe = dataframe_long.groupBy("mask_groups").sum("count")
mask_groups_dataframe.show()

+--------+-------------+-------------------+
|COUNTYFP|  mask_groups|         sum(count)|
+--------+-------------+-------------------+
|    1009|almost_always|              0.653|
|    1015| almost_never|               0.26|
|    1091|almost_always|              0.698|
|    6041| almost_never|              0.011|
|    6041|almost_always| 0.9430000000000001|
|    8041|almost_always| 0.7989999999999999|
|    8069|almost_always|               0.89|
|    8085| almost_never|              0.177|
|   13069| almost_never|0.20900000000000002|
|   13175|        other|              0.141|
|   17037|almost_always| 0.9470000000000001|
|   17061| almost_never|              0.165|
|   18139|almost_always|              0.687|
|   18157|        other|              0.102|
|   20207|        other|              0.171|
|   25009| almost_never|              0.033|
|   27043|almost_always|              0.601|
|   27135|        other|              0.211|
|   28029|almost_always| 0.7090000000000001|
|   28031|

In [0]:
print("mask_groups_dataframe Columns:", mask_groups_dataframe.columns)
print("dataframe_counties Columns:", dataframe.columns)


mask_groups_dataframe Columns: ['COUNTYFP', 'mask_groups', 'sum(count)']
dataframe_counties Columns: ['COUNTYFP', 'NEVER', 'RARELY', 'SOMETIMES', 'FREQUENTLY', 'ALWAYS']


In [0]:
from pyspark.sql.functions import col

mask_groups_dataframe = mask_groups_dataframe.withColumn("COUNTYFP", col("COUNTYFP").cast("string"))
dataframe_counties = dataframe.withColumn("COUNTYFP", col("COUNTYFP").cast("string"))


#Join tables mask_group and counties

In [0]:
final_dataframe = mask_groups_dataframe.join(
    dataframe_counties,  
    on="COUNTYFP",  # Assuming COUNTYFP is the correct column to join on
    how="inner"
)

In [0]:
print(f"Mask Groups count: {mask_groups_dataframe.count()}")
print(f"Mask Use count: {dataframe_counties.count()}")
print(f"Joined DataFrame count: {final_dataframe.count()}")


Mask Groups count: 9426
Mask Use count: 3142
Joined DataFrame count: 9426


counting lines of counties, mask group, mask use and joint data frame

In [0]:
print(f"Counties count: {dataframe_counties.count()}")
print(f"Mask Groups count: {mask_groups_dataframe.count()}")
print(f"Mask Use count: {dataframe_counties.count()}")
print(f"Joined DataFrame count: {final_dataframe.count()}")


Counties count: 3142
Mask Groups count: 9426
Mask Use count: 3142
Joined DataFrame count: 9426


In [0]:
print(final_dataframe.columns)


['COUNTYFP', 'mask_groups', 'sum(count)', 'NEVER', 'RARELY', 'SOMETIMES', 'FREQUENTLY', 'ALWAYS']


In [0]:
final_dataframe.show()

+--------+-------------+-------------------+-----+------+---------+----------+------+
|COUNTYFP|  mask_groups|         sum(count)|NEVER|RARELY|SOMETIMES|FREQUENTLY|ALWAYS|
+--------+-------------+-------------------+-----+------+---------+----------+------+
|    1009|almost_always|              0.653|0.053| 0.114|     0.18|     0.194| 0.459|
|    1015| almost_never|               0.26|0.152| 0.108|     0.13|     0.167| 0.442|
|    1091|almost_always|              0.698|0.033| 0.151|    0.118|     0.186| 0.512|
|    6041| almost_never|              0.011|0.011|   0.0|    0.046|     0.141| 0.802|
|    6041|almost_always| 0.9430000000000001|0.011|   0.0|    0.046|     0.141| 0.802|
|    8041|almost_always| 0.7989999999999999|0.031| 0.089|    0.081|     0.196| 0.603|
|    8069|almost_always|               0.89|0.013| 0.037|     0.06|     0.231| 0.659|
|    8085| almost_never|              0.177|0.085| 0.092|    0.074|     0.314| 0.436|
|   13069| almost_never|0.20900000000000002|0.085| 0.1

Keeping data for only one state california

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
state_selected = "California"
state_df = counties.filter(col("state") == state_selected)
#state_df = final_dataframe.filter(col("STATE") == state_selected)
state_df.show()


+----------+------------+----------+----+-------+------+---------------+----------------+--------------+---------------+
|      date|      county|     state|fips|  cases|deaths|confirmed_cases|confirmed_deaths|probable_cases|probable_deaths|
+----------+------------+----------+----+-------+------+---------------+----------------+--------------+---------------+
|2021-03-12|     Alameda|California|6001|  81887|  1327|          81887|            1327|          null|           null|
|2021-03-12|      Alpine|California|6003|     78|     0|             78|               0|          null|           null|
|2021-03-12|      Amador|California|6005|   3497|    41|           3497|              41|          null|           null|
|2021-03-12|       Butte|California|6007|  11110|   170|          11110|             170|          null|           null|
|2021-03-12|   Calaveras|California|6009|   1915|    50|           1915|              50|          null|           null|
|2021-03-12|      Colusa|Califor