In [None]:
#usefull links
# https://medium.com/analytics-vidhya/etl-pipeline-using-spark-sql-746bbfae4d03

In [1]:
#install postgres connector
# https://jdbc.postgresql.org/download.html
# https://stackoverflow.com/questions/34948296/using-pyspark-to-connect-to-postgresql
import findspark
findspark.init('/home/realtour/sparkTutorial/spark-2.4.7-bin-hadoop2.7')

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession \
    .builder \
    .appName("ETL pipeline") \
    .config("spark.jars", "/usr/share/java/postgresql-42.2.19.jar") \
    .getOrCreate()
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")

spark

You are working with 1 core(s)


In [3]:
df=spark.read.csv("../Dataset/OP_DTL_GNRL_PGYR2019_P01222021.csv",header=True,inferSchema=True)

In [4]:
df.printSchema()

root
 |-- Change_Type: string (nullable = true)
 |-- Covered_Recipient_Type: string (nullable = true)
 |-- Teaching_Hospital_CCN: integer (nullable = true)
 |-- Teaching_Hospital_ID: integer (nullable = true)
 |-- Teaching_Hospital_Name: string (nullable = true)
 |-- Physician_Profile_ID: integer (nullable = true)
 |-- Physician_First_Name: string (nullable = true)
 |-- Physician_Middle_Name: string (nullable = true)
 |-- Physician_Last_Name: string (nullable = true)
 |-- Physician_Name_Suffix: string (nullable = true)
 |-- Recipient_Primary_Business_Street_Address_Line1: string (nullable = true)
 |-- Recipient_Primary_Business_Street_Address_Line2: string (nullable = true)
 |-- Recipient_City: string (nullable = true)
 |-- Recipient_State: string (nullable = true)
 |-- Recipient_Zip_Code: string (nullable = true)
 |-- Recipient_Country: string (nullable = true)
 |-- Recipient_Province: string (nullable = true)
 |-- Recipient_Postal_Code: string (nullable = true)
 |-- Physician_Primary

In [5]:
df.limit(2).toPandas()

Unnamed: 0,Change_Type,Covered_Recipient_Type,Teaching_Hospital_CCN,Teaching_Hospital_ID,Teaching_Hospital_Name,Physician_Profile_ID,Physician_First_Name,Physician_Middle_Name,Physician_Last_Name,Physician_Name_Suffix,...,Product_Category_or_Therapeutic_Area_4,Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_4,Associated_Drug_or_Biological_NDC_4,Covered_or_Noncovered_Indicator_5,Indicate_Drug_or_Biological_or_Device_or_Medical_Supply_5,Product_Category_or_Therapeutic_Area_5,Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_5,Associated_Drug_or_Biological_NDC_5,Program_Year,Payment_Publication_Date
0,UNCHANGED,Covered Recipient Physician,,,,247267,LAWRENCE,,PAOLINI,,...,,,,,,,,,2019,01/22/2021
1,UNCHANGED,Covered Recipient Physician,,,,401312,OTTER,QUAKING,ASPEN,,...,,,,,,,,,2019,01/22/2021


In [None]:
"""
{
"physician_id":"258145",
"date_payment":"04/13/2018 ",
"record_id":"521226951",
"payer":"Mission Pharmacal Company",
"amount":13.78,
"Physician_Specialty":"Allopathic & Osteopathic Physicians|Pediatrics ",
"Nature_of_payment":"Food and Beverage"
}
"""

In [6]:
df.schema['Total_Amount_of_Payment_USDollars'].dataType

StringType

In [7]:
#change dataType str to double
df = df.withColumn("Total_Amount_of_Payment_USDollars", df["Total_Amount_of_Payment_USDollars"].cast(DoubleType()))

In [8]:
#lets create a temp view for sql
df.createOrReplaceTempView("payment")

In [9]:
#Transform into a Dataset of payment object :
ds=spark.sql("""
        select Physician_Profile_ID as physician_id ,Date_of_Payment as date_payment,
        Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name as payer,Total_Amount_of_Payment_USDollars as amount,Physician_Specialty,
        Nature_of_Payment_or_Transfer_of_Value as nature_of_payment from payment where Physician_Profile_ID is not null and Total_Amount_of_Payment_USDollars is not null
        """)

In [10]:
#print schema
ds.limit(3).show()

+------------+------------+--------------------+------+--------------------+-----------------+
|physician_id|date_payment|               payer|amount| Physician_Specialty|nature_of_payment|
+------------+------------+--------------------+------+--------------------+-----------------+
|      247267|  03/29/2019|Mission Pharmacal...| 13.81|Allopathic & Oste...|Food and Beverage|
|      401312|  06/20/2019|Mission Pharmacal...| 15.59|Allopathic & Oste...|Food and Beverage|
|      194933|  06/20/2019|Mission Pharmacal...| 15.59|Allopathic & Oste...|Food and Beverage|
+------------+------------+--------------------+------+--------------------+-----------------+



In [11]:
ds.printSchema()

root
 |-- physician_id: integer (nullable = true)
 |-- date_payment: string (nullable = true)
 |-- payer: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- Physician_Specialty: string (nullable = true)
 |-- nature_of_payment: string (nullable = true)



In [12]:
#lets perform some operation
#What are Nature of payments with reimbursement amount greater than $1000 order by count
ds.filter("amount>1000").groupBy('nature_of_payment').count().orderBy(desc('count')).show()

+--------------------+------+
|   nature_of_payment| count|
+--------------------+------+
|Compensation for ...|177095|
|      Consulting Fee| 89468|
|  Travel and Lodging| 24399|
|           Education| 14165|
|           Honoraria| 13769|
|  Royalty or License|  9840|
|Compensation for ...|  6441|
|                Gift|  2639|
|Compensation for ...|  2112|
|               Grant|  1722|
|   Food and Beverage|   598|
|Current or prospe...|   555|
|Charitable Contri...|    57|
|       Entertainment|    52|
+--------------------+------+



In [13]:
#what are the top 5 nature of payments by count?
ds.groupBy('nature_of_payment').count().orderBy(desc('count')).show()

+--------------------+-------+
|   nature_of_payment|  count|
+--------------------+-------+
|   Food and Beverage|9048510|
|  Travel and Lodging| 606698|
|Compensation for ...| 234452|
|           Education| 185504|
|      Consulting Fee| 140192|
|                Gift|  32243|
|           Honoraria|  17780|
|  Royalty or License|  12644|
|Compensation for ...|   7450|
|       Entertainment|   7445|
|               Grant|   3537|
|Compensation for ...|   2555|
|Current or prospe...|    812|
|Charitable Contri...|    168|
|                null|      1|
+--------------------+-------+



In [21]:
#what are the total nature of payment by total amount
ds.groupBy('nature_of_payment').agg(sum('amount').alias('total')).orderBy(sum('amount').desc()).show()

+--------------------+--------------------+
|   nature_of_payment|               total|
+--------------------+--------------------+
|Compensation for ...| 6.365367171399996E8|
|  Royalty or License| 5.245590497699999E8|
|      Consulting Fee|4.3529978323999995E8|
|   Food and Beverage| 2.407318597400052E8|
|  Travel and Lodging|2.0036612613999912E8|
|Current or prospe...|       8.109926975E7|
|           Education| 5.657138269000026E7|
|           Honoraria|5.1883151209999986E7|
|Compensation for ...|2.1783281809999995E7|
|               Grant|1.9302150380000014E7|
|                Gift|1.4222111529999997E7|
|Compensation for ...|          9462830.71|
|       Entertainment|  491200.93999999977|
|Charitable Contri...|   388194.5900000001|
|                null|                 1.0|
+--------------------+--------------------+



In [20]:
#who are the top 5 Physician_Specialty
ds.groupBy('Physician_Specialty').agg(sum('amount').alias('total')).orderBy(sum('amount').desc()).show()

+--------------------+--------------------+
| Physician_Specialty|               total|
+--------------------+--------------------+
|Allopathic & Oste...|3.9413944994999164E8|
|Allopathic & Oste...|1.1315516435000023E8|
|Allopathic & Oste...| 8.731879314000003E7|
|Allopathic & Oste...| 8.481322198999985E7|
|Allopathic & Oste...| 8.044425124000001E7|
|Allopathic & Oste...| 7.139019721000034E7|
|Allopathic & Oste...|6.3081675829999976E7|
|Allopathic & Oste...|  6.21732372899999E7|
|Allopathic & Oste...| 6.144094537000008E7|
|Allopathic & Oste...| 6.138162544000033E7|
|Allopathic & Oste...| 5.928968186000011E7|
|Allopathic & Oste...| 5.444086342000023E7|
|Allopathic & Oste...|5.2219958410000004E7|
|Allopathic & Oste...|5.1851462589999996E7|
|Allopathic & Oste...|4.9831269490000054E7|
|Allopathic & Oste...| 4.669641069000014E7|
|Allopathic & Oste...| 4.629543193999997E7|
|Allopathic & Oste...| 4.406640779999997E7|
|Allopathic & Oste...| 3.812526727999997E7|
|Allopathic & Oste...| 3.4453306

In [22]:
#Save datasets to json file :
ds.write.mode("overwrite").csv('Dataset/payment.csv')

In [23]:
#saving data into postgres
ds.write.format('jdbc').options(
      url='jdbc:postgresql://localhost:5432/rahul',
      driver='org.postgresql.Driver',
      dbtable='payment',
      user='postgres',
      password='Meta@123').mode('append').save()
