In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Load the datasets
payments_file = "dbfs:/FileStore/tables/OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv"
recipient_file = "dbfs:/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv"
ownership_file = "dbfs:/FileStore/tables/OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv"

# Read the payments data
payments_df = spark.read.option("header", "true").option("inferSchema", "true").csv(payments_file)

# Read the physicians profile data
physicians_df = spark.read.option("header", "true").option("inferSchema", "true").csv(recipient_file)

# Read the ownership data
ownership_df = spark.read.option("header", "true").option("inferSchema", "true").csv(ownership_file)

# Display schema to understand the data
payments_df.printSchema()
physicians_df.printSchema()

root
 |-- Change_Type: string (nullable = true)
 |-- Covered_Recipient_Type: string (nullable = true)
 |-- Noncovered_Recipient_Entity_Name: string (nullable = true)
 |-- Teaching_Hospital_CCN: string (nullable = true)
 |-- Teaching_Hospital_ID: string (nullable = true)
 |-- Teaching_Hospital_Name: string (nullable = true)
 |-- Covered_Recipient_Profile_ID: integer (nullable = true)
 |-- Covered_Recipient_NPI: string (nullable = true)
 |-- Covered_Recipient_First_Name: string (nullable = true)
 |-- Covered_Recipient_Middle_Name: string (nullable = true)
 |-- Covered_Recipient_Last_Name: string (nullable = true)
 |-- Covered_Recipient_Name_Suffix: string (nullable = true)
 |-- Recipient_Primary_Business_Street_Address_Line1: string (nullable = true)
 |-- Recipient_Primary_Business_Street_Address_Line2: string (nullable = true)
 |-- Recipient_City: string (nullable = true)
 |-- Recipient_State: string (nullable = true)
 |-- Recipient_Zip_Code: string (nullable = true)
 |-- Recipient_Coun

In [0]:
# Convert the amount to double to ensure proper comparison
payments_df = payments_df.withColumn("Total_Amount_of_Payment_USDollars", 
                                   col("Total_Amount_of_Payment_USDollars").cast("double"))

nature_payment_gt_1000 = payments_df.filter(col("Total_Amount_of_Payment_USDollars") > 1000) \
    .groupBy("Form_of_Payment_or_Transfer_of_Value") \
    .agg(count("*").alias("count")) \
    .orderBy("count", ascending=False)

display(nature_payment_gt_1000)

Form_of_Payment_or_Transfer_of_Value,count
Cash or cash equivalent,421998
In-kind items and services,24323
"Stock, stock option, or any other ownership interest",3


In [0]:
top_10_nature_by_count = payments_df.groupBy("Form_of_Payment_or_Transfer_of_Value") \
    .agg(count("*").alias("count")) \
    .orderBy("count", ascending=False) \
    .limit(10)

display(top_10_nature_by_count)

Form_of_Payment_or_Transfer_of_Value,count
Cash or cash equivalent,871107
In-kind items and services,156809
,358
2023,6
"Stock, stock option, or any other ownership interest",3


In [0]:
top_10_nature_by_amount = payments_df.withColumn("Total_Amount_of_Payment_USDollars", 
                                               col("Total_Amount_of_Payment_USDollars").cast("double")) \
    .groupBy("Form_of_Payment_or_Transfer_of_Value") \
    .agg(sum("Total_Amount_of_Payment_USDollars").alias("total_amount")) \
    .orderBy("total_amount", ascending=False) \
    .limit(10)

display(top_10_nature_by_amount)

Form_of_Payment_or_Transfer_of_Value,total_amount
Cash or cash equivalent,6425417513.020019
In-kind items and services,1646168272.299991
"Stock, stock option, or any other ownership interest",38562.48
,
2023,


In [0]:
# Join the payments dataframe with physicians dataframe
payment_physician_df = payments_df.withColumn("Total_Amount_of_Payment_USDollars", 
                                             col("Total_Amount_of_Payment_USDollars").cast("double")) \
    .join(
        physicians_df,
        payments_df["Covered_Recipient_Profile_ID"] == physicians_df["Covered_Recipient_Profile_ID"],
        "inner"
    )

# Get top 10 specialties by amount
top_10_specialties = payment_physician_df.groupBy("Covered_Recipient_Profile_Primary_Specialty") \
    .agg(sum("Total_Amount_of_Payment_USDollars").alias("total_amount")) \
    .orderBy("total_amount", ascending=False) \
    .limit(10)

display(top_10_specialties)

Covered_Recipient_Profile_Primary_Specialty,total_amount
Allopathic & Osteopathic Physicians|Radiology|Diagnostic Radiology,6455814.080000001
Allopathic & Osteopathic Physicians|Psychiatry & Neurology|Neurology,4568966.959999999
Allopathic & Osteopathic Physicians|Internal Medicine|Hematology & Oncology,4088892.719999998
Allopathic & Osteopathic Physicians|Ophthalmology,3911043.819999999
,3728113.11
Allopathic & Osteopathic Physicians|Psychiatry & Neurology|Psychiatry,3713079.490000001
Allopathic & Osteopathic Physicians|Internal Medicine,3612870.480000001
Allopathic & Osteopathic Physicians|Dermatology,2939528.539999999
Allopathic & Osteopathic Physicians|Internal Medicine|Medical Oncology,2197938.37
Allopathic & Osteopathic Physicians|Internal Medicine|Cardiovascular Disease,2161473.7


In [0]:
# Alternative approach with aliases
payments_alias = payments_df.withColumn("Total_Amount_of_Payment_USDollars", 
                                      col("Total_Amount_of_Payment_USDollars").cast("double")).alias("payments")
physicians_alias = physicians_df.alias("physicians")

# Join with aliases
payment_physician_df = payments_alias.join(
    physicians_alias,
    col("payments.Covered_Recipient_Profile_ID") == col("physicians.Covered_Recipient_Profile_ID"),
    "inner"
)

# Create a full name column
payment_physician_df = payment_physician_df.withColumn(
    "Physician_Full_Name", 
    concat_ws(" ", 
              col("physicians.Covered_Recipient_Profile_First_Name"), 
              col("physicians.Covered_Recipient_Profile_Middle_Name"), 
              col("physicians.Covered_Recipient_Profile_Last_Name"))
)

# Get top 10 physicians by amount
top_10_physicians = payment_physician_df.groupBy(
    "Physician_Full_Name", 
    col("physicians.Covered_Recipient_Profile_ID")
) \
.agg(
    sum(col("payments.Total_Amount_of_Payment_USDollars")).alias("total_amount"),
    first(col("physicians.Covered_Recipient_Profile_Primary_Specialty")).alias("specialty")
) \
.orderBy("total_amount", ascending=False) \
.limit(10)

display(top_10_physicians)

Physician_Full_Name,Covered_Recipient_Profile_ID,total_amount,specialty
MARTIN I SCHUSTER,565647,1247560.39,Allopathic & Osteopathic Physicians|Psychiatry & Neurology|Psychiatry
RAJAN AGARWAL,701937,822379.5,Allopathic & Osteopathic Physicians|Radiology|Diagnostic Radiology
DAVID F SCOTT,49989,683830.05,Allopathic & Osteopathic Physicians|Orthopaedic Surgery|Adult Reconstructive Orthopaedic Surgery
DERK D PURCELL,708599,651062.0,Allopathic & Osteopathic Physicians|Radiology|Diagnostic Radiology
DONALD JOAQUIN GARCIA,85984,646130.24,Allopathic & Osteopathic Physicians|Psychiatry & Neurology|Psychiatry
SIBYL E WRAY,357387,604756.4999999999,Allopathic & Osteopathic Physicians|Psychiatry & Neurology|Neurology
HESSAM AAZAMI,143141,573234.9899999999,Allopathic & Osteopathic Physicians|General Practice
JORGE J CASTILLO,31442,521252.79,Allopathic & Osteopathic Physicians|Internal Medicine|Hematology & Oncology
CYNTHIA A BEHLING,139195,489970.0,Allopathic & Osteopathic Physicians|Pathology|Anatomic Pathology & Clinical Pathology
JEROME A BARAKOS,960512,477800.0,Allopathic & Osteopathic Physicians|Radiology|Diagnostic Radiology
