In [6]:
import os
import pickle
import warnings

import pandas as pd
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import Row
#from pyspark.sql.functions import *
from pyspark.ml.feature import *
#import functions
from pyspark.sql import functions
from pyspark.sql.functions import isnan, when, count, col
warnings.filterwarnings("ignore")

## Loading data

### Creating the SparkSession

In [2]:
# Locate the local Spark installation and make pyspark importable.
# NOTE(review): findspark.init() has to run before anything imports pyspark.
# On Restart & Run All, the import cell at the top of the notebook (which
# imports pyspark) runs first and will fail unless pyspark is pip-installed;
# consider moving this cell above the imports.
import findspark

findspark.init()
spark_home = findspark.find()
spark_home  # shows the Spark installation directory that was found

'C:\\Spark\\spark3'

In [3]:
# MongoDB connection strings for the Spark Mongo connector.
# Defaults to a local instance; set MONGO_URI (or the more specific
# MONGO_INPUT_URI / MONGO_OUTPUT_URI) to point elsewhere without editing code.
_default_mongo_uri = os.environ.get("MONGO_URI", "mongodb://127.0.0.1:27017/")
input_uri = os.environ.get("MONGO_INPUT_URI", _default_mongo_uri)
output_uri = os.environ.get("MONGO_OUTPUT_URI", _default_mongo_uri)

In [4]:
# Build (or reuse) the SparkSession with the MongoDB connector on the classpath,
# reading/writing through the URIs defined in the previous cell.
# NOTE(review): connector 2.4.x was built for Spark 2.4; if this runs on Spark 3
# (findspark reported a "spark3" directory), a 3.x connector is likely needed — confirm.
spark = (
    SparkSession.builder
    .appName("Drug_data")
    .config("spark.mongodb.input.uri", input_uri)
    .config("spark.mongodb.output.uri", output_uri)
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:2.4.2")
    .getOrCreate()
)

In [7]:
# Reuse the session built in the previous cell. getOrCreate() returns the
# already-running session, so passing a different appName here (as before)
# had no effect on the running application and only obscured which session
# is in use.
spark = SparkSession.builder.getOrCreate()

# Surface a missing Mongo configuration now rather than at the final write.
# (print, not warnings.warn: warnings are silenced in the first cell.)
if spark.conf.get("spark.mongodb.output.uri", None) is None:
    print("WARNING: SparkSession has no spark.mongodb.output.uri; "
          "run the MongoDB session cell first or the MongoDB write will likely fail.")

### Loading the data with `spark.read.csv`

In [8]:
# Location of the source CSV. Override with the DRUG_CSV_PATH environment
# variable instead of editing a machine-specific absolute path.
DRUG_CSV_PATH = os.environ.get(
    "DRUG_CSV_PATH", "D:/Laptop/Data/MUP_DPR_RY22_P04_V10_DY20_NPIBN_0.csv"
)

# Explicit schema reproducing the types inferSchema=True produced for this file
# (see the printSchema output below). Supplying it avoids the extra full pass
# over the ~24.6M rows that schema inference requires.
# With header=True and a schema, Spark maps fields by position, so the order
# below must match the CSV header order.
_str, _int, _dbl = T.StringType(), T.IntegerType(), T.DoubleType()
drug_schema = T.StructType([
    T.StructField(name, dtype, True)
    for name, dtype in [
        ("Prscrbr_NPI", _int),
        ("Prscrbr_Last_Org_Name", _str),
        ("Prscrbr_First_Name", _str),
        ("Prscrbr_City", _str),
        ("Prscrbr_State_Abrvtn", _str),
        ("Prscrbr_State_FIPS", _str),
        ("Prscrbr_Type", _str),
        ("Prscrbr_Type_Src", _str),
        ("Brnd_Name", _str),
        ("Gnrc_Name", _str),
        ("Tot_Clms", _int),
        ("Tot_30day_Fills", _dbl),
        ("Tot_Day_Suply", _int),
        ("Tot_Drug_Cst", _dbl),
        ("Tot_Benes", _int),
        ("GE65_Sprsn_Flag", _str),
        ("GE65_Tot_Clms", _int),
        ("GE65_Tot_30day_Fills", _dbl),
        ("GE65_Tot_Drug_Cst", _dbl),
        ("GE65_Tot_Day_Suply", _int),
        ("GE65_Bene_Sprsn_Flag", _str),
        ("GE65_Tot_Benes", _int),
    ]
])

df = spark.read.csv(DRUG_CSV_PATH, schema=drug_schema, header=True)

In [6]:
# Confirm the loaded object is a Spark (not pandas) DataFrame.
df.__class__

pyspark.sql.dataframe.DataFrame

In [9]:
# Count missing values per column: NULL in every column, plus NaN in
# float/double columns. isnan() is only meaningful for floating-point values.
# Applying it to string/integer columns relies on an implicit cast to double,
# which may raise when spark.sql.ansi.enabled is on, so it is restricted here.
# NOTE(review): the output below is truncated, so "no missing values in the
# relevant columns" cannot be confirmed from it — re-check before relying on it.
_float_cols = {
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, (T.DoubleType, T.FloatType))
}


def _missing_count(c):
    """Return an aggregate counting NULL (and, for float columns, NaN) values in column `c`."""
    is_missing = col(c).isNull()
    if c in _float_cols:
        is_missing = is_missing | isnan(col(c))
    return count(when(is_missing, c)).alias(c)


df.select([_missing_count(c) for c in df.columns]).show()

+-----------+---------------------+------------------+------------+--------------------+------------------+------------+----------------+---------+---------+--------+---------------+-------------+------------+---------+---------------+-------------+--------------------+-----------------+------------------+--------------------+--------------+
|Prscrbr_NPI|Prscrbr_Last_Org_Name|Prscrbr_First_Name|Prscrbr_City|Prscrbr_State_Abrvtn|Prscrbr_State_FIPS|Prscrbr_Type|Prscrbr_Type_Src|Brnd_Name|Gnrc_Name|Tot_Clms|Tot_30day_Fills|Tot_Day_Suply|Tot_Drug_Cst|Tot_Benes|GE65_Sprsn_Flag|GE65_Tot_Clms|GE65_Tot_30day_Fills|GE65_Tot_Drug_Cst|GE65_Tot_Day_Suply|GE65_Bene_Sprsn_Flag|GE65_Tot_Benes|
+-----------+---------------------+------------------+------------+--------------------+------------------+------------+----------------+---------+---------+--------+---------------+-------------+------------+---------+---------------+-------------+--------------------+-----------------+------------------+-----

### Understanding the basic features of the data

In [10]:
# Inspect the column names and types of the loaded data.
df.printSchema()

root
 |-- Prscrbr_NPI: integer (nullable = true)
 |-- Prscrbr_Last_Org_Name: string (nullable = true)
 |-- Prscrbr_First_Name: string (nullable = true)
 |-- Prscrbr_City: string (nullable = true)
 |-- Prscrbr_State_Abrvtn: string (nullable = true)
 |-- Prscrbr_State_FIPS: string (nullable = true)
 |-- Prscrbr_Type: string (nullable = true)
 |-- Prscrbr_Type_Src: string (nullable = true)
 |-- Brnd_Name: string (nullable = true)
 |-- Gnrc_Name: string (nullable = true)
 |-- Tot_Clms: integer (nullable = true)
 |-- Tot_30day_Fills: double (nullable = true)
 |-- Tot_Day_Suply: integer (nullable = true)
 |-- Tot_Drug_Cst: double (nullable = true)
 |-- Tot_Benes: integer (nullable = true)
 |-- GE65_Sprsn_Flag: string (nullable = true)
 |-- GE65_Tot_Clms: integer (nullable = true)
 |-- GE65_Tot_30day_Fills: double (nullable = true)
 |-- GE65_Tot_Drug_Cst: double (nullable = true)
 |-- GE65_Tot_Day_Suply: integer (nullable = true)
 |-- GE65_Bene_Sprsn_Flag: string (nullable = true)
 |-- GE65_T

In [8]:
# Peek at a single raw row.
df.show(n=1)

+-----------+---------------------+------------------+------------+--------------------+------------------+-----------------+----------------+--------------+--------------+--------+---------------+-------------+------------+---------+---------------+-------------+--------------------+-----------------+------------------+--------------------+--------------+
|Prscrbr_NPI|Prscrbr_Last_Org_Name|Prscrbr_First_Name|Prscrbr_City|Prscrbr_State_Abrvtn|Prscrbr_State_FIPS|     Prscrbr_Type|Prscrbr_Type_Src|     Brnd_Name|     Gnrc_Name|Tot_Clms|Tot_30day_Fills|Tot_Day_Suply|Tot_Drug_Cst|Tot_Benes|GE65_Sprsn_Flag|GE65_Tot_Clms|GE65_Tot_30day_Fills|GE65_Tot_Drug_Cst|GE65_Tot_Day_Suply|GE65_Bene_Sprsn_Flag|GE65_Tot_Benes|
+-----------+---------------------+------------------+------------+--------------------+------------------+-----------------+----------------+--------------+--------------+--------+---------------+-------------+------------+---------+---------------+-------------+------------------

In [9]:
# Shape of the raw data as (rows, columns); count() triggers a full Spark job.
n_rows = df.count()
n_cols = len(df.columns)
print((n_rows, n_cols))

(24616378, 22)


## Data preprocessing

### Processing drug dataframe

In [10]:
# Same schema check as in the exploration section, repeated at the start of preprocessing.
df.printSchema()

root
 |-- Prscrbr_NPI: integer (nullable = true)
 |-- Prscrbr_Last_Org_Name: string (nullable = true)
 |-- Prscrbr_First_Name: string (nullable = true)
 |-- Prscrbr_City: string (nullable = true)
 |-- Prscrbr_State_Abrvtn: string (nullable = true)
 |-- Prscrbr_State_FIPS: string (nullable = true)
 |-- Prscrbr_Type: string (nullable = true)
 |-- Prscrbr_Type_Src: string (nullable = true)
 |-- Brnd_Name: string (nullable = true)
 |-- Gnrc_Name: string (nullable = true)
 |-- Tot_Clms: integer (nullable = true)
 |-- Tot_30day_Fills: double (nullable = true)
 |-- Tot_Day_Suply: integer (nullable = true)
 |-- Tot_Drug_Cst: double (nullable = true)
 |-- Tot_Benes: integer (nullable = true)
 |-- GE65_Sprsn_Flag: string (nullable = true)
 |-- GE65_Tot_Clms: integer (nullable = true)
 |-- GE65_Tot_30day_Fills: double (nullable = true)
 |-- GE65_Tot_Drug_Cst: double (nullable = true)
 |-- GE65_Tot_Day_Suply: integer (nullable = true)
 |-- GE65_Bene_Sprsn_Flag: string (nullable = true)
 |-- GE65_T

### Renaming column names of drug dataframe

In [11]:

# First batch of renames: claim count and prescriber first name.
df_rename = (
    df
    .withColumnRenamed("Tot_Clms", "Total_claims")
    .withColumnRenamed("Prscrbr_First_Name", "First_name")
)

In [12]:
# Shorter location column names.
df_rename = (
    df_rename
    .withColumnRenamed("Prscrbr_City", "City")
    .withColumnRenamed("Prscrbr_State_Abrvtn", "State")
)

In [13]:
# Drug name, prescriber ID and last/organisation name, applied in this order.
for _old_name, _new_name in {
    "Gnrc_Name": "Drug_name",
    "Prscrbr_NPI": "NPI",
    "Prscrbr_Last_Org_Name": "last_org_name",
}.items():
    df_rename = df_rename.withColumnRenamed(_old_name, _new_name)

In [14]:
# The prescriber type is the prescriber's speciality.
df_rename = df_rename.withColumnRenamed(existing="Prscrbr_Type", new="Speciality")

In [15]:
# Verify that the renames took effect.
df_rename.printSchema()

root
 |-- NPI: integer (nullable = true)
 |-- last_org_name: string (nullable = true)
 |-- First_name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Prscrbr_State_FIPS: string (nullable = true)
 |-- Speciality: string (nullable = true)
 |-- Prscrbr_Type_Src: string (nullable = true)
 |-- Brnd_Name: string (nullable = true)
 |-- Drug_name: string (nullable = true)
 |-- Total_claims: integer (nullable = true)
 |-- Tot_30day_Fills: double (nullable = true)
 |-- Tot_Day_Suply: integer (nullable = true)
 |-- Tot_Drug_Cst: double (nullable = true)
 |-- Tot_Benes: integer (nullable = true)
 |-- GE65_Sprsn_Flag: string (nullable = true)
 |-- GE65_Tot_Clms: integer (nullable = true)
 |-- GE65_Tot_30day_Fills: double (nullable = true)
 |-- GE65_Tot_Drug_Cst: double (nullable = true)
 |-- GE65_Tot_Day_Suply: integer (nullable = true)
 |-- GE65_Bene_Sprsn_Flag: string (nullable = true)
 |-- GE65_Tot_Benes: integer (nullable = true)



### Aggregating rows per NPI (sum, mean and max of the drug metrics)

In [16]:
# Grouping key and the pandas-style aggregations wanted for each drug metric.
# Spark's agg(dict) accepts only one function per column, so this mapping
# cannot be passed to agg() as-is.
group_cols = ["NPI"]

_agg_metrics = ("Tot_Drug_Cst", "Total_claims", "Tot_Day_Suply", "Tot_30day_Fills")
agg_dict = {metric: ["sum", "mean", "max"] for metric in _agg_metrics}

In [17]:
# Aggregate every drug metric per prescriber, driven by `group_cols` and
# `agg_dict` from the previous cell so the configuration and the computation
# cannot drift apart. Spark's agg(dict) allows only one function per column,
# so the pandas-style mapping is expanded into explicit Column expressions.
# Output columns keep Spark's default "<fn>(<col>)" names (functions.mean is
# rendered as "avg"), ordered max, sum, mean within each metric.
_SPARK_AGG_FUNCS = {
    "max": functions.max,
    "sum": functions.sum,
    "mean": functions.mean,
}

# Fail loudly instead of silently skipping an aggregation name that has no mapping.
_unsupported = {fn for fns in agg_dict.values() for fn in fns} - set(_SPARK_AGG_FUNCS)
if _unsupported:
    raise ValueError(f"Unsupported aggregation(s) in agg_dict: {sorted(_unsupported)}")

agg_exprs = [
    agg_func(metric)
    for metric, fns in agg_dict.items()
    for fn_name, agg_func in _SPARK_AGG_FUNCS.items()
    if fn_name in fns
]

df_agg = df_rename.groupby(*group_cols).agg(*agg_exprs)
df_agg.show(1)

+----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+
|       NPI|max(Tot_Drug_Cst)|sum(Tot_Drug_Cst)|avg(Tot_Drug_Cst)|max(Total_claims)|sum(Total_claims)|avg(Total_claims)|max(Tot_Day_Suply)|sum(Tot_Day_Suply)|avg(Tot_Day_Suply)|max(Tot_30day_Fills)|sum(Tot_30day_Fills)|avg(Tot_30day_Fills)|
+----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+
|1003039298|           123.95|           123.95|           123.95|               12|               12|             12.0|               212|               212|             212.0|                12.1|                12.1|                12.1|
+----------+-----------------+------

In [18]:
# Sanity check: grouping by NPI reduced the row count from 24616378 to 959546.
# The count is commented out (presumably to avoid re-running a full Spark job),
# so the output shown below is from an earlier run.
#df_agg.count()

959546

In [18]:
# Aggregate column types: sums of integer columns come back as long, averages as double.
df_agg.printSchema()

root
 |-- NPI: integer (nullable = true)
 |-- max(Tot_Drug_Cst): double (nullable = true)
 |-- sum(Tot_Drug_Cst): double (nullable = true)
 |-- avg(Tot_Drug_Cst): double (nullable = true)
 |-- max(Total_claims): integer (nullable = true)
 |-- sum(Total_claims): long (nullable = true)
 |-- avg(Total_claims): double (nullable = true)
 |-- max(Tot_Day_Suply): integer (nullable = true)
 |-- sum(Tot_Day_Suply): long (nullable = true)
 |-- avg(Tot_Day_Suply): double (nullable = true)
 |-- max(Tot_30day_Fills): double (nullable = true)
 |-- sum(Tot_30day_Fills): double (nullable = true)
 |-- avg(Tot_30day_Fills): double (nullable = true)



### Selecting the prescriber (string) columns from the drug dataframe

In [19]:
# Expose the renamed data to Spark SQL as the view "df_sql".
df_rename.createOrReplaceTempView(name="df_sql")

In [21]:
# One row of prescriber attributes per NPI.
# `SELECT distinct(NPI), ...` does NOT deduplicate on NPI: DISTINCT applies to
# the whole selected row, so an NPI whose name/city/state/speciality differed
# between rows would appear more than once and duplicate its aggregates in the
# join below. Grouping by NPI guarantees uniqueness. max(struct(...)) picks one
# complete, deterministic attribute row per NPI, so fields are never mixed
# across rows. When the attributes are identical on every row of an NPI (which
# the matching counts below suggest for this data), the result is the same as
# the DISTINCT query. The aliases keep the original output column names.
df_names = spark.sql("""
    SELECT NPI,
           attrs.First_name    AS First_name,
           attrs.LAST_ORG_NAME AS LAST_ORG_NAME,
           attrs.CITY          AS CITY,
           attrs.STATE         AS STATE,
           attrs.SPECIALITY    AS SPECIALITY
    FROM (
        SELECT NPI,
               max(struct(First_name, LAST_ORG_NAME, CITY, STATE, SPECIALITY)) AS attrs
        FROM df_sql
        GROUP BY NPI
    ) AS per_npi
""")

In [22]:
# Peek at one prescriber-attribute row.
df_names.show(n=1)

+----------+----------+-------------+----------+-----+----------+
|       NPI|First_name|LAST_ORG_NAME|      CITY|STATE|SPECIALITY|
+----------+----------+-------------+----------+-----+----------+
|1003001868|    Pamela|       Linker|Harrisburg|   NC|   Dentist|
+----------+----------+-------------+----------+-----+----------+
only showing top 1 row



In [23]:
# The count matches the aggregated dataframe (959546 rows). df_agg has one row
# per NPI, so an equal count means df_names also has exactly one row per NPI.
# NOTE(review): the count is commented out, so the output below is from an earlier run.

#df_names.count()

959546

In [23]:
# Give the aggregate key a distinct name so the join below has unambiguous key columns.
df_agg = df_agg.withColumnRenamed(existing="NPI", new="NPI_1")

In [24]:
# Confirm that the key column is now NPI_1.
df_agg.printSchema()

root
 |-- NPI_1: integer (nullable = true)
 |-- max(Tot_Drug_Cst): double (nullable = true)
 |-- sum(Tot_Drug_Cst): double (nullable = true)
 |-- avg(Tot_Drug_Cst): double (nullable = true)
 |-- max(Total_claims): integer (nullable = true)
 |-- sum(Total_claims): long (nullable = true)
 |-- avg(Total_claims): double (nullable = true)
 |-- max(Tot_Day_Suply): integer (nullable = true)
 |-- sum(Tot_Day_Suply): long (nullable = true)
 |-- avg(Tot_Day_Suply): double (nullable = true)
 |-- max(Tot_30day_Fills): double (nullable = true)
 |-- sum(Tot_30day_Fills): double (nullable = true)
 |-- avg(Tot_30day_Fills): double (nullable = true)



### Joining the prescriber attributes with the aggregated metrics

In [25]:
# Attach the per-NPI aggregates to the prescriber attributes (inner join on NPI).
df_join = df_names.join(
    df_agg,
    on=df_names["NPI"] == df_agg["NPI_1"],
    how="inner",
)

In [27]:
# Peek at one joined row.
df_join.show(n=1)

+----------+----------+-------------+--------+-----+-----------------+----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+
|       NPI|First_name|LAST_ORG_NAME|    CITY|STATE|       SPECIALITY|     NPI_1|max(Tot_Drug_Cst)|sum(Tot_Drug_Cst)|avg(Tot_Drug_Cst)|max(Total_claims)|sum(Total_claims)|avg(Total_claims)|max(Tot_Day_Suply)|sum(Tot_Day_Suply)|avg(Tot_Day_Suply)|max(Tot_30day_Fills)|sum(Tot_30day_Fills)|avg(Tot_30day_Fills)|
+----------+----------+-------------+--------+-----+-----------------+----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+
|1003000126|   Ardalan|    Enkeshafi|Bethesda|   MD|Internal Medicine|

In [26]:
# Drop the duplicate join key; NPI from df_names is kept.
df_join = df_join.drop("NPI_1")

In [27]:
# Replace Spark's auto-generated "fn(col)" aggregate names with SQL-friendly
# "fn_col" names (parentheses would need quoting in the SQL below).
_agg_renames = {
    "max(Tot_Drug_Cst)": "max_Tot_Drug_Cst",
    "sum(Tot_Drug_Cst)": "sum_Tot_Drug_Cst",
    "avg(Tot_Drug_Cst)": "avg_Tot_Drug_Cst",
    "max(Total_claims)": "max_Total_claims",
    "sum(Total_claims)": "sum_Total_claims",
    "avg(Total_claims)": "avg_Total_claims",
    "max(Tot_Day_Suply)": "max_Tot_Day_Suply",
    "sum(Tot_Day_Suply)": "sum_Tot_Day_Suply",
    "avg(Tot_Day_Suply)": "avg_Tot_Day_Suply",
    "max(Tot_30day_Fills)": "max_Tot_30day_Fills",
    "sum(Tot_30day_Fills)": "sum_Tot_30day_Fills",
    "avg(Tot_30day_Fills)": "avg_Tot_30day_Fills",
}
for _old_name, _new_name in _agg_renames.items():
    df_join = df_join.withColumnRenamed(_old_name, _new_name)

In [28]:
# Register the joined data as the SQL view "df_sql_drug".
df_join.createOrReplaceTempView(name="df_sql_drug")

In [29]:
# Project the columns kept for the final drug dataset, lower-casing the first
# and last/organisation names.
drug_select_sql = "select NPI,lower(First_name),lower(LAST_ORG_NAME),City,State,Speciality,max_Tot_Drug_Cst,sum_Tot_Drug_Cst,avg_Tot_Drug_Cst,max_Total_claims,sum_Total_claims,avg_Total_claims,max_Tot_Day_Suply,sum_Tot_Day_Suply,avg_Tot_Day_Suply,max_Tot_30day_Fills,sum_Tot_30day_Fills,avg_Tot_30day_Fills from df_sql_drug"
df_join = spark.sql(drug_select_sql)

In [30]:
# Give the lower() expression columns plain names.
df_join = (
    df_join
    .withColumnRenamed("lower(First_name)", "First_name")
    .withColumnRenamed("lower(LAST_ORG_NAME)", "Last_org_name")
)

In [32]:
# Drop the lower-cased last/organisation name; only the first name is carried forward.
df_join = df_join.drop("Last_org_name")

In [34]:
# Suffix the first-name column with its source dataset ("drug").
df_join = df_join.withColumnRenamed(existing="First_name", new="First_name_drug")

In [35]:
# Final schema of the drug dataset before saving.
df_join.printSchema()

root
 |-- NPI: integer (nullable = true)
 |-- First_name_drug: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Speciality: string (nullable = true)
 |-- max_Tot_Drug_Cst: double (nullable = true)
 |-- sum_Tot_Drug_Cst: double (nullable = true)
 |-- avg_Tot_Drug_Cst: double (nullable = true)
 |-- max_Total_claims: integer (nullable = true)
 |-- sum_Total_claims: long (nullable = true)
 |-- avg_Total_claims: double (nullable = true)
 |-- max_Tot_Day_Suply: integer (nullable = true)
 |-- sum_Tot_Day_Suply: long (nullable = true)
 |-- avg_Tot_Day_Suply: double (nullable = true)
 |-- max_Tot_30day_Fills: double (nullable = true)
 |-- sum_Tot_30day_Fills: double (nullable = true)
 |-- avg_Tot_30day_Fills: double (nullable = true)



### Saving the dataframe to MongoDB and CSV

In [35]:
# Persist the final drug dataset to MongoDB (database "Medicare") in overwrite mode.
# NOTE(review): the double underscore in "Drug__sort_data" may be a typo; it is
# left unchanged because downstream readers may already use this collection name.
(
    df_join.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("database", "Medicare")
    .option("collection", "Drug__sort_data")
    .mode("overwrite")
    .save()
)

In [36]:
# Export the final drug dataset as CSV.
# - header=True: without it the column names are lost and the 17 columns are unlabeled.
# - mode="overwrite": the default mode fails if the path already exists, which
#   breaks re-running the notebook (the MongoDB write above also overwrites).
# Spark writes a directory of part files at this path, not a single .csv file.
df_join.write.csv("../data/spark_csv/drug_data_final.csv", mode="overwrite", header=True)