In [1]:
from pyspark import SparkContext
context = SparkContext()
context.addPyFile('graphframes-0.8.1-spark3.0-s_2.12.jar')

from pyspark.sql.session import SparkSession
spark = SparkSession(context)

from pyspark.sql.types import *
from graphframes import *
spark = SparkSession.builder.appName("mehrzadHealthcareFraudDetector").getOrCreate()

In [2]:
path ="medicare/"

# read data file
file = spark.read.csv(path+'Medicare_Provider_Util_Payment_PUF_CY2012.csv',inferSchema=True,header=True)

In [3]:
from pyspark.sql.functions import col
rawData = file.select("NPI", "PROVIDER_TYPE", "HCPCS_CODE").limit(50000)
data = rawData.select([col(c).cast("string") for c in rawData.columns])

In [4]:
data.printSchema()

root
 |-- NPI: string (nullable = true)
 |-- PROVIDER_TYPE: string (nullable = true)
 |-- HCPCS_CODE: string (nullable = true)



In [5]:
specialities = data.select("PROVIDER_TYPE").distinct()
specialitiesList = [row['PROVIDER_TYPE'] for row in specialities.collect()]
specialitiesList

['Internal Medicine',
 'Pathology',
 'Anesthesiology',
 'Physical Therapist',
 'Family Practice',
 'Obstetrics/Gynecology',
 'Clinical Psychologist',
 'General Surgery',
 'Occupational therapist',
 'Chiropractic',
 'Nurse Practitioner',
 'Physician Assistant',
 'Cardiology',
 'Dermatology',
 'Mass Immunization Roster Biller',
 'Physical Medicine and Rehabilitation',
 'Radiation Oncology',
 'Infectious Disease',
 'Orthopedic Surgery',
 'Endocrinology',
 'Urology',
 'Diagnostic Radiology',
 'Neurology',
 'Emergency Medicine',
 'Nephrology',
 'Licensed Clinical Social Worker',
 'Psychiatry',
 'Ambulatory Surgical Center',
 'Pulmonary Disease',
 'Otolaryngology',
 'Plastic and Reconstructive Surgery',
 'General Practice',
 'Cardiac Surgery',
 'Ambulance Service Supplier',
 'Osteopathic Manipulative Medicine',
 'Allergy/Immunology',
 'CRNA',
 'Ophthalmology',
 'Audiologist (billing independently)',
 'Geriatric Medicine',
 'Gastroenterology',
 'Thoracic Surgery',
 'Pain Management',
 'Podiat

In [6]:
from pyspark.sql.functions import lit
npis = data.selectExpr("NPI as id", "NPI as name", "PROVIDER_TYPE as speciality").distinct()
npis = npis.withColumn("type", lit("npi"))
npis.count()

4807

In [7]:
hcpcsCodes = data.selectExpr("HCPCS_CODE as id", "HCPCS_CODE as name").distinct()
hcpcsCodes = hcpcsCodes.withColumn("speciality", lit(""))
hcpcsCodes = hcpcsCodes.withColumn("type", lit("hcpcs"))
hcpcsCodes.count()

2086

In [8]:
# Vertex DataFrame
v = npis.union(hcpcsCodes)
v.count()

6893

In [9]:
v.head(10)

[Row(id='1003000126', name='1003000126', speciality='Internal Medicine', type='npi'),
 Row(id='1003000134', name='1003000134', speciality='Pathology', type='npi'),
 Row(id='1003000142', name='1003000142', speciality='Anesthesiology', type='npi'),
 Row(id='1003000381', name='1003000381', speciality='Physical Therapist', type='npi'),
 Row(id='1003000407', name='1003000407', speciality='Family Practice', type='npi'),
 Row(id='1003000423', name='1003000423', speciality='Obstetrics/Gynecology', type='npi'),
 Row(id='1003000449', name='1003000449', speciality='Clinical Psychologist', type='npi'),
 Row(id='1003000480', name='1003000480', speciality='General Surgery', type='npi'),
 Row(id='1003000522', name='1003000522', speciality='Family Practice', type='npi'),
 Row(id='1003000530', name='1003000530', speciality='Internal Medicine', type='npi')]

In [10]:
# Edge DataFrame
e = data.selectExpr("NPI as src", "HCPCS_CODE as dst").distinct()
e.count()

48351

In [11]:
e.head(10)

[Row(src='1003000126', dst='99222'),
 Row(src='1003000126', dst='99223'),
 Row(src='1003000126', dst='99231'),
 Row(src='1003000126', dst='99232'),
 Row(src='1003000126', dst='99233'),
 Row(src='1003000126', dst='99238'),
 Row(src='1003000126', dst='99239'),
 Row(src='1003000134', dst='88304'),
 Row(src='1003000134', dst='88305'),
 Row(src='1003000134', dst='88311')]

In [12]:
# Create a GraphFrame
g = GraphFrame(v, e)

In [13]:
# Display the vertex and edge DataFrames
g.vertices.head(10)

[Row(id='1003000126', name='1003000126', speciality='Internal Medicine', type='npi'),
 Row(id='1003000134', name='1003000134', speciality='Pathology', type='npi'),
 Row(id='1003000142', name='1003000142', speciality='Anesthesiology', type='npi'),
 Row(id='1003000381', name='1003000381', speciality='Physical Therapist', type='npi'),
 Row(id='1003000407', name='1003000407', speciality='Family Practice', type='npi'),
 Row(id='1003000423', name='1003000423', speciality='Obstetrics/Gynecology', type='npi'),
 Row(id='1003000449', name='1003000449', speciality='Clinical Psychologist', type='npi'),
 Row(id='1003000480', name='1003000480', speciality='General Surgery', type='npi'),
 Row(id='1003000522', name='1003000522', speciality='Family Practice', type='npi'),
 Row(id='1003000530', name='1003000530', speciality='Internal Medicine', type='npi')]

In [14]:
# Display the vertex and edge DataFrames
g.edges.head(10)

[Row(src='1003000126', dst='99222'),
 Row(src='1003000126', dst='99223'),
 Row(src='1003000126', dst='99231'),
 Row(src='1003000126', dst='99232'),
 Row(src='1003000126', dst='99233'),
 Row(src='1003000126', dst='99238'),
 Row(src='1003000126', dst='99239'),
 Row(src='1003000134', dst='88304'),
 Row(src='1003000134', dst='88305'),
 Row(src='1003000134', dst='88311')]

In [None]:
import statistics
    
for speciality in specialitiesList:
    print(speciality)
    sourceIds = data.where(data.PROVIDER_TYPE == speciality).selectExpr("NPI as id").distinct()
    sourceIdsList = [row['id'] for row in sourceIds.collect()]
    # Run PageRank personalized for vertex ["a", "b", "c", "d"] in parallel
    pageRankResults = g.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=sourceIdsList, maxIter=10)
    results = pageRankResults.vertices.where(pageRankResults.vertices.type == "npi").where(pageRankResults.vertices.speciality != speciality).select("id", "name", "pageranks")
    finalResults = results.toPandas()
    output = []
    for index, row in finalResults.iterrows():
        ranksList = list([float(x) for x in row['pageranks']])
        output.append([row['id'], row['name'], statistics.mean(ranksList)])
    finalDf = spark.createDataFrame(output, ["id", "NPI", "pagerank"])
    topTenNpis = finalDf.sort(col("pagerank").desc()).limit(5)
    topTenNpis.show()
    

Internal Medicine
+----------+----------+--------+
|        id|       NPI|pagerank|
+----------+----------+--------+
|1003848722|1003848722|     0.0|
|1003047853|1003047853|     0.0|
|1003171307|1003171307|     0.0|
|1003824509|1003824509|     0.0|
|1003013616|1003013616|     0.0|
+----------+----------+--------+

Pathology
+----------+----------+--------+
|        id|       NPI|pagerank|
+----------+----------+--------+
|1003848722|1003848722|     0.0|
|1003047853|1003047853|     0.0|
|1003171307|1003171307|     0.0|
|1003824509|1003824509|     0.0|
|1003013616|1003013616|     0.0|
+----------+----------+--------+

Anesthesiology
+----------+----------+--------+
|        id|       NPI|pagerank|
+----------+----------+--------+
|1003848722|1003848722|     0.0|
|1003047853|1003047853|     0.0|
|1003171307|1003171307|     0.0|
|1003824509|1003824509|     0.0|
|1003013616|1003013616|     0.0|
+----------+----------+--------+

Physical Therapist
+----------+----------+--------+
|        id