In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=d1229e22d90138498debbcc7d3f7c19132e7abf5410eabbeb6396c9e8713fc87
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
#Create Spark Session

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, ltrim,substring

# Local-mode Spark session; getOrCreate() reuses a session if one already exists.
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)

In [None]:
# Load the input CSV into a DataFrame, taking column names from the first row.
df = spark.read.option("header", True).csv("analytics_input.csv")

In [None]:
# Preview the first 20 rows; long cell values are truncated for display.
df.show(n=20, truncate=True)

+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
|      DRG Definition|Provider Id|       Provider Name|Provider Street Address|Provider City|Provider State|Provider Zip Code|Hospital Referral Region Description| Total Discharges | Average Covered Charges | Average Total Payments |Average Medicare Payments|
+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
|039 - EXTRACRANIA...|      10001|SOUTHEAST ALABAMA...|   1108 ROSS CLARK C...|       DOTHAN|            AL|            36301|                         AL - Dothan|                91|                $32963.07|            

In [None]:
# Display the schema. inferSchema is not set when reading, so every column is a
# string, and the padded CSV headers (e.g. " Total Discharges ") keep their spaces.
df.printSchema()

root
 |-- DRG Definition: string (nullable = true)
 |-- Provider Id: string (nullable = true)
 |-- Provider Name: string (nullable = true)
 |-- Provider Street Address: string (nullable = true)
 |-- Provider City: string (nullable = true)
 |-- Provider State: string (nullable = true)
 |-- Provider Zip Code: string (nullable = true)
 |-- Hospital Referral Region Description: string (nullable = true)
 |--  Total Discharges : string (nullable = true)
 |--  Average Covered Charges : string (nullable = true)
 |--  Average Total Payments : string (nullable = true)
 |-- Average Medicare Payments: string (nullable = true)



In [None]:
# Build normalised column names: spaces -> "_", then lower-case, then strip.
# NOTE(review): strip() runs *after* the space replacement, so headers padded
# with spaces (" Total Discharges ") become "_total_discharges_" rather than
# "total_discharges". Later cells (payment_columns) reference the underscored
# names, so changing the order here must be done together with those cells.
df2 = []
for raw_name in df.columns:
  df2.append(raw_name.replace(' ', '_').lower().strip())

In [None]:
# Rename all columns positionally with the normalised names, then verify.
df3 = df.toDF(*df2)
df3.printSchema()

root
 |-- drg_definition: string (nullable = true)
 |-- provider_id: string (nullable = true)
 |-- provider_name: string (nullable = true)
 |-- provider_street_address: string (nullable = true)
 |-- provider_city: string (nullable = true)
 |-- provider_state: string (nullable = true)
 |-- provider_zip_code: string (nullable = true)
 |-- hospital_referral_region_description: string (nullable = true)
 |-- _total_discharges_: string (nullable = true)
 |-- _average_covered_charges_: string (nullable = true)
 |-- _average_total_payments_: string (nullable = true)
 |-- average_medicare_payments: string (nullable = true)



In [None]:
# The column names are now lower-case, with spaces replaced by "_" (underscore).
df3.show()
# Keep a handle on this all-string version of the data: the cast cell below
# reassigns df3, while the "$"-stripping cell reads the raw strings from df7.
df7 = df3.alias("df7")

+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
|      drg_definition|provider_id|       provider_name|provider_street_address|provider_city|provider_state|provider_zip_code|hospital_referral_region_description|_total_discharges_|_average_covered_charges_|_average_total_payments_|average_medicare_payments|
+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
|039 - EXTRACRANIA...|      10001|SOUTHEAST ALABAMA...|   1108 ROSS CLARK C...|       DOTHAN|            AL|            36301|                         AL - Dothan|                91|                $32963.07|            

In [None]:
# Find every row whose provider_id is null.
df4 = df3.where(col("provider_id").isNull())
df4.show()

+--------------+-----------+-------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
|drg_definition|provider_id|provider_name|provider_street_address|provider_city|provider_state|provider_zip_code|hospital_referral_region_description|_total_discharges_|_average_covered_charges_|_average_total_payments_|average_medicare_payments|
+--------------+-----------+-------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
+--------------+-----------+-------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+



In [None]:
# Payment-related columns to convert to numbers. The names are the ones produced
# by the rename above, including the underscores left by padded CSV headers.
payment_columns = [
    "average_medicare_payments",
    "_average_covered_charges_",
    "_average_total_payments_",
]

In [None]:
# Convert each payment column to a number.
# The raw values look like "$32963.07". With Spark's default (non-ANSI) settings,
# casting such a string directly yields null instead of raising, so the "$",
# thousands separators and whitespace are removed first. The result is cast to
# double rather than int so the cents are not truncated.
from pyspark.sql.functions import col, regexp_replace

for col_name in payment_columns:
  df3 = df3.withColumn(
      col_name,
      regexp_replace(col(col_name), r"[$,\s]", "").cast("double"),
  )

In [None]:
# Confirm that the payment-related columns now have a numeric data type.

df3.printSchema()

root
 |-- drg_definition: string (nullable = true)
 |-- provider_id: string (nullable = true)
 |-- provider_name: string (nullable = true)
 |-- provider_street_address: string (nullable = true)
 |-- provider_city: string (nullable = true)
 |-- provider_state: string (nullable = true)
 |-- provider_zip_code: string (nullable = true)
 |-- hospital_referral_region_description: string (nullable = true)
 |-- _total_discharges_: string (nullable = true)
 |-- _average_covered_charges_: integer (nullable = true)
 |-- _average_total_payments_: integer (nullable = true)
 |-- average_medicare_payments: integer (nullable = true)



In [None]:
# Optionally keep a string copy of each payment column with the "$" (dollar)
# symbol removed, stored as a new column named cleaned_<column>.
# df7 is the snapshot taken before the numeric cast, so these columns still hold
# the raw strings such as "$32963.07".
# Accumulate onto df8: rebuilding it from df7 on every iteration would keep only
# the cleaned copy of the last column in payment_columns.
from pyspark.sql.functions import col, regexp_replace, trim

df8 = df7
for column_name in payment_columns:
  clean_col = f"cleaned_{column_name}"
  # Remove "$" and thousands separators wherever they occur, instead of blindly
  # dropping the first character, which would eat a digit if "$" were absent.
  df8 = df8.withColumn(clean_col, trim(regexp_replace(col(column_name), r"[$,]", "")))

In [None]:
df8.show()
# Inspect df8, including the cleaned_* column(s) with the "$" symbol removed by the previous cell.

+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------------+
|      drg_definition|provider_id|       provider_name|provider_street_address|provider_city|provider_state|provider_zip_code|hospital_referral_region_description|_total_discharges_|_average_covered_charges_|_average_total_payments_|average_medicare_payments|cleaned__average_total_payments_|
+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------------+
|039 - EXTRACRANIA...|      10001|SOUTHEAST ALABAMA...|   1108 ROSS CLARK C...|       DOTHAN|            AL|            3

In [None]:
# Write the dataset in Parquet format, partitioned by state.
# mode("overwrite") makes the cell re-runnable: the default save mode
# ("errorifexists") fails when parquet_data/ already exists from a previous run.
df8.write.mode("overwrite").partitionBy("provider_state").parquet("parquet_data")

In [None]:
#TODO: Plot a bar chart of state vs "Average Total Payments", and of zip code vs "total_charges"
#BONUS - Plot interesting statistics from the dataset
#Comment: I do not have hands-on experience with visualization tools, so I have skipped these two tasks.
