In [0]:
import zipfile
import os
import requests
import io
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [0]:
url = "https://download.cms.gov/openpayments/PGYR2023_P01302025_01212025.zip"
response = requests.get(url, stream=True)
if response.status_code == 200:
    buffer = io.BytesIO()
    buffer.seek(0)
    for chunk in response.iter_content(chunk_size=1024):
        if chunk:
            buffer.write(chunk)
else:
    print(f"Failed to download file. Status code: {response.status_code}")

In [0]:
temp_dir = os.getcwd()
buffer.seek(0)
response_1_loaded = False
with zipfile.ZipFile(buffer, 'r') as zip_ref:
    # Loop through each file in the ZIP archive
    for file_name in zip_ref.namelist():
        table_name,ext = os.path.splitext(file_name)
        if ext.lower() == ".csv":
            temp_path = os.path.join(temp_dir, file_name)
            zip_ref.extract(file_name, temp_dir)
            df = spark.read.csv(f"file:{temp_path}", header=True, inferSchema=True)
            df.write.format("parquet").saveAsTable(table_name)
    response_1_loaded = True

In [0]:
url2 = "https://download.cms.gov/openpayments/PHPRFL_P01302025_01212025.zip"
response = requests.get(url2, stream=True)

if response.status_code == 200:
    buffer = io.BytesIO()
    buffer.seek(0)
    for chunk in response.iter_content(chunk_size=1024):
        if chunk:
            buffer.write(chunk)
else:
    print(f"Failed to download file. Status code: {response.status_code}")

In [0]:
buffer.seek(0)
response_2_loaded = False
with zipfile.ZipFile(buffer, 'r') as zip_ref:
    # Loop through each file in the ZIP archive
    for file_name in zip_ref.namelist():
        table_name,ext = os.path.splitext(file_name)
        if ext.lower() == ".csv":
            temp_path = os.path.join(temp_dir, file_name)
            zip_ref.extract(file_name, temp_dir)
            df = spark.read.csv(f"file:{temp_path}", header=True, inferSchema=True)
            df.write.format("parquet").saveAsTable(table_name)
    response_2_loaded = False

In [0]:
if response_1_loaded:
    payment_df = spark.read.table(f"default.op_dtl_gnrl_pgyr2023_p01302025_01212025")
else:
    PATH = "dbfs:/user/hive/warehouse/op_dtl_gnrl_pgyr2023_p01302025_01212025"
    payment_df = spark.read.format('parquet').load(PATH)

In [0]:
# Display schema of the payment dataframe
payment_df.schema.fields

Out[7]: [StructField('Change_Type', StringType(), True),
 StructField('Covered_Recipient_Type', StringType(), True),
 StructField('Teaching_Hospital_CCN', StringType(), True),
 StructField('Teaching_Hospital_ID', StringType(), True),
 StructField('Teaching_Hospital_Name', StringType(), True),
 StructField('Covered_Recipient_Profile_ID', StringType(), True),
 StructField('Covered_Recipient_NPI', StringType(), True),
 StructField('Covered_Recipient_First_Name', StringType(), True),
 StructField('Covered_Recipient_Middle_Name', StringType(), True),
 StructField('Covered_Recipient_Last_Name', StringType(), True),
 StructField('Covered_Recipient_Name_Suffix', StringType(), True),
 StructField('Recipient_Primary_Business_Street_Address_Line1', StringType(), True),
 StructField('Recipient_Primary_Business_Street_Address_Line2', StringType(), True),
 StructField('Recipient_City', StringType(), True),
 StructField('Recipient_State', StringType(), True),
 StructField('Recipient_Zip_Code', String

In [0]:
# Convert payment amount to float type
payment_df = payment_df.withColumn("Total_Amount_of_Payment_USDollars", 
                                   F.col("Total_Amount_of_Payment_USDollars").cast(T.FloatType()))

In [0]:
# Filter payments greater than $1000
high_value_payments_df = payment_df.filter(F.col("Total_Amount_of_Payment_USDollars") > 1000)

In [0]:
# Aggregate by payment nature
payment_nature_summary_df = high_value_payments_df.groupBy("Nature_of_Payment_or_Transfer_of_Value").agg(
    F.count("Total_Amount_of_Payment_USDollars").alias("payment_count"),
    F.sum("Total_Amount_of_Payment_USDollars").alias("total_payment_sum")
)

In [0]:
# Display top 10 payment natures by count
payment_nature_summary_df.orderBy("payment_count", ascending=False).limit(10).show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------+-------------+--------------------+
|Nature_of_Payment_or_Transfer_of_Value                                                                                                            |payment_count|total_payment_sum   |
+--------------------------------------------------------------------------------------------------------------------------------------------------+-------------+--------------------+
|Compensation for services other than consulting, including serving as faculty or as a speaker at a venue other than a continuing education program|164093       |5.582959709562378E8 |
|Consulting Fee                                                                                                                                    |105239       |4.819268881451416E8 |
|Travel and Lodging                                                             

In [0]:
# Display top 10 payment natures by total amount
payment_nature_summary_df.orderBy("total_payment_sum", ascending=False).limit(10).show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------+-------------+--------------------+
|Nature_of_Payment_or_Transfer_of_Value                                                                                                            |payment_count|total_payment_sum   |
+--------------------------------------------------------------------------------------------------------------------------------------------------+-------------+--------------------+
|Royalty or License                                                                                                                                |11538        |1.190650138420227E9 |
|Compensation for services other than consulting, including serving as faculty or as a speaker at a venue other than a continuing education program|164093       |5.582959709562378E8 |
|Consulting Fee                                                                 

In [0]:
# Aggregate by recipient specialty
recipient_specialty_summary_df = payment_df.groupBy("Covered_Recipient_Specialty_1").agg(
    F.count("Total_Amount_of_Payment_USDollars").alias("payment_count"),
    F.sum("Total_Amount_of_Payment_USDollars").alias("total_payment_sum")
)


In [0]:
# Display top 10 recipient specialties by total amount
recipient_specialty_summary_df.orderBy("total_payment_sum", ascending=False).limit(10).show(truncate=False)

+------------------------------------------------------------------------------------------------+-------------+--------------------+
|Covered_Recipient_Specialty_1                                                                   |payment_count|total_payment_sum   |
+------------------------------------------------------------------------------------------------+-------------+--------------------+
|null                                                                                            |30686        |7.936674627555835E8 |
|Allopathic & Osteopathic Physicians|Orthopaedic Surgery                                         |210504       |4.0345021277223873E8|
|Allopathic & Osteopathic Physicians|Internal Medicine                                           |1307850      |1.3136300312194332E8|
|Allopathic & Osteopathic Physicians|Psychiatry & Neurology|Neurology                            |457774       |8.979213624269351E7 |
|Allopathic & Osteopathic Physicians|Neurological Surgery     

In [0]:

# Aggregate by recipient primary type
recipient_type_summary_df = payment_df.groupBy("Covered_Recipient_Primary_Type_1").agg(
    F.count("Total_Amount_of_Payment_USDollars").alias("payment_count"),
    F.sum("Total_Amount_of_Payment_USDollars").alias("total_payment_sum")
)

In [0]:
# Display top 10 recipient primary types by total amount
recipient_type_summary_df.orderBy("total_payment_sum", ascending=False).limit(10).show(truncate=False)

+--------------------------------------+-------------+--------------------+
|Covered_Recipient_Primary_Type_1      |payment_count|total_payment_sum   |
+--------------------------------------+-------------+--------------------+
|Medical Doctor                        |7913524      |2.0401834645222669E9|
|null                                  |30588        |7.933900793164766E8 |
|Nurse Practitioner                    |3266415      |1.3546701068595254E8|
|Doctor of Osteopathy                  |941641       |9.368468839218245E7 |
|Doctor of Dentistry                   |350066       |8.315500001068492E7 |
|Physician Assistant                   |1618627      |6.882057343604396E7 |
|Doctor of Podiatric Medicine          |148617       |3.442229348336857E7 |
|Doctor of Optometry                   |243641       |2.331802892928546E7 |
|Certified Registered Nurse Anesthetist|47191        |2143071.420371622   |
|Clinical Nurse Specialist             |27756        |2023960.0999247283  |
+-----------