# Enrich BHI facility information

In [0]:
from pyspark.sql.functions import col, lit, rank, desc, sum, when, lower, concat, split, countDistinct, row_number
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window

In [0]:
%run ./Environment

In [0]:
# Load the claim lines to enrich and the two provider -> location mapping tables.
# The table-name variables are not defined in this notebook; presumably they
# are provided by ./Environment.

claimline = spark.table(claimline_table)
ccs_location = spark.table(ccs_location_mapping)
procedure_location = spark.table(procedure_location_mapping)

In [0]:
# Snapshot the original claimline column names before any helper columns are
# joined on; the final write selects exactly these columns.
claimline_columns = claimline.columns

In [0]:
# Read in the ccs table so that we can map claimline procedure_code to ccs code.
#
# The event_type filter is applied BEFORE the projection: event_type is not one
# of the selected columns, so filtering after select()/distinct() either fails
# to resolve the column or is evaluated only after de-duplication.
#
# distinct() is applied AFTER the category code is truncated at the first "-",
# so codes that collapse to the same prefix do not leave duplicate lookup rows
# (each duplicate would repeat the matching claim lines in the left join onto
# claimline).

ccs_codes = (
    spark.read.table(ccs_code_table)
    .filter(
        (col("event_type") == "PROCEDURE")
        & col("event_code").isNotNull()
        & col("ccs_category_code").isNotNull()
    )
    .select("event_code", "ccs_category_code")
    # Keep only the part of the category code before the first "-".
    .withColumn("ccs_category_code", split("ccs_category_code", "-")[0])
    .distinct()
)

In [0]:
# Select the most probable location for a given provider and procedure code (or ccs code).
#
# orderBy() followed by drop_duplicates() does not guarantee which row of each
# group survives, so the best-ranked row is picked explicitly with a window.


def _top_ranked_location(mapping, key_columns):
    """Keep one row per ``key_columns``: the one with the lowest location_encounter_rank.

    Only rows with encounter_probability_rank == 1 are considered.
    provider_location_id is used as a tie-breaker so that repeated runs keep
    the same row when several locations share the same rank.

    :param mapping: provider -> location mapping DataFrame.
    :param key_columns: column names that identify one group.
    :return: DataFrame with the same columns as ``mapping`` and at most one row per group.
    """
    by_rank = Window.partitionBy(*key_columns).orderBy(
        col("location_encounter_rank").asc(),
        col("provider_location_id").asc(),
    )
    return (
        mapping.filter(col("encounter_probability_rank") == 1)
        .withColumn("_location_row_number", row_number().over(by_rank))
        .filter(col("_location_row_number") == 1)
        # The helper column must not leak into the later "_"-suffix aliasing.
        .drop("_location_row_number")
    )


ccs_location = _top_ranked_location(ccs_location, ["provider_npi", "ccs_category_code"])

procedure_location = _top_ranked_location(procedure_location, ["provider_npi", "procedure_code"])

In [0]:
# Load the HGPD tables, keeping only the selected fields.
# For the npi1 -> npi2 table only rank == 1 rows are kept (the primary npi2).

provider_location = spark.read.table(hgpd_provider_location).select(
    "provider_id",
    "provider_location_name",
    "provider_location_id",
    "location_id",
    "source",
    "primary",
)

location = spark.read.table(hgpd_location).select(
    "city",
    "state",
    "zip",
    "addr_line_1",
    "addr_line_2",
    "latitude",
    "longitude",
    "location_id",
    "kli_location_key",
)

npi1_npi2 = (
    spark.read.table(hgpd_npi1_npi2)
    .select("npi1", "npi2", "rank")
    .filter(col("rank") == 1)
)

In [0]:
# Combine the HGPD tables: each provider location gets its address (matched on
# location_id) and the provider's npi2 (provider_id matched to npi1). Both are
# inner joins, so rows without a match on either side are dropped.

hgpd = (
    provider_location
    .join(location, on="location_id", how="inner")
    .join(npi1_npi2, on=col("provider_id") == col("npi1"), how="inner")
)

In [0]:
# Suffix every column of the helper tables with "_" so they can be told apart
# from the claimline columns and dropped once the joins are done.


def _with_trailing_underscore(frame):
    """Return ``frame`` with every column renamed to ``<name>_``."""
    return frame.select(*[col(name).alias(name + "_") for name in frame.columns])


ccs_location = _with_trailing_underscore(ccs_location)

procedure_location = _with_trailing_underscore(procedure_location)

ccs_codes = _with_trailing_underscore(ccs_codes)

hgpd = _with_trailing_underscore(hgpd)

In [0]:
# Null out the facility_* columns in claimline and we'll fill them in below.
#
# Each column is nulled with its ORIGINAL data type. Casting everything to
# FloatType turns text columns (name, city, street, ...) into floats, and the
# columns that are never refilled would be written out with the wrong type.
#
# NOTE(review): the columns are nulled for every row, not only for
# VENDORNAME == "BHI" rows, so the `.otherwise(col(...))` branches in the fill
# cells keep NULL rather than a pre-existing value - confirm this is intended.

columns = ["facility_addresskey",
           "facility_addresssource",
           "facility_city",
           "facility_latitude",
           "facility_longitude",
           "facility_name",
           "facility_npi",
           "facility_primarytaxonomy",
           "facility_state",
           "facility_street",
           "facility_suite",
           "facility_zipcode",
           "bill_type_facility",
           "facility_cbsaname",
           "facility_id",
           "facility_marketname",
           "facility_primarytaxonomydescription",
           "facility_primarytaxonomygroup",
           "facility_region",
           "facility_standardizationmethod",
           "facility_type",
           "nonstandardized_facilityaddresskey"]

# Lower-cased lookup because the names above may differ in case from the table's
# column names (assumes Spark's default case-insensitive resolution - TODO confirm).
original_types = {field.name.lower(): field.dataType for field in claimline.schema.fields}

for name in columns:
  # A column that claimline does not have yet keeps the previous FloatType default.
  null_type = original_types.get(name.lower(), FloatType())
  claimline = claimline.withColumn(name, lit(None).cast(null_type))

##### Fill in BHI information using the ccs mapping table

In [0]:
# Attach the CCS category to each claim line by matching procedure_code to the
# lookup's event_code_. Left join: claim lines without a CCS mapping are kept.
# NOTE(review): an event_code_ that appears more than once in ccs_codes yields
# one claim line per match - confirm the lookup is unique per event_code_.

procedure_to_ccs = col("procedure_code") == col("event_code_")
claimline = claimline.join(ccs_codes, on=procedure_to_ccs, how="left")

In [0]:
# Step 1: look up the provider's location for the claim line's CCS category
#         (rendering provider npi + ccs code).
# Step 2: pull the HGPD details for that location (provider npi + provider
#         location id).
# Both are left joins, so claim lines without a match are kept.

dm = claimline.join(
    ccs_location,
    on=[
        claimline.RENDERING_PROVIDER_NPI == ccs_location.provider_npi_,
        claimline.ccs_category_code_ == ccs_location.ccs_category_code_,
    ],
    how="left",
)

hgpd_match = [
    dm.RENDERING_PROVIDER_NPI == hgpd.provider_id_,
    dm.provider_location_id_ == hgpd.provider_location_id_,
]
dm = dm.join(hgpd, on=hgpd_match, how="left")

In [0]:
# Fill in the BHI fields using the most probable HGPD provider location information.
# A field is overwritten only for BHI claim lines that matched an HGPD provider
# location; every other row keeps its current value.

is_bhi = dm.VENDORNAME == "BHI"
has_hgpd_location = is_bhi & hgpd.provider_location_id_.isNotNull()

# (claimline facility column, HGPD source column), in the order they are applied.
facility_sources = [
    ("facility_npi", "npi2_"),
    ("facility_city", "city_"),
    ("facility_state", "state_"),
    ("facility_suite", "addr_line_2_"),
    ("facility_street", "addr_line_1_"),
    ("facility_zipcode", "zip_"),
    ("facility_latitude", "latitude_"),
    ("facility_longitude", "longitude_"),
    ("facility_addresskey", "kli_location_key_"),
    ("facility_name", "provider_location_name_"),
]

for facility_column, hgpd_column in facility_sources:
    dm = dm.withColumn(
        facility_column,
        when(has_hgpd_location, hgpd[hgpd_column]).otherwise(col(facility_column)),
    )

# The address source is stamped on every BHI row, matched or not.
dm = dm.withColumn(
    "facility_addresssource",
    when(is_bhi, "BHIFACILITYFIX").otherwise(col("facility_addresssource")),
)

# Remove the "_"-suffixed helper columns that came in through the joins.
dm = dm.drop(*ccs_codes.columns).drop(*ccs_location.columns).drop(*hgpd.columns)

##### Fill in BHI information using the new procedure mapping table

In [0]:
# Step 1: look up the provider's location for the claim line's procedure code
#         (rendering provider npi + procedure code).
# Step 2: pull the HGPD details for that location (provider npi + provider
#         location id).
# Both are left joins, so claim lines without a match are kept.
#
# NOTE(review): this rebuilds `dm` from `claimline`, not from the `dm` produced
# by the ccs-mapping section, so the ccs-based fill is not carried into the
# table written at the end. Confirm whether the two fills were meant to be
# chained (ccs as a fallback) or the procedure mapping replaces the ccs one.

dm = claimline.join(
    procedure_location,
    on=[
        claimline.RENDERING_PROVIDER_NPI == procedure_location.provider_npi_,
        claimline.PROCEDURE_CODE == procedure_location.procedure_code_,
    ],
    how="left",
)

hgpd_match = [
    dm.RENDERING_PROVIDER_NPI == hgpd.provider_id_,
    dm.provider_location_id_ == hgpd.provider_location_id_,
]
dm = dm.join(hgpd, on=hgpd_match, how="left")

In [0]:
# Fill in the BHI fields using the most probable HGPD provider location information.
# A field is overwritten only for BHI claim lines that matched an HGPD provider
# location; every other row keeps its current value.

is_bhi = dm.VENDORNAME == "BHI"
has_hgpd_location = is_bhi & hgpd.provider_location_id_.isNotNull()

# (claimline facility column, HGPD source column), in the order they are applied.
facility_sources = [
    ("facility_npi", "npi2_"),
    ("facility_city", "city_"),
    ("facility_state", "state_"),
    ("facility_suite", "addr_line_2_"),
    ("facility_street", "addr_line_1_"),
    ("facility_zipcode", "zip_"),
    ("facility_latitude", "latitude_"),
    ("facility_longitude", "longitude_"),
    ("facility_addresskey", "kli_location_key_"),
    ("facility_name", "provider_location_name_"),
]

for facility_column, hgpd_column in facility_sources:
    dm = dm.withColumn(
        facility_column,
        when(has_hgpd_location, hgpd[hgpd_column]).otherwise(col(facility_column)),
    )

# The address source is stamped on every BHI row, matched or not.
dm = dm.withColumn(
    "facility_addresssource",
    when(is_bhi, "BHIFACILITYFIX").otherwise(col("facility_addresssource")),
)

# Remove the "_"-suffixed helper columns that came in through the joins.
dm = dm.drop(*ccs_codes.columns).drop(*procedure_location.columns).drop(*hgpd.columns)

In [0]:
# Upper-case every column name, renaming all columns in one positional pass.

dm = dm.toDF(*[name.upper() for name in dm.columns])

In [0]:
# Keep only the columns claimline originally had and overwrite the target table.
(
    dm.select(claimline_columns)
    .write
    .mode("overwrite")
    .saveAsTable(claimline_bhi_facility_fix)
)