<a href="https://colab.research.google.com/github/sagar0226/sagar0226/blob/main/analytics_lab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m20.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=63c3bcf4d37e2b59354c45d1d339f70133529a454602bbe75135d5b42255069f
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [52]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

In [53]:
#@title Default title text
## Download Input dataset for analytics
# https://propensity-labs-screening.s3.amazonaws.com/analytics/analytics_input.csv

In [54]:
spark = SparkSession.builder.appName('analytics_lab').getOrCreate()

In [55]:
#Created Spark Session
spark

In [56]:
path = "/content/analytics_input.csv"


In [57]:
import os

# Install java
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed -q pyspark==2.4.4

# Install Spark NLP
! pip install --ignore-installed -q spark-nlp==2.5.1

openjdk version "1.8.0_352"
OpenJDK Runtime Environment (build 1.8.0_352-8u352-ga-1~20.04-b08)
OpenJDK 64-Bit Server VM (build 25.352-b08, mixed mode)


In [85]:
#Read CSV to Dataframe
df = spark.read.option("header",'True').option('delimiter', ',').csv(path)

#Display Scheme
df.printSchema()


root
 |-- DRG Definition: string (nullable = true)
 |-- Provider Id: string (nullable = true)
 |-- Provider Name: string (nullable = true)
 |-- Provider Street Address: string (nullable = true)
 |-- Provider City: string (nullable = true)
 |-- Provider State: string (nullable = true)
 |-- Provider Zip Code: string (nullable = true)
 |-- Hospital Referral Region Description: string (nullable = true)
 |--  Total Discharges : string (nullable = true)
 |--  Average Covered Charges : string (nullable = true)
 |--  Average Total Payments : string (nullable = true)
 |-- Average Medicare Payments: string (nullable = true)



In [86]:
df.show()

+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
|      DRG Definition|Provider Id|       Provider Name|Provider Street Address|Provider City|Provider State|Provider Zip Code|Hospital Referral Region Description| Total Discharges | Average Covered Charges | Average Total Payments |Average Medicare Payments|
+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
|039 - EXTRACRANIA...|      10001|SOUTHEAST ALABAMA...|   1108 ROSS CLARK C...|       DOTHAN|            AL|            36301|                         AL - Dothan|                91|                $32963.07|            

In [87]:
cols = df.columns
new_cols = [col.lower() for col in cols]

for i in range(len(cols)):
    df = df.withColumnRenamed(cols[i], new_cols[i])

df.show()


+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
|      drg definition|provider id|       provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description| total discharges | average covered charges | average total payments |average medicare payments|
+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+
|039 - EXTRACRANIA...|      10001|SOUTHEAST ALABAMA...|   1108 ROSS CLARK C...|       DOTHAN|            AL|            36301|                         AL - Dothan|                91|                $32963.07|            

In [88]:
#trim the initial spaces

from pyspark.sql.functions import trim

cols = df.columns
new_cols = [col.strip() for col in cols]
df = df.select([trim(c).alias(nc) for c, nc in zip(cols, new_cols)])
df.show()

+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|      drg definition|provider id|       provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|039 - EXTRACRANIA...|      10001|SOUTHEAST ALABAMA...|   1108 ROSS CLARK C...|       DOTHAN|            AL|            36301|                         AL - Dothan|              91|              $32963.07|              $5777.24|           

In [89]:
#Change Column names to remove spaces, lower case and replace spaces in column names with "_" (underscore)
from pyspark.sql import functions as F


df = df.select([F.col(col).alias(col.replace(' ', '_')) for col in df.columns])

df.show()

+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|      drg_definition|provider_id|       provider_name|provider_street_address|provider_city|provider_state|provider_zip_code|hospital_referral_region_description|total_discharges|average_covered_charges|average_total_payments|average_medicare_payments|
+--------------------+-----------+--------------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|039 - EXTRACRANIA...|      10001|SOUTHEAST ALABAMA...|   1108 ROSS CLARK C...|       DOTHAN|            AL|            36301|                         AL - Dothan|              91|              $32963.07|              $5777.24|           

In [90]:
#Identify all rows with Null values for provider id
from pyspark.sql.functions import isnull
null_rows = df.filter(isnull("provider_id"))
null_rows.show()
null_count = null_rows.count()

+--------------+-----------+-------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|drg_definition|provider_id|provider_name|provider_street_address|provider_city|provider_state|provider_zip_code|hospital_referral_region_description|total_discharges|average_covered_charges|average_total_payments|average_medicare_payments|
+--------------+-----------+-------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
+--------------+-----------+-------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+



In [91]:
null_count = null_rows.count()
null_count

0

In [103]:
#Change fields with Payment information into numbers by using Lambda functions in python

from pyspark.sql.types import IntegerType,DoubleType
from pyspark.sql.functions import udf

remove_dollar_sign = udf(lambda x: float(x.replace("$", "")), DoubleType())
columns_to_transform = ['average_covered_charges', 'average_total_payments', 'average_medicare_payments']
for column in columns_to_transform:
    df = df.withColumn(column, remove_dollar_sign(df[column]))
row_count = df.count()
row_count

163065

In [97]:
df.printSchema()

root
 |-- drg_definition: string (nullable = true)
 |-- provider_id: string (nullable = true)
 |-- provider_name: string (nullable = true)
 |-- provider_street_address: string (nullable = true)
 |-- provider_city: string (nullable = true)
 |-- provider_state: string (nullable = true)
 |-- provider_zip_code: string (nullable = true)
 |-- hospital_referral_region_description: string (nullable = true)
 |-- total_discharges: string (nullable = true)
 |-- average_covered_charges: double (nullable = true)
 |-- average_total_payments: double (nullable = true)
 |-- average_medicare_payments: double (nullable = true)



In [None]:
#Plot a Bar chart by state and "Average Total Payments" and zip code vs "total_charges"

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Convert PySpark DataFrame to Pandas DataFrame
pandas_df = df.toPandas()

# Group data by state and calculate average total payments
grouped_df = pandas_df.groupby("provider_state").mean()["average_total_payments"]

# Plot bar chart
grouped_df.plot(kind='bar')

# Show plot
plt.show()

In [None]:
unique_values = df.select("drg_definition").distinct().rdd.map(lambda row: row[0]).take(100)

In [None]:
#BONUS - Plot intersting statistics from the dataset

pdf["provider_city"].plot.hist()

In [100]:
#Write the Dataset to a Parquet format partitioned by State

df.write.partitionBy("provider_state").parquet("C:\Users\ABC\Documents\")

SyntaxError: ignored