In [22]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, IntegerType

import pandas as pd
import numpy as np

In [2]:
# Obtain a SparkSession via the builder's getOrCreate(), and keep a handle
# on its SparkContext for later cells.
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)
sc = spark.sparkContext

24/04/21 13:44:04 WARN Utils: Your hostname, Michaels-Laptop.local resolves to a loopback address: 127.0.0.1; using 10.0.0.250 instead (on interface en0)
24/04/21 13:44:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/21 13:44:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [16]:
# Load the census CSV; the first row supplies the column names.
# No schema is inferred or supplied here.
census_df = spark.read.csv(
    'data/NewYork.csv',
    header=True,
)

In [17]:
# Any column whose name begins with one of these prefixes is removed.
prefixes_to_drop = ['M', 'EP', 'SP', 'F']
columns_to_drop = [
    name
    for name in census_df.columns
    if name.startswith(tuple(prefixes_to_drop))
]

# Columns removed by exact name, in addition to the prefix matches above.
cols_to_drop = ['RPL_THEME1', 'RPL_THEME2', 'RPL_THEME3', 'RPL_THEME4', 'E_DAYPOP',
                'E_AFAM', 'E_HISP', 'E_ASIAN', 'E_AIAN', 'E_NHPI','E_TWOMORE', 
                'E_OTHERRACE', 'ST', 'STATE', 'ST_ABBR', 'STCNTY', 'LOCATION']

census_df = (
    census_df
    # Remove both groups of columns in one pass.
    .drop(*columns_to_drop, *cols_to_drop)
    # The overall ranking column is exposed as 'SVI_Rank' from here on.
    .withColumnRenamed('RPL_THEMES', 'SVI_Rank')
    # Discard rows whose SVI rank is the -999 placeholder.
    .filter(F.col('SVI_Rank') != -999)
)

# Preview the first 5 rows to verify the changes.
census_df.show(5)



+------+--------------+--------+----+----+--------+-------+-------+--------+---------+-------+-------+--------+--------+--------+--------+-------+--------+-------+-------+--------+--------+-------+
|COUNTY|     AREA_SQMI|E_TOTPOP|E_HU|E_HH|E_POV150|E_UNEMP|E_HBURD|E_NOHSDP|E_UNINSUR|E_AGE65|E_AGE17|E_DISABL|E_SNGPNT|E_LIMENG|E_MINRTY|E_MUNIT|E_MOBILE|E_CROWD|E_NOVEH|E_GROUPQ|SVI_Rank|E_NOINT|
+------+--------------+--------+----+----+--------+-------+-------+--------+---------+-------+-------+--------+--------+--------+--------+-------+--------+-------+-------+--------+--------+-------+
|Albany|0.914079496512|    2029| 909| 769|     687|    167|    349|     251|      177|    280|    565|     231|     149|      86|    1605|     14|       0|     23|     93|      13|  0.8532|    379|
|Albany|0.237787480434|    3263|1861|1382|    1331|     91|    594|     239|      121|    204|    623|     475|     336|      80|    2646|    205|       0|      0|    440|      71|   0.664|    278|
|Albany| 0

In [18]:
# Load the county-level COVID-19 community-level CSV; the first row supplies
# the column names.
covid_df = spark.read.csv(
    'data/United_States_COVID-19_Community_Levels_by_County.csv',
    header=True,
)

In [19]:
# Preview the first five rows of the COVID data.
covid_df.show(n=5)

+----------------+-----------+---------+-----------------+--------------------------+--------------------+------------------------------+-------------------------------+----------------------------------+--------------------+------------------------+------------+
|          county|county_fips|    state|county_population|health_service_area_number| health_service_area|health_service_area_population|covid_inpatient_bed_utilization|covid_hospital_admissions_per_100k|covid_cases_per_100k|covid-19_community_level|date_updated|
+----------------+-----------+---------+-----------------+--------------------------+--------------------+------------------------------+-------------------------------+----------------------------------+--------------------+------------------------+------------+
|  Lincoln County|      55069|Wisconsin|            27593|                       282|Marathon (Wausau)...|                        291401|                            4.7|                              13.4|    

In [20]:
# Filter the COVID data to New York rows from the 2022-02-24 update and
# prepare a (community level, COUNTY) frame to merge with the census data.
covid_df = covid_df.filter(
    (covid_df.state == "New York") & (covid_df.date_updated == '2022-02-24')
)

# Build the join key by stripping the trailing " County" suffix rather than
# keeping only the first word of the name. Keeping the first word truncates
# every multi-word county ("New York County" -> "New", "St. Lawrence County"
# -> "St.") and then needs a hand-maintained patch for each one; stripping the
# suffix yields "New York" / "St. Lawrence" directly and cannot silently
# mis-key a multi-word county that nobody remembered to patch.
ny_covid_county = covid_df.select(
    'covid-19_community_level',
    F.regexp_replace(F.col('county'), r'\s+County$', '').alias('COUNTY'),
)

ny_covid_county.show()

+------------------------+-----------+
|covid-19_community_level|     COUNTY|
+------------------------+-----------+
|                  Medium|     Albany|
|                     Low|   Allegany|
|                     Low|      Bronx|
|                    High|     Broome|
|                     Low|Cattaraugus|
|                  Medium|     Cayuga|
|                  Medium| Chautauqua|
|                  Medium|    Chemung|
|                     Low|   Chenango|
|                    High|    Clinton|
|                     Low|   Columbia|
|                  Medium|   Cortland|
|                     Low|   Delaware|
|                     Low|   Dutchess|
|                  Medium|       Erie|
|                  Medium|      Essex|
|                    High|   Franklin|
|                     Low|     Fulton|
|                  Medium|    Genesee|
|                     Low|     Greene|
+------------------------+-----------+
only showing top 20 rows



In [28]:
#join dataframes

# NOTE: `col` imported here is also used by a later cell, so this import stays.
from pyspark.sql.functions import when, col

# Placeholder for "no data": the same -999 value that is filtered out of
# SVI_Rank when the census frame is cleaned.
MISSING_SENTINEL = -999

# Left join keeps every census row; a row whose COUNTY has no match in
# ny_covid_county ends up with a null target.
joint_df = census_df.join(ny_covid_county, on = 'COUNTY', how = 'left'  )
joint_df = joint_df.withColumnRenamed('covid-19_community_level', 'target')
joint_df = joint_df.drop('COUNTY', 'SVI_Rank')

# Null out the sentinel in every feature column. DataFrame.replace with a
# numeric key only touches numeric columns, and these columns were read from
# CSV without schema inference (so they are strings); comparing on a double
# cast works for either representation and leaves each column's type as-is.
# The target column is passed through untouched.
joint_df = joint_df.select(*[
    col(name) if name == 'target'
    else when(col(name).cast('double') == MISSING_SENTINEL, None)
         .otherwise(col(name))
         .alias(name)
    for name in joint_df.columns
])

# Encode the community level as an ordinal label: Low=0, Medium=1, High=2;
# any other value (including a missing one) becomes null.
joint_df = joint_df.withColumn("target", 
                   when(col("target") == "Low", 0)
                   .when(col("target") == "Medium", 1)
                   .when(col("target") == "High", 2)
                   .otherwise(None))

joint_df.show(5)

+--------------+--------+----+----+--------+-------+-------+--------+---------+-------+-------+--------+--------+--------+--------+-------+--------+-------+-------+--------+-------+------+
|     AREA_SQMI|E_TOTPOP|E_HU|E_HH|E_POV150|E_UNEMP|E_HBURD|E_NOHSDP|E_UNINSUR|E_AGE65|E_AGE17|E_DISABL|E_SNGPNT|E_LIMENG|E_MINRTY|E_MUNIT|E_MOBILE|E_CROWD|E_NOVEH|E_GROUPQ|E_NOINT|target|
+--------------+--------+----+----+--------+-------+-------+--------+---------+-------+-------+--------+--------+--------+--------+-------+--------+-------+-------+--------+-------+------+
|0.914079496512|    2029| 909| 769|     687|    167|    349|     251|      177|    280|    565|     231|     149|      86|    1605|     14|       0|     23|     93|      13|    379|     1|
|0.237787480434|    3263|1861|1382|    1331|     91|    594|     239|      121|    204|    623|     475|     336|      80|    2646|    205|       0|      0|    440|      71|    278|     1|
| 0.55656217198|    2153|1039| 913|    1097|    138|   

In [30]:
# Count rows per target label, most frequent label first.
value_counts_df = (
    joint_df
    .groupBy("target")
    .count()
    .orderBy(F.col("count").desc())
)
value_counts_df.show(5)

+------+-----+
|target|count|
+------+-----+
|     0| 3921|
|     1| 1054|
|     2|  285|
+------+-----+

