In [0]:
#ran with google colab
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark


In [0]:
# Set environment variables so findspark can locate Java and the unpacked Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
# findspark.init() makes the pyspark bundled under SPARK_HOME importable, so it
# has to run BEFORE any `pyspark` import (otherwise a fresh runtime raises
# ModuleNotFoundError).
import findspark
findspark.init()
# import dependencies for spark
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql.types import DoubleType

In [0]:
from pyspark.sql import SparkSession

# Build (or reuse) the session. The PostgreSQL JDBC jar is put on the driver
# classpath so the driver class named in the JDBC config can be loaded.
session_builder = SparkSession.builder.appName("healthcare_fraud")
session_builder = session_builder.config(
    "spark.driver.extraClassPath",
    "/content/postgresql-42.2.9.jar",
)
spark = session_builder.getOrCreate()

In [0]:
# Load Opiod_drug_names.csv from S3 into a DataFrame
url = "https://healthcare-fraud-project.s3.amazonaws.com/Opiod_drug_names.csv"
# addFile fetches the file so SparkFiles.get can resolve it by its base name
spark.sparkContext.addFile(url)
# In the date pattern "MM" is month-of-year; lowercase "mm" means minute-of-hour
# and would mis-parse month/day/year values.
opiodNames = spark.read.option('header', 'true').csv(
    SparkFiles.get("Opiod_drug_names.csv"),
    inferSchema=True,
    sep=',',
    timestampFormat="MM/dd/yy",
)


In [0]:
# Row count of the opioid name list. Printed explicitly: a bare expression that
# is not the last line of a cell is evaluated but never displayed.
print(opiodNames.count())
#dependency to save CSVs after cleaned onto drive so do not need to download again
from google.colab import files
# Save as a CSV so the download/clean steps never have to be repeated.
# toPandas() collects the whole DataFrame onto the driver first.
opiodNames.toPandas().to_csv("opiodNames.csv")

In [0]:
# Load the 2016 general payments file (OP_DTL_GNRL_PGYR2016_P01172020.csv) from S3 into a DataFrame
url = "https://healthcare-fraud-project.s3.amazonaws.com/OP_DTL_GNRL_PGYR2016_P01172020.csv"
# addFile fetches the file so SparkFiles.get can resolve it by its base name
spark.sparkContext.addFile(url)
# In the date pattern "MM" is month-of-year; lowercase "mm" means minute-of-hour
# and would mis-parse month/day/year values.
df = spark.read.option('header', 'true').csv(
    SparkFiles.get("OP_DTL_GNRL_PGYR2016_P01172020.csv"),
    inferSchema=True,
    sep=',',
    timestampFormat="MM/dd/yy",
)

In [0]:
# Columns that are not needed downstream and are removed from the payments frame
col_dropped = [
    'Teaching_Hospital_CCN',
    'Teaching_Hospital_ID',
    'Teaching_Hospital_Name',
    'Physician_Middle_Name',
    'Physician_Name_Suffix',
    'Recipient_Province',
    'Recipient_Postal_Code',
    'Recipient_Country',
    'Physician_License_State_code2',
    'Physician_License_State_code3',
    'Physician_License_State_code4',
    'Physician_License_State_code5',
    'Submitting_Applicable_Manufacturer_or_Applicable_GPO_Name',
    'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID',
    'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name',
    'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_State',
    'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Country',
    'Number_of_Payments_Included_in_Total_Amount',
    'Date_of_Payment',
    'Form_of_Payment_or_Transfer_of_Value',
    'Nature_of_Payment_or_Transfer_of_Value',
    'City_of_Travel',
    'State_of_Travel',
    'Country_of_Travel',
    'Name_of_Third_Party_Entity_Receiving_Payment_or_Transfer_of_Value',
    'Charity_Indicator',
    'Third_Party_Equals_Covered_Recipient_Indicator',
    'Contextual_Information',
    'Associated_Drug_or_Biological_NDC_1',
    'Associated_Drug_or_Biological_NDC_2',
    'Covered_or_Noncovered_Indicator_3',
    'Indicate_Drug_or_Biological_or_Device_or_Medical_Supply_3',
    'Product_Category_or_Therapeutic_Area_3',
    'Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_3',
    'Associated_Drug_or_Biological_NDC_3',
    'Covered_or_Noncovered_Indicator_4',
    'Indicate_Drug_or_Biological_or_Device_or_Medical_Supply_4',
    'Product_Category_or_Therapeutic_Area_4',
    'Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_4',
    'Associated_Drug_or_Biological_NDC_4',
    'Covered_or_Noncovered_Indicator_5',
    'Indicate_Drug_or_Biological_or_Device_or_Medical_Supply_5',
    'Product_Category_or_Therapeutic_Area_5',
    'Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_5',
    'Associated_Drug_or_Biological_NDC_5',
]
# Keep every remaining column, in its original order
kept_columns = [name for name in df.columns if name not in col_dropped]
df_smaller = df.select(kept_columns)

In [0]:
# Cast the payment amount to double so it can be summed numerically
payment_col = "Total_Amount_of_Payment_USDollars"
df_smaller = df_smaller.withColumn(payment_col, df_smaller[payment_col].cast(DoubleType()))
# Narrow the table to the physician/location keys plus the payment amount
physician_keys = ["Physician_First_Name", 'Physician_Last_Name', 'Recipient_City', 'Recipient_State']
df_fin = df_smaller.select(physician_keys + [payment_col])
# One row per physician/city/state; the summed column is named "sum(<payment_col>)"
df_fin = df_fin.groupby(physician_keys).sum(payment_col)

In [0]:
# Rename columns to snake_case for easier readability.
# A single old -> new mapping keeps each rename listed exactly once.
column_renames = {
    "sum(Total_Amount_of_Payment_USDollars)": "total_payment",
    "Physician_First_Name": "physician_first_name",
    "Physician_Last_Name": "physician_last_name",
    "Recipient_City": "recipient_city",
    "Recipient_State": "recipient_state",
}
df_fin_renamed = df_fin
for old_name, new_name in column_renames.items():
    df_fin_renamed = df_fin_renamed.withColumnRenamed(old_name, new_name)

In [0]:
# Show the (column, type) pairs needed to create the matching table in Postgres.
# Printed explicitly: a bare expression that is not the last line of a cell is
# evaluated but never displayed.
print(df_fin_renamed.dtypes)
# Save as a CSV so the download/clean steps never have to be repeated.
# toPandas() collects the whole DataFrame onto the driver first.
df_fin_renamed.toPandas().to_csv("physicianInfo.csv")

In [0]:
# Connection settings for writing the results to Postgres.
import getpass
import os

mode = "append"
jdbc_url = "jdbc:postgresql://healthcare-fraud-db.cakair5uyevn.us-east-1.rds.amazonaws.com:5432/healthcare-fraud-db"
# Credentials must not live in the notebook: read them from the environment
# (POSTGRES_USER / POSTGRES_PASSWORD) and prompt only when a variable is unset.
# getpass keeps the typed password out of the cell output.
config = {
    "user": os.environ.get("POSTGRES_USER") or input("Postgres user: "),
    "password": os.environ.get("POSTGRES_PASSWORD") or getpass.getpass("Postgres password: "),
    "driver": "org.postgresql.Driver",
}

In [0]:
# Append the per-physician payment totals to the provider_payment_info table
df_fin_renamed.write.jdbc(
    url=jdbc_url,
    table='provider_payment_info',
    mode=mode,
    properties=config,
)



In [0]:
# Load the exclusion list (2004EXCL.csv) from S3 into a DataFrame
url = "https://healthcare-fraud-project.s3.amazonaws.com/2004EXCL.csv"
# addFile fetches the file so SparkFiles.get can resolve it by its base name
spark.sparkContext.addFile(url)
# In the date pattern "MM" is month-of-year; lowercase "mm" means minute-of-hour
# and would mis-parse month/day/year values.
exclusionList = spark.read.option('header', 'true').csv(
    SparkFiles.get("2004EXCL.csv"),
    inferSchema=True,
    sep=',',
    timestampFormat="MM/dd/yy",
)

In [14]:
# Keep only the columns we want.
# NOTE: despite its name, excludeCol lists the columns that are KEPT.
excludeCol = [
    'LASTNAME',
    'FIRSTNAME',
    'BUSNAME',
    'SPECIALTY',
    'NPI',
    'CITY',
    'STATE',
    'ZIP',
]
wanted_columns = [name for name in exclusionList.columns if name in excludeCol]
exclusionListSmall = exclusionList.select(wanted_columns)
exclusionListSmall.show()

+---------+-----------+--------------------+--------------------+----------+---------------+-----+-----+
| LASTNAME|  FIRSTNAME|             BUSNAME|           SPECIALTY|       NPI|           CITY|STATE|  ZIP|
+---------+-----------+--------------------+--------------------+----------+---------------+-----+-----+
|     null|       null|     ANATOMY RX, LLC|            PHARMACY|1356713226|    LOS ANGELES|   CA|90025|
|     null|       null|CANARSIE A W A R ...|SUBSTANCE ABUSE REHA|1497971741|       BROOKLYN|   NY|11236|
|     null|       null|DAY AND NIGHT BEH...|   COUNSELING CENTER|1619426715|NORTH LAS VEGAS|   NV|89081|
|     null|       null|EDWARDO M YAMBO M...|    GENERAL PRACTICE|1194829754|      DEER PARK|   NY|11729|
|     null|       null|LIFESTREAM HEALTH...|  HOME HEALTH AGENCY|1679844161|     STURBRIDGE|   MA| 1566|
|   ABBASI|       MAAZ|                null|   INTERNAL MEDICINE|1679754725|  MISSOURI CITY|   TX|77459|
|    ADAME|    MATTHEW|                null|   NURSE/NU

In [15]:
# Give the exclusion-list columns human-readable names (NPI and ZIP keep theirs)
exclusion_renames = [
    ("LASTNAME", "Last Name"),
    ("FIRSTNAME", "First Name"),
    ("BUSNAME", "Business Name"),
    ("SPECIALTY", "Specialty"),
    ("CITY", "City"),
    ("STATE", "State"),
]
for old_name, new_name in exclusion_renames:
    exclusionListSmall = exclusionListSmall.withColumnRenamed(old_name, new_name)
exclusionListSmall.show()

+---------+-----------+--------------------+--------------------+----------+---------------+-----+-----+
|Last Name| First Name|       Business Name|           Specialty|       NPI|           City|State|  ZIP|
+---------+-----------+--------------------+--------------------+----------+---------------+-----+-----+
|     null|       null|     ANATOMY RX, LLC|            PHARMACY|1356713226|    LOS ANGELES|   CA|90025|
|     null|       null|CANARSIE A W A R ...|SUBSTANCE ABUSE REHA|1497971741|       BROOKLYN|   NY|11236|
|     null|       null|DAY AND NIGHT BEH...|   COUNSELING CENTER|1619426715|NORTH LAS VEGAS|   NV|89081|
|     null|       null|EDWARDO M YAMBO M...|    GENERAL PRACTICE|1194829754|      DEER PARK|   NY|11729|
|     null|       null|LIFESTREAM HEALTH...|  HOME HEALTH AGENCY|1679844161|     STURBRIDGE|   MA| 1566|
|   ABBASI|       MAAZ|                null|   INTERNAL MEDICINE|1679754725|  MISSOURI CITY|   TX|77459|
|    ADAME|    MATTHEW|                null|   NURSE/NU

In [0]:
# Persist the cleaned exclusion list as a CSV so the steps above need not be repeated.
# toPandas() collects the whole DataFrame onto the driver first.
exclusion_pdf = exclusionListSmall.toPandas()
exclusion_pdf.to_csv("exclusionlist.csv")

In [0]:
# Load the 2016 Medicare Part D prescriber CSV from S3 into a DataFrame
url = "https://healthcare-fraud-project.s3.amazonaws.com/Medicare_Provider_Utilization_and_Payment_Data__2016_Part_D_Prescriber.csv"
# addFile fetches the file so SparkFiles.get can resolve it by its base name
spark.sparkContext.addFile(url)
# In the date pattern "MM" is month-of-year; lowercase "mm" means minute-of-hour
# and would mis-parse month/day/year values.
drugProvider = spark.read.option('header', 'true').csv(
    SparkFiles.get("Medicare_Provider_Utilization_and_Payment_Data__2016_Part_D_Prescriber.csv"),
    inferSchema=True,
    sep=',',
    timestampFormat="MM/dd/yy",
)

In [0]:
# Drug provider data frame: list of its column names (not displayed, since this
# is not the last expression of the cell)
drugProvider.columns
# Keep only the provider identity, drug, claim-count and cost columns
drugColumns = [
    'npi',
    'nppes_provider_last_org_name',
    'nppes_provider_first_name',
    'nppes_provider_city',
    'nppes_provider_state',
    'drug_name',
    'generic_name',
    'total_claim_count',
    'total_drug_cost',
]
wanted_drug_columns = [name for name in drugProvider.columns if name in drugColumns]
drugProviderSmall = drugProvider.select(wanted_drug_columns)

+----------+----------------------------+-------------------------+-------------------+--------------------+--------------------+--------------------+-----------------+---------------+
|       npi|nppes_provider_last_org_name|nppes_provider_first_name|nppes_provider_city|nppes_provider_state|           drug_name|        generic_name|total_claim_count|total_drug_cost|
+----------+----------------------------+-------------------------+-------------------+--------------------+--------------------+--------------------+-----------------+---------------+
|1003000126|                   ENKESHAFI|                  ARDALAN|         CUMBERLAND|                  MD|ATORVASTATIN CALCIUM|ATORVASTATIN CALCIUM|               13|         139.32|
|1003000126|                   ENKESHAFI|                  ARDALAN|         CUMBERLAND|                  MD|   CIPROFLOXACIN HCL|   CIPROFLOXACIN HCL|               11|          80.99|
|1003000126|                   ENKESHAFI|                  ARDALAN|        

In [0]:
# Give the prescriber columns human-readable names
drug_renames = [
    ("npi", "NPI"),
    ("nppes_provider_last_org_name", "Provider Last Name"),
    ("nppes_provider_first_name", "Provider First Name"),
    ("nppes_provider_city", "Provider City"),
    ("nppes_provider_state", "Provider State"),
    ("drug_name", "Drug Name"),
    ("generic_name", "Generic Name"),
    ("total_claim_count", "Total Claim Count"),
    ("total_drug_cost", "Total Drug Cost"),
]
for old_name, new_name in drug_renames:
    drugProviderSmall = drugProviderSmall.withColumnRenamed(old_name, new_name)

+----------+------------------+-------------------+------------+--------------+--------------------+--------------------+-----------------+---------------+
|       NPI|Provider Last Name|Provider First Name|ProviderCity|Provider State|           Drug Name|        Generic Name|Total Claim Count|Total Drug Cost|
+----------+------------------+-------------------+------------+--------------+--------------------+--------------------+-----------------+---------------+
|1003000126|         ENKESHAFI|            ARDALAN|  CUMBERLAND|            MD|ATORVASTATIN CALCIUM|ATORVASTATIN CALCIUM|               13|         139.32|
|1003000126|         ENKESHAFI|            ARDALAN|  CUMBERLAND|            MD|   CIPROFLOXACIN HCL|   CIPROFLOXACIN HCL|               11|          80.99|
|1003000126|         ENKESHAFI|            ARDALAN|  CUMBERLAND|            MD| DOXYCYCLINE HYCLATE| DOXYCYCLINE HYCLATE|               20|         586.12|
|1003000126|         ENKESHAFI|            ARDALAN|  CUMBERLAND|

In [0]:
# Select only the provider identity columns and drop all duplicates, leaving one
# row per distinct provider record.
# The selection must start from drugProviderSmall: drugProviderSmallnoDup does not
# exist yet on a fresh kernel, so selecting from itself raises NameError.
drugProviderSmallnoDup = drugProviderSmall.select("NPI","Provider Last Name","Provider First Name","Provider City","Provider State")
drugProviderSmallnoDup = drugProviderSmallnoDup.dropDuplicates()
drugProviderSmallnoDup.show()

In [0]:
# Total the numeric columns per NPI; called without arguments, sum() applies to
# every numeric column, which yields the claim-count and drug-cost totals.
claims_by_npi = drugProviderSmall.groupBy("NPI")
drugProviderSmallagg = claims_by_npi.sum()
drugProviderSmallagg.show()

In [0]:
# Keep only the key and the two totals we care about
agg_columns = ["NPI", "sum(Total Claim Count)", "sum(Total Drug Cost)"]
drugProviderSmallagg = drugProviderSmallagg.select(*agg_columns)
drugProviderSmallagg.show()


In [0]:
# Attach the provider identity columns to the per-NPI totals.
# The result should be a lot smaller than the per-drug table.
join_keys = ["NPI"]
Providerjoined = drugProviderSmallagg.join(drugProviderSmallnoDup, on=join_keys)
Providerjoined.show()

In [0]:
# Colab helper module for file handling (imported here; not called in this cell)
from google.colab import files
# Persist the joined prescriber table as a CSV so the steps above need not be repeated.
# toPandas() collects the whole DataFrame onto the driver first.
prescriber_pdf = Providerjoined.toPandas()
prescriber_pdf.to_csv("PrescriberDrugs.csv")
# Future reference: to download, open the folder icon in the left sidebar,
# right-click the file and download it; it can then be imported into the database.
