
## Unzip and Load datasets


In [0]:
# Lets know
display(dbutils.fs.ls("FileStore/tables/"))  


path,name,size,modificationTime
dbfs:/FileStore/tables/PGYR2023_P01302025_01212025.zip,PGYR2023_P01302025_01212025.zip,789005271,1742771449000
dbfs:/FileStore/tables/PHPRFL_P01302025_01212025.zip,PHPRFL_P01302025_01212025.zip,82966770,1742770908000


In [0]:
import zipfile
import os

# File locations and type's
file_location_1 = "/FileStore/tables/PGYR2023_P01302025_01212025.zip"
file_type_1 = "zip"

file_location_2 = "/FileStore/tables/PHPRFL_P01302025_01212025.zip"
file_type_2 = "zip"

dbutils.fs.mkdirs("dbfs:/FileStore/week07/")
dbutils.fs.mkdirs("dbfs:/FileStore/week07/pgz")
dbutils.fs.mkdirs("dbfs:/FileStore/week07/phz")

dbutils.fs.cp("dbfs:/FileStore/tables/PGYR2023_P01302025_01212025.zip", "dbfs:/FileStore/week07/pg_z.zip")
dbutils.fs.cp("dbfs:/FileStore/tables/PHPRFL_P01302025_01212025.zip", "dbfs:/FileStore/week07/ph_z.zip")

display(dbutils.fs.ls("dbfs:/FileStore/week07/"))


path,name,size,modificationTime
dbfs:/FileStore/week07/pg_corrected.zip,pg_corrected.zip,789005271,1742776391000
dbfs:/FileStore/week07/pg_z.zip,pg_z.zip,789005271,1742777715000
dbfs:/FileStore/week07/pgz/,pgz/,0,0
dbfs:/FileStore/week07/ph_corrected.zip,ph_corrected.zip,82966770,1742776403000
dbfs:/FileStore/week07/ph_z.zip,ph_z.zip,82966770,1742777727000
dbfs:/FileStore/week07/phz/,phz/,0,0


In [0]:
import zipfile
import shutil
import os

# If the file is mistakenly a directory, rename it to a proper .zip file
dbutils.fs.mv("dbfs:/FileStore/week07/pg_z.zip", "dbfs:/FileStore/week07/pg_corrected.zip")
dbutils.fs.mv("dbfs:/FileStore/week07/ph_z.zip", "dbfs:/FileStore/week07/ph_corrected.zip")


# Correct paths after verifying the files
zip1 = "/dbfs/FileStore/week07/pg_corrected.zip"  # Update if needed
zip2 = "/dbfs/FileStore/week07/ph_corrected.zip"  # Update if needed
tmp_zip1 = "/tmp/pg.zip"
tmp_zip2 = "/tmp/ph.zip"
pg_path = "/dbfs/FileStore/week07/pgz"
ph_path = "/dbfs/FileStore/week07/phz"

# Copy the files to /tmp/
dbutils.fs.cp("dbfs:/FileStore/tables/PGYR2023_P01302025_01212025.zip", tmp_zip1)
dbutils.fs.cp("dbfs:/FileStore/tables/PHPRFL_P01302025_01212025.zip", tmp_zip2)
print("ZIP files copied to /tmp/ successfully.")

# Ensure directories exist for extraction
os.makedirs(pg_path, exist_ok=True)
os.makedirs(ph_path, exist_ok=True)

# Extract General Payments Data
if os.path.isfile(tmp_zip1):
    with zipfile.ZipFile(tmp_zip1, 'r') as zip_ref:
        zip_ref.extractall(pg_path)
        print("General payments data extracted successfully.")
else:
    print(f"General payments zip not found at {tmp_zip1}")

# Extract Recipient Data
if os.path.isfile(tmp_zip2):
    with zipfile.ZipFile(tmp_zip2, 'r') as zip_ref:
        zip_ref.extractall(ph_path)
        print("Recipient data extracted successfully.")
else:
    print(f"Recipient payments zip not found at {tmp_zip2}")


ZIP files copied to /tmp/ successfully.
General payments zip not found at /tmp/pg.zip
Recipient payments zip not found at /tmp/ph.zip


In [0]:
dbutils.fs.cp("dbfs:/FileStore/tables/PGYR2023_P01302025_01212025.zip", "file:/tmp/pg.zip")
dbutils.fs.cp("dbfs:/FileStore/tables/PHPRFL_P01302025_01212025.zip", "file:/tmp/ph.zip")

# Extract 
with zipfile.ZipFile("/tmp/pg.zip", 'r') as zip_ref:
    zip_ref.extractall("/tmp/pgf")

with zipfile.ZipFile("/tmp/ph.zip", 'r') as zip_ref:
    zip_ref.extractall("/tmp/phf")


#display(dbutils.fs.ls("dbfs:/FileStore/tables/"))
#destination_path = "/dbfs/FileStore/tables/pgf.csv"
#with open(destination_path, 'w') as f:
#    pass  # Just open the file in write mode to create it

os.makedirs("/dbfs/FileStore/tables/", exist_ok=True)
# Copy general file to FileStore
shutil.copy("/tmp/pgf/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv", "/dbfs/FileStore/tables/pgf.csv")

shutil.copy("/tmp/phf/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv", "/dbfs/FileStore/tables/phf.csv")

df1 = spark.read.csv("file:/dbfs/FileStore/tables/pgf.csv", header=True, inferSchema=True)

df2 = spark.read.csv("file:/dbfs/FileStore/tables/phf.csv", header=True, inferSchema=True)

df1.show(5)

+-----------+----------------------+---------------------+--------------------+----------------------+----------------------------+---------------------+----------------------------+-----------------------------+---------------------------+-----------------------------+-----------------------------------------------+-----------------------------------------------+---------------+---------------+------------------+-----------------+------------------+---------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------------+-------------------------------------+-------------------------------------+--------------------------------

In [0]:
df2.show(2)

In [0]:
from pyspark.sql import functions as F

# Q1. Nature of Payments with reimbursement amounts greater than $1,000 ordered by count
df1_filtered = df1.filter(F.col("Total_Amount_of_Payment_USDollars").cast("float") > 1000)
nature_c = df1_filtered.groupBy("Nature_of_Payment_or_Transfer_of_Value").count()
q1 = nature_c.orderBy(F.col("count").desc())
q1.show()

# Q2. Top ten Nature of Payments by count
nature_c = df1.groupBy("Nature_of_Payment_or_Transfer_of_Value").count()
q2 = nature_c.orderBy(F.col("count").desc()).limit(10)
q2.show()

# Q3. Top ten Nature of Payments by total amount
nature_a = df1.withColumn("Total_Amount_of_Payment_USDollars", F.col("Total_Amount_of_Payment_USDollars").cast("float")) \
    .groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .agg(F.sum("Total_Amount_of_Payment_USDollars").alias("total_amount"))

q3 = nature_a.orderBy(F.col("total_amount").desc()).limit(10)
q3.show()

# Q4. Top ten physician specialties by total amount
specialty = df1.withColumn("Total_Amount_of_Payment_USDollars", F.col("Total_Amount_of_Payment_USDollars").cast("float")) \
    .groupBy("Covered_Recipient_Specialty_1") \
    .agg(F.sum("Total_Amount_of_Payment_USDollars").alias("total_amount"))

q4 = specialty.orderBy(F.col("total_amount").desc()).limit(10)
q4.show()

# Q5. Top ten physicians by total amount
physician = df1.withColumn("Total_Amount_of_Payment_USDollars", F.col("Total_Amount_of_Payment_USDollars").cast("float")) \
    .groupBy("Covered_Recipient_NPI") \
    .agg(F.sum("Total_Amount_of_Payment_USDollars").alias("total_amount"))

q5 = physician.orderBy(F.col("total_amount").desc()).limit(10)
q5.show()

+--------------------------------------+------+
|Nature_of_Payment_or_Transfer_of_Value| count|
+--------------------------------------+------+
|                  Compensation for ...|164093|
|                        Consulting Fee|105239|
|                    Travel and Lodging| 24793|
|                             Honoraria| 13750|
|                             Education| 13376|
|                    Royalty or License| 11538|
|                  Compensation for ...|  8658|
|                                 Grant|  4922|
|                  Space rental or f...|  4917|
|                  Long term medical...|  2930|
|                      Debt forgiveness|  1788|
|                     Food and Beverage|   968|
|                                  Gift|   630|
|                          Acquisitions|   563|
|                  Charitable Contri...|   239|
|                         Entertainment|    30|
+--------------------------------------+------+

+--------------------------------------

In [0]:
# Q5. with names
phy = df1.join(df2, df1.Covered_Recipient_NPI == df2.Covered_Recipient_NPI, "inner") \
    .select(df1.Covered_Recipient_NPI, df2.Covered_Recipient_Profile_First_Name, df2.Covered_Recipient_Profile_Last_Name, 
            df1.Total_Amount_of_Payment_USDollars)

# Convert 'Total_Amount_of_Payment_USDollars' to float
phy = phy.withColumn("Total_Amount_of_Payment_USDollars", F.col("Total_Amount_of_Payment_USDollars").cast("float"))

# Group by 'Covered_Recipient_NPI' and sum the 'Total_Amount_of_Payment_USDollars', including the physician's name
Tot = phy.groupBy("Covered_Recipient_NPI", "Covered_Recipient_Profile_First_Name", "Covered_Recipient_Profile_Last_Name") \
    .agg(F.sum("Total_Amount_of_Payment_USDollars").alias("total_amount"))

# Order by total amount in descending order and take the top 10 physicians
Q5_1 = Tot.orderBy(F.col("total_amount").desc()).limit(10)

# Show the results
Q5_1.show()

+---------------------+------------------------------------+-----------------------------------+--------------------+
|Covered_Recipient_NPI|Covered_Recipient_Profile_First_Name|Covered_Recipient_Profile_Last_Name|        total_amount|
+---------------------+------------------------------------+-----------------------------------+--------------------+
|           1366487498|                             STEPHEN|                           BURKHART|  3.39220252890625E7|
|           1205980448|                             WILLIAM|                             BINDER|2.9434355869995117E7|
|           1861451874|                               KEVIN|                              FOLEY|1.7306535259869576E7|
|           1336247469|                                IVAN|                             OSORIO|        1.60655155E7|
|           1295737930|                              GEORGE|                            MAXWELL|1.1600320239997864E7|
|           1932140266|                              ROB