In [1]:
# setup spark session

import findspark
# init() runs before the pyspark import below; presumably it makes the local
# Spark installation importable -- confirm against the findspark docs.
findspark.init()
from pyspark.sql import SparkSession
# getOrCreate() hands back the session stored in `spark`, which later cells
# use for createDataFrame() and spark.sql().
spark = SparkSession.builder.getOrCreate()
import pandas as pd  # used to read the Excel source (AUS.xlsx)

In [2]:
# Import necessary PySpark modules required for EDA

from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
# SQLContext's first argument is a SparkContext, not a SparkSession, so pass
# the session's SparkContext and the session itself explicitly.
sc = SQLContext(spark.sparkContext, spark)

# NOTE: despite its name, "sc" is an SQLContext here, not a SparkContext.
# The name is kept because later cells load the CSV files via sc.read.csv(...).
# SQLContext is deprecated since Spark 3.0; spark.read exposes the same reader.



In [4]:
## Australia

In [5]:
# Read the Australia workbook with pandas; it is converted to a Spark
# DataFrame in a later cell.
df_AUS = pd.read_excel('AUS.xlsx')

In [6]:
# Column dtypes and non-null counts of the pandas frame.
df_AUS.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 5 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   Unique ID            3 non-null      int64         
 1   Patient Name         3 non-null      object        
 2   Vaccine Type         3 non-null      object        
 3   Date of Birth        2 non-null      datetime64[ns]
 4   Date of Vaccination  3 non-null      datetime64[ns]
dtypes: datetime64[ns](2), int64(1), object(2)
memory usage: 248.0+ bytes


In [7]:
# (rows, columns) of the pandas frame.
df_AUS.shape

(3, 5)

In [8]:
# Plain-text dump of the whole pandas frame (only a handful of rows here).
print(df_AUS)

   Unique ID Patient Name Vaccine Type Date of Birth Date of Vaccination
0          1         Mike          LMN           NaT          2022-05-11
1          2    Jonnathan          XYZ    1997-12-13          2021-12-13
2          3     Cristina          ABC    1998-03-12          2022-03-12


In [9]:
# Explicit Spark schema for the Australia data: same column names as the
# Excel sheet, every field nullable.
mySchema1 = StructType(
    [
        StructField("Unique ID", IntegerType(), True),
        StructField("Patient Name", StringType(), True),
        StructField("Vaccine Type", StringType(), True),
        StructField("Date of Birth", DateType(), True),
        StructField("Date of Vaccination", DateType(), True),
    ]
)

In [10]:
# Build the Spark DataFrame from the pandas frame, applying mySchema1.
sdf_AUS = spark.createDataFrame(
    df_AUS,
    schema=mySchema1,
)

In [11]:
# Tag every row with its source country, then confirm the resulting schema.
sdf_AUS = sdf_AUS.withColumn(
    "CountryName",
    lit("Australia"),
)
sdf_AUS.printSchema()

root
 |-- Unique ID: integer (nullable = true)
 |-- Patient Name: string (nullable = true)
 |-- Vaccine Type: string (nullable = true)
 |-- Date of Birth: date (nullable = true)
 |-- Date of Vaccination: date (nullable = true)
 |-- CountryName: string (nullable = false)



In [12]:
sdf_AUS.count()  # number of rows in the Spark DataFrame

3

In [13]:
# Null audit: for each column, count the rows whose value is null.
# when() without otherwise() yields null for non-matching rows, and count()
# skips nulls, so only the null rows of each column are counted.
sdf_AUS.select(
    [
        count(when(col(name).isNull(), name)).alias(name)
        for name in sdf_AUS.columns
    ]
).show()

+---------+------------+------------+-------------+-------------------+-----------+
|Unique ID|Patient Name|Vaccine Type|Date of Birth|Date of Vaccination|CountryName|
+---------+------------+------------+-------------+-------------------+-----------+
|        0|           0|           0|            1|                  0|          0|
+---------+------------+------------+-------------+-------------------+-----------+



In [14]:
# Rename the Australia columns to the names used by the India and USA data,
# so the frames can later be merged by column name into one source of truth.
sdf_AUS = (
    sdf_AUS
    .withColumnRenamed("Unique ID", "ID")
    .withColumnRenamed("Patient Name", "Name")
    .withColumnRenamed("Vaccine Type", "VaccinationType")
    .withColumnRenamed("Date of Birth", "DOB")
    .withColumnRenamed("Date of Vaccination", "VaccinationDate")
)

In [15]:
# Preview the Australia frame after the renames.
sdf_AUS.show()

+---+---------+---------------+----------+---------------+-----------+
| ID|     Name|VaccinationType|       DOB|VaccinationDate|CountryName|
+---+---------+---------------+----------+---------------+-----------+
|  1|     Mike|            LMN|      null|     2022-05-11|  Australia|
|  2|Jonnathan|            XYZ|1997-12-13|     2021-12-13|  Australia|
|  3| Cristina|            ABC|1998-03-12|     2022-03-12|  Australia|
+---+---------+---------------+----------+---------------+-----------+



In [16]:
## INDIA

In [17]:
# Load the India CSV; the first line of the file supplies the column names.
sdf_IND = sc.read.csv(
    "IND.csv",
    header=True,
)

In [18]:
sdf_IND.count()  # number of rows read from IND.csv

3

In [19]:
# Schema as read from the CSV: no schema was supplied to read.csv, and the
# printed result shows every column typed as string.
sdf_IND.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- VaccinationType: string (nullable = true)
 |-- VaccinationDate: string (nullable = true)
 |-- Free or Paid: string (nullable = true)



In [20]:
# Tag every row with its source country.
sdf_IND = sdf_IND.withColumn(
    "CountryName",
    lit("India"),
)

In [21]:
# Give the India columns their final types (the CSV reader produced strings):
# ID -> integer, DOB / VaccinationDate -> date, everything else stays string.
sdf_IND = (
    sdf_IND
    .withColumn("ID", col("ID").cast("int"))
    .withColumn("Name", col("Name").cast("string"))
    .withColumn("DOB", col("DOB").cast("date"))
    .withColumn("VaccinationType", col("VaccinationType").cast("string"))
    .withColumn("VaccinationDate", col("VaccinationDate").cast("date"))
    .withColumn("Free or Paid", col("Free or Paid").cast("string"))
    .withColumn("CountryName", col("CountryName").cast("string"))
)

sdf_IND.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- DOB: date (nullable = true)
 |-- VaccinationType: string (nullable = true)
 |-- VaccinationDate: date (nullable = true)
 |-- Free or Paid: string (nullable = true)
 |-- CountryName: string (nullable = false)



In [22]:
sdf_IND.count()  # row count after the type casts

3

In [23]:
# Null audit after casting: for each column, count the rows whose value is null.
sdf_IND.select(
    [
        count(when(col(name).isNull(), name)).alias(name)
        for name in sdf_IND.columns
    ]
).show()

+---+----+---+---------------+---------------+------------+-----------+
| ID|Name|DOB|VaccinationType|VaccinationDate|Free or Paid|CountryName|
+---+----+---+---------------+---------------+------------+-----------+
|  0|   0|  0|              0|              0|           0|          0|
+---+----+---+---------------+---------------+------------+-----------+



In [24]:
# Preview the typed India frame.
sdf_IND.show()

+---+------+----------+---------------+---------------+------------+-----------+
| ID|  Name|       DOB|VaccinationType|VaccinationDate|Free or Paid|CountryName|
+---+------+----------+---------------+---------------+------------+-----------+
|  1| Vikas|1998-12-01|            XYZ|     2022-01-01|           F|      India|
|  2| Rahul|1982-08-13|            ABC|     2022-03-05|           P|      India|
|  3|Sameer|1952-08-13|            ABC|     2022-02-20|           F|      India|
+---+------+----------+---------------+---------------+------------+-----------+



In [25]:
## USA

In [26]:
# Load the USA CSV; the first line of the file supplies the column names.
sdf_USA = sc.read.csv(
    "USA.csv",
    header=True,
)

In [27]:
sdf_USA.count()  # number of rows read from USA.csv

3

In [28]:
# Schema as read from the CSV; the printed result shows every column as string.
sdf_USA.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- VaccinationType: string (nullable = true)
 |-- VaccinationDate: string (nullable = true)



In [29]:
# Tag every row with its source country, then confirm the resulting schema.
sdf_USA = sdf_USA.withColumn(
    "CountryName",
    lit("USA"),
)
sdf_USA.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- VaccinationType: string (nullable = true)
 |-- VaccinationDate: string (nullable = true)
 |-- CountryName: string (nullable = false)



In [30]:
# Preview the raw USA rows; VaccinationDate is still an 8-digit string here.
sdf_USA.show()

+---+----+---------------+---------------+-----------+
| ID|Name|VaccinationType|VaccinationDate|CountryName|
+---+----+---------------+---------------+-----------+
|  1| Sam|            EFG|       06152022|        USA|
|  2|John|            XYZ|       01052022|        USA|
|  3|Mike|            ABC|       12282021|        USA|
+---+----+---------------+---------------+-----------+



In [31]:
# Give the USA columns their final types. VaccinationDate arrives as an
# 8-digit string, so it is parsed with the explicit pattern 'MMddyyyy'
# instead of a plain cast to date.
sdf_USA = (
    sdf_USA
    .withColumn("ID", col("ID").cast("int"))
    .withColumn("Name", col("Name").cast("string"))
    .withColumn("VaccinationType", col("VaccinationType").cast("string"))
    .withColumn(
        "VaccinationDate",
        functions.to_date(
            functions.col("VaccinationDate").cast("String"),
            'MMddyyyy',
        ),
    )
    .withColumn("CountryName", col("CountryName").cast("string"))
)
sdf_USA.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- VaccinationType: string (nullable = true)
 |-- VaccinationDate: date (nullable = true)
 |-- CountryName: string (nullable = false)



In [32]:
# Preview the typed USA frame; VaccinationDate is now a date column.
sdf_USA.show()

+---+----+---------------+---------------+-----------+
| ID|Name|VaccinationType|VaccinationDate|CountryName|
+---+----+---------------+---------------+-----------+
|  1| Sam|            EFG|     2022-06-15|        USA|
|  2|John|            XYZ|     2022-01-05|        USA|
|  3|Mike|            ABC|     2021-12-28|        USA|
+---+----+---------------+---------------+-----------+



In [33]:
# Merge the AUS and IND dataframes by column name; allowMissingColumns=True
# lets the frames differ in columns ("Free or Paid" exists only for India).
sdf_AUS_IND = sdf_AUS.unionByName(
    sdf_IND,
    allowMissingColumns=True,
)

In [34]:
# Preview the combined Australia + India frame.
sdf_AUS_IND.show()

+---+---------+---------------+----------+---------------+-----------+------------+
| ID|     Name|VaccinationType|       DOB|VaccinationDate|CountryName|Free or Paid|
+---+---------+---------------+----------+---------------+-----------+------------+
|  1|     Mike|            LMN|      null|     2022-05-11|  Australia|        null|
|  2|Jonnathan|            XYZ|1997-12-13|     2021-12-13|  Australia|        null|
|  3| Cristina|            ABC|1998-03-12|     2022-03-12|  Australia|        null|
|  1|    Vikas|            XYZ|1998-12-01|     2022-01-01|      India|           F|
|  2|    Rahul|            ABC|1982-08-13|     2022-03-05|      India|           P|
|  3|   Sameer|            ABC|1952-08-13|     2022-02-20|      India|           F|
+---+---------+---------------+----------+---------------+-----------+------------+



In [35]:
# Merge the USA dataframe into the AUS+IND frame by column name;
# allowMissingColumns=True lets the USA frame lack DOB and "Free or Paid".
sdf_SSOT = sdf_AUS_IND.unionByName(
    sdf_USA,
    allowMissingColumns=True,
)

In [36]:
sdf_SSOT.show()  # SSOT (Single Source of Truth): rows from all three countries

+---+---------+---------------+----------+---------------+-----------+------------+
| ID|     Name|VaccinationType|       DOB|VaccinationDate|CountryName|Free or Paid|
+---+---------+---------------+----------+---------------+-----------+------------+
|  1|     Mike|            LMN|      null|     2022-05-11|  Australia|        null|
|  2|Jonnathan|            XYZ|1997-12-13|     2021-12-13|  Australia|        null|
|  3| Cristina|            ABC|1998-03-12|     2022-03-12|  Australia|        null|
|  1|    Vikas|            XYZ|1998-12-01|     2022-01-01|      India|           F|
|  2|    Rahul|            ABC|1982-08-13|     2022-03-05|      India|           P|
|  3|   Sameer|            ABC|1952-08-13|     2022-02-20|      India|           F|
|  1|      Sam|            EFG|      null|     2022-06-15|        USA|        null|
|  2|     John|            XYZ|      null|     2022-01-05|        USA|        null|
|  3|     Mike|            ABC|      null|     2021-12-28|        USA|      

In [37]:
# Confirm the column types of the merged frame.
sdf_SSOT.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- VaccinationType: string (nullable = true)
 |-- DOB: date (nullable = true)
 |-- VaccinationDate: date (nullable = true)
 |-- CountryName: string (nullable = false)
 |-- Free or Paid: string (nullable = true)



In [38]:
# Register the merged frame as the temp view "SSOT" queried by the metric cells below.
sdf_SSOT.createOrReplaceTempView("SSOT")

In [39]:
##Metric1

In [40]:
# Metric 1: number of vaccinations per country and vaccine type.
metric1 = spark.sql(
    "SELECT CountryName, VaccinationType, count(*) as NumberOfVaccinations "
    "from SSOT group by CountryName,VaccinationType"
)

In [41]:
metric1.show()  # metric 1 final output: vaccination count per country and vaccine type

+-----------+---------------+--------------------+
|CountryName|VaccinationType|NumberOfVaccinations|
+-----------+---------------+--------------------+
|  Australia|            LMN|                   1|
|  Australia|            XYZ|                   1|
|  Australia|            ABC|                   1|
|      India|            ABC|                   2|
|      India|            XYZ|                   1|
|        USA|            XYZ|                   1|
|        USA|            EFG|                   1|
|        USA|            ABC|                   1|
+-----------+---------------+--------------------+



In [42]:
##Metric2

In [43]:
# Metric 2, step 1: number of vaccinations per country.
metric2 = spark.sql(
    "SELECT CountryName, count(*) as NumberOfVaccinations "
    "from SSOT group by CountryName"
)

In [44]:
# Intermediate result: vaccination count per country.
metric2.show()

+-----------+--------------------+
|CountryName|NumberOfVaccinations|
+-----------+--------------------+
|  Australia|                   3|
|      India|                   3|
|        USA|                   3|
+-----------+--------------------+



In [45]:
# Metric 2, step 2: attach an assumed population figure to each country:
#   Australia = 6, India = 10, USA = 8
# A country outside these three gets a null TotalPopulation, because the
# when-chain has no otherwise() branch.
metric2 = metric2.withColumn(
    "TotalPopulation",
    when(metric2.CountryName == "Australia", lit(6))
    .when(metric2.CountryName == "India", lit(10))
    .when(metric2.CountryName == "USA", lit(8)),
)

In [46]:
# Intermediate result: vaccination count plus assumed population per country.
metric2.show()

+-----------+--------------------+---------------+
|CountryName|NumberOfVaccinations|TotalPopulation|
+-----------+--------------------+---------------+
|  Australia|                   3|              6|
|      India|                   3|             10|
|        USA|                   3|              8|
+-----------+--------------------+---------------+



In [47]:
# Register the intermediate frame as "temp_metric2" so the next cell can query it with SQL.
metric2.createOrReplaceTempView("temp_metric2")

In [48]:
# Metric 2, step 3: vaccinations as a percentage of the assumed population.
# Note that this rebinds metric2 from the count/population frame to the
# final percentage frame.
metric2 = spark.sql(
    "select CountryName, ((NumberOfVaccinations/TotalPopulation)*100) as PercentageVaccinated "
    "from temp_metric2"
)

In [49]:
metric2.show()  # metric2 final output: percentage vaccinated per country

+-----------+--------------------+
|CountryName|PercentageVaccinated|
+-----------+--------------------+
|  Australia|                50.0|
|      India|                30.0|
|        USA|                37.5|
+-----------+--------------------+



In [50]:
##Metric3

# Each country's share of all vaccination records in SSOT, in percent.
# The scalar subquery supplies the overall record count as the denominator.
# The output column was previously misspelled "PrecentageContribution".
metric3 = spark.sql(
    "SELECT CountryName, "
    "(count(CountryName)/(SELECT count(CountryName) from SSOT))*100 as PercentageContribution "
    "from SSOT group by CountryName"
)

In [51]:
metric3.show()  # metric3 final output: each country's share of all vaccinations

+-----------+----------------------+
|CountryName|PrecentageContribution|
+-----------+----------------------+
|  Australia|     33.33333333333333|
|      India|     33.33333333333333|
|        USA|     33.33333333333333|
+-----------+----------------------+

