### Uploading Files into Pyspark

In [0]:
from pyspark.sql import SparkSession

# Initializing Spark
spark = SparkSession.builder.appName("CMS_OpenPayments").getOrCreate()

# Defining file path
removed_deleted_path = "dbfs:/FileStore/tables/OP_REMOVED_DELETED_PGYR2023_P01302025_01212025.csv"
ownership_path = "dbfs:/FileStore/tables/OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv"
covered_recipient_path = "dbfs:/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025-1.csv"
research_path = "dbfs:/FileStore/tables/OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv"

# Reading CSV files into PySpark
ownership_df = spark.read.csv(ownership_path, header=True, inferSchema=True)
covered_recipient_df = spark.read.csv(covered_recipient_path, header=True, inferSchema=True)

# Displaying first 5 rows of each dataset
ownership_df.show(5)
covered_recipient_df.show(5)

# Print column names 
ownership_df.printSchema()
covered_recipient_df.printSchema()

+-----------+--------------------+-------------+--------------------+---------------------+-------------------+---------------------+-----------------------------------------------+-----------------------------------------------+--------------+---------------+------------------+-----------------+------------------+---------------------+----------------------+--------------------+----------+------------+-------------------------------+-----------------+--------------------+---------------------------------------------------------+-----------------------------------------------------------+-------------------------------------------------------------+--------------------------------------------------------------+----------------------------------------------------------------+------------------------------+--------------------------------------------------------+------------------------+
|Change_Type|Physician_Profile_ID|Physician_NPI|Physician_First_Name|Physician_Middle_Name|Physician_

### 1. What is the Nature of Payments with reimbursement amounts greater than $1,000 ordered by count?

In [0]:
# Filter for records where the value of Total_Amount_Invested_USDollars > 1000
filtered_ownership_df = ownership_df.filter(ownership_df.Total_Amount_Invested_USDollars > 1000)

# Group by Nature of Payment and count
nature_of_payment_count_df = filtered_ownership_df.groupBy("Terms_of_Interest").count().orderBy("count", ascending=False)

# Print results
nature_of_payment_count_df.show(10)


+--------------------+-----+
|   Terms_of_Interest|count|
+--------------------+-----+
| Membership Interest|  157|
|        Common Stock|  111|
|               Stock|   69|
|1.) Value of inte...|   33|
|              shares|   28|
|Interest acquired...|   25|
|ownership interes...|   23|
|Common and prefer...|   23|
|              Equity|   21|
|Limited Liability...|   20|
+--------------------+-----+
only showing top 10 rows



### 2. What are the top ten Nature of Payments by count?

In [0]:
# Group by Nature of Payment and ordered by count
nature_of_payment_count_top_10_df = ownership_df.groupBy("Terms_of_Interest").count().orderBy("count", ascending=False)

# Print top 10 payments
nature_of_payment_count_top_10_df.show(10)

+--------------------+-----+
|   Terms_of_Interest|count|
+--------------------+-----+
|        Common Stock|  995|
|Physician holds a...|  762|
| Membership Interest|  227|
|               Stock|  143|
|          Investment|   90|
|       Common Shares|   86|
|Limited Liability...|   70|
|                null|   63|
|PROFIT INTEREST A...|   55|
|Common/Preferred ...|   54|
+--------------------+-----+
only showing top 10 rows



### 3. What are the top ten Nature of Payments by total amount?

In [0]:
# Group by Nature of Payment and sum the Total_Amount_Invested_USDollars
nature_of_payment_total_amount_df = ownership_df.groupBy("Terms_of_Interest").sum("Total_Amount_Invested_USDollars").orderBy("sum(Total_Amount_Invested_USDollars)", ascending=False)

# Print top 10 
nature_of_payment_total_amount_df.show(10)

+--------------------+------------------------------------+
|   Terms_of_Interest|sum(Total_Amount_Invested_USDollars)|
+--------------------+------------------------------------+
|     Stock ownership|                      1.2468128362E8|
|Preferred Stock L...|                           6930000.0|
|               Stock|                   6053716.840000002|
|              shares|                  5189054.8100000005|
|Publicly held com...|                          4664013.29|
|ownership interes...|                           3831660.0|
|     SAFE investment|                           3605000.0|
|   42.415% ownership|                          3580063.57|
| Membership Interest|                   2703518.979999998|
|The current inves...|                           2500000.0|
+--------------------+------------------------------------+
only showing top 10 rows



### 4. What are the top ten physician specialties by total amount?

In [0]:
# Group by Physician Specialty and sum the Total_Amount_Invested_USDollars
physician_specialty_total_amount_df = ownership_df.groupBy("Physician_Specialty").sum("Total_Amount_Invested_USDollars").orderBy("sum(Total_Amount_Invested_USDollars)", ascending=False)

# Print top 10
physician_specialty_total_amount_df.show(10)

+--------------------+------------------------------------+
| Physician_Specialty|sum(Total_Amount_Invested_USDollars)|
+--------------------+------------------------------------+
|Allopathic & Oste...|                1.2617592538000001E8|
|Allopathic & Oste...|                 1.720776957999999E7|
|Allopathic & Oste...|                   9215456.699999997|
|Allopathic & Oste...|                  7833554.3999999985|
|Allopathic & Oste...|                   7774635.429999999|
|Allopathic & Oste...|                          5733256.29|
|Allopathic & Oste...|                          4132615.42|
|Dental Providers|...|                          3791925.97|
|Dental Providers|...|                          3580273.57|
|Allopathic & Oste...|                           2632503.0|
+--------------------+------------------------------------+
only showing top 10 rows



### 5. Who are the top ten physicians by total amount?

In [0]:
# Group by Physician NPI and sum the Total_Amount_Invested_USDollars
physician_total_amount_df = ownership_df.groupBy("Physician_NPI").sum("Total_Amount_Invested_USDollars").orderBy("sum(Total_Amount_Invested_USDollars)", ascending=False)

# Print top 10
physician_details_df.show(10)

+-------------+--------------------+-------------------+--------------------+------------------------------------+
|Physician_NPI|Physician_First_Name|Physician_Last_Name| Physician_Specialty|sum(Total_Amount_Invested_USDollars)|
+-------------+--------------------+-------------------+--------------------+------------------------------------+
|   1720029465|             William|             Melton|Allopathic & Oste...|                              400.35|
|   1720038334|           Elizabeth|             Forbes|Allopathic & Oste...|                                 0.0|
|   1336161561|               Cindy|              Tobin|Allopathic & Oste...|                                 0.0|
|   1720052103|             Eduardo|           Marichal|Allopathic & Oste...|                                 0.0|
|   1275588246|               Frank|            Cammisa|Allopathic & Oste...|                                 0.0|
|   1548349566|              Robert|          Strathman|Allopathic & Oste...|   