In [0]:
#DOWNLOAD & EXTRACT FILE
import zipfile
import requests
from pyspark.sql.types import *
import random
from pyspark.sql.functions import lit

# Sample-data archive to fetch, and the directory it is saved to and unpacked in.
url='https://www.briandunning.com/sample-data/us-500.zip'
download_dir = "/databricks/driver"

filename = url.split('/')[-1]
# Build the archive path once so the write and the extraction below use the
# same file. Previously the archive was written relative to the current
# working directory but read back from a hard-coded absolute path.
zip_path = f"{download_dir}/{filename}"

# Bound the wait and fail on an HTTP error status rather than saving an error
# page to disk and failing later inside zipfile.
req = requests.get(url, timeout=60)
req.raise_for_status()

# Writing the file to the local file system
with open(zip_path,'wb') as output_file:
    output_file.write(req.content)
print('Downloading Completed')


with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(download_dir)

In [0]:
#source dataframe and list of 50000 numbers
# Read the extracted CSV using its first row as the header and expose it to
# Spark SQL as the view `test`.
sourceDf=spark.read.format("csv").option("header",True).load("file:/databricks/driver/us-500.csv")
# sourceDf.show(10,False)
# createOrReplaceTempView is the replacement for the deprecated registerTempTable.
sourceDf.createOrReplaceTempView("test")

# Integers 1..50000, exposed as the view `seq`; later queries read them
# through the column `value`.
l1=list(range(1,50001))
fullSeqDf=spark.createDataFrame(l1, IntegerType())
fullSeqDf.createOrReplaceTempView("seq")
fullSeqDf.show()

In [0]:
#dataset prepare
# Synthetic people: every ordered pair of distinct first names from `test`
# becomes (first_name, last_name) with a derived gmail address. Pairs are
# numbered by (first, last) and only ids up to 50001 are kept.
names=spark.sql("""
with names as (
  SELECT first_name as name FROM test
  ),
names_with_nums as (
  select row_number() OVER (ORDER BY n1.name,n2.name) as id,
    n1.name as first_name,
    n2.name as last_name,
    lower(n1.name)||'.'||lower(n2.name)||"@gmail.com" as email_id
  FROM names as n1
  CROSS JOIN names as n2
  WHERE n1.name!=n2.name)
select * FROM names_with_nums
WHERE id<=50001
""")

# createOrReplaceTempView is the replacement for the deprecated registerTempTable.
names.createOrReplaceTempView("names")
# names.show()

# Distinct (city, county, state, zip) combinations from `test`, numbered from
# 0 in sorted order; `loc_id` is the join key used when locations are
# assigned to synthetic employees.
locations=spark.sql("""
WITH loc_prepare AS (
SELECT DISTINCT city,county,state,zip FROM test )
SELECT (row_number() over (order by city,county,state,zip))-1 loc_id,*
FROM loc_prepare
""")

# createOrReplaceTempView is the replacement for the deprecated registerTempTable.
locations.createOrReplaceTempView("locations")
# locations.show()

# Distinct (company_name, web) pairs from `test`, numbered from 0 in sorted
# order. The id column is called `loc_id` even though it identifies a
# company; the name is kept because the employees query joins on `c.loc_id`.
companies=spark.sql("""
WITH com_prepare AS (
SELECT DISTINCT company_name,web FROM test )
SELECT (row_number() over (order by company_name,web))-1 loc_id,*
FROM com_prepare
""")

# createOrReplaceTempView is the replacement for the deprecated registerTempTable.
companies.createOrReplaceTempView("companies")
# companies.show()

# Synthetic street addresses: every ordered pair of distinct last names from
# `test` is numbered by (first, second) and rendered as
# "<id> <first> St. <second>"; only ids up to 50001 are kept.
addresses=spark.sql("""
with names as (
  SELECT last_name as name FROM test
  ),
names_with_nums as (
  select row_number() OVER (ORDER BY n1.name,n2.name) as id,
    n1.name as first_name,
    n2.name as last_name
  FROM names as n1
  CROSS JOIN names as n2
  WHERE n1.name!=n2.name
  ),
addresses as (
  select id,
  id||" "||first_name||" St. "|| last_name  as address FROM names_with_nums
  WHERE id<=50001
  )
SELECT * FROM addresses
""")

# createOrReplaceTempView is the replacement for the deprecated registerTempTable.
addresses.createOrReplaceTempView("addresses")
addresses.show()

def formPh(ph):
  """Render ``ph`` as three dash-separated groups.

  ``ph`` is converted with ``str``; characters 0-2, 3-5 and 6-9 of that text
  form the groups, and anything past the tenth character is dropped.
  """
  digits=str(ph)
  groups=(digits[:3],digits[3:6],digits[6:10])
  return "-".join(groups)

def _numbered_phone_view(start, raw_view, numbered_view):
  """Build 50001 consecutive formatted phone numbers and register them.

  Args:
    start: integer phone number the run begins at.
    raw_view: temp-view name for the one-column frame of formatted numbers.
    numbered_view: temp-view name for the frame with columns `id` (row
      number over the formatted string order) and `no` (the number).

  Returns:
    (numbers, raw_df, numbered_df): the Python list and both DataFrames.
  """
  numbers=[formPh(start+i) for i in range(50001)]
  raw_df=spark.createDataFrame(numbers, StringType())
  # createOrReplaceTempView is the replacement for the deprecated registerTempTable.
  raw_df.createOrReplaceTempView(raw_view)
  numbered_df=spark.sql(f"""SELECT row_number() OVER (ORDER BY value) as id,value no FROM {raw_view}""")
  numbered_df.createOrReplaceTempView(numbered_view)
  return numbers,raw_df,numbered_df

# Same module-level names as before; the two copy-pasted stanzas now share
# one helper.
ph1,phone1,phoneNum1=_numbered_phone_view(9842268754,"ph1","phoneNum1")

ph2,phone2,phoneNum2=_numbered_phone_view(8842468354,"ph2","phoneNum2")

In [0]:
#form dataset

# `loc_id` in both lookup views runs from 0 to count-1, so the modulus used to
# pick a random row must equal the row count. The previously hard-coded
# 451/499 leave some rows unmatched or unused whenever the source data does
# not have exactly that many distinct locations/companies.
location_count=locations.count()
company_count=companies.count()

# One row per value in `seq`. Rows whose number matches a real record in
# `test` keep the real fields (coalesce prefers `t`); the rest are filled from
# the synthetic name/address/phone views joined on the same number, and from a
# location and company picked by a random multiplier.
employees=spark.sql(f"""

with emp as (
  SELECT row_number() over (order by first_name,last_name) as empId,* FROM test
),
seqn as (
  SELECT value,int(RAND()*10000) r_val
  FROM seq 
),
emp_blow as (
  SELECT 
    s.value as emp_id,
    coalesce(t.first_name,n.first_name) as first_name,
    coalesce(t.last_name,n.last_name) as last_name,
    coalesce(t.company_name,c.company_name) as company_name,
    coalesce(t.address,a.address) as address,
    coalesce(t.city,l.city) as city,
    coalesce(t.county,l.county) as county,
    coalesce(t.state,l.state) as state,
    coalesce(t.zip,l.zip) as zip,
    coalesce(t.phone1,p1.no) as phone1,
    coalesce(t.phone2,p2.no) as phone2,
    coalesce(t.email,n.email_id) as email,
    coalesce(t.web,c.web) as web
  FROM seqn s
  LEFT JOIN emp t ON s.value=t.empId
  LEFT JOIN names n ON s.value=n.id
  LEFT JOIN locations l ON mod(s.value*s.r_val,{location_count})=l.loc_id
  LEFT JOIN companies c ON mod(s.value*s.r_val,{company_count})=c.loc_id
  LEFT JOIN addresses a ON s.value=a.id
  LEFT JOIN phoneNum1 p1 ON s.value=p1.id
  LEFT JOIN phoneNum2 p2 ON s.value=p2.id)
select * from emp_blow""")
# RAND() has no seed, so the assignment differs between runs; persisting keeps
# the cached result in use for the downstream queries in this session.
employees.persist()
# createOrReplaceTempView is the replacement for the deprecated registerTempTable.
employees.createOrReplaceTempView("employees")

In [0]:
#Create a dataframe of CityEmployeeDensity, the 1st city will be the one with maximum number of employees

# Employee count per city, ranked by descending count. dense_rank gives
# cities with equal counts the same `vaccination_order`.
CityEmployeeDensity=spark.sql(""" WITH emp_counts as (
   SELECT city,COUNT(*) as emp_count
   FROM employees
   GROUP BY city
 )
 SELECT city,emp_count,dense_rank() over (order by emp_count desc) as vaccination_order
 FROM emp_counts""")
CityEmployeeDensity.persist()
CityEmployeeDensity.show()
# createOrReplaceTempView is the replacement for the deprecated registerTempTable.
CityEmployeeDensity.createOrReplaceTempView("CityEmployeeDensity")


  


In [0]:
# Employees plus a constant placeholder column `Sequence` set to -1.
VaccinationDrivePlan=employees.withColumn("Sequence",lit(-1))
VaccinationDrivePlan.show()
# createOrReplaceTempView is the replacement for the deprecated registerTempTable.
VaccinationDrivePlan.createOrReplaceTempView("VaccinationDrivePlan")

In [0]:
# Attach each employee's city rank as `sequence`. Both inputs derive from
# `employees`, so the join is done by column name rather than by comparing
# DataFrame-bound `city` references, which does not depend on how Spark
# disambiguates columns in a self-join. The selected columns and their order
# are unchanged; the placeholder `Sequence` column from the drive plan is not
# carried over.
VaccinationSequence=VaccinationDrivePlan.join(CityEmployeeDensity,on="city",how="left")\
                           .select("emp_id",
                                  "first_name",
                                  "last_name",
                                  "company_name",
                                  "address",
                                  "city",
                                  "county",
                                  "state",
                                  "zip",
                                  "phone1",
                                  "phone2",
                                  "email",
                                  "web",
                                  CityEmployeeDensity.vaccination_order.alias('sequence'))
# Mark the frame for caching before the first action so show() populates the
# cache instead of computing a result that is then recomputed later.
VaccinationSequence.persist()
VaccinationSequence.show()
# createOrReplaceTempView is the replacement for the deprecated registerTempTable.
VaccinationSequence.createOrReplaceTempView("VaccinationSequence")



In [0]:
# Per-city schedule: employees are numbered within their city by emp_id and
# grouped 100 to a day, so positions 1-100 are day 1, 101-200 day 2, and so on.
VaccinationSchedule=spark.sql("""with vacc_order as (
                                    SELECT emp_id,city,row_number() over (partition by city order by emp_id) as position
                                    FROM VaccinationSequence
                                )
                                SELECT emp_id,city,floor(((position-1)/100))+1 as day
                                FROM vacc_order
                                order by 2,3,1
                                """)
VaccinationSchedule.persist()
VaccinationSchedule.show()
# createOrReplaceTempView is the replacement for the deprecated registerTempTable.
VaccinationSchedule.createOrReplaceTempView("VaccinationSchedule")

In [0]:
# Single global schedule: all employees are numbered by (sequence, emp_id)
# and grouped 100 to a day.
# NOTE(review): cities that share a `sequence` value are interleaved by
# emp_id rather than handled one city at a time - confirm this is intended.
VaccinationScheduleSeq=spark.sql("""
                                    with vacc_order as (
                                    SELECT emp_id,city,row_number() over (order by sequence,emp_id) as position
                                    FROM VaccinationSequence
                                )
                                SELECT emp_id,city,floor(((position-1)/100))+1 as day
                                FROM vacc_order
                                order by 3,2,1
                                 """)
VaccinationScheduleSeq.persist()
VaccinationScheduleSeq.show()
# createOrReplaceTempView is the replacement for the deprecated registerTempTable.
VaccinationScheduleSeq.createOrReplaceTempView("VaccinationScheduleSeq")

In [0]:
# Days needed per city: the highest day number assigned in the per-city
# schedule, longest first.
estimatedCompletion=spark.sql("""SELECT city,MAX(day) as Reqd_days
FROM VaccinationSchedule
GROUP BY city
ORDER BY 2 DESC""")

estimatedCompletion.persist()
estimatedCompletion.show()
# createOrReplaceTempView is the replacement for the deprecated registerTempTable.
estimatedCompletion.createOrReplaceTempView("estimatedCompletion")



In [0]:
%sql

 
-- Ad-hoc query cell: `employees` is a temp view registered by an earlier
-- cell, so the cells above must be run before this one.
-- SELECT "Please run all the above cells and query the datasets here"
SELECT * FROM employees

emp_id,first_name,last_name,company_name,address,city,county,state,zip,phone1,phone2,email,web
148,Ernie,Stenseth,Knwz Newsradio,45 E Liberty St,Ridgefield Park,Bergen,NJ,7660,201-709-6245,201-387-9093,ernie_stenseth@aol.com,http://www.knwznewsradio.com
463,Tonette,Wenner,Northwest Publishing,4545 Courthouse Rd,Westbury,Nassau,NY,11590,516-968-6051,516-333-4861,twenner@aol.com,http://www.northwestpublishing.com
471,Valentin,Klimek,"Schmid, Gayanne K Esq",137 Pioneer Way,Chicago,Cook,IL,60604,312-303-5453,312-512-2338,vklimek@klimek.org,http://www.schmidgayannekesq.com
496,Yolando,Luczki,Dal Tile Corporation,422 E 21st St,Syracuse,Onondaga,NY,13214,315-304-4759,315-640-6357,yolando@cox.net,http://www.daltilecorporation.com
833,Adelina,Maryann,Kleensteel,833 Acey St. Pagliuca,Amarillo,Randall,TX,79109,984-226-9586,884-246-9186,adelina.maryann@gmail.com,http://www.kleensteel.com
1088,Adell,Christiane,Magnuson,1088 Acuff St. Coyier,Abilene,Dickinson,KS,67410,984-226-9841,884-246-9441,adell.christiane@gmail.com,http://www.magnuson.com
1238,Adell,Jovita,Centro Inc,1238 Acuff St. Kulzer,New York,New York,NY,10013,984-226-9991,884-246-9591,adell.jovita@gmail.com,http://www.centroinc.com
1342,Adell,Micaela,Warehouse Office & Paper Prod,1342 Acuff St. Pawlowicz,Elkhart,Elkhart,IN,46514,984-227-0095,884-246-9695,adell.micaela@gmail.com,http://www.warehouseofficepaperprod.com
1580,Ahmed,Chantell,"Johnson, Wes Esq",1580 Adkin St. Coody,Round Rock,Williamson,TX,78664,984-227-0333,884-246-9933,ahmed.chantell@gmail.com,http://www.johnsonwesesq.com
1591,Ahmed,Clorinda,Students In Free Entrprs Natl,1591 Adkin St. Cryer,Milwaukee,Milwaukee,WI,53226,984-227-0344,884-246-9944,ahmed.clorinda@gmail.com,http://www.studentsinfreeentrprsnatl.com


In [0]:
%sh

# Long-format listing of the shell's current working directory.
ls -l