In [0]:
dbutils.fs.cp("dbfs:/FileStore/tables/OP_DTL_GNRL_PGYR2023_P01302025_01212025.zip", "file:/tmp/OP_DTL_GNRL_PGYR2023_P01302025_01212025.zip", True)

Out[1]: True

In [0]:
import zipfile
import os

zip_path = "/tmp/OP_DTL_GNRL_PGYR2023_P01302025_01212025.zip"  # Now it's accessible locally
extract_path = "/tmp/pgy_extracted"

# Ensure the directory exists
os.makedirs(extract_path, exist_ok=True)

# Extract the ZIP file
with zipfile.ZipFile(zip_path, "r") as zip_ref:
    zip_ref.extractall(extract_path)


In [0]:
extracted_files = os.listdir(extract_path)
print("Extracted Files:", extracted_files)

Extracted Files: ['OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv']


In [0]:
for file in extracted_files:
    if file.endswith(".csv"):
        file_path = f"file://{extract_path}/{file}"
        df = spark.read.format("csv").option("header", "true").load(file_path)
        df.show(5)

+-----------+----------------------+---------------------+--------------------+----------------------+----------------------------+---------------------+----------------------------+-----------------------------+---------------------------+-----------------------------+-----------------------------------------------+-----------------------------------------------+---------------+---------------+------------------+-----------------+------------------+---------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------------+-------------------------------------+-------------------------------------+--------------------------------

In [0]:
df.write.format("delta").mode("overwrite").saveAsTable("OP_dtl_gnrl_pgyr2023_P01302025_01212025")

In [0]:
df1 = spark.read.format("delta").load("dbfs:/user/hive/warehouse/op_dtl_gnrl_pgyr2023_p01302025_01212025")
df.show()


+-----------+----------------------+---------------------+--------------------+----------------------+----------------------------+---------------------+----------------------------+-----------------------------+---------------------------+-----------------------------+-----------------------------------------------+-----------------------------------------------+---------------+---------------+------------------+-----------------+------------------+---------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------------+-------------------------------------+-------------------------------------+--------------------------------

What is the Nature of Payments with reimbursement amounts greater than $1,000 ordered by count?

In [0]:
from pyspark.sql.functions import col

# Filter for payments greater than $1,000 and cast Total_Amount_of_Payment_USDollars to numeric
df_filtered = df.filter(col("Total_Amount_of_Payment_USDollars").cast("double") > 1000)

# Group by the Nature of Payment and count the occurrences
df_grouped = df_filtered.groupBy("Nature_of_Payment_or_Transfer_of_Value").count()

# Order by count in descending order
df_grouped.orderBy("count", ascending=False).show()


+--------------------------------------+------+
|Nature_of_Payment_or_Transfer_of_Value| count|
+--------------------------------------+------+
|                  Compensation for ...|164093|
|                        Consulting Fee|105239|
|                    Travel and Lodging| 24793|
|                             Honoraria| 13750|
|                             Education| 13376|
|                    Royalty or License| 11538|
|                  Compensation for ...|  8658|
|                                 Grant|  4922|
|                  Space rental or f...|  4917|
|                  Long term medical...|  2930|
|                      Debt forgiveness|  1788|
|                     Food and Beverage|   968|
|                                  Gift|   630|
|                          Acquisitions|   563|
|                  Charitable Contri...|   239|
|                         Entertainment|    30|
+--------------------------------------+------+



What are the top ten Nature of Payments by count?

In [0]:
from pyspark.sql import functions as F

# Group by 'Nature_of_Payment_or_Transfer_of_Value' and count occurrences
nature_c = df1.groupBy("Nature_of_Payment_or_Transfer_of_Value").count()

# Order by the 'count' column in descending order and limit the result to the top 10
q2 = nature_c.orderBy(F.col("count").desc()).limit(10)

# Show the result
q2.show()


+--------------------------------------+--------+
|Nature_of_Payment_or_Transfer_of_Value|   count|
+--------------------------------------+--------+
|                     Food and Beverage|13378464|
|                    Travel and Lodging|  545086|
|                  Compensation for ...|  236628|
|                        Consulting Fee|  170630|
|                             Education|  161078|
|                                  Gift|   31786|
|                             Honoraria|   20232|
|                    Royalty or License|   15865|
|                  Compensation for ...|   12234|
|                         Entertainment|    7967|
+--------------------------------------+--------+



What are the top ten Nature of Payments by total amount?

In [0]:
from pyspark.sql import functions as F

# Group by 'Nature_of_Payment_or_Transfer_of_Value' and sum the total payment amount
nature_amount = df1.groupBy("Nature_of_Payment_or_Transfer_of_Value") \
                   .agg(F.sum("Total_Amount_of_Payment_USDollars").alias("total_amount"))

# Order by the 'total_amount' column in descending order and limit the result to the top 10
top_nature_amount = nature_amount.orderBy(F.col("total_amount").desc()).limit(10)

# Show the result
top_nature_amount.show()


+--------------------------------------+--------------------+
|Nature_of_Payment_or_Transfer_of_Value|        total_amount|
+--------------------------------------+--------------------+
|                    Royalty or License|1.1921745630200038E9|
|                  Compensation for ...| 5.946326876500018E8|
|                        Consulting Fee| 5.148558758999997E8|
|                     Food and Beverage| 3.744878240099856E8|
|                    Travel and Lodging|1.7954842377999967E8|
|                                 Grant|      1.1188856182E8|
|                          Acquisitions| 7.192577675999999E7|
|                             Education| 6.469532594000477E7|
|                             Honoraria| 5.585182388999998E7|
|                  Long term medical...|3.0098791950000003E7|
+--------------------------------------+--------------------+



 What are the top ten physician specialties by total amount?

In [0]:
from pyspark.sql import functions as F

# Group by physician specialties columns and sum the total payment amount
specialty_amount = df1.groupBy(
    "Covered_Recipient_Specialty_1", 
    "Covered_Recipient_Specialty_2",
    "Covered_Recipient_Specialty_3", 
    "Covered_Recipient_Specialty_4",
    "Covered_Recipient_Specialty_5", 
    "Covered_Recipient_Specialty_6"
).agg(F.sum("Total_Amount_of_Payment_USDollars").alias("total_amount"))

# Filter rows where specialty columns are not null
specialty_amount_filtered = specialty_amount.filter(
    (F.col("Covered_Recipient_Specialty_1").isNotNull()) | 
    (F.col("Covered_Recipient_Specialty_2").isNotNull()) | 
    (F.col("Covered_Recipient_Specialty_3").isNotNull()) | 
    (F.col("Covered_Recipient_Specialty_4").isNotNull()) | 
    (F.col("Covered_Recipient_Specialty_5").isNotNull()) | 
    (F.col("Covered_Recipient_Specialty_6").isNotNull())
)

# Flatten the specialty columns and aggregate the total amounts
specialty_flat = specialty_amount_filtered.select(
    F.col("Covered_Recipient_Specialty_1").alias("Specialty"),
    "total_amount"
).union(
    specialty_amount_filtered.select(
        F.col("Covered_Recipient_Specialty_2").alias("Specialty"),
        "total_amount"
    )
).union(
    specialty_amount_filtered.select(
        F.col("Covered_Recipient_Specialty_3").alias("Specialty"),
        "total_amount"
    )
).union(
    specialty_amount_filtered.select(
        F.col("Covered_Recipient_Specialty_4").alias("Specialty"),
        "total_amount"
    )
).union(
    specialty_amount_filtered.select(
        F.col("Covered_Recipient_Specialty_5").alias("Specialty"),
        "total_amount"
    )
).union(
    specialty_amount_filtered.select(
        F.col("Covered_Recipient_Specialty_6").alias("Specialty"),
        "total_amount"
    )
)

# Group by the flattened specialties and sum the total amount
specialty_total = specialty_flat.groupBy("Specialty").agg(F.sum("total_amount").alias("total_amount"))

# Order by total amount in descending order and limit to the top 10
top_specialties = specialty_total.orderBy(F.col("total_amount").desc()).limit(10)

# Show the result
top_specialties.show()


+--------------------+--------------------+
|           Specialty|        total_amount|
+--------------------+--------------------+
|                null|1.241966047687004...|
|Allopathic & Oste...| 4.034502130899938E8|
|Allopathic & Oste...|1.3136300307000254E8|
|Allopathic & Oste...| 8.979213626000045E7|
|Allopathic & Oste...| 8.608847857000019E7|
|Allopathic & Oste...| 8.320264774000138E7|
|Allopathic & Oste...|  7.02208411500008E7|
|Allopathic & Oste...|  6.94689412100008E7|
|Allopathic & Oste...| 6.677283748999991E7|
|Allopathic & Oste...| 6.329825407000022E7|
+--------------------+--------------------+



Who are the top ten physicians by total amount?

In [0]:
from pyspark.sql import functions as F

# Group by Covered_Recipient_NPI (or Covered_Recipient_Profile_ID if you prefer)
physician_total_amount = df1.groupBy("Covered_Recipient_NPI").agg(
    F.sum("Total_Amount_of_Payment_USDollars").alias("total_amount")
)

# Order by total amount in descending order and limit to the top 10
top_physicians = physician_total_amount.orderBy(F.col("total_amount").desc()).limit(10)

# Show the result
top_physicians.show()


+---------------------+-------------------+
|Covered_Recipient_NPI|       total_amount|
+---------------------+-------------------+
|                 null|8.064709590000004E8|
|           1366487498|      3.392202493E7|
|           1205980448|      2.943435593E7|
|           1861451874|      1.730653526E7|
|           1336247469|      1.606551551E7|
|           1295737930|      1.160032024E7|
|           1932140266|          8459144.4|
|           1669457354|  7810628.200000001|
|           1275512964|  6871466.720000001|
|           1811044548|  6660383.799999999|
+---------------------+-------------------+

