
# Patient Data Analysis

Heathy-You is an international healthcare company that works with hospitals around the world. It holds a large collection of records for patients registering for consultations and operations at its different centers.

    
The task is to calculate the final fee to be paid by each patient during registration. This requires checking whether a registration has insurance cover from one of the collaborating companies. The technical team has decided to store the data on HDFS and to use Spark Core and Spark SQL to process the fee structure and calculate the final fee for each registration.
    

To arrive at the final fee, the hospital must verify the following business rules:

* Check the patient’s insurance ID, background status, and discharge approval.

* Check the list of companies with which the hospital has collaborated for insurance and reimbursement.
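
As a rough illustration, the two rules above can be written as a plain-Python predicate. This is only a sketch: the `Patient` tuple, the `is_eligible` helper, and the sample patients are hypothetical, while the `COMPLETED` and `POSITIVE` values and the two insurance IDs are taken from this notebook's own requirements and sample output.

```python
from typing import NamedTuple, Set


class Patient(NamedTuple):
    # Hypothetical, trimmed-down record; the real data has more columns.
    reg_no: str
    insurance_id: str
    background_status: str
    discharge_approval: str


def is_eligible(patient: Patient, collaborated_ids: Set[str]) -> bool:
    """Return True when the patient passes both business rules."""
    cleared = (
        patient.background_status == "COMPLETED"
        and patient.discharge_approval == "POSITIVE"
    )
    insured = patient.insurance_id in collaborated_ids
    return cleared and insured


# Insurance IDs as they appear in the company data; the patients are made up.
collaborated = {"K1N 7B5", "I7J 4Z6"}
print(is_eligible(Patient("1", "K1N 7B5", "COMPLETED", "POSITIVE"), collaborated))
print(is_eligible(Patient("2", "X0X 0X0", "COMPLETED", "POSITIVE"), collaborated))
print(is_eligible(Patient("3", "I7J 4Z6", "PENDING", "POSITIVE"), collaborated))
```

The Spark steps in the rest of the notebook apply the same two checks, first as an RDD filter and then as a join against the company data.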

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=ce4c4b0155349fc73e6b026e866ee391503916adde69fc37cc8aa7329925ff19
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
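from pyspark.sql import SparkSession

# Reuse an existing session if one is running; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()
# The SparkContext is needed for the RDD API used below.
sc = spark.sparkContext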

### 1. Creating RDDs

In [3]:
patientRegistrationRDD = sc.textFile('/content/PatientRegistrationData.csv')
billTillDateRDD = sc.textFile('/content/BillTillDate.txt')
companyInsuranceRDD = sc.textFile('/content/CompanyInsuranceData.csv')

In [4]:
patientRegistrationRDD = patientRegistrationRDD.map(lambda c:c.split(','))
billTillDateRDD = billTillDateRDD.map(lambda c:c.split('\t'))
companyInsuranceRDD = companyInsuranceRDD.map(lambda c:c.split(','))
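
Splitting on a bare `,` assumes that no field ever contains a comma. If a value were quoted, for example a company name such as `"Acme, Inc."` (a made-up value, not taken from the dataset), `str.split` would cut it in two. The sketch below shows a more defensive per-line parser built on Python's standard `csv` module; the `parse_csv_line` helper is hypothetical and is not part of the original notebook.

```python
import csv
from typing import List


def parse_csv_line(line: str) -> List[str]:
    """Parse one CSV line, honouring double-quoted fields."""
    # csv.reader accepts any iterable of lines; wrap the single line in a list.
    return next(csv.reader([line]))


# Made-up line: the first field contains a comma inside quotes.
line = '"Acme, Inc.",A1A 1A1,90,500000'
print(line.split(','))       # naive split yields 5 pieces
print(parse_csv_line(line))  # csv-aware parse yields 4 fields
```

Assuming the helper is defined in the notebook, it could be passed to `map` in place of the lambda, for example `companyInsuranceRDD.map(parse_csv_line)`.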

In [5]:
companyInsuranceRDD.take(2)

[['Tincidunt Donec Vitae Ltd', 'K1N 7B5', '94', '500000'],
 ['Tellus Justo Sit LLC', 'I7J 4Z6', '80', '500000']]

### 2. Create a separate RDD as patientToBeDischargedRDD for those patients whose background status is “COMPLETED” and discharge approval is “POSITIVE”

In [6]:
patientToBeDischargedRDD = patientRegistrationRDD.filter(lambda c : c[9]=='COMPLETED' and c[10]=='POSITIVE')

In [7]:
patientToBeDischargedRDD.map(lambda c : c[9]).distinct().collect()

['COMPLETED']

### 3. Converting RDDs into respective Dataframes

In [8]:
from collections import namedtuple

patient=namedtuple('patient',['RegNo', 'PatientName', 'Age', 'DBO', 'Gender', 'BloodG', 'Department',
                            'Doctor', 'InsuranceId', 'BackgroundStatus', 'DischargeApproval'])

bill=namedtuple('bill',['RegNo', 'AmountPaid', 'TotalAmount'])

company=namedtuple('company',['CompanyName', 'InsuranceId', 'PercentCover', 'Limit'])

In [9]:
patientRegistrationDf = patientRegistrationRDD.map(lambda c: patient(c[0],c[1],c[2],c[3],c[4],c[5],c[6],c[7],c[8],c[9],c[10])).toDF()
billTillDateDf = billTillDateRDD.map(lambda c: bill(c[0],int(c[1]),int(c[2]))).toDF()
companyInsuranceDf = companyInsuranceRDD.map(lambda c: company(c[0],c[1],int(c[2]),int(c[3]))).toDF()
patientToBeDischargedDf = patientToBeDischargedRDD.map(lambda c: patient(c[0],c[1],c[2],c[3],c[4],c[5],c[6],c[7],c[8],c[9],c[10])).toDF()

In [10]:
patientRegistrationDf.printSchema()
billTillDateDf.printSchema()
companyInsuranceDf.printSchema()
patientToBeDischargedDf.printSchema()

root
 |-- RegNo: string (nullable = true)
 |-- PatientName: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- DBO: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- BloodG: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Doctor: string (nullable = true)
 |-- InsuranceId: string (nullable = true)
 |-- BackgroundStatus: string (nullable = true)
 |-- DischargeApproval: string (nullable = true)

root
 |-- RegNo: string (nullable = true)
 |-- AmountPaid: long (nullable = true)
 |-- TotalAmount: long (nullable = true)

root
 |-- CompanyName: string (nullable = true)
 |-- InsuranceId: string (nullable = true)
 |-- PercentCover: long (nullable = true)
 |-- Limit: long (nullable = true)

root
 |-- RegNo: string (nullable = true)
 |-- PatientName: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- DBO: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- BloodG: string (nullable = true)
 |-- Department: string 

In [11]:
companyInsuranceDf.show(5)
patientToBeDischargedDf.show()

+--------------------+-----------+------------+------+
|         CompanyName|InsuranceId|PercentCover| Limit|
+--------------------+-----------+------------+------+
|Tincidunt Donec V...|    K1N 7B5|          94|500000|
|Tellus Justo Sit LLC|    I7J 4Z6|          80|500000|
|Suscipit Est Inst...|    N3S 0W3|          96|500000|
|Cras Vehicula Fou...|    U2Q 0D5|          85|500000|
|            Lorem PC|    W7X 6Q0|          83|500000|
+--------------------+-----------+------------+------+
only showing top 5 rows

+-----+------------------+---+---------+------+------+----------+------------+-----------+----------------+-----------------+
|RegNo|       PatientName|Age|      DBO|Gender|BloodG|Department|      Doctor|InsuranceId|BackgroundStatus|DischargeApproval|
+-----+------------------+---+---------+------+------+----------+------------+-----------+----------------+-----------------+
|    1|    Fuller Foreman| 40| 3/5/1980|Female|    O+|Cardiology|Jeanie Balam|    Z0N 5K4|       COMPL

### 4. Check whether InsuranceId in patientToBeDischargedDf matches with any InsuranceId in companyInsuranceDf. Consider the patient with RegNo=18 for this requirement

In [12]:
if patientToBeDischargedDf.select('InsuranceId').filter('RegNo="18"').collect()[0] in companyInsuranceDf.select('InsuranceId').collect():
  print('True')

True


### 5. Store the details of a patient in ApprovedDf if the InsuranceId is in companyInsuranceDf, the background status is “COMPLETED”, and the discharge approval is “POSITIVE”.

In [13]:
# Method 1: join on the shared column name; the result keeps a single InsuranceId column.
#ApprovedDf = patientToBeDischargedDf.join(companyInsuranceDf, on='InsuranceId')
# Method 2: join on a column expression; the result keeps both InsuranceId columns.
ApprovedDf = patientToBeDischargedDf.join(companyInsuranceDf, companyInsuranceDf['InsuranceId']==patientToBeDischargedDf['InsuranceId'])
ApprovedDf.show(2)

+-----+------------------+---+---------+------+------+----------+--------------+-----------+----------------+-----------------+--------------------+-----------+------------+------+
|RegNo|       PatientName|Age|      DBO|Gender|BloodG|Department|        Doctor|InsuranceId|BackgroundStatus|DischargeApproval|         CompanyName|InsuranceId|PercentCover| Limit|
+-----+------------------+---+---------+------+------+----------+--------------+-----------+----------------+-----------------+--------------------+-----------+------------+------+
|   21| Meredith Guerrero| 40|3/25/1980|Female|    O+|Cardiology|  Jeanie Balam|    Y6H 8B2|       COMPLETED|         POSITIVE|Enim Sed Nulla In...|    Y6H 8B2|          91|500000|
|  409|Jocelyn Livingston| 27|3/15/1993|Female|    A-| Neurology|Zebulen Wilden|    U2Q 0D5|       COMPLETED|         POSITIVE|Cras Vehicula Fou...|    U2Q 0D5|          85|500000|
+-----+------------------+---+---------+------+------+----------+--------------+-----------+---

### 6. For each approved patient, calculate the remaining amount to be paid using the BillTillDate data

In [14]:
billTillDateDf.show(2)

+-----+----------+-----------+
|RegNo|AmountPaid|TotalAmount|
+-----+----------+-----------+
|    1|      5000|     150000|
|    2|      5000|     150000|
+-----+----------+-----------+
only showing top 2 rows



In [15]:
remainingAmt = ApprovedDf.join(billTillDateDf,on='RegNo')

In [16]:
temp = remainingAmt.withColumn('RemainingAmt', remainingAmt['TotalAmount']-remainingAmt['AmountPaid'])

In [17]:
temp.show(3)

+-----+-------------------+---+---------+------+------+-----------------+------------+-----------+----------------+-----------------+--------------------+-----------+------------+------+----------+-----------+------------+
|RegNo|        PatientName|Age|      DBO|Gender|BloodG|       Department|      Doctor|InsuranceId|BackgroundStatus|DischargeApproval|         CompanyName|InsuranceId|PercentCover| Limit|AmountPaid|TotalAmount|RemainingAmt|
+-----+-------------------+---+---------+------+------+-----------------+------------+-----------+----------------+-----------------+--------------------+-----------+------------+------+----------+-----------+------------+
|    7|      Vivian Jordan| 40|3/11/1980|Female|    O+|       Cardiology|Jeanie Balam|    L5M 8I8|       COMPLETED|         POSITIVE|  Est Ac Corporation|    L5M 8I8|          85|500000|      5000|     150000|      145000|
|   51|     Vielka Puckett| 40|4/24/1980|Female|    O+|       Cardiology|Jeanie Balam|    S4U 1R5|       COM

### 7. Calculate the amount covered by the insurance as per the percentage cover given by the company up to a certain limit.

In [18]:
from pyspark.sql.functions import when, col

In [19]:
coverAmt = (temp['PercentCover']*temp['RemainingAmt'])/100  # uncapped insurance share
temp2 = temp.withColumn('AmtCovered', when(coverAmt <= temp['Limit'], coverAmt).otherwise(temp['Limit']))  # capped at Limit

In [20]:
temp2.show(3)

+-----+-------------------+---+---------+------+------+-----------------+------------+-----------+----------------+-----------------+--------------------+-----------+------------+------+----------+-----------+------------+----------+
|RegNo|        PatientName|Age|      DBO|Gender|BloodG|       Department|      Doctor|InsuranceId|BackgroundStatus|DischargeApproval|         CompanyName|InsuranceId|PercentCover| Limit|AmountPaid|TotalAmount|RemainingAmt|AmtCovered|
+-----+-------------------+---+---------+------+------+-----------------+------------+-----------+----------------+-----------------+--------------------+-----------+------------+------+----------+-----------+------------+----------+
|    7|      Vivian Jordan| 40|3/11/1980|Female|    O+|       Cardiology|Jeanie Balam|    L5M 8I8|       COMPLETED|         POSITIVE|  Est Ac Corporation|    L5M 8I8|          85|500000|      5000|     150000|      145000|  123250.0|
|   51|     Vielka Puckett| 40|4/24/1980|Female|    O+|       Ca

### 8. Calculate the final fee by deducting the insurance covered amount from the remaining amount.

In [21]:
final = temp2.withColumn('FinalBillAmount', temp2['RemainingAmt']-temp2['AmtCovered'])
final.show(5)

+-----+-------------------+---+---------+------+------+-----------------+--------------+-----------+----------------+-----------------+--------------------+-----------+------------+------+----------+-----------+------------+----------+---------------+
|RegNo|        PatientName|Age|      DBO|Gender|BloodG|       Department|        Doctor|InsuranceId|BackgroundStatus|DischargeApproval|         CompanyName|InsuranceId|PercentCover| Limit|AmountPaid|TotalAmount|RemainingAmt|AmtCovered|FinalBillAmount|
+-----+-------------------+---+---------+------+------+-----------------+--------------+-----------+----------------+-----------------+--------------------+-----------+------------+------+----------+-----------+------------+----------+---------------+
|    7|      Vivian Jordan| 40|3/11/1980|Female|    O+|       Cardiology|  Jeanie Balam|    L5M 8I8|       COMPLETED|         POSITIVE|  Est Ac Corporation|    L5M 8I8|          85|500000|      5000|     150000|      145000|  123250.0|        2
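
The arithmetic of steps 6 to 8 can be checked by hand for a single registration. The sketch below is plain Python rather than Spark, and the `final_fee` helper is hypothetical. The first call uses the values shown for RegNo 7 in the tables above; the second uses made-up numbers to show what happens when the limit applies.

```python
def final_fee(total_amount, amount_paid, percent_cover, limit):
    """Return (remaining, covered, final_fee) for one registration."""
    remaining = total_amount - amount_paid                  # step 6
    covered = min(percent_cover * remaining / 100, limit)   # step 7, capped at the limit
    return remaining, covered, remaining - covered          # step 8


# Values for RegNo 7 as shown in the tables above.
print(final_fee(150000, 5000, 85, 500000))     # (145000, 123250.0, 21750.0)

# Hypothetical case where the cap applies: 90% of 900000 exceeds the 500000 limit.
print(final_fee(1000000, 100000, 90, 500000))  # (900000, 500000, 400000)
```

The first result matches the `RemainingAmt`, `AmtCovered`, and `FinalBillAmount` values that Spark reports for RegNo 7.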

### 9. Print RegNo, PatientName, Age, DBO, Gender, BloodG, Department, Doctor, CompanyName, FinalBillAmount on the console

In [22]:
finalDf=final.select(['RegNo','PatientName','Age','DBO','Gender','BloodG','Department','Doctor','CompanyName','FinalBillAmount'])
finalDf.show()

+-----+-------------------+---+---------+------+------+-----------------+--------------+--------------------+---------------+
|RegNo|        PatientName|Age|      DBO|Gender|BloodG|       Department|        Doctor|         CompanyName|FinalBillAmount|
+-----+-------------------+---+---------+------+------+-----------------+--------------+--------------------+---------------+
|    7|      Vivian Jordan| 40|3/11/1980|Female|    O+|       Cardiology|  Jeanie Balam|  Est Ac Corporation|        21750.0|
|   51|     Vielka Puckett| 40|4/24/1980|Female|    O+|       Cardiology|  Jeanie Balam|Mauris Ut Foundation|        34300.0|
|  169|Cheyenne Livingston| 45| 5/1/1975|Female|    A+|Infection Control|   Bram Murphy|Vitae Erat Vivamu...|        31720.0|
|   15|    Charles Sherman| 40|3/19/1980|Female|    O+|       Cardiology|  Jeanie Balam|Molestie Arcu Sed PC|        10150.0|
|  383|      Olivia Mercer| 30| 5/3/1990|Female|    A-|        Neurology|Zebulen Wilden|       Etiam Limited|        5

### 10. Create an RDD from the data generated in question 9. Find out the total amount claimed for each company's patients. (Use both the groupByKey() and reduceByKey() transformations and see which performs better.)

In [23]:
#final_rdd = finalDf.rdd.mapPartitionsWithIndex( lambda idx, iterator: iter(list(iterator)[1:]) if idx == 0 else iterator)
final_rdd = finalDf.rdd

In [24]:
final_rdd.take(3)

[Row(RegNo='7', PatientName='Vivian Jordan', Age='40', DBO='3/11/1980', Gender='Female', BloodG='O+', Department='Cardiology', Doctor='Jeanie Balam', CompanyName='Est Ac Corporation', FinalBillAmount=21750.0),
 Row(RegNo='51', PatientName='Vielka Puckett', Age='40', DBO='4/24/1980', Gender='Female', BloodG='O+', Department='Cardiology', Doctor='Jeanie Balam', CompanyName='Mauris Ut Foundation', FinalBillAmount=34300.0),
 Row(RegNo='169', PatientName='Cheyenne Livingston', Age='45', DBO='5/1/1975', Gender='Female', BloodG='A+', Department='Infection Control', Doctor='Bram Murphy', CompanyName='Vitae Erat Vivamus LLP', FinalBillAmount=31720.0)]

In [25]:
final_rdd.map( lambda x:(x[8],x[9]) ).take(5)

[('Est Ac Corporation', 21750.0),
 ('Mauris Ut Foundation', 34300.0),
 ('Vitae Erat Vivamus LLP', 31720.0),
 ('Molestie Arcu Sed PC', 10150.0),
 ('Etiam Limited', 57915.0)]

In [26]:
final_rdd.map( lambda x:(x[8],x[9]) ).groupByKey().mapValues(sum).collect()

[('Est Ac Corporation', 163750.0),
 ('Mauris Ut Foundation', 284680.0),
 ('Vitae Erat Vivamus LLP', 31720.0),
 ('Molestie Arcu Sed PC', 10150.0),
 ('Etiam Limited', 57915.0),
 ('Orci Sem Eget Associates', 46550.0),
 ('Sem Pellentesque Ut Corporation', 70785.0),
 ('Arcu Industries', 2900.0),
 ('Erat Vivamus LLC', 25740.0),
 ('Feugiat Inc.', 15950.0),
 ('Donec Corporation', 18850.0),
 ('Tellus Justo Sit LLC', 171000.0),
 ('Tincidunt Donec Vitae Ltd', 150700.0),
 ('Diam Vel Associates', 24650.0),
 ('Donec LLP', 15950.0),
 ('Pede Cum Inc.', 102960.0),
 ('In Ornare Sagittis Company', 29340.0),
 ('Nec Incorporated', 88020.0),
 ('Auctor Institute', 88750.0),
 ('Sapien Nunc Pulvinar Institute', 152150.0),
 ('Ante Ipsum PC', 10150.0),
 ('Nullam LLP', 17400.0),
 ('Enim Associates', 4350.0),
 ('Sodales Elit Company', 4900.0),
 ('Lorem Fringilla Associates', 1450.0),
 ('Praesent Eu Incorporated', 12200.0),
 ('Lorem PC', 166650.0),
 ('Neque Venenatis Lacus Corp.', 27550.0),
 ('Sodales At Ltd', 1287

In [27]:
from operator import add

In [28]:
final_rdd.map( lambda x:(x[8],x[9]) ).reduceByKey(add).collect()

[('Est Ac Corporation', 163750.0),
 ('Mauris Ut Foundation', 284680.0),
 ('Vitae Erat Vivamus LLP', 31720.0),
 ('Molestie Arcu Sed PC', 10150.0),
 ('Etiam Limited', 57915.0),
 ('Orci Sem Eget Associates', 46550.0),
 ('Sem Pellentesque Ut Corporation', 70785.0),
 ('Arcu Industries', 2900.0),
 ('Erat Vivamus LLC', 25740.0),
 ('Feugiat Inc.', 15950.0),
 ('Donec Corporation', 18850.0),
 ('Tellus Justo Sit LLC', 171000.0),
 ('Tincidunt Donec Vitae Ltd', 150700.0),
 ('Diam Vel Associates', 24650.0),
 ('Donec LLP', 15950.0),
 ('Pede Cum Inc.', 102960.0),
 ('In Ornare Sagittis Company', 29340.0),
 ('Nec Incorporated', 88020.0),
 ('Auctor Institute', 88750.0),
 ('Sapien Nunc Pulvinar Institute', 152150.0),
 ('Ante Ipsum PC', 10150.0),
 ('Nullam LLP', 17400.0),
 ('Enim Associates', 4350.0),
 ('Sodales Elit Company', 4900.0),
 ('Lorem Fringilla Associates', 1450.0),
 ('Praesent Eu Incorporated', 12200.0),
 ('Lorem PC', 166650.0),
 ('Neque Venenatis Lacus Corp.', 27550.0),
 ('Sodales At Ltd', 1287
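
Both calls return the same totals; they differ in how much data crosses the shuffle. `reduceByKey` merges the values for each key inside every partition before shuffling, whereas `groupByKey` ships every pair as-is, so `reduceByKey` is generally the better choice for sums like this one. On a dataset this small the difference is likely to be negligible. The plain-Python sketch below does not use Spark; the partitions, values, and helper names are made up, and it only counts how many records each strategy would send across the shuffle.

```python
from operator import add

# Made-up (company, amount) pairs, already split into two partitions.
partitions = [
    [("A", 10.0), ("A", 20.0), ("B", 5.0)],
    [("A", 1.0), ("B", 2.0), ("B", 3.0)],
]


def group_by_key_shuffle(parts):
    """groupByKey-style: every record crosses the shuffle unchanged."""
    return [pair for part in parts for pair in part]


def reduce_by_key_shuffle(parts, func):
    """reduceByKey-style: combine per key inside each partition first."""
    shuffled = []
    for part in parts:
        combined = {}
        for key, value in part:
            combined[key] = func(combined[key], value) if key in combined else value
        shuffled.extend(combined.items())
    return shuffled


def totals(shuffled, func):
    """Final reduce-side merge, identical for both strategies."""
    result = {}
    for key, value in shuffled:
        result[key] = func(result[key], value) if key in result else value
    return result


grouped = group_by_key_shuffle(partitions)
reduced = reduce_by_key_shuffle(partitions, add)
print(len(grouped), len(reduced))                    # 6 4
print(totals(grouped, add) == totals(reduced, add))  # True
```

In this toy case the pre-combined version sends four records instead of six while producing the same per-key sums; the saving grows with the number of repeated keys per partition.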