In [1]:
# Load the patient CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
df = spark.read.csv(
    '/FileStore/tables/patient_new_data.csv',
    header=True,
    inferSchema=True,
)

In [2]:
# Number of patients per gender; rows with no gender recorded form their own group.
display(
    df
    .groupBy('gender')
    .count()
)

gender,count
F,257
,2680
M,793


In [3]:
# Number of patients per detected state, largest first.
# Columns are referenced by name so this cell does not depend on `col`, which
# is only imported in a later cell (a fresh "Run All" would otherwise raise
# NameError here).
display(
    df
    .groupBy('detected_state')
    .count()
    .orderBy('count', ascending=False)
)

detected_state,count
Maharashtra,635
Tamil Nadu,485
Delhi,445
Kerala,306
Telangana,272
Uttar Pradesh,234
Andhra Pradesh,226
Rajasthan,210
Madhya Pradesh,179
Karnataka,144


In [4]:
# Number of patients per age bracket, largest first.
# Sorting by column name with ascending=False avoids `col`, which is not yet
# imported at this point in the notebook.
display(
    df
    .groupBy('age_bracket')
    .count()
    .orderBy('count', ascending=False)
)

age_bracket,count
,2911
35,28
21,27
55,25
32,25
40,24
45,24
65,23
24,22
27,22


In [5]:
from pyspark.sql.functions import count
from pyspark.sql.functions import col

# Patients per current status, most common status first.
# count() over patient_number only counts rows where patient_number is non-null.
display(
    df
    .groupBy('current_status')
    .agg(count(col('patient_number')).alias('num_patients'))
    .orderBy(col('num_patients').desc())
)


current_status,num_patients
Hospitalized,3616
Recovered,89
Deceased,24
Migrated,1


In [6]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Top three districts per state, ranked by number of patients.
#
# row_number() assigns ranks in window order, so districts with the same
# patient_count would otherwise be ranked arbitrarily and the top-3 cut could
# change between runs. The district name is added as a secondary sort key to
# make the ranking reproducible.
display(
    df
    .groupBy(col('detected_state'), col('detected_district'))
    .agg(count(col('patient_number')).alias('patient_count'))
    .select(
        '*',
        row_number().over(
            Window
            .partitionBy(col('detected_state'))
            .orderBy(col('patient_count').desc(), col('detected_district').asc())
        ).alias('rownum'),
    )
    .filter(col('rownum') <= 3)
)


detected_state,detected_district,patient_count,rownum
Karnataka,Bengaluru,51,1
Karnataka,Mysuru,28,2
Karnataka,Chikkaballapura,10,3
Odisha,Khordha,14,1
Odisha,Bhadrak,3,2
Odisha,Puri,1,3
Kerala,Kasaragod,142,1
Kerala,Kannur,50,2
Kerala,Ernakulam,24,3
Ladakh,Leh,11,1


In [7]:
from pyspark.sql.functions import explode_outer,split

# Turn the comma-separated `suspected_contacted_patient` field into one row per
# referenced patient. explode_outer keeps patients whose contact list is null,
# emitting a single row with a null `explode_suspected_patient` for them.
df_patients = (
    df
    .select(
        '*',
        split(col('suspected_contacted_patient'), ',').alias('suspected_contact'),
    )
    .select(
        '*',
        explode_outer(col('suspected_contact')).alias('explode_suspected_patient'),
    )
)

# For every referenced contact, count how many patients list them; the null
# group (patients with no listed contact) is dropped after aggregation.
display(
    df_patients
    .groupBy(col('explode_suspected_patient'))
    .agg(count(col('patient_number')).alias('affected_cnt'))
    .where(col('explode_suspected_patient').isNotNull())
)

explode_suspected_patient,affected_cnt
P201,1
P74,2
P653,1
P873,1
P370,2
P30,1
P681,5
P172,6
P764,11
P753,1


In [8]:
from pyspark.sql.functions import datediff, to_date
from pyspark.sql.functions import avg

# Recovered patients only, with both date columns parsed from dd/MM/yyyy text
# and the gap between announcement and status change expressed in days.
df_recovered = (
    df
    .where(col('current_status') == 'Recovered')
    .select(
        col('patient_number'),
        col('detected_state'),
        to_date(col('status_change_date'), "dd/MM/yyyy").alias('status_change_date'),
        to_date(col('date_announced'), "dd/MM/yyyy").alias('date_announced'),
    )
    .select(
        col('patient_number'),
        col('detected_state'),
        # datediff(end, start): days from date_announced to status_change_date.
        datediff(col('status_change_date'), col('date_announced')).alias('discharge_days'),
    )
)

# Mean number of days to recovery per state, slowest-recovering state first.
display(
    df_recovered
    .groupBy(col('detected_state'))
    .agg(avg(col('discharge_days')).alias('average_recovery_days'))
    .orderBy(col('average_recovery_days').desc())
)

detected_state,average_recovery_days
Haryana,25.0
Maharashtra,16.0
Andhra Pradesh,15.0
Kerala,12.88095238095238
Delhi,11.0
Uttar Pradesh,9.166666666666666
Karnataka,7.818181818181818
Himachal Pradesh,7.0
Tamil Nadu,4.5
Gujarat,0.0


In [9]:
from pyspark.sql.functions import sum

# Patients announced per state per day (date parsed from dd/MM/yyyy text).
df_patients_daily = (
    df
    .select(
        col('patient_number'),
        col('detected_state'),
        to_date(col('date_announced'), "dd/MM/yyyy").alias('date_announced'),
    )
    .groupBy(col('detected_state'), col('date_announced'))
    .agg(count(col('patient_number')).alias('patient_cnt'))
)

# Running total of patients per state in date order: the frame spans from the
# start of the partition up to and including the current date.
# NOTE: `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.
display(
    df_patients_daily.withColumn(
        'rolling_sum',
        sum(col('patient_cnt')).over(
            Window
            .partitionBy(col('detected_state'))
            .orderBy(col('date_announced'))
            .rangeBetween(Window.unboundedPreceding, Window.currentRow)
        ),
    )
)




detected_state,date_announced,patient_cnt,rolling_sum
Karnataka,2020-03-09,1,1
Karnataka,2020-03-10,3,4
Karnataka,2020-03-12,2,6
Karnataka,2020-03-16,2,8
Karnataka,2020-03-17,3,11
Karnataka,2020-03-18,3,14
Karnataka,2020-03-19,1,15
Karnataka,2020-03-21,5,20
Karnataka,2020-03-22,6,26
Karnataka,2020-03-23,7,33


In [10]:
# Frequency of each distinct note, most common first; rows without a note are
# excluded. Notes are compared verbatim, so spellings that differ only in case
# are counted separately.
display(
    df
    .where(col('notes').isNotNull())
    .groupBy(col('notes'))
    .count()
    .orderBy(col('count').desc())
)

notes,count
Details awaited,738
Attended Delhi Religious Conference,479
Details Awaited,221
Travelled from Dubai,73
Local Transmission,39
Travelled from UK,26
"Travelled from Dubai, UAE",25
attended religious event Tablighi Jamaat in delhi,24
Contact transmission,22
Travelled from Italy,18


In [11]:
# Frequency of notes that mention travel, most common first.
# contains() is a case-sensitive substring match on the literal 'Travelled'.
display(
    df
    .where(col('notes').contains('Travelled'))
    .groupBy(col('notes'))
    .count()
    .orderBy(col('count').desc())
)

notes,count
Travelled from Dubai,73
Travelled from UK,26
"Travelled from Dubai, UAE",25
Travelled from Italy,18
Travelled from Middle East,13
Travelled from Saudi Arabia,12
Travelled from Delhi,12
Travelled from London,11
Travelled from Delhi and Contact history with TN-P5 and TN-P6,10
"Travelled from Iran, Resident of Ladakh( S.N Medical College ) - Evacuee",9
