# Vaccination Drive plan for employees using Sequntial and parallel approach

### Using python as the primary language with pyspark library

In [22]:
from pyspark.sql import functions
from pyspark.sql import SparkSession
from pyspark.sql import Window

In [4]:
spark = SparkSession \
    .builder \
    .appName("code-challange") \
    .getOrCreate()

## Get Data

In [5]:
input_data = spark.read.csv("us-500.csv", header=True, inferSchema=True)

In [8]:
print("---------------")
print(input_data.columns)
print("---------------")

input_data.limit(5).show(truncate=False)

---------------
['first_name', 'last_name', 'company_name', 'address', 'city', 'county', 'state', 'zip', 'phone1', 'phone2', 'email', 'web']
---------------
+----------+---------+----------------------+--------------------+-----------+----------+-----+-----+------------+------------+-----------------------------+-----------------------------------+
|first_name|last_name|company_name          |address             |city       |county    |state|zip  |phone1      |phone2      |email                        |web                                |
+----------+---------+----------------------+--------------------+-----------+----------+-----+-----+------------+------------+-----------------------------+-----------------------------------+
|James     |Butt     |Benton, John B Jr     |6649 N Blue Gum St  |New Orleans|Orleans   |LA   |70116|504-621-8927|504-845-1427|jbutt@gmail.com              |http://www.bentonjohnbjr.com       |
|Josephine |Darakjy  |Chanay, Jeffrey A Esq |4 B Blue Ridge Blvd |B

In [85]:
print(f"Total records in input data => {input_data.count()}")

Total records in input data => 500


## Explode data by 100X to 50,000 records

In [18]:
# explode data 100 times
exploded_data = input_data\
.withColumn("dummy_array", functions.lit(",".join([str(i) for i in range(100)])))\
.withColumn("dummy_array", functions.split("dummy_array", ","))\
.withColumn("dummy_array", functions.explode("dummy_array"))\
.drop("dummy_array")

In [19]:
exploded_data.count()

50000

In [20]:
exploded_data.limit(5).show()

+----------+---------+-----------------+------------------+-----------+-------+-----+-----+------------+------------+---------------+--------------------+
|first_name|last_name|     company_name|           address|       city| county|state|  zip|      phone1|      phone2|          email|                 web|
+----------+---------+-----------------+------------------+-----------+-------+-----+-----+------------+------------+---------------+--------------------+
|     James|     Butt|Benton, John B Jr|6649 N Blue Gum St|New Orleans|Orleans|   LA|70116|504-621-8927|504-845-1427|jbutt@gmail.com|http://www.benton...|
|     James|     Butt|Benton, John B Jr|6649 N Blue Gum St|New Orleans|Orleans|   LA|70116|504-621-8927|504-845-1427|jbutt@gmail.com|http://www.benton...|
|     James|     Butt|Benton, John B Jr|6649 N Blue Gum St|New Orleans|Orleans|   LA|70116|504-621-8927|504-845-1427|jbutt@gmail.com|http://www.benton...|
|     James|     Butt|Benton, John B Jr|6649 N Blue Gum St|New Orleans

## City employee density and ranking based on higher density

In [32]:
# city employee density
density_window = Window.orderBy(functions.col("count").desc())
cityEmployeeDensity = exploded_data.groupBy("city").count().withColumn("sequence", functions.row_number().over(density_window))

print(f"Total Cities => {cityEmployeeDensity.count()}")

cityEmployeeDensity.limit(10).show()

Total Cities => 342
+-------------+-----+--------+
|         city|count|sequence|
+-------------+-----+--------+
|     New York| 1400|       1|
| Philadelphia|  800|       2|
|      Chicago|  700|       3|
|        Miami|  600|       4|
|      Phoenix|  500|       5|
|San Francisco|  500|       6|
|    Milwaukee|  500|       7|
|      Gardena|  500|       8|
|    Baltimore|  500|       9|
|      Orlando|  500|      10|
+-------------+-----+--------+



## Vaccination drive plan (Parallel Approach)

In [35]:
# vaccination drive plan
VaccinationDrivePlan = exploded_data.join(cityEmployeeDensity.drop("count"), "city", "left")
VaccinationDrivePlan.limit(10).show()

+-----------+----------+---------+-----------------+------------------+-------+-----+-----+------------+------------+---------------+--------------------+--------+
|       city|first_name|last_name|     company_name|           address| county|state|  zip|      phone1|      phone2|          email|                 web|sequence|
+-----------+----------+---------+-----------------+------------------+-------+-----+-----+------------+------------+---------------+--------------------+--------+
|New Orleans|     James|     Butt|Benton, John B Jr|6649 N Blue Gum St|Orleans|   LA|70116|504-621-8927|504-845-1427|jbutt@gmail.com|http://www.benton...|      31|
|New Orleans|     James|     Butt|Benton, John B Jr|6649 N Blue Gum St|Orleans|   LA|70116|504-621-8927|504-845-1427|jbutt@gmail.com|http://www.benton...|      31|
|New Orleans|     James|     Butt|Benton, John B Jr|6649 N Blue Gum St|Orleans|   LA|70116|504-621-8927|504-845-1427|jbutt@gmail.com|http://www.benton...|      31|
|New Orleans|   

### Vaccination days for each employee, with 100 employee per day per city

In [50]:
#parallel vaccination drive across cities
count_window = Window.partitionBy("city").orderBy("first_name", "last_name")
vaccination_drive_plan_p = VaccinationDrivePlan\
.withColumn("emp_cnt", functions.row_number().over(count_window))\
.withColumn("vaccination_day", functions.ceil(functions.col("emp_cnt")/100))\
.select("first_name", "last_name", "city", "vaccination_day")\
.dropDuplicates()

vaccination_drive_plan_p.orderBy("last_name").limit(10).show()

+----------+---------+-----------------+---------------+
|first_name|last_name|             city|vaccination_day|
+----------+---------+-----------------+---------------+
|  Johnetta| Abdallah|      Chapel Hill|              1|
|  Geoffrey|     Acey|         Palatine|              1|
|    Weldon|    Acuff|Arlington Heights|              1|
|    Barbra|    Adkin|         Brooklyn|              1|
|    Fausto|Agramonte|         New York|              5|
|     Delmy|     Ahle|       Providence|              2|
|     Cammy|  Albares|           Laredo|              1|
|     Minna|   Amigon|       Kulpsville|              1|
|     Jutta|    Amyot|        Broussard|              1|
|     Tasia|Andreason|           Kearny|              2|
+----------+---------+-----------------+---------------+



### Days to complete vaccination drive at each city in parallel approach

In [58]:
# get number of days to complete vaccination drive per city
num_day_window = Window.partitionBy("city").orderBy(functions.col("vaccination_day").desc())
vaccination_drive_plan_p_dtc = vaccination_drive_plan_p\
.withColumn("flag", functions.rank().over(num_day_window))\
.filter(functions.col("flag")==1)\
.select("city", functions.col("vaccination_day").alias("days_to_complete"))\
.dropDuplicates()\
.orderBy(functions.col("days_to_complete").desc())

vaccination_drive_plan_p_dtc.show()

+-------------+----------------+
|         city|days_to_complete|
+-------------+----------------+
|     New York|              14|
| Philadelphia|               8|
|      Chicago|               7|
|        Miami|               6|
|      Gardena|               5|
|San Francisco|               5|
|    Baltimore|               5|
|    Milwaukee|               5|
|      Phoenix|               5|
|      Orlando|               5|
|       Denver|               4|
|      Abilene|               4|
|  Los Angeles|               4|
| Indianapolis|               4|
|    Anchorage|               4|
|   Providence|               4|
|     Brooklyn|               4|
|      Atlanta|               4|
|     San Jose|               4|
|        Boise|               3|
+-------------+----------------+
only showing top 20 rows



### Total Time to complete entire vaccination drive

In [84]:
day_to_complete_p = vaccination_drive_plan_p_dtc.select(functions.max("days_to_complete")).collect()[0][0]
print(f"Total number of days to complete vaccination drive in parallel approach {day_to_complete_p} days.")

Total number of days to complete vaccination drive in parallel approach 14 days.


## Vaccination Drive Plan (Sequential Approach)

In [70]:
# Get Start day for next city based on population density

w = Window.orderBy(functions.col("days_to_complete").desc())
w_sum = Window.orderBy(functions.col("days_to_complete").desc()).rangeBetween(Window.unboundedPreceding, 0)
vaccination_drive_plan_s_sd = vaccination_drive_plan_p_dtc\
.withColumn("prev_end_day", functions.lag("days_to_complete", 1).over(w))\
.withColumn("prev_end_day", functions.when(functions.col("prev_end_day").isNull(), 1).otherwise(functions.col("prev_end_day")))\
.withColumn("start_day", functions.sum("prev_end_day").over(w_sum))\
.select("city", "start_day")

vaccination_drive_plan_s_sd.show()

+-------------+---------+
|         city|start_day|
+-------------+---------+
|     New York|        1|
| Philadelphia|       15|
|      Chicago|       23|
|        Miami|       30|
|    Baltimore|       61|
|      Gardena|       61|
|    Milwaukee|       61|
|      Orlando|       61|
|      Phoenix|       61|
|San Francisco|       61|
|      Abilene|       98|
|    Anchorage|       98|
|      Atlanta|       98|
|     Brooklyn|       98|
|       Denver|       98|
| Indianapolis|       98|
|  Los Angeles|       98|
|   Providence|       98|
|     San Jose|       98|
|       Austin|      144|
+-------------+---------+
only showing top 20 rows



### Vaccination day for each employee in parallel approach

In [75]:
# derive vaccination day in seq approach

vaccination_drive_plan_s = vaccination_drive_plan_p\
.join(vaccination_drive_plan_s_sd, "city", "left")\
.withColumn("vaccination_day", functions.col("vaccination_day")-1)\
.withColumn("vaccination_day", functions.col("vaccination_day") + functions.col("start_day"))\
.drop("start_day")\
.orderBy("vaccination_day")

vaccination_drive_plan_s.show()


+------------+----------+---------+---------------+
|        city|first_name|last_name|vaccination_day|
+------------+----------+---------+---------------+
|    New York|   Alishia|    Sergi|              1|
|    New York|     Brock| Bolognia|              2|
|    New York|     Cyril| Daufeldt|              3|
|    New York|    Derick|   Dhamer|              4|
|    New York|    Fausto|Agramonte|              5|
|    New York|    Haydee| Denooyer|              6|
|    New York|      Jess| Chaffins|              7|
|    New York|      Jose| Stockham|              8|
|    New York|   Justine|  Mugnolo|              9|
|    New York|     Layla|  Springe|             10|
|    New York|     Mirta|  Mallett|             11|
|    New York|     Ozell|   Shealy|             12|
|    New York|     Tawna|   Buvens|             13|
|    New York|    Willow|    Kusko|             14|
|Philadelphia|     Blair|    Malet|             15|
|Philadelphia|    Dalene|Schoeneck|             16|
|Philadelphi

### Days to complete, start and end day for each city in parallel approach

In [79]:
# start and end day for each city in sequenctial drive
vaccination_drive_plan_s_city_start_end = vaccination_drive_plan_s\
.groupBy("city")\
.agg(functions.min("vaccination_day").alias("start_day"), 
     functions.max("vaccination_day").alias("end_day"), 
     (functions.max("vaccination_day")-functions.min("vaccination_day")).alias("days_to_complete"))\
.orderBy("start_day")

vaccination_drive_plan_s_city_start_end.show()


+-------------+---------+-------+----------------+
|         city|start_day|end_day|days_to_complete|
+-------------+---------+-------+----------------+
|     New York|        1|     14|              13|
| Philadelphia|       15|     22|               7|
|      Chicago|       23|     29|               6|
|        Miami|       30|     35|               5|
|      Gardena|       61|     65|               4|
|San Francisco|       61|     65|               4|
|    Baltimore|       61|     65|               4|
|    Milwaukee|       61|     65|               4|
|      Phoenix|       61|     65|               4|
|      Orlando|       61|     65|               4|
|       Denver|       98|    101|               3|
|      Abilene|       98|    101|               3|
|  Los Angeles|       98|    101|               3|
| Indianapolis|       98|    101|               3|
|    Anchorage|       98|    101|               3|
|   Providence|       98|    101|               3|
|     Brooklyn|       98|    10

### Total number of days to complete entire vaccination drive in parallel appraoch

In [83]:
days_to_complete_s = vaccination_drive_plan_s_city_start_end.select(functions.sum("days_to_complete")).collect()[0][0]
print(f"Total number of days to complete vaccination drive in sequenctila approach {days_to_complete_s} days.")

Total number of days to complete vaccination drive in sequenctila approach 158 days.
