In [1]:
#Importing pyspark Libraries 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import date_add,to_date,col,exp
from pyspark.sql.types import *
import pandas as pd
import os

In [2]:
# Create SparkSession from builder
spark=SparkSession.builder.appName('Incubytein_Assessment').getOrCreate()

In [3]:
#Create Dataframe to read USA.csv file
try:
    df_usa=(spark.read.option("badRecords","incorrect data")\
                 .csv('USA.csv',header="true"))
    df_USA_with_date=df_usa.withColumn('VaccinationDate',to_date(col('VaccinationDate').cast("string"),'MMddyyyy'))
    df_USA_correct_data=df_USA_with_date.withColumn('CountryName',lit("USA"))
    df_USA_correct_data.printSchema()
    df_USA_correct_data.show()
except Exception as e:
    print('load failed')

root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- VaccinationType: string (nullable = true)
 |-- VaccinationDate: date (nullable = true)
 |-- CountryName: string (nullable = false)

+---+----+---------------+---------------+-----------+
| ID|Name|VaccinationType|VaccinationDate|CountryName|
+---+----+---------------+---------------+-----------+
|  1| Sam|            EFG|     2022-06-15|        USA|
|  2|John|            XYZ|     2022-01-05|        USA|
|  3|Mike|            ABC|     2021-12-28|        USA|
+---+----+---------------+---------------+-----------+



In [4]:
#Create Dataframe to read IND.csv file
try:
    df_IND=(spark.read.option("badRecords","incorrect data")\
                 .csv('IND.csv',header="true"))
    #Add CountryName
    df_IND=df_IND.withColumn('CountryName',lit("IND"))
    df_IND.printSchema()
    df_IND.show()
except Exception as e:
    print('load failed')

root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- VaccinationType: string (nullable = true)
 |-- VaccinationDate: string (nullable = true)
 |-- Free or Paid: string (nullable = true)
 |-- CountryName: string (nullable = false)

+---+------+----------+---------------+---------------+------------+-----------+
| ID|  Name|       DOB|VaccinationType|VaccinationDate|Free or Paid|CountryName|
+---+------+----------+---------------+---------------+------------+-----------+
|  1| Vikas|1998-12-01|            XYZ|     2022-01-01|           F|        IND|
|  2| Rahul|1982-08-13|            ABC|     2022-03-05|           P|        IND|
|  3|Sameer|1952-08-13|            ABC|     2022-02-20|           F|        IND|
+---+------+----------+---------------+---------------+------------+-----------+



In [5]:
#Create Dataframe to read AUS.xlsx file
try:
    df_read_excel = pd.read_excel('AUS.xlsx')
    #convert excel to csv
    df_to_csv=df_read_excel.to_csv (r'AUS.csv', index = None, header=True)  
    df_AUS=(spark.read.option("badRecords","incorrect data")\
                 .csv('AUS.csv',header="true"))
    #To Format Date function 
    df_AUS_Vaccinationdate=df_AUS.withColumn('Date of Vaccination',to_date(col('Date of Vaccination').cast("date"),'MM-dd-yyyy'))
    #Add CountryName
    df_AUS_CountryName=df_AUS_Vaccinationdate.withColumn('CountryName',lit("AUS"))
    #Rename columnn
    df_AUS_correct_data=df_AUS_CountryName.withColumnRenamed('Unique ID','ID')\
                  .withColumnRenamed('Patient Name','Name')\
                  .withColumnRenamed('Vaccine Type','VaccinationType')\
                  .withColumnRenamed('Date of Birth','DOB')\
                  .withColumnRenamed('Date of Vaccination','VaccinationDate')
    df_AUS_correct_data.printSchema()
    df_AUS_correct_data.show()
    
except Exception as e:
    print('load failed')   

root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- VaccinationType: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- VaccinationDate: date (nullable = true)
 |-- CountryName: string (nullable = false)

+---+---------+---------------+----------+---------------+-----------+
| ID|     Name|VaccinationType|       DOB|VaccinationDate|CountryName|
+---+---------+---------------+----------+---------------+-----------+
|  1|     Mike|            LMN|      null|     2022-05-11|        AUS|
|  2|Jonnathan|            XYZ|1997-12-13|           null|        AUS|
|  3| Cristina|            ABC|1998-03-12|     2022-03-12|        AUS|
+---+---------+---------------+----------+---------------+-----------+



In [6]:
#Join IND and USA dataframe
df_USA_IND=df_USA_correct_data.join(df_IND,on=["ID","Name","VaccinationType","VaccinationDate","CountryName"],how='Outer')

#Join AUS and df_USA_IND dataframe
df_merge_country=df_AUS_correct_data.join(df_USA_IND,on=["ID","Name","VaccinationType","VaccinationDate","CountryName","DOB"],how='outer')
df_merge_country.show()

+---+---------+---------------+---------------+-----------+----------+------------+
| ID|     Name|VaccinationType|VaccinationDate|CountryName|       DOB|Free or Paid|
+---+---------+---------------+---------------+-----------+----------+------------+
|  1|     Mike|            LMN|     2022-05-11|        AUS|      null|        null|
|  2|Jonnathan|            XYZ|           null|        AUS|1997-12-13|        null|
|  1|      Sam|            EFG|     2022-06-15|        USA|      null|        null|
|  1|    Vikas|            XYZ|     2022-01-01|        IND|1998-12-01|           F|
|  2|     John|            XYZ|     2022-01-05|        USA|      null|        null|
|  2|    Rahul|            ABC|     2022-03-05|        IND|1982-08-13|           P|
|  3|     Mike|            ABC|     2021-12-28|        USA|      null|        null|
|  3| Cristina|            ABC|     2022-03-12|        AUS|1998-03-12|        null|
|  3|   Sameer|            ABC|     2022-02-20|        IND|1952-08-13|      

In [7]:
#drop Null value for Vaccination date
sdf_merge_country=df_merge_country.na.drop(how='any',subset=['VaccinationDate'])
sdf_merge_country.show()

+---+--------+---------------+---------------+-----------+----------+------------+
| ID|    Name|VaccinationType|VaccinationDate|CountryName|       DOB|Free or Paid|
+---+--------+---------------+---------------+-----------+----------+------------+
|  1|    Mike|            LMN|     2022-05-11|        AUS|      null|        null|
|  1|     Sam|            EFG|     2022-06-15|        USA|      null|        null|
|  1|   Vikas|            XYZ|     2022-01-01|        IND|1998-12-01|           F|
|  2|    John|            XYZ|     2022-01-05|        USA|      null|        null|
|  2|   Rahul|            ABC|     2022-03-05|        IND|1982-08-13|           P|
|  3|    Mike|            ABC|     2021-12-28|        USA|      null|        null|
|  3|Cristina|            ABC|     2022-03-12|        AUS|1998-03-12|        null|
|  3|  Sameer|            ABC|     2022-02-20|        IND|1952-08-13|           F|
+---+--------+---------------+---------------+-----------+----------+------------+



In [8]:
#Assigning unique id to dataframe
df_unique=sdf_merge_country.withColumn("Id",monotonically_increasing_id())
df_unique.show()


+---+--------+---------------+---------------+-----------+----------+------------+
| Id|    Name|VaccinationType|VaccinationDate|CountryName|       DOB|Free or Paid|
+---+--------+---------------+---------------+-----------+----------+------------+
|  0|    Mike|            LMN|     2022-05-11|        AUS|      null|        null|
|  1|     Sam|            EFG|     2022-06-15|        USA|      null|        null|
|  2|   Vikas|            XYZ|     2022-01-01|        IND|1998-12-01|           F|
|  3|    John|            XYZ|     2022-01-05|        USA|      null|        null|
|  4|   Rahul|            ABC|     2022-03-05|        IND|1982-08-13|           P|
|  5|    Mike|            ABC|     2021-12-28|        USA|      null|        null|
|  6|Cristina|            ABC|     2022-03-12|        AUS|1998-03-12|        null|
|  7|  Sameer|            ABC|     2022-02-20|        IND|1952-08-13|           F|
+---+--------+---------------+---------------+-----------+----------+------------+



In [9]:
#Total vaccination count by countryname and vaccination type
from pyspark.sql.functions import countDistinct
Total_Vaccincation=df_unique.select("CountryName", "VaccinationType").count()
Total_Vaccincation

8

In [10]:
#CountryName, VaccinationType, No. of vaccinations
df_count_vaccination=df_unique.groupby("CountryName","VaccinationType").count()
df_metric1=df_count_vaccination.withColumnRenamed('count','No. of vaccinations')
df_metric1.show()

+-----------+---------------+-------------------+
|CountryName|VaccinationType|No. of vaccinations|
+-----------+---------------+-------------------+
|        AUS|            ABC|                  1|
|        IND|            ABC|                  2|
|        USA|            XYZ|                  1|
|        AUS|            LMN|                  1|
|        IND|            XYZ|                  1|
|        USA|            EFG|                  1|
|        USA|            ABC|                  1|
+-----------+---------------+-------------------+



In [11]:
#Calculating % vaccination in each country 
df_each_country=df_unique.groupby("CountryName").count()
#taking population as 2000
total_population=2000
per_each_country=df_each_country.withColumn('% Vaccinated',(df_each_country["count"]/total_population)*100)
df_metric2=per_each_country.drop("count")
df_metric2.show()

+-----------+------------+
|CountryName|% Vaccinated|
+-----------+------------+
|        AUS|         0.1|
|        USA|        0.15|
|        IND|        0.15|
+-----------+------------+



In [12]:
#Calculate total contribution 
total_contribution=df_metric2.agg({'% Vaccinated':'sum'}).collect()[0][0]
print(total_contribution)

0.4


In [14]:
#Calculating % vaccination contribution by country 
Percentage_of_contribution=df_metric2.withColumn('% contribution',(df_metric2["% Vaccinated"]/total_contribution)*100)
df_metric3=Percentage_of_contribution.drop("% Vaccinated")
df_metric3.show()

+-----------+-----------------+
|CountryName|   % contribution|
+-----------+-----------------+
|        AUS|             25.0|
|        USA|37.49999999999999|
|        IND|37.49999999999999|
+-----------+-----------------+

