In [0]:
%%sh
# Download the CMS Open Payments ZIP file
curl -L https://download.cms.gov/openpayments/PGYR2023_P01302025_01212025.zip -o /tmp/openpayments_2023.zip

# Unzip the file to a directory, force overwriting existing files without prompting
unzip -o /tmp/openpayments_2023.zip -d /tmp/openpayments_2023

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0  0  752M    0 6335k    0     0  21.0M      0  0:00:35 --:--:--  0:00:35 20.9M 24  752M   24  182M    0     0   141M      0  0:00:05  0:00:01  0:00:04  141M 49  752M   49  370M    0     0   161M      0  0:00:04  0:00:02  0:00:02  161M 73  752M   73  552M    0     0   168M      0  0:00:04  0:00:03  0:00:01  167M 97  752M   97  736M    0     0   171M      0  0:00:04  0:00:04 --:--:--  171M100  752M  100  752M    0     0   171M      0  0:00:04  0:00:04 --:--:--  182M


Archive:  /tmp/openpayments_2023.zip
  inflating: /tmp/openpayments_2023/OP_PGYR2023_README_P01302025.txt  
  inflating: /tmp/openpayments_2023/OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv  
  inflating: /tmp/openpayments_2023/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv  
  inflating: /tmp/openpayments_2023/OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv  
  inflating: /tmp/openpayments_2023/OP_REMOVED_DELETED_PGYR2023_P01302025_01212025.csv  


In [0]:
# Importing required libraries
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, desc, count

In [0]:
import os

# List the extracted files
os.listdir("/tmp/openpayments_2023")

Out[5]: ['OP_REMOVED_DELETED_PGYR2023_P01302025_01212025.csv',
 'OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv',
 'OP_PGYR2023_README_P01302025.txt',
 'OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv',
 'OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv']

In [0]:
# Reading the CSV file into a Spark DataFrame
df = spark.read.csv("file:/tmp/openpayments_2023/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv", header=True, inferSchema=True)

# Saving it as a Delta table
df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("asritha_week7_General_openPays_2023")

In [0]:
# Reading the CSV file into a Spark DataFrame
df = spark.read.csv("dbfs:/FileStore/shared_uploads/asritha.suraparaju@slu.edu/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv")

# Saving it as a Delta table (use underscores instead of spaces)
df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("Cp_covered_recipients")

In [0]:
from pyspark.sql.functions import col

df_filtered = df.filter(col("Total_Amount_of_Payment_USDollars") > 1000)
df_filtered.groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .agg(count("*").alias("Count")) \
    .orderBy(col("Count").desc()) \
    .show(10, truncate=False)


+--------------------------------------------------------------------------------------------------------------------------------------------------+------+
|Nature_of_Payment_or_Transfer_of_Value                                                                                                            |Count |
+--------------------------------------------------------------------------------------------------------------------------------------------------+------+
|Compensation for services other than consulting, including serving as faculty or as a speaker at a venue other than a continuing education program|164092|
|Consulting Fee                                                                                                                                    |105228|
|Travel and Lodging                                                                                                                                |24738 |
|Honoraria                                                      

In [0]:
df.groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .agg(count("*").alias("Count")) \
    .orderBy(col("Count").desc()) \
    .show(10, truncate=False)


+--------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|Nature_of_Payment_or_Transfer_of_Value                                                                                                            |Count   |
+--------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|Food and Beverage                                                                                                                                 |13378464|
|Travel and Lodging                                                                                                                                |545086  |
|Compensation for services other than consulting, including serving as faculty or as a speaker at a venue other than a continuing education program|236628  |
|Consulting Fee                                     

In [0]:
df_payments.groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .agg(sum("Total_Amount_of_Payment_USDollars").alias("Total_Amount")) \
    .orderBy(desc("Total_Amount")) \
    .show(10, truncate=False)




+--------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
|Nature_of_Payment_or_Transfer_of_Value                                                                                                            |Total_Amount        |
+--------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
|Royalty or License                                                                                                                                |1.1921745630200038E9|
|Compensation for services other than consulting, including serving as faculty or as a speaker at a venue other than a continuing education program|5.946326876500018E8 |
|Consulting Fee                                                                                                                                    |5.

In [0]:
from pyspark.sql.functions import col, sum, desc

# Reload the recipients table with proper headers if needed
df_recipients = spark.read.csv("dbfs:/FileStore/shared_uploads/asritha.suraparaju@slu.edu/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv", 
                               header=True, inferSchema=True)

# Join Payments and Recipients on Profile ID
df_specialty = df_payments.join(df_recipients, 
                                df_payments["Covered_Recipient_Profile_ID"] == df_recipients["Covered_Recipient_Profile_ID"], 
                                "inner")

# Group by Specialty and Sum Payments
df_specialty.groupBy("Covered_Recipient_Profile_Primary_Specialty") \
    .agg(sum("Total_Amount_of_Payment_USDollars").alias("Total_Amount")) \
    .orderBy(desc("Total_Amount")) \
    .show(10, truncate=False)






+------------------------------------------------------------------------------------------------+--------------------+
|Covered_Recipient_Profile_Primary_Specialty                                                     |Total_Amount        |
+------------------------------------------------------------------------------------------------+--------------------+
|Allopathic & Osteopathic Physicians|Orthopaedic Surgery                                         |3.7775559615999675E8|
|Allopathic & Osteopathic Physicians|Psychiatry & Neurology|Neurology                            |8.680715037000027E7 |
|Allopathic & Osteopathic Physicians|Neurological Surgery                                        |8.58704690299999E7  |
|Allopathic & Osteopathic Physicians|Dermatology                                                 |8.076589144000065E7 |
|null                                                                                            |7.78133707000009E7  |
|Allopathic & Osteopathic Physicians|Ort

In [0]:
from pyspark.sql.functions import col, sum, desc

from pyspark.sql.functions import col, sum, desc, concat_ws

# Ensure df_payments is loaded correctly with header and schema inference
df_payments = spark.read.csv("file:/tmp/openpayments_2023/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv", 
                             header=True, inferSchema=True)

# Check column names
df_payments.printSchema()

# Create a Full Name Column for Clarity
df_physicians = df_payments.withColumn("Physician_Full_Name", 
                                       concat_ws(" ", col("Covered_Recipient_Profile_First_Name"), 
                                                    col("Covered_Recipient_Profile_Last_Name")))

# Group by Physician Name and Sum Payments
df_physicians.groupBy("Physician_Full_Name") \
    .agg(sum(col("Total_Amount_of_Payment_USDollars")).alias("Total_Amount")) \
    .orderBy(desc("Total_Amount")) \
    .show(10, truncate=False)






root
 |-- Change_Type: string (nullable = true)
 |-- Covered_Recipient_Type: string (nullable = true)
 |-- Teaching_Hospital_CCN: string (nullable = true)
 |-- Teaching_Hospital_ID: string (nullable = true)
 |-- Teaching_Hospital_Name: string (nullable = true)
 |-- Covered_Recipient_Profile_ID: string (nullable = true)
 |-- Covered_Recipient_NPI: string (nullable = true)
 |-- Covered_Recipient_First_Name: string (nullable = true)
 |-- Covered_Recipient_Middle_Name: string (nullable = true)
 |-- Covered_Recipient_Last_Name: string (nullable = true)
 |-- Covered_Recipient_Name_Suffix: string (nullable = true)
 |-- Recipient_Primary_Business_Street_Address_Line1: string (nullable = true)
 |-- Recipient_Primary_Business_Street_Address_Line2: string (nullable = true)
 |-- Recipient_City: string (nullable = true)
 |-- Recipient_State: string (nullable = true)
 |-- Recipient_Zip_Code: string (nullable = true)
 |-- Recipient_Country: string (nullable = true)
 |-- Recipient_Province: string (nu

Traceback (most recent call last):
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3378, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<command-969588386769765>", line 13, in <module>
    df_physicians = df_payments.withColumn("Physician_Full_Name",
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/dataframe.py", line 4758, in withColumn
    return DataFrame(self._jdf.withColumn(colName, col._jc), self.sparkSession)
  File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/databricks/spark/python/pyspark/errors/exceptions.py", line 234, in deco
    raise converted from None
pyspark.errors.exceptions.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Covered_Recip

