# HDS-5230 W7-Assignment
##### Author: Wenshan, Liu
##### Date: 03/22/2025

## Unzip the Dataset
The **OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv** file is too large to upload through the UI. As a workaround, upload the compressed **PGYR2023_P01302025_01212025.zip** archive instead and decompress it on the cluster.

In [0]:
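import subprocess

# Copy the uploaded archive from DBFS to the driver's local disk
dbutils.fs.cp("dbfs:/FileStore/tables/PGYR2023_P01302025_01212025.zip", "file:/tmp/PGYR2023.zip")

# Extract it locally; -o overwrites files left over from a previous run
subprocess.run("unzip -o /tmp/PGYR2023.zip -d /tmp/", shell=True)

# Copy the extracted General Payments CSV back to DBFS so Spark can read it
dbutils.fs.cp("file:/tmp/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv", "dbfs:/FileStore/tables/", recurse=True)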

True
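If the cluster image lacks the `unzip` binary, Python's standard-library `zipfile` module can perform the same extraction on the driver without shelling out. This is a sketch under that assumption, not the method used above; the helper name `extract_archive` is hypothetical, and on Databricks it would be called with the local `/tmp/...` paths from the cell above.

```python
import tempfile
import zipfile
from pathlib import Path


def extract_archive(zip_path: str, dest_dir: str) -> list:
    """Extract every member of zip_path into dest_dir and return the member names.

    Hypothetical helper, a pure-Python stand-in for `unzip -o <zip> -d <dest>`:
    extractall() overwrites files that already exist, much like `unzip -o`.
    """
    Path(dest_dir).mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(dest_dir)
        return archive.namelist()


if __name__ == "__main__":
    # Build a tiny throwaway archive to demonstrate the helper.
    work = Path(tempfile.mkdtemp())
    sample_zip = work / "sample.zip"
    with zipfile.ZipFile(sample_zip, "w") as archive:
        archive.writestr("payments.csv", "id,amount\n1,1500.00\n")

    print(extract_archive(str(sample_zip), str(work / "out")))  # ['payments.csv']

    # On Databricks this would be, e.g.:
    # extract_archive("/tmp/PGYR2023.zip", "/tmp/")
```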

## Make Sure the Datasets Exist in DBFS

In [0]:
files = dbutils.fs.ls("dbfs:/FileStore/tables/")
for file in files:
    print(file.name, file.path)


OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv dbfs:/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv
OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv dbfs:/FileStore/tables/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv
OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv dbfs:/FileStore/tables/OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv
OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv dbfs:/FileStore/tables/OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv
OP_REMOVED_DELETED_PGYR2023_P01302025_01212025.csv dbfs:/FileStore/tables/OP_REMOVED_DELETED_PGYR2023_P01302025_01212025.csv
PGYR2023_P01302025_01212025.zip dbfs:/FileStore/tables/PGYR2023_P01302025_01212025.zip
PHPRFL_P01302025_01212025.zip dbfs:/FileStore/tables/PHPRFL_P01302025_01212025.zip
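Instead of eyeballing the listing, the check can report exactly which expected files are missing. A minimal sketch, assuming the names come from `[f.name for f in dbutils.fs.ls(...)]` as in the cell above; `REQUIRED_FILES` and `missing_files` are hypothetical names.

```python
# Hypothetical set of the two files this notebook actually reads.
REQUIRED_FILES = {
    "OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv",
    "OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv",
}


def missing_files(listed_names, required=REQUIRED_FILES):
    """Return the required file names absent from listed_names, sorted."""
    return sorted(set(required) - set(listed_names))


if __name__ == "__main__":
    # On Databricks: listed = [f.name for f in dbutils.fs.ls("dbfs:/FileStore/tables/")]
    listed = [
        "OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv",
        "PGYR2023_P01302025_01212025.zip",
    ]
    print(missing_files(listed))  # ['OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv']
```

An empty list means every input is in place; raising an error when the list is non-empty would stop the notebook before any Spark job runs.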


## Import Libraries and Initialize SparkSession

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, count, desc, concat_ws, first

# Initialize SparkSession
spark = SparkSession.builder.appName("OpenPayments2023Analysis").getOrCreate()


## Load Dataset

In [0]:

# Load General Payments file
general_payments_path = "/FileStore/tables/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv"
general_df = spark.read.format("csv").option("header", "true").load(general_payments_path)

# Cast payment amount to double type
general_df = general_df.withColumn("Total_Amount_of_Payment_USDollars", 
                                   col("Total_Amount_of_Payment_USDollars").cast("double"))

# Load Supplementary file
recipient_profile_path = "/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv"
recipient_df = spark.read.format("csv").option("header", "true").load(recipient_profile_path)
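Because both files are read with `header` set to `true` but without schema inference, Spark loads every column as a string, which is why `Total_Amount_of_Payment_USDollars` is cast to `double` before any numeric comparison. The plain-Python sketch below mirrors that step on a tiny inline CSV (the sample rows are made up): values arrive as `str`, and a hypothetical `to_double` helper returns `None` for blank or unparseable text, roughly the way a non-ANSI Spark cast yields null.

```python
import csv
import io

# Made-up sample rows in the same shape as the General Payments file.
SAMPLE_CSV = """Nature_of_Payment_or_Transfer_of_Value,Total_Amount_of_Payment_USDollars
Consulting Fee,2500.00
Food and Beverage,18.75
Gift,
"""


def to_double(value):
    """Parse a CSV field as float; return None for empty or non-numeric text."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


if __name__ == "__main__":
    rows = list(csv.DictReader(io.StringIO(SAMPLE_CSV)))
    # Every field is a string straight out of the reader.
    print(type(rows[0]["Total_Amount_of_Payment_USDollars"]))  # <class 'str'>
    print([to_double(r["Total_Amount_of_Payment_USDollars"]) for r in rows])
    # [2500.0, 18.75, None]
```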


## Analysis Tasks

### Q1: 
What is the Nature of Payments with reimbursement amounts greater than $1,000 ordered by count?

In [0]:
task1_df = general_df.filter(col("Total_Amount_of_Payment_USDollars") > 1000) \
    .groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .agg(count("*").alias("Count")) \
    .orderBy(desc("Count")) # sort by count, descending

print("Task 1: Nature of Payments with reimbursement amounts > $1,000 ordered by count")

# show the data
task1_df.show()


Task 1: Nature of Payments with reimbursement amounts > $1,000 ordered by count
+--------------------------------------+------+
|Nature_of_Payment_or_Transfer_of_Value| Count|
+--------------------------------------+------+
|                  Compensation for ...|164093|
|                        Consulting Fee|105239|
|                    Travel and Lodging| 24793|
|                             Honoraria| 13750|
|                             Education| 13376|
|                    Royalty or License| 11538|
|                  Compensation for ...|  8658|
|                                 Grant|  4922|
|                  Space rental or f...|  4917|
|                  Long term medical...|  2930|
|                      Debt forgiveness|  1788|
|                     Food and Beverage|   968|
|                                  Gift|   630|
|                          Acquisitions|   563|
|                  Charitable Contri...|   239|
|                         Entertainment|    30|
+--------------------------------------+------+
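The filter, group, and count pipeline in Task 1 can be reproduced in plain Python on a few in-memory records, which is a quick way to sanity-check the logic. This is an illustration only: the records and the `count_by_nature_over` helper are invented. Note that a `None` amount has to be skipped explicitly, matching how Spark drops rows where `amount > 1000` evaluates to null.

```python
from collections import Counter

# Invented (nature, amount) records; None stands in for a null amount.
payments = [
    ("Consulting Fee", 2500.0),
    ("Consulting Fee", 1200.0),
    ("Travel and Lodging", 1800.0),
    ("Food and Beverage", 35.0),
    ("Gift", None),
]


def count_by_nature_over(records, threshold=1000.0):
    """Count records per nature whose amount exceeds threshold, most frequent first."""
    counts = Counter(
        nature for nature, amount in records
        if amount is not None and amount > threshold
    )
    # Sort by count descending; break ties by name so the order is reproducible.
    return sorted(counts.items(), key=lambda item: (-item[1], item[0]))


if __name__ == "__main__":
    print(count_by_nature_over(payments))
    # [('Consulting Fee', 2), ('Travel and Lodging', 1)]
```

The tie-breaker is a design choice of the sketch; `orderBy(desc("Count"))` alone leaves the relative order of equal counts unspecified.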

### Q2:
What are the top ten Nature of Payments by count?

In [0]:
task2_df = general_df.groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .agg(count("*").alias("Count")) \
    .orderBy(desc("Count")) \
    .limit(10) # keep the top 10 rows

print("Task 2: Top ten Nature of Payments by count")
task2_df.show()

Task 2: Top ten Nature of Payments by count
+--------------------------------------+--------+
|Nature_of_Payment_or_Transfer_of_Value|   Count|
+--------------------------------------+--------+
|                     Food and Beverage|13378464|
|                    Travel and Lodging|  545086|
|                  Compensation for ...|  236628|
|                        Consulting Fee|  170630|
|                             Education|  161078|
|                                  Gift|   31786|
|                             Honoraria|   20232|
|                    Royalty or License|   15865|
|                  Compensation for ...|   12234|
|                         Entertainment|    7967|
+--------------------------------------+--------+
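The two rows labelled `Compensation for ...` in Tasks 1 and 2 are not duplicates. `DataFrame.show()` truncates strings longer than 20 characters by default, keeping the first 17 characters plus `...`, so distinct categories that share a prefix print identically; the same effect makes every specialty in Task 4 appear as `Allopathic & Oste...`. The sketch below mimics that display rule in plain Python; the full names are illustrative examples, not values read from the file. In the notebook, `task2_df.show(truncate=False)` prints the untruncated values.

```python
def spark_style_truncate(text: str, width: int = 20) -> str:
    """Mimic how DataFrame.show() shortens a cell wider than `width` characters."""
    if width > 0 and len(text) > width:
        # Spark omits the ellipsis when the width is too small to fit it.
        return text[:width] if width < 4 else text[: width - 3] + "..."
    return text


if __name__ == "__main__":
    # Illustrative category names that share their first 17 characters.
    names = [
        "Compensation for services other than consulting",
        "Compensation for serving as faculty or as a speaker",
    ]
    for name in names:
        print(spark_style_truncate(name))
    # Compensation for ...
    # Compensation for ...
```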



### Q3:
What are the top ten Nature of Payments by total amount?

In [0]:
task3_df = general_df.groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .agg(_sum("Total_Amount_of_Payment_USDollars").alias("Total_Amount")) \
    .orderBy(desc("Total_Amount")) \
    .limit(10)

print("Task 3: Top ten Nature of Payments by total amount")
task3_df.show()

Task 3: Top ten Nature of Payments by total amount
+--------------------------------------+--------------------+
|Nature_of_Payment_or_Transfer_of_Value|        Total_Amount|
+--------------------------------------+--------------------+
|                    Royalty or License|     1.19217456302E9|
|                  Compensation for ...| 5.946326876500002E8|
|                        Consulting Fee| 5.148558758999996E8|
|                     Food and Beverage| 3.744878240099897E8|
|                    Travel and Lodging|1.7954842378000867E8|
|                                 Grant|      1.1188856182E8|
|                          Acquisitions| 7.192577675999999E7|
|                             Education| 6.469532594000257E7|
|                             Honoraria| 5.585182388999997E7|
|                  Long term medical...|       3.009879195E7|
+--------------------------------------+--------------------+
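The long tails in the Task 3 totals (for example `5.946326876500002E8`) come from summing binary floating-point `double` values, which cannot represent most cent amounts exactly. The plain-Python sketch below contrasts `float` with `decimal.Decimal` on made-up amounts. In Spark the analogous change would be casting to a `DecimalType` such as `decimal(18,2)` instead of `double`; that is a suggestion, not what the notebook does.

```python
from decimal import Decimal

# Made-up cent amounts, as they would appear as strings in the CSV.
amounts = ["0.10", "0.20", "0.30"]


def sum_as_float(values):
    """Sum after parsing to binary floating point (like casting to double)."""
    return sum(float(v) for v in values)


def sum_as_decimal(values):
    """Sum after parsing to Decimal, which keeps cent amounts exact."""
    return sum((Decimal(v) for v in values), Decimal("0"))


if __name__ == "__main__":
    print(sum_as_float(amounts))    # 0.6000000000000001
    print(sum_as_decimal(amounts))  # 0.60
```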



### Q4:
What are the top ten physician specialties by total amount?

In [0]:
# First join General Payments with the Supplementary file using Covered_Recipient_Profile_ID
joined_df = general_df.join(recipient_df, on="Covered_Recipient_Profile_ID", how="left")

# Keep only physician recipients
joined_df = joined_df.filter(col("Covered_Recipient_Profile_Type") == "Covered Recipient Physician")

# Count how many rows have a NULL Covered_Recipient_Profile_Primary_Specialty
null_specialty_count = joined_df.filter(col("Covered_Recipient_Profile_Primary_Specialty").isNull()).count()
print("Task 4: Number of rows with NULL Covered_Recipient_Profile_Primary_Specialty:", null_specialty_count)

# Drop rows with a NULL specialty; otherwise they would be aggregated into a single NULL group
joined_df = joined_df.filter(col("Covered_Recipient_Profile_Primary_Specialty").isNotNull())

# Group by physician specialty and calculate total payment amount, then show top ten
task4_df = joined_df.groupBy("Covered_Recipient_Profile_Primary_Specialty") \
    .agg(_sum("Total_Amount_of_Payment_USDollars").alias("Total_Amount")) \
    .orderBy(desc("Total_Amount")) \
    .limit(10)

print("Task 4: Top ten physician specialties by total amount")
task4_df.show()

Task 4: Number of rows with NULL Covered_Recipient_Profile_Primary_Specialty: 275266
Task 4: Top ten physician specialties by total amount
+-------------------------------------------+--------------------+
|Covered_Recipient_Profile_Primary_Specialty|        Total_Amount|
+-------------------------------------------+--------------------+
|                       Allopathic & Oste...| 3.777387421599988E8|
|                       Allopathic & Oste...| 8.678168521999991E7|
|                       Allopathic & Oste...| 8.586152214999998E7|
|                       Allopathic & Oste...| 8.065875356000061E7|
|                       Allopathic & Oste...| 7.277526618000008E7|
|                       Allopathic & Oste...|6.5723980810000904E7|
|                       Allopathic & Oste...| 6.566676087000152E7|
|                       Allopathic & Oste...|6.5418961260000445E7|
|                       Allopathic & Oste...| 6.302487097000034E7|
|                       Allopathic & Oste...| 6.131864283
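One subtlety in Task 4: `Covered_Recipient_Profile_Type` is a column of the supplementary file, so for any payment whose `Covered_Recipient_Profile_ID` has no match there, it is null after the left join, and the equality filter removes that row. The left join therefore behaves like an inner join in this query. The plain-Python sketch below reproduces that effect on invented records; the data and the `left_join` and `physician_totals` helpers are illustrative only.

```python
# Invented payment rows (profile_id, amount) and supplement rows keyed by profile_id.
payments = [("p1", 500.0), ("p2", 300.0), ("p9", 999.0)]
profiles = {
    "p1": {"type": "Covered Recipient Physician", "specialty": "Cardiology"},
    "p2": {"type": "Covered Recipient Physician", "specialty": None},
}


def left_join(rows, lookup):
    """Attach profile fields to each payment; unmatched IDs get None, as in a SQL left join."""
    joined = []
    for profile_id, amount in rows:
        profile = lookup.get(profile_id, {})
        joined.append((profile_id, amount, profile.get("type"), profile.get("specialty")))
    return joined


def physician_totals(rows, lookup):
    """Left join, keep physicians, drop null specialties, then sum amounts per specialty."""
    totals = {}
    for _, amount, rtype, specialty in left_join(rows, lookup):
        # An unmatched row has rtype None, so this test removes it (inner-join effect).
        if rtype != "Covered Recipient Physician" or specialty is None:
            continue
        totals[specialty] = totals.get(specialty, 0.0) + amount
    return totals


if __name__ == "__main__":
    print(left_join(payments, profiles)[-1])     # ('p9', 999.0, None, None)
    print(physician_totals(payments, profiles))  # {'Cardiology': 500.0}
```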

### Q5:
Who are the top ten physicians by total amount?

In [0]:
# Filter General Payments file for physician records based on Covered_Recipient_Type
physician_df = general_df.filter(col("Covered_Recipient_Type") == "Covered Recipient Physician")

# Create a physician name field by concatenating first, middle, and last names
physician_df = physician_df.withColumn("Physician_Name", 
                                         concat_ws(" ", col("Covered_Recipient_First_Name"), 
                                                        col("Covered_Recipient_Middle_Name"),
                                                        col("Covered_Recipient_Last_Name")))

# Group by profile ID (names are not unique) and keep one name per ID with first()
task5_df = physician_df.groupBy("Covered_Recipient_Profile_ID") \
    .agg(first("Physician_Name").alias("Physician_Name"),
         _sum("Total_Amount_of_Payment_USDollars").alias("Total_Amount")) \
    .orderBy(desc("Total_Amount")) \
    .limit(10)

print("Task 5: Top ten physicians by total amount")
task5_df.show()

Task 5: Top ten physicians by total amount
+----------------------------+----------------+-----------------+
|Covered_Recipient_Profile_ID|  Physician_Name|     Total_Amount|
+----------------------------+----------------+-----------------+
|                      288926|STEPHEN BURKHART|    3.392202493E7|
|                     1166415|  WILLIAM BINDER|    2.943435593E7|
|                      311622|     KEVIN FOLEY|    1.730653526E7|
|                      105583|     IVAN OSORIO|    1.606551551E7|
|                     1141140|  GEORGE MAXWELL|    1.160032024E7|
|                      177835|    ROBERT BOOTH|        8459144.4|
|                      120589| NEAL ELATTRACHE|        7810628.2|
|                      284714| AARON ROSENBERG|6871466.720000001|
|                      354917|   ROGER JACKSON|        6660383.8|
|                       72707|   PETER BONUTTI|6385096.170000001|
+----------------------------+----------------+-----------------+
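Task 5 groups by `Covered_Recipient_Profile_ID` rather than by name because different physicians can share a name. Two details matter here: `concat_ws` skips null arguments, so a null middle name does not leave a double space, and `first` returns whichever value it meets first, which Spark does not guarantee to be the same across runs. A plain-Python sketch of the same aggregation on invented rows (the helper names are hypothetical):

```python
def concat_ws(sep, *parts):
    """Join non-None parts with sep, mirroring Spark's concat_ws null handling."""
    return sep.join(p for p in parts if p is not None)


def top_physicians(rows, n=10):
    """rows: (profile_id, first, middle, last, amount). Returns top-n (id, name, total)."""
    names, totals = {}, {}
    for profile_id, first, middle, last, amount in rows:
        # Keep the first name seen per ID, like first() over an unordered group.
        names.setdefault(profile_id, concat_ws(" ", first, middle, last))
        totals[profile_id] = totals.get(profile_id, 0.0) + amount
    ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)[:n]
    return [(pid, names[pid], total) for pid, total in ranked]


if __name__ == "__main__":
    # Invented rows: two different physicians named JOHN SMITH with distinct IDs.
    rows = [
        ("101", "JOHN", None, "SMITH", 400.0),
        ("101", "JOHN", None, "SMITH", 100.0),
        ("202", "JOHN", "A", "SMITH", 450.0),
    ]
    print(top_physicians(rows))
    # [('101', 'JOHN SMITH', 500.0), ('202', 'JOHN A SMITH', 450.0)]
```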

