In [66]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
import os 
import sys
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
 

In [5]:
# Point PySpark at the local JDK installation.
# Raw string: a Windows path in a normal string literal contains invalid escape
# sequences (\P, \J, \j), which Python warns about and will eventually reject.
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-11"


In [19]:
# Configure a Spark application named 'pyspark' running on the local master.
conf = SparkConf().setAppName('pyspark').setMaster("local")

# Obtain the session through the builder rather than calling the SparkSession
# constructor directly: getOrCreate() reuses an already-running session, so
# re-running this cell does not wrap the context in a second session object.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Keep `sc` defined for any cell that works with the SparkContext directly.
sc = spark.sparkContext


In [20]:
# Display the SparkSession object as the cell output to confirm it exists.
spark

In [33]:
# Read the prescriber CSV: the first row supplies the column names and the
# column types are inferred from the data.
df2 = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load(r'C:\Users\ttg\PycharmProjects\pyspark\source\oltp\USA_Presc_Medicare_Data_12021.csv')
)

In [34]:
# Read the US cities dimension table from its Parquet file.
df1 = spark.read.parquet(
    r'C:\Users\ttg\PycharmProjects\pyspark\source\olap\us_cities_dimension.parquet'
)

In [47]:
# City dimension: keep six columns, upper-casing the three name columns
# (city, state_name, county_name).
df_city_selection = df1.select(
    upper(col("city")).alias("city"),
    col("state_id"),
    upper(col("state_name")).alias("state_name"),
    upper(col("county_name")).alias("county_name"),
    col("population"),
    col("zips"),
)

# Prescriber data: keep the columns of interest and give the provider
# columns shorter names.
df_presc_selection = df2.select(
    col("npi").alias("presc_npi"),
    col("nppes_provider_last_org_name").alias("last_name"),
    col("nppes_provider_first_name").alias("first_name"),
    col("nppes_provider_city").alias("precs_city"),
    col("nppes_provider_state").alias("precs_state"),
    col("specialty_description").alias("presc_specialty"),
    col("drug_name"),
    col("total_claim_count"),
    col("total_day_supply"),
    col("total_drug_cost"),
    col("years_of_exp"),
)

## Cleaning data


In [48]:
# Build "full_name" as "<last_name> <first_name>" (space-separated, last name
# first) and then drop the two source columns.
df_presc_selection = (
    df_presc_selection
    .withColumn("full_name", concat_ws(" ", col("last_name"), col("first_name")))
    .drop("last_name", "first_name")
)

# Preview the first five rows.
df_presc_selection.show(5)


+----------+----------+-----------+-----------------+--------------------+-----------------+----------------+---------------+------------+-----------------+
| presc_npi|precs_city|precs_state|  presc_specialty|           drug_name|total_claim_count|total_day_supply|total_drug_cost|years_of_exp|        full_name|
+----------+----------+-----------+-----------------+--------------------+-----------------+----------------+---------------+------------+-----------------+
|2006000252|CUMBERLAND|         MD|Internal Medicine|ATORVASTATIN CALCIUM|               13|             450|         139.32|      = 45.0|ENKESHAFI ARDALAN|
|2006000252|CUMBERLAND|         MD|Internal Medicine|   CIPROFLOXACIN HCL|               11|              96|          80.99|      = 43.0|ENKESHAFI ARDALAN|
|2006000252|CUMBERLAND|         MD|Internal Medicine| DOXYCYCLINE HYCLATE|               20|             199|         586.12|      = 33.0|ENKESHAFI ARDALAN|
|2006000252|CUMBERLAND|         MD|Internal Medicine|     

In [49]:
# Show the current column types of the prescriber selection.
df_presc_selection.schema

StructType(List(StructField(presc_npi,IntegerType,true),StructField(precs_city,StringType,true),StructField(precs_state,StringType,true),StructField(presc_specialty,StringType,true),StructField(drug_name,StringType,true),StructField(total_claim_count,IntegerType,true),StructField(total_day_supply,IntegerType,true),StructField(total_drug_cost,DoubleType,true),StructField(years_of_exp,StringType,true),StructField(full_name,StringType,false)))

In [56]:
# years_of_exp arrives as text with a stray "=" in it (e.g. "= 45.0"):
# remove the "=" and cast the remainder to an integer.
years_without_equals = regexp_replace(col("years_of_exp"), "=", "")
df_presc_selection = df_presc_selection.withColumn(
    "years_of_exp",
    years_without_equals.cast(IntegerType()),
)

# Preview the cleaned rows, then display the updated schema as the cell output.
df_presc_selection.show(5)
df_presc_selection.schema

+----------+----------+-----------+-----------------+--------------------+-----------------+----------------+---------------+------------+-----------------+
| presc_npi|precs_city|precs_state|  presc_specialty|           drug_name|total_claim_count|total_day_supply|total_drug_cost|years_of_exp|        full_name|
+----------+----------+-----------+-----------------+--------------------+-----------------+----------------+---------------+------------+-----------------+
|2006000252|CUMBERLAND|         MD|Internal Medicine|ATORVASTATIN CALCIUM|               13|             450|         139.32|          45|ENKESHAFI ARDALAN|
|2006000252|CUMBERLAND|         MD|Internal Medicine|   CIPROFLOXACIN HCL|               11|              96|          80.99|          43|ENKESHAFI ARDALAN|
|2006000252|CUMBERLAND|         MD|Internal Medicine| DOXYCYCLINE HYCLATE|               20|             199|         586.12|          33|ENKESHAFI ARDALAN|
|2006000252|CUMBERLAND|         MD|Internal Medicine|     

StructType(List(StructField(presc_npi,IntegerType,true),StructField(precs_city,StringType,true),StructField(precs_state,StringType,true),StructField(presc_specialty,StringType,true),StructField(drug_name,StringType,true),StructField(total_claim_count,IntegerType,true),StructField(total_day_supply,IntegerType,true),StructField(total_drug_cost,DoubleType,true),StructField(years_of_exp,IntegerType,true),StructField(full_name,StringType,false)))

In [70]:
# Check missing values: one aggregate per column counting rows that are NaN or null.
# The column *name* (a plain string) is the `when` value on purpose: it is a
# non-null literal, so count() tallies every row that matches the condition.
missing_value_counts = [
    count(when(isnan(name) | isnull(name), name)).alias(name)
    for name in df_presc_selection.columns
]
df_presc_selection.select(missing_value_counts).show()



+---------+----------+-----------+---------------+---------+-----------------+----------------+---------------+------------+---------+
|presc_npi|precs_city|precs_state|presc_specialty|drug_name|total_claim_count|total_day_supply|total_drug_cost|years_of_exp|full_name|
+---------+----------+-----------+---------------+---------+-----------------+----------------+---------------+------------+---------+
|       22|         1|          1|              1|       15|                3|               1|              1|           1|        0|
+---------+----------+-----------+---------------+---------+-----------------+----------------+---------------+------------+---------+



In [71]:
# Discard rows that have no prescriber NPI.
df_presc_selection = df_presc_selection.dropna(subset=['presc_npi'])

In [72]:
# Discard rows that have no drug name.
df_presc_selection = df_presc_selection.dropna(subset=['drug_name'])

In [73]:
# Re-count NaN/null values per column after dropping incomplete rows.
missing_value_counts = [
    count(when(isnan(name) | isnull(name), name)).alias(name)
    for name in df_presc_selection.columns
]
df_presc_selection.select(missing_value_counts).show()


+---------+----------+-----------+---------------+---------+-----------------+----------------+---------------+------------+---------+
|presc_npi|precs_city|precs_state|presc_specialty|drug_name|total_claim_count|total_day_supply|total_drug_cost|years_of_exp|full_name|
+---------+----------+-----------+---------------+---------+-----------------+----------------+---------------+------------+---------+
|        0|         0|          0|              0|        0|                2|               0|              0|           0|        0|
+---------+----------+-----------+---------------+---------+-----------------+----------------+---------------+------------+---------+



In [79]:
# Compute the average of total_claim_count; it is used as the replacement
# value for the missing entries in that column.
mean_row = df_presc_selection.select(mean('total_claim_count')).first()
mean_total_claim_count = mean_row[0]
print(mean_total_claim_count)

51.57326467512733


In [82]:
# Replace missing total_claim_count values with the column mean.
# total_claim_count is an integer column, so Spark casts a float fill value to
# the column type and silently truncates it (e.g. 51.57 -> 51). Round to the
# nearest whole number explicitly so the imputed value is the closest integer.
df_presc_selection = df_presc_selection.fillna(
    int(round(mean_total_claim_count)),
    subset=['total_claim_count'],
)


In [83]:
# Final check: count NaN/null values per column after filling total_claim_count.
missing_value_counts = [
    count(when(isnan(name) | isnull(name), name)).alias(name)
    for name in df_presc_selection.columns
]
df_presc_selection.select(missing_value_counts).show()


+---------+----------+-----------+---------------+---------+-----------------+----------------+---------------+------------+---------+
|presc_npi|precs_city|precs_state|presc_specialty|drug_name|total_claim_count|total_day_supply|total_drug_cost|years_of_exp|full_name|
+---------+----------+-----------+---------------+---------+-----------------+----------------+---------------+------------+---------+
|        0|         0|          0|              0|        0|                0|               0|              0|           0|        0|
+---------+----------+-----------+---------------+---------+-----------------+----------------+---------------+------------+---------+

