# Nursing Home Data Cleaning & Merging. 
Using NH 2567 deficiency tags data, merging with Facility-level Characteristics to study `(TBD)`  
**Date: May 24, 2024**  
**Author: Sandeep**  


In [43]:
%load_ext jupyter_black

In [1]:
import os
import re
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import when, col, udf, pandas_udf, PandasUDFType
import pyspark.sql.functions as F
from collections import Counter
from pyspark.sql.types import *

In [None]:
pd.set_option("display.max.rows", 500)

In [185]:
spark = SparkSession.builder.appName("NH_Data").getOrCreate()
spark.sql("set spark.sql.legacy.timeParserPolicy=CORRECTED")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/31 09:50:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


DataFrame[key: string, value: string]

In [186]:
# GET DATA
df_duped = spark.read.parquet("nh_set.parquet")

                                                                                

In [187]:
df_duped.columns

['facility_name',
 'facility_id',
 'address',
 'city',
 'state',
 'zip',
 'inspection_date',
 'deficiency_tag',
 'scope_severity',
 'eventid',
 'infctrl',
 'inspect_date',
 'insp_year']

In [188]:
# Drop infctrl -
# df_duped.select("infctrl").groupBy(["infctrl"]).count().show()
df_duped = df_duped.drop("infctrl", "facility_name", "address", "city")

## NH Citations data - 2018-2022
### Data reduction in the deficiency tag dataset (to merge with facility characteristics.) 
Aggregate counts to `{state-facility-year}` observation unit --> aggregate as `number of inspections`, `number of tags per inspection`, `most frequent tag`, `severe tags score`, etc.

**Build all the different variables, first, then join `em all**

#### count of NH homes inpsected (state, year)

actual NH in the state, year may be more since all NHs are not inspected within a calendar year

In [195]:
count_nh_insp_st_yr = (
    df_duped.dropDuplicates(["state", "insp_year", "facility_id"])
    .groupBy("state", "insp_year")
    .agg(F.count(F.lit(1)).alias("count_nh_insp_st_yr"))
)
count_nh_insp_st_yr.show(2)

                                                                                

+-----+---------+-------------------+
|state|insp_year|count_nh_insp_st_yr|
+-----+---------+-------------------+
|   TN|     2020|                146|
|   MT|     2021|                 58|
+-----+---------+-------------------+
only showing top 2 rows



#### count of NH inpsections (state-year-facility level) 
 (more than one inpsection observed for some facilities)
`eventid` is unique for each inspection episode (date)


In [196]:
fac_times_inspected = (
    df_duped.dropDuplicates(["state", "insp_year", "facility_id", "eventid"])
    .groupBy("state", "insp_year", "facility_id")
    .agg(F.count(F.lit(1)).alias("fac_times_inpsected"))
)
fac_times_inspected.show(2)

[Stage 9:>                                                          (0 + 1) / 1]

+-----+---------+-----------+-------------------+
|state|insp_year|facility_id|fac_times_inpsected|
+-----+---------+-----------+-------------------+
|   CA|     2019|     056359|                  2|
|   IN|     2022|     155566|                  4|
+-----+---------+-----------+-------------------+
only showing top 2 rows



                                                                                

#### average labels per year  (by facility, state, year)
(there are may more than 1 inspection episode in a calendar year for a given facility, in which case the labels are aggregated across the episodes)

In [197]:
def_tags_pa = (
    df_duped.dropDuplicates(["state", "insp_year", "facility_id", "deficiency_tag"])
    .groupBy("state", "insp_year", "facility_id")
    .agg(F.count(F.lit(1)).alias("no_of_tags_peryear"))
)

# def_tags_pa.show(2)

#### Deficiency tags based measures
Each Deficiency tag citation goes with a scope severity measure that captures how widespread is that particular problem (CMS defined quality dimension)

**Scope Severity - A thru K**
Labels: (J, K, H, I, F, L) describe the danger imminent to a patient or more patients    
source: https://www.cms.gov/Medicare/Provider-Enrollment-and-Certification/CertificationandComplianc/Downloads/SFFSCORINGMETHODOLOGY.pdf

Note that the scope severity is associated with type of deficiency tag the NH has received. 

In [198]:
# list of severe tags see link above
severe = ["J", "K", "H", "I", "F", "L"]

**Deficiency tags**

In [199]:
df_duped.dropDuplicates(
    ["state", "insp_year", "facility_id", "deficiency_tag"]
).groupBy("state", "insp_year", "facility_id").agg(
    F.collect_list("deficiency_tag").alias("list_tags")
).dropDuplicates().show(
    10
)



+-----+---------+-----------+--------------------+
|state|insp_year|facility_id|           list_tags|
+-----+---------+-----------+--------------------+
|   AK|     2018|     025020|[921, 557, 697, 6...|
|   AK|     2018|     025031|[607, 657, 812, 7...|
|   AK|     2019|     025010|[585, 684, 909, 6...|
|   AK|     2019|     025025|[700, 684, 578, 6...|
|   AK|     2021|     025021|[695, 803, 656, 5...|
|   AK|     2022|     025015|[812, 552, 610, 7...|
|   AK|     2022|     025027|[790, 880, 641, 5...|
|   AK|     2022|     025037|[657, 812, 689, 880]|
|   AL|     2018|     015035|               [657]|
|   AL|     2018|     015042|[814, 880, 565, 812]|
+-----+---------+-----------+--------------------+
only showing top 10 rows



                                                                                

**Pick up tags that is cited to have a scope severity measure that is risky**  
create a list of tags that a have high risk on scope (see cell above on scope severity and  the list `severe`)

In [200]:
# number of severe tags found during the investigation
def func_hr_tag(x, y):
    if len(x) > 0:
        dictXY = dict(zip(x, y))
        newlist = [
            "{}-{}".format(im, key) for im, key in dictXY.items() if key in severe
        ]
        return newlist


func_udf = udf(func_hr_tag, StringType())

In [201]:
df_with_svr_tags = (
    df_duped.dropDuplicates(
        ["state", "insp_year", "facility_id", "deficiency_tag", "scope_severity"]
    )
    .groupBy("state", "insp_year", "facility_id")
    .agg(
        F.collect_list("deficiency_tag").alias("defTag"),
        F.collect_list("scope_severity").alias("scTag"),
    )
    .dropDuplicates()
    .withColumn("new_val", func_udf("defTag", "scTag"))
)

In [155]:
df_with_svr_tags.show(10)



+-----+---------+-----------+--------------------+--------------------+--------------------+
|state|insp_year|facility_id|              defTag|               scTag|             new_val|
+-----+---------+-----------+--------------------+--------------------+--------------------+
|   AK|     2018|     025020|[697, 921, 656, 6...|[E, E, D, E, D, E...|                  []|
|   AK|     2018|     025031|[607, 686, 657, 5...|[F, G, D, F, C, D...|[607-F, 563-F, 81...|
|   AK|     2019|     025010|[837, 585, 839, 6...|[F, F, F, E, D, D...|[837-F, 585-F, 83...|
|   AK|     2021|     025021|[689, 609, 656, 8...|[E, E, E, F, D, E...|      [804-F, 803-F]|
|   AK|     2022|     025015|[812, 700, 641, 7...|[F, E, D, D, D, D...|             [812-F]|
|   AK|     2022|     025027|[812, 790, 883, 6...|[E, D, E, D, E, D...|                  []|
|   AK|     2022|     025037|[689, 812, 657, 880]|        [D, F, D, F]|      [812-F, 880-F]|
|   AL|     2018|     015035|               [657]|                 [D]

                                                                                

**number of severe tags found during the investigation**

In [202]:
# number of severe tags found during the investigation
my_udf = udf(
    lambda pair: len([key for im, key in enumerate(pair) if key in severe]),
    IntegerType(),
)

In [203]:
sever_tags_prsnt = (
    df_duped.dropDuplicates(["state", "insp_year", "facility_id", "scope_severity"])
    .groupBy("state", "insp_year", "facility_id")
    .agg(F.collect_list("scope_severity").alias("scope_list"))
    .withColumn("severe_tags_applied", my_udf(F.col("scope_list")))
)

In [17]:
sever_tags_prsnt.show(3)

[Stage 30:>                                                         (0 + 1) / 1]

+-----+---------+-----------+------------+-------------------+
|state|insp_year|facility_id|  scope_list|severe_tags_applied|
+-----+---------+-----------+------------+-------------------+
|   AK|     2018|     025010|   [E, F, D]|                  1|
|   AK|     2018|     025015|[D, C, E, F]|                  1|
|   AK|     2018|     025018|   [D, F, E]|                  1|
+-----+---------+-----------+------------+-------------------+
only showing top 3 rows



                                                                                

### Merging the different datasets to obtain an aggregated dataset

In [204]:
%who_ls DataFrame

['count_nh_insp_st_yr',
 'dat',
 'def_tags_pa',
 'df_duped',
 'df_with_svr_tags',
 'fac_times_inspected',
 'nh2018',
 'nh2019',
 'nh2020',
 'nh2021',
 'nh2022',
 'nh2023',
 'prvdr',
 'prvdr_sp',
 'sever_tags_prsnt']

In [205]:
list_df = [
    "count_nh_insp_st_yr",
    "def_tags_pa",
    "df_with_svr_tags",
    "fac_times_inspected",
    "sever_tags_prsnt",
]

In [206]:
mm1 = def_tags_pa.join(
    sever_tags_prsnt, on=["state", "insp_year", "facility_id"]
).dropDuplicates()

In [207]:
mm2 = mm1.join(
    fac_times_inspected, on=["state", "insp_year", "facility_id"]
).dropDuplicates()

In [208]:
mm3 = mm2.join(
    df_with_svr_tags, on=["state", "insp_year", "facility_id"]
).dropDuplicates()

In [209]:
mm3.count()

                                                                                

60668

In [210]:
mm3 = mm3.join(
    count_nh_insp_st_yr, on=["state", "insp_year"], how="left"
).dropDuplicates()
mm3.count()

60668

## NH Facility Characteristics - 2018-2022

The .csv files part of the facility level charactersitics download collects all the information in the excel spreadsheet below going back to the last three inspection episodes for each of the NH.  Collect all the spreadsheets over time and merge some of the characteristics to the deficiency tag data.

source: https://data.cms.gov/provider-data/archived-data/nursing-homes

The files/filenames for the `.csv` here contain the text "provider info" in them. There are changes in variable naming format from 2020 onwards. 

In [2]:
# 2018 & 2019 - harmonizing to 2020 and later years
columns_to_rename = {
    "PROVNUM": "Federal Provider Number",
    "OWNERSHIP": "Ownership Type",
    "BEDCERT": "Number of Certified Beds",
    "RESTOT": "Average Number of Residents per Day",
    "CERTIFICATION": "Provider Type",
    "INHOSP": "Provider Resides in Hospital",
    "CHOW_LAST_12MOS": "Provider Changed Ownership in Last 12 Months",
    "RESFAMCOUNCIL": "With a Resident and Family Council",
    "AIDHRD": "Reported Nurse Aide Staffing Hours per Resident per Day",
    "VOCHRD": "Reported LPN Staffing Hours per Resident per Day",
    "RNHRD": "Reported RN Staffing Hours per Resident per Day",
    "SURVEY_RATING": "Overall Rating",
    "TOTLICHRD": "Reported Licensed Staffing Hours per Resident per Day",
    "TOTHRD": "Reported Total Nurse Staffing Hours per Resident per Day",
    "CM_AIDE": "Case-Mix Nurse Aide Staffing Hours per Resident per Day",
    "ADJ_TOTAL": "Adjusted Total Nurse Staffing Hours per Resident per Day",
    "CM_LPN": "Case-Mix LPN Staffing Hours per Resident per Day",
    "CM_RN": "Case-Mix RN Staffing Hours per Resident per Day",
    "CM_TOTAL": "Case-Mix Total Nurse Staffing Hours per Resident per Day",
    "ADJ_AIDE": "Adjusted Nurse Aide Staffing Hours per Resident per Day",
    "ADJ_LPN": "Adjusted LPN Staffing Hours per Resident per Day",
    "ADJ_RN": "Adjusted RN Staffing Hours per Resident per Day",
    "WEIGHTED_ALL_CYCLES_SCORE": "Total Weighted Health Survey Score",
    "INCIDENT_CNT": "Number of Facility Reported Incidents",
    "CMPLNT_CNT": "Number of Substantiated Complaints",
    "FINE_CNT": "Number of Fines",
    "FINE_TOT": "Total Amount of Fines in Dollars",
    "PAYDEN_CNT": "Number of Payment Denials",
    "TOT_PENLTY_CNT": "Total Number of Penalties",
    "FILEDATE": "Processing Date",
}

In [3]:
columns_to_keep = [
    "Federal Provider Number",
    "Ownership Type",
    "Number of Certified Beds",
    "Average Number of Residents per Day",
    "Provider Type",
    "Provider Resides in Hospital",
    # "Date First Approved to Provide Medicare and Medicaid Services",
    # "Continuing Care Retirement Community",
    # "Special Focus Status",
    "Provider Changed Ownership in Last 12 Months",
    "With a Resident and Family Council",
    "Overall Rating",
    # "Health Inspection Rating",
    "Reported Nurse Aide Staffing Hours per Resident per Day",
    "Reported LPN Staffing Hours per Resident per Day",
    "Reported RN Staffing Hours per Resident per Day",
    "Reported Licensed Staffing Hours per Resident per Day",
    "Reported Total Nurse Staffing Hours per Resident per Day",
    "Case-Mix Nurse Aide Staffing Hours per Resident per Day",
    "Case-Mix LPN Staffing Hours per Resident per Day",
    "Case-Mix RN Staffing Hours per Resident per Day",
    "Case-Mix Total Nurse Staffing Hours per Resident per Day",
    "Adjusted Nurse Aide Staffing Hours per Resident per Day",
    "Adjusted LPN Staffing Hours per Resident per Day",
    "Adjusted RN Staffing Hours per Resident per Day",
    "Adjusted Total Nurse Staffing Hours per Resident per Day",
    "Total Weighted Health Survey Score",
    "Number of Facility Reported Incidents",
    "Number of Substantiated Complaints",
    # "Number of Citations from Infection Control Inspections",
    "Number of Fines",
    "Total Amount of Fines in Dollars",
    "Number of Payment Denials",
    "Total Number of Penalties",
    "Processing Date",
]

#### List of Provider Info files from CMS - 2019-2023

In [4]:

list_of_files = [
    "NH_ProviderInfo_Nov2021.csv",
    "NH_ProviderInfo_Nov2020.csv",
    "NH_ProviderInfo_Nov2022.csv",
    "NH_ProviderInfo_Nov2023.csv",
    # 'ProviderInfo_Nov2018.csv',
    "ProviderInfo_Nov2019.csv",
]

In [5]:
FILE_DIR = "./provider_info/"

#### Combine the files for one dataframe

In [6]:
prvdr = pd.DataFrame()
for file_name in list_of_files:
    cond = int(re.search("([0-9]{2,4})", file_name).group(0))
    if cond == 2019:
        dat = pd.read_csv(os.path.join(FILE_DIR, file_name), encoding="latin-1")
        dat.rename(columns=columns_to_rename, inplace=True)
    else:
        dat = pd.read_csv(os.path.join(FILE_DIR, file_name), encoding="latin-1")
        if cond == 2023:
            dat.rename(
                columns={"CMS Certification Number (CCN)": "Federal Provider Number"},
                inplace=True,
            )
    dat = dat[columns_to_keep]
    prvdr = pd.concat([prvdr, dat], ignore_index=True)

In [7]:
# Prefer the short names from 2019 table (using it)
rename_cols = {item: key.lower() for key, item in columns_to_rename.items()}

In [8]:
prvdr.rename(columns=rename_cols, inplace=True)
prvdr.rename(columns={"provnum": "facility_id"}, inplace=True)

In [9]:
prvdr["filedate"] = pd.to_datetime(prvdr.filedate)
prvdr["insp_year"] = prvdr.filedate.dt.year

In [184]:
prvdr.head()

Unnamed: 0,provnum,ownership,bedcert,restot,certification,inhosp,chow_last_12mos,resfamcouncil,survey_rating,aidhrd,...,adj_total,weighted_all_cycles_score,incident_cnt,cmplnt_cnt,fine_cnt,fine_tot,payden_cnt,tot_penlty_cnt,filedate,year
0,15009,For profit - Corporation,57,53.2,Medicare and Medicaid,N,N,Both,5.0,,...,,5.333,0,0,0,0.0,0,0,2021-11-01,2021
1,15010,For profit - Corporation,85,72.5,Medicare and Medicaid,N,N,Both,4.0,2.69344,...,4.98569,18.667,0,0,1,650.0,0,1,2021-11-01,2021
2,15012,Government - County,50,40.3,Medicare and Medicaid,Y,N,Resident,1.0,1.32635,...,2.07468,30.667,0,0,0,0.0,0,0,2021-11-01,2021
3,15014,For profit - Individual,92,68.1,Medicare and Medicaid,N,N,Both,3.0,,...,,12.667,0,0,0,0.0,0,0,2021-11-01,2021
4,15015,For profit - Individual,103,75.7,Medicare and Medicaid,N,N,Resident,3.0,2.23481,...,4.21466,16.0,0,1,4,31235.75,0,4,2021-11-01,2021


In [10]:
prvdr.to_csv("nh_providers_list_cms.csv")

#### Convert facility characteristcs data to pyspark data

In [218]:
prvdr_sp = spark.createDataFrame(prvdr)

# NH Citations & Characteristics data 
## Merge Facility Chars with Tags Data

In [223]:
f_insp_chr = mm3.join(prvdr_sp, on=["insp_year", "facility_id"], how="left")

In [225]:
f_insp_chr.count()

24/05/31 10:11:59 WARN TaskSetManager: Stage 82 contains a task of very large size (1807 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

60668

In [227]:
f_insp_chr.select("insp_year").groupBy("insp_year").count().show()

24/05/31 10:19:46 WARN TaskSetManager: Stage 97 contains a task of very large size (1807 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+---------+-----+
|insp_year|count|
+---------+-----+
|     2018|13616|
|     2022|11459|
|     2019|13808|
|     2020|10316|
|     2021|11469|
+---------+-----+



In [228]:
f_insp_chr.write.parquet("nh_insp_char_2019_2022")

24/05/31 10:19:56 WARN TaskSetManager: Stage 116 contains a task of very large size (1807 KiB). The maximum recommended task size is 1000 KiB.
24/05/31 10:20:07 WARN MemoryManager: Total allocation exceeds 95.00% (941,359,092 bytes) of heap memory
Scaling row group sizes to 87.67% for 8 writers
                                                                                