In [None]:
import os

# Directory that holds every input file read by this notebook.
# Set the MILESTONE1_DATA_DIR environment variable to point at another copy of
# the data; when it is unset the original location is used.
# os.path.join(..., '') guarantees a trailing path separator, which matters
# because file names are appended to BASE_DIR by plain string concatenation.
BASE_DIR = os.path.join(
    os.environ.get('MILESTONE1_DATA_DIR',
                   '/home/thanuja/Dropbox/coursera/Milestone1/data/'),
    ''
)

In [None]:
#pyspark initialization

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from scipy.stats import pearsonr
from itertools import chain
import warnings
import altair as alt
import numpy as np
import pandas as pd

# Suppress every Python warning for the remainder of the session.
warnings.filterwarnings('ignore')

# Build (or reuse) the Spark session: master "local[*]", app name 'org queries',
# driver memory set to 8g.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('org queries')
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

# SparkContext that belongs to the session created above.
sc = spark.sparkContext

In [None]:
# 2019 general payments file; the first row supplies the column names
general_payments_df = spark.read.csv(
    BASE_DIR + 'OP_DTL_GNRL_PGYR2019_P06302021.csv',
    header=True,
)

# Grand total of the payment amounts and the number of payment records
payment_totals = [
    F.sum('Total_Amount_of_Payment_USDollars').alias('sum_payments'),
    F.count('*').alias('num_payments'),
]
general_payments_df.agg(*payment_totals).show()

# Number of distinct physician profile IDs that appear in the file
distinct_physicians = F.countDistinct('Physician_Profile_ID')
general_payments_df.select(distinct_physicians).show()

In [None]:
# Columns carried over from the raw payments file
hcp_payment_columns = [
    'Physician_Profile_ID',
    'Physician_First_Name',
    'Physician_Middle_Name',
    'Physician_Last_Name',
    'Recipient_Primary_Business_Street_Address_Line1',
    'Recipient_State',
    'Recipient_City',
    'Recipient_Zip_Code',
    'Total_Amount_of_Payment_USDollars',
]

# Keep only the records whose recipient type is a covered physician
is_physician_payment = F.col('Covered_Recipient_Type') == 'Covered Recipient Physician'
hcp_payments_df = general_payments_df.where(is_physician_payment).select(*hcp_payment_columns)

# Cast the payment amount to double and derive `zip` from the first five
# characters of the recipient zip code
hcp_payments_df = (
    hcp_payments_df
    .withColumn('Total_Amount_of_Payment_USDollars',
                F.col('Total_Amount_of_Payment_USDollars').cast('double'))
    .withColumn('zip', F.substring('Recipient_Zip_Code', 1, 5))
)

# One output row per distinct combination of these identity/address fields
physician_fields = [
    'Physician_Profile_ID',
    'Physician_First_Name',
    'Physician_Middle_Name',
    'Physician_Last_Name',
    'Recipient_Primary_Business_Street_Address_Line1',
    'Recipient_State',
    'Recipient_City',
    'zip',
]

total_payment = F.sum('Total_Amount_of_Payment_USDollars').alias('sum_payments')
physician_payments_df = hcp_payments_df.groupBy(physician_fields).agg(total_payment)

# Approximate median of the per-physician totals within each zip, used to
# compare each physician with the others in the same area
median_percentile = F.expr('percentile_approx(sum_payments, 0.5)')
zip_payments_df = (
    physician_payments_df
    .groupBy('zip')
    .agg(median_percentile.alias('median_payment'))
)

# Attach the zip-level median to every physician row
physician_payments_df = physician_payments_df.join(zip_payments_df, on='zip')


def normalize(raw, median):
    """Express a value as a multiple of a reference median.

    In this notebook it is applied row-wise (through a Spark UDF) to a
    physician's summed payments and the median payment of the same zip.

    Args:
        raw: value to scale; may be None.
        median: reference median to divide by; may be None or zero.

    Returns:
        ``raw / median`` when both are present and ``median`` is non-zero.
        Otherwise ``raw`` is returned unchanged (which is None when ``raw``
        is None).
    """
    # A missing or zero median cannot be divided by. Dividing by zero would
    # raise ZeroDivisionError inside the UDF and abort the whole Spark job,
    # so both cases fall back to the raw value.
    if raw is None or not median:
        return raw
    return raw / median

# Wrap normalize() as a Spark UDF that yields a FloatType column
normalize_udf = F.udf(normalize, returnType=FloatType())

# normalized_payment = physician total relative to the median of their zip
normalized_payment = normalize_udf(F.col('sum_payments'), F.col('median_payment'))
physician_payments_df = physician_payments_df.withColumn('normalized_payment', normalized_payment)

print(physician_payments_df.columns)

# The string literal below is disabled legacy code (never executed); kept as-is.
'''
#sum payment column for each healthcare provider
hcp_payments_df = hcp_payments_df.groupBy(['Physician_Profile_ID','Physician_First_Name', 'Physician_Middle_Name',
                                           'Physician_Last_Name', 'Recipient_Primary_Business_Street_Address_Line1',
                                           'Recipient_State', 'Recipient_City', 'zip'])\
                                    .agg(F.sum('normalized_payment').alias("sum_payment"),F.sum('Total_Amount_of_Payment_USDollars').alias("total_sum"))
print(hcp_payments_df.columns)
'''

# Mapping file between Physician_Profile_ID and NPI.
# The file has no header row, so the columns are declared here in file order
# as (name, Spark type); every field is nullable.
ppi_npi_match_fields = [
    ("FirstName", StringType()),
    ("LastName", StringType()),
    ("NPI", StringType()),
    ("Physician_Profile_ID", StringType()),
    ("Score", FloatType()),
    ("NatAddr", StringType()),
    ("AddrScore", FloatType()),
    ("SupplAddr", StringType()),
    ("NatStateCity", StringType()),
    ("StateCityScore", FloatType()),
    ("SupplStateCity", StringType()),
    ("NatTaxonomy", StringType()),
    ("TaxonomyScore", FloatType()),
    ("SupplTaxonomy", StringType()),
    ("NatMiddleName", StringType()),
    ("MiddleNameScore", FloatType()),
    ("SupplMiddleName", StringType()),
]
ppi_npi_matches_df_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in ppi_npi_match_fields]
)

ppi_npi_matches_df = spark.read.csv(
    BASE_DIR + 'data_processing/filtered_out/filtered_hcp_matches.csv',
    schema=ppi_npi_matches_df_schema,
    header=False,
)

# TODO: adjust payments by cost of living and/or state/zip code average

# Join the payments with the mapping file on Physician_Profile_ID; physicians
# without a counterpart on the other side are dropped by the join
physician_payments_df = physician_payments_df.join(ppi_npi_matches_df, on='Physician_Profile_ID')
physician_payments_df.select('sum_payments', 'normalized_payment', 'median_payment').show()

In [None]:
# Keep only the NPI and the total payment of each physician from the merged
# frame, then drop rows in which either of the two is missing.
# Fix: dropna() used to be called on physician_payments_df itself, which threw
# away the column selection and removed every row that had a null in ANY
# column (for instance a physician with no Physician_Middle_Name).
hcp_total_payments_df = (
    physician_payments_df
    .select('NPI', 'sum_payments')
    .dropna()
)
hcp_total_payments_df.show()

In [None]:
# Performance ratings file; the first row supplies the column names
cms_ec_ratings_df = spark.read.csv(BASE_DIR + 'ec_score_file.csv', header=True)

# The score column is looked up with a leading space in its name; rename it
# to the clean name, then keep just the NPI and the score
cms_ec_ratings_df = (
    cms_ec_ratings_df
    .withColumnRenamed(' final_MIPS_score', 'final_MIPS_score')
    .select('NPI', 'final_MIPS_score')
)

# Score columns of interest
float_cols = ['final_MIPS_score']

# Discard rows in which all of the score columns are null
cms_ec_ratings_df = cms_ec_ratings_df.dropna(how='all', subset=float_cols)

# NOTE: the score columns are not cast to a numeric type here; the score is
# converted with astype(float) after the data is moved to pandas.
cms_ec_ratings_df.show()
print(cms_ec_ratings_df.columns)

In [None]:
# Ratings joined with the complete physician payment records on NPI
# (all physician/address/match columns are kept)
hcp_ec_ratings_pa_df = cms_ec_ratings_df.join(physician_payments_df, on='NPI')
hcp_ec_ratings_pa_df.show(truncate=False)
hcp_ec_ratings_pa_df.count()

In [None]:
# Ratings joined with the per-physician payment totals on NPI, giving one
# frame that carries both the performance score and the payment data
hcp_ec_ratings_payments_df = cms_ec_ratings_df.join(hcp_total_payments_df, on='NPI')
hcp_ec_ratings_payments_df.show(truncate=False)
hcp_ec_ratings_payments_df.count()

In [None]:
# Column names and Spark types of the ratings / full payment-record join
hcp_ec_ratings_pa_df.dtypes

In [None]:
# Column names and Spark types of the ratings / payment-totals join
hcp_ec_ratings_payments_df.dtypes

In [None]:
# Payments at or below this amount are ignored: they are considered too small
# to have much influence on physician behavior
payment_threshold = 10

# Move the joined Spark frame into pandas for the visualizations and make the
# score numeric
hcp_ec_ratings_payments_pddf = hcp_ec_ratings_payments_df.toPandas()
hcp_ec_ratings_payments_pddf['final_MIPS_score'] = (
    hcp_ec_ratings_payments_pddf['final_MIPS_score'].astype(float)
)

# Keep rows with a score above 30 and a payment total above the threshold
score_above_floor = hcp_ec_ratings_payments_pddf['final_MIPS_score'] > 30.0
payment_above_threshold = hcp_ec_ratings_payments_pddf['sum_payments'] > payment_threshold
hcp_ec_ratings_payments_pddf = hcp_ec_ratings_payments_pddf[
    score_above_floor & payment_above_threshold
]

# Lift Altair's row limit (it errors out above 5000 rows otherwise)
alt.data_transformers.disable_max_rows()

def zscale(column):
    """Standardize ``column``: subtract its mean, then divide by its standard deviation.

    ``column`` must provide ``mean()`` and ``std()`` and support arithmetic
    (it is called with pandas Series in this notebook).
    """
    centered = column - column.mean()
    spread = column.std()
    return centered / spread

# Correlation between the standardized log10 payment totals and the
# standardized MIPS scores
lg_payments = zscale(np.log10(hcp_ec_ratings_payments_pddf['sum_payments']))
mips_score = zscale(hcp_ec_ratings_payments_pddf['final_MIPS_score'])
print('correlation', lg_payments.corr(mips_score))

# Only a random subset of the rows below 1,000,000 USD is plotted.
# Fix: sample(5000) raised ValueError whenever fewer than 5000 rows were left
# after filtering, and it was unseeded, so the chart changed on every run.
# The sample size is now capped at the number of available rows and seeded.
chart_rows = hcp_ec_ratings_payments_pddf[hcp_ec_ratings_payments_pddf['sum_payments'] < 1000000]
chart_sample = chart_rows.sample(n=min(5000, len(chart_rows)), random_state=42)

# Payments are drawn on a logarithmic axis to account for outliers
cms_payments_ratings_chart = alt.Chart(
    chart_sample,
    title='Physician Performance Rating by Total Payments'
).mark_point().encode(
    x=alt.X('sum_payments:Q', title='Total Payments (USD)',
            scale=alt.Scale(type='log', domain=[payment_threshold, 1000000])),
    y=alt.Y('final_MIPS_score:Q', title='Overall Performance Rating',
            scale=alt.Scale(type='linear', domain=[30, 100]))
).properties(width=700, height=200)
cms_payments_ratings_chart

In [None]:
# Collect the ratings/payments join into pandas and keep the 50 rows with the
# largest positive payment totals, largest first
hcp_ec_ratings_pa_ab0 = (
    hcp_ec_ratings_pa_df.toPandas()
    .loc[lambda frame: frame.sum_payments > 0]
    .sort_values(by='sum_payments', ascending=False)
    .head(50)
)
print(hcp_ec_ratings_pa_ab0.head(10))

# Linear colour ramp over the score range 30-100, from pink to green
scale = alt.Scale(type='linear', domain=[30, 100], range=['pink', 'green'])

# Horizontal bars: payment total per physician last name, coloured by the score
(
    alt.Chart(hcp_ec_ratings_pa_ab0)
    .mark_bar()
    .encode(
        x=alt.X('sum_payments:Q', title='Sum of payments for 2019 (USD)'),
        y=alt.Y("Physician_Last_Name:N", title='Physician Last Name', sort='-x'),
        color=alt.Color('final_MIPS_score:Q', title='Final MIPS rating', scale=scale),
    )
    .properties(height=700)
)

In [None]:
# Pairwise correlations between the numeric columns of the score/payment frame.
# Non-numeric columns (the NPI is read as a string) are excluded explicitly:
# pandas >= 2.0 raises on them in corr() instead of silently skipping them,
# and select_dtypes gives the same result on older pandas versions.
score_payment_corr = (
    hcp_ec_ratings_payments_pddf
    .select_dtypes(include='number')
    .corr()
    .reset_index()
)
score_payment_corr

In [None]:
# Taxonomy crosswalk provided by CMS; the first row supplies the column names
hcp_taxonomies = spark.read.csv(
    BASE_DIR + "Medicare_Provider_and_Supplier_Taxonomy_Crosswalk_October_2021.csv",
    header=True,
)

# Long source headers -> short working names
taxonomy_column_names = {
    "PROVIDER TAXONOMY DESCRIPTION:  TYPE, CLASSIFICATION, SPECIALIZATION": "detail_desc",
    "MEDICARE PROVIDER/SUPPLIER TYPE DESCRIPTION": "hl_desc",
    "MEDICARE SPECIALTY CODE": "sp_code",
    "PROVIDER TAXONOMY CODE": "tx_code",
}
for source_name, short_name in taxonomy_column_names.items():
    hcp_taxonomies = hcp_taxonomies.withColumnRenamed(source_name, short_name)

# Strip surrounding whitespace from the taxonomy code
hcp_taxonomies = hcp_taxonomies.withColumn('tx_code', F.trim(F.col('tx_code')))
hcp_taxonomies.show(truncate=False)

In [None]:
# Build dictionaries keyed by taxonomy code whose values are the high-level
# and the detailed speciality descriptions.
# Fix: the codes and descriptions used to be gathered with three independent
# collect_list() calls and zipped together. collect_list() skips nulls and
# does not guarantee an order, so a single missing description shifted every
# following code onto the wrong description. The three columns are now
# collected together, row by row, so each code stays with its own descriptions.
taxonomy_rows = (
    hcp_taxonomies
    .select('tx_code', 'detail_desc', 'hl_desc')
    .where(F.col('tx_code').isNotNull())  # a null code cannot be a dictionary/map key
    .collect()
)
tx_codes = [row['tx_code'] for row in taxonomy_rows]
detail_descs = [row['detail_desc'] for row in taxonomy_rows]
hl_descs = [row['hl_desc'] for row in taxonomy_rows]

# Codes without a description are left out; a lookup for them yields null anyway
tax_detail_dict = {code: desc for code, desc in zip(tx_codes, detail_descs) if desc is not None}
tax_hl_dict = {code: desc for code, desc in zip(tx_codes, hl_descs) if desc is not None}

# this file has mapping of specialities between CMS taxonomy file and DAC_NationalDownloadableFile.csv
cms_npi_map = pd.read_csv(BASE_DIR + "mapping_taxonomies.csv")

# dictionary from the CMS speciality to the speciality listed in DAC_NationalDownloadableFile.csv
cms_nat_dict = dict(zip(cms_npi_map.CMS_SPECIALITY, cms_npi_map.NAT_SPECIALITY))

In [None]:
# Physicians supplemental file (fname, lname, mname, address, state, city, zip, taxonomy, speciality)
hcp_suppl_file_df = spark.read.options(header='True').csv(BASE_DIR + "OP_PH_PRFL_SPLMTL_P06302021.csv")

# Long source headers -> short working names
suppl_column_names = {
    "Physician_Profile_First_Name": "f_name",
    "Physician_Profile_Last_Name": "l_name",
    "Physician_Profile_Alternate_Middle_Name": "ma_name",
    "Physician_Profile_Middle_Name": "m_name",
    "Physician_Profile_Address_Line_1": "adr_ln_1",
    "Physician_Profile_Address_Line_2": "adr_ln_2",
    "Physician_Profile_City": "city",
    "Physician_Profile_State": "state",
    "Physician_Profile_Zipcode": "zip",
    "Physician_Profile_OPS_Taxonomy_1": "txcode_1",
    "Physician_Profile_OPS_Taxonomy_2": "txcode_2",
    "Physician_Profile_OPS_Taxonomy_3": "txcode_3",
}
for source_name, short_name in suppl_column_names.items():
    hcp_suppl_file_df = hcp_suppl_file_df.withColumnRenamed(source_name, short_name)

# Each physician can carry up to three taxonomy codes (txcode_1..txcode_3)
taxonomy_slots = ('1', '2', '3')

# Fix: only txcode_1 used to be trimmed although all three codes are used as
# lookup keys below; untrimmed codes would never match the (trimmed) keys.
for slot in taxonomy_slots:
    hcp_suppl_file_df = hcp_suppl_file_df.withColumn('txcode_' + slot, F.trim(F.col('txcode_' + slot)))

# Replace the dash between the 5-digit zip and the +4 code with a space
hcp_suppl_file_df = hcp_suppl_file_df.withColumn("zip", F.regexp_replace("zip", "-", " "))

# Literal Spark map columns built from the Python dictionaries
# (create_map takes alternating key, value literals)
mapping_expr1 = F.create_map([F.lit(x) for x in chain(*tax_hl_dict.items())])
mapping_expr2 = F.create_map([F.lit(x) for x in chain(*tax_detail_dict.items())])
mapping_expr3 = F.create_map([F.lit(x) for x in chain(*cms_nat_dict.items())])

# Add the three speciality columns for each taxonomy code.
# NOTE(review): mapping_expr3 is built from cms_nat_dict, whose keys come from
# the CMS_SPECIALITY column, yet it is looked up by taxonomy code here just as
# in the original - confirm that CMS_SPECIALITY really holds taxonomy codes.
for slot in taxonomy_slots:
    taxonomy_code = F.col('txcode_' + slot)
    hcp_suppl_file_df = (
        hcp_suppl_file_df
        .withColumn("cms_hl_speciality_" + slot, mapping_expr1.getItem(taxonomy_code))
        .withColumn("cms_detail_speciality_" + slot, mapping_expr2.getItem(taxonomy_code))
        .withColumn("nat_speciality_" + slot, mapping_expr3.getItem(taxonomy_code))
    )


In [None]:
# Ratings + payments joined with the supplemental physician file
hcp_payments_suppl_df = hcp_ec_ratings_pa_df.join(hcp_suppl_file_df, on='Physician_Profile_ID')

# Per high-level speciality (first taxonomy code): total payments and number
# of rows, ordered by total payments, largest first.
# NOTE(review): num_doctors counts joined rows, not distinct profile IDs -
# confirm that each physician appears only once in the join.
hcp_payments_suppl_spec_df = (
    hcp_payments_suppl_df
    .groupby("cms_hl_speciality_1")
    .agg(
        F.sum("sum_payments").alias("payments"),
        F.count("Physician_Profile_ID").alias("num_doctors"),
    )
    .sort(F.desc("payments"))
)
hcp_payments_suppl_spec_df.show()

In [None]:
# Collect the per-speciality aggregate into a pandas DataFrame for charting
hcp_payments_suppl_spec_pddf=hcp_payments_suppl_spec_df.toPandas()

In [None]:
# Number of rows in the collected per-speciality aggregate
len(hcp_payments_suppl_spec_pddf)

In [None]:
# Linear colour ramp over the score range 30-100, from pink to green
# (defined here but not yet applied to the chart below - see the TODO)
scale = alt.Scale(type='linear', domain=[30, 100], range=['pink', 'green'])

print(hcp_payments_suppl_spec_pddf.dtypes)

# TODO: color of average rating 
# https://altair-viz.github.io/gallery/top_k_items.html
# Bar chart of total payments per speciality, limited to the rows whose
# descending payment rank is below 10
(
    alt.Chart(hcp_payments_suppl_spec_pddf)
    .mark_bar()
    .encode(
        x=alt.X('payments:Q', title='Total Payments'),
        y=alt.Y("cms_hl_speciality_1:N", sort='-x', title=None),
    )
    .transform_window(
        rank='rank(payments)',
        sort=[alt.SortField('payments', order='descending')],
    )
    .transform_filter(alt.datum.rank < 10)
    .properties(height=150)
)